文档菜单
文档首页
/
MongoDB Kafka 连接器
/

开始使用 MongoDB Kafka 溢出连接器

在本页面上

  • 开始使用 MongoDB Kafka 溢出连接器
  • 摘要
  • 了解更多信息

按照此教程学习如何配置 MongoDB Kafka 溢出连接器以从 Apache Kafka 主题中读取数据并将其写入 MongoDB 集合。

1

完成以下步骤:Kafka Connector教程设置 以启动Confluent Kafka Connect和MongoDB环境。

2

使用以下命令在教程Docker容器上创建一个交互式shell会话:

docker exec -it mongo1 /bin/bash

创建一个名为simplesink.json 的源配置文件,使用以下命令:

nano simplesink.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "Tutorial2.pets",
"connection.uri": "mongodb://mongo1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "Tutorial2",
"collection": "pets"
}
}

注意

配置属性中高亮显示的行指定了 转换器,这些转换器指示连接器如何将数据从Kafka转换。

在shell中运行以下命令以使用您创建的配置文件启动sink连接器

cx simplesink.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本向Kafka Connect REST API发送以下等效请求以创建一个新的连接器

curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"

在shell中运行以下命令以检查连接器的状态

status

如果您的sink连接器启动成功,您应该看到以下输出

Kafka topics:
...
The status of the connectors:
sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
Currently configured connectors
[
"mongo-tutorial-sink"
]
...
3

在同一个shell中,创建一个Python脚本以将数据写入Kafka主题。

nano kafkawrite.py

将以下代码粘贴到文件中并保存更改

from kafka import KafkaProducer
import json
from json import dumps
p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8'))
data = {'name': 'roscoe'}
p.send('Tutorial2.pets', value = data)
p.flush()

运行Python脚本

python3 kafkawrite.py
4

在同一个shell中,使用mongosh连接到MongoDB,运行以下命令

mongosh "mongodb://mongo1"

连接成功后,您应该看到以下MongoDB shell提示符

rs0 [direct: primary] test>

在提示符下,键入以下命令以检索MongoDB命名空间 Tutorial2.pets 中的所有文档

use Tutorial2
db.pets.find()

您应该看到以下文档作为结果返回

{ _id: ObjectId("62659..."), name: 'roscoe' }

通过输入命令 exit 退出MongoDB shell。

5

完成本教程后,您可以通过停止或删除Docker资源来释放计算机上的空闲资源。您可以选择删除Docker容器和镜像,或仅删除容器。如果您删除容器和镜像,则必须重新下载它们以重启MongoDB Kafka Connector开发环境,该环境大小约为2.4 GB。如果您仅删除容器,则可以重复使用镜像,并避免下载样本数据管道中的大部分大型文件。

提示

更多教程

如果您计划完成更多MongoDB Kafka Connector教程,请考虑仅删除容器。如果您不计划完成更多MongoDB Kafka Connector教程,请考虑删除容器和镜像。

选择要运行的删除任务的标签页。

运行以下shell命令以删除开发环境的Docker容器和镜像

docker-compose -p mongo-kafka down --rmi all

运行以下shell命令以删除Docker容器但保留镜像

docker-compose -p mongo-kafka down

要重启容器,请按照教程设置中启动它们的相同步骤进行。

在本教程中,您配置了一个sink连接器,用于将Kafka主题中的数据保存到MongoDB集群中的集合中。

阅读以下资源以了解更多关于本教程中提到的概念

  • 下游连接器配置属性

  • Kafka连接器转换器简介

  • Kafka Connect REST API

返回

开始使用 MongoDB Kafka 源连接器