文档菜单
文档首页
/
MongoDB 手册

更改流

本页内容

  • 可用性
  • 连接
  • 监视集合、数据库或部署
  • 更改流性能考虑因素
  • 打开更改流
  • 修改更改流输出
  • 更新操作查找完整文档
  • 恢复更改流
  • 用例
  • 访问控制
  • 事件通知
  • 排序规则
  • 更改流和孤儿文档
  • 具有文档前后图像的更改流

更改流允许应用程序在不进行手动跟踪的复杂性和风险的情况下访问实时数据更改。oplog。应用程序可以使用变更流来订阅单个集合、数据库或整个部署上的所有数据更改,并立即做出反应。由于变更流使用聚合框架,应用程序还可以过滤特定的更改或随意转换通知。

从 MongoDB 5.1 开始,变更流进行了优化,提供了更有效的资源利用率和一些聚合管道阶段的更快执行。

变更流适用于 副本集分片集群:

更改流包含在稳定API V1中。然而,showExpandedEvents选项不包括在稳定API V1中。

更改流的连接可以使用带有+srv连接选项的DNS种子列表,或者在连接字符串中单独列出服务器。

如果驱动程序失去了对更改流的连接或连接中断,它将通过具有匹配的读取偏好的集群中的另一个节点尝试重新建立对更改流的连接。如果驱动程序找不到具有正确读取偏好的节点,它将抛出异常。

有关更多信息,请参阅连接字符串URI格式。

您可以对以下对象打开更改流

目标
描述
一个集合

您可以为单个集合打开更改流游标(除了system集合,或者任何位于adminlocalconfig数据库中的集合)。

本页面的示例使用 MongoDB 驱动程序打开并操作单个集合的变更流游标。请参阅mongosh方法db.collection.watch()

数据库

您可以为单个数据库(不包括adminlocalconfig数据库)打开变更流游标,以监视所有非系统集合的更改。

有关 MongoDB 驱动程序方法,请参阅您的驱动程序文档。请参阅mongosh方法db.watch()

部署

您可以为部署(无论是复制集还是分片集群)打开变更流游标,以监视除adminlocalconfig之外的所有数据库中的所有非系统集合的更改。

有关 MongoDB 驱动程序方法,请参阅您的驱动程序文档。请参阅mongosh方法Mongo.watch()

注意

变更流示例

本页面的示例使用 MongoDB 驱动程序说明了如何打开集合的变更流游标以及如何处理变更流游标。

如果数据库上打开的活动变更流数量超过连接池大小,您可能会遇到通知延迟。每个变更流在等待下一个事件期间都会使用一个连接和一个getMore操作。为了避免任何延迟问题,您应确保池大小大于打开的变更流数量。有关详细信息,请参阅maxPoolSize设置。

当在分片集群上打开更改流时

  • 无论更改流的目标是否为特定的分片键范围,mongos 都会在每个分片上创建单独的更改流。这种行为不受更改流目标是否为特定分片键范围的影响。

  • mongos 接收到更改流结果时,它会对这些结果进行排序和过滤。如果需要,mongos 还会执行 fullDocument 查找。

为了获得最佳性能,请限制在更改流中使用 $lookup 查询。

要打开更改流

  • 对于副本集,您可以从任何承载数据的成员发出打开更改流操作。

  • 对于分片集群,您必须从 mongos 发出打开更改流操作。

以下示例打开一个集合的更改流,并遍历游标以检索更改流文档。 [1]


使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。


以下C语言示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory集合。

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

以下C#示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory集合。

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

以下Go语言示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory集合。

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

以下Java示例假设您已经连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

以下Kotlin示例假设您已连接到MongoDB副本集并可以访问包含库存集合的数据库。要了解更多关于完成这些任务的信息,请参阅Kotlin Coroutine Driver 数据库和集合指南。

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

以下示例假设您已经连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

cursor = db.inventory.watch()
document = await cursor.next()

以下Node.js示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

以下示例使用流来处理更改事件。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器来处理更改事件

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream扩展自EventEmitter。

以下示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

以下Python示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

cursor = db.inventory.watch()
next(cursor)

以下示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存集合。

cursor = inventory.watch.to_enum
next_change = cursor.next

以下Swift(异步)示例假设你已经连接到MongoDB副本集并访问了一个数据库,该数据库包含一个库存集合。(点击此处查看详细说明)

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

以下Swift(同步)示例假设你已经连接到MongoDB副本集并访问了一个数据库,该数据库包含一个库存集合。(点击此处查看详细说明)

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

要从游标中检索数据变更事件,请迭代变更流游标。有关变更流事件的更多信息,请参阅变更事件。

以下条件之一发生时,变更流游标将保持打开状态:

  • 游标被显式关闭。

  • 发生无效化事件;例如,集合删除或重命名。

  • 与MongoDB部署的连接关闭或超时。有关更多信息,请参阅游标行为

  • 如果部署是分片集群,移除分片可能会使打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。

注意

未关闭游标的生命周期依赖于语言。

[1] 你可以指定一个startAtOperationTime来在特定时间点打开游标。如果指定的起始点在过去,它必须在oplog的时间范围内。

使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。


您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

管道列表包含一个单独的 $match 阶段,该阶段筛选符合以下任一或两个标准的操作:

  • username 值为 alice

  • operationType 值为 delete

将管道传递给 watch() 方法,将使更改流在通过指定的 pipeline 后返回通知。

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

管道列表包含一个单独的 $match 阶段,该阶段筛选符合以下任一或两个标准的操作:

  • username 值为 alice

  • operationType 值为 delete

将管道传递给 watch() 方法,将使更改流在通过指定的 pipeline 后返回通知。

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

以下示例使用流来处理更改事件。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器来处理更改事件

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

提示

更改流事件文档的 resume token 由更改流的 _id 字段充当。不要使用管道修改或删除更改流事件的 _id 字段。

从 MongoDB 4.2 开始,如果更改流聚合管道修改了事件的一个 _id 字段,更改流将抛出异常。

有关更改流响应文档格式的更多信息,请参阅更改事件

默认情况下,更改流只返回更新操作期间字段的增量。然而,您可以配置更改流以返回更新文档的最新多数提交版本。


使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。


要返回更新文档的最新多数提交版本,请将“fullDocument”选项与“updateLookup”值一起传递给“mongoc_collection_watch”方法。

在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

要返回更新文档的最新多数提交版本,请将“FullDocument = ChangeStreamFullDocumentOption.UpdateLookup”传递给db.collection.watch()方法。

在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

要返回更新文档的最新多数提交版本,请使用“SetFullDocument(options.UpdateLookup)”更改流选项。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

要返回更新文档的最新多数提交版本,请将“FullDocument.UPDATE_LOOKUP”传递给“db.collection.watch.fullDocument()”方法。

在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

要返回更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP 传递给 ChangeStreamFlow.fullDocument() 方法。

在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本`full_document 字段。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' } 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。

以下示例使用流来处理更改事件。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

或者,您也可以使用迭代器来处理更改事件

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本full_document 字段。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup' 传递给 db.watch() 方法。

以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本full_document 字段。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

要返回更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

要返回更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

注意

如果有在更新操作和查找之间修改了更新文档的多数提交操作,则返回的完整文档可能与更新操作时的文档有显著差异。

但是,更改流文档中包含的增量始终正确描述了应用于该更改流事件的监视集合更改。

如果以下情况之一为真,则更新事件的 fullDocument 字段可能缺失

  • 如果文档被删除或集合在更新和查找之间被删除。

  • 如果更新更改了该集合分片键中至少一个字段的值。

有关更改流响应文档格式的更多信息,请参阅更改事件

更改流可以通过指定恢复令牌来恢复,在打开游标时使用 resumeAfterstartAfter

您可以通过在打开游标时将恢复令牌传递给 resumeAfter 来在特定事件后恢复更改流。

有关恢复令牌的更多信息,请参阅 恢复令牌

重要

  • oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作,如果时间戳在过去。

  • 您不能使用 resumeAfterinvalidate 事件(例如,集合删除或重命名)关闭流之后恢复更改流。相反,您可以使用 startAfterinvalidate 事件之后启动新的更改流。

在下面的示例中,将 resumeAfter 选项附加到流选项中,以便在销毁后重新创建流。将 _id 传递给更改流尝试从指定的操作开始恢复通知。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

以下示例中,从最后一个更改流文档中检索了resumeToken并将其作为选项传递给Watch()方法。将resumeToken传递给Watch()方法将使更改流尝试从在resume token中指定的操作之后开始恢复通知。

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

您可以使用ChangeStreamOptions.SetResumeAfter来指定更改流的恢复令牌。如果设置了resumeAfter选项,更改流将在resume token中指定的操作之后恢复通知。下面的示例中,SetResumeAfter接受必须解析为恢复令牌的值,例如示例中的resumeToken

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

您可以使用resumeAfter()方法在resume token中指定的操作之后恢复通知。下面的示例中,resumeAfter()方法接受必须解析为恢复令牌的值,例如示例中的resumeToken

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

您可以使用ChangeStreamFlow.resumeAfter()方法在resume token中指定的操作之后恢复通知。下面的示例中,resumeAfter()方法接受必须解析为恢复令牌的值,如示例中的resumeToken变量。

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

您可以使用resume_after修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

您可以使用 resumeAfter 选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter 选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

您可以使用 resumeAfter 选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter 选项接受一个必须解析为恢复令牌的值,例如下面示例中的 $resumeToken

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

您可以使用resume_after修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

您可以使用resume_after修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

您可以使用 resumeAfter 选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter 选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

您可以使用 resumeAfter 选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter 选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

您可以通过在打开游标时传递恢复令牌到 startAfter 来在特定事件后启动一个新的更改流。与 resumeAfter 不同,startAfter 可以通过创建一个新的更改流来在 invalidate 事件 之后恢复通知。

有关恢复令牌的更多信息,请参阅 恢复令牌

重要

  • oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作,如果时间戳在过去。

恢复令牌可以从多个来源获取

来源
描述
每个更改事件通知都在 _id 字段上包含一个恢复令牌。

聚合阶段 $changeStream 包含在 cursor.postBatchResumeToken 字段上的恢复令牌。

此字段仅在使用 aggregate 命令时出现。

getMore 命令包含在 cursor.postBatchResumeToken 字段上的恢复令牌。

从 MongoDB 4.2 开始,如果更改流聚合管道修改了事件的一个 _id 字段,更改流将抛出异常。

提示

MongoDB 提供了一个扩展到 片段,这是一个对 mongosh 的扩展,可以解码十六进制编码的恢复令牌。

您可以从以下resumetoken片段中安装和运行mongosh

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

如果您的系统已安装npm,您也可以从命令行(不使用mongosh)运行resumetoken

npx mongodb-resumetoken-decoder <RESUME TOKEN>

以下内容提供了更多关于

变更事件通知包含在 _id 字段上的恢复令牌

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

使用 aggregate 命令时,聚合阶段 $changeStream 包含在 cursor.postBatchResumeToken 字段上的恢复令牌

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 命令还包含在 cursor.postBatchResumeToken 字段上的恢复令牌

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

变更流可以为依赖业务系统的架构提供帮助,一旦数据变更持久化,就通知下游系统。例如,变更流可以在实施提取、转换和加载(ETL)服务、跨平台同步、协作功能和通知服务时节省开发人员的时间。

对于强制执行自托管部署的身份验证授权的部署:

  • 要针对特定集合打开变更流,应用程序必须具有授予对应集合上changeStreamfind操作的权限。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 要在一个数据库上打开变更流,应用程序必须具有授予数据库中所有非system集合上changeStreamfind操作的权限。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 要在一个整个部署上打开变更流,应用程序必须具有授予部署中所有数据库的所有非system集合上changeStreamfind操作的权限。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

更改流仅通知已持续到副本集多数数据成员的数据更改。这确保了只有由多数已提交且在故障情况下可持久化的更改触发通知。

例如,考虑一个3成员的副本集,其中更改流光标针对主节点打开。如果一个客户端发出插入操作,则更改流仅在插入已持续到多数数据成员后,才通知应用程序数据更改。

如果操作与事务相关联,则更改事件文档包含txnNumberlsid

除非提供了显式的排序,否则更改流使用简单的二进制比较。

从MongoDB 5.3版本开始,在范围迁移期间,对于孤立文档的更新不会生成更改流事件。

从 MongoDB 6.0 版本开始,您可以使用 变更流事件 输出文档变更前后的版本(文档前后图像)

  • 前图像是替换、更新或删除之前的文档。插入的文档没有前图像。

  • 后图像是插入、替换或更新之后的文档。删除的文档没有后图像。

  • 使用 db.createCollection()createcollMod 为集合启用 changeStreamPreAndPostImages

如果图像在文档更新或删除操作时未被启用,则 变更流事件 中不可用

  • 在集合上未启用。

  • expireAfterSeconds 中设置的保留时间后删除。

    • 以下示例在整个集群上设置 expireAfterSeconds100

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 以下示例返回当前 changeStreamOptions 设置,包括 expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds 设置为 off 使用默认保留策略:前图像和后图像保留直到相应的变更流事件从 oplog 中删除。

    • 如果从 oplog 中删除了变更流事件,则相应的图像和后图像也将被删除,无论 expireAfterSeconds 的图像保留时间。

其他注意事项

  • 启用前后图像会消耗存储空间并增加处理时间。只有需要时才启用前后图像。

  • 将变更流事件的大小限制在小于 16 兆字节。要限制事件大小,您可以

    • 将文档大小限制在 8 兆字节。如果其他变更流事件字段(如 updateDescription)不大,则可以在 变更流输出 中同时请求前后图像。

    • 如果其他变更流事件字段(如 updateDescription)不大,则在变更流输出中仅请求后图像的文档,大小不超过 16 兆字节。

    • 如果

      • 文档更新仅影响文档结构或内容的一小部分,并且

      • 不会引发replace更改事件。一个replace事件总是包括后图像。

  • 要请求预图像,您需要在db.collection.watch()中将fullDocumentBeforeChange设置为requiredwhenAvailable。要请求后图像,您可以使用相同的方法设置fullDocument

  • 预图像被写入到config.system.preimages集合。

    • config.system.preimages集合可能会变得很大。为了限制集合大小,您可以像之前显示的那样为预图像设置expireAfterSeconds时间。

    • 预图像将由后台进程异步删除。

重要

不兼容功能

从MongoDB 6.0开始,如果您正在使用文档预图像和后图像为更改流,则必须在降级到较早的MongoDB版本之前,使用collMod命令禁用每个集合的changeStreamPreAndPostImages

提示

另请参阅

有关带有更改流输出的完整示例,请参阅具有文档预图像和后图像的更改流。

返回

限制