文档菜单
文档首页
/ / /
Java 同步驱动程序
/ / /

打开变更流

本页内容

  • 概述
  • 打开变更流
  • 将聚合运算符应用到变更流中
  • 拆分大型变更流事件
  • 包括前图像和后图像

在本指南中,您可以学习如何使用 变更流 来监控数据库的实时变化。变更流是 MongoDB 服务器的一项功能,允许您的应用程序订阅单个集合、数据库或部署上的数据变化。

您可以指定一组聚合运算符来筛选和转换应用程序接收到的数据。当连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改前后的文档数据。

有关如何打开和配置变更流的信息,请参阅以下部分

  • 打开变更流

  • 将聚合运算符应用到变更流中

  • 包括前图像和后图像

您可以通过开启数据变更流来订阅特定类型的数据变更,并在您的应用程序中产生变更事件。

要开启变更流,请调用watch() 方法,在 MongoCollectionMongoDatabaseMongoClient 的实例上。

重要

独立的 MongoDB 部署不支持变更流,因为此功能需要副本集 oplog。有关 oplog 的更多信息,请参阅 MongoDB Server 手册页面副本集 oplog

您在哪个对象上调用 watch() 方法,就决定了变更流监听事件的范围。

如果您在 MongoCollection 上调用 watch(),变更流将监控该集合。

如果您在 MongoDatabase 上调用 watch(),变更流将监控该数据库中的所有集合。

如果您在 MongoClient 上调用 watch(),变更流将监控连接的 MongoDB 部署中的所有变更。

此示例展示了如何在 myColl 集合上开启变更流,并按发生顺序打印变更流事件。

驱动程序将变更流事件存储在类型为 ChangeStreamIterable 的变量中。在以下示例中,我们指定驱动程序应使用 Document 类型填充 ChangeStreamIterable 对象。因此,驱动程序将单个变更流事件存储为 ChangeStreamDocument 对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

在集合上的插入操作产生以下输出

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

有关可运行的示例,请参阅监视变更 使用示例页面。

有关 watch() 方法的更多信息,请参阅以下 API 文档

您可以将聚合管道作为参数传递给 watch() 方法,以指定变更流接收哪些变更事件。

要了解您的 MongoDB 服务器版本支持哪些聚合运算符,请参阅 修改变更流输出。

以下代码示例展示了如何将聚合管道应用于配置变更流,以便仅接收插入和更新操作的变更事件

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合的更新操作产生以下输出

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

从 MongoDB 7.0 开始,您可以使用 $changeStreamSplitLargeEvent 聚合阶段将超过 16 MB 的事件拆分为更小的片段。

仅在绝对必要时使用$changeStreamSplitLargeEvent。例如,如果您的应用程序需要完整的文档预或后图像,并且生成超过16 MB的事件,则使用$changeStreamSplitLargeEvent

在$changeStreamSplitLargeEvent阶段,返回片段是顺序的。您可以通过使用更改流游标来访问片段。每个片段都包含一个包含以下字段的SplitEvent对象

字段
描述
片段
片段的索引,从1开始
组成拆分事件的片段总数

以下示例通过使用$changeStreamSplitLargeEvent聚合阶段来修改您的更改流,以拆分大事件

ChangeStreamIterable<Document> changeStream = collection.watch(
Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

在您的聚合管道中,只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

您可以在更改流游标上调用getSplitEvent()方法来访问SplitEvent,如下例所示

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

有关$changeStreamSplitLargeEvent聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。

您可以为更改事件配置包含或省略以下数据

  • 预图像,表示操作之前文档版本的文档,如果存在

  • 后图像,表示操作之后文档版本的文档,如果存在

重要

您只能在您的部署使用MongoDB v6.0或更高版本的情况下,在集合上启用预和后图像。

要接收包含预图像或后图像的更改流事件,您必须执行以下操作

  • 在您的MongoDB部署中为集合启用预图像和后图像。

    提示

    有关在您的部署上启用预和后图像的说明,请参阅服务器手册中的包含文档预和后图像的更改流

    要了解如何指导驱动程序创建启用预图像和后图像的集合,请参阅创建启用预图像和后图像的集合部分。

  • 配置您的更改流以检索预图像或后图像之一或两者。

    提示

    要配置您的更改流以在更改事件中记录预图像,请参阅预图像配置示例。

    要配置您的更改流以在更改事件中记录后图像,请参阅后图像配置示例。

要使用驱动程序创建启用预图像和后图像选项的集合,请指定一个ChangeStreamPreAndPostImagesOptions实例,并调用以下示例中的createCollection()方法

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从MongoDB Shell运行collMod命令来更改现有集合中的预图像和后图像选项。有关如何执行此操作的信息,请参阅服务器手册中的collMod

警告

如果您在集合上启用了预图像或后图像,则使用collMod修改这些设置可能导致该集合上现有的更改流失败。

以下代码示例展示了如何配置myColl集合上的更改流以包括预图像并输出任何更改事件

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

上一个示例配置了更改流使用 FullDocumentBeforeChange.REQUIRED 选项。此选项配置更改流在替换、更新和删除更改事件时需要预图像。如果预图像不可用,驱动程序将引发错误。

假设您将文档中 amount 字段的值从 150 更新到 2000。此更改事件产生以下输出

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅FullDocumentBeforeChange API 文档。

以下代码示例展示了如何配置myColl集合上的更改流以包括预图像并输出任何更改事件

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

上一个示例配置了更改流使用 FullDocument.WHEN_AVAILABLE 选项。此选项配置更改流在替换和更新更改事件中返回修改后文档的后图像(如果可用)。

假设您将文档中 color 字段的值从 "purple" 更新到 "pink"。更改事件产生以下输出

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅完整文档 API文档。

返回

游标中的数据