文档菜单
文档首页
/ / /
Kotlin 同步驱动程序
/

监控数据更改

本页内容

  • 概述
  • 示例数据
  • 打开变更流
  • 打开变更流示例
  • 修改变更流输出
  • 匹配特定事件示例
  • 修改 watch() 行为
  • 包含前像和后像
  • 附加信息
  • API 文档

在这篇指南中,您可以了解如何使用 Kotlin Sync 驱动程序来监控 变更流,让您能够查看数据库的实时更改。变更流是 MongoDB 服务器的一项功能,它在一个集合、数据库或部署上发布数据更改。您的应用程序可以订阅变更流并使用事件执行其他操作。

本指南中的示例使用restaurants 集合在 sample_restaurants 数据库中的数据,这些数据来自Atlas 示例数据集。要了解如何创建免费的 MongoDB Atlas 集群并加载数据集,请参阅 Atlas 入门 指南。

以下 Kotlin 数据类模型表示此集合中的文档

data class Restaurant(
val name: String,
val cuisine: String,
)

要打开变更流,请调用 watch() 方法。您调用 watch() 方法的实例将确定变更流监听事件的作用域。您可以在以下类实例上调用 watch() 方法

  • MongoClient:用于监视 MongoDB 部署中的所有变更

  • MongoDatabase:用于监视数据库中所有集合的变更

  • MongoCollection:用于监视集合中的变更

以下示例在 restaurants 集合上打开变更流,并按发生顺序打印变更

collection.watch().forEach { change ->
println(change)
}

要开始监视变更,运行应用程序。然后在另一个应用程序或 shell 中对 restaurants 集合执行写操作。以下示例更新一个名称为 "Blarney Castle" 的文档

val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")
val result = collection.updateOne(filter, update)

当您更新集合时,变更流应用程序将按发生顺序打印变更。打印的变更事件类似于以下内容

{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
...
}

您可以将 pipeline 参数传递给 watch() 方法以修改变更流输出。此参数允许您仅监视指定的变更事件。将参数格式化为代表每个聚合阶段的对象的列表。

您可以在 pipeline 参数中指定以下阶段

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例使用了包含pipeline参数和$match的代码,以打开一个仅记录更新操作的更改流。

val pipeline = listOf(
Aggregates.match(Filters.eq("operationType", "update"))
)
collection.watch(pipeline).forEach { change ->
println(change)
}

要了解更多关于修改更改流输出的信息,请参阅MongoDB服务器手册中的修改更改流输出部分。

您可以通过将方法链接到由watch()方法调用返回的ChangeStreamIterable对象来修改watch()。如果您不指定任何选项,驱动程序不会自定义操作。

以下表格描述了您可以用于自定义watch()行为的方法

方法
描述
batchSize()
设置每个批次返回的文档数量。
collation()
指定排序结果时使用的语言排序类型。有关更多信息,请参阅MongoDB服务器手册中的排序
comment()
指定要附加到操作上的注释。
fullDocument()
设置fullDocument值。有关更多信息,请参阅本文件的包含前像和后像部分。
fullDocumentBeforeChange()
设置fullDocumentBeforeChange值。有关更多信息,请参阅本文件的包含前像和后像部分。
maxAwaitTime()
设置此操作的最多等待执行时间,单位为毫秒。

有关您可以用于配置 watch() 方法的完整方法列表,请参阅ChangeStreamIterable API 文档。

重要

只有当您的部署使用 MongoDB v6.0 或更高版本时,才能在集合上启用预图像和后图像。

默认情况下,当您在集合上执行操作时,相应的更改事件仅包括该操作修改的字段的变化量。要查看更改前或更改后的完整文档,请将 fullDocumentBeforeChange()fullDocument() 方法链接到 watch() 方法。

预图像 是更改前文档的完整版本。要将预图像包含在更改流事件中,请将以下选项之一传递给 fullDocumentBeforeChange() 方法

  • FullDocumentBeforeChange.WHEN_AVAILABLE:仅当预图像可用时,更改事件才包括修改文档的预图像。

  • FullDocumentBeforeChange.REQUIRED:更改事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。

后图像 是更改后文档的完整版本。要将后图像包含在更改流事件中,请将以下选项之一传递给 fullDocument() 方法

  • FullDocument.UPDATE_LOOKUP:更改事件包括更改后的整个更改文档的副本。

  • FullDocument.WHEN_AVAILABLE:更改事件仅在可用时包括修改后的文档的后置图像。

  • FullDocument.REQUIRED:更改事件包括修改后的文档的后置图像。如果后置图像不可用,驱动程序将引发错误。

以下示例在一个集合上调用 watch() 方法,并通过指定 fullDocument 参数将更新文档的后置图像包括在结果中

collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
println("Received a change: $change")
}

在更改流应用程序运行时,通过使用 前面的更新示例 更新 restaurants 集合中的文档,将打印出类似于以下内容的更改事件

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}

有关前置图像和后置图像的更多信息,请参阅 MongoDB 服务器手册中的带有文档前置和后置图像的更改流

有关更改流的更多信息,请参阅 MongoDB 服务器手册中的更改流

有关本指南中讨论的任何方法或类型的更多信息,请参阅以下 API 文档

返回

数据游标