事务
概述
在本指南中,您可以学习如何使用Java反应式流驱动程序执行事务。事务允许您运行一系列操作,直到所有数据更改都成功应用之前不会生效。如果事务中的任何操作失败,驱动程序将取消事务并丢弃所有数据更改,而不会使其在任何时候可见。
在MongoDB中,事务在逻辑会话中运行。会话是您打算依次运行的有关读取或写入操作的分组。使用会话,您可以因果一致性为一组操作启用,并运行ACID事务。MongoDB保证事务操作涉及的数据保持一致性,即使在操作遇到意外错误的情况下也是如此。
当使用Java反应式流驱动程序时,您可以从MongoClient
实例创建一个新的会话,类型为ClientSession
。我们建议您重用客户端以进行多个会话和事务,而不是每次都实例化一个新的客户端。
警告
仅使用创建它的MongoClient
(或相关的MongoDatabase
或MongoCollection
)与ClientSession
一起使用。使用与不同MongoClient
一起使用的ClientSession
会导致操作错误。
示例数据
本指南中的示例使用来自Atlas示例数据集的sample_restaurants.restaurants
和sample_mflix.movies
集合。有关如何创建免费的MongoDB Atlas集群并加载数据集的说明,请参阅入门.
重要
项目反应器库
本指南使用 Project Reactor 库来消费由 Java Reactive Streams 驱动方法返回的 Publisher
实例。要了解更多关于 Project Reactor 库及其使用方法,请参阅 Reactor 文档中的入门指南。要了解本指南中如何使用 Project Reactor 库的方法,请参阅将数据写入 MongoDB指南。
事务方法
通过在您的 MongoClient
实例上调用 startSession()
方法来创建一个 ClientSession
。然后,您可以使用 ClientSession
提供的方法来修改会话状态。以下表格详细说明了您可以用于管理事务的方法
方法 | 描述 |
---|---|
startTransaction() | 在当前会话上启动一个新的事务,配置了给定的选项。如果会话中已有正在进行的交易,则抛出异常。要了解更多关于此方法,请参阅 MongoDB 服务器手册中的startTransaction() 页面。 |
abortTransaction() | 结束当前会话的活跃事务。如果会话没有活跃事务,或者事务已经提交或结束,则会抛出异常。要了解更多关于此方法的信息,请参阅 MongoDB 服务器手册中的abortTransaction() 页面。 |
commitTransaction() | 提交当前会话的活跃事务。如果会话没有活跃事务或事务已被结束,则会抛出异常。要了解更多关于此方法的信息,请参阅 MongoDB 服务器手册中的commitTransaction() 页面。 |
事务示例
以下示例演示了如何创建会话、创建事务以及在单个事务中将文档插入到多个集合中。代码执行以下步骤
使用
startSession()
方法从客户端创建会话使用
startTransaction()
方法启动事务将文档插入到
restaurants
和movies
集合中使用
commitTransaction()
方法提交事务
MongoClient mongoClient = MongoClients.create(settings); MongoDatabase restaurantsDatabase = mongoClient.getDatabase("sample_restaurants"); MongoCollection<Document> restaurants = restaurantsDatabase.getCollection("restaurants"); MongoDatabase moviesDatabase = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> movies = moviesDatabase.getCollection("movies"); Mono.from(mongoClient.startSession()) .flatMap(session -> { // Begins the transaction session.startTransaction(); // Inserts documents in the given order return Mono.from(restaurants.insertOne(session, new Document("name", "Reactive Streams Pizza").append("cuisine", "Pizza"))) .then(Mono.from(movies.insertOne(session, new Document("title", "Java: Into the Streams").append("type", "Movie")))) // Commits the transaction .flatMap(result -> Mono.from(session.commitTransaction()) .thenReturn(result)) .onErrorResume(error -> Mono.from(session.abortTransaction()).then(Mono.error(error))) .doFinally(signalType -> session.close()); }) // Closes the client after the transaction completes .doFinally(signalType -> mongoClient.close()) // Prints the results of the transaction .subscribe( result -> System.out.println("Transaction succeeded"), error -> System.err.println("Transaction failed: " + error) );
更多信息
要了解更多关于本指南中提到的概念,请参阅服务器手册中的以下页面
API 文档
要了解本指南中讨论的任何类型或方法,请参阅以下API文档