文档菜单
文档首页
/ / /
Kotlin 协程
/

监视更改

您可以通过打开一个 更改流 来跟踪 MongoDB 中数据的更改,例如集合、数据库或部署的更改。更改流允许应用程序监视数据更改并对其做出反应。

当发生更改时,更改流返回包含有关更新数据信息的 更改事件 文档。

通过调用以下方法打开更改流:watch() 方法在 MongoCollectionMongoDatabaseMongoClient 对象上,如下面的代码示例所示

val changeStream = collection.watch()

watch() 方法可以接受一个可选的 聚合管道,该管道由一个包含 阶段 的数组作为第一个参数,以过滤和转换更改事件输出如下

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

watch() 方法返回一个 ChangeStreamFlow 实例,这是一个提供多种方法来访问、组织和遍历结果的类。 ChangeStreamFlow 还继承自 Kotlin Coroutines 库中其父类 Flow 的方法。

您可以在 ChangeStreamFlow 上调用 collect() 来处理事件。或者,您可以使用内置在 Flow 中的其他方法来处理结果。

要配置处理更改流返回的文档的选项,请使用 watch() 返回的 ChangeStreamFlow 对象的成员方法。有关可用方法的更多详细信息,请参阅以下示例底部的 ChangeStreamFlow API 文档链接。

要捕获更改流的更改事件,请按如下方式调用 collect() 方法

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

.collect() 函数在触发更改事件时触发。您可以在函数中指定逻辑以在接收到事件文档时处理该事件。

注意

对于更新操作更改事件,更改流默认只返回修改的字段,而不是整个更新文档。您可以通过调用 ChangeStreamFlow 对象的 fullDocument() 成员方法并使用值 FullDocument.UPDATE_LOOKUP 来配置更改流也返回文档的最新版本,如下所示

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

以下示例应用程序在sample_mflix数据库的movies集合上打开一个更改流。应用程序使用聚合管道根据operationType筛选更改,以便它只接收插入和更新事件。删除通过省略来排除。应用程序使用.collect()方法接收和打印集合上发生的筛选更改事件。

应用程序在单独的协程作业中启动collect()操作,这使得应用程序可以在更改流打开的同时继续运行。一旦操作完成,应用程序将关闭更改流并退出。

注意

此示例使用连接URI连接到MongoDB的一个实例。有关连接到您的MongoDB实例的更多信息,请参阅连接指南.

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

有关此页面上提到的类和方法的其他信息,请参阅以下资源

  • 更改流服务器手册条目

  • 更改事件服务器手册条目

  • 聚合管道服务器手册条目

  • 聚合阶段服务器手册条目

  • ChangeStreamFlowAPI文档

  • MongoCollection.watch() API 文档

  • MongoDatabase.watch() API 文档

  • MongoClient.watch() API 文档

返回

批量操作