监视数据更改
概述
在本指南中,您可以通过使用 变更流 来学习如何监控文档更改。
变更流输出新的更改事件,提供对实时数据更改的访问。您可以在集合、数据库或客户端对象上打开变更流。
示例数据
本指南中的示例使用以下内容Course
结构作为 courses
集合中文档的模型
type Course struct { Title string Enrollment int32 }
要运行本指南中的示例,请使用以下代码片段将以下文档加载到 db
数据库中的 courses
集合中
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
提示
不存在的数据库和集合
如果在执行写入操作时必要的数据库和集合不存在,则服务器隐式创建它们。
每个文档都包含一个大学课程的描述,包括课程标题和最大招生人数,对应于每个文档中的 title
和 enrollment
字段。
注意
由于驱动程序生成它们是唯一的,每个示例输出都显示了截断的 _data
、clusterTime
和 ObjectID
值。
打开变更流
要打开变更流,请使用 Watch()
方法。该方法需要两个参数:上下文参数和管道参数。要返回所有变更,请传入一个空的 Pipeline
对象。
示例
以下示例在 courses
集合上打开变更流,并输出所有变更
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
如果您在单独的程序或壳中修改 courses
集合,此代码将按变更发生的情况打印出您的变更。插入一个具有 title
值为 "Advanced Screenwriting"
和 enrollment
值为 20
的文档,将导致以下变更事件
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
修改变更流输出
使用管道参数来修改更改流输出。此参数允许您仅监视某些更改事件。将管道参数格式化为文档数组,其中每个文档代表一个聚合阶段。
您可以在该参数中使用以下管道阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
示例
以下示例在db
数据库上打开更改流,但仅监视新删除操作
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
在db
数据库上调用Watch()
方法,因此代码输出该数据库中任何集合的新删除操作。
修改Watch()
的行为
使用options
参数修改Watch()
方法的行为。
您可以为Watch()
方法指定以下选项:
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
StartAtOperationTime
Custom
CustomPipeline
有关这些选项的更多信息,请访问MongoDB服务器手册.
前后图像
当您对集合执行任何CRUD操作时,默认情况下,相应的更改事件文档仅包含操作修改的字段的变化量。您可以通过指定Watch()
方法的options
参数中的设置,在变化量之外看到更改前后的完整文档。
如果您想查看文档的后像,即更改后的完整文档,将options
参数中的FullDocument
字段设置为以下值之一
UpdateLookup
:更改事件文档包含整个更改文档的副本。WhenAvailable
:如果后像是可用的,更改事件文档将包含更改事件的修改文档的后像。Required
:输出与WhenAvailable
相同,但如果后像不可用,驱动程序将引发服务器端错误。
如果您想查看文档的前像,即更改前的完整文档,将options
参数中的FullDocumentBeforeChange
字段设置为以下值之一
WhenAvailable
:如果前像是可用的,更改事件文档将包含更改事件的修改文档的前像。Required
:输出与WhenAvailable
相同,但如果前像不可用,驱动程序将引发服务器端错误。
重要
要访问文档的前后像,您必须为集合启用changeStreamPreAndPostImages
。有关说明和更多信息,请参阅MongoDB服务器手册。
注意
插入的文档没有前像,删除的文档没有后像。
示例
以下示例在courses
集合上调用Watch()
方法。它为options
参数的FullDocument
字段指定一个值,以输出整个修改文档的副本,而不是只输出已更改的字段。
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
将具有"World Fiction"
标题的文档的enrollment
值从35
更改为30
会导致以下更改事件。
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
未指定FullDocument
选项时,相同的更新操作不再在更改事件文档中输出"fullDocument"
值。
更多信息
有关更改流的可运行示例,请参阅监视数据更改.
有关更改流的更多信息,请参阅更改流。
API 文档
要了解更多关于Watch()
方法的信息,请访问以下API文档链接