写入模型策略
概述
本指南向您展示如何更改MongoDB Kafka sink连接器将数据写入MongoDB的方式。
您可以通过指定以下用例更改连接器将数据写入MongoDB的方式
插入文档而不是更新它们
替换或更新除_id字段以外的匹配的文档
_id
字段删除匹配的文档
您可以通过指定一个写入模型策略来配置连接器如何将数据写入MongoDB。写入模型策略是一个定义连接器如何使用写入模型写入数据的类。写入模型是MongoDB Java驱动程序接口,它定义了写入操作的结构。
要了解如何在连接器将数据写入MongoDB之前修改连接器接收到的数据,请阅读有关Sink连接器后处理器.
要查看写入模型策略的实现,请参阅InsertOneDefaultStrategy类的源代码.
批量写入操作
连接器使用批量写入操作将数据写入MongoDB。批量写入将多个写入操作(如插入、更新或删除)组合在一起。
默认情况下,连接器执行有序的批量写入,这保证了数据更改的顺序。在有序的批量写入中,如果任何写入操作导致错误,则连接器跳过该批中剩余的写入。
如果您不需要保证数据更改的顺序,可以将bulk.write.ordered
设置设置为false
,以便连接器执行无序的批量写入。连接器并行执行无序的批量写入,这可以提高性能。
此外,当您启用无序的批量写入并将errors.tolerance
设置设置为all
时,即使批量写入中的任何写入操作失败,连接器也会继续执行该批中未返回错误的其他写入操作。
提示
要了解更多关于bulk.write.ordered
设置的信息,请参阅连接器消息处理属性。
要了解更多关于批量写入操作的信息,请参阅以下文档
如何指定写入模型策略
要指定写入模型策略,请使用以下设置
writemodel.strategy=<write model strategy classname>
有关连接器中包含的预构建写入模型策略的列表,请参阅写入模型策略配置指南。
指定业务键
业务键是由一个或多个字段组成的价值,用于在您的接收记录中识别它作为唯一的。默认情况下,接收连接器使用接收记录的 _id
字段来检索业务键。要指定不同的业务键,配置文档 ID 添加后处理程序以使用自定义值。
您可以将文档 ID 添加器配置为将 _id
字段从接收记录键设置,如下面的示例属性所示
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList
或者,您可以将它配置为从接收记录值设置 _id
字段,如下面的示例属性所示
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList
以下写入模型策略需要业务键
ReplaceOneBusinessKeyStrategy
删除一个业务键策略
更新一个业务键时间戳策略
有关Document Id Adder后处理程序的更多信息,请参阅配置Document Id Adder后处理程序。
示例
本节展示了以下写入模型策略的配置和输出示例
更新一个时间戳策略
您可以将更新一个时间戳策略配置为在将文档写入MongoDB时添加和更新时间戳。此策略执行以下操作
当连接器插入一个新的MongoDB文档时,它将连接器服务器上的当前时间设置为
_insertedTS
和_modifiedTS
字段。当连接器更新现有的MongoDB文档时,它将连接器服务器上的当前时间更新到
_modifiedTS
字段。
假设您想跟踪一列火车在路线上的位置,并且您的接收器连接器接收以下结构的消息
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
使用ProvidedInValueStrategy
指定您的连接器应使用消息的_id
值来分配MongoDB文档中的_id
字段。按如下方式指定您的id和写入模型策略属性
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
在您的接收器连接器处理前面的示例记录后,它插入一个包含以下字段的文档,如以下文档所示
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
一小时后,火车报告其沿路线的新位置,如下所示的新位置
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
一旦您的接收器连接器处理前面的记录,它将插入一个包含以下数据的文档
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
有关ProvidedInValueStrategy
的更多信息,请参阅配置Document Id Adder后处理程序。部分。
替换一个业务键策略
您可以配置替换一个业务键策略,以替换与业务键值匹配的文档。要在多个字段上定义业务键并配置连接器以替换包含匹配业务键的文档,请执行以下任务
在您的集合中创建一个与业务键字段对应的唯一索引。
在连接器配置中指定
PartialValueStrategy
id策略,以识别业务键所属的字段。在连接器配置中指定
ReplaceOneBusinessKeyStrategy
写入模型策略。
假设您想通过flight_no
(航班号)和airport_code
(机场代码)分别表示的航班号和机场位置来跟踪飞机容量。一个示例消息包含以下信息
{ "flight_no": "Z342", "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
要实现此策略,首先在MongoDB shell中在这些字段上创建唯一索引
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
接下来,在投影列表中指定PartialValueStrategy
策略和业务键字段。指定id和写入模型策略配置如下
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
插入到集合中的示例数据如下
{ "flight_no": "Z342" "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
当连接器处理与现有文档的业务键匹配的目标数据时,它会用新值替换文档,而不更改业务键字段
{ "flight_no": "Z342" "airport_code": "LAX", "status": "canceled" }
连接器处理完目标数据后,它将用前面的一个替换MongoDB中的原始示例文档。
删除一个业务键策略
您可以通过配置连接器以使用删除一个业务键策略来删除在收到与业务键匹配的消息时收到的文档。要设置来自记录多个字段的业务键并配置连接器以删除包含匹配业务键的文档,请执行以下任务
在您的 MongoDB 集合中创建一个对应于业务键字段的 唯一索引。
在连接器配置中将
PartialValueStrategy
指定为 ID 策略,以识别属于业务键的字段。在连接器配置中指定
DeleteOneBusinessKeyStrategy
写模型策略。
假设您想从包含类似以下文档的集合中删除特定年份的日历事件
{ "year": 2005, "month": 3, "day": 15, "event": "Dentist Appointment" }
要实现此策略,首先在 MongoDB 脚本中使用 year
作为业务键,在这些字段上创建唯一索引
db.collection.createIndex({ "year": 1 }, { unique: true })
接下来,按照以下方式在您的配置中指定业务键和写模型策略
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
如果您的连接器处理包含业务键 year
的目标记录,则删除 MongoDB 返回的第一个匹配字段值的文档。假设您的连接器处理包含以下值数据的记录
{ "year": 2005, ... }
当连接器处理前面的记录时,它会从包含 year
字段且值为 "2005" 的第一个文档开始删除,例如原始的 "牙医预约" 示例文档。
自定义写模型策略
如果连接器提供的写模型策略都不符合您的用例,您可以创建自己的策略。
写模型策略是一个实现了 WriteModelStrategy
接口的 Java 类,并且必须重写 createWriteModel()
方法。
请参阅 WriteModelStrategy 接口的源代码 以获取所需的方法签名。
示例写入模型策略
以下自定义写入模型策略返回一个写入操作,该操作用您的目标记录的 _id
字段的值替换 MongoDB 文档的 fullDocument
字段的值
/** * Custom write model strategy * * This class reads the 'fullDocument' field from a change stream and * returns a ReplaceOne operation. */ public class CustomWriteModelStrategy implements WriteModelStrategy { private static String ID = "_id"; public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { BsonDocument changeStreamDocument = document.getValueDoc() .orElseThrow(() -> new DataException("Missing value document")); BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument()); if (fullDocument.isEmpty()) { return null; // Return null to indicate no op. } return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument); } }
有关自定义写入模型策略的另一个示例,请参阅 GitHub 上的 UpsertAsPartOfDocumentStrategy 示例策略。
如何安装您的策略
要配置您的目标连接器以使用自定义写入策略,您必须完成以下操作
将自定义写入策略类编译成 JAR 文件。
将编译好的JAR文件添加到Kafka工作进程的类路径/插件路径中。有关插件路径的更多信息,请参阅Confluent文档。
注意
Kafka Connect以隔离模式加载插件。当部署自定义写入策略时,连接器JAR文件和写入模型策略JAR文件必须在同一路径上。您的路径应类似于以下示例
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar
有关Kafka Connect插件的更多信息,请参阅Confluent的此指南。
在writemodel.strategy配置设置中指定您的自定义类。
要了解如何将类编译成JAR文件,请参阅来自Java SE文档的JAR部署指南。