MongoDB\Database::watch()
新增在版本中1.4.
定义
MongoDB\Database::watch()
执行更改流操作。可以监视数据库级别的更改。
function watch( array $pipeline = [], array $options = [] ): MongoDB\ChangeStream
参数
$pipeline
: 数组|对象- 要附加到初始
$changeStream
阶段的阶段管道。 $options
: 数组指定所需选项的数组。
名称类型描述batchSize整数指定游标的批处理大小,该大小将应用于初始
aggregate
命令和任何后续的getMore
命令。这决定了从服务器响应中返回的更改事件的最大数量。无论
batchSize
选项如何,更改流的初始aggregate
命令响应通常不包含任何文档,除非使用另一个选项来配置其起始点(例如,startAfter
)。codecMongoDB\Codec\DocumentCodec...用于编码或解码文档的 编解码器。此选项与
typeMap
选项互斥。新增在版本中1.17.
排序规则数组|对象注释混合完整文档字符串确定
fullDocument
响应字段如何为更新操作填充。默认情况下,更改流仅返回更新操作的字段增量(通过
updateDescription
字段)并省略fullDocument
。插入和替换操作始终包含fullDocument
字段。删除操作省略该字段,因为文档已不存在。指定 "updateLookup" 以返回更新文档的当前大多数已提交版本。
MongoDB 6.0+ 允许如果集合已启用
changeStreamPreAndPostImages
,则返回修改文档的后像。指定 "whenAvailable" 以返回后像(如果可用)或空值(如果不可用)。指定 "required" 以返回后像(如果可用)或引发错误(如果不可用)。支持以下值
MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP
MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE
MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED
这是
$changeStream
管道阶段的选项。更改前完整文档字符串确定
fullDocumentBeforeChange
响应字段如何填充。默认情况下,该字段被省略。MongoDB 6.0+ 允许如果集合已启用
changeStreamPreAndPostImages
,则返回修改文档的前像。指定 "whenAvailable" 以返回前像(如果可用)或空值(如果不可用)。指定 "required" 以返回前像(如果可用)或引发错误(如果不可用)。支持以下值
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED
这是
$changeStream
管道阶段的选项。新增在版本中1.13.
最大等待时间MS整数表示服务器在无数据可用时阻塞 getMore 操作的时间限制(以毫秒为单位)的正整数。
读取关注点用于操作的 读取关注点。默认为数据库的读取关注点。
读取偏好用于操作的 读取偏好。默认为数据库的读取偏好。
这用于初始更改流聚合以及自动恢复期间的自动服务器选择。
恢复后数组|对象指定新更改流的逻辑起始点。更改流返回的文档中的
_id
字段可以在这里使用。与
startAfter
和/或startAtOperationTime
一起使用此选项将导致服务器错误。这些选项是互斥的。这是
$changeStream
管道阶段的选项。会话与操作关联的客户端会话。
showExpandedEvents布尔型如果为 true,指示服务器在更改流中包含额外的 DDL 事件。可能包含的附加事件有
createIndexes
dropIndexes
modify
create
shardCollection
reshardCollection
(服务器 6.1+)refineCollectionShardKey
(服务器 6.1+)
不支持 6.0 版本之前的服务器版本,如果使用它将在执行时引发异常。
这是
$changeStream
管道阶段的选项。新增在版本中1.13.
startAfter数组|对象指定新更改流的逻辑起始点。更改流返回的文档中的
_id
字段可以在这里使用。与resumeAfter
不同,此选项可以与 "invalidate" 事件的恢复令牌一起使用。与
resumeAfter
和/或startAtOperationTime
一起使用此选项将导致服务器错误。这些选项是互斥的。不支持 4.2 版本之前的服务器版本,如果使用它将在执行时引发异常。
这是
$changeStream
管道阶段的选项。新增在版本中1.5.
startAtOperationTime如果指定,更改流将只提供在或之后发生的指定时间戳的更改。MongoDB 4.0+ 服务器的命令响应包括一个
operationTime
,可以在这里使用。默认情况下,如果可用,将使用初始aggregate
命令返回的operationTime
。与
resumeAfter
和/或startAfter
一起使用此选项将导致服务器错误。这些选项是互斥的。不支持 4.0 版本之前的服务器版本,如果使用它将在执行时引发异常。
这是
$changeStream
管道阶段的选项。typeMap数组应用于游标的 类型映射,它决定了如何将 BSON 文档转换为 PHP 值。默认为数据库的类型映射。
返回值
一个 MongoDB\ChangeStream
对象,该对象允许通过 迭代器 接口来遍历更改流中的事件。
错误/异常
如果服务器命令响应格式错误,将抛出 MongoDB\Exception\UnexpectedValueException
。
如果选项被使用且所选服务器不支持这些选项(例如 collation
,readConcern
,writeConcern
),将抛出 MongoDB\Exception\UnsupportedException
。
如果与参数或选项的解析相关的错误,将抛出 MongoDB\Exception\InvalidArgumentException
。
MongoDB\Driver\Exception\RuntimeException用于其他扩展级别的错误(例如连接错误)。
示例
此示例在迭代更改流时报告事件。
$uri = 'mongodb://rs1.example.com,rs2.example.com/?replicaSet=myReplicaSet'; $database = (new MongoDB\Client($uri))->test; $changeStream = $database->watch(); for ($changeStream->rewind(); true; $changeStream->next()) { if ( ! $changeStream->valid()) { continue; } $event = $changeStream->current(); if ($event['operationType'] === 'invalidate') { break; } $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']); $id = json_encode($event['documentKey']['_id']); switch ($event['operationType']) { case 'delete': printf("Deleted document in %s with _id: %s\n\n", $ns, $id); break; case 'insert': printf("Inserted new document in %s\n", $ns); echo json_encode($event['fullDocument']), "\n\n"; break; case 'replace': printf("Replaced new document in %s with _id: %s\n", $ns, $id); echo json_encode($event['fullDocument']), "\n\n"; break; case 'update': printf("Updated document in %s with _id: %s\n", $ns, $id); echo json_encode($event['updateDescription']), "\n\n"; break; } }
假设在上述脚本迭代更改流时,文档被插入、更新和删除,则输出将类似于
Inserted new document in test.inventory {"_id":{"$oid":"5a81fc0d6118fd1af1790d32"},"name":"Widget","quantity":5} Updated document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"} {"updatedFields":{"quantity":4},"removedFields":[]} Deleted document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"}