文档菜单
文档首页
/ / /
Java Reactive Streams 驱动程序
/

变更流

本页内容

  • 先决条件
  • 连接到 MongoDB 部署
  • 监视集合上的更改
  • 监视数据库上的更改
  • 监视所有数据库上的更改
  • 内容筛选

MongoDB 3.6 引入了$changeStream 聚合管道运算符。

变更流提供了一种观察集合中文档变化的方法。为了提高这一新阶段的可操作性,MongoCollection 类型增加了一个新的 watch() 方法。ChangeStreamPublisher 实例设置变更流,并在遇到可能恢复的错误时自动尝试恢复。

您必须设置以下组件以运行本指南中的代码示例

  • test.restaurants 集合,其中包含来自文档资产 GitHub 中的 restaurants.json 文件的数据。文档.

  • 以下导入语句

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

重要

本指南使用 Subscriber 实现,这些实现已在 Quick Start Primer 中描述。.

首先,连接到 MongoDB 部署,然后声明并定义 MongoDatabaseMongoCollection 实例。

以下代码连接到运行在本地主机 localhost 端口 27017 的独立 MongoDB 部署。然后,定义 database 变量以引用 test 数据库,以及 collection 变量以引用 restaurants 集合。

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB 教程。

要创建一个更改流,请使用 MongoCollection.watch() 方法之一。

以下示例中,更改流打印出它观察到的所有更改

collection.watch().subscribe(new PrintDocumentSubscriber());

应用程序可以打开一个更改流来监视数据库中的所有非系统集合。要创建此类更改流,请使用 MongoDatabase.watch() 方法之一。

以下示例中,更改流打印出它在给定数据库上观察到的所有更改

database.watch().subscribe(new PrintDocumentSubscriber());

应用程序可以打开一个更改流来监视MongoDB部署中所有数据库的所有非系统集合。要创建此类更改流,请使用 MongoClient.watch() 方法之一。

以下示例中,更改流打印出它在与 MongoClient 连接的部署上观察到的所有更改

client.watch().subscribe(new PrintDocumentSubscriber());

您可以将一系列聚合阶段传递给 watch() 方法以修改 $changeStream 操作符返回的数据。

注意

并非所有聚合操作符都受到支持。有关更多信息,请参阅服务器手册中的 变更流

在以下示例中,变更流打印出它观察到的所有与 insertupdatereplacedelete 操作相对应的变更。

首先,管道包括一个 $match 阶段来筛选 operationTypeinsertupdatereplacedelete 的文档。然后,将 fullDocument 设置为 FullDocument.UPDATE_LOOKUP,以便将更新后的文档包含在结果中。

collection.watch(
asList(
Aggregates.match(
Filters.in("operationType", asList("insert", "update", "replace", "delete"))
)
)
).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new PrintDocumentSubscriber());

返回

聚合框架