文档菜单
文档首页
/ / /
Java 反应式流驱动程序
/

监控数据更改

本页内容

  • 概述
  • 示例数据
  • 打开更改流
  • 修改更改流输出
  • 修改watch()行为
  • 包含预图像和后图像
  • 附加信息
  • API 文档

在本指南中,您可以了解如何使用change stream来监视您数据的实时变化。Change stream是MongoDB服务器的一项功能,它允许您的应用程序订阅集合、数据库或部署中的数据变化。

本指南中的示例使用sample_restaurants.restaurants集合,该集合来自Atlas示例数据集。要了解如何创建免费的MongoDB Atlas集群并加载示例数据集,请参阅入门指南。入门指南。

重要

项目反应器库

本指南使用项目反应器库来消费Java反应式流驱动程序方法返回的发布者实例。要了解有关项目反应器库以及如何使用它的更多信息,请参阅Reactor文档中的入门。要了解本指南中如何使用项目反应器库方法,请参阅写入MongoDB数据指南。

要打开更改流,请调用 watch() 方法。你调用该方法的实例将确定更改流监听的事件范围。你可以在以下类的实例上调用 watch() 方法

  • MongoClient:用于监控 MongoDB 部署的所有更改

  • MongoDatabase:用于监控数据库中所有集合的更改

  • MongoCollection:用于监控集合的更改

以下示例在 restaurants 集合上打开更改流,并按发生顺序输出更改

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要开始监控更改,运行应用程序。然后,在单独的应用程序或壳中,对 restaurants 集合执行写操作。更新具有 "name" 字段值为 "Blarney Castle" 的文档会导致以下更改流输出

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

你可以将聚合管道作为参数传递给 watch() 方法以修改更改流输出。此参数允许你仅监控指定的更改事件。

你可以在 pipeline 参数中指定以下聚合阶段

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例将聚合管道传递给更改流,以仅记录更新操作

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

有关修改更改流输出的更多信息,请参阅 MongoDB 服务器手册中的 修改更改流输出 部分。

您可以将方法链接到 watch() 方法上,这些方法代表了您可以用来配置更改流操作的选项。如果您没有指定任何选项,驱动程序不会自定义操作。

下表描述了您可以链接到 watch() 以自定义其行为的函数

选项
描述
fullDocument()
指定在更改后是否显示完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像
fullDocumentBeforeChange()
指定是否显示更改之前的完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像
resumeAfter()
指示 watch() 在恢复令牌指定的操作后继续返回更改。
每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。
resumeAfter()startAfter()startAtOperationTime() 是互斥的。
startAfter()
指示 watch() 在恢复令牌指定的操作后开始新的更改流。允许在无效事件后恢复通知。
每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。
startAfter()resumeAfter()startAtOperationTime() 是互斥的。
startAtOperationTime()
指示 watch() 只返回指定时间戳之后发生的事件。
startAtOperationTime()resumeAfter()startAfter() 是互斥的。
maxAwaitTime()
指定服务器在向更改流光标报告新数据更改之前等待的最大时间,以毫秒为单位。默认为1000毫秒。
showExpandedEvents()
从 MongoDB Server v6.0 开始,更改流支持数据定义语言 (DDL) 事件(如 createIndexesdropIndexes 事件)的更改通知。要在更改流中包含扩展事件,请调用此方法并传递值 true
batchSize()
指定从 MongoDB 集群响应的每个批次中返回的最大更改事件数。
collation()
指定更改流光标要使用的排序。
comment()
将注释附加到操作。

重要

只有当您的部署使用MongoDB v6.0或更高版本时,才能在集合上启用预图像和后图像。

默认情况下,当您对集合执行操作时,相应的事件更改只包括该操作修改的字段的变化量。要查看更改前或后的完整文档,请将fullDocumentBeforeChange()fullDocument()方法链接到watch()方法。

预图像是更改之前文档的完整版本。要在更改流事件中包含预图像,请将以下值之一传递给fullDocumentBeforeChange()方法

  • FullDocumentBeforeChange.WHEN_AVAILABLE:只有当预图像可用时,更改事件才包括修改文档的预图像。

  • FullDocumentBeforeChange.REQUIRED:更改事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。

后图像是更改之后文档的完整版本。要在更改流事件中包含后图像,请将以下值之一传递给fullDocument()方法

  • FullDocument.UPDATE_LOOKUP:更改事件包括更改后某个时间点的整个更改文档的副本。

  • FullDocument.WHEN_AVAILABLE:只有当后图像可用时,更改事件才包括修改文档的后图像。

  • FullDocument.REQUIRED:更改事件包括修改文档的后图像。如果后图像不可用,驱动程序将引发错误。

以下示例在集合上打开更改流并使用fullDocument()方法链接到watch()方法,以包含更新文档的后图像

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

有关预图像和后图像的更多信息,请参阅MongoDB服务器手册中的具有文档预图像和后图像的更改流

了解更多关于更改流的信息,请参阅MongoDB服务器手册中的更改流

要了解更多关于本指南中讨论的任何方法或类型,请参阅以下API文档

返回

地理空间搜索