文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

连接器错误处理属性

本页

  • 概述
  • 设置
  • 批量写入异常
  • 死信队列配置示例

使用以下配置设置来指定 MongoDB Kafka 目标连接器如何处理错误,并配置死信队列。

有关按类别组织的连接器配置设置列表,请参阅目标连接器配置属性指南.

名称
描述
mongo.errors.tolerance
类型:字符串

描述
当连接器遇到错误时,是否继续处理消息。允许连接器覆盖Kafka集群的errors.tolerance设置。

当设置为none时,连接器报告任何错误并阻止进一步处理其余消息。

当设置为all时,连接器忽略任何有问题的消息。

当设置为data时,连接器仅容忍数据错误,对其他所有错误失败。

有关错误处理策略的更多信息,请参阅处理错误页面。

此属性覆盖了errors.tolerance
连接框架的属性。

默认:errors.tolerance设置继承值。
接受值"none""all"
mongo.errors.log.enable
类型:布尔值

描述
连接器是否应将包括失败操作的错误详情写入日志文件。连接器使用errors.tolerancemongo.errors.tolerance设置将错误分类为“可容忍”或“不可容忍”。

当设置为true时,连接器记录“可容忍”和“不可容忍”错误。
当设置为false时,连接器记录“不可容忍”错误。

此属性覆盖了errors.log.enable
连接框架的属性。

默认: false
可接受值: truefalse
errors.log.include.messages
类型:布尔值

描述
是否在记录错误时将连接器包括无效消息。无效消息包括记录键、值和标题等数据。

默认: false
可接受值: truefalse
errors.deadletterqueue.topic.name
类型:字符串

描述
用作死信队列的主题名称。如果为空,连接器不会将任何无效消息发送到死信队列。

有关死信队列的更多信息,请参阅死信队列配置示例

默认: ""
可接受值: 一个有效的Kafka主题名称
errors.deadletterqueue.context.headers.enable
类型:布尔值

描述
连接器在向死信队列写入消息时是否应包括上下文标题。

有关死信队列的更多信息,请参阅死信队列配置示例

有关连接器通过上下文标题定义和报告的异常的更多信息,请参阅批量写入异常

默认: false
可接受值: truefalse
errors.deadletterqueue.topic.replication.factor
类型: 整数

描述
在哪些节点上复制死信队列主题的节点数。如果您正在运行单节点Kafka集群,您必须将其设置为1

有关死信队列的更多信息,请参阅死信队列配置示例

默认: 3
可接受值: 一个有效的节点数

连接器在执行批量写入时可以将以下异常报告给死信队列作为上下文标题

名称
描述
WriteException
描述
包含您连接器遇到的 BulkWriteError 的详细信息。

消息格式

此类以以下格式输出错误

v=%d, code=%d, message=%s, details=%s

前面消息中的字段包含以下信息

  • v:表示 WriteException 消息格式版本的版本。此字段有助于解析此异常产生的消息。对于连接器的 1.14 版本,消息格式的版本为 1。

  • code:与错误关联的代码。要了解更多信息,请参阅 getCode() 方法文档。

  • message:与错误关联的消息。要了解更多信息,请参阅 getMessage() 方法文档。

  • details:以 JSON 格式与错误关联的详细信息。要了解更多信息,请参阅以下方法文档

WriteConcernException
描述
包含您连接器遇到的 WriteConcernError 的详细信息。

消息格式

此类以以下格式输出错误

v=%d, code=%d, codeName=%d, message=%s, details=%s

前面消息中的字段包含以下信息

  • v:表示 WriteConcernException 消息格式的版本。此字段有助于解析此异常产生的消息。对于连接器版本 1.14,消息格式的版本是 1。

  • code:与错误相关的代码。要了解更多信息,请参阅 getCode() 方法的文档。

  • codeName:与错误相关的代码名称。要了解更多信息,请参阅 getCodeName() 方法的文档。

  • message:与错误相关的消息。要了解更多信息,请参阅getMessage()方法文档。

  • details:以 JSON 格式与错误关联的详细信息。要了解更多信息,请参阅以下方法文档

WriteSkippedException
描述
通知MongoDB在以下场景中没有尝试写入SinkRecord
  1. 连接器向MongoDB发送一个有序批量写入操作

  2. MongoDB无法处理有序批量写入中的写入操作

  3. MongoDB不会尝试执行有序批量写入中的所有后续写入操作

要了解如何将连接器设置为执行无序批量写入操作,请参阅连接器消息处理属性页面。
消息格式

此异常不产生消息。

要启用将批量写入异常报告到死信队列,请使用以下连接器配置

errors.tolerance=all
errors.deadletterqueue.topic.name=<name of topic to use as dead letter queue>
errors.deadletterqueue.context.headers.enable=true

Apache Kafka 2.6版本增加了处理错误记录的支持。Kafka连接器自动将无法处理的消息发送到死信队列。一旦在死信队列中,您可以检查错误记录,更新它们,并将它们重新提交以进行处理。

以下是一个启用死信队列主题 example.deadletterqueue 的示例配置。此配置指定死信队列和日志文件应记录无效消息,并且死信队列消息应包含上下文头信息。

mongo.errors.tolerance=all
mongo.errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true

要了解更多关于死信队列的信息,请参阅 将错误和异常消息写入主题。

返回

连接器消息处理