监控数据更改
概述
在本指南中,您可以了解如何使用change stream
来监视您数据的实时变化。Change stream是MongoDB服务器的一项功能,它允许您的应用程序订阅集合、数据库或部署中的数据变化。
示例数据
本指南中的示例使用sample_restaurants.restaurants
集合,该集合来自Atlas示例数据集。要了解如何创建免费的MongoDB Atlas集群并加载示例数据集,请参阅入门指南。入门指南。
重要
项目反应器库
本指南使用项目反应器库来消费Java反应式流驱动程序方法返回的发布者
实例。要了解有关项目反应器库以及如何使用它的更多信息,请参阅Reactor文档中的入门。要了解本指南中如何使用项目反应器库方法,请参阅写入MongoDB数据指南。
打开更改流
要打开更改流,请调用 watch()
方法。你调用该方法的实例将确定更改流监听的事件范围。你可以在以下类的实例上调用 watch()
方法
MongoClient
:用于监控 MongoDB 部署的所有更改MongoDatabase
:用于监控数据库中所有集合的更改MongoCollection
:用于监控集合的更改
以下示例在 restaurants
集合上打开更改流,并按发生顺序输出更改
// Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
要开始监控更改,运行应用程序。然后,在单独的应用程序或壳中,对 restaurants
集合执行写操作。更新具有 "name"
字段值为 "Blarney Castle"
的文档会导致以下更改流输出
Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
修改更改流输出
你可以将聚合管道作为参数传递给 watch()
方法以修改更改流输出。此参数允许你仅监控指定的更改事件。
你可以在 pipeline
参数中指定以下聚合阶段
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
以下示例将聚合管道传递给更改流,以仅记录更新操作
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
有关修改更改流输出的更多信息,请参阅 MongoDB 服务器手册中的 修改更改流输出 部分。
修改 watch()
行为
您可以将方法链接到 watch()
方法上,这些方法代表了您可以用来配置更改流操作的选项。如果您没有指定任何选项,驱动程序不会自定义操作。
下表描述了您可以链接到 watch()
以自定义其行为的函数
选项 | 描述 |
---|---|
fullDocument() | 指定在更改后是否显示完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像。 |
fullDocumentBeforeChange() | 指定是否显示更改之前的完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像。 |
resumeAfter() | 指示 watch() 在恢复令牌指定的操作后继续返回更改。每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。resumeAfter() 与 startAfter() 和 startAtOperationTime() 是互斥的。 |
startAfter() | 指示 watch() 在恢复令牌指定的操作后开始新的更改流。允许在无效事件后恢复通知。每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。startAfter() 与 resumeAfter() 和 startAtOperationTime() 是互斥的。 |
startAtOperationTime() | 指示 watch() 只返回指定时间戳之后发生的事件。startAtOperationTime() 与 resumeAfter() 和 startAfter() 是互斥的。 |
maxAwaitTime() | 指定服务器在向更改流光标报告新数据更改之前等待的最大时间,以毫秒为单位。默认为1000毫秒。 |
showExpandedEvents() | 从 MongoDB Server v6.0 开始,更改流支持数据定义语言 (DDL) 事件(如 createIndexes 和 dropIndexes 事件)的更改通知。要在更改流中包含扩展事件,请调用此方法并传递值 true 。 |
batchSize() | 指定从 MongoDB 集群响应的每个批次中返回的最大更改事件数。 |
collation() | 指定更改流光标要使用的排序。 |
comment() | 将注释附加到操作。 |
包括预图像和后图像
重要
只有当您的部署使用MongoDB v6.0或更高版本时,才能在集合上启用预图像和后图像。
默认情况下,当您对集合执行操作时,相应的事件更改只包括该操作修改的字段的变化量。要查看更改前或后的完整文档,请将fullDocumentBeforeChange()
或fullDocument()
方法链接到watch()
方法。
预图像是更改之前文档的完整版本。要在更改流事件中包含预图像,请将以下值之一传递给fullDocumentBeforeChange()
方法
FullDocumentBeforeChange.WHEN_AVAILABLE
:只有当预图像可用时,更改事件才包括修改文档的预图像。FullDocumentBeforeChange.REQUIRED
:更改事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。
后图像是更改之后文档的完整版本。要在更改流事件中包含后图像,请将以下值之一传递给fullDocument()
方法
FullDocument.UPDATE_LOOKUP
:更改事件包括更改后某个时间点的整个更改文档的副本。FullDocument.WHEN_AVAILABLE
:只有当后图像可用时,更改事件才包括修改文档的后图像。FullDocument.REQUIRED
:更改事件包括修改文档的后图像。如果后图像不可用,驱动程序将引发错误。
以下示例在集合上打开更改流并使用fullDocument()
方法链接到watch()
方法,以包含更新文档的后图像
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received including the full // document after the update ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
有关预图像和后图像的更多信息,请参阅MongoDB服务器手册中的具有文档预图像和后图像的更改流。
附加信息
了解更多关于更改流的信息,请参阅MongoDB服务器手册中的更改流。
API文档
要了解更多关于本指南中讨论的任何方法或类型,请参阅以下API文档