监视变化
您可以通过打开一个更改流来跟踪MongoDB中数据的更改,例如集合、数据库或部署的更改。更改流允许应用程序监视数据更改并对其做出反应。
更改流在发生更改时返回包含有关更新数据信息的更改事件文档。
通过调用以下代码示例中的方法来打开更改流:watch()
方法在MongoCollection
、MongoDatabase
或MongoClient
对象上,如下所示
ChangeStreamIterable<Document> changeStream = database.watch();
watch()
方法可以选择性地接受一个聚合管道,该管道由一个包含阶段的数组作为第一个参数,以过滤和转换更改事件输出如下
List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
watch()
方法返回一个ChangeStreamIterable
实例,该类提供了一些方法来访问、组织和遍历结果。ChangeStreamIterable
还从其父类MongoIterable
继承方法,该类实现了核心Java接口Iterable
。
您可以在ChangeStreamIterable
上调用forEach()
来处理事件,或者您可以使用返回一个MongoCursor
实例的iterator()
方法来遍历结果。
您可以在MongoCursor
上调用方法,例如hasNext()
来检查是否存在更多结果,next()
来返回集合中的下一个文档,或者tryNext()
,以立即返回更改流中的下一个可用元素或null
。与由其他查询返回的MongoCursor
不同,与更改流关联的MongoCursor
在返回结果之前会等待更改事件到来。因此,使用更改流的MongoCursor
调用next()
永远不会抛出java.util.NoSuchElementException
。
要配置处理更改流返回的文档的选项,请使用由watch()
返回的ChangeStreamIterable
对象的成员方法。有关可用方法的更多详细信息,请参阅此示例底部有关ChangeStreamIterable
API文档的链接。
如何使用回调处理变更流事件
要捕获变更流的事件,调用以下示例中的 forEach()
方法,并传入一个回调函数
changeStream.forEach(event -> System.out.println("Change observed: " + event));
当变更事件被触发时,回调函数将被执行。您可以在回调函数中指定逻辑来处理接收到的事件文档。
重要
forEach()
会阻塞当前线程
forEach()
的调用会阻塞当前线程,直到相应的变更流监听事件为止。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在一个单独的线程中创建并监听您的变更流。
注意
对于更新操作变更事件,变更流默认只返回修改的字段,而不是整个更新的文档。您可以通过调用 ChangeStreamIterable
对象的 fullDocument()
成员方法并传入值 FullDocument.UPDATE_LOOKUP
来配置变更流也返回文档的最新版本,如下所示
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
示例
以下示例使用两个独立的应用程序来演示如何使用变更流监听变更
第一个应用程序,名为
Watch
,在sample_mflix
数据库中的movies
集合上打开一个变更流。Watch
使用聚合管道根据operationType
过滤变更,以便只接收插入和更新事件(删除事件通过省略被排除)。Watch
使用回调来接收并打印发生在集合上的过滤后的变更事件。第二个应用程序,名为
WatchCompanion
,在sample_mflix
数据库中的movies
集合中插入单个文档。接下来,WatchCompanion
使用新的字段值更新该文档。最后,WatchCompanion
删除该文档。
首先,运行 Watch
以打开集合上的变更流,并使用 forEach()
方法在变更流上定义一个回调。当 Watch
运行时,运行 WatchCompanion
通过对集合执行更改来生成变更事件。
注意
此示例使用连接 URI 连接到 MongoDB 实例。要了解更多关于连接到您的 MongoDB 实例的信息,请参阅连接指南.
Watch
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package usage.examples; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion
:
// Performs CRUD operations to generate change events when run with the Watch application package usage.examples; import java.util.Arrays; import org.bson.Document; import org.bson.types.ObjectId; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Success! Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
如果您按顺序运行前面的应用程序,您应该会看到类似于以下内容的 Watch
应用程序的输出。仅打印 insert
和 update
操作,因为聚合管道过滤掉了 delete
操作
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
您还应该看到类似于以下内容的 WatchCompanion
应用程序的输出
Success! Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
有关本页面上提到的类和方法的更多信息,请参阅以下资源
变更流 服务器手册条目
变更事件 服务器手册条目
聚合管道 服务器手册条目
聚合阶段 服务器手册条目
ChangeStreamIterable API 文档
MongoCollection.watch() API 文档
MongoDatabase.watch() API 文档
MongoClient.watch() API 文档