将现有集合迁移到时间序列集合
遵循本教程了解如何使用 MongoDB Kafka 连接器将现有 MongoDB 集合转换为 时间序列集合。
时间序列集合有效地存储时间序列数据。时间序列数据包括在时间间隔内测量的数据、描述测量的元数据以及测量的时间。
要使用连接器将 MongoDB 集合中的数据转换为时间序列集合,您必须执行以下任务
确定集合中所有文档共有的时间字段。
配置源连接器,将现有集合数据复制到 Kafka 主题。
配置目标连接器,将 Kafka 主题数据复制到时间序列集合。
在本教程中,您将执行上述任务,将股票数据从集合迁移到时间序列集合。时间序列集合更有效地存储和索引数据,并保留使用聚合运算符分析股票性能随时间变化的能力。
将集合迁移到时间序列集合
完成教程设置
完成以下步骤Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。
生成示例数据
运行以下命令以在您的 Docker 环境中启动一个脚本,该脚本生成一个包含伪造的股票符号及其价格的时间序列集合,该集合位于您的教程 MongoDB 副本集中
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
数据生成器启动后,您应该看到以下类似的数据
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ...
配置源连接器
在单独的终端窗口中,使用以下命令在为教程设置下载的Docker容器上创建一个交互式shell会话
docker exec -it mongo1 /bin/bash
创建一个名为stock-source.json
的源配置文件,使用以下命令
nano stock-source.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-source-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": "true", "connection.uri": "mongodb://mongo1", "topic.prefix": "marketdata", "database": "Stocks", "collection": "PriceData", "copy.existing": "true" } }
此配置指示连接器将现有数据从PriceData
MongoDB集合复制到marketdata.Stocks.PriceData
Kafka主题,完成后,任何插入该集合的未来数据。
在shell中运行以下命令以使用您创建的配置文件启动源连接器
cx stock-source.json
注意
cx
命令是教程开发环境中的一个自定义脚本。此脚本执行以下等效请求到Kafka Connect REST API以创建一个新的连接器
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
在shell中运行以下命令以检查连接器的状态
status
如果您的源连接器成功启动,您应该看到以下输出
Kafka topics: ... The status of the connectors: source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ...
一旦源连接器启动,请通过运行以下命令确认Kafka主题是否收到了集合数据
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
输出应显示与以下类似的主题数据,类似于源连接器发布的数据
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
您可以通过键入以下内容退出kafkacat
CTRL+C
.
配置目标连接器
配置一个目标连接器,从Kafka主题读取数据并将其写入到名为StockDataMigrate
的时间序列集合中,该集合位于名为Stocks
的数据库中。
使用以下命令创建名为stock-sink.json
的目标配置文件
nano stock-sink.json
将以下配置信息粘贴到文件中并保存更改
{ "name": "mongo-sink-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "marketdata.Stocks.PriceData", "connection.uri": "mongodb://mongo1", "database": "Stocks", "collection": "StockDataMigrate", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield": "tx_time", "timeseries.timefield.auto.convert": "true", "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
提示
上述的sink连接器配置使用了时间字段日期格式转换器。或者,您可以使用TimestampConverter
单消息转换(SMT)将tx_time
字段从String
转换为ISODate
。当使用TimestampConverter
SMT时,您必须为Kafka主题中的数据定义一个模式。
有关如何使用TimestampConverter
SMT的信息,请参阅TimestampConverter Confluent文档。
在shell中运行以下命令以启动sink连接器,使用您更新的配置文件
cx stock-sink.json
当您的sink连接器完成主题数据的处理后,StockDataMigrate
时间序列集合中的文档包含具有ISODate
类型值的tx_time
字段。
验证时间序列集合数据
一旦sink连接器完成主题数据的处理,StockDataMigrate
时间序列集合应该包含来自您的PriceData
集合的所有市场数据。
要查看MongoDB中的数据,运行以下命令以使用mongosh
连接到您的副本集
mongosh "mongodb://mongo1"
在提示符下,键入以下命令以检索MongoDB命名空间Stocks.StockDataMigrate
中的所有文档
use Stocks db.StockDataMigrate.find()
您应该会看到命令返回的文档列表,类似于以下文档
{ tx_time: ISODate("2022-05-25T21:16:35.983Z"), _id: ObjectId("628e9..."), symbol: 'FAV', price: 18.43, company_name: 'FUZZY ATTACK VENTURES' }
摘要
在本教程中,您创建了一个股票行情数据生成器,该生成器定期将数据写入MongoDB集合。您配置了一个源连接器将数据复制到Kafka主题,并配置了一个目标连接器将该数据写入新的MongoDB时序集合。
了解更多
阅读以下资源,了解更多关于本教程中提到的概念