事务
在 MongoDB 中,对单个文档的操作是原子的。因为您可以使用嵌入式文档和数组来捕获单个文档结构中数据之间的关系,而不是在多个文档和集合中归一化,这种单文档原子性消除了许多实际用例中分布式事务的需求。
对于需要读取和写入多个文档原子性的情况(在一个或多个集合中),MongoDB 支持分布式事务。使用分布式事务,事务可以跨越多个操作、集合、数据库、文档和分片。
本页面的信息适用于以下环境中的部署
MongoDB Atlas:云中 MongoDB 部署的完全托管服务
MongoDB企业版:基于订阅的自管理MongoDB版本
MongoDB社区版:源代码可用、免费使用并自管理的MongoDB版本
事务API
➤使用右上角的选择语言下拉菜单设置以下示例的语言。
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
static bool with_transaction_example (bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_write_concern_t *wc = NULL; mongoc_collection_t *coll = NULL; bool success = false; bool ret = false; bson_t *doc = NULL; bson_t *insert_opts = NULL; mongoc_client_session_t *session = NULL; mongoc_transaction_opt_t *txn_opts = NULL; /* For a replica set, include the replica set name and a seedlist of the * members in the URI string; e.g. * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \ * "27017/?replicaSet=myRepl"; * client = mongoc_client_new (uri_repl); * For a sharded cluster, connect to the mongos instances; e.g. * uri_sharded = * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; * client = mongoc_client_new (uri_sharded); */ client = get_client (); /* Prereq: Create collections. Note Atlas connection strings include a majority write * concern by default. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 0); insert_opts = bson_new (); mongoc_write_concern_append (wc, insert_opts); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } /* Step 1: Start a client session. */ session = mongoc_client_start_session (client, NULL /* opts */, error); if (!session) { goto fail; } /* Step 2: Optional. Define options to use for the transaction. */ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_write_concern (txn_opts, wc); /* Step 3: Use mongoc_client_session_with_transaction to start a transaction, * execute the callback, and commit (or abort on error). */ ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error); if (!ret) { goto fail; } success = true; fail: bson_destroy (doc); mongoc_collection_destroy (coll); bson_destroy (insert_opts); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); mongoc_client_session_destroy (session); mongoc_client_destroy (client); return success; } /* Define the callback that specifies the sequence of operations to perform * inside the transactions. */ static bool callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_collection_t *coll = NULL; bson_t *doc = NULL; bool success = false; bool ret = false; BSON_UNUSED (ctx); client = mongoc_client_session_get_client (session); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (1)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (999)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } success = true; fail: mongoc_collection_destroy (coll); bson_destroy (doc); return success; }
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}}; // Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be // needed. The suggested Atlas connection string includes majority write concern by default. write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try { options::insert opts; opts.write_concern(wc_majority); foo.insert_one(make_document(kvp("abc", 0)), opts); bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred while inserting: " << e.what() << std::endl; return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) { // Important:: You must pass the session to the operations. foo.insert_one(*session, make_document(kvp("abc", 1))); bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try { options::transaction opts; opts.write_concern(wc_majority); session.with_transaction(callback, opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS;
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) { // Step 2: Optional. Define options to use for the transaction. var transactionOptions = new TransactionOptions( writeConcern: WriteConcern.WMajority); // Step 3: Define the sequence of operations to perform inside the transactions var cancellationToken = CancellationToken.None; // normally a real token would be used result = session.WithTransaction( (s, ct) => { try { collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct); collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct); } catch (MongoWriteException) { // Do something in response to the exception throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur. } return "Inserted into collections in different databases"; }, transactionOptions, cancellationToken); }
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" // For a sharded cluster, connect to the mongos instances; e.g. // uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" uri := mtest.ClusterURI() clientOpts := options.Client().ApplyURI(uri) client, err := mongo.Connect(clientOpts) if err != nil { return err } defer func() { _ = client.Disconnect(ctx) }() // Prereq: Create collections. wcMajority := writeconcern.Majority() wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. callback := func(sesctx context.Context) (interface{}, error) { // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } return nil, nil } // Step 2: Start a session and run the callback using WithTransaction. session, err := client.StartSession() if err != nil { return err } defer session.EndSession(ctx) result, err := session.WithTransaction(ctx, callback) if err != nil { return err } log.Printf("result: %v\n", result) return nil }
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances. For example: String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /* Create collections. */ client.getDatabase("mydb1").getCollection("foo") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder() .writeConcern(WriteConcern.MAJORITY) .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() { public String execute() { MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo"); MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar"); /* Important:: You must pass the session to the operations. */ coll1.insertOne(clientSession, new Document("abc", 1)); coll2.insertOne(clientSession, new Document("xyz", 999)); return "Inserted into collections in different databases"; } }; try { /* Step 4: Use .withTransaction() to start a transaction, execute the callback, and commit (or abort on error). */ clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) { // some error handling } finally { clientSession.close(); }
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session): collection_one = my_session.client.mydb1.foo collection_two = my_session.client.mydb2.bar # Important:: You must pass the session to the operations. await collection_one.insert_one({"abc": 1}, session=my_session) await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' const client = new MongoClient(uri); await client.connect(); // Prereq: Create collections. await client .db('mydb1') .collection('foo') .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } }); await client .db('mydb2') .collection('bar') .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } }); // Step 1: Start a Client Session const session = client.startSession(); // Step 2: Optional. Define options to use for the transaction const transactionOptions = { readPreference: 'primary', readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }; // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error) // Note: The callback for withTransaction MUST be async and/or return a Promise. try { await session.withTransaction(async () => { const coll1 = client.db('mydb1').collection('foo'); const coll2 = client.db('mydb2').collection('bar'); // Important:: You must pass the session to the operations await coll1.insertOne({ abc: 1 }, { session }); await coll2.insertOne({ xyz: 999 }, { session }); }, transactionOptions); } finally { await session.endSession(); await client.close(); }
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
sub runTransactionWithRetry { my ( $txnFunc, $session ) = @_; LOOP: { eval { $txnFunc->($session); # performs transaction }; if ( my $error = $@ ) { print("Transaction aborted-> Caught exception during transaction.\n"); # If transient error, retry the whole transaction if ( $error->has_error_label("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ->..\n"); redo LOOP; } else { die $error; } } } return; } sub commitWithRetry { my ($session) = @_; LOOP: { eval { $session->commit_transaction(); # Uses write concern set at transaction start. print("Transaction committed->\n"); }; if ( my $error = $@ ) { # Can retry commit if ( $error->has_error_label("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ->..\n"); redo LOOP; } else { print("Error during commit ->..\n"); die $error; } } } return; } # Updates two collections in a transactions sub updateEmployeeInfo { my ($session) = @_; my $employeesCollection = $session->client->ns("hr.employees"); my $eventsCollection = $session->client->ns("reporting.events"); $session->start_transaction( { readConcern => { level => "snapshot" }, writeConcern => { w => "majority" }, readPreference => 'primary', } ); eval { $employeesCollection->update_one( { employee => 3 }, { '$set' => { status => "Inactive" } }, { session => $session}, ); $eventsCollection->insert_one( { employee => 3, status => { new => "Inactive", old => "Active" } }, { session => $session}, ); }; if ( my $error = $@ ) { print("Caught exception during transaction, aborting->\n"); $session->abort_transaction(); die $error; } commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval { runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) { # Do something with error } $session->end_session();
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
/* * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' * For a sharded cluster, connect to the mongos instances; e.g. * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection( 'mydb1', 'foo', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['abc' => 0]); $client->selectCollection( 'mydb2', 'bar', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void { $client ->selectCollection('mydb1', 'foo') ->insertOne(['abc' => 1], ['session' => $session]); $client ->selectCollection('mydb2', 'bar') ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback);
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session): collection_one = session.client.mydb1.foo collection_two = session.client.mydb2.bar # Important:: You must pass the session to the operations. collection_one.insert_one({"abc": 1}, session=session) collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session| collection_one = client.use('mydb1')['foo'] collection_two = client.use('mydb2')['bar'] # Important: You must pass the session to the operations. collection_one.insert_one({'abc': 1}, session: my_session) collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( read_concern: {level: :local}, write_concern: {w: :majority, wtimeout: 1000}, read: {mode: :primary}, &callback)
此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API
启动事务
执行指定的操作
提交结果或在错误发生时结束事务
服务器端操作中的错误,例如 DuplicateKeyError
,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction()
,也会发生。要添加自定义错误处理,请使用事务上的 核心API。
回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误 或 未知事务提交结果 提交错误后尝试重新运行事务。
从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/? // replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. // let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; let client = Client::with_uri_str(uri).await?; // Prereq: Create collections. CRUD operations in transactions must be on existing collections. client .database("mydb1") .collection::<Document>("foo") .insert_one(doc! { "abc": 0}) .await?; client .database("mydb2") .collection::<Document>("bar") .insert_one(doc! { "xyz": 0}) .await?; // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transaction. async fn callback(session: &mut ClientSession) -> Result<()> { let collection_one = session .client() .database("mydb1") .collection::<Document>("foo"); let collection_two = session .client() .database("mydb2") .collection::<Document>("bar"); // Important: You must pass the session to the operations. collection_one .insert_one(doc! { "abc": 1 }) .session(&mut *session) .await?; collection_two .insert_one(doc! { "xyz": 999 }) .session(session) .await?; Ok(()) } // Step 2: Start a client session. let mut session = client.start_session().await?; // Step 3: Use and_run to start a transaction, execute the callback, and commit (or // abort on error). session .start_transaction() .and_run((), |session, _| callback(session).boxed()) .await?;
此示例使用了 核心API。由于核心API没有包含对TransientTransactionError
或UnknownTransactionCommitResult
提交错误的重试逻辑,因此示例中包含了对这些错误的显式重试逻辑
重要
使用适用于您的MongoDB版本的MongoDB驱动程序。
在使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式地创建集合。请参阅 在事务中创建集合和索引。
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://apache.ac.cn/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
事务和原子性
对于需要原子性读取和写入多个文档(在单个或多个集合中)的情况,MongoDB支持分布式事务,包括副本集和分片集群的事务。
分布式事务是原子性的
事务要么应用所有数据更改,要么回滚更改。
如果事务提交,则事务中作出的所有数据更改都会保存,并且对事务外可见。
但在事务提交之前,事务中作出的数据更改对事务外不可见。
然而,当事务写入多个分片时,并非所有外部读取操作都需要等待已提交事务的结果在分片间可见。例如,如果事务已提交,且在分片A上可见写入1,但在分片B上写入2尚未可见,则在读取关注度
"local"
的外部读取可以读取写入1的结果,而不会看到写入2。当事务中止时,事务中作出的所有数据更改都会被丢弃,而永远不会变得可见。例如,如果事务中的任何操作失败,事务将中止,并且事务中作出的所有数据更改将永远不会变得可见。
重要
在大多数情况下,分布式事务比单文档写入产生更大的性能开销,因此分布式事务的可用性不应替代有效的模式设计。对于许多场景,非规范化数据模型(内嵌文档和数组)将继续是您数据和用例的最佳选择。也就是说,对于许多场景,适当的数据建模将最大限度地减少分布式事务的需求。
关于额外的交易使用考虑因素(例如运行时限制和oplog大小限制),请参阅生产注意事项。
事务和操作
分布式事务可以跨越多个操作、集合、数据库、文档和分片使用。
关于事务
您可以在事务中创建集合和索引。详细信息请参阅在事务中创建集合和索引
事务中使用的集合可以位于不同的数据库中。
注意
您不能在跨分片写事务中创建新集合。例如,如果您在一个分片中对现有集合进行写入并隐式地在另一个分片中创建集合,MongoDB无法在同一个事务中执行这两个操作。
您不能写入固定集合。
您不能在从固定集合读取时使用读取关注点
"snapshot"
。 (从MongoDB 5.0开始)您不能向
config
、admin
或local
数据库中的集合进行读写。您不能向
system.*
集合进行写入。您不能使用
explain
或类似命令返回受支持操作的查询计划。
您不能将
killCursors
命令指定为事务的第一个操作。此外,如果您在事务中运行
killCursors
命令,服务器将立即停止指定的光标。它不会等待事务提交。
关于不支持在事务中执行的操作列表,请参阅受限操作。
提示
在开始事务前立即创建或删除集合时,如果在事务中访问该集合,则在创建或删除操作中使用"majority"
写关注度,以确保事务可以获取所需的锁。
在事务中创建集合和索引
如果事务不是一个跨分片写事务,您可以在分布式事务中执行以下操作
创建集合。
在同一个事务中之前创建的新空集合上创建索引。
在事务中创建集合时
您可以隐式创建一个集合,例如
对一个不存在的集合执行插入操作,或者
对一个不存在的集合执行更新/查找并修改操作,其中
upsert: true
。
您可以使用
create
命令或其辅助函数db.createCollection()
显式创建一个集合。
当在事务中[1]创建索引时,要创建的索引必须在以下之一
一个不存在的集合上。集合将在操作过程中创建。
在同一个事务中之前创建的新空集合上。
[1] | 您还可以运行db.collection.createIndex() 和db.collection.createIndexes() 来检查现有索引的存在。这些操作在成功返回时不会创建索引。 |
限制
您不能在跨分片写事务中创建新集合。例如,如果您在一个分片中对现有集合进行写入并隐式地在另一个分片中创建集合,MongoDB无法在同一个事务中执行这两个操作。
当针对分片集合时,您不能在事务中使用
$graphLookup
阶段。要在事务中显式创建集合或索引,事务的读取关注级别必须是
"local"
。要显式创建集合和索引,请使用以下命令和方法
计数操作
要在事务中执行计数操作,请使用$count
聚合阶段或$group
(带有$sum
表达式)聚合阶段。
MongoDB驱动程序提供了一种集合级API countDocuments(filter, options)
作为辅助方法,该方法使用$group
与$sum
表达式来执行计数。
mongosh
提供了 db.collection.countDocuments()
辅助方法,该方法使用$group
与$sum
表达式来执行计数。
区分操作
在事务中执行区分操作
对于非分片集合,您可以使用
db.collection.distinct()
方法/distinct
命令,以及带有$group
阶段的聚合管道。对于分片集合,不能使用
db.collection.distinct()
方法或distinct
命令。要找到分片集合的区分值,请使用带有
$group
阶段的聚合管道。例如而不是
db.coll.distinct("x")
,使用db.coll.aggregate([ { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ]) 而不是
db.coll.distinct("x", { status: "A" })
,使用db.coll.aggregate([ { $match: { status: "A" } }, { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ])
管道返回一个文档的光标
{ "distinctValues" : [ 2, 3, 1 ] } 迭代光标以访问结果文档。
信息性操作
信息性命令,如 hello
,buildInfo
,connectionStatus
(及其辅助方法)可以在事务中使用;然而,它们不能作为事务中的第一个操作。
受限操作
以下操作在事务中不允许
在跨分片写事务中创建新集合。例如,如果您在一个分片中向现有集合写入,并在不同分片中隐式创建一个集合,MongoDB无法在同一事务中执行这两个操作。
显式创建集合,例如
db.createCollection()
方法,以及索引,例如db.collection.createIndexes()
和db.collection.createIndex()
方法,当使用不同于"local"
的读取关注级别时。listCollections
和listIndexes
命令及其辅助方法。其他非CRUD和非信息性操作,例如
createUser
、getParameter
、count
等,及其辅助方法。
事务和会话
事务与会话相关联。
会话最多只能有一个打开的事务。
在使用驱动程序时,事务中的每个操作都必须与会话相关联。有关详细信息,请参阅您特定驱动程序的文档。
如果会话结束且存在未完成的事务,则事务将中止。
读取关注/写入关注/读取偏好
事务和读取偏好
事务中的操作使用事务级别的读取偏好。
使用驱动程序,您可以在事务开始时设置事务级别的读取偏好。
如果未设置事务级别的读取偏好,则事务将使用会话级别的读取偏好。
如果未设置事务级别和会话级别的读取偏好,则事务将使用客户端级别的读取偏好。默认情况下,客户端级别的读取偏好为
primary
。
事务和读取关注
事务中的操作使用事务级别的读取关注。这意味着在事务内部忽略集合和数据库级别的读取关注。
您可以在事务开始时设置事务级别的读取关注。
如果未设置事务级别的读取关注,则事务级别的读取关注默认为会话级别的读取关注。
如果事务级别和会话级别的读取关注都没有设置,则事务级别的读取关注默认为客户端级别的读取关注。默认情况下,客户端级别的读取关注在主节点上的读取是
"local"
。另请参阅
事务支持以下读取关注级别
"local"
"majority"
如果事务以写入关注点 "majority"提交,读取关注点
"majority"
返回已被大多数副本集成员确认的数据,并且无法回滚。否则,读取关注点"majority"
不提供读取操作读取大多数已提交数据的任何保证。对于分片集群上的事务,读取关注点
"majority"
不能保证数据来自跨分片相同的快照视图。如果需要快照隔离,请使用读取关注点"snapshot"
。
"snapshot"
读取关注点
"snapshot"
返回的数据是大多数已提交数据的快照,前提是事务以写入关注点 "majority"提交。如果事务在提交时没有使用写入关注点 "majority",则
"snapshot"
读取关注点不保证读取操作使用了大多数已提交数据的快照。对于分片集群上的事务,数据
"snapshot"
视图在分片间是同步的。
事务和写关注
事务使用事务级别的写关注来提交写操作。事务内的写操作必须在没有显式写关注指定的条件下运行,并使用默认写关注。在提交时,使用事务级别的写关注提交的写操作。
提示
不要在事务内的单个写操作中显式设置写关注。在事务内设置单个写操作的写关注将返回错误。
可以在事务开始时设置事务级别的写关注。
如果未设置事务级别的写关注,则事务级别的写关注默认为提交时的会话级别写关注。
如果未设置事务级别的写关注和会话级别的写关注,则事务级别的写关注默认为客户端级别的写关注。
w: "majority"
(在 MongoDB 5.0 及以后版本中),对于包含仲裁者的部署,有所不同。请参阅隐式默认写关注。
事务支持所有写关注 w 值,包括
w: 1
写关注
w: 1
在提交已应用到主节点后返回确认。重要
当你使用
w: 1
提交时,如果你的事务在故障转移中,它可能被回滚。当你使用
w: 1
写关注提交时,事务级别的"majority"
读取关注不能保证事务中的读取操作读取了大多数已提交的数据。当你使用
w: 1
写关注时,事务级"snapshot"
读取关注无法保证事务中的读取操作使用了多数已提交数据的快照。
w: "majority"
写关注
w: "majority"
在提交应用到多数投票成员后返回确认。当你使用
w: "majority"
写关注进行提交时,事务级"majority"
读取关注保证操作读取了多数已提交数据。对于分片集群上的事务,这种多数已提交数据的视图在分片中不是同步的。当你使用
w: "majority"
写关注进行提交时,事务级"snapshot"
读取关注保证操作从同步的多数已提交数据快照中读取。
注意
无论指定了事务的写关注,分片集群事务的提交操作包括一些使用 {w: "majority", j: true}
写关注的部分。
服务器参数 coordinateCommitReturnImmediatelyAfterPersistingDecision
控制何时将事务提交决策返回给客户端。
该参数在MongoDB 5.0中引入,默认值为 true
。在MongoDB 6.1中,默认值变为 false
。
当 coordinateCommitReturnImmediatelyAfterPersistingDecision
为 false
时,分片事务协调器等待所有成员确认 多文档事务 提交后,再将提交决策返回给客户端。
如果你为 "majority"
写关注的多文档事务指定写关注,并且事务无法复制到计算出的多数 副本集 成员,则事务可能不会立即在副本集成员上回滚。副本集最终会保持 最终一致性。事务始终应用于或回滚到所有副本集成员。
无论为事务指定的写关注是什么,当重试w: "majority"
时,驱动程序都将应用事务的写关注作为写关注。
一般信息
以下部分描述了事务的更多考虑因素。
生产考虑因素
有关生产环境中的事务,请参阅生产考虑因素。此外,有关分片集群,请参阅生产考虑因素(分片集群)。
仲裁者
如果副本集有仲裁者,则无法使用事务更改分片键。仲裁者无法参与多分片事务所需的数据操作。
跨越多个分片的写操作将导致错误并中止,如果任何事务操作从一个包含仲裁员的分片读取或写入。
分片配置限制
不能在具有以下设置的sharded集群上运行事务:将 writeConcernMajorityJournalDefault
设置为 false
(例如具有使用 内存存储引擎 的投票成员的分片)。
注意
无论指定了事务的写关注,分片集群事务的提交操作包括一些使用 {w: "majority", j: true}
写关注的部分。
诊断
要获取事务状态和指标,请使用以下方法
来源 | 返回 |
---|---|
$currentOp 聚合管道 | 返回
|
currentOp 命令 | 返回
|
包括有关慢事务(超过 operationProfiling.slowOpThresholdMs 阈值的操作)在 TXN 日志组件中的信息。 |
功能兼容性版本 (fCV)
要使用事务,部署中所有成员的 featureCompatibilityVersion 必须至少为
部署 | 最小 featureCompatibilityVersion |
---|---|
副本集 | 4.0 |
分片集群 | 4.2 |
要检查成员的 fCV,连接到该成员并运行以下命令
db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )
有关更多信息,请参阅 setFeatureCompatibilityVersion
参考页面。
存储引擎
分布式事务 在副本集和分片集群中受支持,其中
主节点使用 WiredTiger 存储引擎,并且
二级成员可以使用WiredTiger存储引擎或内存存储引擎。
注意
无法在设置了writeConcernMajorityJournalDefault
为false
的分片集群上运行事务,例如使用了内存存储引擎的投票成员所在的分片。
限制关键部分等待时间
从MongoDB 5.2(以及5.0.4)开始
要限制分片在事务中等待关键部分的时间,请使用
metadataRefreshInTransactionMaxWaitBehindCritSecMS
参数。