文档菜单
文档首页
/ / /
C#/.NET
/ / /

监控数据变化

本页内容

  • 概述
  • 示例数据
  • 打开变更流
  • 修改变更流输出
  • 修改Watch() 行为
  • 包含预图像和后图像
  • 附加信息
  • API 文档

在本指南中,您可以了解如何使用 变更流 来监控数据实时变更。变更流是 MongoDB 服务器的一项功能,允许您的应用程序订阅集合、数据库或部署上的数据变更。

本指南中的示例使用的是sample_restaurants.restaurants 集合,来自Atlas 示例数据集。要了解如何创建免费的 MongoDB Atlas 集群并加载数据集,请参阅快速入门.

本页的示例使用以下 RestaurantAddressGradeEntry 类作为模型

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

注意

restaurants 集合中的文档使用蛇形命名约定。本指南中的示例使用 ConventionPack 将集合中的字段反序列化为 Pascal 大小写,并将它们映射到 Restaurant 类的属性。

要了解更多关于自定义序列化的信息,请参阅 自定义序列化。

要打开变更流,请调用 Watch()WatchAsync() 方法。您调用此方法的实例决定了变更流监听的事件范围。您可以在以下类上调用 Watch()WatchAsync() 方法:

  • MongoClient:用于监视MongoDB部署的所有变更

  • Database:用于监视数据库中所有集合的变更

  • Collection:用于监视集合的变更

以下示例在 restaurants 集合上打开变更流,并输出发生的变更。选择异步同步 选项卡以查看相应的代码。

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}

要开始监视变更,运行应用程序。然后,在单独的应用程序或shell中,修改 restaurants 集合。更新具有 "name" 值为 "Blarney Castle" 的文档将导致以下变更流输出:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

您可以将 pipeline 参数传递给 Watch()WatchAsync() 方法以修改变更流输出。此参数允许您仅监视指定的变更事件。使用 EmptyPipelineDefinition 类创建管道,并附加相关的聚合阶段方法。

您可以在 pipeline 参数中指定以下聚合阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

要了解如何使用PipelineDefinitionBuilder类构建聚合管道,请参阅《使用构建器进行操作》指南中的构建聚合管道部分。

以下示例使用pipeline参数打开仅记录更新操作的更改流。选择异步同步选项卡,查看相应的代码。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = _restaurantsCollection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}

要了解如何修改更改流输出,请参阅MongoDB服务器手册中的修改更改流输出部分。

Watch()WatchAsync()方法接受可选参数,这些参数代表您可以用来自定义操作的选项。如果您未指定任何选项,驱动程序不会自定义操作。

以下表格描述了您可以设置的选项,以自定义Watch()WatchAsync()的行为

选项
描述

FullDocument

指定是否在更改后显示完整文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像

FullDocumentBeforeChange

指定是否在更改前显示完整文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像

ResumeAfter

指导Watch()WatchAsync()在指定在恢复令牌中的操作后恢复返回更改。
每个更改流事件文档都包含一个作为_id字段的恢复令牌。传递表示您想要在之后恢复的操作的更改事件文档的整个_id字段。
ResumeAfterStartAfterStartAtOperationTime互斥。

StartAfter

Watch()WatchAsync() 指令用于在恢复令牌指定的操作之后启动一个新的更改流。允许在无效事件之后恢复通知。
每个更改流事件文档都包含一个作为_id字段的恢复令牌。传递表示您想要在之后恢复的操作的更改事件文档的整个_id字段。
StartAfterResumeAfterStartAtOperationTime 互斥。

StartAtOperationTime

Watch()WatchAsync() 指令用于返回仅发生在指定时间戳之后的事件。
StartAtOperationTimeResumeAfterStartAfter 互斥。

MaxAwaitTime

指定服务器在向更改流光标返回空批处理之前,等待新数据更改报告的最大时间(毫秒)。默认为1000毫秒。

ShowExpandedEvents

从 MongoDB 服务器 v6.0 版本开始,更改流支持数据定义语言 (DDL) 事件(如 createIndexesdropIndexes 事件)的更改通知。要在更改流中包含扩展事件,创建更改流光标并将此参数设置为 True

BatchSize

指定从 MongoDB 集群响应的每个批次中返回的最大更改事件数。

Collation

指定更改流光标要使用的排序规则。

Comment

将注释附加到操作。

重要

只有当您的部署使用 MongoDB v6.0 或更高版本时,您才能在集合上启用预图像和后图像。

默认情况下,当您对一个集合执行操作时,相应的更改事件仅包含该操作修改的字段的变化量。要查看更改前后完整的文档,请创建一个 ChangeStreamOptions 对象并指定 FullDocumentBeforeChangeFullDocument 选项。然后,将 ChangeStreamOptions 对象传递给 Watch()WatchAsync() 方法。

预图像 是更改之前的文档的完整版本。要包括预图像在更改流事件中,将 FullDocumentBeforeChange 选项设置为以下值之一

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable:仅当预图像可用时,更改事件包括修改文档的预图像。

  • ChangeStreamFullDocumentBeforeChangeOption.Required:变更事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。

后图像是文档变更后的完整版本。要将后图像包含在变更流事件中,请将FullDocument选项设置为以下值之一

  • ChangeStreamFullDocumentOption.UpdateLookup:变更事件包括变更后某个时间点后整个更改文档的副本。

  • ChangeStreamFullDocumentOption.WhenAvailable:仅当后图像可用时,变更事件才包括修改文档的后图像。

  • ChangeStreamFullDocumentOption.Required:变更事件包括修改文档的后图像。如果后图像不可用,驱动程序将引发错误。

以下示例在集合上打开一个变更流,并通过指定FullDocument选项包括更新文档的后图像。选择异步同步选项卡以查看相应的代码。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = _restaurantsCollection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}

运行上述代码示例并更新具有"name"值为"Blarney Castle"的文档,将产生以下变更流输出

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

有关预图像和后图像的更多信息,请参阅MongoDB服务器手册中的包含文档预图像和后图像的变更流

有关变更流的更多信息,请参阅MongoDB服务器手册中的变更流

要了解本指南中讨论的任何方法或类型,请参阅以下API文档

Back

列出不同值