打开更改流
概述
在本指南中,您可以学习如何使用 变更流 来监控数据的实时更改。变更流是MongoDB Server的一个功能,允许您的应用程序订阅单个集合、数据库或部署上的数据更改。
您可以使用变更流来执行以下操作
使设备能够跟踪和响应事件,例如运动检测相机
创建一个监控股票价格变化的程序
生成特定职位员工活动日志
您可以为应用程序接收的数据指定一组聚合运算符以进行过滤和转换。当连接到MongoDB部署v6.0或更高版本时,您还可以配置事件以包括更改前后的文档数据。
要了解如何打开和配置变更流,请参阅以下部分
示例数据
本指南中的示例监控的是directors
集合中的更改。假设该集合包含以下文档,它们被建模为结构体
let docs = vec! [ Director { name: "Todd Haynes".to_string(), movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()], oscar_noms: 1, }, Director { name: "Jane Campion".to_string(), movies: vec! ["The Piano".to_string(), "Bright Star".to_string()], oscar_noms: 5, } ];
提示
要了解如何向集合中插入文档,请参阅插入文档指南。
打开变更流
您可以通过打开变更流来订阅特定类型的数据变更,并在您的应用程序中产生变更事件。
要打开变更流,请在Collection
、Database
或Client
实例上调用watch()
方法。
重要
独立的MongoDB部署不支持变更流,因为此功能需要复制集oplog。有关oplog的更多信息,请参阅服务器手册中的复制集Oplog页面。
您调用watch()
方法的结构类型决定了变更流监听事件的范围。以下表格描述了根据其调用对象watch()
方法的操作。
结构类型 | watch() 行为 |
---|---|
Collection | 监控单个集合的变更 |
Database | 监控数据库中所有集合的变更 |
Client | 监控连接的MongoDB部署中的所有变更 |
示例
以下示例在directors
集合上打开变更流。该代码通过访问每个ChangeStreamEvent
实例的operation_type
和full_document
字段来打印每个变更事件的操作类型和修改的文档。
let mut change_stream = my_coll.watch().await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Document: {:?}", event.full_document); }
提示
要查看完整的ChangeStreamEvent
结构体字段列表,请参阅API文档中的ChangeStreamEvent。
如果您在directors
集合中插入一个name
值为"Wes Anderson"
的文档,前面的代码将产生以下输出
Operation performed: Insert Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
将聚合操作符应用于您的更改流
您可以将pipeline()
方法链接到watch()
方法来指定更改流接收哪些更改事件。将聚合管道作为参数传递给pipeline()
。
要了解您的MongoDB服务器版本支持哪些聚合操作符,请参阅服务器手册中的修改更改流输出。
示例
以下示例创建了一个聚合管道来过滤更新操作。然后,代码将管道传递给pipeline()
方法,配置更改流只接收并打印更新操作的更改事件
let mut update_change_stream = my_coll.watch() .pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }]) .await?; while let Some(event) = update_change_stream.next().await.transpose()? { println!("Update performed: {:?}", event.update_description); }
如果您更新包含 name
值为 "Todd Haynes"
的文档,并通过增加 oscar_noms
字段的值来更新它,前面的代码将产生以下输出
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
提示
要了解如何执行更新操作,请参阅修改文档指南。
包括预图像和后图像
您可以为变更事件配置包含或省略以下数据
预图像:表示操作之前文档版本的文档,如果存在的话
后图像:表示操作之后文档版本的文档,如果存在的话
重要
您只能在部署使用 MongoDB v6.0 或更高版本时,在集合上启用预图像和后图像。
要接收包含预图像或后图像的变更流事件,您必须执行以下操作
在您的 MongoDB 部署上为集合启用预图像和后图像。
提示
要了解如何在部署上启用预图像和后图像,请参阅服务器手册中的带有文档预图像和后图像的变更流。
要了解如何指导驱动程序创建启用预图像和后图像的集合,请参阅本页的创建启用预图像和后图像的集合部分。
配置您的变更流以检索预图像、后图像或两者。在此配置过程中,您可以指导驱动程序要求预图像和后图像或仅在可用时包含它们。
创建启用预图像和后图像的集合
要为您的集合启用预图像和后图像文档,请使用change_stream_pre_and_post_images()
选项构建器方法。以下示例使用此构建器方法指定集合选项,并创建一个可提供预图像和后图像的集合
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build(); let result = my_db.create_collection("directors") .change_stream_pre_and_post_images(enable) .await?;
您可以通过从MongoDB Shell或从您的应用程序运行collMod
命令来更改现有集合中的预图像和后图像选项。有关如何执行此操作的说明,请参阅运行命令指南以及服务器手册中的collMod
条目。
警告
如果在集合上启用了预图像或后图像,使用collMod
修改这些设置可能会导致该集合上现有的更改流终止。
预图像配置示例
要配置返回包含预图像的更改事件的更改流,请使用full_document_before_change()
选项构建器方法。以下示例指定更改流选项并创建一个返回预图像文档的更改流
let pre_image = FullDocumentBeforeChangeType::Required; let mut change_stream = my_coll.watch() .full_document_before_change(pre_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Pre-image: {:?}", event.full_document_before_change); }
前面的示例将FullDocumentBeforeChangeType::Required
的值传递给full_document_before_change()
选项构建器方法。此方法配置更改流,要求替换、更新和删除更改事件需要预图像。如果预图像不可用,驱动程序将引发错误。
如果您更新一个其name
值为"Jane Campion"
的文档,则更改事件将产生以下输出
Operation performed: Update Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
后图像配置示例
要配置返回包含后图像的更改事件的更改流,请使用full_document()
选项构建器方法。以下示例指定更改流选项并创建一个返回后图像文档的更改流
let post_image = FullDocumentType::WhenAvailable; let mut change_stream = my_coll.watch() .full_document(post_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Post-image: {:?}", event.full_document); }
上一个示例将值 FullDocument::WhenAvailable
传递给 full_document()
选项构建方法。此方法配置更改流,以便在可用时返回替换、更新和删除更改事件的 post-image。
如果您删除了其中 name
值为 "Todd Haynes"
的文档,则更改事件将生成以下输出
Operation performed: Delete Post-image: None
更多信息
API 文档
要了解更多关于本指南中提到的任何方法或类型的信息,请参阅以下 API 文档