文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

所有源连接器配置属性

本页内容

  • 概述
  • MongoDB 连接
  • Kafka 主题
  • 变更流
  • 输出格式
  • 启动
  • 错误处理和从中断恢复

在本页中,您可以查看您的 MongoDB Kafka 源连接器的所有可用配置属性。本页复制了其他源连接器配置属性页面上的内容。

要查看所有源连接器配置属性页面的列表,请参阅源连接器配置属性页面。

使用以下配置设置来指定您的 MongoDB Kafka 源连接器如何建立连接并与您的 MongoDB 集群进行通信。

要仅查看与您的 MongoDB 连接相关的选项,请参阅 MongoDB 源连接器属性页面。

名称
描述
connection.uri
必需

类型:字符串

描述
连接到您的 MongoDB 实例或集群的URI 连接字符串

有关更多信息,请参阅 连接到 MongoDB

重要:为避免在您的配置connection.uri,使用一个ConfigProvider并设置相应的配置参数。

默认值: mongodb://localhost:27017,localhost:27018,localhost:27019
接受值: MongoDB URI 连接字符串
数据库
类型:字符串

描述
要监视更改的数据库名称。如果未设置,连接器将监视所有数据库的更改。

默认值: ""
接受值: 单个数据库名称
集合
类型:字符串

描述
要监视更改的数据库中的集合名称。如果未设置,连接器将监视所有集合的更改。

重要:如果您的数据库配置设置为"",连接器将忽略集合设置。

默认值: ""
接受值: 单个集合名称
server.api.version
类型:字符串

描述
您希望与您的 MongoDB 集群一起使用的稳定 API 版本。有关稳定 API 和支持它的 MongoDB 服务器版本的更多信息,请参阅稳定 API指南。

默认值: ""
接受值: 空字符串或有效的稳定 API 版本。
server.api.deprecationErrors
类型:布尔值

描述
当设置为true时,如果连接器在您的 MongoDB 实例上调用了一个在声明的稳定 API 版本中已弃用的命令,它将引发异常。

您可以使用server.api.version配置选项设置 API 版本。有关稳定 API 的更多信息,请参阅 MongoDB 手册中的稳定 API条目。

默认值: false
接受值: truefalse
server.api.strict
类型:布尔值

描述
当设置为true时,如果连接器在您的 MongoDB 实例上调用了一个在声明的稳定 API 版本中没有涵盖的命令,它将引发异常。

您可以使用server.api.version配置选项设置 API 版本。有关稳定 API 的更多信息,请参阅 MongoDB 手册中的稳定 API条目。

默认值: false
接受值: truefalse

使用以下配置设置来指定 MongoDB Kafka 源连接器应将数据发布到哪些 Kafka 主题。

要查看与您的 Kafka 主题相关的选项,请参阅Kafka 主题属性页面。

名称
描述
topic.prefix
类型:字符串

描述
指定连接器发布更改流事件的目的 Kafka 主题名称的前一部分。目标主题名称由 topic.prefix 值、数据库名称和集合名称组成,这些名称由 topic.separator 属性指定的值分隔。

有关更多信息,请参阅主题命名前缀示例

默认值: ""
可接受值:由 ASCII 字符组成的字符串,包括 "."、"-" 和 "_"
topic.suffix
类型:字符串

描述
指定连接器发布更改流事件的目的 Kafka 主题名称的后一部分。目标主题名称由数据库名称、集合名称和 topic.suffix 值组成,这些名称由 topic.separator 属性指定的值分隔。

有关更多信息,请参阅主题命名后缀示例

默认值: ""
可接受值:由 ASCII 字符组成的字符串,包括 "."、"-" 和 "_"
topic.namespace.map
类型:字符串

描述
指定更改流文档 命名空间 和主题名称之间的 JSON 映射。

您可以使用 topic.namespace.map 属性来指定复杂的映射。此属性支持正则表达式和通配符匹配。

有关这些行为的信息和示例,请参阅主题命名空间映射

默认值: ""
可接受值:有效的 JSON 对象
topic.separator
类型:字符串

描述
指定连接器用于连接创建主题名称的值的字符串。连接器将记录发布到名为以下字段的值的主题,按照以下顺序连接
  1. topic.prefix

  2. 数据库

  3. 集合

  4. topic.suffix

例如,以下配置指示连接器将 coll 集合的 db 数据库中的更改流文档发布到 prefix-db-coll 主题
topic.prefix=prefix
database=db
collection=coll
topic.separator=-
重要: 当您使用 topic.separator 属性时,请注意它不会影响您定义 topic.namespace.map 属性的方式。 topic.namespace.map 属性使用 MongoDB 命名空间,您必须始终使用一个 . 字符来分隔数据库和集合名称。

默认值: "."
接受值: 一个字符串
topic.mapper
类型:字符串

描述
定义您自定义主题映射逻辑的 Java 类。

默认值: com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper
接受值: 一个实现了 TopicMapper 类的有效完整类名。

使用以下配置设置来指定更改流的聚合管道和更改流游标的读取偏好。

要查看与更改流相关的选项,请参阅更改流属性页面。

名称
描述
pipeline
类型:字符串

描述
要运行在更改流中的聚合管道数组。您必须为更改流事件文档配置此设置,而不是 fullDocument 字段。

例如
[{"$match": { "$and": [{"operationType": "insert"}, {"fullDocument.eventId": 1404 }] } }]
有关更多示例,请参阅
默认值: "[]"
接受值: 有效的聚合管道阶段
change.stream.full.document
类型:字符串

描述
确定更改流在更新操作中返回哪些值。

默认设置返回原始文档和更新文档之间的差异。

updateLookup 设置返回原始文档和更新文档之间的差异以及更新后 某个时间点 的整个更新文档的副本。

whenAvailable 设置在可用的情况下返回更新文档。

必需设置返回更新后的文档,如果不可用则引发错误。

有关此更改流选项如何工作的更多信息,请参阅MongoDB手册中的更新操作的全文档查找

默认值: ""
接受值"""updateLookup""whenAvailable""required"
change.stream.full.document.before.change
类型:字符串

描述
配置更改流在更新操作返回的文档的前镜像。在复制现有数据时发布的数据源记录的前镜像不可用,并且前镜像配置对复制没有影响。

要了解如何配置集合以启用前镜像,请参阅MongoDB手册中的带有文档前/后镜像的更改流

默认设置会抑制文档前镜像。

当设置为whenAvailable时,如果可用,则返回文档前镜像,在它被替换、更新或删除之前。

当设置为required时,如果不可用,则返回文档前镜像并引发错误。

默认值: ""
接受值"""whenAvailable""required"
publish.full.document.only
类型:布尔值

描述
是否只从任何更新事件生成的更改流事件文档中返回fullDocument字段。该fullDocument字段包含更新文档的最新版本。有关fullDocument字段的更多信息,请参阅服务器手册中的更新事件

当设置为true时,连接器覆盖change.stream.full.document设置并将其设置为updateLookup,以便fullDocument字段包含更新文档。

默认值: false
接受值: truefalse
publish.full.document.only.tombstone.on.delete
类型:布尔值

描述
是否在删除文档时返回墓碑事件。墓碑事件包含已删除文档的键,其值为null。此设置仅当publish.full.document.only设置为true时适用。

默认值: false
接受值: truefalse
change.stream.document.key.as.key
类型:布尔值

描述
如果存在文档键,是否使用文档键作为源记录键。

当设置为true时,连接器将已删除文档的键添加到墓碑事件中。当设置为false时,连接器使用恢复令牌作为墓碑事件的源键。

默认值true
接受值: truefalse
collation
类型:字符串

描述
一个JSON校对文档,指定MongoDB应用于更改流返回的文档的语言特定排序规则。

默认值: ""
接受值:有效的校对JSON文档
batch.size
类型: int

描述
更改流游标批量大小。

默认值0
接受值:一个整数
poll.await.time.ms
类型:long

描述
服务器等待新数据更改报告给更改流游标,然后返回空批次的最大毫秒数。

默认值5000
接受值:一个整数
poll.max.batch.size
类型: int

描述
在轮询更改流游标以获取新数据时,一次读取的文档数最大值。您可以使用此设置来限制连接器内部缓冲的数据量。

默认值1000
接受值:一个整数

使用以下配置设置来指定MongoDB Kafka源连接器发布到Kafka主题的数据格式。

要查看与输出格式相关的选项,请参阅输出格式属性页面。

名称
描述
output.format.key
类型:字符串

描述
指定源连接器输出键文档的数据格式。

默认值json
接受值bsonjsonschema
output.format.value
类型:字符串

描述
指定源连接器输出值文档的数据格式。

连接器支持Protobuf作为输出数据格式。您可以通过指定schema值并安装和Kafka Connect Protobuf Converter来启用此格式。

默认值json
接受值bsonjsonschema
output.json.formatter
类型:字符串

描述
连接器输出数据应使用的JSON格式化程序类名。

默认:
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
接受值:
您的自定义JSON格式化程序完整类名或以下内置格式化程序类名之一
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
有关这些输出格式的更多信息,请参阅JSON格式化程序
output.schema.key
类型:字符串

描述
指定了 SourceRecord 的 Avro 模式定义。

要了解更多关于 Avro 模式的信息,请参阅 Avro数据格式指南

默认:
{
"type": "record",
"name": "keySchema",
"fields" : [ { "name": "_id", "type": "string" } ]"
}
接受值:一个有效的 Avro 模式
output.schema.value
类型:字符串

描述
指定了 SourceRecord 的值文档的 Avro 模式定义。

要了解更多关于 Avro 模式的信息,请参阅 Avro数据格式指南

默认:
{
"name": "ChangeStream",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "operationType", "type": ["string", "null"] },
{ "name": "fullDocument", "type": ["string", "null"] },
{ "name": "ns",
"type": [{"name": "ns", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "to",
"type": [{"name": "to", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "documentKey", "type": ["string", "null"] },
{ "name": "updateDescription",
"type": [{"name": "updateDescription", "type": "record", "fields": [
{"name": "updatedFields", "type": ["string", "null"]},
{"name": "removedFields",
"type": [{"type": "array", "items": "string"}, "null"]
}] }, "null"] },
{ "name": "clusterTime", "type": ["string", "null"] },
{ "name": "txnNumber", "type": ["long", "null"]},
{ "name": "lsid", "type": [{"name": "lsid", "type": "record",
"fields": [ {"name": "id", "type": "string"},
{"name": "uid", "type": "string"}] }, "null"] }
]
}
接受值:一个有效的 JSON 模式
output.schema.infer.value
类型:布尔值

描述
连接器是否应该推断 SourceRecord 的值文档的模式。由于连接器独立处理每个文档,连接器可能会生成许多模式。

重要: 连接器仅在您将 output.format.value 设置为 schema 时读取此设置。

默认值: false
接受值: truefalse

使用以下配置设置来配置 MongoDB Kafka 源连接器的启动,以便将 MongoDB 集合转换为 Change Stream 事件。

要查看与启动相关的选项,请参阅 启动属性 页面。

名称
描述
startup.mode
类型:字符串

描述
指定连接器在没有源偏移量可用时如何启动。恢复更改流需要恢复令牌,该令牌由连接器从源偏移量获取。如果没有源偏移量,连接器可能忽略所有或部分现有源数据,或者首先复制所有现有源数据,然后继续处理新数据。

如果 startup.mode=latest,连接器忽略所有现有源数据。

如果 startup.mode=timestamp,连接器激活 startup.mode.timestamp.* 属性。如果没有配置属性,则 timestamplatest 相同。

如果 startup.mode=copy_existing,连接器将所有现有源数据复制到 Change Stream 事件。此设置等同于已弃用的设置 copy.existing=true

如果在源连接器将现有数据从中转换为现有数据的同时,任何系统更改数据库中的数据,MongoDB可能会产生重复的更改流事件来反映最新的更改。由于依赖于数据副本的更改流事件是幂等的,因此复制的数据最终是一致的。

默认值latest
接受值latesttimestampcopy_existing
startup.mode.timestamp.start.at.operation.time
类型:字符串

描述
仅在startup.mode=timestamp时触发。指定更改流的起始点。

有关更改流参数的更多信息,请参阅MongoDB手册中的 $changeStream (聚合)

默认值: ""
接受值:
  • 自纪元以来的整数秒数,以十进制格式表示(例如,30

  • ISO-8601格式的一秒精度瞬间(例如,1970-01-01T00:00:30Z

  • 以规范扩展JSON(v2)格式表示的BSON时间戳(例如,{"$timestamp": {"t": 30, "i": 0}}

startup.mode.copy.existing.namespace.regex
类型:字符串

描述
连接器使用的正则表达式,用于匹配从中复制数据的命名空间。命名空间描述了由点分隔的MongoDB数据库名和集合(例如,databaseName.collectionName)。

例如,以下正则表达式设置匹配在stats数据库中以"page"开头的集合
startup.mode.copy.existing.namespace.regex=stats\.page.*
示例中的\字符转义了正则表达式中的下一个.字符。有关如何构建正则表达式的信息,请参阅Java API文档中的模式

默认值: ""
接受值:有效的正则表达式
startup.mode.copy.existing.pipeline
类型:字符串

描述
连接器在复制现有数据时运行的管道操作的内联数组。您可以使用此设置来过滤源集合并提高复制过程中的索引使用率。

例如,以下设置使用$match聚合运算符来指示连接器仅复制包含值为falseclosed字段的文档。
startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
默认值: ""
接受值:有效的聚合管道阶段
startup.mode.copy.existing.max.threads
类型: int

描述
连接器可以用于复制数据的最多的线程数。

默认值:环境中可用的处理器数量
接受值:一个整数
startup.mode.copy.existing.queue.size
类型: int

描述
连接器在复制数据时可以使用的队列大小。

默认值: 16000
接受值:一个整数
startup.mode.copy.existing.allow.disk.use
类型:布尔值

描述
当设置为 true 时,连接器使用临时磁盘存储来复制现有的聚合。

默认值true
接受值: truefalse

使用以下配置设置来指定连接器在遇到错误时的行为,以及指定与恢复中断读取相关的设置。

要查看与处理错误相关的选项,请参阅错误处理和从中断恢复属性页面。

名称
描述
mongo.errors.tolerance
类型:字符串

描述
连接器遇到错误时是否继续处理消息。

如果您想连接器在遇到错误时停止处理消息并报告问题,请将其设置为 "none"

如果您想连接器继续处理消息并忽略遇到的任何错误,请将其设置为 "all"

重要:此属性覆盖了 errors.tolerance Connect 框架属性。

默认值: "none"
接受值: "none""all"
mongo.errors.log.enable
类型:布尔值

描述
连接器是否应在日志文件中报告错误。

将此设置为 true 以记录连接器遇到的全部错误。

将此设置为 false 以记录连接器无法容忍的错误。您可以使用 errors.tolerancemongo.errors.tolerance 设置来指定连接器应容忍哪些错误。

重要:此属性覆盖了errors.log.enable Connect 框架属性。

默认值: false
接受值: truefalse
mongo.errors.deadletterqueue.topic.name
类型:字符串

描述
用作死信队列的主题名称。

如果您指定了值,连接器将无效消息写入死信队列主题,作为扩展 JSON 字符串

如果您留空此设置,连接器不会将无效消息写入任何主题。
重要:您必须将 errors.tolerancemongo.errors.tolerance 设置为 "all" 以启用此属性。

默认值: ""
可接受值:有效的 Kafka 主题名称
offset.partition.name
类型:字符串

描述
要使用的自定义偏移分区名称。您可以使用此选项指示连接器在现有偏移包含无效恢复令牌时启动新的更改流。

如果您留空此设置,连接器将使用基于连接详情的默认分区名称。

要查看命名偏移分区的策略,请参阅重置存储偏移量

默认值: ""
可接受值:字符串。要了解更多关于命名分区的信息,请参阅SourceRecord 在 Apache Kafka API 文档中。
heartbeat.interval.ms
类型:long

描述
连接器发送心跳消息之间的毫秒数。当源记录在指定间隔内未发布时,连接器会发送心跳消息。此机制提高了连接器在低流量命名空间中的可恢复性。

心跳消息包含一个postBatchResumeToken数据字段。该字段的值包含连接器从变更流中最后读取的MongoDB服务器oplog条目。

将其设置为0以禁用心跳消息。

了解更多信息,请参阅预防措施,位于无效恢复令牌页面。

默认值0
接受值:一个整数
heartbeat.topic.name
类型:字符串

描述
连接器应发布心跳消息的主题名称。您必须在heartbeat.interval.ms设置中提供正值以启用此功能。

默认值: __mongodb_heartbeats
可接受值:有效的 Kafka 主题名称

返回

错误处理和从中断恢复

© . All rights reserved.