文档菜单
文档首页
/ / /
C++ 驱动
/

监控数据变化

本页内容

  • 概述
  • 示例数据
  • 打开变更流
  • 修改更改流输出
  • 修改watch() 行为
  • 包含预图像和后图像
  • 附加信息
  • API 文档

在本指南中,您可以学习如何使用 更改流 来监视数据库的实时更改。更改流是 MongoDB 服务器的一个功能,允许您的应用程序订阅集合、数据库或部署上的数据更改。

当使用 C++ 驱动程序时,您可以实例化一个mongocxx::change_stream 来监视数据更改。

本指南中的示例使用来自Atlas 示例数据集sample_restaurants 数据库中的 restaurants 集合。要从您的 C++ 应用程序访问此集合,实例化一个连接到 Atlas 集群的 mongocxx::client,并将以下值分配给您的 dbcollection 变量

auto db = client["sample_restaurants"];
auto collection = db["restaurants"];

要了解如何创建免费的 MongoDB Atlas 集群并加载示例数据集,请参阅 Atlas 入门指南

要打开变更流,请调用 watch() 方法。你调用 watch() 方法时所在的实例决定了变更流监听的事件范围。你可以在以下类上调用 watch() 方法

  • mongocxx::client:监视MongoDB部署中的所有更改

  • mongocxx::database:监视数据库中所有集合的更改

  • mongocxx::collection:监视集合的更改

以下示例在 restaurants 集合上打开变更流,并按发生顺序输出更改

auto stream = collection.watch();
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

要开始监听更改,运行前面的代码。然后,在单独的应用程序或shell中修改 restaurants 集合。以下示例更新一个具有 name 字段值为 Blarney Castle 的文档

auto result = collection.update_one(make_document(kvp("name", "Blarney Castle")),
make_document(kvp("$set",
make_document(kvp("cuisine", "Irish")))));

当你更新集合时,变更流应用程序会按发生顺序打印更改。打印的更改事件类似于以下输出

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... }, "wallTime" : { "$date" : ... }, "ns" :
{ "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" :
{ "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" :
{ "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }

你可以通过将 mongocxx::pipeline 实例作为参数传递给 watch() 方法来修改变更流输出。以下列表包括你可以通过调用它们的相应设置方法设置的某些 mongocxx::pipeline 字段

  • add_fields:向文档添加新字段

  • match:过滤文档

  • project:投影文档的字段子集

  • redact:限制文档内容

  • group:按指定的表达式对文档进行分组

  • merge:将结果输出到集合

提示

有关 mongocxx::pipeline 字段的完整列表,请参阅mongocxx::pipeline API 文档。

以下示例设置 match 字段,然后将管道传递给 watch() 方法。这将指示 watch() 方法仅输出更新操作

mongocxx::pipeline pipeline;
pipeline.match(make_document(kvp("operationType", "update")));
auto stream = collection.watch(pipeline);
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

您可以通过传递 mongocxx::options::change_stream 类的实例作为参数来修改 watch() 方法的行为。以下表格描述了您可以在 mongocxx::options::change_stream 实例中设置的字段

字段
描述

full_document

指定是否在更改后显示整个文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像.

full_document_before_change

指定是否在更改后显示更改前的完整文档,而不是仅显示对文档所做的更改。有关此选项的更多信息,请参阅包含预图像和后图像

resume_after

指示 watch() 在指定的恢复令牌之后的操作后继续返回更改。
每个更改流事件文档都包含一个恢复令牌作为 _id 字段。传递代表您想要在之后恢复的操作的更改事件文档的整个 _id 字段。
resume_afterstart_afterstart_at_operation_time 互斥。

start_after

指示 watch() 在指定的恢复令牌中的操作之后启动一个新的更改流。该字段允许在无效事件后恢复通知。
每个更改流事件文档都包含一个恢复令牌作为 _id 字段。传递代表您想要在之后恢复的操作的更改事件文档的整个 _id 字段。
start_afterresume_afterstart_at_operation_time 互斥。

start_at_operation_time

指示 watch() 仅返回在指定时间戳之后发生的事件。
start_at_operation_timeresume_afterstart_after 互斥。

max_await_time_ms

设置服务器在返回空批次之前等待新数据更改报告给更改流游标的最大时间(以毫秒为单位)。默认为1000毫秒。

batch_size

设置从MongoDB集群的响应中每个批次返回的更改事件的最大数量。

collation

设置用于更改流游标的排序规则。

comment

将注释附加到操作上。

重要

只有当您的部署使用MongoDB v6.0或更高版本时,您才能在集合上启用预图像和后图像。

默认情况下,当您对集合执行操作时,相应的更改事件仅包含该操作修改的字段的增量。要查看更改前后完整的文档,请指定 mongocxx::options::change_stream 实例的 full_document_before_changefull_document 字段。

预图像 是更改之前的文档的完整版本。要包含预图像在更改流事件中,将 full_document_before_change 字段设置为以下字符串之一

  • "whenAvailable":仅在预图像可用时,更改事件包含修改文档的预图像。

  • "required":更改事件包含修改文档的预图像。如果预图像不可用,驱动程序将引发错误。

后图像 是更改之后的文档的完整版本。要包含后图像在更改流事件中,将 full_document 字段设置为以下字符串之一

  • "updateLookup":更改事件包含更改后某个时间点的整个更改文档的副本。

  • "whenAvailable":仅在后图像可用时,更改事件包含修改文档的后图像。

  • "required":更改事件包含修改文档的后图像。如果后图像不可用,驱动程序将引发错误。

以下示例在集合上调用 watch() 方法,并通过设置 mongocxx::options::change_stream 实例的 full_document 字段来包含更新文档的图片后版本。

mongocxx::options::change_stream opts;
opts.full_document("updateLookup");
auto stream = collection.watch(opts);
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
}

当更改流应用程序运行时,使用前面的更新示例通过 restaurants 集合更新文档,将打印出类似于以下代码的更改事件。

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : ... },
"fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24",
"coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" :
"Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" :
"Irish", "grades" : [ ... ], "name" : "Blarney Castle", "restaurant_id" : "40366356" },
"ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" :
{ "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" :
{ "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }

提示

要了解有关前版本和后版本更多信息,请参阅 MongoDB 服务器手册中的 带有文档前版本和后版本的更改流

要了解更多关于更改流的信息,请参阅 MongoDB 服务器手册中的 更改流

要了解更多关于本指南中讨论的任何方法或类型的信息,请参阅以下 API 文档

返回

游标