文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

错误处理和中断恢复属性

本页内容

  • 概述
  • 设置
  • 带有单个消息转换的心跳

使用以下配置设置来指定MongoDB Kafka源连接器在遇到错误时的行为,以及指定与恢复中断读取相关的设置。

名称
描述
mongo.errors.tolerance
类型: 字符串

描述
当连接器遇到错误时,是否继续处理消息。

如果希望连接器在遇到错误时停止处理消息并报告问题,请将其设置为"none"

如果希望连接器继续处理消息并忽略遇到的任何错误,请将其设置为 "all"

重要: 此属性覆盖了errors.tolerance Connect框架属性。

默认值: "none"
可接受值: "none""all"
mongo.errors.log.enable
类型: 布尔值

描述
连接器是否应在日志文件中报告错误。

将此设置为 true 以记录连接器遇到的全部错误。

将此设置为 false 以记录连接器无法容忍的错误。您可以使用 errors.tolerancemongo.errors.tolerance 设置来指定连接器应容忍哪些错误。

重要:此属性覆盖了errors.log.enable Connect 框架属性。

默认值: false
可接受值: truefalse
mongo.errors.deadletterqueue.topic.name
类型: 字符串

描述
使用作为死信队列的topic名称。

如果您指定了一个值,连接器将无效消息作为扩展JSON字符串写入死信队列主题。

如果您留空此设置,连接器不会将无效消息写入任何主题。
重要:您必须将errors.tolerancemongo.errors.tolerance设置设置为"all"才能启用此属性。

默认值: ""
可接受值: 一个有效的Kafka主题名称
offset.partition.name
类型: 字符串

描述
要使用的自定义偏移量分区名称。您可以使用此选项来指示连接器在现有偏移量包含无效恢复令牌时启动新的更改流。

如果您留空此设置,连接器将使用基于连接细节的默认分区名称。

要查看命名偏移量分区的策略,请参阅重置存储的偏移量.

默认值: ""
可接受值: 一个字符串。有关命名分区的更多信息,请参阅SourceRecord在Apache Kafka API文档中。
heartbeat.interval.ms
类型: long

描述
连接器在发送心跳消息之间等待的毫秒数。当源记录在指定间隔内未发布时,连接器会发送心跳消息。此机制提高了连接器在低流量命名空间中的可恢复性。

心跳消息包含一个postBatchResumeToken数据字段。此字段的值包含连接器从更改流中最后读取的MongoDB服务器oplog条目。

将其设置为0以禁用心跳消息。

要了解更多信息,请参阅预防措施,在无效的恢复令牌页面。

默认值0
接受值:一个整数
heartbeat.topic.name
类型: 字符串

描述
连接器应发布心跳消息的主题名称。您必须在heartbeat.interval.ms设置中提供正数值以启用此功能。

默认值__mongodb_heartbeats
可接受值: 一个有效的Kafka主题名称

如果您启用了心跳并在Kafka Connect部署中指定了单个消息转换(SMTs),您必须从SMTs中排除您的心跳消息。SMTs是Kafka Connect的一个功能,允许您在不需要部署流处理应用程序的情况下指定对通过源连接器传输的消息的转换。

要排除心跳消息从SMTs中,您必须为SMTs创建并应用一个断言。断言是SMTs的一个功能,允许您在应用转换之前检查消息是否匹配条件语句。

以下配置定义了匹配发送到默认心跳主题的心跳消息的IsHeartbeat断言

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

以下配置使用前面的断言排除从ExtractField转换中排除心跳消息

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

如果您没有从前面的转换中排除心跳消息,当连接器处理心跳消息时,它会引发以下错误

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

要了解更多关于SMTs的信息,请参阅Confluent的如何在Kafka Connect中使用单个消息转换

了解更多关于谓词的信息,请参阅来自 Confluent 的过滤器(Apache Kafka)

了解更多关于 ExtractField 转换的信息,请参阅来自 Confluent 的ExtractField

了解更多关于默认键模式的信息,请参阅默认模式页面。

返回

启动