diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 33bcb6b095e..4ca8f715548 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -51,14 +51,22 @@ last-continuous: ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_drop_collection.js + ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_during_stepdown.js + ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_rename_collection.js + ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_replSetGetStatus.js ticket: SERVER-121218 - test_file: jstests/replsets/initial_sync_test_fixture_test.js ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_waits_for_stable_timestamp.js + ticket: SERVER-113184 - test_file: jstests/replsets/server_status_metrics.js ticket: SERVER-120318 - test_file: jstests/sharding/chunk_migration_maxkey_boundary.js @@ -466,6 +474,10 @@ last-lts: ticket: SERVER-94259 - test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_drop_collection.js + ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_during_stepdown.js + ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_during_stepdown.js ticket: SERVER-89664 - test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js @@ -474,12 +486,16 @@ last-lts: ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_index_conflict_recreate.js ticket: SERVER-93141 + - test_file: jstests/replsets/initial_sync_rename_collection.js + ticket: SERVER-113184 - test_file: jstests/replsets/initial_sync_replSetGetStatus.js ticket: SERVER-121218 - test_file: jstests/replsets/initial_sync_survives_restart.js ticket: SERVER-88447 - test_file: jstests/replsets/initial_sync_test_fixture_test.js ticket: SERVER-113184 + - test_file: jstests/replsets/initial_sync_waits_for_stable_timestamp.js + ticket: SERVER-113184 - test_file: jstests/replsets/large_dockey_succeeds_on_secondary.js ticket: SERVER-104405 - test_file: jstests/replsets/log_unprepared_abort_txns.js diff --git a/jstests/replsets/initial_sync_drop_collection.js b/jstests/replsets/initial_sync_drop_collection.js index 1377d9e54ce..c752369ec9a 100644 --- a/jstests/replsets/initial_sync_drop_collection.js +++ b/jstests/replsets/initial_sync_drop_collection.js @@ -51,6 +51,13 @@ function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) { // can check initialSyncStatus fields after initial sync is complete. secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"}); secondaryStartupParams["numInitialSyncAttempts"] = 1; + // We must disable this parameter because the primary may be able to advance the stable + // timestamp after initiate prior to this restart. As a result, the initial sync + // node may see that it is no longer initiating the set, and wait for stable + // to advance to beginApplying in initial sync. Since the majority of this + // set is 2, the primary will be unable to advance its stable, leaving this node + // stuck in initial sync. We must disable the wait to avoid this scenario. + secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false; secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams}); secondaryDB = secondary.getDB(dbName); secondaryColl = secondaryDB[collName]; diff --git a/jstests/replsets/initial_sync_during_stepdown.js b/jstests/replsets/initial_sync_during_stepdown.js index 06425855c53..85ad5358abb 100644 --- a/jstests/replsets/initial_sync_during_stepdown.js +++ b/jstests/replsets/initial_sync_during_stepdown.js @@ -54,6 +54,13 @@ function setupTest({ // Skip clearing initial sync progress after a successful initial sync attempt so that we // can check initialSyncStatus fields after initial sync is complete. secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"}); + // We must disable this parameter because the primary may be able to advance the stable + // timestamp after initiate prior to this restart. As a result, the initial sync + // node may see that it is no longer initiating the set, and wait for stable + // to advance to beginApplying in initial sync. Since the majority of this + // set is 2, the primary will be unable to advance its stable, leaving this node + // stuck in initial sync. We must disable the wait to avoid this scenario. + secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false; secondary = rst.start(secondary, {startClean: true, setParameter: secondaryStartupParams}); secondaryDB = secondary.getDB(dbName); secondaryColl = secondaryDB[collName]; diff --git a/jstests/replsets/initial_sync_rename_collection.js b/jstests/replsets/initial_sync_rename_collection.js index 931039017c1..a595b5d87ed 100644 --- a/jstests/replsets/initial_sync_rename_collection.js +++ b/jstests/replsets/initial_sync_rename_collection.js @@ -56,6 +56,13 @@ function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) { // can check initialSyncStatus fields after initial sync is complete. secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"}); secondaryStartupParams["numInitialSyncAttempts"] = 1; + // We must disable this parameter because the primary may be able to advance the stable + // timestamp after initiate prior to this restart. As a result, the initial sync + // node may see that it is no longer initiating the set, and wait for stable + // to advance to beginApplying in initial sync. Since the majority of this + // set is 2, the primary will be unable to advance its stable, leaving this node + // stuck in initial sync. We must disable the wait to avoid this scenario. + secondaryStartupParams["initialSyncWaitForSyncSourceLastStableRecoveryTs"] = false; secondary = replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams}); secondaryDB = secondary.getDB(dbName); secondaryColl = secondaryDB[collName]; diff --git a/jstests/replsets/initial_sync_waits_for_stable_timestamp.js b/jstests/replsets/initial_sync_waits_for_stable_timestamp.js new file mode 100644 index 00000000000..9c0e0bbbd52 --- /dev/null +++ b/jstests/replsets/initial_sync_waits_for_stable_timestamp.js @@ -0,0 +1,133 @@ +/** + * Tests that initial sync correctly waits for the sync source's lastStableRecoveryTimestamp to + * advance past beginApplyingTimestamp: + * 1. Initial sync succeeds once the stable timestamp is released and allowed to advance. + * 2. Initial sync fails the current attempt if the stable timestamp does not advance within the + * configured retry period, then succeeds on the next attempt once it is released. + */ + +import {configureFailPoint} from "jstests/libs/fail_point_util.js"; +import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js"; +import {ReplSetTest} from "jstests/libs/replsettest.js"; + +describe("initial sync waits for sync source stable recovery timestamp to advance", function () { + const kDbName = "test"; + const kCollName = "coll"; + let rst; + let primary; + + beforeEach(function () { + rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + primary = rst.getPrimary(); + }); + + afterEach(function () { + rst.stopSet(); + }); + + // Sets up the primary's stable timestamp hold and adds a new secondary configured to use the + // wait-for-stable-timestamp feature. Returns the failpoint handle and the secondary node. + function prepareInitialSync(extraParams = {}) { + // Capture the initiating set entry ts (the earliest oplog entry on a fresh primary). + const initiatingSetTs = primary.getDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).next().ts; + + // Insert docs to populate the collection before the stable-timestamp pin. + assert.commandWorked(primary.getDB(kDbName)[kCollName].insertMany([{_id: 1}, {_id: 2}, {_id: 3}])); + + // Wait for stable ts to checkpoint at a strictly later *second* than initiatingSetTs. + // _initiatingSetStableTimestampCallback computes diff = stableTs.getSecs() - + // earliestTs.getSecs(). With thresholdSecs=0 (JS test infra default), we need diff >= 1 + // so the initiating-set skip (diff <= thresholdSecs) does not fire. + let pinTs; + assert.soon(() => { + assert.commandWorked(primary.getDB(kDbName)[kCollName].updateOne({_id: 1}, {$inc: {_v: 1}})); + assert.commandWorked(primary.adminCommand({fsync: 1})); + const status = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1})); + const st = status.lastStableRecoveryTimestamp; + if (st.getTime() > initiatingSetTs.getTime()) { + pinTs = st; + return true; + } + return false; + }, "Timed out waiting for primary stable ts to advance past initiating set entry second"); + + // Pin stable ts here so beginApplyingTimestamp (set from the next inserts) will be above it. + const holdFp = configureFailPoint(primary, "holdStableTimestampAtSpecificTimestamp", {timestamp: pinTs}); + + // Insert more docs to push optime above pinTs so beginApplyingTimestamp > pinTs. + assert.commandWorked(primary.getDB(kDbName)[kCollName].insertMany([{_id: 4}, {_id: 5}, {_id: 6}])); + + const secondary = rst.add({ + rsConfig: {priority: 0, votes: 0}, + setParameter: Object.assign( + { + numInitialSyncAttempts: 2, + initialSyncWaitForSyncSourceLastStableRecoveryTs: true, + }, + extraParams, + ), + }); + rst.reInitiate(); + rst.waitForState(secondary, ReplSetTest.State.STARTUP_2); + return {holdFp, secondary}; + } + + it("succeeds once the sync source stable recovery timestamp advances", function () { + const {holdFp, secondary} = prepareInitialSync(); + + // Wait until the secondary has entered the stable-timestamp wait loop (log ID 11318413). + assert.soon( + () => checkLog.checkContainsOnce(secondary, 11318413), + "Timed out waiting for secondary to enter stable timestamp wait loop", + ); + + jsTestLog("Releasing stable timestamp hold on primary."); + holdFp.off(); + // A new write is required to trigger setStableTimestamp now that the hold is released; + // without it, WT's stable timestamp stays at the pinned value indefinitely. + assert.commandWorked(primary.getDB(kDbName)[kCollName].insertOne({_id: 7})); + + rst.awaitSecondaryNodes(null, [secondary]); + assert.eq(7, secondary.getDB(kDbName)[kCollName].find().itcount()); + }); + + it("fails if stable timestamp does not advance within the retry period", function () { + const kRetryPeriodSecs = 10; + const {holdFp, secondary} = prepareInitialSync({ + initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs: kRetryPeriodSecs, + }); + + // Wait for the first initial sync attempt to time out (log ID 11318417). + assert.soon( + () => { + try { + const status = secondary.adminCommand({replSetGetStatus: 1}); + return status.initialSyncStatus && status.initialSyncStatus.failedInitialSyncAttempts >= 1; + } catch (e) { + return false; + } + }, + "Timed out waiting for initial sync attempt to fail", + (kRetryPeriodSecs + 60) * 1000, + ); + + const status = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); + const failedAttempt = status.initialSyncStatus.initialSyncAttempts[0]; + assert( + failedAttempt.status.includes("Failed to wait for stable recovery timestamp"), + "Expected stable timestamp wait timeout error", + {failedAttempt}, + ); + + jsTestLog("Initial sync failed as expected. Releasing hold for second attempt."); + holdFp.off(); + // A new write is required to trigger setStableTimestamp now that the hold is released; + // without it, WT's stable timestamp stays at the pinned value indefinitely. + assert.commandWorked(primary.getDB(kDbName)[kCollName].insertOne({_id: 7})); + + rst.awaitSecondaryNodes(null, [secondary]); + assert.eq(7, secondary.getDB(kDbName)[kCollName].find().itcount()); + }); +}); diff --git a/src/mongo/db/repl/initial_sync/callback_completion_guard.h b/src/mongo/db/repl/initial_sync/callback_completion_guard.h index 108d4530e5f..443985ac01b 100644 --- a/src/mongo/db/repl/initial_sync/callback_completion_guard.h +++ b/src/mongo/db/repl/initial_sync/callback_completion_guard.h @@ -90,6 +90,7 @@ public: const Result& result); void setResultAndCancelRemainingWork(const std::unique_lock& lock, const Result& result); + void setResultAndCancelRemainingWork(WithLock lock, const Result& result); private: /** @@ -137,6 +138,12 @@ void CallbackCompletionGuard::setResultAndCancelRemainingWork( _setResultAndCancelRemainingWork(lock, result); } +template +void CallbackCompletionGuard::setResultAndCancelRemainingWork(WithLock lock, + const Result& result) { + _setResultAndCancelRemainingWork(lock, result); +} + template void CallbackCompletionGuard::_setResultAndCancelRemainingWork(WithLock lk, const Result& result) { diff --git a/src/mongo/db/repl/initial_sync/initial_sync_state.h b/src/mongo/db/repl/initial_sync/initial_sync_state.h index 40f865e5991..8973ce9afe8 100644 --- a/src/mongo/db/repl/initial_sync/initial_sync_state.h +++ b/src/mongo/db/repl/initial_sync/initial_sync_state.h @@ -59,7 +59,13 @@ struct InitialSyncState { bool earliestOplogEntryIsInitiatingSet = false; Timestamp earliestOplogEntryTimestamp; - Date_t waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline; + Date_t waitForSyncSourceStableTimestampAdvanceStartTime; // Time at which we started waiting + // for the sync source's last + // checkpoint to advance. Used with + // the retry period parameter to + // compute the deadline on each loop. + int waitForSyncSourceStableTimestampAdvanceSleepMillis = + 100; // How long to sleep in-between attempts. Increases exponentially. }; } // namespace repl diff --git a/src/mongo/db/repl/initial_sync/initial_syncer.cpp b/src/mongo/db/repl/initial_sync/initial_syncer.cpp index aff30f4f760..9931604b0e9 100644 --- a/src/mongo/db/repl/initial_sync/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_sync/initial_syncer.cpp @@ -165,6 +165,10 @@ using LockGuard = std::lock_guard; // Used to reset the oldest timestamp during initial sync to a non-null timestamp. const Timestamp kTimestampOne(0, 1); +// Maximum amount of time we will pause in-between waits for our sync source to advance stable +// timestamp. +const int kMaxExponentialBackoffMillis = 30000; + ServiceContext::UniqueOperationContext makeOpCtx() { return cc().makeOperationContext(); } @@ -1269,6 +1273,13 @@ void InitialSyncer::_initiatingSetStableTimestampCallback( return; } + status = getStatusFromCommandResult(callbackArgs.response.data); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, status); + _initialSyncState.reset(); + return; + } + auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"]; if (!stableElem) { LOGV2_WARNING(11318414, @@ -1334,18 +1345,194 @@ void InitialSyncer::_initiatingSetStableTimestampCallback( "isInitiatingSet"_attr = isInitiatingSet, "earliestOplogEntryTs"_attr = earliestTs, "lastStableRecoveryTs"_attr = lastStableRecoveryTs); - _initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline = - (*_attemptExec)->now() + - Seconds(initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs.load()); - // TODO (SERVER-125965): Wait for lastStableRecoveryTimestamp to advance in this case instead of - // proceeding directly to _initializeOplogFetcherAndDbCloners. - status = _scheduleWorkAndSaveHandle( + _initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime = (*_attemptExec)->now(); + _checkStableTimestampAdvancementLocked( + lock, lastStableRecoveryTs, onCompletionGuard, beginFetchingOpTime); +} + +void InitialSyncer::_runFsyncOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime) { + std::unique_lock lock(_mutex); + if (!_checkForShutdownAndHandleError( + lock, callbackArgs, onCompletionGuard, "error scheduling fsync on sync source")) { + return; + } + + executor::RemoteCommandRequest fsyncRequest( + _syncSource, DatabaseName::kAdmin, BSON("fsync" << 1), nullptr); + auto cbHandle = + (*_attemptExec) + ->scheduleRemoteCommand( + std::move(fsyncRequest), + [this, onCompletionGuard, beginFetchingOpTime]( + TaskExecutor::RemoteCommandCallbackArgs args) { + // fsync is fire-and-forget: proceed to replSetGetStatus regardless of outcome. + std::unique_lock lock(_mutex); + auto status = _scheduleWorkAndSaveHandle( + lock, + [=, this](const executor::TaskExecutor::CallbackArgs& args) { + _runReplSetGetStatusOnSyncSource( + args, onCompletionGuard, beginFetchingOpTime); + }, + &_waitForSyncSourceStableTimestampHandle, + "_runReplSetGetStatusOnSyncSource"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, status); + _initialSyncState.reset(); + } + }); + if (!cbHandle.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, cbHandle.getStatus()); + _initialSyncState.reset(); + return; + } + + // Save the handle so that it can be cancelled if initial sync shuts down. + _waitForSyncSourceStableTimestampHandle = cbHandle.getValue(); +} + +void InitialSyncer::_runReplSetGetStatusOnSyncSource( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime) { + std::unique_lock lock(_mutex); + if (!_checkForShutdownAndHandleError(lock, + callbackArgs, + onCompletionGuard, + "error scheduling replSetGetStatus on sync source")) { + return; + } + + executor::RemoteCommandRequest replSetGetStatusRequest( + _syncSource, DatabaseName::kAdmin, BSON("replSetGetStatus" << 1), nullptr); + auto cbHandle = + (*_attemptExec) + ->scheduleRemoteCommand(std::move(replSetGetStatusRequest), + [this, onCompletionGuard, beginFetchingOpTime]( + TaskExecutor::RemoteCommandCallbackArgs args) { + _handleLastStableRecoveryTsResponse( + args, onCompletionGuard, beginFetchingOpTime); + }); + if (!cbHandle.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, cbHandle.getStatus()); + _initialSyncState.reset(); + return; + } + + // Save the handle so that it can be cancelled if initial sync shuts down. + _waitForSyncSourceStableTimestampHandle = cbHandle.getValue(); +} + +void InitialSyncer::_handleLastStableRecoveryTsResponse( + const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime) { + std::unique_lock lock(_mutex); + + auto status = _checkForShutdownAndConvertStatus( lock, + callbackArgs.response.status, + "error waiting for sync source lastStableRecoveryTimestamp to advance"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, status); + _initialSyncState.reset(); + return; + } + + status = getStatusFromCommandResult(callbackArgs.response.data); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, status); + _initialSyncState.reset(); + return; + } + + auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"]; + if (!stableElem) { + onCompletionGuard->setResultAndCancelRemainingWork( + lock, + Status(ErrorCodes::InvalidSyncSource, + "sync source replSetGetStatus response is missing lastStableRecoveryTimestamp")); + _initialSyncState.reset(); + return; + } + + _checkStableTimestampAdvancementLocked( + lock, stableElem.timestamp(), onCompletionGuard, beginFetchingOpTime); +} + +void InitialSyncer::_checkStableTimestampAdvancementLocked( + WithLock lock, + Timestamp lastStableRecoveryTs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime) { + const auto beginApplyingTs = _initialSyncState->beginApplyingTimestamp; + + LOGV2_DEBUG(11318415, + 2, + "Checking sync source lastStableRecoveryTimestamp", + "lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs, + "beginApplyingTimestamp"_attr = beginApplyingTs); + + if (lastStableRecoveryTs >= beginApplyingTs) { + LOGV2(11318400, + "Sync source lastStableRecoveryTimestamp has advanced; continuing with initial sync", + "lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs, + "beginApplyingTimestamp"_attr = beginApplyingTs); + auto status = _scheduleWorkAndSaveHandle( + lock, + [=, this](const executor::TaskExecutor::CallbackArgs& args) { + _initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime); + }, + &_initializeOplogFetcherAndDbClonersHandle, + "_initializeOplogFetcherAndDbCloners"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork(lock, status); + _initialSyncState.reset(); + } + return; + } + + const auto now = (*_attemptExec)->now(); + const auto deadline = _initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime + + Seconds(initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs.load()); + if (now >= deadline) { + LOGV2_WARNING(11318417, + "Timed out waiting for sync source lastStableRecoveryTimestamp to advance", + "lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs, + "beginApplyingTimestamp"_attr = beginApplyingTs, + "deadline"_attr = deadline); + onCompletionGuard->setResultAndCancelRemainingWork( + lock, + Status(ErrorCodes::ExceededTimeLimit, + "Failed to wait for stable recovery timestamp to advance. To resolve this, " + "ensure that the sync source is healthy and able to advance its checkpoint " + "timestamp.")); + _initialSyncState.reset(); + return; + } + + // Retry after exponential backoff. Maximum amount of time for exponential backoff is 30s. + const auto sleepMillis = _initialSyncState->waitForSyncSourceStableTimestampAdvanceSleepMillis; + _initialSyncState->waitForSyncSourceStableTimestampAdvanceSleepMillis = + std::min(sleepMillis * 2, kMaxExponentialBackoffMillis); + const auto when = std::min(now + Milliseconds(sleepMillis), deadline); + + LOGV2_DEBUG(11318418, + 2, + "Sync source lastStableRecoveryTimestamp not yet advanced; retrying after backoff", + "lastStableRecoveryTimestamp"_attr = lastStableRecoveryTs, + "beginApplyingTimestamp"_attr = beginApplyingTs, + "retryAfterMillis"_attr = sleepMillis); + + auto status = _scheduleWorkAtAndSaveHandle( + lock, + when, [=, this](const executor::TaskExecutor::CallbackArgs& args) { - _initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime); + _runFsyncOnSyncSource(args, onCompletionGuard, beginFetchingOpTime); }, - &_initializeOplogFetcherAndDbClonersHandle, - "_initializeOplogFetcherAndDbCloners"); + &_waitForSyncSourceStableTimestampHandle, + "_runFsyncOnSyncSource"); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork(lock, status); _initialSyncState.reset(); diff --git a/src/mongo/db/repl/initial_sync/initial_syncer.h b/src/mongo/db/repl/initial_sync/initial_syncer.h index e9e939b3a68..ac85ea551e4 100644 --- a/src/mongo/db/repl/initial_sync/initial_syncer.h +++ b/src/mongo/db/repl/initial_sync/initial_syncer.h @@ -407,9 +407,23 @@ private: * | * | * V - * _checkIfInitiatingSet() - * | - * | + * _initiatingSetStableTimestampCallback() + * | | + * (skip) | (wait needed) | + * | V + * | _checkStableTimestampAdvancementLocked() <---+ + * | | | + * | (not advanced) | + * | V | + * | _runFsyncOnSyncSource() (after backoff) | + * | | | + * | V | + * | _runReplSetGetStatusOnSyncSource() | + * | | | + * | V | + * | _handleLastStableRecoveryTsResponse() -------+ + * | (advanced or proceed) + * +-------------------+ * V * _initializeOplogFetcherAndDbCloners() * | @@ -574,7 +588,7 @@ private: * * Non-initiating case — when the conditions above are not met, records the retry * deadline in - * `_initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline` and + * `_initialSyncState->waitForSyncSourceStableTimestampAdvanceStartTime` and * proceeds with the original `beginFetchingOpTime`. * (Full retry loop is tracked in SERVER-125965.) * @@ -585,6 +599,43 @@ private: std::shared_ptr onCompletionGuard, const OpTime& beginFetchingOpTime); + /** + * Issues {fsync: 1} on the sync source to force a checkpoint, then schedules + * _runReplSetGetStatusOnSyncSource. Called on every wait attempt. + */ + void _runFsyncOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime); + + /** + * Issues {replSetGetStatus: 1} on the sync source and routes the response to + * _handleLastStableRecoveryTsResponse. + */ + void _runReplSetGetStatusOnSyncSource(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime); + + /** + * Validates the replSetGetStatus response and delegates to + * _checkStableTimestampAdvancementLocked for the wait-loop decision. + */ + void _handleLastStableRecoveryTsResponse( + const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime); + + /** + * Checks whether lastStableRecoveryTs >= beginApplyingTimestamp. If so, proceeds to + * _initializeOplogFetcherAndDbCloners. If the deadline has passed, fails the attempt with + * ExceededTimeLimit. Otherwise schedules the next exponential-backoff fsync retry. + * Must be called with _mutex held. + */ + void _checkStableTimestampAdvancementLocked( + WithLock lock, + Timestamp lastStableRecoveryTs, + std::shared_ptr onCompletionGuard, + const OpTime& beginFetchingOpTime); + /** * Initializes the oplog fetcher and database cloners. */ diff --git a/src/mongo/db/repl/initial_sync/initial_syncer_test.cpp b/src/mongo/db/repl/initial_sync/initial_syncer_test.cpp index e432e9ca50e..d99862b7216 100644 --- a/src/mongo/db/repl/initial_sync/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_sync/initial_syncer_test.cpp @@ -5343,6 +5343,19 @@ TEST_F(InitialSyncerTest, InitialSyncerReloadsTransientErrorRetryPeriodOnEachAtt ASSERT_EQUALS(initialSyncer->getAllowedOutageDuration_forTest(), Seconds(updatedRetryPeriod)); } +TEST_F(InitialSyncerTest, InitialSyncerFailsIfEarliestOplogEntryFetcherReturnsEmptyBatch) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + runWaitStableTsPreamble(); + auto initialSyncer = &getInitialSyncer(); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + processSuccessfulEarliestOplogEntryFetcherResponse({}); + } + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied); +} + TEST_F(InitialSyncerTest, InitialSyncerPassesThroughEarliestOplogEntryFetcherCallbackErrorWhenWaitingForStableTs) { initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); @@ -5380,6 +5393,27 @@ TEST_F(InitialSyncerTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); } +TEST_F(InitialSyncerTest, + InitialSyncerPassesThroughReplSetGetStatusCommandErrorWhenWaitingForStableTs) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + runWaitStableTsPreamble(); + auto initialSyncer = &getInitialSyncer(); + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + processSuccessfulEarliestOplogEntryFetcherResponse({makeOplogEntryObj(1)}); + // Network call succeeds but command returns {ok: 0} — only caught by + // getStatusFromCommandResult, not by the response.status check. + auto replSetGetStatusRequest = net->scheduleSuccessfulResponse( + BSON("ok" << 0 << "code" << ErrorCodes::NotWritablePrimary << "errmsg" + << "not primary")); + assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest); + net->runReadyNetworkOperations(); + } + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied); +} + TEST_F(InitialSyncerTest, InitialSyncerFailsIfReplSetGetStatusResponseMissingLastStableRecoveryTimestamp) { initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); @@ -5497,4 +5531,368 @@ TEST_F(InitialSyncerTest, InitialSyncerDoesNotSkipWaitWhenInitiatingSetOutsideTh ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); } + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopIgnoresFsyncError) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Stable ts not yet advanced; schedules fsync after 100ms backoff. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + // fsync is fire-and-forget: a transport error must be silently ignored. + _mock + ->expect("fsync", + RemoteCommandResponse::make_forTest( + Status(ErrorCodes::OperationFailed, "fsync failed at sync source"))) + .times(1); + // replSetGetStatus is still called after the fsync failure. The final error comes from + // here (NotWritablePrimary), not from fsync (OperationFailed), confirming the fsync + // error was swallowed. + _mock + ->expect("replSetGetStatus", + RemoteCommandResponse::make_forTest( + Status(ErrorCodes::NotWritablePrimary, "not primary"))) + .times(1); + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + // Advance clock past the 100ms backoff so the fsync and subsequent replSetGetStatus fire. + _mock->runUntil(startTime + Milliseconds(200)); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopPassesThroughReplSetGetStatusError) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Stable ts not yet advanced; schedules fsync after 100ms backoff. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + _mock->expect("fsync", BSON("ok" << 1)).times(1); + _mock + ->expect("replSetGetStatus", + RemoteCommandResponse::make_forTest(Status(ErrorCodes::OperationFailed, + "replSetGetStatus failed at sync " + "source"))) + .times(1); + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + // Advance clock past the 100ms backoff so the fsync and subsequent replSetGetStatus fire. + _mock->runUntil(startTime + Milliseconds(200)); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopPassesThroughReplSetGetStatusCommandError) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Stable ts not yet advanced; schedules fsync after 100ms backoff. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + _mock->expect("fsync", BSON("ok" << 1)).times(1); + // Network call succeeds but command returns {ok: 0}. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 0 << "code" << ErrorCodes::NotWritablePrimary << "errmsg" + << "not primary")) + .times(1); + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + _mock->runUntil(startTime + Milliseconds(200)); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::NotWritablePrimary, _lastApplied); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopRetriesUntilStableTimestampAdvances) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + // Begin fetching and applying timestamps. + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(2)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + // Earliest oplog entry. + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(0)})) + .times(1); + // Return lastStableRecoveryTimestamp not equal to earliest ts so that we don't skip the + // wait. Stable ts not yet advanced (Timestamp(1,1) < beginApplyingTs≈Timestamp(2,0)); + // schedules fsync after 100ms backoff. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(1, 1))) + .times(1); + // First check (after 100ms backoff): stable timestamp has advanced. + _mock->expect("fsync", BSON("ok" << 1)).times(1); + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(2, 1))) + .times(1); + } + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + // Advance mock clock by 100ms to trigger the 100ms retry backoff and consume all expectations. + _mock->runUntil(getNet()->now() + Milliseconds(100)); + ASSERT_OK(initialSyncer->shutdown()); + _mock->runUntilIdle(); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopTimesOut) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + RAIIServerParameterControllerForTest noRetryPeriod( + "initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs", 1); + + FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions"); + FailPointEnableBlock skipRecoverUserWriteCriticalSections( + "skipRecoverUserWriteCriticalSections"); + + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Stable ts not yet advanced; schedules fsync after 100ms. Then 4 rounds of + // fsync+replSetGetStatus at T≈100ms, 300ms, 700ms, 1000ms (deadline-clamped). + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + for (int i = 0; i < 4; ++i) { + _mock->expect("fsync", BSON("ok" << 1)).times(1); + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + } + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + _mock->runUntil(startTime + Seconds(2)); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, _lastApplied.getStatus()); +} + +// Verifies the kMaxExponentialBackoffMillis (30 s) cap on the retry sleep interval. +// +// After round 9 (T=51100ms), sleepMillis would double to 51200ms without the cap. With the 30s +// cap it becomes 30000ms, so round 10 fires at T=81100ms instead of T=102300ms. Setting the +// deadline to 82s means round 11 fires at the deadline (82000ms), whereas without the cap only 10 +// waits fire before the deadline, leaving one unsatisfied expectation and failing the test. +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopBackoffCappedAt30Seconds) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + RAIIServerParameterControllerForTest retryPeriod( + "initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs", 82); + + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Ts not yet advanced; schedules fsync after 100ms backoff. 11 rounds of + // (fsync + replSetGetStatus) fire at T≈100ms, 300ms, 700ms, 1500ms, 3100ms, 6300ms, + // 12700ms, 25500ms, 51100ms, 81100ms (cap kicks in here), and 82000ms (deadline). + // Round 11 at T=82000ms sees now >= deadline and returns ExceededTimeLimit. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + for (int i = 0; i < 11; ++i) { + _mock->expect("fsync", BSON("ok" << 1)).times(1); + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + } + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + _mock->runUntil(startTime + Seconds(83)); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, _lastApplied.getStatus()); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopFailsIfLastStableRecoveryTsMissing) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // Stable ts not yet advanced; schedules fsync after 100ms backoff. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(0, 0))) + .times(1); + _mock->expect("fsync", BSON("ok" << 1)).times(1); + // First wait-loop check (after fsync): response missing lastStableRecoveryTimestamp. + _mock->expect("replSetGetStatus", BSON("ok" << 1 /* no lastStableRecoveryTimestamp */)) + .times(1); + } + const auto startTime = getNet()->now(); + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + // Advance past the 100ms backoff to trigger the fsync and subsequent replSetGetStatus. + _mock->runUntil(startTime + Milliseconds(100)); + _mock->runUntilExpectationsSatisfied(); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied); +} + +TEST_F(InitialSyncerTest, InitialSyncerWaitLoopExitsOnFirstCheckWhenStableTsAlreadyAdvanced) { + initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true); + auto initialSyncer = &getInitialSyncer(); + auto opCtx = makeOpCtx(); + _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); + { + MockNetwork::InSequence seq(*_mock); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(2); + _mock + ->expect([](auto& r) { return r["find"].str() == "system.version"; }, + makeCursorResponse( + 0LL, NamespaceString::kServerConfigurationNamespace, {getLastLTSBson()})) + .times(1); + _mock + ->expect( + BSON("find" << "oplog.rs"), + makeCursorResponse(0LL, NamespaceString::kRsOplogNamespace, {makeOplogEntryObj(1)})) + .times(1); + // replSetGetStatus returns a ts already past beginApplyingTs — no fsync expected. + _mock + ->expect("replSetGetStatus", + BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(99, 1))) + .times(1); + } + ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); + _mock->runUntilExpectationsSatisfied(); + ASSERT_OK(initialSyncer->shutdown()); + _mock->runUntilIdle(); + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied); +} + } // namespace