探索 MongoDB 变更流
按照此教程学习如何在MongoDB集合上创建变更流并观察其创建的变更事件。
探索变更流
完成教程设置
完成以下步骤Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。
打开变更流
在 ChangeStreamShell1 中,使用 PyMongo 驱动创建一个打开变更流的 Python 脚本。
nano openchangestream.py
将以下代码粘贴到文件中并保存更改
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
运行 Python 脚本
python3 openchangestream.py
脚本启动成功后输出以下信息
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
触发变更事件
在 ChangeStreamShell2 中,使用以下命令连接到 MongoDB:mongosh
,MongoDB 壳
mongosh "mongodb://mongo1"
连接成功后,您应该看到以下 MongoDB 壳提示
rs0 [direct: primary] test>
在提示符下,输入以下命令
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
输入上述命令后,切换到 ChangeStreamShell1 查看变更流输出,应类似于以下内容
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
要停止脚本,请按Ctrl+C
.
完成此步骤后,您已成功触发并观察到了变更流事件。
打开过滤后的变更流
您可以通过传递一个聚合管道来对一个变更流应用过滤器。
在 ChangeStreamShell1 中,创建一个新的 Python 脚本来使用 PyMongo 驱动程序打开一个过滤后的变更流。
nano pipeline.py
将以下代码粘贴到文件中并保存更改
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
运行 Python 脚本
python3 pipeline.py
脚本启动成功后输出以下信息
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
观察过滤后的变更流
返回您的 ChangeStreamShell2 会话,该会话应通过 mongosh
连接到 MongoDB。
在提示符下,输入以下命令
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
如脚本输出所示,变更流创建了一个变更事件,因为它匹配以下管道
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
在 ChangeStreamShell2 中尝试插入以下文档以验证变更流仅在文档匹配过滤器时产生事件
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(可选) 停止 Docker 容器
完成本教程后,通过停止或删除 Docker 资产来释放您计算机上的资源。您可以选择删除容器和镜像,或者仅删除容器。如果您删除容器和镜像,您必须重新下载它们以重新启动大小约为 2.4 GB 的 MongoDB Kafka 连接器开发环境。如果您仅删除容器,您可以重用镜像,并避免在示例数据管道中下载大部分大型文件。
提示
更多教程
如果您计划完成更多MongoDB Kafka Connector教程,考虑仅删除容器。如果您不打算完成更多MongoDB Kafka Connector教程,考虑删除容器和镜像。
选择您要运行的任务对应的选项卡。
运行以下shell命令以删除开发环境的Docker容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下shell命令以删除开发环境的Docker容器但保留镜像:
docker-compose -p mongo-kafka down
要重启容器,请遵循在教程设置中启动容器的相同步骤。
摘要
在本教程中,您在MongoDB上创建了一个更改流并观察了输出。MongoDB Kafka源连接器读取您配置的更改流中的更改事件,并将它们写入Kafka主题。
要了解如何为源连接器配置更改流和Kafka主题,请参阅MongoDB Kafka源连接器入门教程。
了解更多
阅读以下资源以了解更多关于本教程中提到的概念: