Observables
Scala 驱动是一个异步和非阻塞的驱动。通过实现Observable
模型,异步事件成为简单、可组合的操作,摆脱了嵌套回调的复杂性。
对于异步操作,有三个接口
注意
该驱动程序基于 MongoDB Reactive Streams 驱动程序 并实现了反应流规范。《code class="leafygreen-ui-1l06pbn">Observable 是 Publisher
的实现,而 Observer
是 Subscriber
的实现。
以下类命名约定适用
Observable
:自定义的Publisher
实现Observer
:自定义的Subscriber
实现订阅
可观察对象
可观察对象是扩展的 发布者
实现,它通常表示一个MongoDB操作,根据对 可观察对象
的 订阅
请求向 观察者
发出其结果。
重要
可观察对象
可以被视为一个部分函数。与部分函数一样,直到它们被调用之前不会发生任何事情。可观察对象可以被多次订阅,每次订阅都可能产生新的副作用,例如查询MongoDB或插入数据。
SingleObservable
该 SingleObservable 特征是一个只返回单个元素的 Publisher
实现。它可以像普通 Observable
一样使用。
订阅
Subscription
表示一个 Observer
订阅 Observable
的一对一生命周期。一个 Observable
的 Subscription
只能由一个 Observer
使用。Subscription
的目的是控制需求并允许从 Observable
中取消订阅。
观察者
Observer
提供了从 Observable
接收基于推送通知的机制。这些事件的请求通过其 Subscription
来表示。
订阅到Observable[TResult]
后,Observer
将通过onSubscribe(subscription: Subscription)
方法接收到Subscription
对象。通过Subscription
对象来表示对结果的请求,并将任何结果传递给onNext(result: TResult)
方法。如果因任何原因发生错误,将调用onError(e: Throwable)
方法,并且不再向Observer
传递事件。或者,当Observer
消耗了Observable
中的所有结果后,将调用onComplete()
方法。
背压
在下面的示例中,使用Subscription
来控制迭代Observable
时的需求。默认的Observer
实现自动请求所有数据。以下我们重写了onSubscribe()
方法,以便我们可以管理Observable
的需求驱动迭代。
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
Observable助手
org.mongodb.scala
包提供了与Publisher
类型更好的交互。扩展的功能包括通过匿名函数进行简单的订阅。
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala
包包括一个隐式类,还提供了以下Monadic运算符,使链式操作和使用Publisher
或Observable
实例变得更加简单。
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
以下列表描述了可用的Monadic运算符。
andThen
:允许链式操作Observable
实例。collect
:将所有结果收集到一个序列中。fallbackTo
:如果失败,允许回退到另一个Observable
。filter
:过滤Observable
的结果。flatMap
:通过将函数应用于Observable
的每个结果来创建一个新的Observable
。foldLeft
:创建一个包含应用累加器函数的单一结果的Observable
。foreach
:将函数应用于每个发出的结果。head
:在Future
中返回Observable
的头部。map
:通过将函数应用于Observable
的每个发出的结果来创建一个新的Observable
。observeOn
:创建一个新的Observable
,该Observable
使用特定的ExecutionContext
进行未来的操作。recover
:创建一个新的Observable
,它将通过分配另一个Observable
的值来处理此Observable
可能包含的任何匹配的throwable。recoverWith
:创建一个新的Observable
,它将处理此Observable
可能包含的任何匹配的throwable。toFuture
:收集Observable
的结果并将它们转换为Future
。transform
:通过将resultFunction
函数应用于每个发出的结果来创建一个新的Observable
。withFilter
:为Observable
实例提供for-comprehensions支持。zip
:将此Observable
和另一个Observable
的值组合起来,并创建一个包含它们的结果的元组的新Observable
。
查看BoxedPublisher API文档,了解更多关于每个操作符的信息。
SingleObservable
由于 SingleObservable[T]
返回单个项目,因此 toFuture()
方法会像 head 方法一样返回一个 Future[T]
。还有一个隐式转换器,可以将 Publisher
转换为 SingleObservable
。