文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

指定模式

本使用示例演示了如何配置您的 MongoDB Kafka 源连接器以应用自定义 模式 到您的数据中。模式是一个定义,它指定了 Apache Kafka 主题中数据的结构和类型信息。当您必须确保由源连接器填充主题的数据具有一致的结构时,请使用模式。

要了解有关使用连接器与模式的信息,请参阅应用模式 指南。

假设您的应用程序在 MongoDB 集合中跟踪客户数据,并且您想将此数据发布到 Kafka 主题。您希望客户数据的订阅者接收格式一致的数据。您选择对数据进行应用模式。

您的需求和解决方案如下

需求
解决方案
从 MongoDB 集合接收客户数据
配置 MongoDB 源连接器以接收来自特定数据库和集合的数据更新。
从集合接收数据.
提供客户数据模式
指定与客户数据结构和数据类型相对应的模式。
从客户数据中省略 Kafka 元数据
仅包括来自fullDocument 字段的。

有关满足上述要求的完整配置文件的详细信息,请参阅 指定配置

要配置源连接器以从MongoDB集合接收数据,请指定数据库名和集合名。对于本例,您可以配置连接器以从customers数据库中的purchases集合读取,如下所示

database=customers
collection=purchases

您的集合中的示例客户数据文档包含以下信息

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

从示例文档中,您决定您的模式应使用以下数据类型呈现字段

字段名
数据类型
描述
name
客户名称
visits
客户访问的日期
购买的商品
映射 of string (the assumed type) to 整数 values
购买的商品名称和每种商品的购买数量

您可以使用以下示例架构中所示的模式格式来描述您的数据,该模式格式采用Apache Avro架构格式

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

重要

转换器

如果您想使用Avro二进制编码通过Apache Kafka发送数据,则必须使用Avro转换器。更多信息,请参阅转换器。

连接器将客户数据文档和描述文档的元数据发布到Kafka主题。您可以通过以下设置将连接器设置为仅包含记录中fullDocument字段包含的文档数据

publish.full.document.only=true

有关fullDocument字段的更多信息,请参阅变更流指南。

您的自定义架构连接器配置应类似于以下内容

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

注意

内嵌架构

在前面的配置中,Kafka Connect JSON Schema Converter将自定义架构嵌入到您的消息中。有关JSON Schema转换器的更多信息,请参阅转换器指南。

有关指定架构的更多信息,请参阅应用架构指南。

返回

复制现有数据