文档菜单
文档首页
/
MongoDB Kafka 连接器
/

探索 MongoDB 变更流

在本页

  • 探索变更流
  • 摘要
  • 了解更多

按照此教程学习如何在MongoDB集合上创建变更流并观察其创建的变更事件。

1

完成以下步骤Kafka Connector 教程设置以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

在教程 Docker 容器中创建两个交互式 shell 会话,每个会话在一个单独的窗口中。

从终端运行以下命令以启动交互式 shell。

docker exec -it mongo1 /bin/bash

在本教程中,我们将称此交互式 shell 为 ChangeStreamShell1

在第二个终端中运行以下命令

docker exec -it mongo1 /bin/bash

在本教程中,我们将称此交互式 shell 为 ChangeStreamShell2

3

ChangeStreamShell1 中,使用 PyMongo 驱动创建一个打开变更流的 Python 脚本。

nano openchangestream.py

将以下代码粘贴到文件中并保存更改

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

运行 Python 脚本

python3 openchangestream.py

脚本启动成功后输出以下信息

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

ChangeStreamShell2 中,使用以下命令连接到 MongoDB:mongosh,MongoDB 壳

mongosh "mongodb://mongo1"

连接成功后,您应该看到以下 MongoDB 壳提示

rs0 [direct: primary] test>

在提示符下,输入以下命令

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

输入上述命令后,切换到 ChangeStreamShell1 查看变更流输出,应类似于以下内容

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

要停止脚本,请按Ctrl+C.

完成此步骤后,您已成功触发并观察到了变更流事件。

5

您可以通过传递一个聚合管道来对一个变更流应用过滤器。

ChangeStreamShell1 中,创建一个新的 Python 脚本来使用 PyMongo 驱动程序打开一个过滤后的变更流。

nano pipeline.py

将以下代码粘贴到文件中并保存更改

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

运行 Python 脚本

python3 pipeline.py

脚本启动成功后输出以下信息

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

返回您的 ChangeStreamShell2 会话,该会话应通过 mongosh 连接到 MongoDB。

在提示符下,输入以下命令

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

如脚本输出所示,变更流创建了一个变更事件,因为它匹配以下管道

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

ChangeStreamShell2 中尝试插入以下文档以验证变更流仅在文档匹配过滤器时产生事件

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

完成本教程后,通过停止或删除 Docker 资产来释放您计算机上的资源。您可以选择删除容器和镜像,或者仅删除容器。如果您删除容器和镜像,您必须重新下载它们以重新启动大小约为 2.4 GB 的 MongoDB Kafka 连接器开发环境。如果您仅删除容器,您可以重用镜像,并避免在示例数据管道中下载大部分大型文件。

提示

更多教程

如果您计划完成更多MongoDB Kafka Connector教程,考虑仅删除容器。如果您不打算完成更多MongoDB Kafka Connector教程,考虑删除容器和镜像。

选择您要运行的任务对应的选项卡。

运行以下shell命令以删除开发环境的Docker容器和镜像:

docker-compose -p mongo-kafka down --rmi all

运行以下shell命令以删除开发环境的Docker容器但保留镜像:

docker-compose -p mongo-kafka down

要重启容器,请遵循在教程设置中启动容器的相同步骤。

在本教程中,您在MongoDB上创建了一个更改流并观察了输出。MongoDB Kafka源连接器读取您配置的更改流中的更改事件,并将它们写入Kafka主题。

要了解如何为源连接器配置更改流和Kafka主题,请参阅MongoDB Kafka源连接器入门教程。

阅读以下资源以了解更多关于本教程中提到的概念:

  • 更改流和源连接器

  • 修改更改流输出

  • MongoDB Shell (mongosh)

返回

教程设置