Debezium日常分享系列之:Debezium2.4版本之用于 MongoDB的Debezium 连接器
最佳答案 问答题库498位专家为你答疑解惑
Debezium日常分享系列之:Debezium2.4版本之用于 MongoDB的Debezium 连接器
- 一、综述
- 二、改变流
- 三、阅读偏好
- 四、MongoDB 连接器的工作原理
- 五、支持的 MongoDB 拓扑
- 六、所需的用户权限
- 七、逻辑连接器名称
- 八、执行快照
- 九、临时快照
- 十、增量快照
- 1.增量快照流程
- 2.Debezium 如何解决具有相同主键的记录之间的冲突
- 3.快照窗口
- 4.分片集群的增量快照
- 5.触发增量快照
- 6.使用Kafka信令通道触发增量快照
- 7.具有附加条件的临时增量快照
- 8.停止增量快照
- 9.使用Kafka信令通道停止增量快照
- 10.阻止快照
- 11.阻塞快照进程
- 12.配置快照
- 13.可能重复
- 十一、流变化
- 十二、原像支持
- 十三、主题名称
- 十四、分区
- 十五、交易元数据
- 十六、变更数据事件丰富
- 十七、数据变更事件
- 十八、更改事件键
- 十九、更改事件值
- 二十、创建事件
- 二十一、更新事件
- 二十二、删除事件
- 二十三、墓碑事件
- 二十四、设置mongodb
- 二十五、最佳 Oplog 配置
- 二十六、部署
- 二十七、MongoDB 连接器配置示例
- 二十八、连接器属性
- 二十九、Debezium 连接器 Kafka 信号配置属性
- 三十、监控
Debezium打通Mongodb数据库数据采集系列文章:
- Debezium系列之:打通Debezium对低版本MongoDB数据库4.0版本的数据采集技术
- Debezium系列之:Debezium2.X版本Mysql数据库、Sqlserver数据库、MongoDB数据库debezium connector最新完整的参数配置,并详细介绍参数含义
- Debezium系列之:安装jmx导出器监控debezium指标
- Debezium系列之:深入解读Debezium重要的jmx指标
- Debezium系列之:prometheus采集debezium的jmx数据,grafana通过dashboard展示debezium的jmx数据
更多debezium技术文章请阅读博主专栏:
- Debezium专栏
Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群以获取数据库和集合中的文档更改,并将这些更改记录为 Kafka 主题中的事件。连接器自动处理分片集群中分片的添加或删除、每个副本集成员资格的更改、每个副本集中的选举以及等待通信问题的解决。
一、综述
MongoDB 的复制机制提供了冗余和高可用性,是在生产环境中运行 MongoDB 的首选方式。 MongoDB 连接器捕获副本集或分片集群中的更改。
MongoDB 副本集由一组服务器组成,这些服务器都具有相同数据的副本,并且复制可确保客户端对副本集主服务器上的文档所做的所有更改都正确应用于其他副本集的服务器(称为辅助服务器)。 MongoDB 复制的工作原理是让主数据库记录其 oplog(或操作日志)中的更改,然后每个辅助数据库读取主数据库的 oplog 并按顺序将所有操作应用到自己的文档中。将新服务器添加到副本集时,该服务器首先对主服务器上的所有数据库和集合执行快照,然后读取主服务器的 oplog 以应用自开始快照以来可能进行的所有更改。当这个新服务器赶上主服务器 oplog 的尾部时,它就成为辅助服务器(并且能够处理查询)。
二、改变流
尽管 Debezium MongoDB 连接器不会成为副本集的一部分,但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不直接读取 oplog。相反,它将 oplog 数据的捕获和解码委托给 MongoDB 更改流功能。通过更改流,MongoDB 服务器将集合中发生的更改公开为事件流。 Debezium 连接器监视流,然后将更改传递给下游。连接器第一次检测到副本集时,它会检查 oplog 以获取最后记录的事务,然后执行主数据库和集合的快照。连接器完成数据复制后,它会从之前读取的 oplog 位置开始创建更改流。
当 MongoDB 连接器处理更改时,它会定期记录 oplog 流中事件起源的位置。当连接器停止时,它会记录它处理的最后一个 oplog 流位置,以便在重新启动后它可以从该位置恢复流式传输。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,并且始终准确地从中断处继续,而不会丢失任何事件。当然,MongoDB oplog 通常有最大大小上限,因此如果连接器长时间停止,oplog 中的操作可能会在连接器有机会读取它们之前被清除。在这种情况下,重新启动后,连接器会检测到丢失的 oplog 操作,执行快照,然后继续流式传输更改。
MongoDB 连接器还能够很好地容忍副本集的成员资格和领导权的变化、分片集群中分片的添加或删除以及可能导致通信失败的网络问题。连接器始终使用副本集的主节点来流式传输更改,因此当副本集进行选举并且不同的节点成为主节点时,连接器将立即停止流式传输更改,连接到新的主节点,并开始使用新的主节点流式传输更改节点。同样,如果连接器无法与主副本集通信,它会尝试重新连接(使用指数退避,以免淹没网络或副本集)。重新建立连接后,连接器将继续传输其捕获的最后一个事件的更改。通过这种方式,连接器可以动态调整以适应副本集成员资格的变化,并自动处理通信中断。
其他资源:
- 复制机制
- 副本集
- 副本集选举
- 分片集群
- 分片添加
- 分片清除
- 改变流
三、阅读偏好
可以在连接器属性中指定连接的 MongoDB 读取首选项。用于设置读取首选项的方法取决于 MongoDB 拓扑和 mongodb.connection.mode。
副本集拓扑:
- 在 mongodb.connection.string 中设置读取首选项。
分片集群拓扑:
- 根据连接方式设置读取优先级,如下表:
表 1. 根据 mongodb.connection.mode 设置分片集群的读取首选项
在分片集群中,连接器首先启动与 mongodb.connection.string 中指定的 mongos 路由器的连接。对于该初始连接,无论连接模式如何,连接器都会遵循 mongodb.connection.string 中指定的读取首选项。当连接模式设置为replica_set时,连接器建立初始路由器连接后,会从路由器的config.shards中检索拓扑信息。然后,它使用检索到的分片地址连接到集群中的各个分片,构建使用 mongodb.connection.string.shard.params 中的连接参数的连接字符串。对于特定于分片的连接,连接器会忽略 mongodb.connection.string 中设置的读取首选项。
四、MongoDB 连接器的工作原理
连接器支持的 MongoDB 拓扑概述对于规划您的应用程序非常有用。
配置和部署 MongoDB 连接器时,它首先连接到种子地址处的 MongoDB 服务器,并确定有关每个可用副本集的详细信息。由于每个副本集都有自己独立的oplog,因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制它将使用的最大任务数,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管该任务仍将为每个副本集使用单独的线程。
针对分片集群运行连接器时,请使用大于副本集数量的tasks.max 值。这将允许连接器为每个副本集创建一个任务,并让 Kafka Connect 协调、分配和管理所有可用工作进程中的任务。
五、支持的 MongoDB 拓扑
MongoDB 连接器支持以下 MongoDB 拓扑:
-
MongoDB 副本集
- Debezium MongoDB 连接器可以捕获单个 MongoDB 副本集的更改。生产副本集至少需要三个成员。
- 要将 MongoDB 连接器与副本集结合使用,必须将连接器配置中的mongodb.connection.string 属性的值设置为副本集连接字符串。当连接器准备好开始从 MongoDB 更改流捕获更改时,它会启动连接任务。然后,连接任务使用指定的连接字符串建立与可用副本集成员的连接。
MongoDB 分片集群
-
MongoDB 分片集群包括:
- 一个或多个分片,每个分片部署为一个副本集;
- 充当集群配置服务器的单独副本集
- 客户端连接的一个或多个路由器(也称为 mongos),并将请求路由到适当的分片
- 要将 MongoDB 连接器与分片集群结合使用,请在连接器配置中,将 mongodb.connection.string 属性的值设置为分片集群连接字符串。
mongodb.connection.string 属性替换了已删除的 mongodb.hosts 属性,该属性用于为早期版本的连接器提供配置服务器副本的主机地址。在当前版本中,使用 mongodb.connection.string 为连接器提供 MongoDB 路由器(也称为 mongos)的地址。
当连接器连接到分片集群时,它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当在集群中添加或删除分片时,连接器会动态调整任务数量以补偿变化。
MongoDB 独立服务器:
- MongoDB 连接器无法监视独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果独立服务器转换为具有一名成员的副本集,则连接器将起作用。
MongoDB 不建议在生产中运行独立服务器。
六、所需的用户权限
为了从 MongoDB 捕获数据,Debezium 以 MongoDB 用户身份连接到数据库。您为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库中读取数据。连接器用户需要以下权限:
- 从数据库中读取。
- 运行 ping 命令。
连接器用户可能还需要以下权限:
- 从 config.shards 系统集合中读取。
数据库读取权限
连接器用户必须能够从所有数据库读取,或从特定数据库读取,具体取决于连接器的 capture.scope 属性的值。根据 capture.scope 设置,向用户分配以下权限之一:
- capture.scope 设置为部署:授予用户读取任何数据库的权限。
- capture.scope 设置为数据库:授予用户读取连接器的 capture.target 属性指定的数据库的权限。
使用 MongoDB ping 命令的权限
- 无论 capture.scope 设置如何,用户都需要权限才能运行 MongoDB ping 命令。
读取 config.shards 集合的权限
- 对于从分片 MongoDB 集群集群更改且 mongodb.connection.mode 属性设置为replica_set 的连接器,必须配置用户读取 config.shards 系统集合的权限。
七、逻辑连接器名称
连接器配置属性 topic.prefix 用作 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的更改流位置时作为唯一标识符。
应该为每个 MongoDB 连接器指定一个唯一的逻辑名称,以有意义地描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
八、执行快照
当 Debezium 任务开始使用副本集时,它使用连接器的逻辑名称和副本集名称来查找描述连接器先前停止读取更改的位置的偏移量。如果可以找到偏移量并且它仍然存在于 oplog 中,则任务立即从记录的偏移位置开始继续进行流式更改。
但是,如果未找到偏移量,或者 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。该过程首先记录 oplog 的当前位置并将其记录为偏移量(以及表示快照已启动的标志)。然后,该任务继续复制每个集合,生成尽可能多的线程(最多为 snapshot.max.threads 配置属性的值)以并行执行此工作。连接器为其看到的每个文档记录一个单独的读取事件。每个读取事件都包含对象的标识符、对象的完整状态以及有关找到该对象的 MongoDB 副本集的源信息。源信息还包括一个标志,表示该事件是在快照期间生成的。
此快照将继续,直到复制了与连接器的过滤器匹配的所有集合。如果连接器在任务快照完成之前停止,则连接器重新启动后将再次开始快照。
当连接器执行任何副本集的快照时,尽量避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。为了提供最大程度的控制,请为每个连接器运行单独的 Kafka Connect 集群。
九、临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在此初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流处理传入。
但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获收集数据的机制,Debezium 包含一个执行临时快照的选项。在 Debezium 环境中发生以下任何更改后,可能需要执行临时快照:
- 修改连接器配置以捕获一组不同的集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或其他一些问题,会发生数据损坏。
可以通过启动所谓的临时快照为之前捕获快照的集合重新运行快照。特别快照需要使用信令集合。可以通过向 Debezium 信令集合发送信号请求来启动临时快照。
当启动现有集合的临时快照时,连接器会将内容附加到该集合已存在的主题中。如果删除了先前存在的主题,并且启用了自动主题创建,Debezium 可以自动创建主题。
即席快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容,也可以仅捕获数据库中集合的子集。
可以通过向信令集合发送执行快照消息来指定要捕获的集合。将执行快照信号的类型设置为增量或阻塞,并提供要包含在快照中的集合的名称,如下表所述:
表 2. 即席执行快照信号记录示例
触发临时增量快照
您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时增量快照。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个集合的起点和终点。根据集合中的条目数和配置的块大小,Debezium 将集合划分为块,并继续对每个块进行快照,一次一个。
触发临时阻塞快照
您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时阻塞快照。连接器处理消息后,开始快照操作。连接器暂时停止流式传输,然后启动指定集合的快照,遵循初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。
十、增量快照
为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依赖 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。
在增量快照中,Debezium 不像初始快照那样一次性捕获数据库的完整状态,而是以一系列可配置块的形式分阶段捕获每个集合。您可以指定希望快照捕获的集合以及每个块的大小。块大小确定快照在数据库上的每个提取操作期间收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护其捕获的每个集合行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:
- 您可以与流数据捕获并行运行增量快照,而不是推迟流数据直到快照完成。连接器在整个快照过程中持续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
- 如果增量快照的进度中断,您可以恢复增量快照而不会丢失任何数据。进程恢复后,快照会从停止点开始,而不是从头开始重新捕获集合。
- 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将集合添加到其 collection.include.list 属性后重新运行快照。
1.增量快照流程
当您运行增量快照时,Debezium 按主键对每个集合进行排序,然后根据配置的块大小将集合拆分为块。逐块工作,然后捕获块中的每个集合行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示块快照开始时行的值。
随着快照的进行,其他进程可能会继续访问数据库,从而可能修改集合记录。为了反映此类更改,INSERT、UPDATE 或 DELETE 操作将照常提交到事务日志。同样,正在进行的 Debezium 流处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。
2.Debezium 如何解决具有相同主键的记录之间的冲突
在某些情况下,流处理发出的 UPDATE 或 DELETE 事件的接收顺序不正确。也就是说,在快照捕获包含该行的 READ 事件的块之前,流处理可能会发出一个修改集合行的事件。当快照最终发出该行相应的 READ 事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium 采用缓冲方案来解决冲突。仅当快照事件和流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。
3.快照窗口
为了帮助解决迟到的 READ 事件和修改同一集合行的流式事件之间的冲突,Debezium 采用了所谓的快照窗口。快照窗口划定了增量快照捕获指定收集块的数据的时间间隔。在块的快照窗口打开之前,Debezium 会遵循其通常的行为,并将事件从事务日志直接向下游发送到目标 Kafka 主题。但从特定块的快照打开的那一刻起,直到其关闭,Debezium 都会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且更新事务日志以反映每次提交,Debezium 会针对每次更改发出 UPDATE 或 DELETE 操作。
当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传送到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则流式事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的 READ 事件。 Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。
连接器对每个快照块重复该过程。
增量快照需要主键稳定有序。但是,字符串可能无法保证稳定的排序,因为编码和特殊字符可能会导致意外行为(Mongo 排序字符串)。执行增量快照时请考虑使用其他类型的主键。
4.分片集群的增量快照
要将增量快照与分片 MongoDB 集群一起使用,您必须为以下属性设置特定值:
- 将 mongodb.connection.mode 设置为分片。
- 将incremental.snapshot.chunk.size设置为足够高的值,以补偿变更流管道增加的复杂性。
5.触发增量快照
目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令集合。
您可以使用 MongoDB insert() 方法向信令集合提交信号。
Debezium 检测到信号集合中的变化后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值增量。
要指定要包含在快照中的集合,请提供一个列出集合的数据集合数组或用于匹配集合的正则表达式数组,例如,{“data-collections”: [“public.Collection1”, “public.Collection2”]}
增量快照信号的数据收集数组没有默认值。如果数据收集数组为空,Debezium 会检测到不需要执行任何操作,并且不会执行快照。
如果要包含在快照中的集合的名称在数据库、架构或表的名称中包含点 (.),则要将该集合添加到数据集合数组中,必须转义该集合的每个部分名称用双引号引起来。
例如,要包含公共数据库中存在且名称为 My.Collection 的数据集合,请使用以下格式:“public”.“My.Collection”。
先决条件
- 信令已启用。
- 源数据库中存在信令数据集合。
- 信令数据收集在 signal.data.collection 属性中指定。
使用源信令通道触发增量快照
- 将快照信号文档插入到信令集合中:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});
例如,
db.debeziumSignal.insert({
"type" : "execute-snapshot",
"data" : {
"data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""],
"type": "incremental"}
});
命令中的id、type、data参数的取值与信令集合的字段相对应。
示例中的参数说明如下表:
表 3. 用于将增量快照信号发送到信令集合的 MongoDB insert() 命令中的字段描述
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增量快照事件消息
{"before":null,"after": {"pk":"1","value":"New data"},"source": {..."snapshot":"incremental" 1},"op":"r", 2"ts_ms":"1620393591654","transaction":null
}
6.使用Kafka信令通道触发增量快照
您可以向配置的 Kafka 主题发送消息,请求连接器运行临时增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为execute-snapshot,数据字段必须有以下字段:
表 4. 执行快照数据字段
执行快照 Kafka 消息的示例:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
7.具有附加条件的临时增量快照
Debezium 使用附加条件字段来选择集合内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含附加条件属性时,该属性的数据收集和过滤参数将附加到 SQL 查询中,例如:
SELECT * FROM <data-collection> WHERE <filter> ….
例如,给定一个包含 id(主键)、颜色和品牌列的产品集合,如果您希望快照仅包含 color=‘blue’ 的内容,则当您请求快照时,您可以添加附加 -用于过滤内容的条件属性:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
您可以使用additional-conditions 属性来传递基于多列的条件。例如,使用与上一示例中相同的产品集合,如果您希望快照仅包含产品集合中 color=‘blue’、brand=‘MyBrand’ 的内容,则可以发送以下请求:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
8.停止增量快照
您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入信号集合来提交停止快照信号。 Debezium 检测到信号集合中的变化后,会读取信号,并停止正在进行的增量快照操作。
您提交的查询指定增量快照操作,以及(可选)要删除的当前运行快照的集合。
先决条件
- 信令已启用。
- 源数据库中存在信令数据集合。
- 信令数据收集在 signal.data.collection 属性中指定。
使用源信令通道停止增量快照
- 将停止快照信号文档插入到信号集合中:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
例如,
db.debeziumSignal.insert({
"type" : "stop-snapshot",
"data" : {
"data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""],
"type": "incremental"}
});
signal命令中的id、type、data参数的取值对应于信令集合的字段。
示例中的参数说明如下表:
表5 向信令集合发送停止增量快照文档的插入命令字段说明
9.使用Kafka信令通道停止增量快照
您可以向配置的 Kafka 信令主题发送信号消息以停止即席增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为stop-snapshot,数据字段必须有以下字段:
表 6. 执行快照数据字段
以下示例显示了典型的停止快照 Kafka 消息:
Key = `test_connector`Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
10.阻止快照
为了在管理快照方面提供更大的灵活性,Debezium 包含一个补充的临时快照机制,称为阻塞快照。阻止快照依赖 Debezium 机制向 Debezium 连接器发送信号。
阻塞快照的行为就像初始快照一样,只是您可以在运行时触发它。
在以下情况下,您可能希望运行阻塞快照而不是使用标准初始快照进程:
- 您添加了一个新集合,并且希望在连接器运行时完成快照。
- 您添加了一个大型集合,并且希望快照在比增量快照更短的时间内完成。
11.阻塞快照进程
当您运行阻塞快照时,Debezium 会停止流式传输,然后启动指定集合的快照,遵循初始快照期间使用的相同流程。快照完成后,流将恢复。
12.配置快照
您可以在信号的数据组件中设置以下属性:
- data-collections:指定哪些集合必须是快照
- 附加条件:您可以为不同的集合指定不同的过滤器。
- data-collection 属性是要应用过滤器的集合的完全限定名称。
- 过滤器属性将具有与 snapshot.select.statement.overrides 中使用的相同值
例如:
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
13.可能重复
发送信号以触发快照的时间与流停止和快照开始的时间之间可能存在延迟。由于此延迟,在快照完成后,连接器可能会发出一些与快照捕获的记录重复的事件记录。
十一、流变化
副本集的连接器任务记录偏移量后,它使用该偏移量来确定 oplog 中应开始流式传输更改的位置。然后,该任务(取决于配置)要么连接到副本集的主节点,要么连接到副本集范围的更改流并从该位置开始流式传输更改。它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。每个更改事件都包含 oplog 中找到操作的位置,并且连接器会定期将其记录为最近的偏移量。记录偏移量的时间间隔由 offset.flush.interval.ms 控制,这是 Kafka Connect 工作线程配置属性。
当连接器正常停止时,会记录最后处理的偏移量,以便在重新启动时,连接器将准确地从其停止的位置继续。但是,如果连接器的任务意外终止,则任务可能在最后记录偏移量之后但在记录最后偏移量之前处理并生成事件;重新启动后,连接器从最后记录的偏移量开始,可能会生成一些与崩溃之前生成的事件相同的事件。
注意:当 Kafka 管道中的所有组件正常运行时,Kafka 消费者只会接收每条消息一次。然而,当出现问题时,Kafka 只能保证消费者至少收到每条消息一次。为了避免意外结果,消费者必须能够处理重复的消息。
如前所述,连接器任务始终使用副本集的主节点来传输来自 oplog 的更改,从而确保连接器尽可能看到最新的操作,并且能够以比辅助节点更低的延迟捕获更改。代替使用。当副本集选择新的主节点时,连接器立即停止流式传输更改,连接到新的主节点,并开始从新主节点的同一位置流式传输更改。同样,如果连接器在与副本集成员通信时遇到任何问题,它会尝试使用指数退避来重新连接,以免淹没副本集,并且一旦连接,它就会从上次停止的位置继续流式传输更改。通过这种方式,连接器能够动态调整以适应副本集成员资格的变化并自动处理通信故障。
总而言之,MongoDB 连接器在大多数情况下都会继续运行。通信问题可能会导致连接器等待问题解决。
十二、原像支持
在 MongoDB 6.0 及更高版本中,您可以配置更改流以发出文档的原像状态,以填充 MongoDB 更改事件的 before 字段。要在 MongoDB 中使用原像,您必须使用 db.createCollection()、create 或 collMod 为集合设置changeStreamPreAndPostImages。要使 Debezium MongoDB 能够在更改事件中包含原像,请将连接器的 capture.mode 设置为 *_with_pre_image 选项之一。
注意:MongoDB 更改流事件的大小限制
MongoDB 更改流事件的大小限制为 16 MB。因此,原像的使用增加了超过该阈值的可能性,这可能导致失败。
十三、主题名称
MongoDB 连接器将每个集合中文档的所有插入、更新和删除操作的事件写入单个 Kafka 主题。 Kafka 主题的名称始终采用逻辑名称.数据库名称.集合名称的形式,其中逻辑名称是使用 topic.prefix 配置属性指定的连接器的逻辑名称,数据库名称是发生操作的数据库的名称,集合名称是受影响文档所在的 MongoDB 集合的名称。
例如,考虑一个 MongoDB 副本集,其库存数据库包含四个集合:产品、现有产品、客户和订单。如果监控此数据库的连接器被赋予了fulfillment的逻辑名称,那么连接器将生成关于这四个 Kafka 主题的事件:
- fulfillment.inventory.products
- fulfillment.inventory.products_on_hand
- fulfillment.inventory.customers
- fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合(其中每个分片包含集合文档的子集)的所有更改都将转到同一个 Kafka 主题。
您可以将 Kafka 设置为根据需要自动创建主题。如果没有,则必须在启动连接器之前使用 Kafka 管理工具创建主题。
十四、分区
MongoDB 连接器不会明确确定如何对事件主题进行分区。相反,它允许 Kafka 根据事件键确定如何对主题进行分区。您可以通过在 Kafka Connect 工作配置中定义 Partitioner 实现的名称来更改 Kafka 的分区逻辑。
Kafka 仅维护写入单个主题分区的事件的总顺序。按键对事件进行分区确实意味着具有相同键的所有事件始终进入同一分区。这可确保特定文档的所有事件始终完全有序。
十五、交易元数据
Debezium 可以生成代表事务元数据边界的事件并丰富变更数据事件消息。
Debezium 接收交易元数据的时间限制
Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。
对于每笔交易的 BEGIN 和 END,Debezium 都会生成一个包含以下字段的事件:
状态:
- 开始或结束
ID:
- 唯一交易标识符的字符串表示形式。
event_count(对于 END 事件):
- 事务发出的事件总数。
data_collections(对于 END 事件):
- data_collection 和 event_count 对的数组,提供源自给定数据集合的更改所发出的事件数。
以下示例显示了一条典型消息:
{"status": "BEGIN","id": "1462833718356672513","event_count": null,"data_collections": null
}{"status": "END","id": "1462833718356672513","event_count": 2,"data_collections": [{"data_collection": "rs0.testDB.collectiona","event_count": 1},{"data_collection": "rs0.testDB.collectionb","event_count": 1}]
}
除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。
十六、变更数据事件丰富
启用事务元数据后,数据消息信封将通过新的事务字段进行丰富。该字段以字段组合的形式提供有关每个事件的信息:
ID:
- 唯一交易标识符的字符串表示形式。
总订单数:
- 该事件在事务生成的所有事件中的绝对位置。
数据收集顺序:
- 事件在事务发出的所有事件中的每个数据收集位置。
以下是消息的示例:
{"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}","source": {
...},"op": "c","ts_ms": "1580390884335","transaction": {"id": "1462833718356672513","total_order": "1","data_collection_order": "1"}
}
十七、数据变更事件
Debezium MongoDB 连接器为插入、更新或删除数据的每个文档级操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于更改的集合。
Debezium 和 Kafka Connect 是围绕连续的事件消息流而设计的。然而,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的架构,或者,如果您使用架构注册表,则还包含消费者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。
以下 JSON 框架显示了更改事件的基本四个部分。但是,您选择在应用程序中使用的 Kafka Connect 转换器的配置方式决定了这四个部分在更改事件中的表示。仅当您配置转换器来生成模式字段时,模式字段才会处于更改事件中。同样,仅当您配置转换器来生成事件键和事件负载时,事件键和事件负载才会出现在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:
{"schema": { 1...},"payload": { 2...},"schema": { 3...},"payload": { 4...},
}
表 7. 变更事件基本内容概述
默认情况下,连接器将事件记录流更改为名称与事件的原始集合相同的主题。
MongoDB 连接器确保所有 Kafka Connect 架构名称都遵循 Avro 架构名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则将其替换为下划线字符。
如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且唯一区分名称的字符无效并用下划线替换,则可能会导致意外冲突。
十八、更改事件键
更改事件的键包含已更改文档的键的架构和已更改文档的实际键。对于给定的集合,模式及其相应的负载都包含单个 id 字段。该字段的值是文档的标识符,表示为从 MongoDB 扩展 JSON 序列化严格模式派生的字符串。
考虑一个逻辑名称为“fulfillment”的连接器、一个包含库存数据库的副本集以及一个包含如下文档的客户集合。
示例文档
{"_id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"
}
更改事件键示例
捕获客户集合更改的每个更改事件都具有相同的事件键架构。只要客户集合具有先前的定义,捕获客户集合更改的每个更改事件都具有以下关键结构。在 JSON 中,它看起来像这样:
{"schema": { 1"type": "struct","name": "fulfillment.inventory.customers.Key", 2"optional": false, 3"fields": [ 4{"field": "id","type": "string","optional": false}]},"payload": { 5"id": "1004"}
}
表 8. 更改事件键的说明
此示例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符都以相同的方式工作,包括文档标识符。对于文档标识符,事件键的 Payload.id 值是一个字符串,表示更新文档的原始 _id 字段,作为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id 字段的示例。
表 9. 事件密钥有效负载中表示文档 _id 字段的示例
十九、更改事件值
更改事件中的值比键稍微复杂一些。与键一样,值也具有模式部分和有效负载部分。模式部分包含描述有效负载部分的 Envelope 结构的模式,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都具有带有信封结构的值有效负载。
考虑用于显示更改事件键示例的相同示例文档:
示例文档
{"_id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"
}
针对每种事件类型描述了对此文档的更改的更改事件的值部分:
- 创建事件
- 更新事件
- 删除事件
- 墓碑事件
二十、创建事件
以下示例显示连接器为在客户集合中创建数据的操作生成的更改事件的值部分:
{"schema": { 1"type": "struct","fields": [{"type": "string","optional": true,"name": "io.debezium.data.Json", 2"version": 1,"field": "after"},{"type": "string","optional": true,"name": "io.debezium.data.Json", "version": 1,"field": "patch"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": false,"field": "rs"},{"type": "string","optional": false,"field": "collection"},{"type": "int32","optional": false,"field": "ord"},{"type": "int64","optional": true,"field": "h"}],"optional": false,"name": "io.debezium.connector.mongo.Source", 3"field": "source"},{"type": "string","optional": true,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "dbserver1.inventory.customers.Envelope" 4},"payload": { 5"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6"source": { 7"version": "2.4.0.Final","connector": "mongodb","name": "fulfillment","ts_ms": 1558965508000,"snapshot": false,"db": "inventory","rs": "rs0","collection": "customers","ord": 31,"h": 1546547425148721999},"op": "c", 8"ts_ms": 1558965515240 9}}
表 10. 创建事件值字段的描述
二十一、更新事件
更改流捕获模式
示例客户集合中更新的更改事件的值与该集合的创建事件具有相同的架构。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。仅当 capture.mode 选项设置为change_streams_update_full 时,更新事件才包含后值。如果 capture.mode 选项设置为 *_with_pre_image 选项之一,则提供之前值。在本例中,有一个新的结构化字段 updateDescription 以及一些附加字段:
- UpdatedFields 是一个字符串字段,其中包含已更新文档字段及其值的 JSON 表示形式
- removedFields 是从文档中删除的字段名称的列表
- truncatedArrays 是文档中被截断的数组列表
以下是连接器为客户集合中的更新生成的事件中的更改事件值的示例:
{"schema": { ... },"payload": {"op": "u", 1"ts_ms": 1465491461815, 2"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": 3\"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 4"updateDescription": {"removedFields": null,"updatedFields": "{\"first_name\": \"Anne Marie\"}", 5"truncatedArrays": null},"source": { 6"version": "2.4.0.Final","connector": "mongodb","name": "fulfillment","ts_ms": 1558965508000,"snapshot": false,"db": "inventory","rs": "rs0","collection": "customers","ord": 1,"h": null,"tord": null,"stxnid": null,"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}","txnNumber":1}}}
表 11. 更新事件值字段说明
注意:
事件中的后值应作为文档的时间点值进行处理。该值不是动态计算的,而是从集合中获取的。因此,如果多个更新紧密地一个接一个地进行,则所有更新更新事件将包含相同的后值,该后值将表示存储在文档中的最后一个值。
如果您的应用程序依赖于渐变演变,那么您应该仅依赖 updateDescription。
二十二、删除事件
删除更改事件中的值与同一集合的创建和更新事件具有相同的架构部分。删除事件中的有效负载部分包含与同一集合的创建和更新事件不同的值。特别是,删除事件既不包含后值也不包含 updateDescription 值。以下是客户集合中文档的删除事件的示例:
{"schema": { ... },"payload": {"op": "d", 1"ts_ms": 1465495462115, 2"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 3"source": { 4"version": "2.4.0.Final","connector": "mongodb","name": "fulfillment","ts_ms": 1558965508000,"snapshot": true,"db": "inventory","rs": "rs0","collection": "customers","ord": 6,"h": 1546547425148721999}}}
表 12. 删除事件值字段说明
二十三、墓碑事件
MongoDB 连接器事件旨在与 Kafka 日志压缩配合使用。只要保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。
二十四、设置mongodb
MongoDB 连接器使用 MongoDB 的更改流来捕获更改,因此该连接器仅适用于 MongoDB 副本集或分片集群,其中每个分片都是一个单独的副本集。有关设置副本集或分片集群的信息,请参阅 MongoDB 文档。另外,请务必了解如何使用副本集启用访问控制和身份验证。
您还必须有一个 MongoDB 用户,该用户具有适当的角色来读取可以读取操作日志的管理数据库。此外,用户还必须能够读取分片集群的配置服务器中的配置数据库,并且必须具有 listDatabases 权限操作。当使用变更流(默认)时,用户还必须具有集群范围的权限操作 find 和 changeStream。
当您打算使用原像并填充 before 字段时,您需要首先使用 db.createCollection()、create 或 collMod 为集合启用changeStreamPreAndPostImages。
云中的 MongoDB
您可以将 MongoDB 的 Debezium 连接器与 MongoDB Atlas 结合使用。请注意,MongoDB Atlas 仅支持通过 SSL 的安全连接,即 +mongodb.ssl.enabled 连接器选项必须设置为 true。
二十五、最佳 Oplog 配置
Debezium MongoDB 连接器读取更改流以获取副本集的 oplog 数据。因为 oplog 是一个固定大小、有上限的集合,所以如果它超过其最大配置大小,它就会开始覆盖其最旧的条目。如果连接器因任何原因停止,当它重新启动时,它会尝试从最后一个 oplog 流位置恢复流式传输。但是,如果从 oplog 中删除了最后一个流位置,则根据连接器的 snapshot.mode 属性中指定的值,连接器可能无法启动,并报告无效的恢复令牌错误。如果发生故障,您必须创建一个新的连接器,以使 Debezium 能够继续从数据库捕获记录。如果 snapshot.mode 设置为初始,连接器在长时间停止后会失败。
为了确保 oplog 保留 Debezium 恢复流式传输所需的偏移值,您可以使用以下方法之一:
- 增加 oplog 的大小。根据您的典型工作负载,将 oplog 大小设置为大于每小时 oplog 条目峰值数的值。
- 增加 oplog 条目保留的最短小时数(MongoDB 4.4 及更高版本)。此设置是基于时间的,这样即使 oplog 达到其最大配置大小,也保证过去 n 小时内的条目可用。尽管这通常是首选选项,但对于具有接近容量的高工作负载的集群,请指定最大 oplog 大小。
为了帮助防止与丢失 oplog 条目相关的故障,跟踪报告复制行为的指标并优化 oplog 大小以支持 Debezium 非常重要。特别是,您应该监视 Oplog GB/Hour 和 Replication Oplog Window 的值。如果 Debezium 离线的时间间隔超过了复制 oplog 窗口的值,并且主 oplog 的增长速度快于 Debezium 消耗条目的速度,则可能会导致连接器故障。
有关如何监控这些指标的信息,请参阅 MongoDB 文档。
最好将最大 oplog 大小设置为基于 oplog 的预期每小时增长(Oplog GB/小时)的值,乘以解决 Debezium 故障可能需要的时间。
那是,
Oplog GB/Hour X average reaction time to Debezium failure
例如,如果oplog大小限制设置为1GB,并且oplog每小时增长3GB,则oplog条目每小时会被清除3次。如果 Debezium 在这段时间内失败,它的最后一个 oplog 位置可能会被删除。
如果 oplog 以 3GB/小时的速度增长,并且 Debezium 离线两个小时,那么您可以将 oplog 大小设置为 3GB/小时 X 2 小时,即 6GB。
二十六、部署
下载mongodb数据库的debezium2.4版本插件:
- mongodb数据库的debezium2.4插件下载地址
部署加载mongodb数据库debezium2.4版本的详细步骤:
- Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl
二十七、MongoDB 连接器配置示例
以下是连接器实例的配置示例,该实例从 192.168.99.100 端口 27017 上的 MongoDB 副本集 rs0 捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您可以通过设置连接器可用的配置属性在 JSON 文件中配置 Debezium MongoDB 连接器。
您可以选择为特定 MongoDB 副本集或分片集群生成事件。或者,您可以过滤掉不需要的集合。
{"name": "inventory-connector", 1"config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 2"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", 3"topic.prefix": "fullfillment", 4"collection.include.list": "inventory[.]*" 5}
}
- 1.当我们向 Kafka Connect 服务注册连接器时的名称。
- 2.MongoDB 连接器类的名称。
- 3.用于连接到 MongoDB 副本集的连接字符串。
- 4.MongoDB 副本集的逻辑名称,它形成生成事件的命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 启动时对应的 Avro 架构的命名空间使用转换器。
- 5.与要监视的所有集合的集合命名空间(例如,.)匹配的正则表达式列表。这是可选的。
您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:
- 连接到 MongoDB 副本集或分片集群。
- 为每个副本集分配任务。
- 如有必要,执行快照。
- 读取变更流。
- 流将事件记录更改为 Kafka 主题。
完整案例:
- Debezium系列之:打通Debezium对低版本MongoDB数据库4.0版本的数据采集技术
- Debezium系列之:Debezium2.X版本Mysql数据库、Sqlserver数据库、MongoDB数据库debezium connector最新完整的参数配置,并详细介绍参数含义
二十八、连接器属性
Debezium MongoDB 连接器具有许多配置属性,您可以使用它们来为您的应用程序实现正确的连接器行为。许多属性都有默认值。有关属性的信息组织如下:
- 必需的 Debezium MongoDB 连接器配置属性
- 高级 Debezium MongoDB 连接器配置属性
除非有默认值,否则需要以下配置属性。
表 13. 所需的 Debezium MongoDB 连接器配置属性
以下高级配置属性具有良好的默认值,适用于大多数情况,因此很少需要在连接器的配置中指定。
表 14. Debezium MongoDB 连接器高级配置属性
二十九、Debezium 连接器 Kafka 信号配置属性
Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题交互。
下表描述了 Kafka 信号属性。
表 15. Kafka 信号配置属性
Debezium 连接器传递信号 Kafka 消费者客户端配置属性
Debezium 连接器提供信号 Kafka 消费者的直通配置。直通信号属性以前缀 Signals.consumer.* 开头。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。
Debezium 在将属性传递给 Kafka 信号使用者之前会去除属性中的前缀。
Debezium 连接器接收器通知配置属性
下表描述了通知属性。
表 16. 接收器通知配置属性
三十、监控
- Debezium系列之:安装jmx导出器监控debezium指标
- Debezium系列之:深入解读Debezium重要的jmx指标
- Debezium系列之:prometheus采集debezium的jmx数据,grafana通过dashboard展示debezium的jmx数据
99%的人还看了
猜你感兴趣
版权申明
本文"Debezium日常分享系列之:Debezium2.4版本之用于 MongoDB的Debezium 连接器":http://eshow365.cn/6-26773-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: 简化geojson策略
- 下一篇: 自动驾驶之—LaneAF学习相关总结