复制现有数据
本使用示例演示了如何使用 MongoDB Kafka 源连接器将数据从 MongoDB 集合复制到 Apache Kafka 主题。
示例
假设您想将 MongoDB 集合复制到 Apache Kafka 并过滤一些数据。
您的需求和解决方案如下
需求 | 解决方案 |
---|---|
复制将您的MongoDB部署中 shopping 数据库的customers 集合复制到Apache Kafka主题。 | 请参阅本指南的复制数据部分。 |
仅复制具有“墨西哥”值在 country 字段的文档。 | 请参阅本指南的过滤数据部分。 |
customers
集合包含以下文档
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
复制数据
通过在源连接器中指定以下配置选项来复制shopping
数据库的customers
集合的内容
database=shopping collection=customers startup.mode=copy_existing
您的源连接器通过创建描述将每个文档插入集合中的变更事件文档来复制您的集合。
注意
数据复制可能会产生重复事件
如果在源连接器将现有数据从数据库转换的同时任何系统更改了数据库中的数据,MongoDB可能会产生重复的变更流事件来反映最新的更改。由于数据复制所依赖的变更流事件是幂等的,复制的数据最终是一致的。
有关变更事件文档的更多信息,请参阅变更流指南。
有关startup.mode
选项的更多信息,请参阅启动属性。
过滤数据
您可以通过在源连接器配置的startup.mode.copy.existing.pipeline
选项中指定聚合管道来过滤数据。以下配置指定了一个聚合管道,它匹配所有在country
字段中具有“墨西哥”的文档
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
要了解更多关于 startup.mode.copy.existing.pipeline
选项的信息,请参阅 启动属性。
要了解更多关于聚合管道的信息,请参阅以下资源
自定义管道以过滤更改事件 使用示例
聚合 在 MongoDB 手册中。
指定配置
您的最终源连接器配置以复制 customers
集合应该如下所示
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
一旦连接器复制了您的数据,您将在 shopping.customers
Apache Kafka 主题中看到与前面的示例集合 对应的以下更改事件文档
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }