监控数据更改
概述
在本指南中,您可以了解如何使用change stream来监视您数据的实时变化。Change stream是MongoDB服务器的一项功能,它允许您的应用程序订阅集合、数据库或部署中的数据变化。
示例数据
本指南中的示例使用sample_restaurants.restaurants集合,该集合来自Atlas示例数据集。要了解如何创建免费的MongoDB Atlas集群并加载示例数据集,请参阅入门指南。入门指南。
重要
项目反应器库
本指南使用项目反应器库来消费Java反应式流驱动程序方法返回的发布者实例。要了解有关项目反应器库以及如何使用它的更多信息,请参阅Reactor文档中的入门。要了解本指南中如何使用项目反应器库方法,请参阅写入MongoDB数据指南。
打开更改流
要打开更改流,请调用 watch() 方法。你调用该方法的实例将确定更改流监听的事件范围。你可以在以下类的实例上调用 watch() 方法
MongoClient:用于监控 MongoDB 部署的所有更改MongoDatabase:用于监控数据库中所有集合的更改MongoCollection:用于监控集合的更改
以下示例在 restaurants 集合上打开更改流,并按发生顺序输出更改
// Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
要开始监控更改,运行应用程序。然后,在单独的应用程序或壳中,对 restaurants 集合执行写操作。更新具有 "name" 字段值为 "Blarney Castle" 的文档会导致以下更改流输出
Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
修改更改流输出
你可以将聚合管道作为参数传递给 watch() 方法以修改更改流输出。此参数允许你仅监控指定的更改事件。
你可以在 pipeline 参数中指定以下聚合阶段
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
以下示例将聚合管道传递给更改流,以仅记录更新操作
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
有关修改更改流输出的更多信息,请参阅 MongoDB 服务器手册中的 修改更改流输出 部分。
修改 watch() 行为
您可以将方法链接到 watch() 方法上,这些方法代表了您可以用来配置更改流操作的选项。如果您没有指定任何选项,驱动程序不会自定义操作。
下表描述了您可以链接到 watch() 以自定义其行为的函数
选项 | 描述 |
|---|---|
fullDocument() | 指定在更改后是否显示完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像。 |
fullDocumentBeforeChange() | 指定是否显示更改之前的完整文档,而不是只显示对文档所做的更改。有关此选项的更多信息,请参阅 包含预图像和后图像。 |
resumeAfter() | 指示 watch() 在恢复令牌指定的操作后继续返回更改。每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。resumeAfter() 与 startAfter() 和 startAtOperationTime() 是互斥的。 |
startAfter() | 指示 watch() 在恢复令牌指定的操作后开始新的更改流。允许在无效事件后恢复通知。每个更改流事件文档都包含一个恢复令牌,作为 _id 字段。传递表示您想要在之后恢复的操作的更改事件文档的整个 _id 字段。startAfter() 与 resumeAfter() 和 startAtOperationTime() 是互斥的。 |
startAtOperationTime() | 指示 watch() 只返回指定时间戳之后发生的事件。startAtOperationTime() 与 resumeAfter() 和 startAfter() 是互斥的。 |
maxAwaitTime() | 指定服务器在向更改流光标报告新数据更改之前等待的最大时间,以毫秒为单位。默认为1000毫秒。 |
showExpandedEvents() | 从 MongoDB Server v6.0 开始,更改流支持数据定义语言 (DDL) 事件(如 createIndexes 和 dropIndexes 事件)的更改通知。要在更改流中包含扩展事件,请调用此方法并传递值 true。 |
batchSize() | 指定从 MongoDB 集群响应的每个批次中返回的最大更改事件数。 |
collation() | 指定更改流光标要使用的排序。 |
comment() | 将注释附加到操作。 |
包括预图像和后图像
重要
只有当您的部署使用MongoDB v6.0或更高版本时,才能在集合上启用预图像和后图像。
默认情况下,当您对集合执行操作时,相应的事件更改只包括该操作修改的字段的变化量。要查看更改前或后的完整文档,请将fullDocumentBeforeChange()或fullDocument()方法链接到watch()方法。
预图像是更改之前文档的完整版本。要在更改流事件中包含预图像,请将以下值之一传递给fullDocumentBeforeChange()方法
FullDocumentBeforeChange.WHEN_AVAILABLE:只有当预图像可用时,更改事件才包括修改文档的预图像。FullDocumentBeforeChange.REQUIRED:更改事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。
后图像是更改之后文档的完整版本。要在更改流事件中包含后图像,请将以下值之一传递给fullDocument()方法
FullDocument.UPDATE_LOOKUP:更改事件包括更改后某个时间点的整个更改文档的副本。FullDocument.WHEN_AVAILABLE:只有当后图像可用时,更改事件才包括修改文档的后图像。FullDocument.REQUIRED:更改事件包括修改文档的后图像。如果后图像不可用,驱动程序将引发错误。
以下示例在集合上打开更改流并使用fullDocument()方法链接到watch()方法,以包含更新文档的后图像
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received including the full // document after the update ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
有关预图像和后图像的更多信息,请参阅MongoDB服务器手册中的具有文档预图像和后图像的更改流。
附加信息
了解更多关于更改流的信息,请参阅MongoDB服务器手册中的更改流。
API文档
要了解更多关于本指南中讨论的任何方法或类型,请参阅以下API文档