Revert "SERVER-125964: Apply initial sync oplog entries from initiating set o… (#53240)" (#53899)

Co-authored-by: auto-revert-processor <devprod-si-team@mongodb.com>
GitOrigin-RevId: b840c596c0a1f61b60b0678ae28bc6b037286522
This commit is contained in:
auto-revert-app[bot] 2026-05-16 07:38:07 +00:00 committed by MongoDB Bot
parent 5aea7b5399
commit c021919b7c
13 changed files with 99 additions and 962 deletions

View File

@ -53,16 +53,6 @@ last-continuous:
ticket: SERVER-120318
- test_file: jstests/sharding/chunk_migration_maxkey_boundary.js
ticket: SERVER-121533
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_check_initiating_set_oplog_entry.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
ticket: SERVER-113184
suites: null
last-lts:
all:
@ -754,14 +744,4 @@ last-lts:
ticket: SERVER-91075
- test_file: src/mongo/db/modules/enterprise/jstests/fle2/query/range/two_sided_find.js
ticket: SERVER-91075
- test_file: jstests/replsets/initial_sync_test_fixture_test.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_check_initiating_set_oplog_entry.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
ticket: SERVER-113184
- test_file: jstests/replsets/initial_sync_commit_prepared_transaction.js
ticket: SERVER-113184
suites: null

View File

@ -3008,16 +3008,6 @@ export class ReplSetTest {
// the number of connection attempts.
options.setParameter.numInitialSyncConnectAttempts = options.setParameter.numInitialSyncConnectAttempts || 60;
// Reduce the initiating-set optimization threshold so it doesn't fire broadly in tests. The
// default threshold (3600s) would trigger for every freshly-initiated test replica set. With
// threshold=0, the shortcut only fires when `diff <= 0` (i.e. the sync source's stable
// recovery TS has not advanced past the initiating-set entry timestamp). NOTE: this still
// fires when stable TS == earliest TS (diff=0). Tests that check beginApplyingTimestamp or
// oplog batch sizes may need to explicitly disable the feature entirely via
// setParameter: {initialSyncWaitForSyncSourceLastStableRecoveryTs: false}.
options.setParameter.initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs =
options.setParameter.initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs || 0;
// The default time for stepdown and quiesce mode in response to SIGTERM is 15 seconds.
// Reduce this to 100ms for faster shutdown.
options.setParameter.shutdownTimeoutMillisForSignaledShutdown =
@ -3071,15 +3061,6 @@ export class ReplSetTest {
delete options.setParameter.performTimeseriesCompressionIntermediateDataIntegrityCheckOnInsert;
}
const olderThan90 =
MongoRunner.compareBinVersions(
MongoRunner.getBinVersionFor(options.binVersion),
MongoRunner.getBinVersionFor("9.0"),
) === -1;
if (olderThan90) {
delete options.setParameter.initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs;
}
if (tojson(options) != tojson({})) jsTest.log.info({options});
jsTest.log.info("ReplSetTest " + (restart ? "(Re)" : "") + "Starting....");

View File

@ -1,113 +0,0 @@
/**
* Tests that initial sync skips waiting for the sync source's lastStableRecoveryTimestamp to
* advance when the sync source's earliest oplog entry is the "initiating set" entry and
* lastStableRecoveryTimestamp is within the configured threshold of that entry.
*
* Setup: a single-node primary whose stable recovery timestamp is held at the initiating-set
* entry's timestamp. Writes advance the primary's optime beyond that held value, so that without
* the skip logic, initial sync would stall waiting for the stable timestamp to catch up.
* With the skip logic, initial sync should complete successfully.
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
// Use a 1 MB oplog so the "initiating set" entry remains the earliest entry throughout the test.
const rst = new ReplSetTest({nodes: 1, nodeOptions: {oplogSize: 1}});
rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
// Wait for the primary to take its first stable checkpoint so that lastStableRecoveryTimestamp
// is set to a value near the initiating set entry's timestamp.
assert.soon(() => {
const status = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1}));
return bsonWoCompare(status.lastStableRecoveryTimestamp, Timestamp(0, 0)) > 0;
}, "Timed out waiting for primary to take its first stable checkpoint");
// Retrieve the initiating set entry's timestamp from the head of the oplog.
const initiatingSetTs = primary.getDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).next().ts;
jsTestLog("Initiating set oplog entry timestamp: " + tojson(initiatingSetTs));
// Hold the primary's stable recovery timestamp at the initiating set entry's timestamp.
// This prevents lastStableRecoveryTimestamp from advancing past initiatingSetTs, so that
// without the initiating-set skip logic, initial sync would stall indefinitely waiting for
// the stable timestamp to reach beginApplyingTimestamp.
const holdStableFp = configureFailPoint(primary, "holdStableTimestampAtSpecificTimestamp", {
timestamp: initiatingSetTs,
});
// Insert documents to advance the primary's optime. beginApplyingTimestamp will be set to
// a timestamp above the held stable recovery timestamp.
const testDb = primary.getDB("test");
assert.commandWorked(testDb.coll.insertMany([{a: 1}, {a: 2}, {a: 3}]));
// Capture the primary's last oplog entry timestamp. Without the initiating-set reset logic,
// this is what beginApplyingTimestamp would be set to on the secondary.
const normalBeginApplyingTs = primary.getDB("local").oplog.rs.find().sort({$natural: -1}).limit(1).next().ts;
assert(
bsonWoCompare(normalBeginApplyingTs, initiatingSetTs) > 0,
"Primary's last oplog entry should be newer than the initiating set entry. normalBeginApplyingTs=" +
tojson(normalBeginApplyingTs) +
" initiatingSetTs=" +
tojson(initiatingSetTs),
);
jsTestLog("Adding secondary to trigger initial sync.");
// Add a new non-voting secondary to trigger initial sync. numInitialSyncAttempts=1 ensures
// the test fails fast if the attempt is rejected rather than retried indefinitely.
const secondary = rst.add({
rsConfig: {priority: 0, votes: 0},
setParameter: {
numInitialSyncAttempts: 1,
// Manually configure this parameter to 3600 since it is set to 0 in JS test infrastructure.
initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs: 3600,
},
});
// Pause initial sync just before database cloning begins. At this point _checkIfInitiatingSet()
// has already run and overridden beginApplyingTimestamp with the initiating set entry timestamp.
const pauseBeforeCloningFp = configureFailPoint(secondary, "initialSyncHangBeforeCopyingDatabases");
rst.reInitiate();
// Wait for initial sync to reach the cloning phase.
pauseBeforeCloningFp.wait();
// Validate that beginApplyingTimestamp was reset to the initiating set entry's timestamp
// rather than remaining at the normal value (normalBeginApplyingTs).
const syncStatus = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert(syncStatus.initialSyncStatus, "Expected initialSyncStatus in replSetGetStatus during initial sync");
assert.eq(
syncStatus.initialSyncStatus.initialSyncOplogStart,
initiatingSetTs,
"beginApplyingTimestamp should have been reset to the initiating set entry timestamp " +
tojson(initiatingSetTs) +
", not the normal value " +
tojson(normalBeginApplyingTs),
);
jsTestLog(
"Validated: startApplyingTimestamp was reset from " +
tojson(normalBeginApplyingTs) +
" to initiating set entry timestamp: " +
tojson(initiatingSetTs),
);
pauseBeforeCloningFp.off();
// Initial sync should complete successfully because:
// - The earliest oplog entry on the sync source IS the "initiating set" noop entry.
// - lastStableRecoveryTimestamp == initiatingSetTs (held by the failpoint).
// - The diff between lastStableRecoveryTimestamp and the initiating set entry timestamp is
// zero, which is within the default 3600 s threshold.
// The InitialSyncer therefore skips the wait and proceeds directly to cloning.
rst.awaitSecondaryNodes(null, [secondary]);
jsTestLog("Initial sync completed successfully via the initiating-set skip path.");
// Verify the inserted documents were synced.
assert.eq(3, secondary.getDB("test").coll.find().itcount());
holdStableFp.off();
rst.stopSet();

View File

@ -78,17 +78,6 @@ secondary = replTest.start(
data: {namespace: testColl.getFullName(), numDocsToClone: 2},
}),
"numInitialSyncAttempts": 1,
// Disable this parameter as this test explicitly wants to validate a
// beginFetching/beginApplying timestamp in initial sync. These timestamps may
// get reset during the 'wait for stable timestamp' phase of initial sync, so
// we skip this phase.
// We must also disable this because the primary may be able to advance the stable
// timestamp after initiate prior to this restart. As a result, the initial sync
// node may see that it is no longer initiating the set, and wait for stable
// to advance to beginApplying in initial sync. Since the majority of this
// set is 2, the primary will be unable to advance its stable, leaving this node
// stuck in initial sync. We must disable the wait to avoid this scenario.
"initialSyncWaitForSyncSourceLastStableRecoveryTs": false,
},
},
true /* wait */,

View File

@ -111,17 +111,6 @@ secondary = replTest.start(
data: {namespace: testColl.getFullName(), numDocsToClone: 2},
}),
"numInitialSyncAttempts": 1,
// Disable this parameter as this test explicitly wants to validate a
// beginFetching/beginApplying timestamp in initial sync. These timestamps may
// get reset during the 'wait for stable timestamp' phase of initial sync, so
// we skip this phase.
// We must also disable this because the primary may be able to advance the stable
// timestamp after initiate prior to this restart. As a result, the initial sync
// node may see that it is no longer initiating the set, and wait for stable
// to advance to beginApplying in initial sync. Since the majority of this
// set is 2, the primary will be unable to advance its stable, leaving this node
// stuck in initial sync. We must disable the wait to avoid this scenario.
"initialSyncWaitForSyncSourceLastStableRecoveryTs": false,
},
},
true /* wait */,
@ -137,21 +126,6 @@ assert.commandWorked(
maxTimeMS: kDefaultWaitForFailPointTimeout,
}),
);
// Verify that the secondary has set the correct beginFetchingTimestamp and
// beginApplyingTimestamp before proceeding with collection cloning.
const syncStatus = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert.eq(
syncStatus.initialSyncStatus.initialSyncOplogFetchingStart,
beginFetchingTs,
"Expected beginFetchingTimestamp: " + tojson(beginFetchingTs),
);
assert.eq(
syncStatus.initialSyncStatus.initialSyncOplogStart,
beginApplyingTimestamp,
"Expected beginApplyingTimestamp: " + tojson(beginApplyingTimestamp),
);
jsTestLog("Running operations while collection cloning is paused");
// Run some operations on the sync source while collection cloning is paused so that we know

View File

@ -21,7 +21,6 @@
*/
import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
import {kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
const replTest = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]});
@ -94,55 +93,7 @@ replTest.stop(
// Validation would encounter a prepare conflict on the open transaction.
{skipValidation: true},
);
secondary = replTest.start(
secondary,
{
startClean: true,
setParameter: {
"numInitialSyncAttempts": 1,
// Disable this parameter as this test explicitly wants to validate a
// beginFetching/beginApplying timestamp in initial sync. These timestamps may
// get reset during the 'wait for stable timestamp' phase of initial sync, so
// we skip this phase.
// We must also disable this because the primary may be able to advance the stable
// timestamp after initiate prior to this restart. As a result, the initial sync
// node may see that it is no longer initiating the set, and wait for stable
// to advance to beginApplying in initial sync. Since the majority of this
// set is 2, the primary will be unable to advance its stable, leaving this node
// stuck in initial sync. We must disable the wait to avoid this scenario.
"initialSyncWaitForSyncSourceLastStableRecoveryTs": false,
"failpoint.initialSyncHangBeforeCopyingDatabases": tojson({mode: "alwaysOn"}),
},
},
true /* wait */,
);
// Wait for the failpoint to be triggered so we know initial sync is paused with timestamps set.
assert.commandWorked(
secondary.adminCommand({
waitForFailPoint: "initialSyncHangBeforeCopyingDatabases",
timesEntered: 1,
maxTimeMS: kDefaultWaitForFailPointTimeout,
}),
);
const syncStatus = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert.eq(
syncStatus.initialSyncStatus.initialSyncOplogStart,
beginApplyingTimestamp,
"Expected beginApplyingTimestamp: " + tojson(beginApplyingTimestamp),
);
assert.eq(
syncStatus.initialSyncStatus.initialSyncOplogFetchingStart,
beginFetchingTs,
"Expected beginFetchingTimestamp: " + tojson(beginFetchingTs),
);
// Resume initial sync.
assert.commandWorked(
secondary.adminCommand({configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}),
);
secondary = replTest.start(secondary, {startClean: true, setParameter: {"numInitialSyncAttempts": 1}}, true /* wait */);
replTest.awaitSecondaryNodes();
replTest.awaitReplication();

View File

@ -164,12 +164,6 @@ try {
checkLogForGetTimestampMsg(secondary, "beginApplyingTimestamp", prepareTimestamp, true);
checkLogForCollectionClonerMsg(secondary, "listDatabases", "admin", false);
// Disable the initiating-set optimization so that beginApplyingTimestamp stays at the
// prepare timestamp.
assert.commandWorked(
secondary.adminCommand({setParameter: 1, initialSyncWaitForSyncSourceLastStableRecoveryTs: false}),
);
// This step call will resume initial sync and pause it again after the node gets the
// listDatabases result from its sync source.
assert(!initialSyncTest.step());

View File

@ -29,7 +29,6 @@ function _testSecondaryMetricsHelper(
baseOpsReceived,
baseOpsWritten,
baseOpsBytes,
baseOpsNetwork,
) {
let ss = secondary.getDB("test").serverStatus();
jsTestLog(`Secondary ${secondary.host} metrics: ${tojson(ss.metrics)}`);
@ -37,14 +36,12 @@ function _testSecondaryMetricsHelper(
assert(ss.metrics.repl.network.readersCreated > 0, "no (oplog) readers created");
assert(ss.metrics.repl.network.getmores.num > 0, "no getmores");
assert(ss.metrics.repl.network.getmores.totalMillis > 0, "no getmores time");
// network.ops includes entries fetched during initial sync oplog application, which may
// exceed apply.ops when initial sync fetches from an earlier timestamp (e.g. when the
// sync source was recently initiated, the initial sync node will fetch and apply from the initiating set oplog
// entry). Use the network.ops baseline captured at test setup time so the assertion is self-consistent.
// The first oplog entry may or may not make it into network.ops now that we have two
// n ops (initiate and new primary) before steady replication starts.
// Sometimes, we disconnect from our sync source and since our find is a gte query, we may
// double count an oplog entry, so we need some wiggle room for that.
assert.lte(ss.metrics.repl.network.ops, opCount + baseOpsNetwork + 5, "wrong number of ops retrieved");
assert.gte(ss.metrics.repl.network.ops, opCount + baseOpsNetwork, "wrong number of ops retrieved");
assert.lte(ss.metrics.repl.network.ops, opCount + baseOpsApplied + 5, "wrong number of ops retrieved");
assert.gte(ss.metrics.repl.network.ops, opCount + baseOpsApplied, "wrong number of ops retrieved");
assert(ss.metrics.repl.network.bytes > 0, "zero or missing network bytes");
assert.gt(ss.metrics.repl.network.replSetUpdatePosition.num, 0, "no update position commands sent");
@ -90,15 +87,7 @@ function _testSecondaryMetricsHelper(
// Metrics are racy, e.g. repl.buffer.apply.count could over- or under-reported briefly. Retry on
// error.
function testSecondaryMetrics(
secondary,
opCount,
baseOpsApplied,
baseOpsReceived,
baseOpsWritten,
baseOpsBytes,
baseOpsNetwork,
) {
function testSecondaryMetrics(secondary, opCount, baseOpsApplied, baseOpsReceived, baseOpsWritten, baseOpsBytes) {
assert.soon(() => {
try {
_testSecondaryMetricsHelper(
@ -108,7 +97,6 @@ function testSecondaryMetrics(
baseOpsReceived,
baseOpsWritten,
baseOpsBytes,
baseOpsNetwork,
);
return true;
} catch (exc) {
@ -164,9 +152,6 @@ let secondaryBaseOplogOpsWritten = FeatureFlagUtil.isPresentAndEnabled(secondary
? ss.metrics.repl.write.batchSize
: undefined;
let secondaryBaseOplogBytes = ss.metrics.repl.apply.bytes;
// network.ops includes initial-sync oplog fetcher entries, which may exceed apply.ops when the
// sync source was recently initiated and beginFetchingTimestamp is moved to the initiate noop.
let secondaryBaseOplogOpsNetwork = ss.metrics.repl.network.ops;
// Disable batching of inserts so each one creates an oplog entry.
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalInsertMaxBatchSize: 1}));
@ -185,7 +170,6 @@ testSecondaryMetrics(
secondaryBaseOplogOpsReceived,
secondaryBaseOplogOpsWritten,
secondaryBaseOplogBytes,
secondaryBaseOplogOpsNetwork,
);
let options = {writeConcern: {w: 2}, multi: true, upsert: true};
@ -198,7 +182,6 @@ testSecondaryMetrics(
secondaryBaseOplogOpsReceived,
secondaryBaseOplogOpsWritten,
secondaryBaseOplogBytes,
secondaryBaseOplogOpsNetwork,
);
// Test that the number of oplog getMore requested by the secondary and processed by the primary has

View File

@ -56,10 +56,6 @@ struct InitialSyncState {
Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
Timer timer; // Timer for timing how long each initial sync attempt takes.
size_t appliedOps = 0;
bool earliestOplogEntryIsInitiatingSet = false;
Timestamp earliestOplogEntryTimestamp;
Date_t waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline;
};
} // namespace repl

View File

@ -359,8 +359,6 @@ void InitialSyncer::_cancelRemainingWork(WithLock lk) {
_cancelHandle(lk, _getBaseRollbackIdHandle);
_cancelHandle(lk, _getLastRollbackIdHandle);
_cancelHandle(lk, _getNextApplierBatchHandle);
_cancelHandle(lk, _initializeOplogFetcherAndDbClonersHandle);
_cancelHandle(lk, _waitForSyncSourceStableTimestampHandle);
_shutdownComponent(lk, _oplogFetcher);
if (_sharedData) {
@ -377,7 +375,6 @@ void InitialSyncer::_cancelRemainingWork(WithLock lk) {
_shutdownComponent(lk, _fCVFetcher);
_shutdownComponent(lk, _lastOplogEntryFetcher);
_shutdownComponent(lk, _beginFetchingOpTimeFetcher);
_shutdownComponent(lk, _earliestOplogEntryFetcher);
(*_attemptExec)->shutdown();
(*_clonerAttemptExec)->shutdown();
_attemptCanceled = true;
@ -1135,286 +1132,6 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
}
}
void InitialSyncer::_initializeOplogFetcherAndDbCloners(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime) {
std::unique_lock<std::mutex> lock(_mutex);
if (!_checkForShutdownAndHandleError(
lock,
callbackArgs,
onCompletionGuard,
"error while initializing oplog fetchers and db cloners")) {
_initialSyncState.reset();
return;
}
LOGV2(11318416,
"Initializing oplog fetcher and cloners",
"beginFetchingTimestamp"_attr = _initialSyncState->beginFetchingTimestamp,
"beginApplyingTimestamp"_attr = _initialSyncState->beginApplyingTimestamp);
const auto configResult = _dataReplicatorExternalState->getCurrentConfig();
auto status = configResult.getStatus();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
return;
}
const auto& config = configResult.getValue();
OplogFetcher::Config oplogFetcherConfig(
beginFetchingOpTime,
_syncSource,
config,
initialSyncOplogFetcherBatchSize.load(),
OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource);
oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc;
_oplogFetcher = (*_createOplogFetcherFn)(
*_attemptExec,
std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
_sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
_dataReplicatorExternalState.get(),
[=, this](OplogFetcher::Documents::const_iterator first,
OplogFetcher::Documents::const_iterator last,
const OplogFetcher::DocumentsInfo& info) {
return _enqueueDocuments(first, last, info);
},
[=, this](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
std::move(oplogFetcherConfig));
LOGV2_DEBUG(21178, 2, "Starting OplogFetcher", "oplogFetcher"_attr = _oplogFetcher->toString());
// _startupComponent is shutdown-aware.
status = _startupComponent(lock, _oplogFetcher);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState->allDatabaseCloner.reset();
return;
}
if (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail())) {
lock.unlock();
// This could have been done with a scheduleWorkAt but this is used only by JS tests where
// we run with multiple threads so it's fine to spin on this thread.
// This log output is used in js tests so please leave it.
LOGV2(21179,
"initial sync - initialSyncHangBeforeCopyingDatabases fail point "
"enabled. Blocking until fail point is disabled.");
while (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail()) &&
!_isShuttingDown()) {
mongo::sleepsecs(1);
}
lock.lock();
}
LOGV2_DEBUG(21180,
2,
"Starting AllDatabaseCloner",
"allDatabaseCloner"_attr = _initialSyncState->allDatabaseCloner->toString());
auto [startClonerFuture, startCloner] =
_initialSyncState->allDatabaseCloner->runOnExecutorEvent(*_clonerAttemptExec);
// runOnExecutorEvent ensures the future is not ready unless an error has occurred.
if (startClonerFuture.isReady()) {
status = startClonerFuture.getNoThrow();
invariant(!status.isOK());
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
return;
}
_initialSyncState->allDatabaseClonerFuture =
std::move(startClonerFuture).onCompletion([this, onCompletionGuard](Status status) mutable {
// The completion guard must run on the main executor, and never inline. In unit tests,
// without the executor call, it would run on the wrong executor. In both production
// and in unit tests, if the cloner finishes very quickly, the callback could run
// in-line and result in self-deadlock.
std::unique_lock<std::mutex> lock(_mutex);
auto exec_status = (*_attemptExec)
->scheduleWork([this, status, onCompletionGuard](
executor::TaskExecutor::CallbackArgs args) {
_allDatabaseClonerCallback(status, onCompletionGuard);
});
if (!exec_status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, exec_status.getStatus());
// In the shutdown case, it is possible the completion guard will be run
// from this thread (since the lambda holding another copy didn't schedule).
// If it does, we will self-deadlock if we're holding the lock, so release it.
lock.unlock();
}
// In unit tests, this reset ensures the completion guard does not run during the
// destruction of the lambda (which occurs on the wrong executor), except in the
// shutdown case.
onCompletionGuard.reset();
});
_setPhase(lock, Phase::kCloningData);
lock.unlock();
// Start (and therefore finish) the cloners outside the lock. This ensures onCompletion
// is not run with the mutex held, which would result in self-deadlock.
(*_clonerAttemptExec)->signalEvent(startCloner);
}
void InitialSyncer::_initiatingSetStableTimestampCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime) {
std::unique_lock<std::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus(
lock,
callbackArgs.response.status,
"error while running replSetGetStatus to check initiating set on sync source");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
return;
}
auto stableElem = callbackArgs.response.data["lastStableRecoveryTimestamp"];
if (!stableElem) {
LOGV2_WARNING(11318414,
"Sync source replSetGetStatus response is missing "
"lastStableRecoveryTimestamp; failing initial sync attempt",
"syncSource"_attr = _syncSource);
onCompletionGuard->setResultAndCancelRemainingWork(
lock,
Status(ErrorCodes::InvalidSyncSource,
"sync source replSetGetStatus response is missing lastStableRecoveryTimestamp"));
_initialSyncState.reset();
return;
}
const Timestamp lastStableRecoveryTs = stableElem.timestamp();
auto beginFetchingOpTimeForInitiate = beginFetchingOpTime;
const auto earliestTs = _initialSyncState->earliestOplogEntryTimestamp;
const bool isInitiatingSet = _initialSyncState->earliestOplogEntryIsInitiatingSet;
const int64_t diff = static_cast<int64_t>(lastStableRecoveryTs.getSecs()) -
static_cast<int64_t>(earliestTs.getSecs());
const int64_t thresholdSecs =
initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs.load();
if (isInitiatingSet && diff <= thresholdSecs) {
LOGV2(11318412,
"Skipping wait for sync source stable recovery timestamp: sync source was recently "
"initiated and its stable recovery timestamp is within threshold of the initiating "
"set oplog entry",
"earliestOplogEntryTs"_attr = earliestTs,
"lastStableRecoveryTs"_attr = lastStableRecoveryTs,
"thresholdSecs"_attr = thresholdSecs);
_initialSyncState->beginApplyingTimestamp = earliestTs;
_initialSyncState->beginFetchingTimestamp = earliestTs;
_summaryStats->beginApplyingTimestamp.storeRelaxed(
_initialSyncState->beginApplyingTimestamp.asULL());
_summaryStats->beginFetchingTimestamp.storeRelaxed(
_initialSyncState->beginFetchingTimestamp.asULL());
// The 'initializingSet' oplog entry will always have term = -1, since it is written
// prior to setting the term to 0 and completing the rest of the set initialization.
beginFetchingOpTimeForInitiate = OpTime(earliestTs, OpTime::kUninitializedTerm);
auto status = _scheduleWorkAndSaveHandle(
lock,
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
_initializeOplogFetcherAndDbCloners(
args, onCompletionGuard, beginFetchingOpTimeForInitiate);
},
&_initializeOplogFetcherAndDbClonersHandle,
"_initializeOplogFetcherAndDbCloners from _initiatingSetStableTimestampCallback");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
}
return;
}
LOGV2(11318413,
"Initiating set check did not meet skip criteria; proceeding to wait for sync source "
"stable recovery timestamp to advance",
"isInitiatingSet"_attr = isInitiatingSet,
"earliestOplogEntryTs"_attr = earliestTs,
"lastStableRecoveryTs"_attr = lastStableRecoveryTs);
_initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline =
(*_attemptExec)->now() +
Seconds(initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs.load());
// TODO (SERVER-125965): Wait for lastStableRecoveryTimestamp to advance in this case instead of
// proceeding directly to _initializeOplogFetcherAndDbCloners.
status = _scheduleWorkAndSaveHandle(
lock,
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
_initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime);
},
&_initializeOplogFetcherAndDbClonersHandle,
"_initializeOplogFetcherAndDbCloners");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
}
}
void InitialSyncer::_earliestOplogEntryForInitiatingSetCallback(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime) {
std::unique_lock<std::mutex> lock(_mutex);
_setPhase(lock, Phase::kWaitingForSyncSourceStableTs);
auto status = _checkForShutdownAndConvertStatus(
lock, result.getStatus(), "error while getting earliest oplog entry to check for initiate");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
return;
}
const auto docs = result.getValue().documents;
const auto hasDoc = docs.begin() != docs.end();
if (!hasDoc) {
onCompletionGuard->setResultAndCancelRemainingWork(
lock,
Status(ErrorCodes::NoMatchingDocument, "earliest oplog entry not able to be fetched"));
_initialSyncState.reset();
return;
}
auto swOplogEntry = OplogEntry::parse(docs.front());
if (!swOplogEntry.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, swOplogEntry.getStatus());
_initialSyncState.reset();
return;
}
const auto oplogEntry = swOplogEntry.getValue();
const bool isInitiatingSet =
(oplogEntry.getOpType() == OpTypeEnum::kNoop &&
oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)));
// Store the earliest oplog entry info so _checkIfInitiatingSet can use it once we have
// the sync source's lastStableRecoveryTimestamp from replSetGetStatus.
_initialSyncState->earliestOplogEntryIsInitiatingSet = isInitiatingSet;
_initialSyncState->earliestOplogEntryTimestamp = oplogEntry.getTimestamp();
// Fetch lastStableRecoveryTimestamp from the sync source. The callback will decide whether
// to skip the wait (initiating-set case, stable ts within threshold) or start the wait loop.
executor::RemoteCommandRequest replSetGetStatusRequest(
_syncSource, DatabaseName::kAdmin, BSON("replSetGetStatus" << 1), nullptr);
auto cbHandle =
(*_attemptExec)
->scheduleRemoteCommand(std::move(replSetGetStatusRequest),
[this, onCompletionGuard, beginFetchingOpTime](
TaskExecutor::RemoteCommandCallbackArgs args) {
_initiatingSetStableTimestampCallback(
args, onCompletionGuard, beginFetchingOpTime);
});
if (!cbHandle.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, cbHandle.getStatus());
_initialSyncState.reset();
return;
}
_waitForSyncSourceStableTimestampHandle = cbHandle.getValue();
}
void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& lastOpTime,
@ -1533,52 +1250,103 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
logAttrs(NamespaceString::kRsOplogNamespace),
"beginFetchingTimestamp"_attr = _initialSyncState->beginFetchingTimestamp);
if (!initialSyncWaitForSyncSourceLastStableRecoveryTs.load()) {
// Server parameter is toggled off, skip waiting for stable recovery timestamp to advance on
// sync source.
LOGV2_WARNING(
11318403,
"Skipping waiting for sync source stable recovery timestamp to advance on sync source "
"because the 'initialSyncWaitForSyncSourceLastStableRecoveryTs' parameter is off",
"beginFetchingTimestamp"_attr = _initialSyncState->beginFetchingTimestamp,
"beginApplyingTimestamp"_attr = _initialSyncState->beginApplyingTimestamp);
status = _scheduleWorkAndSaveHandle(
lock,
[=, this](const executor::TaskExecutor::CallbackArgs& args) {
_initializeOplogFetcherAndDbCloners(args, onCompletionGuard, beginFetchingOpTime);
},
&_initializeOplogFetcherAndDbClonersHandle,
str::stream() << "_initializeOplogFetcherAndDbCloners");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
}
return;
}
// Check earliest oplog entry prior to starting the wait. If it is the "initiating set"
// oplog entry, skip wait.
BSONObj query =
BSON("find" << NamespaceString::kRsOplogNamespace.coll() << "sort" << BSON("$natural" << 1)
<< "limit" << 1 << ReadConcernArgs::kReadConcernFieldName
<< ReadConcernArgs::kLocal.toBSONInner());
_earliestOplogEntryFetcher = std::make_unique<Fetcher>(
*_attemptExec,
_syncSource,
NamespaceString::kRsOplogNamespace.dbName(),
query,
[=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response,
mongo::Fetcher::NextAction*,
mongo::BSONObjBuilder*) mutable {
_earliestOplogEntryForInitiatingSetCallback(
response, onCompletionGuard, beginFetchingOpTime);
});
status = _earliestOplogEntryFetcher->schedule();
const auto configResult = _dataReplicatorExternalState->getCurrentConfig();
status = configResult.getStatus();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState.reset();
_earliestOplogEntryFetcher.reset();
return;
}
const auto& config = configResult.getValue();
OplogFetcher::Config oplogFetcherConfig(
beginFetchingOpTime,
_syncSource,
config,
initialSyncOplogFetcherBatchSize.load(),
OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource);
oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc;
_oplogFetcher = (*_createOplogFetcherFn)(
*_attemptExec,
std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
_sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
_dataReplicatorExternalState.get(),
[=, this](OplogFetcher::Documents::const_iterator first,
OplogFetcher::Documents::const_iterator last,
const OplogFetcher::DocumentsInfo& info) {
return _enqueueDocuments(first, last, info);
},
[=, this](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
std::move(oplogFetcherConfig));
LOGV2_DEBUG(21178, 2, "Starting OplogFetcher", "oplogFetcher"_attr = _oplogFetcher->toString());
// _startupComponent is shutdown-aware.
status = _startupComponent(lock, _oplogFetcher);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
_initialSyncState->allDatabaseCloner.reset();
return;
}
if (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail())) {
lock.unlock();
// This could have been done with a scheduleWorkAt but this is used only by JS tests where
// we run with multiple threads so it's fine to spin on this thread.
// This log output is used in js tests so please leave it.
LOGV2(21179,
"initial sync - initialSyncHangBeforeCopyingDatabases fail point "
"enabled. Blocking until fail point is disabled.");
while (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail()) &&
!_isShuttingDown()) {
mongo::sleepsecs(1);
}
lock.lock();
}
LOGV2_DEBUG(21180,
2,
"Starting AllDatabaseCloner",
"allDatabaseCloner"_attr = _initialSyncState->allDatabaseCloner->toString());
auto [startClonerFuture, startCloner] =
_initialSyncState->allDatabaseCloner->runOnExecutorEvent(*_clonerAttemptExec);
// runOnExecutorEvent ensures the future is not ready unless an error has occurred.
if (startClonerFuture.isReady()) {
status = startClonerFuture.getNoThrow();
invariant(!status.isOK());
onCompletionGuard->setResultAndCancelRemainingWork(lock, status);
return;
}
_initialSyncState->allDatabaseClonerFuture =
std::move(startClonerFuture).onCompletion([this, onCompletionGuard](Status status) mutable {
// The completion guard must run on the main executor, and never inline. In unit tests,
// without the executor call, it would run on the wrong executor. In both production
// and in unit tests, if the cloner finishes very quickly, the callback could run
// in-line and result in self-deadlock.
std::unique_lock<std::mutex> lock(_mutex);
auto exec_status = (*_attemptExec)
->scheduleWork([this, status, onCompletionGuard](
executor::TaskExecutor::CallbackArgs args) {
_allDatabaseClonerCallback(status, onCompletionGuard);
});
if (!exec_status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lock, exec_status.getStatus());
// In the shutdown case, it is possible the completion guard will be run
// from this thread (since the lambda holding another copy didn't schedule).
// If it does, we will self-deadlock if we're holding the lock, so release it.
lock.unlock();
}
// In unit tests, this reset ensures the completion guard does not run during the
// destruction of the lambda (which occurs on the wrong executor), except in the
// shutdown case.
onCompletionGuard.reset();
});
_setPhase(lock, Phase::kCloningData);
lock.unlock();
// Start (and therefore finish) the cloners outside the lock. This ensures onCompletion
// is not run with the mutex held, which would result in self-deadlock.
(*_clonerAttemptExec)->signalEvent(startCloner);
}
void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
@ -2348,19 +2116,6 @@ void InitialSyncer::_clearRetriableError(WithLock lk) {
_retryingOperation = boost::none;
}
bool InitialSyncer::_checkForShutdownAndHandleError(
std::unique_lock<std::mutex>& lk,
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const std::string& errorMsg) {
auto status = _checkForShutdownAndConvertStatus(lk, callbackArgs, errorMsg);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork(lk, status);
return false;
}
return true;
}
Status InitialSyncer::_checkForShutdownAndConvertStatus(
WithLock lk,
const executor::TaskExecutor::CallbackArgs& callbackArgs,
@ -2629,8 +2384,6 @@ StringData InitialSyncer::phaseToString(Phase phase) {
return "determiningStartOpTime"_sd;
case Phase::kFetchingFCV:
return "fetchingFCV"_sd;
case Phase::kWaitingForSyncSourceStableTs:
return "waitingForSyncSourceStableTs"_sd;
case Phase::kCloningData:
return "cloningData"_sd;
case Phase::kDeterminingStopTimestamp:

View File

@ -160,7 +160,6 @@ public:
kCheckingSourceRollback,
kDeterminingStartOpTime,
kFetchingFCV,
kWaitingForSyncSourceStableTs,
kCloningData,
kDeterminingStopTimestamp,
kApplyingOplog,
@ -402,18 +401,6 @@ private:
* _fcvFetcherCallback()
* |
* |
* V
* _earliestOplogEntryForInitiatingSetCallback()
* |
* |
* V
* _checkIfInitiatingSet()
* |
* |
* V
* _initializeOplogFetcherAndDbCloners()
* |
* |
* +------------------------------+
* | |
* | |
@ -544,55 +531,6 @@ private:
const OpTime& lastOpTime,
OpTime& beginFetchingOpTime);
/**
* Fetcher callback that receives the sync source's earliest oplog entry and determines
* whether that entry is the "initiating set" noop written when the replica set was first
* created. The result is stored in `_initialSyncState` and then a `replSetGetStatus`
* command is issued to the sync source; the response is handled by
* `_initiatingSetStableTimestampCallback`.
*
* Only reached when `initialSyncWaitForSyncSourceLastStableRecoveryTs` is enabled.
* Transitions the sync phase to `Phase::kWaitingForSyncSourceStableTs`.
*/
void _earliestOplogEntryForInitiatingSetCallback(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime);
/**
* Remote-command callback that receives `replSetGetStatus` from the sync source,
* determines whether or not we are initiating the replica set, and updates
* `_initialSyncState` accordingly.
*
* Initiating set case skips the stable-timestamp wait and begins cloning from the
* initiating-set entry's timestamp when both conditions hold:
* - The earliest oplog entry IS the "initiating set" noop, AND
* - `lastStableRecoveryTimestamp - earliestOplogEntryTimestamp <=
* initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs`
* In this case `beginApplyingTimestamp` and `beginFetchingTimestamp` are reset to the
* initiating-set entry's timestamp (with term = kUninitializedTerm).
*
* Non-initiating case when the conditions above are not met, records the retry
* deadline in
* `_initialSyncState->waitForSyncSourceStableTimestampAdvanceMaxRetryDeadline` and
* proceeds with the original `beginFetchingOpTime`.
* (Full retry loop is tracked in SERVER-125965.)
*
* Scheduled by `_earliestOplogEntryForInitiatingSetCallback`.
*/
void _initiatingSetStableTimestampCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime);
/**
* Initializes the oplog fetcher and database cloners.
*/
void _initializeOplogFetcherAndDbCloners(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& beginFetchingOpTime);
/**
* Callback for oplog fetcher.
*/
@ -714,16 +652,6 @@ private:
*/
void _clearRetriableError(WithLock lk);
/**
* Checks the embedded status inside the callback args and current data replicator shutdown
* state. Returns true if the status is OK, otherwise returns false. If false, cancels the
* initial sync attempt and resets the initial sync state.
*/
bool _checkForShutdownAndHandleError(std::unique_lock<std::mutex>& lk,
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const std::string& errorMsg);
/**
* Checks the given status (or embedded status inside the callback args) and current data
* replicator shutdown state. If the given status is not OK or if we are shutting down, returns
@ -824,12 +752,6 @@ private:
// Handle returned from RollbackChecker::checkForRollback().
RollbackChecker::CallbackHandle _getLastRollbackIdHandle; // (M)
// Handle to currently scheduled _initializeOplogFetcherAndDbCloners() task.
executor::TaskExecutor::CallbackHandle _initializeOplogFetcherAndDbClonersHandle; // (M)
// Handle used for waiting for stable timestamp to advance on sync source.
executor::TaskExecutor::CallbackHandle _waitForSyncSourceStableTimestampHandle; // (M)
// Handle to currently scheduled _getNextApplierBatchCallback() task.
executor::TaskExecutor::CallbackHandle _getNextApplierBatchHandle; // (M)
@ -841,7 +763,6 @@ private:
std::unique_ptr<Fetcher> _beginFetchingOpTimeFetcher; // (S)
std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S)
std::unique_ptr<Fetcher> _fCVFetcher; // (S)
std::unique_ptr<Fetcher> _earliestOplogEntryFetcher; // (S)
std::unique_ptr<MultiApplier> _applier; // (M)
HostAndPort _syncSource; // (M)
std::unique_ptr<DBClientConnection> _client; // (M)

View File

@ -286,20 +286,6 @@ public:
*/
void processSuccessfulFCVFetcherResponseLastLTS();
/**
* Schedules and processes a successful response to the network request sent by InitialSyncer's
* earliest oplog entry fetcher (used when initialSyncWaitForSyncSourceLastStableRecoveryTs is
* enabled). Validates that sort is $natural:1 (ascending).
*/
void processSuccessfulEarliestOplogEntryFetcherResponse(std::vector<BSONObj> docs);
/**
* Enables initialSyncWaitForSyncSourceLastStableRecoveryTs, starts the initial syncer, and
* drives it through all preamble network steps up to and including the FCV fetch. Leaves the
* earliest oplog entry fetch request pending for the caller to handle.
*/
void runWaitStableTsPreamble(int beginApplyingT = 1);
void finishProcessingNetworkResponse() {
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
@ -339,7 +325,6 @@ protected:
void setUp() override {
executor::ThreadPoolExecutorTest::setUp();
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(false);
_storageInterface = std::make_unique<StorageInterfaceMock>();
_storageInterface->createOplogFn = [this](OperationContext* opCtx,
const NamespaceString& nss) {
@ -542,7 +527,6 @@ protected:
_replicationProcess.reset();
_storageInterface.reset();
_mock.reset();
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(false);
}
/**
@ -757,64 +741,6 @@ void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector<BSONObj>
net->runReadyNetworkOperations();
}
void InitialSyncerTest::processSuccessfulEarliestOplogEntryFetcherResponse(
std::vector<BSONObj> docs) {
auto net = getNet();
auto request =
assertRemoteCommandNameEquals("find",
net->scheduleSuccessfulResponse(makeCursorResponse(
0LL, NamespaceString::kRsOplogNamespace, docs)));
ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
ASSERT_TRUE(request.cmdObj.hasField("sort"));
ASSERT_EQUALS(mongo::BSONType::object, request.cmdObj["sort"].type());
ASSERT_BSONOBJ_EQ(BSON("$natural" << 1), request.cmdObj.getObjectField("sort"));
net->runReadyNetworkOperations();
}
BSONObj makeInitiatingSetOplogEntryObj(int t) {
return DurableOplogEntry(OpTime(Timestamp(t, 1), -1),
OpTypeEnum::kNoop,
NamespaceString::createNamespaceString_forTest(""),
boost::none,
boost::none,
boost::none,
boost::none,
OplogEntry::kOplogVersion,
BSON("msg" << repl::kInitiatingSetMsg),
boost::none,
{},
boost::none,
Date_t() + Seconds(t),
{},
boost::none,
boost::none,
boost::none,
boost::none,
boost::none,
boost::none)
.toBSON();
}
void InitialSyncerTest::runWaitStableTsPreamble(int beginApplyingT) {
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
auto opCtx = makeOpCtx();
ASSERT_OK(getInitialSyncer().startup(opCtx.get(), 1U));
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
auto txnRequest = net->scheduleSuccessfulResponse(
makeCursorResponse(0LL, NamespaceString::kSessionTransactionsTableNamespace, {}, true));
assertRemoteCommandNameEquals("find", txnRequest);
net->runReadyNetworkOperations();
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(beginApplyingT)});
processSuccessfulFCVFetcherResponseLastLTS();
}
// The FCV callback has now scheduled the earliest oplog entry fetch, leaving one pending
// request in the queue for the caller to handle.
}
TEST_F(InitialSyncerTest, InvalidConstruction) {
InitialSyncerInterface::Options options;
options.getMyLastOptime = []() {
@ -5343,158 +5269,4 @@ TEST_F(InitialSyncerTest, InitialSyncerReloadsTransientErrorRetryPeriodOnEachAtt
ASSERT_EQUALS(initialSyncer->getAllowedOutageDuration_forTest(), Seconds(updatedRetryPeriod));
}
TEST_F(InitialSyncerTest,
InitialSyncerPassesThroughEarliestOplogEntryFetcherCallbackErrorWhenWaitingForStableTs) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
runWaitStableTsPreamble();
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
assertRemoteCommandNameEquals(
"find",
net->scheduleErrorResponse(
Status(ErrorCodes::OperationFailed, "find command failed at sync source")));
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
TEST_F(InitialSyncerTest,
InitialSyncerPassesThroughReplSetGetStatusCallbackErrorWhenWaitingForStableTs) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
runWaitStableTsPreamble();
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
processSuccessfulEarliestOplogEntryFetcherResponse({makeOplogEntryObj(1)});
assertRemoteCommandNameEquals(
"replSetGetStatus",
net->scheduleErrorResponse(
Status(ErrorCodes::OperationFailed, "replSetGetStatus failed at sync source")));
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
TEST_F(InitialSyncerTest,
InitialSyncerFailsIfReplSetGetStatusResponseMissingLastStableRecoveryTimestamp) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
runWaitStableTsPreamble();
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
processSuccessfulEarliestOplogEntryFetcherResponse({makeOplogEntryObj(1)});
auto replSetGetStatusRequest =
net->scheduleSuccessfulResponse(BSON("ok" << 1 /* no lastStableRecoveryTimestamp */));
assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest);
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
}
TEST_F(InitialSyncerTest, InitialSyncerSkipsWaitWhenInitiatingSetWithinThreshold) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
// Use t=5 for beginApplyingTimestamp so we can verify it gets overridden to t=1.
runWaitStableTsPreamble(5);
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
// Earliest oplog entry is the "initiating set" noop at ts(1,1).
processSuccessfulEarliestOplogEntryFetcherResponse({makeInitiatingSetOplogEntryObj(1)});
// lastStableRecoveryTimestamp matches exactly — diff = 0, well within threshold.
auto replSetGetStatusRequest = net->scheduleSuccessfulResponse(
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(1, 1)));
assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest);
net->runReadyNetworkOperations();
// _checkIfInitiatingSet took the skip path and set beginApplyingTimestamp to the
// initiating set entry's timestamp (t=1), overriding the original value (t=5).
// The executor thread is blocked while InNetworkGuard is held, so _initialSyncState
// is still valid here.
auto progress = initialSyncer->getInitialSyncProgress();
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
// Shut down to cancel the scheduled _initializeOplogFetcherAndDbCloners cleanly.
ASSERT_OK(initialSyncer->shutdown());
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
TEST_F(InitialSyncerTest, InitialSyncerDoesNotSkipWaitWhenNotInitiatingSet) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
// Use t=5 for beginApplyingTimestamp so it is distinguishable from the earliest oplog entry
// timestamp (t=1). This lets us verify the timestamp was NOT overridden to t=1.
runWaitStableTsPreamble(5);
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
// Earliest oplog entry is a regular insert at ts(1,1) — not an initiating-set noop.
processSuccessfulEarliestOplogEntryFetcherResponse({makeOplogEntryObj(1)});
auto replSetGetStatusRequest = net->scheduleSuccessfulResponse(
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(1, 1)));
assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest);
net->runReadyNetworkOperations();
// _checkIfInitiatingSet did not take the skip path because the earliest oplog entry is not
// the initiating-set noop. beginApplyingTimestamp must remain at its original value (t=5),
// not be reset to the earliest oplog entry timestamp (t=1).
auto progress = initialSyncer->getInitialSyncProgress();
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(5, 1)) << progress;
ASSERT_OK(initialSyncer->shutdown());
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
TEST_F(InitialSyncerTest, InitialSyncerDoesNotSkipWaitWhenInitiatingSetOutsideThreshold) {
initialSyncWaitForSyncSourceLastStableRecoveryTs.store(true);
// Lower the threshold so we can test with easily-constructed timestamps.
initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs.store(10);
const ScopeGuard resetThreshold([] {
initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs.store(3600);
});
// Use t=5 for beginApplyingTimestamp so it is distinguishable from the initiating-set entry
// timestamp (t=1). This lets us verify the timestamp was NOT overridden to t=1.
runWaitStableTsPreamble(5);
auto initialSyncer = &getInitialSyncer();
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
// Earliest oplog entry is the initiating-set noop at ts(1,1).
processSuccessfulEarliestOplogEntryFetcherResponse({makeInitiatingSetOplogEntryObj(1)});
// lastStableRecoveryTimestamp is 100 s ahead — outside the 10 s test threshold.
auto replSetGetStatusRequest = net->scheduleSuccessfulResponse(
BSON("ok" << 1 << "lastStableRecoveryTimestamp" << Timestamp(101, 1)));
assertRemoteCommandNameEquals("replSetGetStatus", replSetGetStatusRequest);
net->runReadyNetworkOperations();
// _checkIfInitiatingSet did not take the skip path because lastStableRecoveryTimestamp
// (t=101) is outside the threshold (10 s) of the initiating-set entry (t=1).
// beginApplyingTimestamp must remain at its original value (t=5), not be reset to t=1.
auto progress = initialSyncer->getInitialSyncProgress();
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(5, 1)) << progress;
ASSERT_OK(initialSyncer->shutdown());
net->runReadyNetworkOperations();
}
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
} // namespace

View File

@ -751,50 +751,6 @@ server_parameters:
default: 30000
redact: false
initialSyncWaitForSyncSourceLastStableRecoveryTs:
description: >-
If true, during initial sync, we will wait for the sync source to advance its lastStableRecoveryTimestamp
to our beginApplyingTimestamp before starting collection cloning. This is to prevent a bug where we may miss
documents during collection clone if our sync source restarts. Please see SERVER-113184 for more details.
IMPORTANT: Turning this server parameter off will introduce risk for inconsistent data after initial sync.
Only turn this off if initial sync is unable to wait for sync source propagation, and run data consistency
validation after initial sync completes.
set_at: [startup, runtime]
cpp_vartype: AtomicWord<bool>
cpp_varname: initialSyncWaitForSyncSourceLastStableRecoveryTs
default: true
redact: false
initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs:
description: >-
The amount of time in seconds that the node will wait for the node's sync source to advance its lastStableRecoveryTimestamp
to our beginApplyingTimestamp. No effect if 'initialSyncWaitForSyncSourceLastStableRecoveryTs' is set to false. Default
wait period of 10 minutes.
set_at: [startup, runtime]
cpp_vartype: AtomicWord<int>
cpp_varname: initialSyncWaitForSyncSourceLastStableRecoveryTsRetryPeriodSecs
default:
expr: 60 * 10
validator:
gte: 0
redact: false
initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs:
description: >-
If the earliest oplog entry on the sync source is the "initiating set" entry and the
sync source's lastStableRecoveryTimestamp is within this many seconds of that entry,
initial sync skips waiting for the stable recovery timestamp to advance. This covers the
case where the replica set was just created and the sync source has not yet had time to
advance its stable recovery timestamp far from the initiating set timestamp. Default is
1 hour. No effect if 'initialSyncWaitForSyncSourceLastStableRecoveryTs' is set to false.
set_at: [startup, runtime]
cpp_vartype: AtomicWord<int>
cpp_varname: initialSyncWaitForSyncSourceLastStableRecoveryTsInitiatingSetThresholdSecs
default: 3600
validator:
gte: 0
redact: false
feature_flags:
featureFlagSecondaryIndexChecksInDbCheck:
description: When enabled, dbCheck runs document and secondary index consistency checks in addition to replica set data consistency checks.