文档菜单
文档首页
/ / /
Scala
/

基础教程

在本页

  • 响应式流
  • 可观察对象
  • 背压
  • 快速入门中使用到的助手

本指南在展示如何使用驱动器和MongoDB的快速入门指南之前,提供了关于Scala驱动器和其异步API的背景信息。快速入门指南.

注意

请参阅安装指南了解如何安装驱动器的说明。

Scala驱动器建立在MongoDB Java Reactive Streams驱动器之上。响应式流API由以下组件组成

  1. Observable:一个自定义的Publisher实现。

  2. Observer:一个自定义的Subscriber实现。

  3. 订阅

Observable是一个提供可能无界数量的序列化元素的提供者,这些元素根据其从Observer或多个Observer实例接收到的需求进行发布。

响应对Observable.subscribe(Observer)的调用,Observer类上方法的可能调用序列如下协议所定义:

onSubscribe onNext* (onError | onComplete)?

这意味着总是先发出onSubscribe()信号,然后是可能无界的onNext()信号,由Observer请求。之后可能是一个onError()信号(如果发生失败),或者在没有更多元素可用时,一个onComplete()信号,只要没有取消Subscription

提示

要了解更多关于响应式流的资料,请访问响应式流文档。

Scala 驱动 API 与Java 同步驱动 API 相似,并且任何导致网络 I/O 的方法都会返回 Observable<T> 类型,其中 T 是操作响应的类型。

注意

API 返回的所有 Observable 类型都是 ,这意味着只有在订阅后才会发生任何操作。因此,仅创建一个 Observable 不会引起任何网络 I/O。只有在调用 Subscription.request() 方法时,驱动程序才会执行操作。

在此实现中,发布者都是 单播。每个对 ObservableSubscription 都与单个 MongoDB 操作相关联,并且 Observable 实例的 Observer 会接收到它自己特定的结果集。

默认情况下,Observer 特性会在 Observable 订阅后立即请求所有结果。请确保 Observer 可以处理 Observable 的所有结果。自定义 Observer.onSubscribe() 方法的实现可以保存 Subscription,以便仅在 Observer 有容量时请求数据。

在快速入门中,我们实现了在驱动程序源 GitHub 仓库中的 Helpers.scala 文件中定义的自定义隐式辅助工具。这些辅助工具检索并打印结果。虽然快速入门是一个用于异步代码的人工场景,但示例块在一个示例的结果上阻塞,然后再开始下一个,以确保数据库的状态。《code class="leafygreen-ui-1l06pbn">辅助 对象提供了以下方法

  • results():阻塞直到 Observable 完成,并返回收集到的结果

  • headResult():阻塞直到可以返回 Observable 的第一个结果

  • printResults():阻塞直到 Observable 完成,并打印出每个结果

  • printHeadResult():阻塞直到Observable的第一个结果可用,然后打印它

返回

入门