更改数据捕获处理器
概述
学习如何使用MongoDB Kafka连接器复制更改数据捕获(CDC)事件。CDC是一种将数据存储中的更改转换为CDC事件流的软件架构。CDC事件是一个包含对数据存储执行更改的可重现表示的消息。复制数据是将包含在CDC事件中的更改应用到从数据存储到不同数据存储的过程,以便更改在两个数据存储中发生。
使用CDC处理器将存储在Apache Kafka主题上的CDC事件复制到MongoDB。CDC处理器是一个将特定CDC事件生产者中的CDC事件转换为MongoDB写操作的程序。
CDC事件生产者是生成CDC事件的程序。CDC事件生产者可以是数据存储,也可以是监视数据存储并生成与数据存储中更改相对应的CDC事件的程序。
注意
MongoDB更改流是CDC架构的一个例子。要了解更多关于更改流的信息,请参阅MongoDB Kafka连接器指南中的更改流.
如果您想查看演示如何复制数据的教程,请参阅使用更改数据捕获处理器复制数据教程。
指定一个CDC处理器
您可以使用以下配置选项在您的连接器上指定CDC处理器
change.data.capture.handler=<cdc handler class>
有关更多信息,请参阅更改数据捕获配置选项。
可用的CDC处理器
sink连接器为以下CDC事件生产者提供CDC处理器
MongoDB
点击以下选项卡了解如何配置 preceding事件生产者的CDC处理器
以下属性文件配置了一个sink连接器以复制MongoDB更改事件文档
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing mongodb change event documents> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
要查看MongoDB CDC处理器的源代码,请参阅MongoDB Kafka Connector源代码。
您的sink连接器可以复制来自以下数据存储的Debezium CDC事件
MongoDB
Postgres
MySQL
点击以下选项卡查看如何配置Debezium CDC处理器以复制来自前面数据存储的CDC事件
以下属性文件配置了一个sink连接器以复制与MongoDB实例更改相对应的Debezium CDC事件
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler
注意
如果您使用的是低于2.0的Debezium CDC版本,请将change.data.capture.handler
属性值设置为 com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler
。
要查看Debezium CDC处理程序的源代码,请参阅 MongoDB Kafka Connector源代码。
以下属性文件配置了一个数据源连接器,用于复制对应Postgres实例变化的Debezium CDC事件
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler
要查看Debezium CDC处理程序的源代码,请参阅 MongoDB Kafka Connector源代码。
以下属性文件配置了一个数据源连接器,用于复制对应MySQL实例变化的Debezium CDC事件
connector.class=com.mongodb.kafka.connect.sink.MongoSinkConnector connection.uri=<your connection uri> database=<your mongodb database> collection=<your mongodb collection> topics=<topic containing debezium cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler
要查看Debezium CDC处理程序的源代码,请参阅 MongoDB Kafka Connector源代码。
注意
自定义Debezium CDC处理程序
如果Debezium CDC处理器无法从您的数据存储中复制CDC事件,您可以通过扩展DebeziumCdcHandler类来自定义处理器。有关自定义CDC处理器的更多信息,请参阅本指南的创建您自己的CDC处理器部分。
您的sink连接器可以复制来自以下数据存储的Qlik Replicate CDC事件
OracleDB
MySQL
Postgres
以下属性文件配置了一个sink连接器,用于复制Qlik Replicate CDC事件
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your connection uri> database=<your database> collection=<your collection> topics=<topic containing qlik replicate cdc events> change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler
要查看Qlik Replicate CDC处理器的源代码,请参阅MongoDB Kafka Connector源代码。
注意
自定义Qlik Replicate CDC处理器
如果Qlik Replicate CDC处理程序无法从您的数据存储中复制CDC事件,您可以通过扩展QlikCdcHandler类来自定义处理程序。有关自定义CDC处理程序的信息,请参阅本指南中的创建自己的CDC处理程序部分。
创建自己的CDC处理程序
如果预构建的CDC处理程序不符合您的用例,您可以创建自己的。您的自定义CDC处理程序是一个实现了CdcHandler
接口的Java类。
要了解更多信息,请参阅CdcHandler接口的源代码。
要查看CDC处理器实现的示例,请参阅预构建CDC处理器的源代码。
如何使用您的CDC处理器
要将您的sink连接器配置为使用自定义CDC处理器,您必须执行以下操作
将您的自定义CDC处理器类编译成JAR文件。
将编译好的JAR添加到您的Kafka工作者的classpath/plugin路径中。有关插件路径的更多信息,请参阅Confluent文档。
注意
Kafka Connect在隔离模式下加载插件。当您部署自定义写入策略时,连接器JAR和CDC处理器JAR应位于同一路径。您的路径应类似于以下内容
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-CDC-handler.jar
要了解更多关于Kafka Connect插件的信息,请参阅Confluent的本指南。
在
change.data.capture.handler
配置设置中指定您的自定义类。请参阅此配置。
要了解如何将类编译成JAR文件,请参阅Oracle的本指南。