更改流
更改流允许应用程序在不进行手动跟踪的复杂性和风险的情况下访问实时数据更改。oplog。应用程序可以使用变更流来订阅单个集合、数据库或整个部署上的所有数据更改,并立即做出反应。由于变更流使用聚合框架,应用程序还可以过滤特定的更改或随意转换通知。
从 MongoDB 5.1 开始,变更流进行了优化,提供了更有效的资源利用率和一些聚合管道阶段的更快执行。
可用性
存储引擎。
副本集和分片集群必须使用 WiredTiger 存储引擎。变更流也可以用于使用 MongoDB 的 静态加密 功能的部署。
副本集协议版本。
副本集和分片集群必须使用副本集协议版本 1 (
pv1
)。读取关注“多数”启用。
变更流 无论是否支持
"majority"
读取关注;也就是说,读取关注majority
支持可以是启用(默认)或 禁用 以使用变更流。
稳定的API支持
更改流包含在稳定API V1中。然而,showExpandedEvents选项不包括在稳定API V1中。
连接
更改流的连接可以使用带有+srv
连接选项的DNS种子列表,或者在连接字符串中单独列出服务器。
如果驱动程序失去了对更改流的连接或连接中断,它将通过具有匹配的读取偏好的集群中的另一个节点尝试重新建立对更改流的连接。如果驱动程序找不到具有正确读取偏好的节点,它将抛出异常。
有关更多信息,请参阅连接字符串URI格式。
监视集合、数据库或部署
您可以对以下对象打开更改流
目标 | 描述 |
---|---|
一个集合 | 您可以为单个集合打开更改流游标(除了 本页面的示例使用 MongoDB 驱动程序打开并操作单个集合的变更流游标。请参阅 |
数据库 | 您可以为单个数据库(不包括 有关 MongoDB 驱动程序方法,请参阅您的驱动程序文档。请参阅 |
部署 | 您可以为部署(无论是复制集还是分片集群)打开变更流游标,以监视除 有关 MongoDB 驱动程序方法,请参阅您的驱动程序文档。请参阅 |
注意
变更流示例
本页面的示例使用 MongoDB 驱动程序说明了如何打开集合的变更流游标以及如何处理变更流游标。
变更流性能考虑因素
如果数据库上打开的活动变更流数量超过连接池大小,您可能会遇到通知延迟。每个变更流在等待下一个事件期间都会使用一个连接和一个getMore操作。为了避免任何延迟问题,您应确保池大小大于打开的变更流数量。有关详细信息,请参阅maxPoolSize设置。
分片集群考虑事项
当在分片集群上打开更改流时
无论更改流的目标是否为特定的分片键范围,
mongos
都会在每个分片上创建单独的更改流。这种行为不受更改流目标是否为特定分片键范围的影响。当
mongos
接收到更改流结果时,它会对这些结果进行排序和过滤。如果需要,mongos
还会执行fullDocument
查找。
为了获得最佳性能,请限制在更改流中使用 $lookup
查询。
打开更改流
要打开更改流
对于副本集,您可以从任何承载数据的成员发出打开更改流操作。
对于分片集群,您必须从
mongos
发出打开更改流操作。
以下示例打开一个集合的更改流,并遍历游标以检索更改流文档。 [1]
➤使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。
以下C语言示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory
集合。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下C#示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory
集合。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下Go语言示例假设您已连接到MongoDB副本集并访问了数据库,并且包含一个inventory
集合。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下Java示例假设您已经连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
以下Kotlin示例假设您已连接到MongoDB副本集并可以访问包含库存
集合的数据库。要了解更多关于完成这些任务的信息,请参阅Kotlin Coroutine Driver 数据库和集合指南。
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
以下示例假设您已经连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
cursor = db.inventory.watch() document = await cursor.next()
以下Node.js示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
以下示例使用流来处理更改事件。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器来处理更改事件
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
ChangeStream扩展自EventEmitter。
以下示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下Python示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
cursor = db.inventory.watch() next(cursor)
以下示例假设您已连接到MongoDB副本集并访问了一个数据库,其中包含一个库存
集合。
cursor = inventory.watch.to_enum next_change = cursor.next
以下Swift(异步)示例假设你已经连接到MongoDB副本集并访问了一个数据库,该数据库包含一个库存
集合。(点击此处查看详细说明)
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下Swift(同步)示例假设你已经连接到MongoDB副本集并访问了一个数据库,该数据库包含一个库存
集合。(点击此处查看详细说明)
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
要从游标中检索数据变更事件,请迭代变更流游标。有关变更流事件的更多信息,请参阅变更事件。
以下条件之一发生时,变更流游标将保持打开状态:
游标被显式关闭。
发生无效化事件;例如,集合删除或重命名。
与MongoDB部署的连接关闭或超时。有关更多信息,请参阅游标行为。
如果部署是分片集群,移除分片可能会使打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
未关闭游标的生命周期依赖于语言。
[1] | 你可以指定一个startAtOperationTime 来在特定时间点打开游标。如果指定的起始点在过去,它必须在oplog的时间范围内。 |
修改更改流输出
➤使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
管道列表包含一个单独的 $match
阶段,该阶段筛选符合以下任一或两个标准的操作:
username
值为alice
operationType
值为delete
将管道传递给 watch()
方法,将使更改流在通过指定的 pipeline
后返回通知。
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
管道列表包含一个单独的 $match
阶段,该阶段筛选符合以下任一或两个标准的操作:
username
值为alice
operationType
值为delete
将管道传递给 watch()
方法,将使更改流在通过指定的 pipeline
后返回通知。
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
以下示例使用流来处理更改事件。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器来处理更改事件
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
您可以通过在配置更改流时提供以下管道阶段的数组来控制更改流输出:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
提示
更改流事件文档的 resume token 由更改流的 _id
字段充当。不要使用管道修改或删除更改流事件的 _id
字段。
从 MongoDB 4.2 开始,如果更改流聚合管道修改了事件的一个 _id
字段,更改流将抛出异常。
有关更改流响应文档格式的更多信息,请参阅更改事件。
在完整文档中查找更新操作
默认情况下,更改流只返回更新操作期间字段的增量。然而,您可以配置更改流以返回更新文档的最新多数提交版本。
➤使用页面上方右边的“选择您的语言”下拉菜单设置此页面上示例的语言。
要返回更新文档的最新多数提交版本,请将“fullDocument”选项与“updateLookup”值一起传递给“mongoc_collection_watch”方法。
在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
要返回更新文档的最新多数提交版本,请将“FullDocument = ChangeStreamFullDocumentOption.UpdateLookup”传递给db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
要返回更新文档的最新多数提交版本,请使用“SetFullDocument(options.UpdateLookup)”更改流选项。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
要返回更新文档的最新多数提交版本,请将“FullDocument.UPDATE_LOOKUP”传递给“db.collection.watch.fullDocument()”方法。
在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
要返回更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP
传递给 ChangeStreamFlow.fullDocument() 方法。
在下面的示例中,所有更新操作通知都包含一个“FullDocument”字段,它表示受更新操作影响的文档的当前版本。
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本 的 `full_document
字段。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' }
传递给 db.collection.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。
以下示例使用流来处理更改事件。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
或者,您也可以使用迭代器来处理更改事件
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
传递给 db.watch()
方法。
在下面的示例中,所有更新操作通知都包含一个“fullDocument”字段,它表示受更新操作影响的文档的当前版本。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
要返回更新文档的最新多数提交版本,请将 full_document='updateLookup'
传递给 db.collection.watch()
方法。
以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本 的 full_document
字段。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup'
传递给 db.watch()
方法。
以下示例中,所有更新操作通知都包含一个表示受更新操作影响的文档 当前版本 的 full_document
字段。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
要返回更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
要返回更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup)
传递给 watch()
方法。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
如果有在更新操作和查找之间修改了更新文档的多数提交操作,则返回的完整文档可能与更新操作时的文档有显著差异。
但是,更改流文档中包含的增量始终正确描述了应用于该更改流事件的监视集合更改。
如果以下情况之一为真,则更新事件的 fullDocument
字段可能缺失
如果文档被删除或集合在更新和查找之间被删除。
如果更新更改了该集合分片键中至少一个字段的值。
有关更改流响应文档格式的更多信息,请参阅更改事件。
恢复更改流
更改流可以通过指定恢复令牌来恢复,在打开游标时使用 resumeAfter 或 startAfter。
resumeAfter
用于更改流
您可以通过在打开游标时将恢复令牌传递给 resumeAfter
来在特定事件后恢复更改流。
有关恢复令牌的更多信息,请参阅 恢复令牌。
重要
oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作,如果时间戳在过去。
您不能使用
resumeAfter
在 invalidate 事件(例如,集合删除或重命名)关闭流之后恢复更改流。相反,您可以使用 startAfter 在 invalidate 事件之后启动新的更改流。
在下面的示例中,将 resumeAfter
选项附加到流选项中,以便在销毁后重新创建流。将 _id
传递给更改流尝试从指定的操作开始恢复通知。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
以下示例中,从最后一个更改流文档中检索了resumeToken
并将其作为选项传递给Watch()
方法。将resumeToken
传递给Watch()
方法将使更改流尝试从在resume token中指定的操作之后开始恢复通知。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
您可以使用ChangeStreamOptions.SetResumeAfter来指定更改流的恢复令牌。如果设置了resumeAfter选项,更改流将在resume token中指定的操作之后恢复通知。下面的示例中,SetResumeAfter
接受必须解析为恢复令牌的值,例如示例中的resumeToken
。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
您可以使用resumeAfter()
方法在resume token中指定的操作之后恢复通知。下面的示例中,resumeAfter()
方法接受必须解析为恢复令牌的值,例如示例中的resumeToken
。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
您可以使用ChangeStreamFlow.resumeAfter()方法在resume token中指定的操作之后恢复通知。下面的示例中,resumeAfter()
方法接受必须解析为恢复令牌的值,如示例中的resumeToken
变量。
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
您可以使用resume_after
修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after
修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
您可以使用 resumeAfter
选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter
选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken
。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
您可以使用 resumeAfter
选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter
选项接受一个必须解析为恢复令牌的值,例如下面示例中的 $resumeToken
。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
您可以使用resume_after
修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after
修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token
。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
您可以使用resume_after
修饰符在resume token中指定的操作之后恢复通知。下面的示例中,resume_after
修饰符接受必须解析为恢复令牌的值,例如示例中的resume_token
。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
您可以使用 resumeAfter
选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter
选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken
。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
您可以使用 resumeAfter
选项在指定恢复令牌的操作之后恢复通知。该 resumeAfter
选项接受一个必须解析为恢复令牌的值,例如下面示例中的 resumeToken
。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
用于更改流
您可以通过在打开游标时传递恢复令牌到 startAfter
来在特定事件后启动一个新的更改流。与 resumeAfter 不同,startAfter
可以通过创建一个新的更改流来在 invalidate 事件 之后恢复通知。
有关恢复令牌的更多信息,请参阅 恢复令牌。
重要
oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作,如果时间戳在过去。
恢复令牌
恢复令牌可以从多个来源获取
来源 | 描述 |
---|---|
每个更改事件通知都在 _id 字段上包含一个恢复令牌。 | |
聚合阶段 此字段仅在使用 | |
getMore 命令包含在 cursor.postBatchResumeToken 字段上的恢复令牌。 |
从 MongoDB 4.2 开始,如果更改流聚合管道修改了事件的一个 _id
字段,更改流将抛出异常。
提示
MongoDB 提供了一个扩展到 片段,这是一个对 mongosh
的扩展,可以解码十六进制编码的恢复令牌。
您可以从以下resumetoken片段中安装和运行mongosh
:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
如果您的系统已安装npm
,您也可以从命令行(不使用mongosh
)运行resumetoken。
npx mongodb-resumetoken-decoder <RESUME TOKEN>
以下内容提供了更多关于
变更事件中的恢复令牌
变更事件通知包含在 _id
字段上的恢复令牌
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
来自 aggregate
的恢复令牌
使用 aggregate
命令时,聚合阶段 $changeStream
包含在 cursor.postBatchResumeToken
字段上的恢复令牌
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
来自 getMore
的恢复令牌
getMore
命令还包含在 cursor.postBatchResumeToken
字段上的恢复令牌
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
用例
变更流可以为依赖业务系统的架构提供帮助,一旦数据变更持久化,就通知下游系统。例如,变更流可以在实施提取、转换和加载(ETL)服务、跨平台同步、协作功能和通知服务时节省开发人员的时间。
访问控制
对于强制执行自托管部署的身份验证和授权的部署:
要针对特定集合打开变更流,应用程序必须具有授予对应集合上
changeStream
和find
操作的权限。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 要在一个数据库上打开变更流,应用程序必须具有授予数据库中所有非
system
集合上changeStream
和find
操作的权限。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 要在一个整个部署上打开变更流,应用程序必须具有授予部署中所有数据库的所有非
system
集合上changeStream
和find
操作的权限。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
事件通知
更改流仅通知已持续到副本集多数数据成员的数据更改。这确保了只有由多数已提交且在故障情况下可持久化的更改触发通知。
例如,考虑一个3成员的副本集,其中更改流光标针对主节点打开。如果一个客户端发出插入操作,则更改流仅在插入已持续到多数数据成员后,才通知应用程序数据更改。
如果操作与事务相关联,则更改事件文档包含txnNumber
和lsid
。
排序
除非提供了显式的排序,否则更改流使用简单的二进制比较。
更改流和孤立文档
带有文档前后图像的变更流
从 MongoDB 6.0 版本开始,您可以使用 变更流事件 输出文档变更前后的版本(文档前后图像)
前图像是替换、更新或删除之前的文档。插入的文档没有前图像。
后图像是插入、替换或更新之后的文档。删除的文档没有后图像。
使用
db.createCollection()
、create
或collMod
为集合启用changeStreamPreAndPostImages
。
如果图像在文档更新或删除操作时未被启用,则 变更流事件 中不可用
在集合上未启用。
在
expireAfterSeconds
中设置的保留时间后删除。以下示例在整个集群上设置
expireAfterSeconds
为100
秒use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 以下示例返回当前
changeStreamOptions
设置,包括expireAfterSeconds
db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) 将
expireAfterSeconds
设置为off
使用默认保留策略:前图像和后图像保留直到相应的变更流事件从 oplog 中删除。如果从 oplog 中删除了变更流事件,则相应的图像和后图像也将被删除,无论
expireAfterSeconds
的图像保留时间。
其他注意事项
启用前后图像会消耗存储空间并增加处理时间。只有需要时才启用前后图像。
将变更流事件的大小限制在小于 16 兆字节。要限制事件大小,您可以
将文档大小限制在 8 兆字节。如果其他变更流事件字段(如
updateDescription
)不大,则可以在 变更流输出 中同时请求前后图像。如果其他变更流事件字段(如
updateDescription
)不大,则在变更流输出中仅请求后图像的文档,大小不超过 16 兆字节。如果
文档更新仅影响文档结构或内容的一小部分,并且
不会引发
replace
更改事件。一个replace
事件总是包括后图像。
要请求预图像,您需要在
db.collection.watch()
中将fullDocumentBeforeChange
设置为required
或whenAvailable
。要请求后图像,您可以使用相同的方法设置fullDocument
。预图像被写入到
config.system.preimages
集合。config.system.preimages
集合可能会变得很大。为了限制集合大小,您可以像之前显示的那样为预图像设置expireAfterSeconds
时间。预图像将由后台进程异步删除。
重要
不兼容功能
从MongoDB 6.0开始,如果您正在使用文档预图像和后图像为更改流,则必须在降级到较早的MongoDB版本之前,使用collMod
命令禁用每个集合的changeStreamPreAndPostImages。
有关带有更改流输出的完整示例,请参阅具有文档预图像和后图像的更改流。