文档菜单
文档首页
/ / /
Node.js 驱动程序
/

监控变化

您可以使用以下对象的 watch() 方法来监视 MongoDB 中的更改watch() 方法用于以下对象

  • 集合

  • 数据库

  • MongoClient

对于每个对象,当发生更改时,watch() 方法都会打开一个 变更流 并发出 变更事件 文档。

watch() 方法可以可选地接受一个 聚合管道,该管道作为第一个参数由一个 聚合阶段 数组组成。聚合阶段筛选和转换更改事件。

在下面的代码片段中,$match 阶段匹配所有具有小于 15 的 runtime 值的更改事件文档,过滤掉其他所有文档。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

watch() 方法接受一个作为第二个参数的 options 对象。有关可以使用此对象配置的设置的更多信息,请参阅本节末尾的链接。

watch() 方法返回一个 ChangeStream 的实例。您可以通过迭代它们或监听事件来读取更改流中的事件。

选择对应于您想要从更改流中读取事件的方式的选项卡

从版本 4.12 开始,ChangeStream 对象是异步可迭代的。随着此更改,您可以使用 for-await 循环从打开的更改流检索事件

for await (const change of changeStream) {
console.log("Received change: ", change);
}

您可以在 ChangeStream 对象上调用方法,例如

  • hasNext() 以检查流中是否有剩余的文档

  • next() 以请求流中的下一个文档

  • close() 以关闭 ChangeStream

您可以通过调用on()方法将监听器函数附加到ChangeStream对象上。此方法继承自JavaScript的EventEmitter类。将字符串"change"作为第一个参数,将您的监听器函数作为第二个参数传递,如下所示:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

当触发change事件时,监听器函数会被调用。您可以在监听器中指定逻辑来处理接收到的更改事件文档。

您可以通过调用pause()停止事件触发,或者调用resume()继续触发事件来控制更改流。

要停止处理更改事件,请在ChangeStream实例上调用close()方法来停止处理。这将关闭更改流并释放资源。

changeStream.close();

警告

驱动程序不支持在EventEmitterIterator模式下同时使用ChangeStream,这会导致错误。这是为了防止未定义的行为,因为驱动程序无法保证哪个消费者先接收文档。

以下示例在insertDB数据库中的haikus集合上打开更改流,并按发生顺序打印更改事件。

注意

您可以使用此示例连接到MongoDB的一个实例,并与包含样本数据的数据库交互。有关连接到您的MongoDB实例和加载样本数据集的更多信息,请参阅使用示例指南.

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);
1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

注意

相同的代码片段

上面的JavaScript和TypeScript代码片段是相同的。此用例中与驱动程序相关的TypeScript特定功能没有。

运行此代码后,如果您对haikus集合进行更改,例如执行插入或删除操作,您可以在终端中看到打印的更改事件文档。

例如,如果您向集合中插入一个文档,则代码将打印以下输出

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

注意

从更新中接收完整文档

默认情况下,仅包含更新操作信息的更改事件只返回修改的字段,而不是完整的更新文档。您可以配置更改流以返回文档的最新版本,方法是将选项对象的fullDocument字段设置为"updateLookup",如下所示

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

以下示例在insertDB数据库中的haikus集合上打开更改流。让我们创建一个监听函数来接收并打印在集合上发生的更改事件。

首先,在集合上打开更改流,然后使用on()方法定义更改流上的监听器。一旦设置监听器,通过在集合上执行更改来生成更改事件。

为了在集合上生成变更事件,我们可以使用insertOne()方法来添加新文档。由于insertOne()可能在监听器函数注册之前运行,我们使用一个定时器,定义为simulateAsyncPause,在执行插入之前等待1秒钟。

在文档插入后,我们也使用了simulateAsyncPause。这为监听器函数接收变更事件以及监听器在关闭ChangeStream实例之前完成其执行提供了足够的时间,关闭实例的方法是使用close()方法。

注意

包含定时器的原因

在此示例中使用的定时器仅用于演示目的。它们确保有足够的时间注册监听器,并在退出之前让监听器处理变更事件。

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);
1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

注意

相同的代码片段

上面的JavaScript和TypeScript代码片段是相同的。此用例中与驱动程序相关的TypeScript特定功能没有。

访问以下资源以获取更多关于此页面上提到的类和方法的信息

返回

运行命令