文档菜单
文档首页
/
MongoDB Kafka 连接器
/

将现有集合迁移到时间序列集合

本页内容

  • 将集合迁移到时间序列集合
  • 摘要
  • 了解更多

遵循本教程了解如何使用 MongoDB Kafka 连接器将现有 MongoDB 集合转换为 时间序列集合

时间序列集合有效地存储时间序列数据。时间序列数据包括在时间间隔内测量的数据、描述测量的元数据以及测量的时间。

要使用连接器将 MongoDB 集合中的数据转换为时间序列集合,您必须执行以下任务

  1. 确定集合中所有文档共有的时间字段。

  2. 配置源连接器,将现有集合数据复制到 Kafka 主题。

  3. 配置目标连接器,将 Kafka 主题数据复制到时间序列集合。

在本教程中,您将执行上述任务,将股票数据从集合迁移到时间序列集合。时间序列集合更有效地存储和索引数据,并保留使用聚合运算符分析股票性能随时间变化的能力。

1

完成以下步骤Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

运行以下命令以在您的 Docker 环境中启动一个脚本,该脚本生成一个包含伪造的股票符号及其价格的时间序列集合,该集合位于您的教程 MongoDB 副本集中

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

数据生成器启动后,您应该看到以下类似的数据

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

在单独的终端窗口中,使用以下命令在为教程设置下载的Docker容器上创建一个交互式shell会话

docker exec -it mongo1 /bin/bash

创建一个名为stock-source.json的源配置文件,使用以下命令

nano stock-source.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

此配置指示连接器将现有数据从PriceData MongoDB集合复制到marketdata.Stocks.PriceData Kafka主题,完成后,任何插入该集合的未来数据。

在shell中运行以下命令以使用您创建的配置文件启动源连接器

cx stock-source.json

注意

cx命令是教程开发环境中的一个自定义脚本。此脚本执行以下等效请求到Kafka Connect REST API以创建一个新的连接器

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

在shell中运行以下命令以检查连接器的状态

status

如果您的源连接器成功启动,您应该看到以下输出

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

一旦源连接器启动,请通过运行以下命令确认Kafka主题是否收到了集合数据

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

输出应显示与以下类似的主题数据,类似于源连接器发布的数据

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

您可以通过键入以下内容退出kafkacatCTRL+C.

4

配置一个目标连接器,从Kafka主题读取数据并将其写入到名为StockDataMigrate的时间序列集合中,该集合位于名为Stocks的数据库中。

使用以下命令创建名为stock-sink.json的目标配置文件

nano stock-sink.json

将以下配置信息粘贴到文件中并保存更改

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

提示

上述的sink连接器配置使用了时间字段日期格式转换器。或者,您可以使用TimestampConverter单消息转换(SMT)将tx_time字段从String转换为ISODate。当使用TimestampConverter SMT时,您必须为Kafka主题中的数据定义一个模式。

有关如何使用TimestampConverter SMT的信息,请参阅TimestampConverter Confluent文档。

在shell中运行以下命令以启动sink连接器,使用您更新的配置文件

cx stock-sink.json

当您的sink连接器完成主题数据的处理后,StockDataMigrate时间序列集合中的文档包含具有ISODate类型值的tx_time字段。

5

一旦sink连接器完成主题数据的处理,StockDataMigrate时间序列集合应该包含来自您的PriceData集合的所有市场数据。

要查看MongoDB中的数据,运行以下命令以使用mongosh连接到您的副本集

mongosh "mongodb://mongo1"

在提示符下,键入以下命令以检索MongoDB命名空间Stocks.StockDataMigrate中的所有文档

use Stocks
db.StockDataMigrate.find()

您应该会看到命令返回的文档列表,类似于以下文档

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

在本教程中,您创建了一个股票行情数据生成器,该生成器定期将数据写入MongoDB集合。您配置了一个源连接器将数据复制到Kafka主题,并配置了一个目标连接器将该数据写入新的MongoDB时序集合。

阅读以下资源,了解更多关于本教程中提到的概念

返回

使用变更数据捕获处理器复制数据

© . All rights reserved.