监视更改
您可以通过打开一个 更改流 来跟踪 MongoDB 中数据的更改,例如集合、数据库或部署的更改。更改流允许应用程序监视数据更改并对其做出反应。
当发生更改时,更改流返回包含有关更新数据信息的 更改事件 文档。
通过调用以下方法打开更改流:watch()
方法在 MongoCollection
、MongoDatabase
或 MongoClient
对象上,如下面的代码示例所示
val changeStream = collection.watch()
watch()
方法可以接受一个可选的 聚合管道,该管道由一个包含 阶段 的数组作为第一个参数,以过滤和转换更改事件输出如下
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
watch()
方法返回一个 ChangeStreamFlow
实例,这是一个提供多种方法来访问、组织和遍历结果的类。 ChangeStreamFlow
还继承自 Kotlin Coroutines 库中其父类 Flow
的方法。
您可以在 ChangeStreamFlow
上调用 collect()
来处理事件。或者,您可以使用内置在 Flow
中的其他方法来处理结果。
要配置处理更改流返回的文档的选项,请使用 watch()
返回的 ChangeStreamFlow
对象的成员方法。有关可用方法的更多详细信息,请参阅以下示例底部的 ChangeStreamFlow
API 文档链接。
使用 .collect() 处理更改流事件
要捕获更改流的更改事件,请按如下方式调用 collect()
方法
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
.collect()
函数在触发更改事件时触发。您可以在函数中指定逻辑以在接收到事件文档时处理该事件。
注意
对于更新操作更改事件,更改流默认只返回修改的字段,而不是整个更新文档。您可以通过调用 ChangeStreamFlow
对象的 fullDocument()
成员方法并使用值 FullDocument.UPDATE_LOOKUP
来配置更改流也返回文档的最新版本,如下所示
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
示例
以下示例应用程序在sample_mflix
数据库的movies
集合上打开一个更改流。应用程序使用聚合管道根据operationType
筛选更改,以便它只接收插入和更新事件。删除通过省略来排除。应用程序使用.collect()
方法接收和打印集合上发生的筛选更改事件。
应用程序在单独的协程作业中启动collect()
操作,这使得应用程序可以在更改流打开的同时继续运行。一旦操作完成,应用程序将关闭更改流并退出。
注意
此示例使用连接URI连接到MongoDB的一个实例。有关连接到您的MongoDB实例的更多信息,请参阅连接指南.
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
有关此页面上提到的类和方法的其他信息,请参阅以下资源
更改流服务器手册条目
更改事件服务器手册条目
聚合管道服务器手册条目
聚合阶段服务器手册条目
ChangeStreamFlowAPI文档
MongoCollection.watch() API 文档
MongoDatabase.watch() API 文档
MongoClient.watch() API 文档