接收器连接器后处理器
本页
概述
在本页中,您可以了解如何在 MongoDB Kafka 接入器中配置 后处理器。后处理器在将记录存储到 MongoDB 集合之前,修改从 Kafka 主题读取的记录。后处理器可以进行的某些数据修改示例包括:
设置文档
_id
字段为自定义值包含或排除消息键或值字段
重命名字段
您可以使用接入器中包含的预构建后处理器或实现自己的后处理器。
请参阅以下部分以获取有关后处理器的更多信息
后处理器如何修改数据
后处理器修改从 Kafka 主题读取的数据。接入器将消息存储在包含 Kafka SinkRecord
键和值字段表示的 SinkDocument
类中。接入器按配置中指定的顺序应用任何后处理器,并将结果存储在 MongoDB 集合中。
后处理器执行数据修改任务,例如生成文档_id
字段、投影消息键或值字段以及重命名字段。您可以使用连接器中包含的预构建后处理器,或者通过扩展以下类实现自己的后处理器:PostProcessor。
如何指定后处理器
您可以在post.processor.chain
配置设置中指定一个或多个后处理器,以逗号分隔的列表形式。如果您指定了多个,连接器将按顺序应用它们,其中每个后处理器修改由前一个后处理器生成的数据。
为确保连接器写入MongoDB的文档包含唯一的_id
字段,如果您没有其他包括它,它会自动将DocumentIdAdder
后处理器添加到链的第一个位置。
以下示例设置指定连接器应首先运行KafkaMetaAdder
后处理器,然后运行AllowListValueProjector
后处理器。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
预置后处理程序
以下表格包含连接器中包含的所有后处理程序的列表。
后处理程序名称 | 描述 | |
---|---|---|
DocumentIdAdder | 完整路径
插入由配置策略确定的 _id 字段。默认策略是 BsonOidStrategy 。有关策略选项和配置信息,请参阅配置文档ID添加后处理程序部分。 | |
BlockListKeyProjector | 完整路径
从目标记录中删除匹配的键字段。 有关配置的更多信息,请参阅允许列表和阻止列表示例。 | |
BlockListValueProjector | 完整路径
从目标记录中删除匹配的值字段。 有关配置的更多信息,请参阅允许列表和阻止列表示例。 | |
AllowListKeyProjector | 完整路径
仅包含目标记录中匹配的键字段。 有关配置的更多信息,请参阅允许列表和阻止列表示例。 | |
AllowListValueProjector | 完整路径
仅包含目标记录中匹配的值字段。 有关配置的更多信息,请参阅允许列表和阻止列表示例。 | |
KafkaMetaAdder | 完整路径
向文档添加一个名为 "topic-partition-offset" 的字段,并将值设置为Kafka主题、分区和偏移量的组合。 | |
RenameByMapping | 完整路径
将键或值文档中与指定字段名完全匹配的字段重命名。 有关配置信息,请参阅通过映射重命名示例。 | |
RenameByRegex | 完整路径
将键或值文档中与正则表达式匹配的字段重命名。 有关配置信息,请参阅通过正则表达式重命名示例。 | |
NullFieldValueRemover | 完整路径
从目标记录中删除所有包含 null 值的文档字段。 |
配置文档ID添加器后处理程序
后处理程序 DocumentIdAdder
使用一种 策略 来确定它应该如何格式化 MongoDB 文档中的 _id
字段。策略定义了预设行为,您可以针对您的用例进行自定义。
您可以在 document.id.strategy
设置中指定此后处理程序的策略,如下面的示例所示
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
以下表格显示了您可以用于配置 DocumentIdAdder
后处理程序的可用的策略列表
策略名称 | 描述 | |
---|---|---|
BsonOidStrategy | 完整路径
生成 MongoDB BSON ObjectId。 DocumentIdAdder 后处理程序的默认策略。 | |
KafkaMetaDataStrategy | 完整路径
构建一个由 Kafka 主题、分区和偏移量拼接而成的字符串。 | |
FullKeyStrategy | 完整路径
使用汇出文档的完整键结构来生成 _id 字段的值。如果不存在键,则默认为空白文档。 | |
ProvidedInKeyStrategy | 完整路径
使用汇出文档键结构中指定的 _id 字段。如果汇出文档中缺少字段,则抛出异常。 | |
ProvidedInValueStrategy | 完整路径
使用汇出文档值结构中指定的 _id 字段。如果汇出文档中缺少字段,则抛出异常。 | |
PartialKeyStrategy | 完整路径
使用汇出文档键结构的块列表或允许列表投影。 如果不存在键,则默认为空白文档。 | |
PartialValueStrategy | 完整路径
使用汇出文档值结构的块列表或允许列表投影。 如果不存在值,则默认为空白文档。 | |
UuidProvidedInKeyStrategy | 完整路径
将 _id 键字段转换为 UUID。值必须是字符串或二进制类型,并且必须符合UUID 格式。 | |
UuidProvidedInValueStrategy | 完整路径
将 _id 值字段转换为 UUID。值必须是字符串或二进制类型,并且必须符合UUID 格式。 | |
UuidStrategy | 完整路径
使用随机生成的字符串格式的 UUID。 |
创建自定义文档 ID 策略
如果内置的文档 ID 添加策略不适用于您的用例,您可以通过以下步骤定义自定义文档 ID 策略。
创建一个实现了接口 IdStrategy 的 Java 类,并在其中包含您自定义的配置逻辑。
将类编译成 JAR 文件。
将编译好的 JAR 文件添加到所有 Kafka 工作进程的类路径或插件路径中。有关插件路径的更多信息,请参阅Confluent 文档。
更新所有 Kafka 工作进程中的
document.id.strategy
设置为您的自定义类的全类名。
注意
所选策略可能对交付语义有影响
BSON ObjectId 或 UUID 策略只能保证至少一次的交付,因为连接器在重试或再次处理记录时生成新的 ids。其他策略如果可以保证构成文档 id 的字段是唯一的,则允许精确一次的交付。
例如,关于 IdStrategy
接口的实现,请参阅包含 id 策略实现 的源代码目录,该目录与连接器一起打包。
后处理器示例
本节展示了以下类型后处理器的配置和示例输出
允许列表和阻止列表示例
允许列表 和 阻止列表 投影后处理器确定要包含和排除的输出字段。
当使用 允许列表 投影器时,后处理器仅输出您指定的字段的数据。
当使用 阻止列表 投影器时,后处理器仅省略您指定的字段的数据。
注意
您可以使用 "."(点)表示法来引用记录中的嵌套字段。您还可以使用此表示法来引用数组中文档的字段。
当您将投影器添加到后处理器链时,您必须指定投影器类型以及是否将其应用于接收文档的关键部分或值部分。
请参阅以下部分以获取示例投影器配置和输出。
允许列表投影仪示例
假设您的Kafka记录值文档类似于以下用户资料数据
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
您可以通过以下设置配置AllowList
值投影仪,以存储从值文档中选择的数据,例如“name”、“address.city”和“hobbies”字段
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
在后处理程序应用投影后,它输出以下记录
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
阻塞列表投影仪示例
假设您的Kafka记录键文档类似于以下用户识别数据
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
您可以通过以下设置配置BlockList
键投影仪,在存储数据之前省略“authToken”和“registration.source”字段
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
在后处理程序应用投影后,它输出以下记录
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
投影通配符模式匹配示例
本节演示了如何配置投影仪后处理程序以匹配通配符模式以匹配字段名称。
模式 | 描述 |
* | 匹配当前级别的任何数量的字符。 |
** | 匹配当前级别及其所有嵌套级别的任何字符。 |
本节中关于允许列表和阻塞列表通配符模式匹配的示例,请参考以下包含气象测量的值文档
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
允许列表通配符示例
您可以使用*
通配符来匹配多个字段名称。以下示例配置匹配以下字段:
名为"city"的顶级字段。
名为"average"的字段,它们是任何以"wind_speed"开头的顶级字段的子文档。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
在后处理器应用允许列表投影后,它输出以下记录:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
您可以使用**
通配符,它匹配从您指定通配符的级别开始的所有级别的对象。以下通配符匹配示例投影包含名为"low"的任何文档。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
应用投影的后处理器输出以下记录:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
阻止列表通配符示例
您可以使用通配符模式来匹配特定文档级别的字段,如下面的阻止列表配置示例所示:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
字段重命名示例
本节展示了如何配置 RenameByMapping
和 RenameByRegex
字段重命名后处理器来更新接收器记录中的字段名称。字段重命名设置指定以下内容
是否更新记录中的键或值文档
要更新的字段名称
新的字段名称
您必须在 JSON 数组中指定 RenameByMapping
和 RenameByRegex
设置。您可以使用点表示法或模式匹配来指定嵌套字段。
字段重命名后处理器的示例使用以下示例接收器记录
键文档
{ "location": "Provence", "date_month": "October", "date_day": 17 }
值文档
{ "flapjacks": { "purchased": 598, "size": "large" } }
通过映射重命名示例
RenameByMapping
后处理器设置指定一个或多个 JSON 对象,这些对象将匹配的字符串字段分配给新名称。每个对象包含在 oldName
元素中匹配的文本和 newName
元素中的替换文本,如下表所述。
键名 | 描述 |
---|---|
oldName | 指定是否匹配键或值文档中的字段以及要替换的字段名称。该设置使用 "." 字符分隔两个值。 |
newName | 指定所有字段匹配的替换字段名称。 |
以下示例属性匹配键文档中的 "location" 字段并将其重命名为 "country"。
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
此设置指示 RenameByMapping
后处理器将以下文档转换为原始键文档
{ "country": "Provence", "date_month": "October", "date_day": 17 }
您可以通过在 oldName
字段中指定附加字段名称的值文档来执行类似的字段名称分配,如下所示
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
此设置指示 RenameByMapping
后处理器将以下文档转换为原始值文档
{ "crepes": { "purchased": 598, "size": "large" } }
您还可以使用字符串格式的 JSON 数组在 field.renamer.mapping
属性中指定一个或多个映射,如下所示设置
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
使用正则表达式重命名
《RenameByRegex
》后处理器设置指定了应匹配的字段名称和文本模式,以及匹配文本的替换值。您可以在包含以下表格中描述的字段JSON对象中指定一个或多个重命名表达式
键名 | 描述 |
---|---|
正则表达式 | 包含一个正则表达式,用于匹配字段以执行替换。 |
模式 | 包含一个正则表达式,用于匹配要替换的文本。 |
替换 | 包含您在 模式 字段中定义的正则表达式所有匹配项的替换文本。 |
以下示例设置指示后处理器执行以下操作
匹配键文档中以“date”开头的任何字段名称。在匹配字段集中,将所有匹配模式
_
的文本替换为字符-
。匹配值文档中是
crepes
子文档的任何字段。在匹配字段集中,将所有匹配模式purchased
的文本替换为quantity
。
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
当连接器将后处理器应用于示例键文档和示例值文档时,它输出以下内容
键文档
{ "location": "Provence", "date-month": "October", "date-day": 17 }
值文档
{ "crepes": { "quantity": 598, "size": "large" } }
警告
重命名后处理器不会覆盖现有字段名称
您在重命名后处理器中设置的目标字段名称可能会导致同一文档中的字段名称重复。为了避免这种情况,后处理器在会重复同一级别的现有字段名称时跳过重命名。
如何创建自定义后处理器
如果内置的后处理器无法满足您的需求,您可以使用以下步骤创建一个自定义的后处理器类:
创建一个继承自 PostProcessor 抽象类的 Java 类。
在您的类中重写
process()
方法。您可以在方法中更新SinkDocument
,这是下游记录键和值字段的 BSON 表示,并访问原始 KafkaSinkRecord
。将类编译成 JAR 文件。
将编译后的 JAR 添加到所有 Kafka 工作器的类路径/插件路径。有关插件路径的更多信息,请参阅 Confluent 文档中的手动安装社区连接器。
将您的后处理器完整类名添加到后处理器链配置中。
例如,对于后处理器,您可以浏览内置后处理器类的源代码。