变更流
从 MongoDB 服务器 3.6 版本开始,聚合框架支持新的$changeStream
管道阶段。在聚合管道中首先指定此阶段允许用户请求发送针对特定集合的所有更改的通知。从 MongoDB 4.0 版本开始,变更流支持数据库和集群,而不仅仅是集合。
Ruby 驱动提供了一个 API,用于通过这个新的管道阶段接收特定集合、数据库或集群变更的通知。虽然您可以直接使用管道操作符和聚合框架创建变更流,但建议使用以下驱动程序 API,因为驱动程序在超时、网络错误、服务器错误(表示正在发生故障转移)或其他可恢复错误时,会一次性恢复变更流。
服务器上的变更流需要“大多数”读取关注点或无读取关注点。
由于文档中记录的问题,变更流与 JRuby 不兼容。在此处。具体来说,JRuby 在后台绿色线程中急切地评估了 Enumerator 中的 #next
,因此在对变更流调用 #next
时,会在后台循环调用 getMores。
在集合上监听变更
通过在集合上调用 #watch
方法创建集合变更流。
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') collection = client[:test] stream = collection.watch collection.insert_one(a: 1) doc = stream.to_enum.next process(doc)
您也可以在可用时接收通知。
stream = collection.watch enum = stream.to_enum while doc = enum.next process(doc) end
《next》方法会阻塞并轮询集群,直到有变化可用。使用《try_next》方法迭代变化流而不阻塞;该方法将等待最多max_await_time_ms毫秒的服务器变化,如果没有收到变化,它将返回nil。如果存在非可恢复错误,则《next》和《try_next》都会抛出异常。请参阅下文中的“恢复变化流”部分,以读取集合变化的示例。
变化流可以接受聚合框架管道操作符格式中的过滤器
stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } }, {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } } ]) enum = stream.to_enum while doc = enum.next process(doc) end
在数据库上监视变化
数据库变化流会在数据库中任何集合上通知变化,以及数据库级别的事件,如数据库被删除。
通过在数据库对象上调用《#watch》方法创建数据库变化流
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') database = client.database stream = database.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
在集群上监视变化
集群变化流会在集群中的任何集合上通知变化,以及在集群级别的任何事件。
通过在客户端对象上调用《#watch》方法创建集群变化流(而不是集群对象)
client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test') stream = client.watch client[:test].insert_one(a: 1) doc = stream.to_enum.next process(doc)
关闭更改流
您可以通过调用其 #close
方法来关闭更改流
stream.close
恢复更改流
更改流由两种类型的操作组成:初始聚合和 getMore
请求以接收下一批更改。
在出现网络错误以及服务器返回表示其状态已更改的错误(例如,它不再是主节点)时,驱动程序将自动重试每个 getMore
操作一次。驱动程序不会重试初始聚合。
在实际情况中,这意味着例如
如果集群没有足够的可用节点来满足
"majority"
读取优先级,调用collection.watch
将失败。一旦
collection.watch
成功返回,如果集群随后发生选举或丢失节点,但恢复得足够快,则通过next
或each
方法进行更改流读取将透明地继续到应用程序。
为了无限期且可靠地监视更改而不会丢失任何更改或处理更改超过一次,应用程序必须跟踪更改流的恢复令牌,并在遇到导致驱动程序的自动恢复也失败的长时间错误条件时重新启动更改流。以下代码片段显示了无限期迭代更改流的示例,使用更改流的 resume_token
方法检索恢复令牌,并使用所有 MongoDB 或网络错误上的 :resume_after
选项重新启动更改流
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum while doc = enum.next process(doc) token = stream.resume_token end rescue Mongo::Error sleep 1 end end
上述迭代在 enum.next
调用处阻塞,并且当运行此代码的 Ruby 进程终止时,不允许恢复处理。驱动程序还提供了 try_next
方法,在更改流中没有变化时,该方法会在短暂等待后返回 nil
,而不是无限期地阻塞。使用 try_next
方法,即使在某个特定请求没有返回任何更改的情况下,也可能会在每个 getMore
请求后持久化恢复令牌,这样恢复令牌就会保持在 oplog 的顶部,并且当处理更改的过程终止时,应用程序有机会将其持久化。
token = nil loop do begin stream = collection.watch([], resume_after: token) enum = stream.to_enum doc = enum.try_next if doc process(doc) end token = stream.resume_token # Persist +token+ to support resuming processing upon process restart rescue Mongo::Error sleep 1 end end
请注意,即使在调用返回了文档的情况下,也应该在每次 try_next
调用后从更改流中检索恢复令牌。
恢复令牌还包含在每个更改流文档的 _id
字段中。不建议读取 _id
字段,因为它可能被应用程序投影出去,并且仅使用 _id
字段无法在 getMore
返回无文档时前进恢复令牌。