SERVER-120304 Snapshot time may advance before fetching a command's operationTime (redux) (#53614)
Co-authored-by: Billy Donahue <BillyDonahue@users.noreply.github.com> GitOrigin-RevId: 1d30ac3d43f7352ab372a6f26458049f382a50fa
This commit is contained in:
parent
2c288ee479
commit
4cd1eadf98
184
jstests/replsets/operation_time_majority_read_race.js
Normal file
184
jstests/replsets/operation_time_majority_read_race.js
Normal file
@ -0,0 +1,184 @@
|
||||
/**
|
||||
* Ensures that the operationTime for majority reads is reported as the actual read timestamp.
|
||||
* A previous bug occurred because getCurrentCommittedSnapshotOpTime() can advance on another
|
||||
* thread between when the read executes and when operationTime is computed, causing the response
|
||||
* to report a timestamp newer than what the read actually observed.
|
||||
*
|
||||
* This can cause real data inconsistency via causal consistency: a client uses operationTime as
|
||||
* afterClusterTime on a follow-up majority read. With the bug, operationTime is advanced past a
|
||||
* concurrent update the original read never saw, so the follow-up is required to show that update.
|
||||
* From the client's perspective, a field appears to change between two consecutive reads with no
|
||||
* intervening write on the client's part.
|
||||
*
|
||||
* This test uses the hangBeforeComputeOperationTimeForMajorityRead failpoint to block the read
|
||||
* command before it computes operationTime, allowing the committed snapshot to advance via a
|
||||
* concurrent update. With the bug, operationTime is reported as >= the update commit timestamp
|
||||
* even though the read's snapshot only included the pre-update value.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_majority_read_concern,
|
||||
* requires_fcv_90,
|
||||
* uses_transactions,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
||||
const dbName = jsTestName();
|
||||
const collectionName = "coll";
|
||||
|
||||
const rst = new ReplSetTest({name: dbName, nodes: 2});
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
const primary = rst.getPrimary();
|
||||
const testDB = primary.getDB(dbName);
|
||||
const fullNs = `${dbName}.${collectionName}`;
|
||||
|
||||
// Establish a baseline: insert a document and wait for replication.
|
||||
assert.commandWorked(
|
||||
testDB.runCommand({
|
||||
insert: collectionName,
|
||||
documents: [{_id: 1, x: 1}],
|
||||
writeConcern: {w: "majority"},
|
||||
}),
|
||||
);
|
||||
rst.awaitReplication();
|
||||
|
||||
// Configure the failpoint to block our find before it computes operationTime.
|
||||
// The failpoint must be namespace-specific to avoid affecting other tests.
|
||||
const failpoint = configureFailPoint(primary, "hangBeforeComputeOperationTimeForMajorityRead", {
|
||||
ns: fullNs,
|
||||
});
|
||||
|
||||
// Run a majority read in a parallel shell. It will block at the failpoint after
|
||||
// executing the read but before computing operationTime. Writes the result to a
|
||||
// collection since startParallelShell returns the exit code, not the function's return value.
|
||||
const resultCollName = "operation_time_race_result";
|
||||
function runMajorityFind(host, dbName, collectionName, resultCollName) {
|
||||
const m = new Mongo(host).getDB(dbName);
|
||||
const res = assert.commandWorked(
|
||||
m.runCommand({
|
||||
find: collectionName,
|
||||
filter: {},
|
||||
readConcern: {level: "majority"},
|
||||
}),
|
||||
);
|
||||
// Write result to collection so main thread can read it (parallel shell returns exit code only).
|
||||
m.getCollection(resultCollName).drop();
|
||||
m.getCollection(resultCollName).insertOne({
|
||||
operationTime: res.operationTime,
|
||||
firstBatch: res.cursor.firstBatch,
|
||||
});
|
||||
}
|
||||
|
||||
const parallelResult = startParallelShell(
|
||||
funWithArgs(runMajorityFind, primary.host, dbName, collectionName, resultCollName),
|
||||
primary.port,
|
||||
);
|
||||
|
||||
// Wait for the failpoint to be hit (the find has executed the read and is about to compute
|
||||
// operationTime).
|
||||
failpoint.wait();
|
||||
|
||||
// While the find is blocked, update the document to advance the committed snapshot.
|
||||
// The read's snapshot was established before this update, so the read observed x=1.
|
||||
// With the bug, computeOperationTime will pick up this newer committed snapshot and
|
||||
// report operationTime >= the update timestamp, even though the read never saw x=2.
|
||||
const updateRes = assert.commandWorked(
|
||||
testDB.runCommand({
|
||||
update: collectionName,
|
||||
updates: [{q: {_id: 1}, u: {$set: {x: 2}}}],
|
||||
writeConcern: {w: "majority"},
|
||||
}),
|
||||
);
|
||||
const writeCommitTimestamp = updateRes.operationTime;
|
||||
|
||||
rst.awaitReplication();
|
||||
|
||||
// Verify the update has advanced the committed snapshot.
|
||||
const statusAfterWrite = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1}));
|
||||
assert.gte(
|
||||
timestampCmp(statusAfterWrite.optimes.lastCommittedOpTime.ts, writeCommitTimestamp),
|
||||
0,
|
||||
"Update should have advanced the committed snapshot: lastCommittedOpTime (" +
|
||||
tojson(statusAfterWrite.optimes.lastCommittedOpTime.ts) +
|
||||
") should be >= update commit timestamp (" +
|
||||
tojson(writeCommitTimestamp) +
|
||||
")",
|
||||
);
|
||||
|
||||
const commitPointAfterUpdate = writeCommitTimestamp;
|
||||
|
||||
// Release the failpoint so the find can complete.
|
||||
failpoint.off();
|
||||
|
||||
parallelResult();
|
||||
|
||||
// Read the result that the parallel shell wrote to the collection.
|
||||
const findResult = testDB.getCollection(resultCollName).findOne();
|
||||
assert(findResult, "Parallel shell should have written find result to " + resultCollName);
|
||||
|
||||
// The read executed before the update, so it must have seen x=1.
|
||||
const firstBatch = findResult.firstBatch ?? [];
|
||||
assert.eq(firstBatch.length, 1);
|
||||
assert.eq(firstBatch[0]._id, 1);
|
||||
assert.eq(
|
||||
firstBatch[0].x,
|
||||
1,
|
||||
"Majority read must see x=1: the snapshot was established before " + "the update to x=2 and should not reflect it",
|
||||
);
|
||||
|
||||
// Demonstrate the causal inconsistency by reading at exactly operationTime using atClusterTime
|
||||
// in a snapshot transaction. If operationTime correctly reflects the read's snapshot (with fix),
|
||||
// reading at that exact timestamp must also return x=1. With the bug, operationTime is advanced
|
||||
// to the update's commit timestamp, so reading at atClusterTime=operationTime returns x=2 —
|
||||
// inconsistent with the x=1 the majority read returned at "the same" timestamp.
|
||||
assert(findResult.operationTime !== undefined, "Find result should include operationTime");
|
||||
const session = primary.startSession({causalConsistency: false});
|
||||
session.startTransaction({readConcern: {level: "snapshot", atClusterTime: findResult.operationTime}});
|
||||
const followUpResult = assert.commandWorked(
|
||||
session.getDatabase(dbName).runCommand({find: collectionName, filter: {_id: 1}}),
|
||||
);
|
||||
const followUpX = followUpResult.cursor.firstBatch[0].x;
|
||||
session.abortTransaction();
|
||||
session.endSession();
|
||||
assert.eq(
|
||||
followUpX,
|
||||
1,
|
||||
"Reading at atClusterTime=" +
|
||||
tojson(findResult.operationTime) +
|
||||
" (operationTime from the majority read) returned x=" +
|
||||
followUpX +
|
||||
" but the majority read returned x=1. If operationTime correctly reflects the read's" +
|
||||
" snapshot, reading at that exact timestamp must also return x=1. With the bug," +
|
||||
" operationTime is advanced to the update timestamp (" +
|
||||
tojson(commitPointAfterUpdate) +
|
||||
"), so reading at atClusterTime=operationTime" +
|
||||
" returns the post-update x=2.",
|
||||
);
|
||||
|
||||
// BUG ASSERTION: operationTime must be < the update commit timestamp.
|
||||
// The read saw x=1 (pre-update), so operationTime must reflect a snapshot before the update.
|
||||
// With the bug, operationTime is advanced to the update timestamp, making the follow-up read
|
||||
// return x=2 even though the first read returned x=1 — a causal inconsistency from the
|
||||
// client's perspective.
|
||||
assert.lt(
|
||||
timestampCmp(findResult.operationTime, commitPointAfterUpdate),
|
||||
0,
|
||||
"operationTime (" +
|
||||
tojson(findResult.operationTime) +
|
||||
") must be less than the update commit timestamp (" +
|
||||
tojson(commitPointAfterUpdate) +
|
||||
"). The read returned x=1 (pre-update), so operationTime must precede the update. " +
|
||||
"With the bug, a causally-consistent follow-up read returns x=" +
|
||||
followUpX +
|
||||
" while the original read returned x=" +
|
||||
firstBatch[0].x +
|
||||
", making x appear to change between two consecutive reads with no client write.",
|
||||
);
|
||||
|
||||
testDB.getCollection(resultCollName).drop();
|
||||
rst.stopSet();
|
||||
@ -2386,6 +2386,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/commands/server_status:server_status_core",
|
||||
"//src/mongo/db/repl:repl_server_parameters",
|
||||
"//src/mongo/db/repl:replica_set_messages",
|
||||
"//src/mongo/db/rss:disable_snapshotting_fail_point",
|
||||
"//src/mongo/db/s:sharding_runtime_d",
|
||||
"//src/mongo/db/session:session_catalog_mongod",
|
||||
"//src/mongo/db/shard_role/lock_manager",
|
||||
|
||||
@ -42,3 +42,15 @@ server_parameters:
|
||||
cpp_varname: waitForSecondaryBeforeNoopWriteMS
|
||||
default: 10
|
||||
redact: false
|
||||
|
||||
testingSnapshotBehaviorInIsolation:
|
||||
description: >-
|
||||
Allows for testing of snapshot behavior by skipping the replication
|
||||
related checks and isolating the storage/query side of snapshotting.
|
||||
set_at: startup
|
||||
cpp_vartype: bool
|
||||
cpp_varname: gTestingSnapshotBehaviorInIsolation
|
||||
default: false
|
||||
redact: false
|
||||
test_only: true
|
||||
mod_visibility: public_for_technical_reasons
|
||||
|
||||
@ -1436,6 +1436,7 @@ mongo_cc_library(
|
||||
":local_oplog_info",
|
||||
"//src/mongo/db:common",
|
||||
"//src/mongo/db:mongod_options",
|
||||
"//src/mongo/db:read_concern_d_impl",
|
||||
"//src/mongo/db:server_base",
|
||||
"//src/mongo/db:service_context",
|
||||
"//src/mongo/db/commands:mongod_fcv",
|
||||
@ -1444,6 +1445,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/index_builds:index_builds_coordinator",
|
||||
"//src/mongo/db/repl/initial_sync:initial_syncer",
|
||||
"//src/mongo/db/repl/split_horizon",
|
||||
"//src/mongo/db/rss:disable_snapshotting_fail_point",
|
||||
"//src/mongo/db/session:kill_sessions_local",
|
||||
"//src/mongo/db/session:session_catalog",
|
||||
"//src/mongo/db/shard_role/lock_manager",
|
||||
|
||||
@ -59,6 +59,7 @@
|
||||
#include "mongo/db/index_builds/commit_quorum_options.h"
|
||||
#include "mongo/db/logical_time.h"
|
||||
#include "mongo/db/mongod_options_storage_gen.h"
|
||||
#include "mongo/db/read_concern_mongod_gen.h"
|
||||
#include "mongo/db/read_write_concern_defaults.h"
|
||||
#include "mongo/db/repl/always_allow_non_local_writes.h"
|
||||
#include "mongo/db/repl/check_quorum_for_config_change.h"
|
||||
@ -94,6 +95,7 @@
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_enabled.h"
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_manager.h"
|
||||
#include "mongo/db/replication_state_transition_lock_guard.h"
|
||||
#include "mongo/db/rss/disable_snapshotting_fail_point.h"
|
||||
#include "mongo/db/rss/replicated_storage_service.h"
|
||||
#include "mongo/db/server_feature_flags_gen.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
@ -5145,8 +5147,6 @@ OpTime ReplicationCoordinatorImpl::_recalculateStableOpTime(WithLock lk) {
|
||||
return stableOpTime;
|
||||
}
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
|
||||
|
||||
void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
|
||||
// Don't update the stable optime if we are in initial sync. We advance the oldest timestamp
|
||||
// continually to the lastApplied optime during initial sync oplog application, so if we learned
|
||||
@ -5701,7 +5701,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot(WithLock lk,
|
||||
invariant(newCommittedSnapshot.getTimestamp() >= _currentCommittedSnapshot->getTimestamp());
|
||||
invariant(newCommittedSnapshot >= *_currentCommittedSnapshot);
|
||||
}
|
||||
if (MONGO_unlikely(disableSnapshotting.shouldFail()))
|
||||
if (MONGO_unlikely(mongo::disableSnapshotting.shouldFail()))
|
||||
return false;
|
||||
_currentCommittedSnapshot = newCommittedSnapshot;
|
||||
_currentCommittedSnapshotCond.notify_all();
|
||||
|
||||
@ -59,14 +59,3 @@ server_parameters:
|
||||
validator:
|
||||
gte: 0
|
||||
redact: false
|
||||
|
||||
testingSnapshotBehaviorInIsolation:
|
||||
description: >-
|
||||
Allows for testing of snapshot behavior by skipping the replication
|
||||
related checks and isolating the storage/query side of snapshotting.
|
||||
set_at: startup
|
||||
cpp_vartype: bool
|
||||
cpp_varname: gTestingSnapshotBehaviorInIsolation
|
||||
default: false
|
||||
redact: false
|
||||
test_only: true
|
||||
|
||||
@ -67,6 +67,16 @@ mongo_cc_library(
|
||||
}),
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "disable_snapshotting_fail_point",
|
||||
srcs = ["disable_snapshotting_fail_point.cpp"],
|
||||
hdrs = ["disable_snapshotting_fail_point.h"],
|
||||
deps = [
|
||||
"//src/mongo:base",
|
||||
"//src/mongo/util:fail_point",
|
||||
],
|
||||
)
|
||||
|
||||
idl_generator(
|
||||
name = "snapshot_window_options_gen",
|
||||
src = "snapshot_window_options.idl",
|
||||
|
||||
39
src/mongo/db/rss/disable_snapshotting_fail_point.cpp
Normal file
39
src/mongo/db/rss/disable_snapshotting_fail_point.cpp
Normal file
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/db/rss/disable_snapshotting_fail_point.h"
|
||||
|
||||
#include "mongo/util/fail_point.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
|
||||
|
||||
} // namespace mongo
|
||||
39
src/mongo/db/rss/disable_snapshotting_fail_point.h
Normal file
39
src/mongo/db/rss/disable_snapshotting_fail_point.h
Normal file
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/util/fail_point.h"
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
MONGO_MOD_PUBLIC extern FailPoint disableSnapshotting;
|
||||
|
||||
} // namespace mongo
|
||||
@ -74,6 +74,7 @@
|
||||
#include "mongo/db/profile_collection.h"
|
||||
#include "mongo/db/profile_settings.h"
|
||||
#include "mongo/db/query/query_request_helper.h"
|
||||
#include "mongo/db/read_concern_mongod_gen.h"
|
||||
#include "mongo/db/read_concern_support_result.h"
|
||||
#include "mongo/db/read_write_concern_defaults.h"
|
||||
#include "mongo/db/read_write_concern_defaults_gen.h"
|
||||
@ -87,6 +88,7 @@
|
||||
#include "mongo/db/repl/storage_interface.h"
|
||||
#include "mongo/db/replication_state_transition_lock_guard.h"
|
||||
#include "mongo/db/request_execution_context.h"
|
||||
#include "mongo/db/rss/disable_snapshotting_fail_point.h"
|
||||
#include "mongo/db/rss/replicated_storage_service.h"
|
||||
#include "mongo/db/server_feature_flags_gen.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
@ -186,6 +188,7 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeSettingTxnInterruptFlag);
|
||||
MONGO_FAIL_POINT_DEFINE(hangAfterCheckingWritabilityForMultiDocumentTransactions);
|
||||
MONGO_FAIL_POINT_DEFINE(failWithErrorCodeAfterSessionCheckOut);
|
||||
MONGO_FAIL_POINT_DEFINE(failIngressRequestRateLimiting);
|
||||
MONGO_FAIL_POINT_DEFINE(hangBeforeComputeOperationTimeForMajorityRead);
|
||||
|
||||
// Tracks the number of times a legacy unacknowledged write failed due to
|
||||
// not primary error resulted in network disconnection.
|
||||
@ -320,8 +323,23 @@ LogicalTime computeOperationTime(OperationContext* opCtx, LogicalTime startOpera
|
||||
|
||||
// Note: ReadConcernArgs::getLevel returns kLocal if none was set.
|
||||
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
|
||||
operationTime =
|
||||
LogicalTime(replCoord->getCurrentCommittedSnapshotOpTime().getTimestamp());
|
||||
hangBeforeComputeOperationTimeForMajorityRead.executeIf(
|
||||
[&](const BSONObj&) {
|
||||
hangBeforeComputeOperationTimeForMajorityRead.pauseWhileSet();
|
||||
},
|
||||
[&](const BSONObj& data) {
|
||||
const auto fpNss = NamespaceStringUtil::parseFailPointData(data, "ns");
|
||||
return fpNss.isEmpty() || CurOp::get(opCtx)->getNSS() == fpNss;
|
||||
});
|
||||
auto readTs = [&]() -> boost::optional<Timestamp> {
|
||||
if (gTestingSnapshotBehaviorInIsolation ||
|
||||
MONGO_unlikely(disableSnapshotting.shouldFail())) {
|
||||
return boost::none;
|
||||
}
|
||||
return shard_role_details::getRecoveryUnit(opCtx)->getLastUsedReadTimestamp();
|
||||
}();
|
||||
operationTime = LogicalTime(
|
||||
readTs ? *readTs : replCoord->getCurrentCommittedSnapshotOpTime().getTimestamp());
|
||||
} else {
|
||||
// Use the lockfree atomic shadow to avoid acquiring the ReplicationCoordinator
|
||||
// mutex on every response. Slight staleness is acceptable for operationTime.
|
||||
|
||||
@ -320,6 +320,14 @@ public:
|
||||
"Current storage engine does not support majority readConcerns"};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Timestamp that was used by the previous timestamped read, or none. This value is
|
||||
* cleared when a new transaction is opened.
|
||||
*/
|
||||
virtual boost::optional<Timestamp> getLastUsedReadTimestamp() const {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Timestamp being used by this recovery unit or boost::none if not reading from
|
||||
* a point in time. Any point in time returned will reflect one of the following:
|
||||
|
||||
@ -355,6 +355,9 @@ void WiredTigerRecoveryUnit::preallocateSnapshot(const OpenSnapshotOptions& opti
|
||||
|
||||
void WiredTigerRecoveryUnit::_txnClose(bool commit) {
|
||||
invariant(_isActive(), toString(_getState()));
|
||||
if (!_readAtTimestamp.isNull()) {
|
||||
_lastReadTimestampFromClosedTxn = _readAtTimestamp;
|
||||
}
|
||||
|
||||
if (TestingProctor::instance().isEnabled() && shouldGatherWriteContextForDebugging() &&
|
||||
commit) {
|
||||
@ -543,6 +546,10 @@ Status WiredTigerRecoveryUnit::majorityCommittedSnapshotAvailable() const {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> WiredTigerRecoveryUnit::getLastUsedReadTimestamp() const {
|
||||
return _lastReadTimestampFromClosedTxn;
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() {
|
||||
// After a ReadSource has been set on this RecoveryUnit, callers expect that this method returns
|
||||
// the read timestamp that will be used for current or future transactions. Because callers use
|
||||
@ -589,7 +596,7 @@ boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp()
|
||||
invariant(!_readAtTimestamp.isNull());
|
||||
return _readAtTimestamp;
|
||||
|
||||
// The follow ReadSources returned values in the first switch block.
|
||||
// The following ReadSources returned values in the first switch block.
|
||||
case ReadSource::kNoTimestamp:
|
||||
case ReadSource::kProvided:
|
||||
MONGO_UNREACHABLE;
|
||||
@ -607,6 +614,8 @@ void WiredTigerRecoveryUnit::_txnOpen() {
|
||||
"Must be using snapshot isolation to open a WT transaction",
|
||||
_isolation == Isolation::snapshot);
|
||||
|
||||
_lastReadTimestampFromClosedTxn = boost::none;
|
||||
|
||||
ensureSnapshot();
|
||||
_ensureSession();
|
||||
|
||||
|
||||
@ -97,6 +97,8 @@ public:
|
||||
|
||||
Status majorityCommittedSnapshotAvailable() const override;
|
||||
|
||||
boost::optional<Timestamp> getLastUsedReadTimestamp() const override;
|
||||
|
||||
boost::optional<Timestamp> getPointInTimeReadTimestamp() override;
|
||||
|
||||
Status setTimestamp(Timestamp timestamp) override;
|
||||
@ -329,6 +331,7 @@ private:
|
||||
Timestamp _rollbackTimestamp;
|
||||
boost::optional<Timestamp> _lastTimestampSet;
|
||||
Timestamp _readAtTimestamp;
|
||||
boost::optional<Timestamp> _lastReadTimestampFromClosedTxn;
|
||||
UntimestampedWriteAssertionLevel _untimestampedWriteAssertionLevel =
|
||||
UntimestampedWriteAssertionLevel::kEnforce;
|
||||
std::unique_ptr<Timer> _timer;
|
||||
|
||||
@ -208,6 +208,38 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, SetReadSource) {
|
||||
ASSERT_EQ(Timestamp(1, 1), ru1->getPointInTimeReadTimestamp());
|
||||
}
|
||||
|
||||
TEST_F(WiredTigerRecoveryUnitTestFixture, CanReadLastUsedReadTimestamp) {
|
||||
OperationContext* opCtx1 = clientAndCtx1.second.get();
|
||||
std::unique_ptr<RecordStore> rs(harnessHelper->createRecordStore(opCtx1, "a.b"));
|
||||
|
||||
const Timestamp ts1{1, 1};
|
||||
RecordId rid;
|
||||
{
|
||||
StorageWriteTransaction txn(*ru1);
|
||||
const std::string str = "test";
|
||||
StatusWith<RecordId> res = rs->insertRecord(
|
||||
opCtx1, *shard_role_details::getRecoveryUnit(opCtx1), str.c_str(), str.size() + 1, ts1);
|
||||
ASSERT_OK(res);
|
||||
txn.commit();
|
||||
rid = res.getValue();
|
||||
}
|
||||
|
||||
// Before any read transaction with a timestamp has closed, getLastUsedReadTimestamp is none.
|
||||
ASSERT_EQ(boost::none, ru1->getLastUsedReadTimestamp());
|
||||
|
||||
const Timestamp ts2{2, 2};
|
||||
|
||||
// Open a read transaction at ts2.
|
||||
ru1->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts2);
|
||||
RecordData unused;
|
||||
ASSERT_TRUE(rs->findRecord(opCtx1, *shard_role_details::getRecoveryUnit(opCtx1), rid, &unused));
|
||||
ASSERT_EQ(std::string("test"), std::string(unused.data(), unused.size() - 1));
|
||||
|
||||
// Closing the transaction should capture the read timestamp.
|
||||
ru1->abandonSnapshot();
|
||||
ASSERT_EQ(ts2, ru1->getLastUsedReadTimestamp());
|
||||
}
|
||||
|
||||
TEST_F(WiredTigerRecoveryUnitTestFixture, NoOverlapReadSource) {
|
||||
OperationContext* opCtx1 = clientAndCtx1.second.get();
|
||||
OperationContext* opCtx2 = clientAndCtx2.second.get();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user