文档菜单
文档首页
/ / /
Java Reactive Streams 驱动

将数据写入 MongoDB

本页内容

  • 概述
  • 插入一个
  • 插入多个
  • 更新一个
  • 更新多个
  • 替换一个
  • 删除一个
  • 删除多个
  • 批量写入

本页包含可复制的代码示例,展示如何使用 Java Reactive Streams 驱动程序方法将数据写入 MongoDB。

提示

要了解更多关于本页面上显示的任何方法的信息,请参阅每个部分提供的链接。

要使用本页上的示例,请将代码示例复制到示例应用程序或您自己的应用程序中。确保用您 MongoDB 部署的相关值替换代码示例中的所有占位符,例如<连接字符串>

本指南使用 Project Reactor 库来消费 Java Reactive Streams 驱动程序方法返回的 Publisher 实例。要了解更多关于 Project Reactor 库以及如何使用它,请参阅 Reactor 文档中的入门

还可以使用其他方式消费 Publisher 实例。您可以使用许多替代库中的一个,例如 RxJava,或者直接调用 Publisher.subscribe() 并传递自己的 Subscriber 实现方式。

本指南使用 Reactor 的 Mono.block() 方法订阅 Publisher 并阻塞当前线程,直到 Publisher 达到其终端状态。要了解更多关于反应流倡议的信息,请参阅 反应流。

重要

返回的发布者(Publishers)是冷(Cold)

Java 反应流驱动程序方法返回的所有 Publisher 实例都是冷的,这意味着除非您订阅返回的 Publisher,否则对应的操作不会发生。我们建议只订阅返回的 Publisher 一次,因为多次订阅可能导致错误。

您可以使用以下示例应用程序来测试本页上的代码示例。要使用示例应用程序,请执行以下步骤

  1. 在您的 IDE 中创建一个新的 Java 项目。

  2. 在您的 Java 项目中安装 Java 反应流驱动程序。

  3. 在您的Java项目中安装Project Reactor库

  4. 复制以下代码,并将其粘贴到名为WriteOperations.java的新Java文件中。

  5. 从本页复制代码示例,并将其粘贴到文件中的指定行。

1import com.mongodb.MongoException;
2import com.mongodb.ConnectionString;
3import com.mongodb.MongoClientSettings;
4import com.mongodb.ServerApi;
5import com.mongodb.ServerApiVersion;
6import com.mongodb.bulk.BulkWriteResult;
7
8import com.mongodb.client.model.DeleteOneModel;
9import com.mongodb.client.model.InsertOneModel;
10import com.mongodb.client.model.ReplaceOneModel;
11import com.mongodb.client.model.UpdateOneModel;
12import com.mongodb.client.model.DeleteOptions;
13import com.mongodb.client.model.InsertManyOptions;
14import com.mongodb.client.model.InsertOneOptions;
15import com.mongodb.client.model.UpdateOptions;
16import com.mongodb.client.model.Updates;
17import com.mongodb.client.result.UpdateResult;
18import com.mongodb.client.result.DeleteResult;
19import com.mongodb.client.result.InsertManyResult;
20import com.mongodb.client.result.InsertOneResult;
21import com.mongodb.reactivestreams.client.MongoCollection;
22
23import org.bson.Document;
24
25import com.mongodb.reactivestreams.client.MongoClient;
26import com.mongodb.reactivestreams.client.MongoClients;
27import com.mongodb.reactivestreams.client.MongoDatabase;
28import reactor.core.publisher.Mono;
29
30import java.util.ArrayList;
31import java.util.Arrays;
32import java.util.List;
33
34import static com.mongodb.client.model.Filters.eq;
35import static com.mongodb.client.model.Updates.set;
36
37class WriteOperations {
38 public static void main(String[] args) throws InterruptedException {
39 // Replace the placeholder with your Atlas connection string
40 String uri = "<connection string>";
41
42 // Construct a ServerApi instance using the ServerApi.builder() method
43 ServerApi serverApi = ServerApi.builder()
44 .version(ServerApiVersion.V1)
45 .build();
46
47 MongoClientSettings settings = MongoClientSettings.builder()
48 .applyConnectionString(new ConnectionString(uri))
49 .serverApi(serverApi)
50 .build();
51
52 // Create a new client and connect to the server
53 try (MongoClient mongoClient = MongoClients.create(settings)) {
54 MongoDatabase database = mongoClient.getDatabase("<database name>");
55 MongoCollection<Document> collection = database.getCollection("<collection name>");
56 // Start example code here
57
58 // End example code here
59 }
60 }
61}
Document document = new Document("<field name>", "<value>");
Publisher<InsertOneResult> insertOnePublisher = collection.insertOne(document);
InsertOneResult result = Mono.from(insertOnePublisher).block();
System.out.printf("Inserted 1 document with ID %s.",
result.getInsertedId());

要了解有关insertOne()方法的更多信息,请参阅插入文档指南。

Document doc1 = new Document("<field name>", "<value>");
Document doc2 = new Document("<field name>", "<value>");
List<Document> documents = Arrays.asList(doc1, doc2);
Publisher<InsertManyResult> insertManyPublisher = collection.insertMany(documents);
InsertManyResult result = Mono.from(insertManyPublisher).block();
System.out.printf("Inserted documents with IDs %s.",
result.getInsertedIds());

要了解有关insertMany()方法的更多信息,请参阅插入文档指南。

Publisher<UpdateResult> updateOnePublisher = collection.updateOne(
eq("<field name>", "<value>"),
set("<field name>", "<new value>"));
UpdateResult result = Mono.from(updateOnePublisher).block();
System.out.printf("Updated %s document.",
result.getModifiedCount());

要了解更多关于 updateOne() 方法的知识,请参阅更新文档指南。

Publisher<UpdateResult> updateManyPublisher = collection.updateMany(
eq("<field name>", "<value>"),
set("<field name>", "<new value>"));
UpdateResult result = Mono.from(updateManyPublisher).block();
System.out.printf("Updated %s documents.",
result.getModifiedCount());

要了解更多关于 updateMany() 方法的知识,请参阅更新文档指南。

Publisher<UpdateResult> replaceOnePublisher = collection.replaceOne(
eq("<field name>", "<value>"),
new Document().append("<field name>", "<new value>")
.append("<new field name>", "<new value>"));
UpdateResult result = Mono.from(replaceOnePublisher).block();
System.out.printf("Replaced %s document.",
result.getModifiedCount());

要了解更多关于 replaceOne() 方法的知识,请参阅替换文档指南。

Publisher<DeleteResult> deleteOnePublisher = collection.deleteOne(
eq("<field name>", "<value>"));
DeleteResult result = Mono.from(deleteOnePublisher).block();
System.out.printf("Deleted %s document.", result.getDeletedCount());

了解更多关于 deleteOne() 方法的信息,请参阅删除文档指南。

Publisher<DeleteResult> deleteManyPublisher = collection.deleteMany(
eq("<field name>", "<value>"));
DeleteResult result = Mono.from(deleteManyPublisher).block();
System.out.printf("Deleted %s documents.", result.getDeletedCount());

了解更多关于 deleteMany() 方法的信息,请参阅删除文档指南。

Publisher<BulkWriteResult> bulkWritePublisher = collection.bulkWrite(
Arrays.asList(new InsertOneModel<>(
new Document("<field name>", "<value>")),
new InsertOneModel<>(new Document("<field name>", "<value>")),
new UpdateOneModel<>(eq("<field name>", "<value>"),
set("<field name>", "<new value>")),
new DeleteOneModel<>(eq("<field name>", "<value>")),
new ReplaceOneModel<>(eq("<field name>", "<value>"),
new Document("<field name>", "<new value>")
.append("<new field name>", "<new value>"))));
BulkWriteResult bulkResult = Mono.from(bulkWritePublisher).block();
System.out.printf("Modified %s documents and deleted %s documents.",
bulkResult.getModifiedCount(), bulkResult.getDeletedCount());

返回

数据库与集合