Observables
Scala 驱动是一个异步和非阻塞的驱动。通过实现Observable 模型,异步事件成为简单、可组合的操作,摆脱了嵌套回调的复杂性。
对于异步操作,有三个接口
注意
该驱动程序基于 MongoDB Reactive Streams 驱动程序 并实现了反应流规范。《code class="leafygreen-ui-1l06pbn">Observable 是 Publisher 的实现,而 Observer 是 Subscriber 的实现。
以下类命名约定适用
Observable:自定义的Publisher实现Observer:自定义的Subscriber实现订阅
可观察对象
可观察对象是扩展的 发布者 实现,它通常表示一个MongoDB操作,根据对 可观察对象 的 订阅 请求向 观察者 发出其结果。
重要
可观察对象 可以被视为一个部分函数。与部分函数一样,直到它们被调用之前不会发生任何事情。可观察对象可以被多次订阅,每次订阅都可能产生新的副作用,例如查询MongoDB或插入数据。
SingleObservable
该 SingleObservable 特征是一个只返回单个元素的 Publisher 实现。它可以像普通 Observable 一样使用。
订阅
Subscription 表示一个 Observer 订阅 Observable 的一对一生命周期。一个 Observable 的 Subscription 只能由一个 Observer 使用。Subscription 的目的是控制需求并允许从 Observable 中取消订阅。
观察者
Observer 提供了从 Observable 接收基于推送通知的机制。这些事件的请求通过其 Subscription 来表示。
订阅到Observable[TResult]后,Observer将通过onSubscribe(subscription: Subscription)方法接收到Subscription对象。通过Subscription对象来表示对结果的请求,并将任何结果传递给onNext(result: TResult)方法。如果因任何原因发生错误,将调用onError(e: Throwable)方法,并且不再向Observer传递事件。或者,当Observer消耗了Observable中的所有结果后,将调用onComplete()方法。
背压
在下面的示例中,使用Subscription来控制迭代Observable时的需求。默认的Observer实现自动请求所有数据。以下我们重写了onSubscribe()方法,以便我们可以管理Observable的需求驱动迭代。
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
Observable助手
org.mongodb.scala包提供了与Publisher类型更好的交互。扩展的功能包括通过匿名函数进行简单的订阅。
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala包包括一个隐式类,还提供了以下Monadic运算符,使链式操作和使用Publisher或Observable实例变得更加简单。
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
以下列表描述了可用的Monadic运算符。
andThen:允许链式操作Observable实例。collect:将所有结果收集到一个序列中。fallbackTo:如果失败,允许回退到另一个Observable。filter:过滤Observable的结果。flatMap:通过将函数应用于Observable的每个结果来创建一个新的Observable。foldLeft:创建一个包含应用累加器函数的单一结果的Observable。foreach:将函数应用于每个发出的结果。head:在Future中返回Observable的头部。map:通过将函数应用于Observable的每个发出的结果来创建一个新的Observable。observeOn:创建一个新的Observable,该Observable使用特定的ExecutionContext进行未来的操作。recover:创建一个新的Observable,它将通过分配另一个Observable的值来处理此Observable可能包含的任何匹配的throwable。recoverWith:创建一个新的Observable,它将处理此Observable可能包含的任何匹配的throwable。toFuture:收集Observable的结果并将它们转换为Future。transform:通过将resultFunction函数应用于每个发出的结果来创建一个新的Observable。withFilter:为Observable实例提供for-comprehensions支持。zip:将此Observable和另一个Observable的值组合起来,并创建一个包含它们的结果的元组的新Observable。
查看BoxedPublisher API文档,了解更多关于每个操作符的信息。
SingleObservable
由于 SingleObservable[T] 返回单个项目,因此 toFuture() 方法会像 head 方法一样返回一个 Future[T]。还有一个隐式转换器,可以将 Publisher 转换为 SingleObservable。