文档菜单
文档首页
/ / /
Java响应式流驱动程序

从 MongoDB 读取数据

本页内容

  • 概述
  • 项目 Reactor 实现
  • 示例应用程序
  • 查找一个
  • 查找多个
  • 统计集合中的文档数量
  • 统计查询返回的文档数量
  • 估计文档数量
  • 检索不同值
  • 监控数据变更

本页面包含可复制的代码示例,展示了 Java 响应式流驱动程序方法,您可以使用这些方法从 MongoDB 读取数据。

提示

要了解更多关于本页面上显示的任何方法,请参阅每个部分中提供的链接。

要从本页面上使用示例,请将代码示例复制到示例应用程序或您自己的应用程序中。确保用您MongoDB部署的相关值替换代码示例中的所有占位符,例如<连接字符串>

本指南使用Project Reactor库来消费由Java反应式流驱动程序方法返回的Publisher实例。要了解有关Project Reactor库以及如何使用它的更多信息,请参阅Reactor文档中的入门

还有其他方法可以消费Publisher实例。您可以使用许多替代库之一,例如RxJava,或者直接调用Publisher.subscribe()并传递自己的Subscriber实现。

本指南中的示例使用了来自Reactor的Flux.blockLast()方法来订阅一个Publisher并阻塞当前线程,直到Publisher达到其终端状态。有关Reactive Streams倡议的更多信息,请参阅Reactive Streams。

重要

返回的发布者(Publisher)是冷启动的

Java Reactive Streams驱动方法返回的所有Publisher实例都是冷启动的,这意味着除非您订阅返回的Publisher,否则相应的操作不会发生。我们建议只订阅一次返回的Publisher,因为多次订阅可能会导致错误。

您可以使用以下示例应用程序来测试本页上的代码示例。要使用示例应用程序,请执行以下步骤

  1. 在您的IDE中创建一个新的Java项目。

  2. 在您的Java项目中安装Java Reactive Streams驱动程序。

  3. 在您的Java项目中安装Project Reactor库

  4. 复制以下代码,并将其粘贴到名为 ReadOperations.java 的新 Java 文件中。

  5. 从本页面复制一个代码示例,并将其粘贴到文件中的指定行。

1import com.mongodb.MongoException;
2import com.mongodb.ConnectionString;
3import com.mongodb.MongoClientSettings;
4import com.mongodb.ServerApi;
5import com.mongodb.ServerApiVersion;
6
7import com.mongodb.reactivestreams.client.MongoCollection;
8
9import org.bson.Document;
10
11import com.mongodb.reactivestreams.client.MongoClient;
12import com.mongodb.reactivestreams.client.MongoClients;
13import com.mongodb.reactivestreams.client.MongoDatabase;
14import com.mongodb.reactivestreams.client.FindPublisher;
15import com.mongodb.reactivestreams.client.DistinctPublisher;
16import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
17import reactor.core.publisher.Flux;
18
19import java.util.ArrayList;
20import java.util.Arrays;
21import java.util.List;
22
23import static com.mongodb.client.model.Filters.eq;
24
25class ReadOperations {
26 public static void main(String[] args) throws InterruptedException {
27 // Replace the placeholder with your Atlas connection string
28 String uri = "<connection string>";
29
30 // Construct a ServerApi instance using the ServerApi.builder() method
31 ServerApi serverApi = ServerApi.builder()
32 .version(ServerApiVersion.V1)
33 .build();
34
35 MongoClientSettings settings = MongoClientSettings.builder()
36 .applyConnectionString(new ConnectionString(uri))
37 .serverApi(serverApi)
38 .build();
39
40 // Create a new client and connect to the server
41 try (MongoClient mongoClient = MongoClients.create(settings)) {
42 MongoDatabase database = mongoClient.getDatabase("<database name>");
43 MongoCollection<Document> collection = database.getCollection("<collection name>");
44
45 // Start example code here
46
47 // End example code here
48 }
49 }
50}

以下示例检索与给定过滤器指定的标准匹配的文档

FindPublisher<Document> findDocPublisher = collection
.find(eq("<field name>", "<value>")).first();
Flux.from(findDocPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 find().first() 构造的信息,请参阅检索数据指南。

以下示例检索所有与给定过滤器指定的标准匹配的文档

FindPublisher<Document> findDocPublisher = collection
.find(eq("<field name>", "<value>"));
Flux.from(findDocPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 find() 方法的信息,请参阅 检索数据指南。

以下示例返回指定集合中的文档数量

Publisher<Long> countPublisher = collection.countDocuments();
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 countDocuments() 方法的知识,请参阅 计数文档 指南。

以下示例返回指定集合中符合给定过滤器条件的文档数量

Publisher<Long> countPublisher = collection.countDocuments(
eq("<field name>", "<value>"));
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 countDocuments() 方法的知识,请参阅 计数文档 指南。

以下示例根据集合元数据返回指定集合中的近似文档数量

Publisher<Long> countPublisher = collection.estimatedDocumentCount();
Flux.from(countPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 estimatedDocumentCount() 方法的知识,请参阅 计数文档 指南。

以下示例返回给定集合中指定字段名的所有唯一值

DistinctPublisher<String> distinctPublisher = collection.distinct(
"<field name>", <type>.class);
Flux.from(distinctPublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 distinct() 方法的知识,请参阅 检索唯一字段值 指南。

以下示例创建了一个给定集合的更改流,并打印出该集合中后续的更改事件

ChangeStreamPublisher<Document> changePublisher = collection.watch();
Flux.from(changePublisher)
.doOnNext(System.out::println)
.blockLast();

要了解更多关于 watch() 方法的知识,请参阅 监控数据变化 指南。

返回

运行数据库命令