指定模式
本使用示例演示了如何配置您的 MongoDB Kafka 源连接器以应用自定义 模式 到您的数据中。模式是一个定义,它指定了 Apache Kafka 主题中数据的结构和类型信息。当您必须确保由源连接器填充主题的数据具有一致的结构时,请使用模式。
要了解有关使用连接器与模式的信息,请参阅应用模式 指南。
示例
假设您的应用程序在 MongoDB 集合中跟踪客户数据,并且您想将此数据发布到 Kafka 主题。您希望客户数据的订阅者接收格式一致的数据。您选择对数据进行应用模式。
您的需求和解决方案如下
需求 | 解决方案 |
---|---|
从 MongoDB 集合接收客户数据 | 配置 MongoDB 源连接器以接收来自特定数据库和集合的数据更新。 见从集合接收数据. |
提供客户数据模式 | 指定与客户数据结构和数据类型相对应的模式。 见 创建自定义模式。 |
从客户数据中省略 Kafka 元数据 | 仅包括来自 fullDocument 字段的。见 从发布的记录中省略元数据。 |
有关满足上述要求的完整配置文件的详细信息,请参阅 指定配置。
从集合接收数据
要配置源连接器以从MongoDB集合接收数据,请指定数据库名和集合名。对于本例,您可以配置连接器以从customers
数据库中的purchases
集合读取,如下所示
database=customers collection=purchases
创建自定义模式
您的集合中的示例客户数据文档包含以下信息
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
从示例文档中,您决定您的模式应使用以下数据类型呈现字段
字段名 | 数据类型 | 描述 |
---|---|---|
name | 客户名称 | |
visits | 客户访问的日期 | |
购买的商品 | 购买的商品名称和每种商品的购买数量 |
您可以使用以下示例架构中所示的模式格式来描述您的数据,该模式格式采用Apache Avro架构格式
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
从发布的记录中省略元数据
连接器将客户数据文档和描述文档的元数据发布到Kafka主题。您可以通过以下设置将连接器设置为仅包含记录中fullDocument
字段包含的文档数据
publish.full.document.only=true
有关fullDocument
字段的更多信息,请参阅变更流指南。
指定配置
您的自定义架构连接器配置应类似于以下内容
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
有关指定架构的更多信息,请参阅应用架构指南。