变更流
MongoDB 3.6 引入了$changeStream
聚合管道运算符。
变更流提供了一种观察集合中文档变化的方法。为了提高这一新阶段的可操作性,MongoCollection
类型增加了一个新的 watch()
方法。ChangeStreamPublisher
实例设置变更流,并在遇到可能恢复的错误时自动尝试恢复。
先决条件
您必须设置以下组件以运行本指南中的代码示例
test.restaurants
集合,其中包含来自文档资产 GitHub 中的restaurants.json
文件的数据。文档.以下导入语句
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document;
重要
本指南使用 Subscriber
实现,这些实现已在 Quick Start Primer
中描述。中.
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义 MongoDatabase
和 MongoCollection
实例。
以下代码连接到运行在本地主机 localhost
端口 27017
的独立 MongoDB 部署。然后,定义 database
变量以引用 test
数据库,以及 collection
变量以引用 restaurants
集合。
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB 教程。
注意集合上的变化
要创建一个更改流,请使用 MongoCollection.watch()
方法之一。
以下示例中,更改流打印出它观察到的所有更改
collection.watch().subscribe(new PrintDocumentSubscriber());
注意数据库上的变化
应用程序可以打开一个更改流来监视数据库中的所有非系统集合。要创建此类更改流,请使用 MongoDatabase.watch()
方法之一。
以下示例中,更改流打印出它在给定数据库上观察到的所有更改
database.watch().subscribe(new PrintDocumentSubscriber());
注意所有数据库上的变化
应用程序可以打开一个更改流来监视MongoDB部署中所有数据库的所有非系统集合。要创建此类更改流,请使用 MongoClient.watch()
方法之一。
以下示例中,更改流打印出它在与 MongoClient
连接的部署上观察到的所有更改
client.watch().subscribe(new PrintDocumentSubscriber());
内容过滤
您可以将一系列聚合阶段传递给 watch()
方法以修改 $changeStream
操作符返回的数据。
注意
并非所有聚合操作符都受到支持。有关更多信息,请参阅服务器手册中的 变更流。
在以下示例中,变更流打印出它观察到的所有与 insert
、update
、replace
和 delete
操作相对应的变更。
首先,管道包括一个 $match
阶段来筛选 operationType
为 insert
、update
、replace
或 delete
的文档。然后,将 fullDocument
设置为 FullDocument.UPDATE_LOOKUP
,以便将更新后的文档包含在结果中。
collection.watch( asList( Aggregates.match( Filters.in("operationType", asList("insert", "update", "replace", "delete")) ) ) ).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new PrintDocumentSubscriber());