文档菜单
文档首页
/ / /
Scala
/

Observables

在本页

  • Observable
  • SingleObservable
  • 订阅
  • 观察者
  • 背压
  • Observable 辅助函数
  • SingleObservable

Scala 驱动是一个异步和非阻塞的驱动。通过实现Observable 模型,异步事件成为简单、可组合的操作,摆脱了嵌套回调的复杂性。

对于异步操作,有三个接口

  • Observable

  • 订阅

  • 观察者

注意

该驱动程序基于 MongoDB Reactive Streams 驱动程序 并实现了反应流规范。《code class="leafygreen-ui-1l06pbn">Observable 是 Publisher 的实现,而 ObserverSubscriber 的实现。

以下类命名约定适用

  • Observable:自定义的 Publisher 实现

  • Observer:自定义的 Subscriber 实现

  • 订阅

可观察对象是扩展的 发布者 实现,它通常表示一个MongoDB操作,根据对 可观察对象订阅 请求向 观察者 发出其结果。

重要

可观察对象 可以被视为一个部分函数。与部分函数一样,直到它们被调用之前不会发生任何事情。可观察对象可以被多次订阅,每次订阅都可能产生新的副作用,例如查询MongoDB或插入数据。

SingleObservable 特征是一个只返回单个元素的 Publisher 实现。它可以像普通 Observable 一样使用。

Subscription 表示一个 Observer 订阅 Observable 的一对一生命周期。一个 ObservableSubscription 只能由一个 Observer 使用。Subscription 的目的是控制需求并允许从 Observable 中取消订阅。

Observer 提供了从 Observable 接收基于推送通知的机制。这些事件的请求通过其 Subscription 来表示。

订阅到Observable[TResult]后,Observer将通过onSubscribe(subscription: Subscription)方法接收到Subscription对象。通过Subscription对象来表示对结果的请求,并将任何结果传递给onNext(result: TResult)方法。如果因任何原因发生错误,将调用onError(e: Throwable)方法,并且不再向Observer传递事件。或者,当Observer消耗了Observable中的所有结果后,将调用onComplete()方法。

在下面的示例中,使用Subscription来控制迭代Observable时的需求。默认的Observer实现自动请求所有数据。以下我们重写了onSubscribe()方法,以便我们可以管理Observable的需求驱动迭代。

collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})

org.mongodb.scala包提供了与Publisher类型更好的交互。扩展的功能包括通过匿名函数进行简单的订阅。

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))

org.mongodb.scala包包括一个隐式类,还提供了以下Monadic运算符,使链式操作和使用PublisherObservable实例变得更加简单。

GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})

以下列表描述了可用的Monadic运算符。

  • andThen:允许链式操作Observable实例。

  • collect:将所有结果收集到一个序列中。

  • fallbackTo:如果失败,允许回退到另一个Observable

  • filter:过滤Observable的结果。

  • flatMap:通过将函数应用于Observable的每个结果来创建一个新的Observable

  • foldLeft:创建一个包含应用累加器函数的单一结果的Observable

  • foreach:将函数应用于每个发出的结果。

  • head:在Future中返回Observable的头部。

  • map:通过将函数应用于Observable的每个发出的结果来创建一个新的Observable

  • observeOn:创建一个新的Observable,该Observable使用特定的ExecutionContext进行未来的操作。

  • recover:创建一个新的Observable,它将通过分配另一个Observable的值来处理此Observable可能包含的任何匹配的throwable。

  • recoverWith:创建一个新的Observable,它将处理此Observable可能包含的任何匹配的throwable。

  • toFuture:收集Observable的结果并将它们转换为Future

  • transform:通过将resultFunction函数应用于每个发出的结果来创建一个新的Observable

  • withFilter:为Observable实例提供for-comprehensions支持。

  • zip:将此Observable和另一个Observable的值组合起来,并创建一个包含它们的结果的元组的新Observable

查看BoxedPublisher API文档,了解更多关于每个操作符的信息。

由于 SingleObservable[T] 返回单个项目,因此 toFuture() 方法会像 head 方法一样返回一个 Future[T]。还有一个隐式转换器,可以将 Publisher 转换为 SingleObservable

返回

监控