从 Kafka Connect MongoDB 迁移
使用本指南将您的 Kafka 部署从社区创建的Kafka Connect MongoDB 目标连接器到 官方 MongoDB Kafka 连接器。
以下部分列出了您必须对 Kafka Connect 目标连接器配置设置和自定义类进行的更改,以过渡到 MongoDB Kafka 目标连接器。
更新配置设置
在使用 MongoDB Kafka 连接器部署之前,对 Kafka Connect 部署的配置设置进行以下更改
替换包含包
at.grahsl.kafka.connect.mongodb
的值,使用包com.mongodb.kafka.connect
。将您的
connector.class
设置替换为 MongoDB Kafka 目标连接器类。connector.class=com.mongodb.kafka.connect.MongoSinkConnector 从 Kafka Connect 属性名称中删除
mongodb.
前缀。例如,将mongodb.connection.uri
更改为connection.uri
。如果存在,请删除
document.id.strategies
设置。如果此设置的值引用了自定义策略,请将它们移动到document.id.strategy
设置。阅读 更新自定义类 部分,以了解您必须对自定义类进行哪些更改。将您用于指定包含
mongodb.collection
前缀的每个主题或集合覆盖的任何属性名替换为下游连接器 Kafka 主题配置主题属性中的等效键.
更新自定义类
如果您在 Kafka Connect 下游连接器部署中使用任何自定义类,请在将它们添加到 MongoDB Kafka 连接器部署之前对它们进行以下更改
将包含
at.grahsl.kafka.connect.mongodb
的导入替换为com.mongodb.kafka.connect
。将对
MongoDbSinkConnector
类的引用替换为MongoSinkConnector
类。更新自定义下游连接器策略类,以实现
com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy
接口。更新对
MongoDbSinkConnectorConfig
类的引用。在 MongoDB Kafka 连接器中,该类的逻辑被拆分为以下类
更新后处理器子类
如果你在Kafka Connect连接器部署中使用了继承自后处理器的类,请更新覆盖Kafka Connect PostProcessor
类中方法的代码,以匹配MongoDB Kafka连接器的 PostProcessor类。