错误处理和中断恢复属性
本页内容
概述
使用以下配置设置来指定MongoDB Kafka源连接器在遇到错误时的行为,以及指定与恢复中断读取相关的设置。
设置
名称 | 描述 |
---|---|
mongo.errors.tolerance | 类型: 字符串 描述 当连接器遇到错误时,是否继续处理消息。 如果希望连接器在遇到错误时停止处理消息并报告问题,请将其设置为 "none" 如果希望连接器继续处理消息并忽略遇到的任何错误,请将其设置为 "all" 重要: 此属性覆盖了errors.tolerance Connect框架属性。 默认值: "none" 可接受值: "none" 或 "all" |
mongo.errors.log.enable | 类型: 布尔值 描述 连接器是否应在日志文件中报告错误。 将此设置为 true 以记录连接器遇到的全部错误。将此设置为 false 以记录连接器无法容忍的错误。您可以使用 errors.tolerance 或 mongo.errors.tolerance 设置来指定连接器应容忍哪些错误。重要:此属性覆盖了errors.log.enable Connect 框架属性。 默认值: false 可接受值: true 或 false |
mongo.errors.deadletterqueue.topic.name | 重要:您必须将 errors.tolerance 或mongo.errors.tolerance 设置设置为"all" 才能启用此属性。默认值: "" 可接受值: 一个有效的Kafka主题名称 |
offset.partition.name | 类型: 字符串 描述 要使用的自定义偏移量分区名称。您可以使用此选项来指示连接器在现有偏移量包含无效恢复令牌时启动新的更改流。 如果您留空此设置,连接器将使用基于连接细节的默认分区名称。 要查看命名偏移量分区的策略,请参阅重置存储的偏移量. 默认值: "" 可接受值: 一个字符串。有关命名分区的更多信息,请参阅SourceRecord在Apache Kafka API文档中。 |
heartbeat.interval.ms | |
heartbeat.topic.name | 类型: 字符串 描述 连接器应发布心跳消息的主题名称。您必须在 heartbeat.interval.ms 设置中提供正数值以启用此功能。默认值: __mongodb_heartbeats 可接受值: 一个有效的Kafka主题名称 |
带有单个消息转换的心跳
如果您启用了心跳并在Kafka Connect部署中指定了单个消息转换(SMTs),您必须从SMTs中排除您的心跳消息。SMTs是Kafka Connect的一个功能,允许您在不需要部署流处理应用程序的情况下指定对通过源连接器传输的消息的转换。
要排除心跳消息从SMTs中,您必须为SMTs创建并应用一个断言。断言是SMTs的一个功能,允许您在应用转换之前检查消息是否匹配条件语句。
以下配置定义了匹配发送到默认心跳主题的心跳消息的IsHeartbeat
断言
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
以下配置使用前面的断言排除从ExtractField
转换中排除心跳消息
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
如果您没有从前面的转换中排除心跳消息,当连接器处理心跳消息时,它会引发以下错误
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
要了解更多关于SMTs的信息,请参阅Confluent的如何在Kafka Connect中使用单个消息转换。
了解更多关于谓词的信息,请参阅来自 Confluent 的过滤器(Apache Kafka)。
了解更多关于 ExtractField
转换的信息,请参阅来自 Confluent 的ExtractField。
了解更多关于默认键模式的信息,请参阅默认模式页面。