监控数据更改
概述
在本指南中,您可以学习如何使用 C 驱动程序来监视变更流,让您能够查看数据实时变更。变更流是 MongoDB 服务器的一个功能,它会在集合、数据库或部署上发布数据变更。您的应用程序可以订阅变更流并使用事件执行其他操作。
示例数据
本指南中的示例使用来自sample_restaurants
数据库中的 restaurants
集合。Atlas 示例数据集。要了解如何创建免费的 MongoDB Atlas 集群并加载示例数据集,请参阅Atlas 入门指南。
打开变更流
要打开变更流,调用以下函数之一,以对应您要观察的事件范围
mongoc_client_watch()
:监视 MongoDB 部署中的所有更改mongoc_database_watch()
:监控数据库中所有集合的变化mongoc_collection_watch()
:监控集合中的变化
打开变化流示例
以下示例在restaurants
集合上打开一个变化流,并打印变化
bson_t *pipeline = bson_new (); const bson_t *doc; mongoc_change_stream_t *change_stream = mongoc_collection_watch (collection, pipeline, NULL); while (true) { bson_error_t error; if (mongoc_change_stream_next (change_stream, &doc)) { char *str = bson_as_canonical_extended_json (doc, NULL); printf ("Received change: %s\n", str); bson_free (str); } else if (mongoc_change_stream_error_document (change_stream, &error, NULL)) { printf("Got error on change stream: %s\n", error.message); break; } } bson_destroy (pipeline); mongoc_change_stream_destroy (change_stream);
要开始监控变化,运行应用程序。然后在单独的应用程序或shell中,对restaurants
集合执行写操作。以下示例更新了一个字段值为"Blarney Castle"
的文档
bson_t *filter = BCON_NEW ("name", BCON_UTF8 ("Blarney Castle")); bson_t *update = BCON_NEW("$set", "{", "cuisine", BCON_UTF8 ("Irish"), "}"); mongoc_collection_update_one (collection, filter, update, NULL, NULL, NULL);
当更新集合时,变化流应用程序将按发生顺序打印变化。打印的变化事件类似于以下内容
{ "_id": { ... }, "operationType": "update", "clusterTime": { ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } ... }
修改变化流输出
您可以将pipeline
参数传递给任何监控函数以修改变化流输出。此参数允许您仅监控指定的变化事件。将参数格式化为对象列表,其中每个对象表示一个聚合阶段。
您可以在pipeline
参数中指定以下阶段
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
匹配特定事件示例
以下示例使用 pipeline
参数包括一个 $match
阶段,以打开仅记录更新操作的更改流。
bson_t *pipeline = BCON_NEW ( "pipeline", "[", "{", "$match", "{", "operationType", BCON_UTF8 ("update"), "}", "}", "]"); const bson_t *doc; mongoc_change_stream_t *change_stream = mongoc_collection_watch (collection, pipeline, NULL); while (mongoc_change_stream_next (change_stream, &doc)) { char *str = bson_as_canonical_extended_json (doc, NULL); printf ("Received change: %s\n", str); bson_free (str); } bson_destroy (pipeline); mongoc_change_stream_destroy (change_stream);
要了解有关修改更改流输出的更多信息,请参阅 MongoDB 服务器手册中的修改更改流输出部分。
修改监控行为
您可以通过向函数调用传递选项来修改任何监控函数。如果您没有指定任何选项,驱动程序不会自定义操作。
以下表格描述了您可以用于自定义监控函数行为的选项
选项 | 描述 |
---|---|
batchSize | 设置每个批次返回的文档数量。 |
comment | 指定要附加到操作上的注释。 |
fullDocument | 设置 fullDocument 值。要了解更多信息,请参阅本文件中的包括预图像和后图像部分。 |
fullDocumentBeforeChange | 设置 fullDocumentBeforeChange 值。要了解更多信息,请参阅本文件中的包括预图像和后图像部分。 |
maxAwaitTimeMS | 设置服务器上此操作的最大等待执行时间(以毫秒为单位)。 |
有关可以用于配置监控操作的完整选项列表,请参阅 MongoDB 服务器手册中的watch 方法指南。
包括预图像和后图像
重要
您只能在部署使用 MongoDB v6.0 或更高版本时,在集合上启用预图像和后图像。
默认情况下,当您对一个集合执行操作时,相应的更改事件仅包含由该操作修改的字段的变化量。要查看更改前后完整的文档,请在您的监视函数调用中指定 fullDocumentBeforeChange
或 fullDocument
选项。
前像 是更改前的文档的完整版本。要包含前像在更改流事件中,将以下其中一个值传递给 fullDocumentBeforeChange
选项。
whenAvailable
:只有在前像可用的情况下,更改事件才包含修改文档的前像。required
:更改事件包含修改文档的前像。如果前像不可用,驱动程序将引发错误。
后像 是更改后的文档的完整版本。要包含后像在更改流事件中,将以下其中一个值传递给 fullDocument
选项。
updateLookup
:更改事件包含从更改后的某个时间点开始的整个已更改文档的副本。whenAvailable
:只有在后像可用的情况下,更改事件才包含修改文档的后像。required
:更改事件包含修改文档的后像。如果后像不可用,驱动程序将引发错误。
以下示例在集合上调用 mongoc_collection_watch()
函数,并通过指定 fullDocument
选项将更新文档的后像包含在结果中。
bson_t *pipeline = bson_new (); bson_t *opts = BCON_NEW ("fullDocument", BCON_UTF8 ("updateLookup")); const bson_t *doc; mongoc_change_stream_t *change_stream = mongoc_collection_watch (collection, pipeline, opts); while (true) { bson_error_t error; if (mongoc_change_stream_next (change_stream, &doc)) { char *str = bson_as_canonical_extended_json (doc, NULL); printf ("Received change: %s\n", str); bson_free (str); } else if (mongoc_change_stream_error_document (change_stream, &error, NULL)) { printf("Got error on change stream: %s\n", error.message); break; } } bson_destroy (pipeline); bson_destroy (opts); mongoc_change_stream_destroy (change_stream);
在更改流应用程序运行时,使用 前面的更新示例 更新 restaurants
集合中的文档会打印出类似于以下的变化事件。
{ "_id": ..., "operationType": "update", "clusterTime": ..., "wallTime": ..., "fullDocument": { "_id": { ... }, "address": ..., "borough": "Queens", "cuisine": "Irish", "grades": [ ... ], "name": "Blarney Castle", "restaurant_id": ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "documentKey": { "_id": ... }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } }
有关前像和后像的更多信息,请参阅 MongoDB 服务器手册中的 具有文档前像和后像的更改流。
更多信息
有关更改流的更多信息,请参阅 MongoDB 服务器手册中的 更改流。
API 文档
要了解更多关于本指南中讨论的任何函数或类型的信息,请参阅以下 API 文档