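Drivers API
Callback API vs. Core API
Callback API: Starts a transaction, executes the specified operations, and commits (or aborts on error). It automatically incorporates error handling logic for TransientTransactionError and UnknownTransactionCommitResult errors.
Core API: Requires explicit calls to start the transaction and to commit the transaction. It does not incorporate error handling logic for TransientTransactionError and UnknownTransactionCommitResult errors, and instead provides the flexibility to incorporate custom error handling for these errors.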
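Callback API
The callback API incorporates the following retry logic:
If the transaction encounters a TransientTransactionError error, the transaction as a whole is retried.
If the commit operation encounters an UnknownTransactionCommitResult error, the commit operation is retried.
Starting in MongoDB 6.2, the server does not retry the transaction if it receives a TransactionTooLargeForCache error.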
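The retry behavior described above can be sketched in Python without a database. This is only an illustration under stated assumptions: `LabeledError`, `FakeSession`, and `with_transaction_sketch` are hypothetical stand-ins invented for this sketch, not driver classes, and a real driver's implementation may differ in details such as time limits.

```python
class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


class FakeSession:
    """Hypothetical in-memory session that only records the calls made on it."""

    def __init__(self, commit_failures=0):
        self.calls = []
        self._commit_failures = commit_failures

    def start_transaction(self):
        self.calls.append("start")

    def commit_transaction(self):
        self.calls.append("commit")
        if self._commit_failures > 0:
            self._commit_failures -= 1
            raise LabeledError(
                "commit outcome unknown", ["UnknownTransactionCommitResult"]
            )

    def abort_transaction(self):
        self.calls.append("abort")


def with_transaction_sketch(session, callback):
    """Start a transaction, run the callback, and commit, retrying by label."""
    while True:
        session.start_transaction()
        try:
            result = callback(session)
        except LabeledError as exc:
            session.abort_transaction()
            if exc.has_error_label("TransientTransactionError"):
                continue  # retry the transaction as a whole
            raise
        while True:
            try:
                session.commit_transaction()
                return result
            except LabeledError as exc:
                if exc.has_error_label("UnknownTransactionCommitResult"):
                    continue  # retry only the commit operation
                if exc.has_error_label("TransientTransactionError"):
                    break  # leave the commit loop and restart the transaction
                raise


attempts = {"n": 0}


def flaky_callback(session):
    # Fails once with a transient error, then succeeds.
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise LabeledError("write conflict", ["TransientTransactionError"])
    return "ok"


session = FakeSession(commit_failures=1)
outcome = with_transaction_sketch(session, flaky_callback)
```

In this run the callback is executed twice and the commit is attempted twice, so the application code never sees either labeled error.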
Example
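Important
Use the MongoDB driver that is compatible with your MongoDB version.
When using the drivers, you must pass the session to each operation in the transaction.
You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.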
C

/* Forward declaration: the callback is defined after the example function. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error);

static bool
with_transaction_example (bson_error_t *error)
{
   mongoc_client_t *client = NULL;
   mongoc_write_concern_t *wc = NULL;
   mongoc_collection_t *coll = NULL;
   bool success = false;
   bool ret = false;
   bson_t *doc = NULL;
   bson_t *insert_opts = NULL;
   mongoc_client_session_t *session = NULL;
   mongoc_transaction_opt_t *txn_opts = NULL;

   /* For a replica set, include the replica set name and a seedlist of the
    * members in the URI string; e.g.
    * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
    *    "27017/?replicaSet=myRepl";
    * client = mongoc_client_new (uri_repl);
    * For a sharded cluster, connect to the mongos instances; e.g.
    * uri_sharded =
    * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
    * client = mongoc_client_new (uri_sharded);
    */

   client = get_client ();

   /* Prereq: Create collections. Note Atlas connection strings include a
    * majority write concern by default.
    */
   wc = mongoc_write_concern_new ();
   mongoc_write_concern_set_wmajority (wc, 0);
   insert_opts = bson_new ();
   mongoc_write_concern_append (wc, insert_opts);
   coll = mongoc_client_get_collection (client, "mydb1", "foo");
   doc = BCON_NEW ("abc", BCON_INT32 (0));
   ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
   if (!ret) {
      goto fail;
   }
   bson_destroy (doc);
   mongoc_collection_destroy (coll);
   coll = mongoc_client_get_collection (client, "mydb2", "bar");
   doc = BCON_NEW ("xyz", BCON_INT32 (0));
   ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
   if (!ret) {
      goto fail;
   }

   /* Step 1: Start a client session. */
   session = mongoc_client_start_session (client, NULL /* opts */, error);
   if (!session) {
      goto fail;
   }

   /* Step 2: Optional. Define options to use for the transaction. */
   txn_opts = mongoc_transaction_opts_new ();
   mongoc_transaction_opts_set_write_concern (txn_opts, wc);

   /* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
    * execute the callback, and commit (or abort on error).
    */
   ret = mongoc_client_session_with_transaction (
      session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
   if (!ret) {
      goto fail;
   }

   success = true;

fail:
   bson_destroy (doc);
   mongoc_collection_destroy (coll);
   bson_destroy (insert_opts);
   mongoc_write_concern_destroy (wc);
   mongoc_transaction_opts_destroy (txn_opts);
   mongoc_client_session_destroy (session);
   mongoc_client_destroy (client);
   return success;
}

/* Define the callback that specifies the sequence of operations to perform
 * inside the transactions.
 */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
   mongoc_client_t *client = NULL;
   mongoc_collection_t *coll = NULL;
   bson_t *doc = NULL;
   bool success = false;
   bool ret = false;

   BSON_UNUSED (ctx);

   client = mongoc_client_session_get_client (session);
   coll = mongoc_client_get_collection (client, "mydb1", "foo");
   doc = BCON_NEW ("abc", BCON_INT32 (1));
   ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
   if (!ret) {
      goto fail;
   }
   bson_destroy (doc);
   mongoc_collection_destroy (coll);
   coll = mongoc_client_get_collection (client, "mydb2", "bar");
   doc = BCON_NEW ("xyz", BCON_INT32 (999));
   ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
   if (!ret) {
      goto fail;
   }

   success = true;

fail:
   mongoc_collection_destroy (coll);
   bson_destroy (doc);
   return success;
}
C++

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};

// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);

// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
    options::insert opts;
    opts.write_concern(wc_majority);
    foo.insert_one(make_document(kvp("abc", 0)), opts);
    bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
    std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
    return EXIT_FAILURE;
}

// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
    // Important: You must pass the session to the operations.
    foo.insert_one(*session, make_document(kvp("abc", 1)));
    bar.insert_one(*session, make_document(kvp("xyz", 999)));
};

// Step 2: Start a client session.
auto session = client.start_session();

// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
    options::transaction opts;
    opts.write_concern(wc_majority);
    session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
    std::cout << "An exception occurred: " << e.what() << std::endl;
    return EXIT_FAILURE;
}

return EXIT_SUCCESS;
C#

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);

// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));

var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));

// Step 1: Start a client session.
using (var session = client.StartSession())
{
    // Step 2: Optional. Define options to use for the transaction.
    var transactionOptions = new TransactionOptions(
        writeConcern: WriteConcern.WMajority);

    // Step 3: Define the sequence of operations to perform inside the transactions.
    var cancellationToken = CancellationToken.None; // normally a real token would be used
    var result = session.WithTransaction(
        (s, ct) =>
        {
            try
            {
                collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
                collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
            }
            catch (MongoWriteException)
            {
                // Do something in response to the exception
                throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
            }
            return "Inserted into collections in different databases";
        },
        transactionOptions,
        cancellationToken);
}
Go

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
	// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
	// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
	// For a sharded cluster, connect to the mongos instances; e.g.
	// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
	uri := mtest.ClusterURI()

	clientOpts := options.Client().ApplyURI(uri)
	client, err := mongo.Connect(clientOpts)
	if err != nil {
		return err
	}
	defer func() { _ = client.Disconnect(ctx) }()

	// Prereq: Create collections.
	wcMajority := writeconcern.Majority()
	wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
	fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
	barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)

	// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
	callback := func(sesctx context.Context) (interface{}, error) {
		// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
		// transaction.
		if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
			return nil, err
		}
		if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
			return nil, err
		}
		return nil, nil
	}

	// Step 2: Start a session and run the callback using WithTransaction.
	session, err := client.StartSession()
	if err != nil {
		return err
	}
	defer session.EndSession(ctx)

	result, err := session.WithTransaction(ctx, callback)
	if err != nil {
		return err
	}
	log.Printf("result: %v\n", result)
	return nil
}
Java

/*
  For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
  String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
  For a sharded cluster, connect to the mongos instances; e.g.
  String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/admin";
 */
final MongoClient client = MongoClients.create(uri);

/* Create collections. */
client.getDatabase("mydb1").getCollection("foo")
        .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
        .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));

/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();

/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.LOCAL)
        .writeConcern(WriteConcern.MAJORITY)
        .build();

/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
    public String execute() {
        MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
        MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");

        /* Important: You must pass the session to the operations. */
        coll1.insertOne(clientSession, new Document("abc", 1));
        coll2.insertOne(clientSession, new Document("xyz", 999));
        return "Inserted into collections in different databases";
    }
};

try {
    /* Step 4: Use .withTransaction() to start a transaction,
       execute the callback, and commit (or abort on error). */
    clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
    // some error handling
} finally {
    clientSession.close();
}
Motor

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)

# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})

# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
    collection_one = my_session.client.mydb1.foo
    collection_two = my_session.client.mydb2.bar

    # Important: You must pass the session to the operations.
    await collection_one.insert_one({"abc": 1}, session=my_session)
    await collection_two.insert_one({"xyz": 999}, session=my_session)

# Step 2: Start a client session.
async with await client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    await session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )
Node.js

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();

// Prereq: Create collections.
await client
  .db('mydb1')
  .collection('foo')
  .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });

await client
  .db('mydb2')
  .collection('bar')
  .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });

// Step 1: Start a client session
const session = client.startSession();

// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
  readPreference: 'primary',
  readConcern: { level: 'local' },
  writeConcern: { w: 'majority' }
};

// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
  await session.withTransaction(async () => {
    const coll1 = client.db('mydb1').collection('foo');
    const coll2 = client.db('mydb2').collection('bar');

    // Important: You must pass the session to the operations
    await coll1.insertOne({ abc: 1 }, { session });
    await coll2.insertOne({ xyz: 999 }, { session });
  }, transactionOptions);
} finally {
  await session.endSession();
  await client.close();
}
Note
For the Perl driver, see the core API usage example.
PHP

/*
 * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
 * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
 * For a sharded cluster, connect to the mongos instances; e.g.
 * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
 */
$client = new \MongoDB\Client($uriString);

// Prerequisite: Create collections.
$client->selectCollection(
    'mydb1',
    'foo',
    [
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
    ],
)->insertOne(['abc' => 0]);

$client->selectCollection(
    'mydb2',
    'bar',
    [
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
    ],
)->insertOne(['xyz' => 0]);

// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
    $client
        ->selectCollection('mydb1', 'foo')
        ->insertOne(['abc' => 1], ['session' => $session]);

    $client
        ->selectCollection('mydb2', 'bar')
        ->insertOne(['xyz' => 999], ['session' => $session]);
};

// Step 2: Start a client session.
$session = $client->startSession();

// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);
Python

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)

# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})

# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
    collection_one = session.client.mydb1.foo
    collection_two = session.client.mydb2.bar

    # Important: You must pass the session to the operations.
    collection_one.insert_one({"abc": 1}, session=session)
    collection_two.insert_one({"xyz": 999}, session=session)

# Step 2: Start a client session.
with client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )
Ruby

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uri_string = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})

# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)

# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
  collection_one = client.use('mydb1')['foo']
  collection_two = client.use('mydb2')['bar']

  # Important: You must pass the session to the operations.
  collection_one.insert_one({'abc': 1}, session: my_session)
  collection_two.insert_one({'xyz': 999}, session: my_session)
end

# Step 2: Start a client session.
session = client.start_session

# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
  read_concern: {level: :local},
  write_concern: {w: :majority, wtimeout: 1000},
  read: {mode: :primary},
  &callback)
Note
For the Scala driver, see the core API usage example.
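Core API
The core transaction API does not incorporate retry logic for errors labeled:
TransientTransactionError. If an operation in a transaction returns an error labeled TransientTransactionError, the transaction as a whole can be retried. To handle TransientTransactionError, applications should explicitly incorporate retry logic for the error.
UnknownTransactionCommitResult. If the commit returns an error labeled UnknownTransactionCommitResult, the commit can be retried. To handle UnknownTransactionCommitResult, applications should explicitly incorporate retry logic for the error.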
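With the core API the application owns both retry loops. The following Python sketch shows one possible shape for them, runnable without a database. `LabeledError` and `ScriptedSession` are hypothetical stand-ins invented for this illustration; only the two label names come from the text above, and the loops are unbounded purely for brevity.

```python
class LabeledError(Exception):
    """Hypothetical stand-in for a driver error that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


class ScriptedSession:
    """Hypothetical session whose commit fails a scripted number of times."""

    def __init__(self, unknown_commit_results=0):
        self.commit_attempts = 0
        self._unknown_left = unknown_commit_results

    def start_transaction(self):
        pass

    def abort_transaction(self):
        pass

    def commit_transaction(self):
        self.commit_attempts += 1
        if self._unknown_left > 0:
            self._unknown_left -= 1
            raise LabeledError(
                "commit outcome unknown", ["UnknownTransactionCommitResult"]
            )


def commit_with_retry(session):
    """Retry only the commit while its result is unknown."""
    while True:
        try:
            session.commit_transaction()
            return
        except LabeledError as exc:
            if exc.has_error_label("UnknownTransactionCommitResult"):
                continue
            raise


def run_transaction_with_retry(txn_func, session):
    """Retry the whole transaction function on transient errors."""
    while True:
        try:
            return txn_func(session)
        except LabeledError as exc:
            if exc.has_error_label("TransientTransactionError"):
                continue
            raise


runs = []


def update_employee_info(session):
    # The first run simulates a transient failure; the second run commits.
    runs.append(len(runs) + 1)
    session.start_transaction()
    if len(runs) == 1:
        session.abort_transaction()
        raise LabeledError("write conflict", ["TransientTransactionError"])
    commit_with_retry(session)
    return "committed"


session = ScriptedSession(unknown_commit_results=2)
status = run_transaction_with_retry(update_employee_info, session)
```

Keeping the commit loop separate from the transaction loop means an unknown commit result does not re-run the operations inside the transaction.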
Example
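The following example incorporates logic to retry the transaction for transient errors and to retry the commit for unknown commit errors.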
C

/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t) (mongoc_client_session_t *, bson_t *, bson_error_t *);

/* runs transactions with retry logic */
bool
run_transaction_with_retry (txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* perform transaction */
      r = txn_func (cs, &reply, error);
      if (r) {
         /* success */
         bson_destroy (&reply);
         return true;
      }

      MONGOC_WARNING ("Transaction aborted: %s", error->message);
      if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
         /* on transient error, retry the whole transaction */
         MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
         bson_destroy (&reply);
      } else {
         /* non-transient error */
         break;
      }
   }

   bson_destroy (&reply);
   return false;
}

/* commit transactions with retry logic */
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* commit uses write concern set at transaction start, see
       * mongoc_transaction_opts_set_write_concern */
      r = mongoc_client_session_commit_transaction (cs, &reply, error);
      if (r) {
         MONGOC_DEBUG ("Transaction committed");
         break;
      }

      if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
         MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
         bson_destroy (&reply);
      } else {
         /* commit failed, cannot retry */
         break;
      }
   }

   bson_destroy (&reply);
   return r;
}

/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info (mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error)
{
   mongoc_client_t *client;
   mongoc_collection_t *employees;
   mongoc_collection_t *events;
   mongoc_read_concern_t *rc;
   mongoc_write_concern_t *wc;
   mongoc_transaction_opt_t *txn_opts;
   bson_t opts = BSON_INITIALIZER;
   bson_t *filter = NULL;
   bson_t *update = NULL;
   bson_t *event = NULL;
   bool r;

   bson_init (reply);

   client = mongoc_client_session_get_client (cs);
   employees = mongoc_client_get_collection (client, "hr", "employees");
   events = mongoc_client_get_collection (client, "reporting", "events");

   rc = mongoc_read_concern_new ();
   mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
   wc = mongoc_write_concern_new ();
   /* Atlas connection strings include majority by default */
   mongoc_write_concern_set_w (wc, MONGOC_WRITE_CONCERN_W_MAJORITY);
   txn_opts = mongoc_transaction_opts_new ();
   mongoc_transaction_opts_set_read_concern (txn_opts, rc);
   mongoc_transaction_opts_set_write_concern (txn_opts, wc);

   r = mongoc_client_session_start_transaction (cs, txn_opts, error);
   if (!r) {
      goto done;
   }

   r = mongoc_client_session_append (cs, &opts, error);
   if (!r) {
      goto done;
   }

   filter = BCON_NEW ("employee", BCON_INT32 (3));
   update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
   /* mongoc_collection_update_one will reinitialize reply */
   bson_destroy (reply);
   r = mongoc_collection_update_one (employees, filter, update, &opts, reply, error);
   if (!r) {
      goto abort;
   }

   event = BCON_NEW ("employee", BCON_INT32 (3));
   BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");

   bson_destroy (reply);
   r = mongoc_collection_insert_one (events, event, &opts, reply, error);
   if (!r) {
      goto abort;
   }

   r = commit_with_retry (cs, error);

abort:
   if (!r) {
      MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
      mongoc_client_session_abort_transaction (cs, NULL);
   }

done:
   mongoc_collection_destroy (employees);
   mongoc_collection_destroy (events);
   mongoc_read_concern_destroy (rc);
   mongoc_write_concern_destroy (wc);
   mongoc_transaction_opts_destroy (txn_opts);
   bson_destroy (&opts);
   bson_destroy (filter);
   bson_destroy (update);
   bson_destroy (event);
   return r;
}

void
example_func (mongoc_client_t *client)
{
   mongoc_client_session_t *cs;
   bson_error_t error;
   bool r;

   ASSERT (client);

   cs = mongoc_client_start_session (client, NULL, &error);
   if (!cs) {
      MONGOC_ERROR ("Could not start session: %s", error.message);
      return;
   }

   r = run_transaction_with_retry (update_employee_info, cs, &error);
   if (!r) {
      MONGOC_ERROR ("Could not update employee, permanent error: %s", error.message);
   }

   mongoc_client_session_destroy (cs);
}
C++

using transaction_func = std::function<void(client_session & session)>;

auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
    while (true) {
        try {
            txn_func(session);  // performs transaction.
            break;
        } catch (const operation_exception& oe) {
            std::cout << "Transaction aborted. Caught exception during transaction." << std::endl;
            // If transient error, retry the whole transaction.
            if (oe.has_error_label("TransientTransactionError")) {
                std::cout << "TransientTransactionError, retrying transaction ..." << std::endl;
                continue;
            } else {
                throw oe;
            }
        }
    }
};

auto commit_with_retry = [](client_session& session) {
    while (true) {
        try {
            session.commit_transaction();  // Uses write concern set at transaction start.
            std::cout << "Transaction committed." << std::endl;
            break;
        } catch (const operation_exception& oe) {
            // Can retry commit
            if (oe.has_error_label("UnknownTransactionCommitResult")) {
                std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                          << std::endl;
                continue;
            } else {
                std::cout << "Error during commit ..." << std::endl;
                throw oe;
            }
        }
    }
};

// Updates two collections in a transaction
auto update_employee_info = [&](client_session& session) {
    auto& client = session.client();
    auto employees = client["hr"]["employees"];
    auto events = client["reporting"]["events"];

    options::transaction txn_opts;
    read_concern rc;
    rc.acknowledge_level(read_concern::level::k_snapshot);
    txn_opts.read_concern(rc);
    write_concern wc;
    wc.acknowledge_level(write_concern::level::k_majority);
    txn_opts.write_concern(wc);

    session.start_transaction(txn_opts);

    try {
        // The session is passed to each operation so that it runs inside the transaction.
        employees.update_one(
            session,
            make_document(kvp("employee", 3)),
            make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
        events.insert_one(
            session,
            make_document(
                kvp("employee", 3),
                kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
    } catch (const operation_exception& oe) {
        std::cout << "Caught exception during transaction, aborting." << std::endl;
        session.abort_transaction();
        throw oe;
    }

    commit_with_retry(session);
};

auto session = client.start_session();
try {
    run_transaction_with_retry(update_employee_info, session);
} catch (const operation_exception& oe) {
    // Do something with error.
    throw oe;
}
C#

public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            txnFunc(client, session); // performs transaction
            break;
        }
        catch (MongoException exception)
        {
            // if transient error, retry the whole transaction
            if (exception.HasErrorLabel("TransientTransactionError"))
            {
                Console.WriteLine("TransientTransactionError, retrying transaction.");
                continue;
            }
            else
            {
                throw;
            }
        }
    }
}

public void CommitWithRetry(IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            session.CommitTransaction();
            Console.WriteLine("Transaction committed.");
            break;
        }
        catch (MongoException exception)
        {
            // can retry commit
            if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
            {
                Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
                continue;
            }
            else
            {
                Console.WriteLine($"Error during commit: {exception.Message}.");
                throw;
            }
        }
    }
}

// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
    var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
    var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");

    session.StartTransaction(new TransactionOptions(
        readConcern: ReadConcern.Snapshot,
        writeConcern: WriteConcern.WMajority));

    try
    {
        employeesCollection.UpdateOne(
            session,
            Builders<BsonDocument>.Filter.Eq("employee", 3),
            Builders<BsonDocument>.Update.Set("status", "Inactive"));

        eventsCollection.InsertOne(
            session,
            new BsonDocument
            {
                { "employee", 3 },
                { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
            });
    }
    catch (Exception exception)
    {
        Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
        session.AbortTransaction();
        throw;
    }

    CommitWithRetry(session);
}

public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
    // start a session
    using (var session = client.StartSession())
    {
        try
        {
            RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
        }
        catch (Exception exception)
        {
            // do something with error
            Console.WriteLine($"Non transient exception caught during transaction: {exception.Message}.");
        }
    }
}
Go

	runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error {
		for {
			err := txnFn(ctx) // Performs transaction.
			if err == nil {
				return nil
			}

			log.Println("Transaction aborted. Caught exception during transaction.")

			// If transient error, retry the whole transaction
			if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
				log.Println("TransientTransactionError, retrying transaction...")
				continue
			}
			return err
		}
	}

	commitWithRetry := func(ctx context.Context) error {
		sess := mongo.SessionFromContext(ctx)

		for {
			err := sess.CommitTransaction(ctx)
			switch e := err.(type) {
			case nil:
				log.Println("Transaction committed.")
				return nil
			case mongo.CommandError:
				// Can retry commit
				if e.HasErrorLabel("UnknownTransactionCommitResult") {
					log.Println("UnknownTransactionCommitResult, retrying commit operation...")
					continue
				}
				log.Println("Error during commit...")
				return e
			default:
				log.Println("Error during commit...")
				return e
			}
		}
	}

	// Updates two collections in a transaction.
	updateEmployeeInfo := func(ctx context.Context) error {
		employees := client.Database("hr").Collection("employees")
		events := client.Database("reporting").Collection("events")

		sess := mongo.SessionFromContext(ctx)

		err := sess.StartTransaction(options.Transaction().
			SetReadConcern(readconcern.Snapshot()).
			SetWriteConcern(writeconcern.Majority()),
		)
		if err != nil {
			return err
		}

		_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
		if err != nil {
			sess.AbortTransaction(ctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}
		_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
		if err != nil {
			sess.AbortTransaction(ctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}

		return commitWithRetry(ctx)
	}

	txnOpts := options.Transaction().SetReadPreference(readpref.Primary())
	return client.UseSessionWithOptions(
		ctx, options.Session().SetDefaultTransactionOptions(txnOpts),
		func(ctx context.Context) error {
			return runTransactionWithRetry(ctx, updateEmployeeInfo)
		},
	)
}
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
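A toy model can make the consequence of omitting the session concrete. The sketch below is not driver code: `ToySession` and `ToyCollection` are hypothetical classes invented for this illustration, and the buffering scheme is a simplifying assumption about how a transaction might be modeled in memory. It shows only that a write issued without the session is not part of the transaction, so aborting the transaction does not undo it.

```python
class ToySession:
    """Hypothetical session that buffers transactional writes until commit."""

    def __init__(self):
        self.pending = []
        self.in_transaction = False

    def start_transaction(self):
        self.in_transaction = True

    def commit_transaction(self):
        # Apply every buffered write to its target collection.
        for target, doc in self.pending:
            target.append(doc)
        self.pending.clear()
        self.in_transaction = False

    def abort_transaction(self):
        # Discard buffered writes; nothing reaches the collections.
        self.pending.clear()
        self.in_transaction = False


class ToyCollection:
    """Hypothetical collection backed by a plain list."""

    def __init__(self):
        self.docs = []

    def insert_one(self, doc, session=None):
        if session is not None and session.in_transaction:
            # Associated with the transaction: held until commit.
            session.pending.append((self.docs, doc))
        else:
            # No session passed: applied immediately, outside the transaction.
            self.docs.append(doc)


foo = ToyCollection()
session = ToySession()

session.start_transaction()
foo.insert_one({"abc": 1}, session=session)  # part of the transaction
foo.insert_one({"abc": 2})                   # session omitted
session.abort_transaction()

remaining = foo.docs
```

After the abort, only the write that skipped the session is still present, which is usually the opposite of what the application intended.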
Java

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");

            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}

void updateEmployeeInfo() {
    MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
    MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");

    TransactionOptions txnOptions = TransactionOptions.builder()
            .readPreference(ReadPreference.primary())
            .readConcern(ReadConcern.MAJORITY)
            .writeConcern(WriteConcern.MAJORITY)
            .build();

    try (ClientSession clientSession = client.startSession()) {
        clientSession.startTransaction(txnOptions);

        employeesCollection.updateOne(clientSession,
                Filters.eq("employee", 3),
                Updates.set("status", "Inactive"));
        eventsCollection.insertOne(clientSession,
                new Document("employee", 3)
                        .append("status", new Document("new", "Inactive").append("old", "Active")));

        commitWithRetry(clientSession);
    }
}

void updateEmployeeInfoWithRetry() {
    runTransactionWithRetry(this::updateEmployeeInfo);
}
Note
For Motor, see the Callback API.
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.hasErrorLabel('TransientTransactionError')) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

async function updateEmployeeInfo(client, session) {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  });

  const employeesCollection = client.db('hr').collection('employees');
  const eventsCollection = client.db('reporting').collection('events');

  await employeesCollection.updateOne(
    { employee: 3 },
    { $set: { status: 'Inactive' } },
    { session }
  );
  await eventsCollection.insertOne(
    { employee: 3, status: { new: 'Inactive', old: 'Active' } },
    { session }
  );

  try {
    await commitWithRetry(session);
  } catch (error) {
    await session.abortTransaction();
    throw error;
  }
}

return client.withSession(session =>
  runTransactionWithRetry(updateEmployeeInfo, client, session)
);
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session);    # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted. Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ...\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction();    # Uses write concern set at transaction start.
            print("Transaction committed.\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...\n");
                redo LOOP;
            }
            else {
                print("Error during commit ...\n");
                die $error;
            }
        }
    }

    return;
}

# Updates two collections in a transaction
sub updateEmployeeInfo {
    my ($session) = @_;
    my $employeesCollection = $session->client->ns("hr.employees");
    my $eventsCollection    = $session->client->ns("reporting.events");

    $session->start_transaction(
        {
            readConcern    => { level => "snapshot" },
            writeConcern   => { w     => "majority" },
            readPreference => 'primary',
        }
    );

    eval {
        $employeesCollection->update_one(
            { employee => 3 },
            { '$set' => { status => "Inactive" } },
            { session => $session },
        );
        $eventsCollection->insert_one(
            { employee => 3, status => { new => "Inactive", old => "Active" } },
            { session => $session },
        );
    };
    if ( my $error = $@ ) {
        print("Caught exception during transaction, aborting.\n");
        $session->abort_transaction();
        die $error;
    }

    commitWithRetry($session);
}

# Start a session
my $session = $client->start_session();

eval {
    runTransactionWithRetry( \&updateEmployeeInfo, $session );
};
if ( my $error = $@ ) {
    # Do something with error
}

$session->end_session();
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
    while (true) {
        try {
            $txnFunc($client, $session); // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}

private function commitWithRetry3(\MongoDB\Driver\Session $session): void
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}

private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
    $session->startTransaction([
        'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),
        'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
    ]);

    try {
        $client->hr->employees->updateOne(
            ['employee' => 3],
            ['$set' => ['status' => 'Inactive']],
            ['session' => $session],
        );
        $client->reporting->events->insertOne(
            ['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],
            ['session' => $session],
        );
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        echo "Caught exception during transaction, aborting.\n";
        $session->abortTransaction();
        throw $error;
    }

    $this->commitWithRetry3($session);
}

private function doUpdateEmployeeInfo(\MongoDB\Client $client): void
{
    // Start a session.
    $session = $client->startSession();

    try {
        $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
    } catch (\MongoDB\Driver\Exception\Exception) {
        // Do something with error
    }
}
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
from pymongo import ReadPreference
from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern


def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying transaction ...")
                continue
            else:
                raise


def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise


# Updates two collections in a transaction
def update_employee_info(session):
    employees_coll = session.client.hr.employees
    events_coll = session.client.reporting.events

    with session.start_transaction(
        read_concern=ReadConcern("snapshot"),
        write_concern=WriteConcern(w="majority"),
        read_preference=ReadPreference.PRIMARY,
    ):
        employees_coll.update_one(
            {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
        )
        events_coll.insert_one(
            {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
        )

        commit_with_retry(session)


# Start a session. `client` is assumed to be an existing MongoClient.
with client.start_session() as session:
    try:
        run_transaction_with_retry(update_employee_info, session)
    except Exception:
        # Do something with error.
        raise
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e
    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?('TransientTransactionError')
    puts "TransientTransactionError, retrying transaction ..."
    retry
  end
end

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error => e
    if e.label?('UnknownTransactionCommitResult')
      puts "UnknownTransactionCommitResult, retrying commit operation ..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

# updates two collections in a transaction
def update_employee_info(session)
  employees_coll = session.client.use(:hr)[:employees]
  events_coll = session.client.use(:reporting)[:events]

  session.start_transaction(read_concern: { level: :snapshot },
                            write_concern: { w: :majority },
                            read: { mode: :primary })
  employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive' } },
                            session: session)
  events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
                         session: session)
  commit_with_retry(session)
end

session = client.start_session

begin
  run_transaction_with_retry(session) do
    update_employee_info(session)
  end
rescue StandardError => e
  # Do something with error
  raise
end
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://apache.ac.cn/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = 
database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
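Driver Versions
Transaction Error Handling
Regardless of the database system, whether MongoDB or a relational database, applications should handle errors during transaction commits and include retry logic for transactions.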
TransientTransactionError
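The individual write operations inside a transaction are not retryable, regardless of the value of retryWrites. If an operation encounters an error associated with the label "TransientTransactionError", such as when the primary steps down, the transaction as a whole can be retried.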
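The Callback API includes retry logic for "TransientTransactionError". The Core transaction API does not include retry logic for "TransientTransactionError". To handle "TransientTransactionError", applications should explicitly include retry logic for the error. For an example that includes retry logic for transient errors, see the Core API example.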
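The driver examples on this page retry without an upper bound. As a complement, here is a minimal sketch of a bounded retry loop for "TransientTransactionError". It runs without a database: `LabeledError`, `run_with_transient_retry`, and `flaky_transaction` are hypothetical names introduced for illustration, and `LabeledError` only imitates the `has_error_label` method that driver exceptions expose.

```python
import time


class LabeledError(Exception):
    """Hypothetical stand-in for a driver exception that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


def run_with_transient_retry(txn_func, max_attempts=3, delay_seconds=0.0):
    """Run txn_func, retrying the whole function on TransientTransactionError.

    Returns the number of attempts used. Any other error, or a transient
    error on the final attempt, is re-raised to the caller.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            txn_func()
            return attempt
        except LabeledError as exc:
            transient = exc.has_error_label("TransientTransactionError")
            if not transient or attempt == max_attempts:
                raise
            time.sleep(delay_seconds)  # optional pause before the next attempt


# Simulated transaction body: fails twice with a transient error, then succeeds.
calls = {"count": 0}


def flaky_transaction():
    calls["count"] += 1
    if calls["count"] < 3:
        raise LabeledError("primary stepped down", ["TransientTransactionError"])


attempts_used = run_with_transient_retry(flaky_transaction)
print(f"succeeded after {attempts_used} attempts")  # succeeded after 3 attempts
```

In a real application, `txn_func` would start the transaction, run its operations with the session, and commit; the attempt limit and delay are design choices for this sketch, not driver defaults.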
UnknownTransactionCommitResult
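Commit operations are retryable write operations. If a commit operation encounters an error, MongoDB drivers retry the commit regardless of the value of retryWrites.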
If the commit operation encounters an error labeled "UnknownTransactionCommitResult", the commit can be retried.
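The Callback API includes retry logic for "UnknownTransactionCommitResult". The Core transaction API does not include retry logic for "UnknownTransactionCommitResult". To handle "UnknownTransactionCommitResult", applications should explicitly include retry logic for the error. For an example that includes retry logic for unknown commit errors, see the Core API example.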
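Retrying a commit differs from retrying a transaction: only the commit call is repeated, not the operations before it. The following sketch illustrates this with a bounded loop and no database. `LabeledError`, `FakeSession`, and `commit_with_bounded_retry` are hypothetical names used only for illustration; `FakeSession` imitates a session whose `commit_transaction` fails a fixed number of times.

```python
class LabeledError(Exception):
    """Hypothetical stand-in for a driver exception that carries error labels."""

    def __init__(self, message, labels=()):
        super().__init__(message)
        self._labels = frozenset(labels)

    def has_error_label(self, label):
        return label in self._labels


class FakeSession:
    """Hypothetical session whose commit fails a set number of times."""

    def __init__(self, failures_before_success):
        self.failures_left = failures_before_success
        self.commit_calls = 0
        self.committed = False

    def commit_transaction(self):
        self.commit_calls += 1
        if self.failures_left > 0:
            self.failures_left -= 1
            raise LabeledError(
                "commit outcome unknown", ["UnknownTransactionCommitResult"]
            )
        self.committed = True


def commit_with_bounded_retry(session, max_attempts=5):
    """Retry only the commit while the error is UnknownTransactionCommitResult.

    Returns the number of commit attempts used. Other errors, or an unknown
    commit result on the final attempt, are re-raised.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            session.commit_transaction()
            return attempt
        except LabeledError as exc:
            unknown = exc.has_error_label("UnknownTransactionCommitResult")
            if not unknown or attempt == max_attempts:
                raise


fake_session = FakeSession(failures_before_success=2)
commit_attempts = commit_with_bounded_retry(fake_session)
print(f"committed={fake_session.committed} after {commit_attempts} commit attempts")
# committed=True after 3 commit attempts
```

The cap of five attempts is an arbitrary choice for the sketch; an application might instead bound retries by elapsed time.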
TransactionTooLargeForCache
New in version 6.2.
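Starting in MongoDB 6.2, the server does not retry the transaction if it receives a TransactionTooLargeForCache error. This error means the cache is too small, so a retry is likely to fail.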
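The default value of the transactionTooLargeForCacheThreshold threshold is 0.75. When a transaction uses more than 75% of the cache, the server returns TransactionTooLargeForCache instead of retrying the transaction.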
In earlier versions of MongoDB, the server returns TemporarilyUnavailable or WriteConflict instead of TransactionTooLargeForCache.
Use the setParameter command to modify the error threshold.
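As an illustration, the command document for changing the threshold might be built as follows. This is a sketch under stated assumptions: `build_threshold_command` is a hypothetical helper, the value 0.5 is an arbitrary example, and the 0 to 1 range check is inferred from the default of 0.75 being described as 75% of the cache.

```python
def build_threshold_command(threshold):
    """Build a setParameter command document for the cache threshold.

    The 0 to 1 range check is an assumption made for this sketch.
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold is expected to be a fraction between 0 and 1")
    # The command name must be the first key; Python dicts keep insertion order.
    return {"setParameter": 1, "transactionTooLargeForCacheThreshold": threshold}


command = build_threshold_command(0.5)
print(command)
# {'setParameter': 1, 'transactionTooLargeForCacheThreshold': 0.5}

# With PyMongo and a connected MongoClient named `client` (assumed, not run
# here), the document could be sent to the admin database:
#   client.admin.command(command)
```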
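Additional Information
mongosh Example
The following mongosh methods are available for transactions: Session.startTransaction(), Session.commitTransaction(), and Session.abortTransaction().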
// Create collections:
db.getSiblingDB("mydb1").foo.insertOne(
    { abc: 0 },
    { writeConcern: { w: "majority", wtimeout: 2000 } }
)
db.getSiblingDB("mydb2").bar.insertOne(
    { xyz: 0 },
    { writeConcern: { w: "majority", wtimeout: 2000 } }
)

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;

// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );

// Operations inside the transaction
try {
    coll1.insertOne( { abc: 1 } );
    coll2.insertOne( { xyz: 999 } );
} catch (error) {
    // Abort transaction on error
    session.abortTransaction();
    throw error;
}

// Commit the transaction using write concern set at transaction start
session.commitTransaction();

session.endSession();