文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

自定义管道以过滤更改事件

本使用示例演示了如何配置一个管道来自定义MongoDB Kafka源连接器所消耗的数据。管道是由对数据库进行筛选或转换数据的指令组成的MongoDB聚合管道。

MongoDB在变更流中通知连接器与您的聚合管道匹配的数据更改。变更流是描述客户端在实时中对MongoDB部署进行的数据更改的一系列事件。有关更多信息,请参阅MongoDB服务器手册中的变更流.

假设您正在协调一个活动,并想收集每个嘉宾在特定活动中的姓名和到达时间。每当嘉宾参加活动时,应用程序都会插入一个包含以下详细信息的文档

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

您可以将连接器的pipeline设置定义为指示变更流按以下方式筛选变更事件信息

  • 为插入操作创建变更事件,并省略其他所有类型操作的变更事件。

  • 只为与fullDocument.eventId值"321"匹配的文档创建变更事件,并省略所有其他文档。

  • 使用投影省略_ideventId字段从fullDocument对象中。

为了应用这些转换,将以下聚合管道分配给您的pipeline设置

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

重要

确保管道的结果包含payload对象的最顶层_idns字段。MongoDB使用id作为恢复令牌的值,并使用ns生成Kafka输出主题名称。

当应用程序插入示例文档时,您配置的连接器将以下记录发布到您的Kafka主题

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

有关使用源连接器管理变更流的更多信息,请参阅连接器文档中的变更流.

返回

使用示例