转换器
概述
本指南介绍了如何使用 MongoDB Kafka 连接器的 转换器。转换器是程序,用于在字节和 Kafka Connect 的运行时数据格式之间进行转换。
转换器在 Kafka Connect 和 Apache Kafka 之间传递数据。连接器在 MongoDB 和 Kafka Connect 之间传递数据。以下图表显示了这些关系

要了解更多关于转换器的信息,请参阅以下资源
可用转换器
由于连接器将您的 MongoDB 数据转换为 Kafka Connect 的运行时数据格式,因此连接器与所有可用的转换器一起工作。
重要
为您的源和目标连接器使用相同的转换器
您必须在 MongoDB Kafka 源连接器和 MongoDB Kafka 目标连接器中使用相同的转换器。例如,如果您的源连接器使用 Protobuf 将数据写入主题,则目标连接器必须使用 Protobuf 从主题中读取。
了解应使用哪种转换器,请参阅Confluent的此页面。
具有模式的转换器
如果您使用基于模式的转换器,例如Kafka Connect Avro转换器(Avro转换器)、Kafka Connect Protobuf转换器或Kafka Connect JSON模式转换器,则应在您的源连接器中定义一个模式。
有关指定模式的说明,请参阅应用模式指南。
连接器配置
本节提供了配置连接器管道中以下转换器的属性文件模板
Avro转换器
点击以下选项卡查看与Avro转换器一起工作的属性文件
以下属性文件定义了一个源连接器。该连接器使用默认模式和一个Avro转换器将数据写入Apache Kafka主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
重要
具有MongoDB数据源的Avro转换器
Avro转换器非常适合静态结构的数据,但不适合动态或变化的数据。MongoDB的无模式文档模型支持动态数据,因此在使用Avro转换器之前,请确保您的MongoDB数据源具有静态结构。
以下属性文件定义了一个目标连接器。该连接器使用Avro转换器从Apache Kafka主题中读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=<your schema registry uri>
要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。
Protobuf转换器
点击以下选项卡查看与Protobuf转换器一起工作的属性文件
以下属性文件定义了一个源连接器。该连接器使用默认模式和一个Protobuf转换器将数据写入Apache Kafka主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件定义了一个目标连接器。该连接器使用Protobuf转换器从Apache Kafka主题中读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.protobuf.ProtobufConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=<your schema registry uri>
要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。
JSON Schema 转换器
点击以下标签查看与 JSON Schema 转换器一起工作的属性文件
以下属性文件配置您的连接器使用 Confluent 模式注册表管理 JSON 模式
以下属性文件定义了一个源连接器。此连接器使用默认模式和一个 JSON 模式转换器将数据写入 Apache Kafka 主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件定义了一个目标连接器。此连接器使用一个 JSON 模式转换器从 Apache Kafka 主题读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=io.confluent.connect.json.JsonSchemaConverter key.converter.schema.registry.url=<your schema registry uri> value.converter=io.confluent.connect.json.JsonSchemaConverter value.converter.schema.registry.url=<your schema registry uri>
以下属性文件配置您的连接器在消息中内嵌 JSON 模式
重要
消息大小增加
在消息中嵌入 JSON 模式会增加消息的大小。为了在使用 JSON 模式的同时减小消息大小,请使用模式注册表。
以下属性文件定义了一个源连接器。此连接器使用默认模式和一个 JSON 模式转换器将数据写入 Apache Kafka 主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=schema output.format.key=schema output.schema.infer.value=true key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下属性文件定义了一个目标连接器。此连接器使用一个 JSON 模式转换器从 Apache Kafka 主题读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。
JSON 转换器
点击以下标签查看与 JSON 转换器一起工作的属性文件
以下属性文件定义了一个源连接器。此连接器使用 JSON 转换器将数据写入 Apache Kafka 主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
以下属性文件定义了一个目标连接器。此连接器使用 JSON 转换器从 Apache Kafka 主题读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter
要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。
字符串转换器(原始JSON)
点击以下选项卡查看与字符串转换器一起工作的属性文件
以下属性文件定义了一个源连接器。此连接器使用字符串转换器将数据写入Apache Kafka主题
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your mongodb uri> database=<your database to read from> collection=<your collection to read from> output.format.value=json output.format.key=json key.converter.schemas.enable=false value.converter.schemas.enable=false key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
以下属性文件定义了一个目标连接器。此连接器使用字符串转换器从Apache Kafka主题读取数据
connector.class=com.mongodb.kafka.connect.MongoSinkConnector connection.uri=<your mongodb uri> database=<your database to write to> collection=<your collection to write to> topics=<your topic to read from> key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
重要
接收的字符串必须是有效的JSON
即使使用字符串转换器,您的目标连接器也必须从您的Apache Kafka主题接收有效的JSON字符串。
要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。