监控数据变化
概述
在本指南中,您可以了解如何使用 变更流 来监控数据实时变更。变更流是 MongoDB 服务器的一项功能,允许您的应用程序订阅集合、数据库或部署上的数据变更。
示例数据
本指南中的示例使用的是sample_restaurants.restaurants
集合,来自Atlas 示例数据集。要了解如何创建免费的 MongoDB Atlas 集群并加载数据集,请参阅快速入门.
本页的示例使用以下 Restaurant
、Address
和 GradeEntry
类作为模型
public class Restaurant { public ObjectId Id { get; set; } public string Name { get; set; } [ ] public string RestaurantId { get; set; } public string Cuisine { get; set; } public Address Address { get; set; } public string Borough { get; set; } public List<GradeEntry> Grades { get; set; } }
public class Address { public string Building { get; set; } [ ] public double[] Coordinates { get; set; } public string Street { get; set; } [ ] public string ZipCode { get; set; } }
public class GradeEntry { public DateTime Date { get; set; } public string Grade { get; set; } public float? Score { get; set; } }
注意
在 restaurants
集合中的文档使用蛇形命名约定。本指南中的示例使用 ConventionPack
将集合中的字段反序列化为 Pascal 大小写,并将它们映射到 Restaurant
类的属性。
要了解更多关于自定义序列化的信息,请参阅 自定义序列化。
打开变更流
要打开变更流,请调用 Watch()
或 WatchAsync()
方法。您调用此方法的实例决定了变更流监听的事件范围。您可以在以下类上调用 Watch()
或 WatchAsync()
方法:
MongoClient
:用于监视MongoDB部署的所有变更Database
:用于监视数据库中所有集合的变更Collection
:用于监视集合的变更
以下示例在 restaurants
集合上打开变更流,并输出发生的变更。选择异步 或 同步 选项卡以查看相应的代码。
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change streams and print the changes as they're received using var cursor = await collection.WatchAsync(); await cursor.ForEachAsync(change => { Console.WriteLine("Received the following type of change: " + change.BackingDocument); });
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change stream and prints the changes as they're received using (var cursor = collection.Watch()) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following type of change: " + change.BackingDocument); } }
要开始监视变更,运行应用程序。然后,在单独的应用程序或shell中,修改 restaurants
集合。更新具有 "name"
值为 "Blarney Castle"
的文档将导致以下变更流输出:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...), "wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [], "truncatedArrays" : [] } }
修改变更流输出
您可以将 pipeline
参数传递给 Watch()
和 WatchAsync()
方法以修改变更流输出。此参数允许您仅监视指定的变更事件。使用 EmptyPipelineDefinition
类创建管道,并附加相关的聚合阶段方法。
您可以在 pipeline
参数中指定以下聚合阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
要了解如何使用PipelineDefinitionBuilder
类构建聚合管道,请参阅《使用构建器进行操作》指南中的构建聚合管道部分。
以下示例使用pipeline
参数打开仅记录更新操作的更改流。选择异步或同步选项卡,查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change stream and prints the changes as they're received using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) { await cursor.ForEachAsync(change => { Console.WriteLine("Received the following change: " + change); }); }
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change streams and print the changes as they're received using (var cursor = _restaurantsCollection.Watch(pipeline)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following change: " + change); } }
要了解如何修改更改流输出,请参阅MongoDB服务器手册中的修改更改流输出部分。
修改Watch()
行为
Watch()
和WatchAsync()
方法接受可选参数,这些参数代表您可以用来自定义操作的选项。如果您未指定任何选项,驱动程序不会自定义操作。
以下表格描述了您可以设置的选项,以自定义Watch()
和WatchAsync()
的行为
选项 | 描述 |
---|---|
| 指定是否在更改后显示完整文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像。 |
| 指定是否在更改前显示完整文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像。 |
| 指导 Watch() 或WatchAsync() 在指定在恢复令牌中的操作后恢复返回更改。每个更改流事件文档都包含一个作为 _id 字段的恢复令牌。传递表示您想要在之后恢复的操作的更改事件文档的整个_id 字段。ResumeAfter 与StartAfter 和StartAtOperationTime 互斥。 |
| 将 Watch() 或 WatchAsync() 指令用于在恢复令牌指定的操作之后启动一个新的更改流。允许在无效事件之后恢复通知。每个更改流事件文档都包含一个作为 _id 字段的恢复令牌。传递表示您想要在之后恢复的操作的更改事件文档的整个_id 字段。StartAfter 与 ResumeAfter 和 StartAtOperationTime 互斥。 |
| 将 Watch() 或 WatchAsync() 指令用于返回仅发生在指定时间戳之后的事件。StartAtOperationTime 与 ResumeAfter 和 StartAfter 互斥。 |
| 指定服务器在向更改流光标返回空批处理之前,等待新数据更改报告的最大时间(毫秒)。默认为1000毫秒。 |
| 从 MongoDB 服务器 v6.0 版本开始,更改流支持数据定义语言 (DDL) 事件(如 createIndexes 和 dropIndexes 事件)的更改通知。要在更改流中包含扩展事件,创建更改流光标并将此参数设置为 True 。 |
| 指定从 MongoDB 集群响应的每个批次中返回的最大更改事件数。 |
| 指定更改流光标要使用的排序规则。 |
| 将注释附加到操作。 |
Include Pre-Images and Post-Images
重要
只有当您的部署使用 MongoDB v6.0 或更高版本时,您才能在集合上启用预图像和后图像。
默认情况下,当您对一个集合执行操作时,相应的更改事件仅包含该操作修改的字段的变化量。要查看更改前后完整的文档,请创建一个 ChangeStreamOptions
对象并指定 FullDocumentBeforeChange
或 FullDocument
选项。然后,将 ChangeStreamOptions
对象传递给 Watch()
或 WatchAsync()
方法。
预图像 是更改之前的文档的完整版本。要包括预图像在更改流事件中,将 FullDocumentBeforeChange
选项设置为以下值之一
ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
:仅当预图像可用时,更改事件包括修改文档的预图像。ChangeStreamFullDocumentBeforeChangeOption.Required
:变更事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。
后图像是文档变更后的完整版本。要将后图像包含在变更流事件中,请将FullDocument
选项设置为以下值之一
ChangeStreamFullDocumentOption.UpdateLookup
:变更事件包括变更后某个时间点后整个更改文档的副本。ChangeStreamFullDocumentOption.WhenAvailable
:仅当后图像可用时,变更事件才包括修改文档的后图像。ChangeStreamFullDocumentOption.Required
:变更事件包括修改文档的后图像。如果后图像不可用,驱动程序将引发错误。
以下示例在集合上打开一个变更流,并通过指定FullDocument
选项包括更新文档的后图像。选择异步或同步选项卡以查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options); await cursor.ForEachAsync(change => { Console.WriteLine(change.FullDocument.ToBsonDocument()); });
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using (var cursor = _restaurantsCollection.Watch(pipeline, options)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine(change.FullDocument.ToBsonDocument()); } }
运行上述代码示例并更新具有"name"
值为"Blarney Castle"
的文档,将产生以下变更流输出
{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356", "cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }
有关预图像和后图像的更多信息,请参阅MongoDB服务器手册中的包含文档预图像和后图像的变更流。
更多信息
有关变更流的更多信息,请参阅MongoDB服务器手册中的变更流。
API文档
要了解本指南中讨论的任何方法或类型,请参阅以下API文档