打开变更流
概述
在本指南中,您可以学习如何使用 更改流 来监视数据库的实时更改。更改流是MongoDB服务器功能,允许您的应用程序订阅单个集合、数据库或部署上的数据更改。您可以为应用程序接收的数据指定一组聚合运算符以过滤和转换。当连接到MongoDB v6.0或更高版本时,您可以配置事件以包含更改之前和之后的数据文档。
以下部分介绍了如何打开和配置更改流
打开更改流
您可以使用更改流来订阅特定类型的数据更改,并在应用程序中产生更改事件。
要打开更改流,请调用watch()
方法,该方法在 MongoCollection
、MongoDatabase
或 MongoClient
的实例上。
重要
独立MongoDB部署不支持更改流,因为该功能需要副本集oplog。有关oplog的更多信息,请参阅副本集oplog 服务器手册页面。
在调用 watch()
方法的对象上确定更改流监听事件的范围。
如果您在 MongoCollection
上调用 watch()
,更改流将监视一个集合。
如果您在 MongoDatabase
上调用 watch()
,更改流将监视该数据库中的所有集合。
如果您在 MongoClient
上调用 watch()
,变化流将监控连接的 MongoDB 部署中的所有更改。
示例
以下代码示例展示了如何打开变化流并在集合中的数据更改时打印变化流事件。
// Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
在集合上执行的插入操作应该生成类似于以下文本的输出。
Received a change event: ChangeStreamDocument{ operationType='insert', resumeToken={"_data": "825EC..."}, namespace=myDb.myChangeStreamCollection, ... }
有关可运行的示例,请参阅监控更改用法示例页面。
要了解更多关于 watch()
方法的知识,请参阅以下 API 文档
将聚合运算符应用于您的变化流
您可以将聚合管道作为参数传递给 watch()
方法,以指定变化流接收哪些变化事件。
有关您的 MongoDB 服务器版本支持的聚合运算符的详细信息,请参阅 修改变化流输出。
示例
以下代码示例展示了如何应用聚合管道来配置您的更改流,以接收仅插入和更新操作的事件
val pipeline = listOf( Aggregates.match(Filters.`in`("operationType", listOf("insert", "update"))) ) // Launch the change stream in a separate coroutine, // so you can cancel it later. val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
当更改流接收到更新更改事件时,前面的代码示例输出以下文本
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, ...
分割大型更改流事件
当连接到 MongoDB v7.0 或更高版本时,您可以使用 $changeStreamSplitLargeEvent
聚合运算符将超过 16 MB 的事件文档拆分为较小的片段。
仅在您预计更改流事件将超出文档大小限制时才使用 $changeStreamSplitLargeEvent
运算符。例如,如果您的应用程序需要完整的文档预映像或后映像,您可能需要使用此功能。
$changeStreamSplitLargeEvent
聚合阶段按顺序返回片段。您可以通过更改流游标访问这些片段。每个片段文档都包含一个 splitEvent
对象,该对象包含以下字段
字段 | 说明 |
---|---|
fragment | 片段的索引,从 1 开始 |
of | 组成拆分事件的片段总数 |
以下示例打开一个包含聚合管道和 $changeStreamSplitLargeEvent
聚合阶段的更改流,以拆分大型事件
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument())) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
注意
您只能在聚合管道中有一个 $changeStreamSplitLargeEvent
阶段,并且它必须是管道中的最后一个阶段。
有关 $changeStreamSplitLargeEvent
聚合运算符的更多信息,请参阅服务器手册中的 $changeStreamSplitLargeEvent (聚合)。
包含预图像和后图像
您可以为更改事件配置包含或省略以下数据
**预图像**:如果存在,表示操作之前文档版本的文档
**后图像**:如果存在,表示操作之后文档版本的文档
要接收包含预图像或后图像的更改流事件,您必须连接到 MongoDB v6.0 或更高版本的部署,并设置以下内容
在您的 MongoDB 部署中为集合启用预图像和后图像。
提示
有关如何在您的部署中启用这些功能的信息,请参阅带有文档预图像和后图像的更改流 MongoDB 服务器手册页面。
有关如何指导驱动程序创建启用预图像和后图像的集合的信息,请参阅创建启用预图像和后图像的集合部分。
配置您的更改流以检索预图像、后图像或两者。
创建启用预图像和后图像的集合
要使用驱动程序创建具有预图像和后图像选项的集合,指定 ChangeStreamPreAndPostImagesOptions
的实例,并调用以下示例中的 createCollection()
方法
val collectionOptions = CreateCollectionOptions() collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true)) database.createCollection("myChangeStreamCollection", collectionOptions)
您可以通过从 MongoDB Shell 运行 collMod
命令来更改现有集合中的预图像和后图像选项。有关如何执行此操作的信息,请参阅collMod 服务器手册文档。
警告
当您在集合上修改此选项时,如果在您的应用程序中打开的任何更改流配置为需要接收前像或后像,则可能失败。
前像配置示例
以下代码示例展示了如何配置更改流以包含前像并输出结果
val job = launch { val changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
前面的示例配置更改流使用FullDocumentBeforeChange.REQUIRED
选项。这配置更改流以返回替换、更新和删除更改事件的前像,并使服务器在无法获取前像时引发错误。
假设一个应用程序将软件库依赖集合中一个文档的latestVersion
字段的值从2.0.0
更新到2.1.0
。前面代码示例输出的相应更改事件应类似于以下文本
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...} namespace=software.libraries, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}}, ...
有关选项列表,请参阅FullDocumentBeforeChange API 文档。
后像配置示例
以下代码示例展示了如何配置一个更改流以包含后像并输出结果
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } } // Perform MongoDB operations that trigger change events... // Cancel the change stream when you're done listening for events. job.cancel()
前面的示例将更改流配置为使用 FullDocument.UPDATE_LOOKUP
选项。这配置更改流返回原始文档和更改后的文档之间的差异,以及更改发生后的某个时间点的文档副本。
假设一个应用程序将城市人口普查数据集合中一个文档的 population
字段从 800
更新到 950
。前面代码示例输出的相应更改事件应类似于以下文本
Received a change event: ChangeStreamDocument{ operationType=update, resumeToken={...}, namespace=censusData.cities, destinationNamespace=null, fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}}, updatedFields={"population": 950}, ... ...
有关选项列表,请参阅 FullDocument API 文档。