文档菜单
文档首页
/ / /
Kotlin 协程

打开变更流

本页内容

  • 概述
  • 打开变更流
  • 将聚合运算符应用于您的变更流
  • 拆分大型变更流事件
  • 包含前图像和后图像

在本指南中,您可以学习如何使用 更改流 来监视数据库的实时更改。更改流是MongoDB服务器功能,允许您的应用程序订阅单个集合、数据库或部署上的数据更改。您可以为应用程序接收的数据指定一组聚合运算符以过滤和转换。当连接到MongoDB v6.0或更高版本时,您可以配置事件以包含更改之前和之后的数据文档。

以下部分介绍了如何打开和配置更改流

  • 打开变更流

  • 将聚合运算符应用于您的变更流

  • 拆分大型变更流事件

  • 包含前图像和后图像

您可以使用更改流来订阅特定类型的数据更改,并在应用程序中产生更改事件。

要打开更改流,请调用watch() 方法,该方法在 MongoCollectionMongoDatabaseMongoClient 的实例上。

重要

独立MongoDB部署不支持更改流,因为该功能需要副本集oplog。有关oplog的更多信息,请参阅副本集oplog 服务器手册页面。

在调用 watch() 方法的对象上确定更改流监听事件的范围。

如果您在 MongoCollection 上调用 watch(),更改流将监视一个集合。

如果您在 MongoDatabase 上调用 watch(),更改流将监视该数据库中的所有集合。

如果您在 MongoClient 上调用 watch(),变化流将监控连接的 MongoDB 部署中的所有更改。

以下代码示例展示了如何打开变化流并在集合中的数据更改时打印变化流事件。

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

在集合上执行的插入操作应该生成类似于以下文本的输出。

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

有关可运行的示例,请参阅监控更改用法示例页面。

要了解更多关于 watch() 方法的知识,请参阅以下 API 文档

您可以将聚合管道作为参数传递给 watch() 方法,以指定变化流接收哪些变化事件。

有关您的 MongoDB 服务器版本支持的聚合运算符的详细信息,请参阅 修改变化流输出。

以下代码示例展示了如何应用聚合管道来配置您的更改流,以接收仅插入和更新操作的事件

val pipeline = listOf(
Aggregates.match(Filters.`in`("operationType",
listOf("insert", "update")))
)
// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

当更改流接收到更新更改事件时,前面的代码示例输出以下文本

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...

当连接到 MongoDB v7.0 或更高版本时,您可以使用 $changeStreamSplitLargeEvent 聚合运算符将超过 16 MB 的事件文档拆分为较小的片段。

仅在您预计更改流事件将超出文档大小限制时才使用 $changeStreamSplitLargeEvent 运算符。例如,如果您的应用程序需要完整的文档预映像或后映像,您可能需要使用此功能。

$changeStreamSplitLargeEvent 聚合阶段按顺序返回片段。您可以通过更改流游标访问这些片段。每个片段文档都包含一个 splitEvent 对象,该对象包含以下字段

字段
说明
fragment
片段的索引,从 1 开始
of
组成拆分事件的片段总数

以下示例打开一个包含聚合管道和 $changeStreamSplitLargeEvent 聚合阶段的更改流,以拆分大型事件

val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

注意

您只能在聚合管道中有一个 $changeStreamSplitLargeEvent 阶段,并且它必须是管道中的最后一个阶段。

有关 $changeStreamSplitLargeEvent 聚合运算符的更多信息,请参阅服务器手册中的 $changeStreamSplitLargeEvent (聚合)

您可以为更改事件配置包含或省略以下数据

  • **预图像**:如果存在,表示操作之前文档版本的文档

  • **后图像**:如果存在,表示操作之后文档版本的文档

要接收包含预图像或后图像的更改流事件,您必须连接到 MongoDB v6.0 或更高版本的部署,并设置以下内容

  • 在您的 MongoDB 部署中为集合启用预图像和后图像。

    提示

    有关如何在您的部署中启用这些功能的信息,请参阅带有文档预图像和后图像的更改流 MongoDB 服务器手册页面。

    有关如何指导驱动程序创建启用预图像和后图像的集合的信息,请参阅创建启用预图像和后图像的集合部分。

  • 配置您的更改流以检索预图像、后图像或两者。

    提示

    要配置更改流以包含预图像,请参阅预图像配置示例

    要配置更改流以包含后图像,请参阅后图像配置示例

要使用驱动程序创建具有预图像和后图像选项的集合,指定 ChangeStreamPreAndPostImagesOptions 的实例,并调用以下示例中的 createCollection() 方法

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

您可以通过从 MongoDB Shell 运行 collMod 命令来更改现有集合中的预图像和后图像选项。有关如何执行此操作的信息,请参阅collMod 服务器手册文档。

警告

当您在集合上修改此选项时,如果在您的应用程序中打开的任何更改流配置为需要接收前像或后像,则可能失败。

以下代码示例展示了如何配置更改流以包含前像并输出结果

val job = launch {
val changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

前面的示例配置更改流使用FullDocumentBeforeChange.REQUIRED选项。这配置更改流以返回替换、更新和删除更改事件的前像,并使服务器在无法获取前像时引发错误。

假设一个应用程序将软件库依赖集合中一个文档的latestVersion字段的值从2.0.0更新到2.1.0。前面代码示例输出的相应更改事件应类似于以下文本

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...}
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...

有关选项列表,请参阅FullDocumentBeforeChange API 文档。

以下代码示例展示了如何配置一个更改流以包含后像并输出结果

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

前面的示例将更改流配置为使用 FullDocument.UPDATE_LOOKUP 选项。这配置更改流返回原始文档和更改后的文档之间的差异,以及更改发生后的某个时间点的文档副本。

假设一个应用程序将城市人口普查数据集合中一个文档的 population 字段从 800 更新到 950。前面代码示例输出的相应更改事件应类似于以下文本

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950}, ...
...

有关选项列表,请参阅 FullDocument API 文档。

下一页

MongoDB Kotlin 驱动程序