文档菜单
文档首页
/
MongoDB Kafka 连接器

Kafka 连接器快速入门

本页内容

  • 概述
  • 安装所需软件包
  • 下载沙盒
  • 启动沙盒
  • 添加连接器
  • 添加源连接器
  • 添加目标连接器
  • 通过您的连接器发送文档内容
  • 删除沙盒
  • 下一步

本指南展示了如何配置 MongoDB Kafka 连接器,以在 MongoDB 和 Apache Kafka 之间发送数据。

完成本指南后,您应该了解如何使用 Kafka Connect REST API 配置 MongoDB Kafka 连接器,以从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafka 主题读取数据并将其写入 MongoDB。

为了完成本指南中的步骤,您必须下载并在一个沙盒中工作,沙盒是一个容器化开发环境,包含构建示例数据管道所需的服务。

阅读以下部分以设置您的沙盒和示例数据管道。

注意

完成本指南后,您可以按照移除沙盒部分的说明来删除环境。

下载并安装以下软件包

提示

阅读Docker文档

本指南使用以下Docker特定术语

从Docker官方入门指南了解有关Docker的更多信息。

沙盒使用Docker以便于一致性和便利性。有关Apache Kafka的部署选项,请参阅以下资源

我们创建了一个沙盒,其中包含了本教程中构建示例数据管道所需的所有服务。

要下载沙盒,请将教程仓库克隆到您的开发环境中。然后导航到与快速入门教程对应的目录。如果您使用bash或类似的shell,请使用以下命令

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

沙盒在Docker容器中启动以下服务

  • MongoDB,配置为副本集

  • Apache Kafka

  • 安装了MongoDB Kafka Connect的Kafka Connect

  • 管理Apache Kafka配置的Apache Zookeeper

要从教程目录启动沙盒,请运行以下命令

docker compose -p mongo-kafka up -d --force-recreate

当您启动沙盒时,Docker将下载它运行所需的所有镜像。

注意

下载需要多长时间?

本教程的Docker镜像总共需要约2.4GB的空间。以下列表显示了不同互联网速度下下载镜像所需的时间

  • 每秒40兆比特:8分钟

  • 每秒20兆比特:16分钟

  • 每秒10兆比特:32分钟

在Docker下载并构建镜像后,您应该在开发环境中看到以下输出

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

注意

端口映射

沙盒将以下服务映射到宿主机的端口

  • 沙盒MongoDB服务器映射到端口35001在您的宿主机器上

  • 沙盒Kafka Connect JMX服务器映射到端口35000在您的宿主机器上

这些端口必须空闲才能启动沙盒。

为了完成示例数据管道,您必须向Kafka Connect添加连接器,以在Kafka Connect和MongoDB之间传输数据。添加一个源连接器以从MongoDB传输数据到Apache Kafka。添加一个目标连接器以从Apache Kafka传输数据到MongoDB。

要在沙盒中添加连接器,首先使用以下命令在你的Docker容器中启动一个交互式bash shell

docker exec -it mongo1 /bin/bash

在您的shell会话开始后,您应该看到以下提示

MongoDB Kafka Connector Sandbox $

使用Docker容器中的shell添加源连接器,使用Kafka Connect REST API。

以下API请求添加了一个源连接器,配置了以下属性

  • Kafka Connect用于实例化连接器的类

  • 连接URI、数据库和MongoDB副本集的集合,连接器从中读取数据

  • 一个聚合管道,为连接器从MongoDB读取的插入文档添加一个字段travel,其值为"MongoDB Kafka Connector"

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

注意

为什么我会看到“连接失败”的消息?

Kafka Connect REST API启动可能需要最多三分钟。如果您收到以下错误,请等待三分钟,然后再次运行前面的命令

...
curl: (7) Failed to connect to connect port 8083: Connection refused

要确认您已添加源连接器,请运行以下命令

curl -X GET http://connect:8083/connectors

前面的命令应输出正在运行的连接器名称

["mongo-source"]

有关源连接器属性的更多信息,请参阅源连接器配置属性页面.

有关聚合管道的更多信息,请参阅MongoDB手册中的聚合管道

使用 Docker 容器中的 shell,通过 Kafka Connect REST API 添加汇接器。

以下 API 请求添加了一个汇接器,其配置了以下属性

  • Kafka Connect用于实例化连接器的类

  • 连接 URI、数据库和集合,该集合是 MongoDB 副本集,汇接器将数据写入其中

  • 从其中读取数据的 Apache Kafka 主题

  • 用于 MongoDB 变更事件文档的变更数据捕获处理器

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

要确认已添加源和汇接器,请运行以下命令

curl -X GET http://connect:8083/connectors

前面的命令应输出正在运行的连接器名称

["mongo-source", "mongo-sink"]

有关汇接器属性的信息,请参阅汇接器配置属性页面。

有关变更数据捕获事件的信息,请参阅变更数据捕获处理器指南。

要发送文档内容通过您的连接器,将文档插入到您的源连接器所读取数据的 MongoDB 集合中。

要将新文档插入您的集合,从 Docker 容器中的 shell 进入 MongoDB shell,使用以下命令

mongosh mongodb://mongo1:27017/?replicaSet=rs0

运行上述命令后,您应该会看到以下提示

rs0 [primary] test>

从 MongoDB shell 中,使用以下命令将文档插入到 sampleData 集合和 quickstart 数据库中

use quickstart
db.sampleData.insertOne({"hello":"world"})

将文档插入到 sampleData 集合后,请确认您的连接器已处理更改。使用以下命令检查 topicData 集合的内容

db.topicData.find()

您应该会看到类似于以下内容的输出

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

使用以下命令退出 MongoDB shell

exit

为了节省开发环境中的资源,请移除沙盒。

在移除沙盒之前,请通过运行以下命令退出Docker容器中的shell会话:

exit

您可以选择移除Docker容器和镜像,或者仅移除容器。如果您移除了容器和镜像,您需要重新下载它们以重新启动沙盒,沙盒大小约为2.4GB。如果您仅移除容器,您可以重复使用镜像并避免下载样本数据管道中的大部分大文件。

选择要运行的移除任务的对应标签页。

运行以下shell命令以从沙盒中移除Docker容器和镜像:

docker-compose -p mongo-kafka down --rmi all

运行以下shell命令以移除Docker容器但保留沙盒中的镜像:

docker-compose -p mongo-kafka down

要了解如何安装MongoDB Kafka连接器,请参阅安装MongoDB Kafka连接器指南。

要了解如何从Apache Kafka处理和移动数据到MongoDB的更多信息,请参阅目标连接器指南。

要了解如何从MongoDB处理和移动数据到Apache Kafka的更多信息,请参阅源连接器指南。

返回

新增功能