文档菜单
文档首页
/
MongoDB 手册

事务

本页内容

  • 事务 API
  • 事务和原子性
  • 事务和操作
  • 事务和会话
  • 读取关注/写入关注/读取偏好
  • 一般信息
  • 了解更多

在 MongoDB 中,对单个文档的操作是原子的。因为您可以使用嵌入式文档和数组来捕获单个文档结构中数据之间的关系,而不是在多个文档和集合中归一化,这种单文档原子性消除了许多实际用例中分布式事务的需求。

对于需要读取和写入多个文档原子性的情况(在一个或多个集合中),MongoDB 支持分布式事务。使用分布式事务,事务可以跨越多个操作、集合、数据库、文档和分片。

本页面的信息适用于以下环境中的部署

  • MongoDB Atlas:云中 MongoDB 部署的完全托管服务

  • MongoDB企业版:基于订阅的自管理MongoDB版本

  • MongoDB社区版:源代码可用、免费使用并自管理的MongoDB版本


使用右上角的选择语言下拉菜单设置以下示例的语言。


此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 0);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED (ctx);
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (const mongocxx::exception& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (interface{}, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances.
For example:
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

此示例突出了事务API的关键组件。特别是,它使用了回调API。回调API

  • 启动事务

  • 执行指定的操作

  • 提交结果或在错误发生时结束事务

服务器端操作中的错误,例如 DuplicateKeyError,可以结束事务并导致命令错误,提示用户事务已结束。这种行为是预期的,即使客户端从未调用 Session.abortTransaction(),也会发生。要添加自定义错误处理,请使用事务上的 核心API

回调API对某些错误实现了重试逻辑。驱动程序在遇到 瞬时事务错误未知事务提交结果 提交错误后尝试重新运行事务。

从MongoDB 6.2版本开始,如果服务器收到 TransactionTooLargeForCache 错误,则不会重试事务。

重要

// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?
// replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g.
// let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
let client = Client::with_uri_str(uri).await?;
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client
.database("mydb1")
.collection::<Document>("foo")
.insert_one(doc! { "abc": 0})
.await?;
client
.database("mydb2")
.collection::<Document>("bar")
.insert_one(doc! { "xyz": 0})
.await?;
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transaction.
async fn callback(session: &mut ClientSession) -> Result<()> {
let collection_one = session
.client()
.database("mydb1")
.collection::<Document>("foo");
let collection_two = session
.client()
.database("mydb2")
.collection::<Document>("bar");
// Important: You must pass the session to the operations.
collection_one
.insert_one(doc! { "abc": 1 })
.session(&mut *session)
.await?;
collection_two
.insert_one(doc! { "xyz": 999 })
.session(session)
.await?;
Ok(())
}
// Step 2: Start a client session.
let mut session = client.start_session().await?;
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
// abort on error).
session
.start_transaction()
.and_run((), |session, _| callback(session).boxed())
.await?;

此示例使用了 核心API。由于核心API没有包含对TransientTransactionErrorUnknownTransactionCommitResult提交错误的重试逻辑,因此示例中包含了对这些错误的显式重试逻辑

重要

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://apache.ac.cn/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

提示

另请参阅

有关mongosh中的示例,请参阅mongosh示例。

对于需要原子性读取和写入多个文档(在单个或多个集合中)的情况,MongoDB支持分布式事务,包括副本集和分片集群的事务。

分布式事务是原子性的

  • 事务要么应用所有数据更改,要么回滚更改。

  • 如果事务提交,则事务中作出的所有数据更改都会保存,并且对事务外可见。

    但在事务提交之前,事务中作出的数据更改对事务外不可见。

    然而,当事务写入多个分片时,并非所有外部读取操作都需要等待已提交事务的结果在分片间可见。例如,如果事务已提交,且在分片A上可见写入1,但在分片B上写入2尚未可见,则在读取关注度"local"的外部读取可以读取写入1的结果,而不会看到写入2。

  • 当事务中止时,事务中作出的所有数据更改都会被丢弃,而永远不会变得可见。例如,如果事务中的任何操作失败,事务将中止,并且事务中作出的所有数据更改将永远不会变得可见。

重要

在大多数情况下,分布式事务比单文档写入产生更大的性能开销,因此分布式事务的可用性不应替代有效的模式设计。对于许多场景,非规范化数据模型(内嵌文档和数组)将继续是您数据和用例的最佳选择。也就是说,对于许多场景,适当的数据建模将最大限度地减少分布式事务的需求。

关于额外的交易使用考虑因素(例如运行时限制和oplog大小限制),请参阅生产注意事项。

提示

另请参阅

分布式事务可以跨越多个操作、集合、数据库、文档和分片使用。

关于事务

  • 您可以在事务中创建集合和索引。详细信息请参阅在事务中创建集合和索引

  • 事务中使用的集合可以位于不同的数据库中。

    注意

    您不能在跨分片写事务中创建新集合。例如,如果您在一个分片中对现有集合进行写入并隐式地在另一个分片中创建集合,MongoDB无法在同一个事务中执行这两个操作。

  • 您不能写入固定集合。

  • 您不能在从固定集合读取时使用读取关注点 "snapshot"。 (从MongoDB 5.0开始)

  • 您不能向configadminlocal数据库中的集合进行读写。

  • 您不能向system.*集合进行写入。

  • 您不能使用explain或类似命令返回受支持操作的查询计划。

  • 对于在事务外部创建的光标,您不能在事务内调用getMore

  • 对于在事务中创建的光标,您不能在事务外调用getMore

  • 您不能将killCursors命令指定为事务的第一个操作。

    此外,如果您在事务中运行killCursors命令,服务器将立即停止指定的光标。它不会等待事务提交。

关于不支持在事务中执行的操作列表,请参阅受限操作。

提示

在开始事务前立即创建或删除集合时,如果在事务中访问该集合,则在创建或删除操作中使用"majority"写关注度,以确保事务可以获取所需的锁。

提示

另请参阅

如果事务不是一个跨分片写事务,您可以在分布式事务中执行以下操作

  • 创建集合。

  • 在同一个事务中之前创建的新空集合上创建索引。

在事务中创建集合时

当在事务中[1]创建索引时,要创建的索引必须在以下之一

  • 一个不存在的集合上。集合将在操作过程中创建。

  • 在同一个事务中之前创建的新空集合上。

[1] 您还可以运行db.collection.createIndex()db.collection.createIndexes()来检查现有索引的存在。这些操作在成功返回时不会创建索引。
  • 您不能在跨分片写事务中创建新集合。例如,如果您在一个分片中对现有集合进行写入并隐式地在另一个分片中创建集合,MongoDB无法在同一个事务中执行这两个操作。

  • 当针对分片集合时,您不能在事务中使用$graphLookup阶段。

  • 要在事务中显式创建集合或索引,事务的读取关注级别必须是"local"

    要显式创建集合和索引,请使用以下命令和方法

    命令
    方法

提示

另请参阅

要在事务中执行计数操作,请使用$count聚合阶段或$group(带有$sum表达式)聚合阶段。

MongoDB驱动程序提供了一种集合级API countDocuments(filter, options) 作为辅助方法,该方法使用$group$sum表达式来执行计数。

mongosh 提供了 db.collection.countDocuments() 辅助方法,该方法使用$group$sum表达式来执行计数。

在事务中执行区分操作

  • 对于非分片集合,您可以使用 db.collection.distinct() 方法/ distinct 命令,以及带有 $group 阶段的聚合管道。

  • 对于分片集合,不能使用 db.collection.distinct() 方法或 distinct 命令。

    要找到分片集合的区分值,请使用带有 $group 阶段的聚合管道。例如

    • 而不是 db.coll.distinct("x"),使用

      db.coll.aggregate([
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])
    • 而不是 db.coll.distinct("x", { status: "A" }),使用

      db.coll.aggregate([
      { $match: { status: "A" } },
      { $group: { _id: null, distinctValues: { $addToSet: "$x" } } },
      { $project: { _id: 0 } }
      ])

    管道返回一个文档的光标

    { "distinctValues" : [ 2, 3, 1 ] }

    迭代光标以访问结果文档。

信息性命令,如 hellobuildInfoconnectionStatus(及其辅助方法)可以在事务中使用;然而,它们不能作为事务中的第一个操作。

以下操作在事务中不允许

  • 事务与会话相关联。

  • 会话最多只能有一个打开的事务。

  • 在使用驱动程序时,事务中的每个操作都必须与会话相关联。有关详细信息,请参阅您特定驱动程序的文档。

  • 如果会话结束且存在未完成的事务,则事务将中止。

事务中的操作使用事务级别的读取偏好。

使用驱动程序,您可以在事务开始时设置事务级别的读取偏好。

  • 如果未设置事务级别的读取偏好,则事务将使用会话级别的读取偏好。

  • 如果未设置事务级别和会话级别的读取偏好,则事务将使用客户端级别的读取偏好。默认情况下,客户端级别的读取偏好为primary

分布式事务中包含读取操作必须使用读取偏好primary。给定事务中的所有操作必须路由到同一成员。

事务中的操作使用事务级别的读取关注。这意味着在事务内部忽略集合和数据库级别的读取关注。

您可以在事务开始时设置事务级别的读取关注

  • 如果未设置事务级别的读取关注,则事务级别的读取关注默认为会话级别的读取关注。

  • 如果事务级别和会话级别的读取关注都没有设置,则事务级别的读取关注默认为客户端级别的读取关注。默认情况下,客户端级别的读取关注在主节点上的读取是 "local"。另请参阅

事务支持以下读取关注级别

  • 读取关注 "local" 返回从节点可用的最新数据,但可以被回滚。

  • 在副本集上,即使事务使用读取关注 local,您可能会观察到更强大的读取隔离,其中操作读取的是事务打开时的时间点的快照。

  • 对于分片集群上的事务,"local" 读取关注无法保证跨分片的数据来自相同的快照视图。如果需要快照隔离,请使用 "snapshot" 读取关注。

  • 在事务中您可以创建集合和索引。如果显式创建一个集合或索引,则事务必须使用读取关注点"local"。如果您隐式创建一个集合,您可以使用事务中可用的任何读取关注点。

  • 如果事务以写入关注点 "majority"提交,读取关注点"majority"返回已被大多数副本集成员确认的数据,并且无法回滚。否则,读取关注点"majority"不提供读取操作读取大多数已提交数据的任何保证。

  • 对于分片集群上的事务,读取关注点"majority"不能保证数据来自跨分片相同的快照视图。如果需要快照隔离,请使用读取关注点"snapshot"

事务使用事务级别的写关注来提交写操作。事务内的写操作必须在没有显式写关注指定的条件下运行,并使用默认写关注。在提交时,使用事务级别的写关注提交的写操作。

提示

不要在事务内的单个写操作中显式设置写关注。在事务内设置单个写操作的写关注将返回错误。

可以在事务开始时设置事务级别的写关注

  • 如果未设置事务级别的写关注,则事务级别的写关注默认为提交时的会话级别写关注。

  • 如果未设置事务级别的写关注和会话级别的写关注,则事务级别的写关注默认为客户端级别的写关注。

事务支持所有写关注 w 值,包括

  • 写关注 w: 1 在提交已应用到主节点后返回确认。

    重要

    当你使用 w: 1 提交时,如果你的事务在故障转移中,它可能被回滚。

  • 当你使用 w: 1 写关注提交时,事务级别的 "majority" 读取关注不能保证事务中的读取操作读取了大多数已提交的数据。

  • 当你使用 w: 1 写关注时,事务级 "snapshot" 读取关注无法保证事务中的读取操作使用了多数已提交数据的快照。

  • 写关注 w: "majority" 在提交应用到多数投票成员后返回确认。

  • 当你使用 w: "majority" 写关注进行提交时,事务级 "majority" 读取关注保证操作读取了多数已提交数据。对于分片集群上的事务,这种多数已提交数据的视图在分片中不是同步的。

  • 当你使用 w: "majority" 写关注进行提交时,事务级 "snapshot" 读取关注保证操作从同步的多数已提交数据快照中读取。

注意

无论指定了事务的写关注,分片集群事务的提交操作包括一些使用 {w: "majority", j: true} 写关注的部分。

服务器参数 coordinateCommitReturnImmediatelyAfterPersistingDecision 控制何时将事务提交决策返回给客户端。

该参数在MongoDB 5.0中引入,默认值为 true。在MongoDB 6.1中,默认值变为 false

coordinateCommitReturnImmediatelyAfterPersistingDecisionfalse 时,分片事务协调器等待所有成员确认 多文档事务 提交后,再将提交决策返回给客户端。

如果你为 "majority" 写关注的多文档事务指定写关注,并且事务无法复制到计算出的多数 副本集 成员,则事务可能不会立即在副本集成员上回滚。副本集最终会保持 最终一致性。事务始终应用于或回滚到所有副本集成员。

无论为事务指定的写关注是什么,当重试w: "majority"时,驱动程序都将应用事务的写关注作为写关注。

以下部分描述了事务的更多考虑因素。

有关生产环境中的事务,请参阅生产考虑因素。此外,有关分片集群,请参阅生产考虑因素(分片集群)

如果副本集有仲裁者,则无法使用事务更改分片键。仲裁者无法参与多分片事务所需的数据操作。

跨越多个分片的写操作将导致错误并中止,如果任何事务操作从一个包含仲裁员的分片读取或写入。

不能在具有以下设置的sharded集群上运行事务:将 writeConcernMajorityJournalDefault 设置为 false(例如具有使用 内存存储引擎 的投票成员的分片)。

注意

无论指定了事务的写关注,分片集群事务的提交操作包括一些使用 {w: "majority", j: true} 写关注的部分。

要获取事务状态和指标,请使用以下方法

来源
返回

返回 事务 指标。

某些 serverStatus 响应字段在 MongoDB Atlas M0/M2/M5 集群上不会返回。有关更多信息,请参阅 MongoDB Atlas 文档中的 受限命令

$currentOp 聚合管道

返回

返回

mongodmongos 日志消息
包括有关慢事务(超过 operationProfiling.slowOpThresholdMs 阈值的操作)在 TXN 日志组件中的信息。

要使用事务,部署中所有成员的 featureCompatibilityVersion 必须至少为

部署
最小 featureCompatibilityVersion
副本集
4.0
分片集群
4.2

要检查成员的 fCV,连接到该成员并运行以下命令

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

有关更多信息,请参阅 setFeatureCompatibilityVersion 参考页面。

分布式事务 在副本集和分片集群中受支持,其中

  • 主节点使用 WiredTiger 存储引擎,并且

  • 二级成员可以使用WiredTiger存储引擎或内存存储引擎。

注意

无法在设置了writeConcernMajorityJournalDefaultfalse的分片集群上运行事务,例如使用了内存存储引擎的投票成员所在的分片。

从MongoDB 5.2(以及5.0.4)开始

返回

更新