监控变化
打开变更流
您可以使用以下对象的 watch()
方法来监视 MongoDB 中的更改watch()
方法用于以下对象
对于每个对象,当发生更改时,watch()
方法都会打开一个 变更流 并发出 变更事件 文档。
watch()
方法可以可选地接受一个 聚合管道,该管道作为第一个参数由一个 聚合阶段 数组组成。聚合阶段筛选和转换更改事件。
在下面的代码片段中,$match
阶段匹配所有具有小于 15 的 runtime
值的更改事件文档,过滤掉其他所有文档。
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
watch()
方法接受一个作为第二个参数的 options
对象。有关可以使用此对象配置的设置的更多信息,请参阅本节末尾的链接。
watch()
方法返回一个 ChangeStream 的实例。您可以通过迭代它们或监听事件来读取更改流中的事件。
选择对应于您想要从更改流中读取事件的方式的选项卡
从版本 4.12 开始,ChangeStream
对象是异步可迭代的。随着此更改,您可以使用 for-await
循环从打开的更改流检索事件
for await (const change of changeStream) { console.log("Received change: ", change); }
您可以在 ChangeStream
对象上调用方法,例如
hasNext()
以检查流中是否有剩余的文档next()
以请求流中的下一个文档close()
以关闭 ChangeStream
您可以通过调用on()
方法将监听器函数附加到ChangeStream
对象上。此方法继承自JavaScript的EventEmitter
类。将字符串"change"
作为第一个参数,将您的监听器函数作为第二个参数传递,如下所示:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
当触发change
事件时,监听器函数会被调用。您可以在监听器中指定逻辑来处理接收到的更改事件文档。
您可以通过调用pause()
停止事件触发,或者调用resume()
继续触发事件来控制更改流。
要停止处理更改事件,请在ChangeStream
实例上调用close()方法来停止处理。这将关闭更改流并释放资源。
changeStream.close();
警告
驱动程序不支持在EventEmitter
和Iterator
模式下同时使用ChangeStream
,这会导致错误。这是为了防止未定义的行为,因为驱动程序无法保证哪个消费者先接收文档。
示例
迭代
以下示例在insertDB
数据库中的haikus
集合上打开更改流,并按发生顺序打印更改事件。
注意
您可以使用此示例连接到MongoDB的一个实例,并与包含样本数据的数据库交互。有关连接到您的MongoDB实例和加载样本数据集的更多信息,请参阅使用示例指南.
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
注意
相同的代码片段
上面的JavaScript和TypeScript代码片段是相同的。此用例中与驱动程序相关的TypeScript特定功能没有。
运行此代码后,如果您对haikus
集合进行更改,例如执行插入或删除操作,您可以在终端中看到打印的更改事件文档。
例如,如果您向集合中插入一个文档,则代码将打印以下输出
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
注意
从更新中接收完整文档
默认情况下,仅包含更新操作信息的更改事件只返回修改的字段,而不是完整的更新文档。您可以配置更改流以返回文档的最新版本,方法是将选项对象的fullDocument
字段设置为"updateLookup"
,如下所示
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
监听函数
以下示例在insertDB
数据库中的haikus
集合上打开更改流。让我们创建一个监听函数来接收并打印在集合上发生的更改事件。
首先,在集合上打开更改流,然后使用on()
方法定义更改流上的监听器。一旦设置监听器,通过在集合上执行更改来生成更改事件。
为了在集合上生成变更事件,我们可以使用insertOne()
方法来添加新文档。由于insertOne()
可能在监听器函数注册之前运行,我们使用一个定时器,定义为simulateAsyncPause
,在执行插入之前等待1秒钟。
在文档插入后,我们也使用了simulateAsyncPause
。这为监听器函数接收变更事件以及监听器在关闭ChangeStream
实例之前完成其执行提供了足够的时间,关闭实例的方法是使用close()
方法。
注意
包含定时器的原因
在此示例中使用的定时器仅用于演示目的。它们确保有足够的时间注册监听器,并在退出之前让监听器处理变更事件。
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
注意
相同的代码片段
上面的JavaScript和TypeScript代码片段是相同的。此用例中与驱动程序相关的TypeScript特定功能没有。
访问以下资源以获取更多关于此页面上提到的类和方法的信息