文档菜单
文档首页
/
MongoDB Kafka 连接器
/ /

写入模型策略

本页内容

  • 概述
  • 批量写入操作
  • 如何指定写入模型策略
  • 指定业务键
  • 示例
  • 更新一个时间戳策略
  • 替换一个业务键策略
  • 删除一个业务键策略
  • 自定义写入模型策略
  • 示例写入模型策略
  • 如何安装您的策略

本指南向您展示如何更改MongoDB Kafka sink连接器将数据写入MongoDB的方式。

您可以通过指定以下用例更改连接器将数据写入MongoDB的方式

  • 插入文档而不是更新它们

  • 替换或更新除_id字段以外的匹配的文档_id字段

  • 删除匹配的文档

您可以通过指定一个写入模型策略来配置连接器如何将数据写入MongoDB。写入模型策略是一个定义连接器如何使用写入模型写入数据的类。写入模型是MongoDB Java驱动程序接口,它定义了写入操作的结构。

要了解如何在连接器将数据写入MongoDB之前修改连接器接收到的数据,请阅读有关Sink连接器后处理器.

要查看写入模型策略的实现,请参阅InsertOneDefaultStrategy类的源代码.

连接器使用批量写入操作将数据写入MongoDB。批量写入将多个写入操作(如插入、更新或删除)组合在一起。

默认情况下,连接器执行有序的批量写入,这保证了数据更改的顺序。在有序的批量写入中,如果任何写入操作导致错误,则连接器跳过该批中剩余的写入。

如果您不需要保证数据更改的顺序,可以将bulk.write.ordered设置设置为false,以便连接器执行无序的批量写入。连接器并行执行无序的批量写入,这可以提高性能。

此外,当您启用无序的批量写入并将errors.tolerance设置设置为all时,即使批量写入中的任何写入操作失败,连接器也会继续执行该批中未返回错误的其他写入操作。

提示

要了解更多关于bulk.write.ordered设置的信息,请参阅连接器消息处理属性

要了解更多关于批量写入操作的信息,请参阅以下文档

要指定写入模型策略,请使用以下设置

writemodel.strategy=<write model strategy classname>

有关连接器中包含的预构建写入模型策略的列表,请参阅写入模型策略配置指南。

业务键是由一个或多个字段组成的价值,用于在您的接收记录中识别它作为唯一的。默认情况下,接收连接器使用接收记录的 _id 字段来检索业务键。要指定不同的业务键,配置文档 ID 添加后处理程序以使用自定义值。

您可以将文档 ID 添加器配置为将 _id 字段从接收记录键设置,如下面的示例属性所示

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

或者,您可以将它配置为从接收记录值设置 _id 字段,如下面的示例属性所示

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

重要

提高写入性能

在您的目标集合中创建一个唯一索引,该索引与您的业务键字段相对应。这提高了从接收连接器写入操作的性能。有关更多信息,请参阅唯一索引指南。

以下写入模型策略需要业务键

  • ReplaceOneBusinessKeyStrategy

  • 删除一个业务键策略

  • 更新一个业务键时间戳策略

有关Document Id Adder后处理程序的更多信息,请参阅配置Document Id Adder后处理程序。

本节展示了以下写入模型策略的配置和输出示例

您可以将更新一个时间戳策略配置为在将文档写入MongoDB时添加和更新时间戳。此策略执行以下操作

  • 当连接器插入一个新的MongoDB文档时,它将连接器服务器上的当前时间设置为_insertedTS_modifiedTS字段。

  • 当连接器更新现有的MongoDB文档时,它将连接器服务器上的当前时间更新到_modifiedTS字段。

假设您想跟踪一列火车在路线上的位置,并且您的接收器连接器接收以下结构的消息

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

使用ProvidedInValueStrategy指定您的连接器应使用消息的_id值来分配MongoDB文档中的_id字段。按如下方式指定您的id和写入模型策略属性

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

在您的接收器连接器处理前面的示例记录后,它插入一个包含以下字段的文档,如以下文档所示

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

一小时后,火车报告其沿路线的新位置,如下所示的新位置

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

一旦您的接收器连接器处理前面的记录,它将插入一个包含以下数据的文档

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

有关ProvidedInValueStrategy的更多信息,请参阅配置Document Id Adder后处理程序。部分。

您可以配置替换一个业务键策略,以替换与业务键值匹配的文档。要在多个字段上定义业务键并配置连接器以替换包含匹配业务键的文档,请执行以下任务

  1. 在您的集合中创建一个与业务键字段对应的唯一索引

  2. 在连接器配置中指定PartialValueStrategy id策略,以识别业务键所属的字段。

  3. 在连接器配置中指定ReplaceOneBusinessKeyStrategy 写入模型策略。

假设您想通过flight_no(航班号)和airport_code(机场代码)分别表示的航班号和机场位置来跟踪飞机容量。一个示例消息包含以下信息

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

要实现此策略,首先在MongoDB shell中在这些字段上创建唯一索引

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

接下来,在投影列表中指定PartialValueStrategy策略和业务键字段。指定id和写入模型策略配置如下

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

插入到集合中的示例数据如下

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

当连接器处理与现有文档的业务键匹配的目标数据时,它会用新值替换文档,而不更改业务键字段

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

连接器处理完目标数据后,它将用前面的一个替换MongoDB中的原始示例文档。

您可以通过配置连接器以使用删除一个业务键策略来删除在收到与业务键匹配的消息时收到的文档。要设置来自记录多个字段的业务键并配置连接器以删除包含匹配业务键的文档,请执行以下任务

  1. 在您的 MongoDB 集合中创建一个对应于业务键字段的 唯一索引

  2. 在连接器配置中将 PartialValueStrategy 指定为 ID 策略,以识别属于业务键的字段。

  3. 在连接器配置中指定 DeleteOneBusinessKeyStrategy 写模型策略。

假设您想从包含类似以下文档的集合中删除特定年份的日历事件

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

要实现此策略,首先在 MongoDB 脚本中使用 year 作为业务键,在这些字段上创建唯一索引

db.collection.createIndex({ "year": 1 }, { unique: true })

接下来,按照以下方式在您的配置中指定业务键和写模型策略

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

如果您的连接器处理包含业务键 year 的目标记录,则删除 MongoDB 返回的第一个匹配字段值的文档。假设您的连接器处理包含以下值数据的记录

{
"year": 2005,
...
}

当连接器处理前面的记录时,它会从包含 year 字段且值为 "2005" 的第一个文档开始删除,例如原始的 "牙医预约" 示例文档。

如果连接器提供的写模型策略都不符合您的用例,您可以创建自己的策略。

写模型策略是一个实现了 WriteModelStrategy 接口的 Java 类,并且必须重写 createWriteModel() 方法。

请参阅 WriteModelStrategy 接口的源代码 以获取所需的方法签名。

以下自定义写入模型策略返回一个写入操作,该操作用您的目标记录的 _id 字段的值替换 MongoDB 文档的 fullDocument 字段的值

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

有关自定义写入模型策略的另一个示例,请参阅 GitHub 上的 UpsertAsPartOfDocumentStrategy 示例策略。

要配置您的目标连接器以使用自定义写入策略,您必须完成以下操作

  1. 将自定义写入策略类编译成 JAR 文件。

  2. 将编译好的JAR文件添加到Kafka工作进程的类路径/插件路径中。有关插件路径的更多信息,请参阅Confluent文档。

    注意

    Kafka Connect以隔离模式加载插件。当部署自定义写入策略时,连接器JAR文件和写入模型策略JAR文件必须在同一路径上。您的路径应类似于以下示例

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    有关Kafka Connect插件的更多信息,请参阅Confluent的此指南。

  3. writemodel.strategy配置设置中指定您的自定义类。

要了解如何将类编译成JAR文件,请参阅来自Java SE文档的JAR部署指南

返回

基础