使用更改数据捕获处理器复制数据
概述
按照本教程学习如何使用 变更数据捕获(CDC)处理器 通过 MongoDB Kafka 连接器进行数据复制。CDC 处理器是一个将 CDC 事件转换为 MongoDB 写操作的应用程序。当您必须将一个数据存储中的更改复制到另一个数据存储时,请使用 CDC 处理器。
在本教程中,您将配置和运行 MongoDB Kafka 源和目标连接器,使用 CDC 使两个 MongoDB 集合包含相同的文档。源连接器将原始集合的变化流数据写入 Kafka 主题,目标连接器将 Kafka 主题数据写入目标 MongoDB 集合。
如果您想了解更多关于 CDC 处理器的工作原理,请参阅变更数据捕获处理器 指南。
使用 CDC 处理器复制数据
完成教程设置
完成 Kafka 连接器教程设置 中的步骤以启动 Confluent Kafka Connect 和 MongoDB 环境。
开始交互式Shell
在Docker容器中分别打开两个交互式Shell窗口。在教程中,您可以使用Shell来运行和观察不同的任务。
从终端运行以下命令以启动交互式Shell。
docker exec -it mongo1 /bin/bash
在本教程中,我们将称此交互式Shell为CDCShell1。
在第二个终端中运行以下命令以启动交互式Shell
docker exec -it mongo1 /bin/bash
在本教程中,我们将称此交互式Shell为CDCShell2。
将两个窗口排列在您的屏幕上,以便同时可见,以便查看实时更新。
使用CDCShell1配置您的连接器并监控您的Kafka主题。使用CDCShell2在MongoDB中执行写操作。
配置源连接器
在CDCShell1中,配置一个源连接器,从CDCTutorial.Source
MongoDB命名空间读取,并写入到CDCTutorial.Source
Kafka主题。
使用以下命令创建一个名为cdc-source.json
的配置文件
nano cdc-source.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-cdc-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Source" } }
在CDCShell1中运行以下命令以使用您创建的配置文件启动源连接器
cx cdc-source.json
注意
cx
命令是教程开发环境中的一个自定义脚本。此脚本运行以下等效请求到Kafka Connect REST API以创建新的连接器
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
在Shell中运行以下命令以检查连接器的状态
status
如果您的源连接器启动成功,您应该看到以下输出
Kafka topics: ... The status of the connectors: source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ...
配置接收连接器
在 CDCShell1 中,配置一个接收连接器,将 CDCTutorial.Source
Kafka 主题中的数据复制到 CDCTutorial.Destination
MongoDB 命名空间。
使用以下命令创建一个名为 cdc-sink.json
的配置文件
nano cdc-sink.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-cdc-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "CDCTutorial.Source", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Destination" } }
在 shell 中运行以下命令以启动接收连接器并使用您创建的配置文件
cx cdc-sink.json
在Shell中运行以下命令以检查连接器的状态
status
如果您的接收连接器成功启动,您应该会看到以下输出
Kafka topics: ... The status of the connectors: sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ...
将数据写入源并观察数据流
在 CDCShell2 中,使用 mongosh
连接到 MongoDB,通过运行以下命令
mongosh "mongodb://mongo1"
连接成功后,您应该会看到以下 MongoDB 命令行提示符
rs0 [direct: primary] test>
在提示符下,键入以下命令以向 CDCTutorial.Source
MongoDB 命名空间中插入一个新文档
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" });
一旦 MongoDB 完成插入命令,您应该会收到以下类似文本的确认
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
源连接器获取更改并将其发布到 Kafka 主题。您应该在 CDCShell1 窗口中看到以下主题消息
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8260..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } }, "wallTime": { "$date": "..." }, "fullDocument": { "_id": { "$oid": "600b38ad..." }, "proclaim": "Hello World!" }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "600b38a..." } } } }
目标连接器获取 Kafka 消息并将数据存储到 MongoDB 中。您可以通过在 CDCShell2 中启动的 MongoDB 命令行中运行以下命令从 MongoDB 中的 CDCTutorial.Destination
命名空间检索文档
db.Destination.find()
您应该会看到以下文档作为结果返回
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World' } ]
(可选) 生成更多更改
尝试通过从 MongoDB 命令行运行以下命令从 CDCTutorial.Source
命名空间中删除文档
db.Source.deleteMany({})
您应该在 CDCShell1 窗口中看到以下主题消息
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8261...." }, ... "operationType": "delete", "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "6138..." } } } }
运行以下命令以检索集合中当前文档的数量
db.Destination.count()
这返回以下输出,表示集合为空
0
运行以下命令退出 MongoDB 命令行
exit
摘要
在本教程中,您将设置一个源连接器以捕获 MongoDB 集合的更改并将其发送到 Apache Kafka。您还配置了一个带有 MongoDB CDC 处理器的目标连接器,以将数据从 Apache Kafka 移动到 MongoDB 集合。
了解更多
阅读以下资源以了解更多关于本教程中提到的概念