文档菜单
文档首页
/ / /
Rust 驱动
/ / /

打开更改流

在本页

  • 概述
  • 示例数据
  • 打开更改流
  • 示例
  • 将聚合运算符应用于更改流
  • 示例
  • 包含预图像和后图像
  • 创建启用预图像和后图像的集合
  • 预图像配置示例
  • 后图像配置示例
  • 更多信息
  • API 文档

在本指南中,您可以学习如何使用 变更流 来监控数据的实时更改。变更流是MongoDB Server的一个功能,允许您的应用程序订阅单个集合、数据库或部署上的数据更改。

您可以使用变更流来执行以下操作

  • 使设备能够跟踪和响应事件,例如运动检测相机

  • 创建一个监控股票价格变化的程序

  • 生成特定职位员工活动日志

您可以为应用程序接收的数据指定一组聚合运算符以进行过滤和转换。当连接到MongoDB部署v6.0或更高版本时,您还可以配置事件以包括更改前后的文档数据。

要了解如何打开和配置变更流,请参阅以下部分

  • 示例数据

  • 打开更改流

  • 将聚合运算符应用于更改流

  • 包含预图像和后图像

  • 更多信息

本指南中的示例监控的是directors 集合中的更改。假设该集合包含以下文档,它们被建模为结构体

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

提示

要了解如何向集合中插入文档,请参阅插入文档指南。

您可以通过打开变更流来订阅特定类型的数据变更,并在您的应用程序中产生变更事件。

要打开变更流,请在CollectionDatabaseClient实例上调用watch()方法。

重要

独立的MongoDB部署不支持变更流,因为此功能需要复制集oplog。有关oplog的更多信息,请参阅服务器手册中的复制集Oplog页面。

您调用watch()方法的结构类型决定了变更流监听事件的范围。以下表格描述了根据其调用对象watch()方法的操作。

结构类型
watch()行为
Collection
监控单个集合的变更
Database
监控数据库中所有集合的变更
Client
监控连接的MongoDB部署中的所有变更

以下示例在directors集合上打开变更流。该代码通过访问每个ChangeStreamEvent实例的operation_typefull_document字段来打印每个变更事件的操作类型和修改的文档。

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

如果您在directors集合中插入一个name值为"Wes Anderson"的文档,前面的代码将产生以下输出

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

您可以将pipeline()方法链接到watch()方法来指定更改流接收哪些更改事件。将聚合管道作为参数传递给pipeline()

要了解您的MongoDB服务器版本支持哪些聚合操作符,请参阅服务器手册中的修改更改流输出

以下示例创建了一个聚合管道来过滤更新操作。然后,代码将管道传递给pipeline()方法,配置更改流只接收并打印更新操作的更改事件

let mut update_change_stream = my_coll.watch()
.pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }])
.await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

如果您更新包含 name 值为 "Todd Haynes" 的文档,并通过增加 oscar_noms 字段的值来更新它,前面的代码将产生以下输出

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

提示

要了解如何执行更新操作,请参阅修改文档指南。

您可以为变更事件配置包含或省略以下数据

  • 预图像:表示操作之前文档版本的文档,如果存在的话

  • 后图像:表示操作之后文档版本的文档,如果存在的话

重要

您只能在部署使用 MongoDB v6.0 或更高版本时,在集合上启用预图像和后图像。

要接收包含预图像或后图像的变更流事件,您必须执行以下操作

  • 在您的 MongoDB 部署上为集合启用预图像和后图像。

    提示

    要了解如何在部署上启用预图像和后图像,请参阅服务器手册中的带有文档预图像和后图像的变更流

    要了解如何指导驱动程序创建启用预图像和后图像的集合,请参阅本页的创建启用预图像和后图像的集合部分。

  • 配置您的变更流以检索预图像、后图像或两者。在此配置过程中,您可以指导驱动程序要求预图像和后图像或仅在可用时包含它们。

    提示

    要配置您的变更流以在变更事件中记录预图像,请参阅本页的预图像配置示例

    要配置您的变更流以在变更事件中记录后图像,请参阅本页的后图像配置示例

要为您的集合启用预图像和后图像文档,请使用change_stream_pre_and_post_images()选项构建器方法。以下示例使用此构建器方法指定集合选项,并创建一个可提供预图像和后图像的集合

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
.change_stream_pre_and_post_images(enable)
.await?;

您可以通过从MongoDB Shell或从您的应用程序运行collMod命令来更改现有集合中的预图像和后图像选项。有关如何执行此操作的说明,请参阅运行命令指南以及服务器手册中的collMod条目。

警告

如果在集合上启用了预图像或后图像,使用collMod修改这些设置可能会导致该集合上现有的更改流终止。

要配置返回包含预图像的更改事件的更改流,请使用full_document_before_change()选项构建器方法。以下示例指定更改流选项并创建一个返回预图像文档的更改流

let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
.full_document_before_change(pre_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

前面的示例将FullDocumentBeforeChangeType::Required的值传递给full_document_before_change()选项构建器方法。此方法配置更改流,要求替换、更新和删除更改事件需要预图像。如果预图像不可用,驱动程序将引发错误。

如果您更新一个其name值为"Jane Campion"的文档,则更改事件将产生以下输出

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

要配置返回包含后图像的更改事件的更改流,请使用full_document()选项构建器方法。以下示例指定更改流选项并创建一个返回后图像文档的更改流

let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
.full_document(post_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

上一个示例将值 FullDocument::WhenAvailable 传递给 full_document() 选项构建方法。此方法配置更改流,以便在可用时返回替换、更新和删除更改事件的 post-image。

如果您删除了其中 name 值为 "Todd Haynes" 的文档,则更改事件将生成以下输出

Operation performed: Delete
Post-image: None

要了解更多关于本指南中提到的任何方法或类型的信息,请参阅以下 API 文档

返回

数据游标