文档菜单
文档首页
/
MongoDB Kafka 连接器
/

开始使用 MongoDB Kafka 源连接器

本页

  • 开始使用 MongoDB Kafka 源连接器
  • 摘要
  • 了解更多信息

按照此教程学习如何配置 MongoDB Kafka 源连接器,以从变更流中读取数据并将其发布到 Apache Kafka 主题。

1

完成以下步骤:Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

使用以下命令在为教程设置下载的 Docker 容器上创建一个交互式 shell 会话

docker exec -it mongo1 /bin/bash

创建一个名为simplesource.json的源配置文件,使用以下命令

nano simplesource.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

在 shell 中运行以下命令以使用您创建的配置文件启动源连接器

cx simplesource.json

注意

cx命令是教程开发环境中包含的定制脚本。此脚本运行以下等效请求到 Kafka Connect REST API 以创建新的连接器

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令以检查连接器的状态

status

如果您的源连接器启动成功,您应看到以下输出

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
3

在相同的shell中,使用以下命令连接到MongoDB,即MongoDB shell的mongosh

mongosh "mongodb://mongo1"

成功连接后,您应该看到以下MongoDB shell提示符

rs0 [direct: primary] test>

在提示符下,输入以下命令以插入新文档

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

一旦MongoDB完成插入命令,您应该收到类似于以下文本的确认

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

通过输入命令exit退出MongoDB shell。

使用以下命令检查Kafka环境的状态

status

在上一个命令的输出中,您应该看到源连接器在收到变更事件后创建的新主题

...
"topic": "Tutorial1.orders",
...

通过运行以下命令确认新Kafka主题上的数据内容

kc Tutorial1.orders

注意

kc命令是一个辅助脚本,用于输出Kafka主题的内容。

运行前面的命令时,您应该看到以下Kafka主题数据,按“键”和“值”部分组织

从输出中的“值”部分,您可以找到包含以下格式化的JSON文档中突出显示的payload部分的fullDocument数据

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
4

您可以通过将变更流配置为仅返回fullDocument字段来省略由变更流创建的事件中的元数据。

使用以下命令停止连接器

del mongo-simple-source

注意

del命令是一个辅助脚本,它调用Kafka Connect REST API来停止连接器,相当于以下命令

curl -X DELETE connect:8083/connectors/<parameter>

使用以下命令编辑名为simplesource.json的源配置文件

nano simplesource.json

删除现有配置,添加以下配置,并保存文件

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

在shell中运行以下命令以使用更新的配置文件启动源连接器

cx simplesource.json

使用以下命令通过mongosh连接到MongoDB

mongosh "mongodb://mongo1"

在提示符下,输入以下命令以插入新文档

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

通过运行以下命令退出mongosh

exit

通过运行以下命令确认新Kafka主题上的数据内容

kc Tutorial1.orders

在“值”文档中,负载字段应仅包含以下文档数据

{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
5

完成本教程后,通过停止或删除Docker资产来释放您计算机上的免费资源。您可以选择删除Docker容器和镜像,或者仅删除容器。如果您删除容器和镜像,您必须重新下载它们以重启MongoDB Kafka Connector开发环境,该环境大小约为2.4 GB。如果您仅删除容器,您可以重用镜像并避免下载样本数据管道中的大部分大型文件。

提示

更多教程

如果您计划完成更多MongoDB Kafka Connector教程,请考虑仅删除容器。如果您不打算完成更多MongoDB Kafka Connector教程,请考虑删除容器和镜像。

选择您要运行的删除任务的标签页。

运行以下shell命令以删除开发环境的Docker容器和镜像

docker-compose -p mongo-kafka down --rmi all

运行以下shell命令以删除Docker容器但保留镜像

docker-compose -p mongo-kafka down

要重启容器,请遵循教程设置中启动它们的相同步骤。

在本教程中,您使用不同的配置启动了源连接器,以更改发布到Kafka主题的更改流事件数据。

阅读以下资源,了解更多关于本教程中提到的概念

  • 源连接器配置属性

  • Kafka Connect REST API

返回

探索 MongoDB Change Streams