自定义管道以过滤更改事件
本使用示例演示了如何配置一个管道来自定义MongoDB Kafka源连接器所消耗的数据。管道是由对数据库进行筛选或转换数据的指令组成的MongoDB聚合管道。
MongoDB在变更流中通知连接器与您的聚合管道匹配的数据更改。变更流是描述客户端在实时中对MongoDB部署进行的数据更改的一系列事件。有关更多信息,请参阅MongoDB服务器手册中的变更流.
示例
假设您正在协调一个活动,并想收集每个嘉宾在特定活动中的姓名和到达时间。每当嘉宾参加活动时,应用程序都会插入一个包含以下详细信息的文档
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
您可以将连接器的pipeline
设置定义为指示变更流按以下方式筛选变更事件信息
为插入操作创建变更事件,并省略其他所有类型操作的变更事件。
只为与
fullDocument.eventId
值"321"匹配的文档创建变更事件,并省略所有其他文档。使用投影省略
_id
和eventId
字段从fullDocument
对象中。
为了应用这些转换,将以下聚合管道分配给您的pipeline
设置
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
重要
确保管道的结果包含payload
对象的最顶层_id
和ns
字段。MongoDB使用id
作为恢复令牌的值,并使用ns
生成Kafka输出主题名称。
当应用程序插入示例文档时,您配置的连接器将以下记录发布到您的Kafka主题
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
有关使用源连接器管理变更流的更多信息,请参阅连接器文档中的变更流.