监控数据更改
概述
在这篇指南中,您可以了解如何使用 Kotlin Sync 驱动程序来监控 变更流,让您能够查看数据库的实时更改。变更流是 MongoDB 服务器的一项功能,它在一个集合、数据库或部署上发布数据更改。您的应用程序可以订阅变更流并使用事件执行其他操作。
示例数据
本指南中的示例使用restaurants
集合在 sample_restaurants
数据库中的数据,这些数据来自Atlas 示例数据集。要了解如何创建免费的 MongoDB Atlas 集群并加载数据集,请参阅 Atlas 入门 指南。
以下 Kotlin 数据类模型表示此集合中的文档
data class Restaurant( val name: String, val cuisine: String, )
打开变更流
要打开变更流,请调用 watch()
方法。您调用 watch()
方法的实例将确定变更流监听事件的作用域。您可以在以下类实例上调用 watch()
方法
MongoClient
:用于监视 MongoDB 部署中的所有变更MongoDatabase
:用于监视数据库中所有集合的变更MongoCollection
:用于监视集合中的变更
打开变更流示例
以下示例在 restaurants
集合上打开变更流,并按发生顺序打印变更
collection.watch().forEach { change -> println(change) }
要开始监视变更,运行应用程序。然后在另一个应用程序或 shell 中对 restaurants
集合执行写操作。以下示例更新一个名称为 "Blarney Castle"
的文档
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle") val update = Updates.set(Restaurant::cuisine.name, "Irish") val result = collection.updateOne(filter, update)
当您更新集合时,变更流应用程序将按发生顺序打印变更。打印的变更事件类似于以下内容
{ "_id": { ... }, "operationType": "update", "clusterTime": { ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } ... }
修改变更流输出
您可以将 pipeline
参数传递给 watch()
方法以修改变更流输出。此参数允许您仅监视指定的变更事件。将参数格式化为代表每个聚合阶段的对象的列表。
您可以在 pipeline
参数中指定以下阶段
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
匹配特定事件示例
以下示例使用了包含pipeline
参数和$match
的代码,以打开一个仅记录更新操作的更改流。
val pipeline = listOf( Aggregates.match(Filters.eq("operationType", "update")) ) collection.watch(pipeline).forEach { change -> println(change) }
要了解更多关于修改更改流输出的信息,请参阅MongoDB服务器手册中的修改更改流输出部分。
修改watch()行为
您可以通过将方法链接到由watch()
方法调用返回的ChangeStreamIterable
对象来修改watch()
。如果您不指定任何选项,驱动程序不会自定义操作。
以下表格描述了您可以用于自定义watch()
行为的方法
方法 | 描述 |
---|---|
batchSize() | 设置每个批次返回的文档数量。 |
collation() | 指定排序结果时使用的语言排序类型。有关更多信息,请参阅MongoDB服务器手册中的排序。 |
comment() | 指定要附加到操作上的注释。 |
fullDocument() | 设置 fullDocument 值。有关更多信息,请参阅本文件的包含前像和后像部分。 |
fullDocumentBeforeChange() | 设置 fullDocumentBeforeChange 值。有关更多信息,请参阅本文件的包含前像和后像部分。 |
maxAwaitTime() | 设置此操作的最多等待执行时间,单位为毫秒。 |
有关您可以用于配置 watch()
方法的完整方法列表,请参阅ChangeStreamIterable API 文档。
包含预图像和后图像
重要
只有当您的部署使用 MongoDB v6.0 或更高版本时,才能在集合上启用预图像和后图像。
默认情况下,当您在集合上执行操作时,相应的更改事件仅包括该操作修改的字段的变化量。要查看更改前或更改后的完整文档,请将 fullDocumentBeforeChange()
或 fullDocument()
方法链接到 watch()
方法。
预图像 是更改前文档的完整版本。要将预图像包含在更改流事件中,请将以下选项之一传递给 fullDocumentBeforeChange()
方法
FullDocumentBeforeChange.WHEN_AVAILABLE
:仅当预图像可用时,更改事件才包括修改文档的预图像。FullDocumentBeforeChange.REQUIRED
:更改事件包括修改文档的预图像。如果预图像不可用,驱动程序将引发错误。
后图像 是更改后文档的完整版本。要将后图像包含在更改流事件中,请将以下选项之一传递给 fullDocument()
方法
FullDocument.UPDATE_LOOKUP
:更改事件包括更改后的整个更改文档的副本。FullDocument.WHEN_AVAILABLE
:更改事件仅在可用时包括修改后的文档的后置图像。FullDocument.REQUIRED
:更改事件包括修改后的文档的后置图像。如果后置图像不可用,驱动程序将引发错误。
以下示例在一个集合上调用 watch()
方法,并通过指定 fullDocument
参数将更新文档的后置图像包括在结果中
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change -> println("Received a change: $change") }
在更改流应用程序运行时,通过使用 前面的更新示例 更新 restaurants
集合中的文档,将打印出类似于以下内容的更改事件
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{value=..., seconds=..., inc=...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
有关前置图像和后置图像的更多信息,请参阅 MongoDB 服务器手册中的带有文档前置和后置图像的更改流。
更多信息
有关更改流的更多信息,请参阅 MongoDB 服务器手册中的更改流。
API 文档
有关本指南中讨论的任何方法或类型的更多信息,请参阅以下 API 文档