更改流
MongoDB 服务器版本 3.6 引入了$changeStream
聚合管道运算符。
更改流提供了一种监视集合中文档更改的方法。为了提高这个新阶段的可用性,MongoCollection
类型包含 watch()
方法。ChangeStreamObservable
实例设置更改流并在遇到可能可恢复的错误时自动尝试恢复。
先决条件
您必须设置以下组件才能运行本指南中的代码示例
一个包含来自文档资源GitHub中的
restaurants.json
文件文档的test.restaurants
集合文档.以下导入语句
import java.util.concurrent.CountDownLatch import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.changestream._
注意
本指南使用Observable
隐式实现,如快速入门指南中所述快速入门指南.
连接到MongoDB部署
首先,连接到MongoDB部署,然后声明并定义MongoDatabase
和MongoCollection
实例。
以下代码连接到运行在localhost
上的独立MongoDB部署,端口为27017
。然后,它定义了database
变量来引用test
数据库,并将collection
变量用于引用restaurants
集合
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants")
有关连接到MongoDB部署的更多信息,请参阅连接到MongoDB教程。
监视集合中的更改
要创建更改流,请使用MongoCollection.watch()
方法之一。
以下示例中,变更流打印出它观察到的所有变更
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() } val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await() // Block waiting for the latch
监视数据库中的变更
应用程序可以打开一个单独的变更流来监视数据库中所有非系统集合。要创建此类变更流,请使用 MongoDatabase.watch()
方法之一。
以下示例中,变更流打印出它观察到的给定数据库上的所有变更
val observer = LatchedObserver() database.watch().subscribe(observer) observer.await() // Block waiting for the latch
监视所有数据库中的变更
应用程序可以打开一个单独的变更流来监视MongoDB部署中所有数据库的所有非系统集合。要创建此类变更流,请使用 MongoClient.watch()
方法之一。
以下示例中,变更流打印出它观察到的连接到 MongoClient
的部署中的所有变更
val observer = LatchedObserver() client.watch().subscribe(observer) observer.await() // Block waiting for the latch
过滤内容
您可以将聚合阶段的列表传递给 watch()
方法来修改由 $changeStream
操作符返回的数据。
注意
并非所有聚合操作符都受支持。有关更多信息,请参阅服务器手册中的 变更流。
以下示例中,更改流打印出它观察到的所有与 insert
(插入)、update
(更新)、replace
(替换)和 delete
(删除)操作相对应的变化。
首先,管道包含一个 $match
阶段,用于筛选出 operationType
字段为 insert
、update
、replace
或 delete
的文档。然后,将 fullDocument
设置为 FullDocument.UPDATE_LOOKUP
,以便将更新后的文档包含在结果中
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete"))))) .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer) observer.await() // Block waiting for the latch