主题命名
概述
本页上的示例展示了如何配置您的MongoDB Kafka源连接器以自定义它发布的记录的主题名称。
默认情况下,MongoDB Kafka源连接器将更改事件数据发布到与MongoDB 命名空间相同的Kafka主题,该命名空间是从更改事件中起源的。命名空间是由点(.
)字符连接的数据库和集合名称的字符串。
以下部分展示了您可以为连接器发布更改事件数据到Kafka主题的不同方式
主题前缀
您可以将源连接器配置为在更改事件数据的命名空间前添加一个字符串,并将记录发布到该Kafka主题。此设置会自动将您的前缀与命名空间连接起来,并用点(".")字符连接。
要指定主题前缀,请使用以下示例中的topic.prefix
配置设置
topic.prefix=myPrefix database=test collection=data
设置后,您的连接器将任何对data
集合的更改发布到名为myPrefix.test.data
的Kafka主题。
主题后缀
您可以将源连接器配置为将字符串附加到更改事件数据的命名空间中,并将记录发布到该Kafka主题。此设置会自动使用“.”字符将您的命名空间与后缀连接起来。
要指定主题后缀,请使用以下示例中所示的topic.suffix
配置设置
topic.suffix=mySuffix database=test collection=data
设置后,您的连接器将发布对test
数据库中data
集合的任何更改到名为test.data.mySuffix
的Kafka主题。
主题命名空间映射
您可以将源连接器配置为将命名空间值映射到传入更改事件数据的Kafka主题名称。主题命名空间映射包含由命名空间模式和目标主题名称模板组成的对。
以下各节描述了连接器如何解释命名空间并将它们映射到Kafka主题。除了直接将数据库和集合映射到Kafka主题外,连接器还支持在主题命名空间映射中使用正则表达式和通配符对。
您命名空间映射中的对顺序会影响连接器将事件写入主题的方式。连接器按以下顺序匹配命名空间
具有数据库和集合名称的命名空间模式中的对。有关此命名空间模式的更多信息,请参阅数据库和集合名称示例。
具有数据库名称的命名空间模式中的对。有关此命名空间模式的更多信息,请参阅数据库和集合名称示例。
正则表达式对按顺序。有关此命名空间模式的更多信息,请参阅正则表达式示例。
通配符对。有关此命名空间模式的更多信息,请参阅通配符示例。
数据库和集合名称
您可以在主题命名空间映射中指定特定数据库和集合的名称,以将来自这些源的变化事件写入Kafka主题。
如果变化事件的数据库名称或命名空间与映射中的任何字段匹配,连接器将根据相应的映射主题名称模板计算目标主题名称,并将事件发布到该主题。
如果变化事件的数据库名称或命名空间与任何映射不匹配,连接器将使用默认的主题命名方案发布记录,除非有其他主题命名设置。
重要
因为 /
字符表示命名空间是正则表达式模式,如果命名空间包含此字符且不在正则表达式上下文中,连接器将引发 ConnectConfigException
。
包含数据库和集合的任何映射都比仅指定源数据库名称的映射具有优先权。
重要
在连接器应用其他主题命名设置之前,先进行命名空间映射。如果已定义,连接器在映射后将应用 topic.prefix
和 topic.suffix
设置。
以下示例显示如何指定 topic.namespace.map
设置,以定义从 carDb
数据库到 automobiles
主题名称模板的映射,以及从 carDb.ev
命名空间到 electricVehicles
主题名称模板的映射
topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles"}
由于 carDb.ev
命名空间映射比 carDb
映射具有优先权,因此连接器执行以下操作
如果变化事件来自数据库
carDb
和集合ev
,连接器将目标设置为electricVehicles
主题。如果变化事件来自数据库
carDb
且不是ev
的集合,连接器将目标设置为automobiles.<collection name>
主题。如果变化文档来自除
carDb
之外的任何数据库,连接器将目标主题设置为默认命名空间命名方案。如果您定义了
topic.prefix
和topic.suffix
设置,连接器在执行命名空间映射后将应用它们的值。
正则表达式
您可以在主题命名空间映射中使用正则表达式(regex)。要使用正则表达式,请以正斜杠字符(/
)开始命名空间模式。根据java.util.regex.Pattern
类指定的语法和行为创建正则表达式。
连接器通过在主题名称模板上执行变量扩展来计算主题名称。连接器支持以下变量:
db
:匹配命名空间中的数据库名称。sep
:topic.separator
配置属性的值。有关此属性的更多信息,请参阅Kafka 主题属性.coll
:匹配命名空间中的集合名称,如果没有集合名称则为空字符串。sep_coll
:sep
变量值前面加上coll
变量值,如果coll
的值为空,则为空字符串。coll_sep
:coll
变量值后面加上sep
变量值,如果coll
的值为空,则为空字符串。sep_coll_sep
:coll
变量值前面和后面都加上sep
变量值,如果coll
的值为空,则为空字符串。
如果您在主题名称模板中使用任何上述变量,则必须将它们放在花括号({}
)内,以便连接器进行扩展。您不能在主题名称模板中用任何其他目的使用花括号。
注意
转义字符
在创建命名空间模式时,请确保正确处理需要转义的字符。例如,为了匹配.
,正则表达式语法要求您将其转义为\.
然而,根据JSON语法,您必须将反斜杠\
字符转义为\\
。然后,为了在命名空间模式中匹配.
,您必须将其写为\\.
。
以下示例展示了如何指定topic.namespace.map
设置,以便连接器执行以下映射
将匹配正则表达式的数据库事件写入由
industrial{sep_coll}
主题名称模板计算出的主题。正则表达式模式匹配具有vertical
数据库名称的任何命名空间。将
vertical.health
命名空间的事件写入healthcare
主题名称。在这种情况下,主题名称模板和计算出的主题名称是相同的。
topic.namespace.map={"/vertical(?:\\..*)?": "industrial{sep_coll}", "vertical.health": "healthcare"}
注意
在本例中,topic.separator
配置属性是 "."
,默认值。
由于 vertical.health
命名空间映射优先于正则表达式命名空间映射,因此连接器执行以下操作
如果更改事件来自
vertical
数据库的health
集合,连接器将目标设置为healthcare
主题。如果更改事件来自具有
vertical
数据库名称的任何其他命名空间,连接器将根据industrial{sep_coll}
主题名称模板计算目标主题。以下示例演示了此映射如果更改事件来自
vertical.wasteManagement
命名空间,连接器将写入industrial.wasteManagement
主题。如果更改事件来自
vertical
数据库但没有特定集合,连接器将写入industrial
主题。
如果更改文档来自任何不匹配正则表达式模式的数据库,连接器将目标主题设置为默认命名空间命名方案。
如果您定义了
topic.prefix
和topic.suffix
设置,连接器在执行命名空间映射后将应用它们的值。
通配符
除了在主题命名空间映射中指定数据库名称和命名空间,如数据库和集合名称中所示外,您还可以使用通配符 *
匹配来自所有数据库和未映射命名空间的更改事件。
topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles", "*": "otherVehicles"}
在先前的通配符示例中,连接器将来自所有数据库(除 carDb
之外)的更改文档发布到 otherVehicles
主题。