SERVER-125965: Wait for stable timestamp to advance in initial sync (#54176)
GitOrigin-RevId: a0ad6c116cbef6833ee7d14d8bd046c7ef099441
This commit is contained in:
parent
7d1b18f756
commit
63745eaa45
@ -51,14 +51,22 @@ last-continuous:
|
|||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
|
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_drop_collection.js
|
||||||
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_during_stepdown.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
|
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
|
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_rename_collection.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_replSetGetStatus.js
|
- test_file: jstests/replsets/initial_sync_replSetGetStatus.js
|
||||||
ticket: SERVER-121218
|
ticket: SERVER-121218
|
||||||
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
|
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_waits_for_stable_timestamp.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/server_status_metrics.js
|
- test_file: jstests/replsets/server_status_metrics.js
|
||||||
ticket: SERVER-120318
|
ticket: SERVER-120318
|
||||||
- test_file: jstests/sharding/chunk_migration_maxkey_boundary.js
|
- test_file: jstests/sharding/chunk_migration_maxkey_boundary.js
|
||||||
@ -466,6 +474,10 @@ last-lts:
|
|||||||
ticket: SERVER-94259
|
ticket: SERVER-94259
|
||||||
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
|
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_drop_collection.js
|
||||||
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_during_stepdown.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_during_stepdown.js
|
- test_file: jstests/replsets/initial_sync_during_stepdown.js
|
||||||
ticket: SERVER-89664
|
ticket: SERVER-89664
|
||||||
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
|
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
|
||||||
@ -474,12 +486,16 @@ last-lts:
|
|||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_index_conflict_recreate.js
|
- test_file: jstests/replsets/initial_sync_index_conflict_recreate.js
|
||||||
ticket: SERVER-93141
|
ticket: SERVER-93141
|
||||||
|
- test_file: jstests/replsets/initial_sync_rename_collection.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/initial_sync_replSetGetStatus.js
|
- test_file: jstests/replsets/initial_sync_replSetGetStatus.js
|
||||||
ticket: SERVER-121218
|
ticket: SERVER-121218
|
||||||
- test_file: jstests/replsets/initial_sync_survives_restart.js
|
- test_file: jstests/replsets/initial_sync_survives_restart.js
|
||||||
ticket: SERVER-88447
|
ticket: SERVER-88447
|
||||||
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
|
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
|
||||||
ticket: SERVER-113184
|
ticket: SERVER-113184
|
||||||
|
- test_file: jstests/replsets/initial_sync_waits_for_stable_timestamp.js
|
||||||
|
ticket: SERVER-113184
|
||||||
- test_file: jstests/replsets/large_dockey_succeeds_on_secondary.js
|
- test_file: jstests/replsets/large_dockey_succeeds_on_secondary.js
|
||||||
ticket: SERVER-104405
|
ticket: SERVER-104405
|
||||||
- test_file: jstests/replsets/log_unprepared_abort_txns.js
|
- test_file: jstests/replsets/log_unprepared_abort_txns.js
|
||||||
|
|||||||
@ -51,6 +51,13 @@ function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) {
|
|||||||
// can check initialSyncStatus fields after initial sync is complete.
|
// can check initialSyncStatus fields after initial sync is complete.
|
||||||
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
||||||
secondaryStartupParams["numInitialSyncAttempts"] = 1;
|
secondaryStartupParams["numInitialSyncAttempts"] = 1;
|
||||||
|
// We must disable this parameter because the primary may be able to advance the stable
|
||||||
|
// timestamp after initiate prior to this restart. As a result, the initial sync
|
||||||
|
// node may see that it is no longer initiating the set, and wait for stable
|
||||||
|
// to advance to beginApplying in initial sync. Since the majority of this
|
||||||
|
// set is 2, the primary will be unable to advance its stable, leaving this node
|
||||||
|
// stuck in initial sync. We must disable the wait to avoid this scenario.
|
||||||
|
secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false;
|
||||||
secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
||||||
secondaryDB = secondary.getDB(dbName);
|
secondaryDB = secondary.getDB(dbName);
|
||||||
secondaryColl = secondaryDB[collName];
|
secondaryColl = secondaryDB[collName];
|
||||||
|
|||||||
@ -54,6 +54,13 @@ function setupTest({
|
|||||||
// Skip clearing initial sync progress after a successful initial sync attempt so that we
|
// Skip clearing initial sync progress after a successful initial sync attempt so that we
|
||||||
// can check initialSyncStatus fields after initial sync is complete.
|
// can check initialSyncStatus fields after initial sync is complete.
|
||||||
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
||||||
|
// We must disable this parameter because the primary may be able to advance the stable
|
||||||
|
// timestamp after initiate prior to this restart. As a result, the initial sync
|
||||||
|
// node may see that it is no longer initiating the set, and wait for stable
|
||||||
|
// to advance to beginApplying in initial sync. Since the majority of this
|
||||||
|
// set is 2, the primary will be unable to advance its stable, leaving this node
|
||||||
|
// stuck in initial sync. We must disable the wait to avoid this scenario.
|
||||||
|
secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false;
|
||||||
secondary = rst.start(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
secondary = rst.start(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
||||||
secondaryDB = secondary.getDB(dbName);
|
secondaryDB = secondary.getDB(dbName);
|
||||||
secondaryColl = secondaryDB[collName];
|
secondaryColl = secondaryDB[collName];
|
||||||
|
|||||||
@ -56,6 +56,13 @@ function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) {
|
|||||||
// can check initialSyncStatus fields after initial sync is complete.
|
// can check initialSyncStatus fields after initial sync is complete.
|
||||||
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
|
||||||
secondaryStartupParams["numInitialSyncAttempts"] = 1;
|
secondaryStartupParams["numInitialSyncAttempts"] = 1;
|
||||||
|
// We must disable this parameter because the primary may be able to advance the stable
|
||||||
|
// timestamp after initiate prior to this restart. As a result, the initial sync
|
||||||
|
// node may see that it is no longer initiating the set, and wait for stable
|
||||||
|
// to advance to beginApplying in initial sync. Since the majority of this
|
||||||
|
// set is 2, the primary will be unable to advance its stable, leaving this node
|
||||||
|
// stuck in initial sync. We must disable the wait to avoid this scenario.
|
||||||
|
secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false;
|
||||||
secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
|
||||||
secondaryDB = secondary.getDB(dbName);
|
secondaryDB = secondary.getDB(dbName);
|
||||||
secondaryColl = secondaryDB[collName];
|
secondaryColl = secondaryDB[collName];
|
||||||
|
|||||||
133
jstests/replsets/initial_sync_waits_for_stable_timestamp.js
Normal file
133
jstests/replsets/initial_sync_waits_for_stable_timestamp.js
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
/**
|
||||||
|
* Tests that initial sync correctly waits for the sync source's lastStableRecoveryTimestamp to
|
||||||
|
* advance past beginApplyingTimestamp:
|
||||||
|
* 1. Initial sync succeeds once the stable timestamp is released and allowed to advance.
|
||||||
|
* 2. Initial sync fails the current attempt if the stable timestamp does not advance within the
|
||||||
|
* configured retry period, then succeeds on the next attempt once it is released.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||||
|
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
|
||||||
|
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||||
|
|
||||||
|
describe("initial sync waits for sync source stable recovery timestamp to advance", function () {
|
||||||
|
const kDbName = "test";
|
||||||
|
const kCollName = "coll";
|
||||||
|
let rst;
|
||||||
|
let primary;
|
||||||
|
|
||||||
|
beforeEach(function () {
|
||||||
|
rst = new ReplSetTest({nodes: 1});
|
||||||
|
rst.startSet();
|
||||||
|
rst.initiate();
|
||||||
|
primary = rst.getPrimary();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function () {
|
||||||
|
rst.stopSet();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Sets up the primary's stable timestamp hold and adds a new secondary configured to use the
|
||||||
|
// wait-for-stable-timestamp feature. Returns the failpoint handle and the secondary node.
|
||||||
|
function prepareInitialSync(extraParams = {}) {
|
||||||
|
// Capture the initiating set entry ts (the earliest oplog entry on a fresh primary).
|
||||||
|
const initiatingSetTs = primary.getDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).next().ts;
|
||||||
|
|
||||||
|
// Insert docs to populate the collection before the stable-timestamp pin.
|
||||||
|
assert.commandWorked(primary.getDB(kDbName)[kCollName].insertMany([{_id: 1}, {_id: 2}, {_id: 3}]));
|
||||||
|
|
||||||
|
// Wait for stable ts to checkpoint at a strictly later *second* than initiatingSetTs.
|
||||||
|
// _initiatingSetStableTimestampCallback computes diff = stableTs.getSecs() -
|
||||||
|
// earliestTs.getSecs(). With thresholdSecs=0 (JS test infra default), we need diff >= 1
|
||||||
|
// so the initiating-set skip (diff <= thresholdSecs) does not fire.
|
||||||
|
let pinTs;
|
||||||
|
assert.soon(() => {
|
||||||
|
assert.commandWorked(primary.getDB(kDbName)[kCollName].updateOne({_id: 1}, {$inc: {_v: 1}}));
|
||||||
|
assert.commandWorked(primary.adminCommand({fsync: 1}));
|
||||||
|
const status = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1}));
|
||||||
|
const st = status.lastStableRecoveryTimestamp;
|
||||||
|
if (st.getTime() > initiatingSetTs.getTime()) {
|
||||||
|
pinTs = st;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}, "Timed out waiting for primary stable ts to advance past initiating set entry second");
|
||||||
|
|
||||||
|
// Pin stable ts here so beginApplyingTimestamp (set from the next inserts) will be above it.
|
||||||
|
const holdFp = configureFailPoint(primary, "holdStableTimestampAtSpecificTimestamp", {timestamp: pinTs});
|
||||||
|
|
||||||
|
// Insert more docs to push optime above pinTs so beginApplyingTimestamp > pinTs.
|
||||||
|
assert.commandWorked(primary.getDB(kDbName)[kCollName].insertMany([{_id: 4}, {_id: 5}, {_id: 6}]));
|
||||||
|
|
||||||
|
const secondary = rst.add({
|
||||||
|
rsConfig: {priority: 0, votes: 0},
|
||||||
|
setParameter: Object.assign(
|
||||||
|
{
|
||||||
|
numInitialSyncAttempts: 2,
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs: true,
|
||||||
|
},
|
||||||
|
extraParams,
|
||||||
|
),
|
||||||
|
});
|
||||||
|
rst.reInitiate();
|
||||||
|
rst.waitForState(secondary, ReplSetTest.State.STARTUP_2);
|
||||||
|
return {holdFp, secondary};
|
||||||
|
}
|
||||||
|
|
||||||
|
it("succeeds once the sync source stable recovery timestamp advances", function () {
|
||||||
|
const {holdFp, secondary} = prepareInitialSync();
|
||||||
|
|
||||||
|
// Wait until the secondary has entered the stable-timestamp wait loop (log ID 11318413).
|
||||||
|
assert.soon(
|
||||||
|
() => checkLog.checkContainsOnce(secondary, 11318413),
|
||||||
|
"Timed out waiting for secondary to enter stable timestamp wait loop",
|
||||||
|
);
|
||||||
|
|
||||||
|
jsTestLog("Releasing stable timestamp hold on primary.");
|
||||||
|
holdFp.off();
|
||||||
|
// A new write is required to trigger setStableTimestamp now that the hold is released;
|
||||||
|
// without it, WT's stable timestamp stays at the pinned value indefinitely.
|
||||||
|
assert.commandWorked(primary.getDB(kDbName)[kCollName].insertOne({_id: 7}));
|
||||||
|
|
||||||
|
rst.awaitSecondaryNodes(null, [secondary]);
|
||||||
|
assert.eq(7, secondary.getDB(kDbName)[kCollName].find().itcount());
|
||||||
|
});
|
||||||
|
|
||||||
|
it("fails if stable timestamp does not advance within the retry period", function () {
|
||||||
|
const kRetryPeriodSecs = 10;
|
||||||
|
const {holdFp, secondary} = prepareInitialSync({
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs: kRetryPeriodSecs,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the first initial sync attempt to time out (log ID 11318417).
|
||||||
|
assert.soon(
|
||||||
|
() => {
|
||||||
|
try {
|
||||||
|
const status = secondary.adminCommand({replSetGetStatus: 1});
|
||||||
|
return status.initialSyncStatus && status.initialSyncStatus.failedInitialSyncAttempts >= 1;
|
||||||
|
} catch (e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Timed out waiting for initial sync attempt to fail",
|
||||||
|
(kRetryPeriodSecs + 60) * 1000,
|
||||||
|
);
|
||||||
|
|
||||||
|
const status = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
|
||||||
|
const failedAttempt = status.initialSyncStatus.initialSyncAttempts[0];
|
||||||
|
assert(
|
||||||
|
failedAttempt.status.includes("Failed to wait for stable recovery timestamp"),
|
||||||
|
"Expected stable timestamp wait timeout error",
|
||||||
|
{failedAttempt},
|
||||||
|
);
|
||||||
|
|
||||||
|
jsTestLog("Initial sync failed as expected. Releasing hold for second attempt.");
|
||||||
|
holdFp.off();
|
||||||
|
// A new write is required to trigger setStableTimestamp now that the hold is released;
|
||||||
|
// without it, WT's stable timestamp stays at the pinned value indefinitely.
|
||||||
|
assert.commandWorked(primary.getDB(kDbName)[kCollName].insertOne({_id: 7}));
|
||||||
|
|
||||||
|
rst.awaitSecondaryNodes(null, [secondary]);
|
||||||
|
assert.eq(7, secondary.getDB(kDbName)[kCollName].find().itcount());
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -90,6 +90,7 @@ public:
|
|||||||
const Result& result);
|
const Result& result);
|
||||||
void setResultAndCancelRemainingWork(const std::unique_lock<std::mutex>& lock,
|
void setResultAndCancelRemainingWork(const std::unique_lock<std::mutex>& lock,
|
||||||
const Result& result);
|
const Result& result);
|
||||||
|
void setResultAndCancelRemainingWork(WithLock lock, const Result& result);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
@ -137,6 +138,12 @@ void CallbackCompletionGuard<Result>::setResultAndCancelRemainingWork(
|
|||||||
_setResultAndCancelRemainingWork(lock, result);
|
_setResultAndCancelRemainingWork(lock, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Result>
|
||||||
|
void CallbackCompletionGuard<Result>::setResultAndCancelRemainingWork(WithLock lock,
|
||||||
|
const Result& result) {
|
||||||
|
_setResultAndCancelRemainingWork(lock, result);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename Result>
|
template <typename Result>
|
||||||
void CallbackCompletionGuard<Result>::_setResultAndCancelRemainingWork(WithLock lk,
|
void CallbackCompletionGuard<Result>::_setResultAndCancelRemainingWork(WithLock lk,
|
||||||
const Result& result) {
|
const Result& result) {
|
||||||
|
|||||||
@ -59,7 +59,13 @@ struct InitialSyncState {
|
|||||||
|
|
||||||
bool earliestOplogEntryIsInitiatingSet = false;
|
bool earliestOplogEntryIsInitiatingSet = false;
|
||||||
Timestamp earliestOplogEntryTimestamp;
|
Timestamp earliestOplogEntryTimestamp;
|
||||||
Date_t waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline;
|
Date_t waitForSyncSourceStableTimestampAdvanceStartTime; // Time at which we started waiting
|
||||||
|
// for the sync source's last
|
||||||
|
// checkpoint to advance. Used with
|
||||||
|
// the retry period parameter to
|
||||||
|
// compute the deadline on each loop.
|
||||||
|
int waitForSyncSourceStableTimestampAdvanceSleepMillis =
|
||||||
|
100; // How long to sleep in-between attempts. Increases exponentially.
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace repl
|
} // namespace repl
|
||||||
|
|||||||
@ -165,6 +165,10 @@ using LockGuard = std::lock_guard<std::mutex>;
|
|||||||
// Used to reset the oldest timestamp during initial sync to a non-null timestamp.
|
// Used to reset the oldest timestamp during initial sync to a non-null timestamp.
|
||||||
const Timestamp kTimestampOne(0, 1);
|
const Timestamp kTimestampOne(0, 1);
|
||||||
|
|
||||||
|
// Maximum amount of time we will pause in-between waits for our sync source to advance stable
|
||||||
|
// timestamp.
|
||||||
|
const int kMaxExponentialBackoffMillis = 30000;
|
||||||
|
|
||||||
ServiceContext::UniqueOperationContext makeOpCtx() {
|
ServiceContext::UniqueOperationContext makeOpCtx() {
|
||||||
return cc().makeOperationContext();
|
return cc().makeOperationContext();
|
||||||
}
|
}
|
||||||
@ -1269,6 +1273,13 @@ void InitialSyncer::_initiatingSetStableTimestampCallback(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
status = getStatusFromCommandResult(callbackArgs.response.data);
|
||||||
|
if (!status.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"];
|
auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"];
|
||||||
if (!stableElem) {
|
if (!stableElem) {
|
||||||
LOGV2_WARNING(11318414,
|
LOGV2_WARNING(11318414,
|
||||||
@ -1334,18 +1345,194 @@ void InitialSyncer::_initiatingSetStableTimestampCallback(
|
|||||||
"isInitiatingSet"_attr = isInitiatingSet,
|
"isInitiatingSet"_attr = isInitiatingSet,
|
||||||
"earliestOplogEntryTs"_attr = earliestTs,
|
"earliestOplogEntryTs"_attr = earliestTs,
|
||||||
"lastStableRecoveryTs"_attr = lastStableRecoveryTs);
|
"lastStableRecoveryTs"_attr = lastStableRecoveryTs);
|
||||||
_initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline =
|
_initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime = (*_attemptExec)->now();
|
||||||
(*_attemptExec)->now() +
|
_checkStableTimestampAdvancementLocked(
|
||||||
Seconds(initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs.load());
|
lock, lastStableRecoveryTs, onCompletionGuard, beginFetchingOpTime);
|
||||||
// TODO (SERVER-125965): Wait for lastStableRecoveryTimestamp to advance in this case instead of
|
}
|
||||||
// proceeding directly to _initializeOplogFetcherAndDbCloners.
|
|
||||||
status = _scheduleWorkAndSaveHandle(
|
void InitialSyncer::_runFsyncOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime) {
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
if (!_checkForShutdownAndHandleError(
|
||||||
|
lock, callbackArgs, onCompletionGuard, "error scheduling fsync on sync source")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
executor::RemoteCommandRequest fsyncRequest(
|
||||||
|
_syncSource, DatabaseName::kAdmin, BSON("fsync" << 1), nullptr);
|
||||||
|
auto cbHandle =
|
||||||
|
(*_attemptExec)
|
||||||
|
->scheduleRemoteCommand(
|
||||||
|
std::move(fsyncRequest),
|
||||||
|
[this, onCompletionGuard, beginFetchingOpTime](
|
||||||
|
TaskExecutor::RemoteCommandCallbackArgs args) {
|
||||||
|
// fsync is fire-and-forget: proceed to replSetGetStatus regardless of outcome.
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
auto status = _scheduleWorkAndSaveHandle(
|
||||||
|
lock,
|
||||||
|
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
|
||||||
|
_runReplSetGetStatusOnSyncSource(
|
||||||
|
args, onCompletionGuard, beginFetchingOpTime);
|
||||||
|
},
|
||||||
|
&_waitForSyncSourceStableTimestampHandle,
|
||||||
|
"_runReplSetGetStatusOnSyncSource");
|
||||||
|
if (!status.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
|
_initialSyncState.reset();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (!cbHandle.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, cbHandle.getStatus());
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the handle so that it can be cancelled if initial sync shuts down.
|
||||||
|
_waitForSyncSourceStableTimestampHandle = cbHandle.getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
void InitialSyncer::_runReplSetGetStatusOnSyncSource(
|
||||||
|
const executor::TaskExecutor::CallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime) {
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
if (!_checkForShutdownAndHandleError(lock,
|
||||||
|
callbackArgs,
|
||||||
|
onCompletionGuard,
|
||||||
|
"error scheduling replSetGetStatus on sync source")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
executor::RemoteCommandRequest replSetGetStatusRequest(
|
||||||
|
_syncSource, DatabaseName::kAdmin, BSON("replSetGetStatus" << 1), nullptr);
|
||||||
|
auto cbHandle =
|
||||||
|
(*_attemptExec)
|
||||||
|
->scheduleRemoteCommand(std::move(replSetGetStatusRequest),
|
||||||
|
[this, onCompletionGuard, beginFetchingOpTime](
|
||||||
|
TaskExecutor::RemoteCommandCallbackArgs args) {
|
||||||
|
_handleLastStableRecoveryTsResponse(
|
||||||
|
args, onCompletionGuard, beginFetchingOpTime);
|
||||||
|
});
|
||||||
|
if (!cbHandle.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, cbHandle.getStatus());
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the handle so that it can be cancelled if initial sync shuts down.
|
||||||
|
_waitForSyncSourceStableTimestampHandle = cbHandle.getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
void InitialSyncer::_handleLastStableRecoveryTsResponse(
|
||||||
|
const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime) {
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
|
auto status = _checkForShutdownAndConvertStatus(
|
||||||
lock,
|
lock,
|
||||||
|
callbackArgs.response.status,
|
||||||
|
"error waiting for sync source lastStableRecoveryTimestamp to advance");
|
||||||
|
if (!status.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = getStatusFromCommandResult(callbackArgs.response.data);
|
||||||
|
if (!status.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"];
|
||||||
|
if (!stableElem) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(
|
||||||
|
lock,
|
||||||
|
Status(ErrorCodes::InvalidSyncSource,
|
||||||
|
"sync source replSetGetStatus response is missing lastStableRecoveryTimestamp"));
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_checkStableTimestampAdvancementLocked(
|
||||||
|
lock, stableElem.timestamp(), onCompletionGuard, beginFetchingOpTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
void InitialSyncer::_checkStableTimestampAdvancementLocked(
|
||||||
|
WithLock lock,
|
||||||
|
Timestamp lastStableRecoveryTs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime) {
|
||||||
|
const auto beginApplyingTs = _initialSyncState->beginApplyingTimestamp;
|
||||||
|
|
||||||
|
LOGV2_DEBUG(11318415,
|
||||||
|
2,
|
||||||
|
"Checking sync source lastStableRecoveryTimestamp",
|
||||||
|
"lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs,
|
||||||
|
"beginApplyingTimestamp"_attr = beginApplyingTs);
|
||||||
|
|
||||||
|
if (lastStableRecoveryTs >= beginApplyingTs) {
|
||||||
|
LOGV2(11318400,
|
||||||
|
"Sync source lastStableRecoveryTimestamp has advanced; continuing with initial sync",
|
||||||
|
"lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs,
|
||||||
|
"beginApplyingTimestamp"_attr = beginApplyingTs);
|
||||||
|
auto status = _scheduleWorkAndSaveHandle(
|
||||||
|
lock,
|
||||||
|
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
|
||||||
|
_initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime);
|
||||||
|
},
|
||||||
|
&_initializeOplogFetcherAndDbClonersHandle,
|
||||||
|
"_initializeOplogFetcherAndDbCloners");
|
||||||
|
if (!status.isOK()) {
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
|
_initialSyncState.reset();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto now = (*_attemptExec)->now();
|
||||||
|
const auto deadline = _initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime +
|
||||||
|
Seconds(initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs.load());
|
||||||
|
if (now >= deadline) {
|
||||||
|
LOGV2_WARNING(11318417,
|
||||||
|
"Timed out waiting for sync source lastStableRecoveryTimestamp to advance",
|
||||||
|
"lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs,
|
||||||
|
"beginApplyingTimestamp"_attr = beginApplyingTs,
|
||||||
|
"deadline"_attr = deadline);
|
||||||
|
onCompletionGuard->setResultAndCancelRemainingWork(
|
||||||
|
lock,
|
||||||
|
Status(ErrorCodes::ExceededTimeLimit,
|
||||||
|
"Failed to wait for stable recovery timestamp to advance. To resolve this, "
|
||||||
|
"ensure that the sync source is healthy and able to advance its checkpoint "
|
||||||
|
"timestamp."));
|
||||||
|
_initialSyncState.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retry after exponential backoff. Maximum amount of time for exponential backoff is 30s.
|
||||||
|
const auto sleepMillis = _initialSyncState->waitForSyncSourceStableTimestampAdvanceSleepMillis;
|
||||||
|
_initialSyncState->waitForSyncSourceStableTimestampAdvanceSleepMillis =
|
||||||
|
std::min(sleepMillis * 2, kMaxExponentialBackoffMillis);
|
||||||
|
const auto when = std::min(now + Milliseconds(sleepMillis), deadline);
|
||||||
|
|
||||||
|
LOGV2_DEBUG(11318418,
|
||||||
|
2,
|
||||||
|
"Sync source lastStableRecoveryTimestamp not yet advanced; retrying after backoff",
|
||||||
|
"lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs,
|
||||||
|
"beginApplyingTimestamp"_attr = beginApplyingTs,
|
||||||
|
"retryAfterMillis"_attr = sleepMillis);
|
||||||
|
|
||||||
|
auto status = _scheduleWorkAtAndSaveHandle(
|
||||||
|
lock,
|
||||||
|
when,
|
||||||
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
|
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
|
||||||
_initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime);
|
_runFsyncOnSyncSource(args, onCompletionGuard, beginFetchingOpTime);
|
||||||
},
|
},
|
||||||
&_initializeOplogFetcherAndDbClonersHandle,
|
&_waitForSyncSourceStableTimestampHandle,
|
||||||
"_initializeOplogFetcherAndDbCloners");
|
"_runFsyncOnSyncSource");
|
||||||
if (!status.isOK()) {
|
if (!status.isOK()) {
|
||||||
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
|
||||||
_initialSyncState.reset();
|
_initialSyncState.reset();
|
||||||
|
|||||||
@ -407,9 +407,23 @@ private:
|
|||||||
* |
|
* |
|
||||||
* |
|
* |
|
||||||
* V
|
* V
|
||||||
* _checkIfInitiatingSet()
|
* _initiatingSetStableTimestampCallback()
|
||||||
* |
|
* | |
|
||||||
* |
|
* (skip) | (wait needed) |
|
||||||
|
* | V
|
||||||
|
* | _checkStableTimestampAdvancementLocked() <---+
|
||||||
|
* | | |
|
||||||
|
* | (not advanced) |
|
||||||
|
* | V |
|
||||||
|
* | _runFsyncOnSyncSource() (after backoff) |
|
||||||
|
* | | |
|
||||||
|
* | V |
|
||||||
|
* | _runReplSetGetStatusOnSyncSource() |
|
||||||
|
* | | |
|
||||||
|
* | V |
|
||||||
|
* | _handleLastStableRecoveryTsResponse() -------+
|
||||||
|
* | (advanced or proceed)
|
||||||
|
* +-------------------+
|
||||||
* V
|
* V
|
||||||
* _initializeOplogFetcherAndDbCloners()
|
* _initializeOplogFetcherAndDbCloners()
|
||||||
* |
|
* |
|
||||||
@ -574,7 +588,7 @@ private:
|
|||||||
*
|
*
|
||||||
* Non-initiating case — when the conditions above are not met, records the retry
|
* Non-initiating case — when the conditions above are not met, records the retry
|
||||||
* deadline in
|
* deadline in
|
||||||
* `_initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline` and
|
* `_initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime` and
|
||||||
* proceeds with the original `beginFetchingOpTime`.
|
* proceeds with the original `beginFetchingOpTime`.
|
||||||
* (Full retry loop is tracked in SERVER-125965.)
|
* (Full retry loop is tracked in SERVER-125965.)
|
||||||
*
|
*
|
||||||
@ -585,6 +599,43 @@ private:
|
|||||||
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
const OpTime& beginFetchingOpTime);
|
const OpTime& beginFetchingOpTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Issues {fsync: 1} on the sync source to force a checkpoint, then schedules
|
||||||
|
* _runReplSetGetStatusOnSyncSource. Called on every wait attempt.
|
||||||
|
*/
|
||||||
|
void _runFsyncOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Issues {replSetGetStatus: 1} on the sync source and routes the response to
|
||||||
|
* _handleLastStableRecoveryTsResponse.
|
||||||
|
*/
|
||||||
|
void _runReplSetGetStatusOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates the replSetGetStatus response and delegates to
|
||||||
|
* _checkStableTimestampAdvancementLocked for the wait-loop decision.
|
||||||
|
*/
|
||||||
|
void _handleLastStableRecoveryTsResponse(
|
||||||
|
const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether lastStableRecoveryTs >= beginApplyingTimestamp. If so, proceeds to
|
||||||
|
* _initializeOplogFetcherAndDbCloners. If the deadline has passed, fails the attempt with
|
||||||
|
* ExceededTimeLimit. Otherwise schedules the next exponential-backoff fsync retry.
|
||||||
|
* Must be called with _mutex held.
|
||||||
|
*/
|
||||||
|
void _checkStableTimestampAdvancementLocked(
|
||||||
|
WithLock lock,
|
||||||
|
Timestamp lastStableRecoveryTs,
|
||||||
|
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
|
||||||
|
const OpTime& beginFetchingOpTime);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the oplog fetcher and database cloners.
|
* Initializes the oplog fetcher and database cloners.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -5343,6 +5343,19 @@ TEST_F(InitialSyncerTest, InitialSyncerReloadsTransientErrorRetryPeriodOnEachAtt
|
|||||||
ASSERT_EQUALS(initialSyncer->getAllowedOutageDuration_forTest(), Seconds(updatedRetryPeriod));
|
ASSERT_EQUALS(initialSyncer->getAllowedOutageDuration_forTest(), Seconds(updatedRetryPeriod));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerFailsIfEarliestOplogEntryFetcherReturnsEmptyBatch) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
runWaitStableTsPreamble();
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto net = getNet();
|
||||||
|
{
|
||||||
|
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
|
||||||
|
processSuccessfulEarliestOplogEntryFetcherResponse({});
|
||||||
|
}
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(InitialSyncerTest,
|
TEST_F(InitialSyncerTest,
|
||||||
InitialSyncerPassesThroughEarliestOplogEntryFetcherCallbackErrorWhenWaitingForStableTs) {
|
InitialSyncerPassesThroughEarliestOplogEntryFetcherCallbackErrorWhenWaitingForStableTs) {
|
||||||
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
@ -5380,6 +5393,27 @@ TEST_F(InitialSyncerTest,
|
|||||||
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
|
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest,
|
||||||
|
InitialSyncerPassesThroughReplSetGetStatusCommandErrorWhenWaitingForStableTs) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
runWaitStableTsPreamble();
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto net = getNet();
|
||||||
|
{
|
||||||
|
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
|
||||||
|
processSuccessfulEarliestOplogEntryFetcherResponse({makeOplogEntryObj(1)});
|
||||||
|
// Network call succeeds but command returns {ok: 0} — only caught by
|
||||||
|
// getStatusFromCommandResult, not by the response.status check.
|
||||||
|
auto replSetGetStatusRequest = net->scheduleSuccessfulResponse(
|
||||||
|
BSON("ok" << 0 << "code" << ErrorCodes::NotWritablePrimary << "errmsg"
|
||||||
|
<< "not primary"));
|
||||||
|
assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest);
|
||||||
|
net->runReadyNetworkOperations();
|
||||||
|
}
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(InitialSyncerTest,
|
TEST_F(InitialSyncerTest,
|
||||||
InitialSyncerFailsIfReplSetGetStatusResponseMissingLastStableRecoveryTimestamp) {
|
InitialSyncerFailsIfReplSetGetStatusResponseMissingLastStableRecoveryTimestamp) {
|
||||||
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
@ -5497,4 +5531,368 @@ TEST_F(InitialSyncerTest, InitialSyncerDoesNotSkipWaitWhenInitiatingSetOutsideTh
|
|||||||
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
|
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopIgnoresFsyncError) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Stable ts not yet advanced; schedules fsync after 100ms backoff.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
// fsync is fire-and-forget: a transport error must be silently ignored.
|
||||||
|
_mock
|
||||||
|
->expect("fsync",
|
||||||
|
RemoteCommandResponse::make_forTest(
|
||||||
|
Status(ErrorCodes::OperationFailed, "fsync failed at sync source")))
|
||||||
|
.times(1);
|
||||||
|
// replSetGetStatus is still called after the fsync failure. The final error comes from
|
||||||
|
// here (NotWritablePrimary), not from fsync (OperationFailed), confirming the fsync
|
||||||
|
// error was swallowed.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
RemoteCommandResponse::make_forTest(
|
||||||
|
Status(ErrorCodes::NotWritablePrimary, "not primary")))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
// Advance clock past the 100ms backoff so the fsync and subsequent replSetGetStatus fire.
|
||||||
|
_mock->runUntil(startTime + Milliseconds(200));
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopPassesThroughReplSetGetStatusError) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Stable ts not yet advanced; schedules fsync after 100ms backoff.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
RemoteCommandResponse::make_forTest(Status(ErrorCodes::OperationFailed,
|
||||||
|
"replSetGetStatus failed at sync "
|
||||||
|
"source")))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
// Advance clock past the 100ms backoff so the fsync and subsequent replSetGetStatus fire.
|
||||||
|
_mock->runUntil(startTime + Milliseconds(200));
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopPassesThroughReplSetGetStatusCommandError) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Stable ts not yet advanced; schedules fsync after 100ms backoff.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
// Network call succeeds but command returns {ok: 0}.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 0 << "code" << ErrorCodes::NotWritablePrimary << "errmsg"
|
||||||
|
<< "not primary"))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
_mock->runUntil(startTime + Milliseconds(200));
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopRetriesUntilStableTimestampAdvances) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
// Begin fetching and applying timestamps.
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(2)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
// Earliest oplog entry.
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(0)}))
|
||||||
|
.times(1);
|
||||||
|
// Return lastStableRecoveryTimestamp not equal to earliest ts so that we don't skip the
|
||||||
|
// wait. Stable ts not yet advanced (Timestamp(1,1) < beginApplyingTs≈Timestamp(2,0));
|
||||||
|
// schedules fsync after 100ms backoff.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(1, 1)))
|
||||||
|
.times(1);
|
||||||
|
// First check (after 100ms backoff): stable timestamp has advanced.
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(2, 1)))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
// Advance mock clock by 100ms to trigger the 100ms retry backoff and consume all expectations.
|
||||||
|
_mock->runUntil(getNet()->now() + Milliseconds(100));
|
||||||
|
ASSERT_OK(initialSyncer->shutdown());
|
||||||
|
_mock->runUntilIdle();
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopTimesOut) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
RAIIServerParameterControllerForTest noRetryPeriod(
|
||||||
|
"initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs", 1);
|
||||||
|
|
||||||
|
FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
|
||||||
|
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
|
||||||
|
"skipRecoverUserWriteCriticalSections");
|
||||||
|
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Stable ts not yet advanced; schedules fsync after 100ms. Then 4 rounds of
|
||||||
|
// fsync+replSetGetStatus at T≈100ms, 300ms, 700ms, 1000ms (deadline-clamped).
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
_mock->runUntil(startTime + Seconds(2));
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, _lastApplied.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verifies the kMaxExponentialBackoffMillis (30 s) cap on the retry sleep interval.
|
||||||
|
//
|
||||||
|
// After round 9 (T=51100ms), sleepMillis would double to 51200ms without the cap. With the 30s
|
||||||
|
// cap it becomes 30000ms, so round 10 fires at T=81100ms instead of T=102300ms. Setting the
|
||||||
|
// deadline to 82s means round 11 fires at the deadline (82000ms), whereas without the cap only 10
|
||||||
|
// waits fire before the deadline, leaving one unsatisfied expectation and failing the test.
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopBackoffCappedAt30Seconds) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
RAIIServerParameterControllerForTest retryPeriod(
|
||||||
|
"initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs", 82);
|
||||||
|
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Ts not yet advanced; schedules fsync after 100ms backoff. 11 rounds of
|
||||||
|
// (fsync + replSetGetStatus) fire at T≈100ms, 300ms, 700ms, 1500ms, 3100ms, 6300ms,
|
||||||
|
// 12700ms, 25500ms, 51100ms, 81100ms (cap kicks in here), and 82000ms (deadline).
|
||||||
|
// Round 11 at T=82000ms sees now >= deadline and returns ExceededTimeLimit.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
for (int i = 0; i < 11; ++i) {
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
_mock->runUntil(startTime + Seconds(83));
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, _lastApplied.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopFailsIfLastStableRecoveryTsMissing) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// Stable ts not yet advanced; schedules fsync after 100ms backoff.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0)))
|
||||||
|
.times(1);
|
||||||
|
_mock->expect("fsync", BSON("ok" << 1)).times(1);
|
||||||
|
// First wait-loop check (after fsync): response missing lastStableRecoveryTimestamp.
|
||||||
|
_mock->expect("replSetGetStatus", BSON("ok" << 1 /* no lastStableRecoveryTimestamp */))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
const auto startTime = getNet()->now();
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
// Advance past the 100ms backoff to trigger the fsync and subsequent replSetGetStatus.
|
||||||
|
_mock->runUntil(startTime + Milliseconds(100));
|
||||||
|
_mock->runUntilExpectationsSatisfied();
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InitialSyncerTest, InitialSyncerWaitLoopExitsOnFirstCheckWhenStableTsAlreadyAdvanced) {
|
||||||
|
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
|
||||||
|
auto initialSyncer = &getInitialSyncer();
|
||||||
|
auto opCtx = makeOpCtx();
|
||||||
|
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
|
||||||
|
{
|
||||||
|
MockNetwork::InSequence seq(*_mock);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(2);
|
||||||
|
_mock
|
||||||
|
->expect([](auto& r) { return r["find"].str() == "system.version"; },
|
||||||
|
makeCursorResponse(
|
||||||
|
0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()}))
|
||||||
|
.times(1);
|
||||||
|
_mock
|
||||||
|
->expect(
|
||||||
|
BSON("find" << "oplog.rs"),
|
||||||
|
makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)}))
|
||||||
|
.times(1);
|
||||||
|
// replSetGetStatus returns a ts already past beginApplyingTs — no fsync expected.
|
||||||
|
_mock
|
||||||
|
->expect("replSetGetStatus",
|
||||||
|
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(99, 1)))
|
||||||
|
.times(1);
|
||||||
|
}
|
||||||
|
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
|
||||||
|
_mock->runUntilExpectationsSatisfied();
|
||||||
|
ASSERT_OK(initialSyncer->shutdown());
|
||||||
|
_mock->runUntilIdle();
|
||||||
|
initialSyncer->join();
|
||||||
|
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user