文档菜单
文档首页
/
MongoDB Kafka 连接器
/

使用更改数据捕获处理器复制数据

在本页中

  • 概述
  • 使用 CDC 处理器复制数据
  • 完成教程设置
  • 启动交互式外壳
  • 配置源连接器
  • 配置目标连接器
  • 监控 Kafka 主题
  • 将数据写入源并观察数据流
  • (可选) 生成更多更改
  • 总结
  • 了解更多信息

按照本教程学习如何使用 变更数据捕获(CDC)处理器 通过 MongoDB Kafka 连接器进行数据复制。CDC 处理器是一个将 CDC 事件转换为 MongoDB 写操作的应用程序。当您必须将一个数据存储中的更改复制到另一个数据存储时,请使用 CDC 处理器。

在本教程中,您将配置和运行 MongoDB Kafka 源和目标连接器,使用 CDC 使两个 MongoDB 集合包含相同的文档。源连接器将原始集合的变化流数据写入 Kafka 主题,目标连接器将 Kafka 主题数据写入目标 MongoDB 集合。

如果您想了解更多关于 CDC 处理器的工作原理,请参阅变更数据捕获处理器 指南。

1

完成 Kafka 连接器教程设置 中的步骤以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

在Docker容器中分别打开两个交互式Shell窗口。在教程中,您可以使用Shell来运行和观察不同的任务。

从终端运行以下命令以启动交互式Shell。

docker exec -it mongo1 /bin/bash

在本教程中,我们将称此交互式Shell为CDCShell1

在第二个终端中运行以下命令以启动交互式Shell

docker exec -it mongo1 /bin/bash

在本教程中,我们将称此交互式Shell为CDCShell2

将两个窗口排列在您的屏幕上,以便同时可见,以便查看实时更新。

使用CDCShell1配置您的连接器并监控您的Kafka主题。使用CDCShell2在MongoDB中执行写操作。

3

CDCShell1中,配置一个源连接器,从CDCTutorial.Source MongoDB命名空间读取,并写入到CDCTutorial.Source Kafka主题。

使用以下命令创建一个名为cdc-source.json的配置文件

nano cdc-source.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

CDCShell1中运行以下命令以使用您创建的配置文件启动源连接器

cx cdc-source.json

注意

cx命令是教程开发环境中的一个自定义脚本。此脚本运行以下等效请求到Kafka Connect REST API以创建新的连接器

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

在Shell中运行以下命令以检查连接器的状态

status

如果您的源连接器启动成功,您应该看到以下输出

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1 中,配置一个接收连接器,将 CDCTutorial.Source Kafka 主题中的数据复制到 CDCTutorial.Destination MongoDB 命名空间。

使用以下命令创建一个名为 cdc-sink.json 的配置文件

nano cdc-sink.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

在 shell 中运行以下命令以启动接收连接器并使用您创建的配置文件

cx cdc-sink.json

在Shell中运行以下命令以检查连接器的状态

status

如果您的接收连接器成功启动,您应该会看到以下输出

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1 中,监控 Kafka 主题的传入事件。运行以下命令以启动 kafkacat 应用程序,该程序输出主题发布的数据

kc CDCTutorial.Source

注意

kc 命令是教程开发环境中包含的一个自定义脚本,用于调用 kafkacat 应用程序,并带有连接到 Kafka 和格式化指定主题输出的选项。

启动后,您应该会看到以下输出,指示目前没有可读取的数据

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2 中,使用 mongosh 连接到 MongoDB,通过运行以下命令

mongosh "mongodb://mongo1"

连接成功后,您应该会看到以下 MongoDB 命令行提示符

rs0 [direct: primary] test>

在提示符下,键入以下命令以向 CDCTutorial.Source MongoDB 命名空间中插入一个新文档

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

一旦 MongoDB 完成插入命令,您应该会收到以下类似文本的确认

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

源连接器获取更改并将其发布到 Kafka 主题。您应该在 CDCShell1 窗口中看到以下主题消息

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

目标连接器获取 Kafka 消息并将数据存储到 MongoDB 中。您可以通过在 CDCShell2 中启动的 MongoDB 命令行中运行以下命令从 MongoDB 中的 CDCTutorial.Destination 命名空间检索文档

db.Destination.find()

您应该会看到以下文档作为结果返回

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

尝试通过从 MongoDB 命令行运行以下命令从 CDCTutorial.Source 命名空间中删除文档

db.Source.deleteMany({})

您应该在 CDCShell1 窗口中看到以下主题消息

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

运行以下命令以检索集合中当前文档的数量

db.Destination.count()

这返回以下输出,表示集合为空

0

运行以下命令退出 MongoDB 命令行

exit

在本教程中,您将设置一个源连接器以捕获 MongoDB 集合的更改并将其发送到 Apache Kafka。您还配置了一个带有 MongoDB CDC 处理器的目标连接器,以将数据从 Apache Kafka 移动到 MongoDB 集合。

阅读以下资源以了解更多关于本教程中提到的概念

返回

使用 MongoDB Kafka 目标连接器入门