文档菜单
文档首页
/
MongoDB Kafka 连接器
/

转换器

本页内容

  • 概述
  • 可用转换器
  • 具有模式的转换器
  • 连接器配置
  • Avro 转换器
  • Protobuf 转换器
  • JSON Schema 转换器
  • JSON 转换器
  • 字符串转换器(原始 JSON)

本指南介绍了如何使用 MongoDB Kafka 连接器的 转换器。转换器是程序,用于在字节和 Kafka Connect 的运行时数据格式之间进行转换。

转换器在 Kafka Connect 和 Apache Kafka 之间传递数据。连接器在 MongoDB 和 Kafka Connect 之间传递数据。以下图表显示了这些关系

Diagram illustrating converters' role in Kafka Connect

要了解更多关于转换器的信息,请参阅以下资源

  • Confluent 文章.

  • Kafka Connect 概念的 Confluent 文章

  • 转换器接口 API 文档

由于连接器将您的 MongoDB 数据转换为 Kafka Connect 的运行时数据格式,因此连接器与所有可用的转换器一起工作。

重要

为您的源和目标连接器使用相同的转换器

您必须在 MongoDB Kafka 源连接器和 MongoDB Kafka 目标连接器中使用相同的转换器。例如,如果您的源连接器使用 Protobuf 将数据写入主题,则目标连接器必须使用 Protobuf 从主题中读取。

了解应使用哪种转换器,请参阅Confluent的此页面。

如果您使用基于模式的转换器,例如Kafka Connect Avro转换器(Avro转换器)、Kafka Connect Protobuf转换器或Kafka Connect JSON模式转换器,则应在您的源连接器中定义一个模式。

有关指定模式的说明,请参阅应用模式指南。

本节提供了配置连接器管道中以下转换器的属性文件模板

点击以下选项卡查看与Avro转换器一起工作的属性文件

以下属性文件定义了一个源连接器。该连接器使用默认模式和一个Avro转换器将数据写入Apache Kafka主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

重要

具有MongoDB数据源的Avro转换器

Avro转换器非常适合静态结构的数据,但不适合动态或变化的数据。MongoDB的无模式文档模型支持动态数据,因此在使用Avro转换器之前,请确保您的MongoDB数据源具有静态结构。

以下属性文件定义了一个目标连接器。该连接器使用Avro转换器从Apache Kafka主题中读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<your schema registry uri>

要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。

点击以下选项卡查看与Protobuf转换器一起工作的属性文件

以下属性文件定义了一个源连接器。该连接器使用默认模式和一个Protobuf转换器将数据写入Apache Kafka主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件定义了一个目标连接器。该连接器使用Protobuf转换器从Apache Kafka主题中读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=<your schema registry uri>

要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。

点击以下标签查看与 JSON Schema 转换器一起工作的属性文件

以下属性文件配置您的连接器使用 Confluent 模式注册表管理 JSON 模式

以下属性文件定义了一个源连接器。此连接器使用默认模式和一个 JSON 模式转换器将数据写入 Apache Kafka 主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件定义了一个目标连接器。此连接器使用一个 JSON 模式转换器从 Apache Kafka 主题读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=<your schema registry uri>
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=<your schema registry uri>

以下属性文件配置您的连接器在消息中内嵌 JSON 模式

重要

消息大小增加

在消息中嵌入 JSON 模式会增加消息的大小。为了在使用 JSON 模式的同时减小消息大小,请使用模式注册表。

以下属性文件定义了一个源连接器。此连接器使用默认模式和一个 JSON 模式转换器将数据写入 Apache Kafka 主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=schema
output.format.key=schema
output.schema.infer.value=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下属性文件定义了一个目标连接器。此连接器使用一个 JSON 模式转换器从 Apache Kafka 主题读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。

点击以下标签查看与 JSON 转换器一起工作的属性文件

以下属性文件定义了一个源连接器。此连接器使用 JSON 转换器将数据写入 Apache Kafka 主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

以下属性文件定义了一个目标连接器。此连接器使用 JSON 转换器从 Apache Kafka 主题读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。

点击以下选项卡查看与字符串转换器一起工作的属性文件

以下属性文件定义了一个源连接器。此连接器使用字符串转换器将数据写入Apache Kafka主题

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your mongodb uri>
database=<your database to read from>
collection=<your collection to read from>
output.format.value=json
output.format.key=json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

以下属性文件定义了一个目标连接器。此连接器使用字符串转换器从Apache Kafka主题读取数据

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your mongodb uri>
database=<your database to write to>
collection=<your collection to write to>
topics=<your topic to read from>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

重要

接收的字符串必须是有效的JSON

即使使用字符串转换器,您的目标连接器也必须从您的Apache Kafka主题接收有效的JSON字符串。

要使用上述属性文件,请将尖括号中的占位符文本替换为您自己的信息。

返回

数据格式