开始使用 MongoDB Kafka 溢出连接器
按照此教程学习如何配置 MongoDB Kafka 溢出连接器以从 Apache Kafka 主题中读取数据并将其写入 MongoDB 集合。
开始使用MongoDB Kafka Sink Connector
完成教程设置
完成以下步骤:Kafka Connector教程设置 以启动Confluent Kafka Connect和MongoDB环境。
配置Sink Connector
使用以下命令在教程Docker容器上创建一个交互式shell会话:
docker exec -it mongo1 /bin/bash
创建一个名为simplesink.json
的源配置文件,使用以下命令:
nano simplesink.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial2.pets", "connection.uri": "mongodb://mongo1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial2", "collection": "pets" } }
注意
配置属性中高亮显示的行指定了 转换器,这些转换器指示连接器如何将数据从Kafka转换。
在shell中运行以下命令以使用您创建的配置文件启动sink连接器
cx simplesink.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本向Kafka Connect REST API发送以下等效请求以创建一个新的连接器
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
在shell中运行以下命令以检查连接器的状态
status
如果您的sink连接器启动成功,您应该看到以下输出
Kafka topics: ... The status of the connectors: sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ...
将数据写入Kafka主题
在同一个shell中,创建一个Python脚本以将数据写入Kafka主题。
nano kafkawrite.py
将以下代码粘贴到文件中并保存更改
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush()
运行Python脚本
python3 kafkawrite.py
(可选) 停止Docker容器
完成本教程后,您可以通过停止或删除Docker资源来释放计算机上的空闲资源。您可以选择删除Docker容器和镜像,或仅删除容器。如果您删除容器和镜像,则必须重新下载它们以重启MongoDB Kafka Connector开发环境,该环境大小约为2.4 GB。如果您仅删除容器,则可以重复使用镜像,并避免下载样本数据管道中的大部分大型文件。
提示
更多教程
如果您计划完成更多MongoDB Kafka Connector教程,请考虑仅删除容器。如果您不计划完成更多MongoDB Kafka Connector教程,请考虑删除容器和镜像。
选择要运行的删除任务的标签页。
运行以下shell命令以删除开发环境的Docker容器和镜像
docker-compose -p mongo-kafka down --rmi all
运行以下shell命令以删除Docker容器但保留镜像
docker-compose -p mongo-kafka down
要重启容器,请按照教程设置中启动它们的相同步骤进行。
总结
在本教程中,您配置了一个sink连接器,用于将Kafka主题中的数据保存到MongoDB集群中的集合中。
了解更多
阅读以下资源以了解更多关于本教程中提到的概念