文档菜单
文档首页
/ / /
Java 同步驱动程序
/

监视变化

您可以通过打开一个更改流来跟踪MongoDB中数据的更改,例如集合、数据库或部署的更改。更改流允许应用程序监视数据更改并对其做出反应。

更改流在发生更改时返回包含有关更新数据信息的更改事件文档。

通过调用以下代码示例中的方法来打开更改流:watch()方法在MongoCollectionMongoDatabaseMongoClient对象上,如下所示

ChangeStreamIterable<Document> changeStream = database.watch();

watch()方法可以选择性地接受一个聚合管道,该管道由一个包含阶段的数组作为第一个参数,以过滤和转换更改事件输出如下

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

watch()方法返回一个ChangeStreamIterable实例,该类提供了一些方法来访问、组织和遍历结果。ChangeStreamIterable还从其父类MongoIterable继承方法,该类实现了核心Java接口Iterable

您可以在ChangeStreamIterable上调用forEach()来处理事件,或者您可以使用返回一个MongoCursor实例的iterator()方法来遍历结果。

您可以在MongoCursor上调用方法,例如hasNext()来检查是否存在更多结果,next()来返回集合中的下一个文档,或者tryNext(),以立即返回更改流中的下一个可用元素或null。与由其他查询返回的MongoCursor不同,与更改流关联的MongoCursor在返回结果之前会等待更改事件到来。因此,使用更改流的MongoCursor调用next()永远不会抛出java.util.NoSuchElementException

要配置处理更改流返回的文档的选项,请使用由watch()返回的ChangeStreamIterable对象的成员方法。有关可用方法的更多详细信息,请参阅此示例底部有关ChangeStreamIterable API文档的链接。

要捕获变更流的事件,调用以下示例中的 forEach() 方法,并传入一个回调函数

changeStream.forEach(event -> System.out.println("Change observed: " + event));

当变更事件被触发时,回调函数将被执行。您可以在回调函数中指定逻辑来处理接收到的事件文档。

重要

forEach() 会阻塞当前线程

forEach() 的调用会阻塞当前线程,直到相应的变更流监听事件为止。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在一个单独的线程中创建并监听您的变更流。

注意

对于更新操作变更事件,变更流默认只返回修改的字段,而不是整个更新的文档。您可以通过调用 ChangeStreamIterable 对象的 fullDocument() 成员方法并传入值 FullDocument.UPDATE_LOOKUP 来配置变更流也返回文档的最新版本,如下所示

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

以下示例使用两个独立的应用程序来演示如何使用变更流监听变更

  • 第一个应用程序,名为 Watch,在 sample_mflix 数据库中的 movies 集合上打开一个变更流。 Watch 使用聚合管道根据 operationType 过滤变更,以便只接收插入和更新事件(删除事件通过省略被排除)。 Watch 使用回调来接收并打印发生在集合上的过滤后的变更事件。

  • 第二个应用程序,名为 WatchCompanion,在 sample_mflix 数据库中的 movies 集合中插入单个文档。接下来, WatchCompanion 使用新的字段值更新该文档。最后, WatchCompanion 删除该文档。

首先,运行 Watch 以打开集合上的变更流,并使用 forEach() 方法在变更流上定义一个回调。当 Watch 运行时,运行 WatchCompanion 通过对集合执行更改来生成变更事件。

注意

此示例使用连接 URI 连接到 MongoDB 实例。要了解更多关于连接到您的 MongoDB 实例的信息,请参阅连接指南.

Watch:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package usage.examples;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

// Performs CRUD operations to generate change events when run with the Watch application
package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

如果您按顺序运行前面的应用程序,您应该会看到类似于以下内容的 Watch 应用程序的输出。仅打印 insertupdate 操作,因为聚合管道过滤掉了 delete 操作

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

您还应该看到类似于以下内容的 WatchCompanion 应用程序的输出

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

提示

旧版 API

如果您正在使用旧版 API,请参阅我们的常见问题解答页面,了解您需要对此代码示例进行哪些更改。

有关本页面上提到的类和方法的更多信息,请参阅以下资源

  • 变更流 服务器手册条目

  • 变更事件 服务器手册条目

  • 聚合管道 服务器手册条目

  • 聚合阶段 服务器手册条目

  • ChangeStreamIterable API 文档

  • MongoCollection.watch() API 文档

  • MongoDatabase.watch() API 文档

  • MongoClient.watch() API 文档

返回

批量操作