文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

复制现有数据

本使用示例演示了如何使用 MongoDB Kafka 源连接器将数据从 MongoDB 集合复制到 Apache Kafka 主题。

假设您想将 MongoDB 集合复制到 Apache Kafka 并过滤一些数据。

您的需求和解决方案如下

需求
解决方案
复制将您的MongoDB部署中shopping数据库的customers集合复制到Apache Kafka主题。
请参阅本指南的复制数据部分。
仅复制具有“墨西哥”值在country字段的文档。
请参阅本指南的过滤数据部分。

customers集合包含以下文档

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

通过在源连接器中指定以下配置选项来复制shopping数据库的customers集合的内容

database=shopping
collection=customers
startup.mode=copy_existing

您的源连接器通过创建描述将每个文档插入集合中的变更事件文档来复制您的集合。

注意

数据复制可能会产生重复事件

如果在源连接器将现有数据从数据库转换的同时任何系统更改了数据库中的数据,MongoDB可能会产生重复的变更流事件来反映最新的更改。由于数据复制所依赖的变更流事件是幂等的,复制的数据最终是一致的。

有关变更事件文档的更多信息,请参阅变更流指南。

有关startup.mode选项的更多信息,请参阅启动属性

您可以通过在源连接器配置的startup.mode.copy.existing.pipeline选项中指定聚合管道来过滤数据。以下配置指定了一个聚合管道,它匹配所有在country字段中具有“墨西哥”的文档

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

要了解更多关于 startup.mode.copy.existing.pipeline 选项的信息,请参阅 启动属性。

要了解更多关于聚合管道的信息,请参阅以下资源

您的最终源连接器配置以复制 customers 集合应该如下所示

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

一旦连接器复制了您的数据,您将在 shopping.customers Apache Kafka 主题中看到与前面的示例集合 对应的以下更改事件文档

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

注意

将您的主题中的数据写入集合

使用更改数据捕获处理程序将 Apache Kafka 主题中的更改事件文档转换为 MongoDB 写操作。要了解更多信息,请参阅 更改数据捕获处理程序 指南。

返回

主题命名