开始使用 MongoDB Kafka 源连接器
按照此教程学习如何配置 MongoDB Kafka 源连接器,以从变更流中读取数据并将其发布到 Apache Kafka 主题。
开始使用 MongoDB Kafka 源连接器
完成教程设置
完成以下步骤:Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。
配置源连接器
使用以下命令在为教程设置下载的 Docker 容器上创建一个交互式 shell 会话
docker exec -it mongo1 /bin/bash
创建一个名为simplesource.json
的源配置文件,使用以下命令
nano simplesource.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
在 shell 中运行以下命令以使用您创建的配置文件启动源连接器
cx simplesource.json
注意
cx
命令是教程开发环境中包含的定制脚本。此脚本运行以下等效请求到 Kafka Connect REST API 以创建新的连接器
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令以检查连接器的状态
status
如果您的源连接器启动成功,您应看到以下输出
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
创建变更事件
在相同的shell中,使用以下命令连接到MongoDB,即MongoDB shell的mongosh
mongosh "mongodb://mongo1"
成功连接后,您应该看到以下MongoDB shell提示符
rs0 [direct: primary] test>
在提示符下,输入以下命令以插入新文档
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
一旦MongoDB完成插入命令,您应该收到类似于以下文本的确认
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
通过输入命令exit
退出MongoDB shell。
使用以下命令检查Kafka环境的状态
status
在上一个命令的输出中,您应该看到源连接器在收到变更事件后创建的新主题
... "topic": "Tutorial1.orders", ...
通过运行以下命令确认新Kafka主题上的数据内容
kc Tutorial1.orders
注意
kc
命令是一个辅助脚本,用于输出Kafka主题的内容。
运行前面的命令时,您应该看到以下Kafka主题数据,按“键”和“值”部分组织
从输出中的“值”部分,您可以找到包含以下格式化的JSON文档中突出显示的payload
部分的fullDocument
数据
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
重新配置Change Stream
您可以通过将变更流配置为仅返回fullDocument
字段来省略由变更流创建的事件中的元数据。
使用以下命令停止连接器
del mongo-simple-source
注意
del
命令是一个辅助脚本,它调用Kafka Connect REST API来停止连接器,相当于以下命令
curl -X DELETE connect:8083/connectors/<parameter>
使用以下命令编辑名为simplesource.json
的源配置文件
nano simplesource.json
删除现有配置,添加以下配置,并保存文件
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
在shell中运行以下命令以使用更新的配置文件启动源连接器
cx simplesource.json
使用以下命令通过mongosh
连接到MongoDB
mongosh "mongodb://mongo1"
在提示符下,输入以下命令以插入新文档
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
通过运行以下命令退出mongosh
exit
通过运行以下命令确认新Kafka主题上的数据内容
kc Tutorial1.orders
在“值”文档中,负载
字段应仅包含以下文档数据
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(可选) 停止Docker容器
完成本教程后,通过停止或删除Docker资产来释放您计算机上的免费资源。您可以选择删除Docker容器和镜像,或者仅删除容器。如果您删除容器和镜像,您必须重新下载它们以重启MongoDB Kafka Connector开发环境,该环境大小约为2.4 GB。如果您仅删除容器,您可以重用镜像并避免下载样本数据管道中的大部分大型文件。
提示
更多教程
如果您计划完成更多MongoDB Kafka Connector教程,请考虑仅删除容器。如果您不打算完成更多MongoDB Kafka Connector教程,请考虑删除容器和镜像。
选择您要运行的删除任务的标签页。
运行以下shell命令以删除开发环境的Docker容器和镜像
docker-compose -p mongo-kafka down --rmi all
运行以下shell命令以删除Docker容器但保留镜像
docker-compose -p mongo-kafka down
要重启容器,请遵循教程设置中启动它们的相同步骤。
总结
在本教程中,您使用不同的配置启动了源连接器,以更改发布到Kafka主题的更改流事件数据。
了解更多
阅读以下资源,了解更多关于本教程中提到的概念