从 MongoDB 读取数据
概述
本页面包含可复制的代码示例,展示了 Java 响应式流驱动程序方法,您可以使用这些方法从 MongoDB 读取数据。
提示
要了解更多关于本页面上显示的任何方法,请参阅每个部分中提供的链接。
要从本页面上使用示例,请将代码示例复制到示例应用程序或您自己的应用程序中。确保用您MongoDB部署的相关值替换代码示例中的所有占位符,例如<连接字符串>
。
项目Reactor实现
本指南使用Project Reactor库来消费由Java反应式流驱动程序方法返回的Publisher
实例。要了解有关Project Reactor库以及如何使用它的更多信息,请参阅Reactor文档中的入门。
还有其他方法可以消费Publisher
实例。您可以使用许多替代库之一,例如RxJava,或者直接调用Publisher.subscribe()
并传递自己的Subscriber
实现。
本指南中的示例使用了来自Reactor的Flux.blockLast()
方法来订阅一个Publisher
并阻塞当前线程,直到Publisher
达到其终端状态。有关Reactive Streams倡议的更多信息,请参阅Reactive Streams。
重要
返回的发布者(Publisher)是冷启动的
Java Reactive Streams驱动方法返回的所有Publisher
实例都是冷启动的,这意味着除非您订阅返回的Publisher
,否则相应的操作不会发生。我们建议只订阅一次返回的Publisher
,因为多次订阅可能会导致错误。
示例应用程序
您可以使用以下示例应用程序来测试本页上的代码示例。要使用示例应用程序,请执行以下步骤
在您的IDE中创建一个新的Java项目。
在您的Java项目中安装Java Reactive Streams驱动程序。
在您的Java项目中安装Project Reactor库。
复制以下代码,并将其粘贴到名为
ReadOperations.java
的新 Java 文件中。从本页面复制一个代码示例,并将其粘贴到文件中的指定行。
1 import com.mongodb.MongoException; 2 import com.mongodb.ConnectionString; 3 import com.mongodb.MongoClientSettings; 4 import com.mongodb.ServerApi; 5 import com.mongodb.ServerApiVersion; 6 7 import com.mongodb.reactivestreams.client.MongoCollection; 8 9 import org.bson.Document; 10 11 import com.mongodb.reactivestreams.client.MongoClient; 12 import com.mongodb.reactivestreams.client.MongoClients; 13 import com.mongodb.reactivestreams.client.MongoDatabase; 14 import com.mongodb.reactivestreams.client.FindPublisher; 15 import com.mongodb.reactivestreams.client.DistinctPublisher; 16 import com.mongodb.reactivestreams.client.ChangeStreamPublisher; 17 import reactor.core.publisher.Flux; 18 19 import java.util.ArrayList; 20 import java.util.Arrays; 21 import java.util.List; 22 23 import static com.mongodb.client.model.Filters.eq; 24 25 class ReadOperations { 26 public static void main(String[] args) throws InterruptedException { 27 // Replace the placeholder with your Atlas connection string 28 String uri = "<connection string>"; 29 30 // Construct a ServerApi instance using the ServerApi.builder() method 31 ServerApi serverApi = ServerApi.builder() 32 .version(ServerApiVersion.V1) 33 .build(); 34 35 MongoClientSettings settings = MongoClientSettings.builder() 36 .applyConnectionString(new ConnectionString(uri)) 37 .serverApi(serverApi) 38 .build(); 39 40 // Create a new client and connect to the server 41 try (MongoClient mongoClient = MongoClients.create(settings)) { 42 MongoDatabase database = mongoClient.getDatabase("<database name>"); 43 MongoCollection<Document> collection = database.getCollection("<collection name>"); 44 45 // Start example code here 46 47 // End example code here 48 } 49 } 50 }
查找 One
以下示例检索与给定过滤器指定的标准匹配的文档
FindPublisher<Document> findDocPublisher = collection .find(eq("<field name>", "<value>")).first(); Flux.from(findDocPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 find().first()
构造的信息,请参阅检索数据指南。
查找 Multiple
以下示例检索所有与给定过滤器指定的标准匹配的文档
FindPublisher<Document> findDocPublisher = collection .find(eq("<field name>", "<value>")); Flux.from(findDocPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 find()
方法的信息,请参阅 检索数据指南。
统计集合中的文档数量
以下示例返回指定集合中的文档数量
Publisher<Long> countPublisher = collection.countDocuments(); Flux.from(countPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 countDocuments()
方法的知识,请参阅 计数文档 指南。
查询返回的文档计数
以下示例返回指定集合中符合给定过滤器条件的文档数量
Publisher<Long> countPublisher = collection.countDocuments( eq("<field name>", "<value>")); Flux.from(countPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 countDocuments()
方法的知识,请参阅 计数文档 指南。
估计文档计数
以下示例根据集合元数据返回指定集合中的近似文档数量
Publisher<Long> countPublisher = collection.estimatedDocumentCount(); Flux.from(countPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 estimatedDocumentCount()
方法的知识,请参阅 计数文档 指南。
检索唯一值
以下示例返回给定集合中指定字段名的所有唯一值
DistinctPublisher<String> distinctPublisher = collection.distinct( "<field name>", <type>.class); Flux.from(distinctPublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 distinct()
方法的知识,请参阅 检索唯一字段值 指南。
监控数据变化
以下示例创建了一个给定集合的更改流,并打印出该集合中后续的更改事件
ChangeStreamPublisher<Document> changePublisher = collection.watch(); Flux.from(changePublisher) .doOnNext(System.out::println) .blockLast();
要了解更多关于 watch()
方法的知识,请参阅 监控数据变化 指南。