Kafka 连接器快速入门
概述
本指南展示了如何配置 MongoDB Kafka 连接器,以在 MongoDB 和 Apache Kafka 之间发送数据。
完成本指南后,您应该了解如何使用 Kafka Connect REST API 配置 MongoDB Kafka 连接器,以从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafka 主题读取数据并将其写入 MongoDB。
为了完成本指南中的步骤,您必须下载并在一个沙盒中工作,沙盒是一个容器化开发环境,包含构建示例数据管道所需的服务。
阅读以下部分以设置您的沙盒和示例数据管道。
注意
完成本指南后,您可以按照移除沙盒部分的说明来删除环境。
安装所需的软件包
下载并安装以下软件包
沙盒使用Docker以便于一致性和便利性。有关Apache Kafka的部署选项,请参阅以下资源
下载沙盒
我们创建了一个沙盒,其中包含了本教程中构建示例数据管道所需的所有服务。
要下载沙盒,请将教程仓库克隆到您的开发环境中。然后导航到与快速入门教程对应的目录。如果您使用bash或类似的shell,请使用以下命令
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
启动沙盒
沙盒在Docker容器中启动以下服务
MongoDB,配置为副本集
Apache Kafka
安装了MongoDB Kafka Connect的Kafka Connect
管理Apache Kafka配置的Apache Zookeeper
要从教程目录启动沙盒,请运行以下命令
docker compose -p mongo-kafka up -d --force-recreate
当您启动沙盒时,Docker将下载它运行所需的所有镜像。
注意
下载需要多长时间?
本教程的Docker镜像总共需要约2.4GB的空间。以下列表显示了不同互联网速度下下载镜像所需的时间
每秒40兆比特:8分钟
每秒20兆比特:16分钟
每秒10兆比特:32分钟
在Docker下载并构建镜像后,您应该在开发环境中看到以下输出
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
注意
端口映射
沙盒将以下服务映射到宿主机的端口
沙盒MongoDB服务器映射到端口
35001
在您的宿主机器上沙盒Kafka Connect JMX服务器映射到端口
35000
在您的宿主机器上
这些端口必须空闲才能启动沙盒。
添加连接器
为了完成示例数据管道,您必须向Kafka Connect添加连接器,以在Kafka Connect和MongoDB之间传输数据。添加一个源连接器以从MongoDB传输数据到Apache Kafka。添加一个目标连接器以从Apache Kafka传输数据到MongoDB。
要在沙盒中添加连接器,首先使用以下命令在你的Docker容器中启动一个交互式bash shell
docker exec -it mongo1 /bin/bash
在您的shell会话开始后,您应该看到以下提示
MongoDB Kafka Connector Sandbox $
添加源连接器
使用Docker容器中的shell添加源连接器,使用Kafka Connect REST API。
以下API请求添加了一个源连接器,配置了以下属性
Kafka Connect用于实例化连接器的类
连接URI、数据库和MongoDB副本集的集合,连接器从中读取数据
一个聚合管道,为连接器从MongoDB读取的插入文档添加一个字段
travel
,其值为"MongoDB Kafka Connector"
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
注意
为什么我会看到“连接失败”的消息?
Kafka Connect REST API启动可能需要最多三分钟。如果您收到以下错误,请等待三分钟,然后再次运行前面的命令
... curl: (7) Failed to connect to connect port 8083: Connection refused
要确认您已添加源连接器,请运行以下命令
curl -X GET http://connect:8083/connectors
前面的命令应输出正在运行的连接器名称
["mongo-source"]
有关源连接器属性的更多信息,请参阅源连接器配置属性页面.
有关聚合管道的更多信息,请参阅MongoDB手册中的聚合管道
添加一个汇接器
使用 Docker 容器中的 shell,通过 Kafka Connect REST API 添加汇接器。
以下 API 请求添加了一个汇接器,其配置了以下属性
Kafka Connect用于实例化连接器的类
连接 URI、数据库和集合,该集合是 MongoDB 副本集,汇接器将数据写入其中
从其中读取数据的 Apache Kafka 主题
用于 MongoDB 变更事件文档的变更数据捕获处理器
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
要确认已添加源和汇接器,请运行以下命令
curl -X GET http://connect:8083/connectors
前面的命令应输出正在运行的连接器名称
["mongo-source", "mongo-sink"]
有关汇接器属性的信息,请参阅汇接器配置属性页面。
有关变更数据捕获事件的信息,请参阅变更数据捕获处理器指南。
通过您的连接器发送文档内容
要发送文档内容通过您的连接器,将文档插入到您的源连接器所读取数据的 MongoDB 集合中。
要将新文档插入您的集合,从 Docker 容器中的 shell 进入 MongoDB shell,使用以下命令
mongosh mongodb://mongo1:27017/?replicaSet=rs0
运行上述命令后,您应该会看到以下提示
rs0 [primary] test>
从 MongoDB shell 中,使用以下命令将文档插入到 sampleData
集合和 quickstart
数据库中
use quickstart db.sampleData.insertOne({"hello":"world"})
将文档插入到 sampleData
集合后,请确认您的连接器已处理更改。使用以下命令检查 topicData
集合的内容
db.topicData.find()
您应该会看到类似于以下内容的输出
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
使用以下命令退出 MongoDB shell
exit
移除沙盒
为了节省开发环境中的资源,请移除沙盒。
在移除沙盒之前,请通过运行以下命令退出Docker容器中的shell会话:
exit
您可以选择移除Docker容器和镜像,或者仅移除容器。如果您移除了容器和镜像,您需要重新下载它们以重新启动沙盒,沙盒大小约为2.4GB。如果您仅移除容器,您可以重复使用镜像并避免下载样本数据管道中的大部分大文件。
选择要运行的移除任务的对应标签页。
运行以下shell命令以从沙盒中移除Docker容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下shell命令以移除Docker容器但保留沙盒中的镜像:
docker-compose -p mongo-kafka down
下一步操作
要了解如何安装MongoDB Kafka连接器,请参阅安装MongoDB Kafka连接器指南。
要了解如何从Apache Kafka处理和移动数据到MongoDB的更多信息,请参阅目标连接器指南。
要了解如何从MongoDB处理和移动数据到Apache Kafka的更多信息,请参阅源连接器指南。