阅读隔离性、一致性和时效性
隔离保证
阅读未提交
根据读取关注点,客户端可以在写入之前看到写入的结果耐用性:
无论写入的写入关注点如何,使用
"local"
或"available"
读取关注点的客户端可以在写入操作得到客户端确认之前看到写入操作的结果。使用
"local"
或"available"
读取关注点的客户端可以读取在副本集故障转移期间可能会回滚的数据。
在多文档事务操作中,当事务提交时,事务中进行的所有数据更改都会被保存并在事务外部可见。也就是说,事务不会在回滚一些更改的同时提交其他更改。
直到事务提交,事务中进行的更改在事务外部不可见。
然而,当事务写入多个分片时,并非所有外部读取操作都需要等待已提交事务的结果在所有分片上可见。例如,如果事务已提交且在分片A上可见写入1,但在分片B上写入2尚未可见,则在外部读取时,读取关注点为"local"
的情况下可以读取写入1的结果,而不会看到写入2。
读取未提交是默认的隔离级别,并适用于mongod
独立实例以及复制集和分片集群。
读取未提交和单文档原子性
写入操作在单个文档级别上是原子的;也就是说,如果一个写入正在更新文档中的多个字段,则读取操作永远不会看到只更新了部分字段的文档。但是,尽管客户端可能看不到一个部分更新的文档,读取未提交意味着并发读取操作可能仍然会在更改变得持久化之前看到更新的文档。
在独立mongod
实例中,对单个文档的一系列读取和写入操作是可序列化的。在复制集中,对单个文档的一系列读取和写入操作在没有回滚的情况下才是可序列化的。
读取未提交的数据和多文档写入
当单个写入操作(例如 db.collection.updateMany()
)修改多个文档时,每个文档的修改是原子的,但整个操作不是原子的。
当执行多文档写入操作时,无论是通过单个写入操作还是多个写入操作,其他操作可能会交错。
对于需要多个文档(在单个或多个集合中)读写原子的场景,MongoDB 支持分布式事务,包括副本集和分片集群的事务。
有关更多信息,请参阅 事务
重要
在大多数情况下,分布式事务相较于单文档写入会带来更高的性能开销,分布式事务的可用性不应替代有效的模式设计。对于许多场景,非规范化数据模型(嵌入文档和数组)将仍然适用于您的数据和用例。也就是说,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。
有关其他事务使用考虑因素(例如运行时限制和oplog大小限制),请参阅 生产注意事项
如果不隔离多文档写入操作,MongoDB 会表现出以下行为
非点时间读操作。假设一个读操作从时间 t1 开始并开始读取文档。然后在某个后续时间 t2,一个写操作提交了对其中一个文档的更新。读者可能看到文档的更新版本,因此看不到数据的点时间快照。
不可序列化操作。假设一个读取操作在时间 t 1 读取文档 d 1,而一个写入操作在稍后的时间 t 3 更新 d 1。这引入了一个读写依赖,如果操作需要序列化,则读取操作必须先于写入操作。但是,假设写入操作在时间 t 2 更新文档 d 2,而读取操作随后在稍后的时间 t 4 读取 d 2。这引入了一个写后读依赖,在可序列化调度中,读取操作需要紧跟在写入操作之后。存在一个依赖循环,使得可序列化变得不可能。
读取操作可能会错过在读取操作过程中更新的匹配文档。
游标快照
在某些情况下,MongoDB 游标可能会多次返回相同的文档。当游标返回文档时,其他操作可能会与查询交织。如果这些操作中有一个改变了查询所使用的索引上的索引字段,则游标可能会多次返回相同的文档。
使用 唯一索引 的查询,在某些情况下,可能会返回重复值。如果使用唯一索引的游标与共享相同唯一值的文档的删除和插入交织,则游标可能会从不同的文档中两次返回相同的唯一值。
考虑使用读取隔离。了解更多信息,请参阅 读取关注点 "快照"
。
单调写入
MongoDB 默认为独立 mongod
实例和副本集提供单调写入保证。
有关单调写入和分片集群的信息,请参阅因果一致性.
实时订单
在主文档上执行读写操作时,使用"linearizable"
读关注和"majority"
写关注,可以使多个线程同时对单个文档进行读写,就像一个线程实时执行这些操作一样;也就是说,这些读写操作的对应调度被认为是可线性化的。
因果一致性
如果一个操作在逻辑上依赖于先前的操作,则操作之间存在因果关系。例如,基于指定条件删除所有文档的写操作和随后的验证删除操作的读操作之间存在因果关系。
在因果一致性会话中,MongoDB按照尊重它们因果关系的顺序执行因果操作,并且客户端观察到与因果关系一致的结果。
客户端会话和因果一致性保证
为了提供因果一致性,MongoDB在客户端会话中启用了因果一致性。因果一致性的会话表示,与具有"majority"
读取关注度和具有"majority"
写入关注度的写入操作相关联的读取操作序列具有因果关系,这种因果关系由它们的顺序表示。应用程序必须确保一次只有一个线程在客户端会话中执行这些操作。
对于因果相关的操作
客户端启动一个客户端会话。
重要
客户端会话只能保证以下操作的因果一致性
具有
"majority"
读取关注度的读取操作;即返回的数据已被大多数副本集成员确认且是持久的。具有
"majority"
写入关注度的写入操作;即请求确认操作已应用于大多数副本集的投票成员。
有关因果一致性和各种读取和写入关注度的更多信息,请参阅因果一致性和读取和写入关注。
当客户端发出一系列具有
"majority"
读取关注度和"majority"
写入关注度的写入操作时,客户端将在每个操作中包含会话信息。对于与具有
"majority"
读取关注度和具有"majority"
写入关注度的写入操作相关的每个读取操作,即使操作出错,MongoDB也会返回操作时间和集群时间。客户端会话会跟踪操作时间和集群时间。注意
MongoDB不会为未确认的(
w: 0
)写入操作返回操作时间和集群时间。未确认的写入不表示任何因果关系。尽管MongoDB在客户端会话中返回读取操作和已确认写入操作的操作时间和集群时间,但只有具有
"majority"
读取关注度和具有"majority"
写入关注度的读取操作和写入操作可以保证因果一致性。有关详细信息,请参阅因果一致性和读取和写入关注。相关的客户端会话跟踪这两个时间字段。
注意
操作可以在不同的会话中保持因果一致性。MongoDB 驱动程序和
mongosh
提供了用于推进客户端会话的操作时间和集群时间的方法。因此,客户端可以将一个客户端会话的集群时间和操作时间推进,以与其他客户端会话的操作保持一致。
因果一致性保证
以下表格列出了因果一致性会话为具有 "majority"
读取关注度和具有 "majority"
写关注度的写操作提供的因果一致性保证。
保证 | 描述 |
---|---|
读取你的写入 | 读取操作反映了在其之前的写操作的结果。 |
单调读取 | 读取操作不会返回早于先前读取操作的数据状态的结果。 例如,在一个会话中
那么读 2 不能返回写 1 的结果。 |
单调写入 | 必须先于其他写入操作的写入操作在那些其他写入操作之前执行。 例如,如果一个会话中的写 1 必须在写 2 之前,那么写 2 时的数据状态必须反映写 1 之后的态。其他写入可以在写 1 和写 2 之间交错,但写 2 不能在写 1 之前发生。 |
写入跟随读取 | 必须在读取操作之后发生的写入操作在那些读取操作之后执行。也就是说,写入时的数据状态必须包含先前读取操作的数据状态。 |
读取优先级
这些保证适用于MongoDB部署的所有成员。例如,在一个因果一致会话中,如果你先发出一个带有 "majority"
写关注度的写操作,然后执行一个从二级副本(即读取偏好 secondary
)读取并带有 "majority"
读取关注度的读操作,那么读操作将反映写操作之后的数据库状态。
隔离
因果一致会话内的操作与该会话外的操作不是隔离的。如果会话的写和读操作之间有并发写操作交错,那么会话的读操作可能会返回反映在会话写操作之后发生的写操作的结果。
MongoDB 驱动
提示
应用程序必须确保一次只有一个线程在客户端会话中执行这些操作。
客户端需要更新为MongoDB 3.6或更高版本的MongoDB驱动程序
Java 3.6+ Python 3.6+ C 1.9+ Go 1.8+ | C# 2.5+ Node 3.0+ Ruby 2.5+ Rust 2.1+ Swift 1.2+ | Perl 2.0+ PHPC 1.4+ Scala 2.2+ C++ 3.6.6+ |
示例
重要
因果一致会话只能保证具有 "majority"
读取关注和 "majority"
写入关注的读取操作和写入操作的因果一致性。
考虑一个名为 items
的集合,该集合维护各种项目的当前和历史数据。只有历史数据有非空的 end
日期。如果一个项目的 sku
值发生变化,则具有旧 sku
值的文档需要更新 end
日期,之后插入具有当前 sku
值的新文档。客户端可以使用因果一致会话来确保更新发生在插入之前。
➤使用右上角的 选择您的语言 下拉菜单设置此示例的语言。
/* Use a causally-consistent session to run some operations. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 1000); mongoc_collection_set_write_concern (coll, wc); rc = mongoc_read_concern_new (); mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_MAJORITY); mongoc_collection_set_read_concern (coll, rc); session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, true); session1 = mongoc_client_start_session (client, session_opts, &error); if (!session1) { fprintf (stderr, "couldn't start session: %s\n", error.message); goto cleanup; } /* Run an update_one with our causally-consistent session. */ update_opts = bson_new (); res = mongoc_client_session_append (session1, update_opts, &error); if (!res) { fprintf (stderr, "couldn't add session to opts: %s\n", error.message); goto cleanup; } query = BCON_NEW ("sku", "111"); update = BCON_NEW ("$set", "{", "end", BCON_DATE_TIME (bson_get_monotonic_time ()), "}"); res = mongoc_collection_update_one (coll, query, update, update_opts, NULL, /* reply */ &error); if (!res) { fprintf (stderr, "update failed: %s\n", error.message); goto cleanup; } /* Run an insert with our causally-consistent session */ insert_opts = bson_new (); res = mongoc_client_session_append (session1, insert_opts, &error); if (!res) { fprintf (stderr, "couldn't add session to opts: %s\n", error.message); goto cleanup; } insert = BCON_NEW ("sku", "nuts-111", "name", "Pecans", "start", BCON_DATE_TIME (bson_get_monotonic_time ())); res = mongoc_collection_insert_one (coll, insert, insert_opts, NULL, &error); if (!res) { fprintf (stderr, "insert failed: %s\n", error.message); goto cleanup; }
using (var session1 = client.StartSession(new ClientSessionOptions { CausalConsistency = true })) { var currentDate = DateTime.UtcNow.Date; var items = client.GetDatabase( "test", new MongoDatabaseSettings { ReadConcern = ReadConcern.Majority, WriteConcern = new WriteConcern( WriteConcern.WMode.Majority, TimeSpan.FromMilliseconds(1000)) }) .GetCollection<BsonDocument>("items"); items.UpdateOne(session1, Builders<BsonDocument>.Filter.And( Builders<BsonDocument>.Filter.Eq("sku", "111"), Builders<BsonDocument>.Filter.Eq("end", BsonNull.Value)), Builders<BsonDocument>.Update.Set("end", currentDate)); items.InsertOne(session1, new BsonDocument { {"sku", "nuts-111"}, {"name", "Pecans"}, {"start", currentDate} }); }
// Example 1: Use a causally consistent session to ensure that the update occurs before the insert. ClientSession session1 = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build()); Date currentDate = new Date(); MongoCollection<Document> items = client.getDatabase("test") .withReadConcern(ReadConcern.MAJORITY) .withWriteConcern(WriteConcern.MAJORITY.withWTimeout(1000, TimeUnit.MILLISECONDS)) .getCollection("test"); items.updateOne(session1, eq("sku", "111"), set("end", currentDate)); Document document = new Document("sku", "nuts-111") .append("name", "Pecans") .append("start", currentDate); items.insertOne(session1, document);
async with await client.start_session(causal_consistency=True) as s1: current_date = datetime.datetime.today() items = client.get_database( "test", read_concern=ReadConcern("majority"), write_concern=WriteConcern("majority", wtimeout=1000), ).items await items.update_one( {"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1 ) await items.insert_one( {"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1 )
my $s1 = $conn->start_session({ causalConsistency => 1 }); $items = $conn->get_database( "test", { read_concern => { level => 'majority' }, write_concern => { w => 'majority', wtimeout => 10000 }, } )->get_collection("items"); $items->update_one( { sku => 111, end => undef }, { '$set' => { end => $current_date} }, { session => $s1 } ); $items->insert_one( { sku => "nuts-111", name => "Pecans", start => $current_date }, { session => $s1 } );
$items = $client->selectDatabase( 'test', [ 'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::MAJORITY), 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->items; $s1 = $client->startSession( ['causalConsistency' => true], ); $currentDate = new \MongoDB\BSON\UTCDateTime(); $items->updateOne( ['sku' => '111', 'end' => ['$exists' => false]], ['$set' => ['end' => $currentDate]], ['session' => $s1], ); $items->insertOne( ['sku' => '111-nuts', 'name' => 'Pecans', 'start' => $currentDate], ['session' => $s1], );
with client.start_session(causal_consistency=True) as s1: current_date = datetime.datetime.today() items = client.get_database( "test", read_concern=ReadConcern("majority"), write_concern=WriteConcern("majority", wtimeout=1000), ).items items.update_one( {"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1 ) items.insert_one( {"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1 )
let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) let currentDate = Date() var dbOptions = MongoDatabaseOptions( readConcern: .majority, writeConcern: try .majority(wtimeoutMS: 1000) ) let items = client1.db("test", options: dbOptions).collection("items") let result1 = items.updateOne( filter: ["sku": "111", "end": .null], update: ["$set": ["end": .datetime(currentDate)]], session: s1 ).flatMap { _ in items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1) }
let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) let currentDate = Date() var dbOptions = MongoDatabaseOptions( readConcern: .majority, writeConcern: try .majority(wtimeoutMS: 1000) ) let items = client1.db("test", options: dbOptions).collection("items") try items.updateOne( filter: ["sku": "111", "end": .null], update: ["$set": ["end": .datetime(currentDate)]], session: s1 ) try items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1)
如果另一个客户端需要读取所有当前 sku
值,可以将集群时间和操作时间向前推进到另一个会话的时间,以确保此客户端与另一个会话因果一致,并在两次写入之后读取。
/* Make a new session, session2, and make it causally-consistent * with session1, so that session2 will read session1's writes. */ session2 = mongoc_client_start_session (client, session_opts, &error); if (!session2) { fprintf (stderr, "couldn't start session: %s\n", error.message); goto cleanup; } /* Set the cluster time for session2 to session1's cluster time */ cluster_time = mongoc_client_session_get_cluster_time (session1); mongoc_client_session_advance_cluster_time (session2, cluster_time); /* Set the operation time for session2 to session2's operation time */ mongoc_client_session_get_operation_time (session1, ×tamp, &increment); mongoc_client_session_advance_operation_time (session2, timestamp, increment); /* Run a find on session2, which should now find all writes done * inside of session1 */ find_opts = bson_new (); res = mongoc_client_session_append (session2, find_opts, &error); if (!res) { fprintf (stderr, "couldn't add session to opts: %s\n", error.message); goto cleanup; } find_query = BCON_NEW ("end", BCON_NULL); read_prefs = mongoc_read_prefs_new (MONGOC_READ_SECONDARY); cursor = mongoc_collection_find_with_opts (coll, query, find_opts, read_prefs); while (mongoc_cursor_next (cursor, &result)) { json = bson_as_relaxed_extended_json (result, NULL); fprintf (stdout, "Document: %s\n", json); bson_free (json); } if (mongoc_cursor_error (cursor, &error)) { fprintf (stderr, "cursor failure: %s\n", error.message); goto cleanup; }
using (var session2 = client.StartSession(new ClientSessionOptions { CausalConsistency = true })) { session2.AdvanceClusterTime(session1.ClusterTime); session2.AdvanceOperationTime(session1.OperationTime); var items = client.GetDatabase( "test", new MongoDatabaseSettings { ReadPreference = ReadPreference.Secondary, ReadConcern = ReadConcern.Majority, WriteConcern = new WriteConcern(WriteConcern.WMode.Majority, TimeSpan.FromMilliseconds(1000)) }) .GetCollection<BsonDocument>("items"); var filter = Builders<BsonDocument>.Filter.Eq("end", BsonNull.Value); foreach (var item in items.Find(session2, filter).ToEnumerable()) { // process item } }
// Example 2: Advance the cluster time and the operation time to that of the other session to ensure that // this client is causally consistent with the other session and read after the two writes. ClientSession session2 = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build()); session2.advanceClusterTime(session1.getClusterTime()); session2.advanceOperationTime(session1.getOperationTime()); items = client.getDatabase("test") .withReadPreference(ReadPreference.secondary()) .withReadConcern(ReadConcern.MAJORITY) .withWriteConcern(WriteConcern.MAJORITY.withWTimeout(1000, TimeUnit.MILLISECONDS)) .getCollection("items"); for (Document item: items.find(session2, eq("end", BsonNull.VALUE))) { System.out.println(item); }
async with await client.start_session(causal_consistency=True) as s2: s2.advance_cluster_time(s1.cluster_time) s2.advance_operation_time(s1.operation_time) items = client.get_database( "test", read_preference=ReadPreference.SECONDARY, read_concern=ReadConcern("majority"), write_concern=WriteConcern("majority", wtimeout=1000), ).items async for item in items.find({"end": None}, session=s2): print(item)
my $s2 = $conn->start_session({ causalConsistency => 1 }); $s2->advance_cluster_time( $s1->cluster_time ); $s2->advance_operation_time( $s1->operation_time ); $items = $conn->get_database( "test", { read_preference => 'secondary', read_concern => { level => 'majority' }, write_concern => { w => 'majority', wtimeout => 10000 }, } )->get_collection("items"); $cursor = $items->find( { end => undef }, { session => $s2 } ); for my $item ( $cursor->all ) { say join(" ", %$item); }
$s2 = $client->startSession( ['causalConsistency' => true], ); $s2->advanceClusterTime($s1->getClusterTime()); $s2->advanceOperationTime($s1->getOperationTime()); $items = $client->selectDatabase( 'test', [ 'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::SECONDARY), 'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::MAJORITY), 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->items; $result = $items->find( ['end' => ['$exists' => false]], ['session' => $s2], ); foreach ($result as $item) { var_dump($item); }
with client.start_session(causal_consistency=True) as s2: s2.advance_cluster_time(s1.cluster_time) s2.advance_operation_time(s1.operation_time) items = client.get_database( "test", read_preference=ReadPreference.SECONDARY, read_concern=ReadConcern("majority"), write_concern=WriteConcern("majority", wtimeout=1000), ).items for item in items.find({"end": None}, session=s2): print(item)
let options = ClientSessionOptions(causalConsistency: true) let result2: EventLoopFuture<Void> = client2.withSession(options: options) { s2 in // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. s2.advanceClusterTime(to: s1.clusterTime!) s2.advanceOperationTime(to: s1.operationTime!) dbOptions.readPreference = .secondary let items2 = client2.db("test", options: dbOptions).collection("items") return items2.find(["end": .null], session: s2).flatMap { cursor in cursor.forEach { item in print(item) } } }
try client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. s2.advanceClusterTime(to: s1.clusterTime!) s2.advanceOperationTime(to: s1.operationTime!) dbOptions.readPreference = .secondary let items2 = client2.db("test", options: dbOptions).collection("items") for item in try items2.find(["end": .null], session: s2) { print(item) } }
限制
以下构建内存结构的操作不是因果一致的