文档菜单
文档首页
/ / /
Java响应式流驱动程序

示例自定义订阅者实现

本页内容

  • 响应式流
  • 订阅者
  • 自定义订阅者实现
  • 阻塞与非阻塞示例
  • 发布者、订阅者和订阅

本指南提供了关于 Java 响应式流驱动程序及其异步 API 的背景信息。指南还列出并解释了示例自定义订阅者实现。

注意

有关如何安装驱动程序的说明,请参阅入门指南。

该库是响应式流规范的一种实现。响应式流API包含以下组件

  1. 发布者

  2. 订阅者

  3. 订阅

A发布者是提供可能无界数量的有序元素的服务提供者,根据从其订阅者或多个订阅者实例接收到的需求进行发布。

响应于对Publisher.subscribe(Subscriber)的调用,Subscriber类上方法的可能调用序列由以下协议给出

onSubscribe onNext* (onError | onComplete)?

这意味着始终会发出onSubscribe信号,随后是可能无界数量的onNext信号,这些信号是按Subscriber请求发出的。之后是一个onError信号,如果发生故障,或者当没有更多元素可用时,是一个onComplete信号,只要订阅没有取消。

提示

要了解更多关于响应式流的信息,请访问响应式流文档。

Java 反应式流驱动 API 与 Java 同步驱动 API 相似,任何导致网络 I/O 的方法都返回一个 Publisher<T> 类型,其中 T 是操作响应的类型。

注意

API 返回的所有 Publisher 类型都是 ,这意味着在它们被订阅之前不会发生任何事情。因此,仅仅创建一个 Publisher 不会导致任何网络 I/O。只有当调用 Publisher.subscribe() 方法时,驱动才会执行操作。

此实现中的发布者是 单播。每个对 PublisherSubscription 都与一个 MongoDB 操作相关联,并且 Publisher 实例的 Subscriber 会接收到自己特定的结果集。

在Java反应式流文档中,我们实现了不同的Subscriber类型。尽管这是一个为反应式流构建的假设场景,但我们确实在开始下一个示例之前阻塞了一个示例的结果,以确保数据库的状态。要查看所有自定义订阅者实现的源代码,请参阅驱动程序源代码中的SubscriberHelpers.javaSubscriberHelpers.java

  • ObservableSubscriber
    基本订阅者类是ObservableSubscriber<T>,这是一个存储Publisher<T>结果的Subscriber。它还包含一个await()方法,这样我们就可以阻塞结果以确保在继续到下一个示例之前数据库的状态。
  • OperationSubscriber
    这是ObservableSubscriber的一个实现,当它被订阅时立即调用Subscription.request()
  • PrintSubscriber
    这是OperationSubscriber的一个实现,当调用Subscriber.onComplete()方法时打印一条消息。
  • ConsumerSubscriber
    这是OperationSubscriber的一个实现,它接受一个Consumer,并在调用Subscriber.onNext(T result)时调用Consumer.accept(result)
  • PrintToStringSubscriber
    这是ConsumerSubscriber的一个实现,当调用Subscriber.onNext()方法时,打印结果字符串的版本。
  • PrintDocumentSubscriber
    当调用Subscriber.onNext()方法时,该实现将打印出Document类型的JSON版本。

由于我们的Subscriber类型包含一个只有在调用Subscriber.onComplete()方法时才会释放的闩锁,我们可以通过调用await方法来使用这个闩锁来阻塞进一步的操作。以下两个示例都使用了我们的自动请求数据的PrintDocumentSubscriber

第一个是非阻塞的,第二个是阻塞的,等待Publisher完成

// Create a publisher
Publisher<Document> publisher = collection.find();
// Non-blocking
publisher.subscribe(new PrintDocumentSubscriber());
Subscriber<Document> subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete

一般来说,PublisherSubscriberSubscription类型构成了一个低级API,预计用户和库将基于它们构建更具有表现力的API,而不仅仅是使用这些接口。作为一个仅实现这些接口的库,用户将受益于这个不断发展的生态系统,这是响应式流的核心设计原则。

返回

POJO CRUD 示例