SERVER-110728 Kill retryable writes ops on stepdown (#46583)

GitOrigin-RevId: d08b0b2ca80865894573c63c908c75626b75bf82
This commit is contained in:
Pierre Turin 2026-03-03 09:50:58 -08:00 committed by MongoDB Bot
parent 06aab6e63d
commit 98f9b607bc
6 changed files with 988 additions and 119 deletions

View File

@ -147,6 +147,8 @@ last-continuous:
ticket: SERVER-120210
- test_file: jstests/noPassthrough/replication/killOp_against_repl_threads.js
ticket: SERVER-112273
- test_file: jstests/replsets/retryable_write_interrupted_on_stepdown.js
ticket: SERVER-110728
suites: null
last-lts:
all:
@ -792,4 +794,6 @@ last-lts:
ticket: SERVER-120210
- test_file: jstests/noPassthrough/replication/killOp_against_repl_threads.js
ticket: SERVER-112273
- test_file: jstests/replsets/retryable_write_interrupted_on_stepdown.js
ticket: SERVER-110728
suites: null

View File

@ -0,0 +1,469 @@
/**
* Tests that retried writes are correctly interrupted on stepdown on the first and second try (see: SERVER-110728).
*/
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
describe("Tests that retried writes are correctly interrupted on stepdown on the first and second try", function () {
const kNodes = 3;
const dbName = "test_db";
const collName = jsTest.name();
let rst = null;
let primaryConn = null;
let primaryDB = null;
beforeEach(() => {
    // Start every test from a clean captured-output buffer: the tests below search the captured
    // mongod output with rawMongoProgramOutput() for the stepdown interruption log line.
    clearRawMongoProgramOutput();

    rst = new ReplSetTest({
        nodes: kNodes,
        settings: {chainingAllowed: false},
    });
    rst.startSet({});
    rst.initiate();

    primaryConn = rst.getPrimary();
    primaryDB = primaryConn.getDB(dbName);

    // The tests depend on these defaults (majority writes with a long wtimeout), so fail fast
    // here if the server rejects them instead of letting a test fail later for an unrelated-looking
    // reason.
    assert.commandWorked(
        primaryDB.adminCommand({
            "setDefaultRWConcern": 1,
            "defaultWriteConcern": {
                "w": "majority",
                // High timeout to give time for the other thread to execute the stepdown command.
                "wtimeout": 10000,
            },
            "defaultReadConcern": {
                "level": "local",
            },
        }),
    );

    // Collection used by signalEvent()/waitForEvent() to coordinate the threads; written with w:1.
    assert.commandWorked(primaryDB.createCollection("coordinationColl", {writeConcern: {w: 1}}));

    primaryDB.getMongo().setSecondaryOk();
    primaryDB.getMongo().setReadPref("primaryPreferred");

    // Seed the test collection with two documents and make sure every node has them.
    assert.commandWorked(primaryDB.createCollection(collName));
    assert.commandWorked(
        primaryDB[collName].insertMany([
            {_id: 1, x: 1},
            {_id: 2, x: 2},
        ]),
    );
    assert.eq(primaryDB[collName].countDocuments({}), 2);
    rst.awaitReplication();
    rst.getSecondaries().forEach((secondary) => assert.eq(secondary.getDB(dbName)[collName].countDocuments({}), 2));
});
afterEach(function () {
    // Tear down the replica set created by beforeEach so each test gets fresh nodes.
    rst.stopSet();
});
// Functions used to signal that an event has happened.
// Used to coordinate writes and stepdown attempts between threads.
function signalEvent(primaryDB, eventName) {
    // An event is signaled by inserting a document whose _id is the event name into
    // 'coordinationColl'. The insert uses w:1 rather than the default write concern.
    const eventDoc = {_id: eventName};
    const insertRes = primaryDB.coordinationColl.insertOne(eventDoc, {writeConcern: {w: 1}});
    assert.commandWorked(insertRes);
}
function waitForEvent(primaryDB, eventName) {
    // Poll 'coordinationColl' until the document inserted by signalEvent() for this event shows up.
    const failureMsg = `Did not find '${eventName}' document in 'coordinationColl', the event was never signaled`;
    const eventWasSignaled = function () {
        return primaryDB.coordinationColl.findOne({_id: eventName});
    };
    assert.soon(eventWasSignaled, failureMsg);
}
// Functions used to run the test with different write commands:
function doTestForInsert(testRetryWriteFn) {
    // Registers the INSERT flavour of the test: two new documents are inserted by the retryable
    // write, so every node must end up with 4 documents.
    it("retryable INSERT write gets interrupted", () => {
        jsTest.log.info("Test retryable write for INSERT");

        // NOTE(review): runInsert is handed to a Thread by the test drivers, so it is kept
        // self-contained (it only uses its arguments and shell globals).
        function runInsert(primaryDB, collName) {
            return primaryDB.runCommand({
                insert: collName,
                documents: [{_id: 10}, {_id: 20}],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runInsert);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 4);
        }
    });
}
function doTestForUpdate(testRetryWriteFn) {
    // Registers the UPDATE flavour of the test: two existing documents are modified and one is
    // upserted, so every node must end up with 3 documents.
    it("retryable UPDATE write gets interrupted", () => {
        jsTest.log.info("Test retryable write for UPDATE");

        // NOTE(review): runUpdate is handed to a Thread by the test drivers, so it is kept
        // self-contained (it only uses its arguments and shell globals).
        function runUpdate(primaryDB, collName) {
            return primaryDB.runCommand({
                update: collName,
                updates: [
                    {q: {_id: 1}, u: {$inc: {x: 1}}}, // in place
                    {q: {_id: 2}, u: {z: 1}}, // replacement
                    {q: {_id: 3}, u: {$inc: {y: 1}}, upsert: true},
                ],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runUpdate);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 3);
        }
    });
}
function doTestForDelete(testRetryWriteFn) {
    // Registers the DELETE flavour of the test: both seeded documents are removed, so every node
    // must end up with an empty collection.
    it("retryable DELETE write gets interrupted", () => {
        jsTest.log.info("Test retryable write for DELETE");

        // NOTE(review): runDelete is handed to a Thread by the test drivers, so it is kept
        // self-contained (it only uses its arguments and shell globals).
        function runDelete(primaryDB, collName) {
            return primaryDB.runCommand({
                delete: collName,
                deletes: [
                    {q: {x: 1}, limit: 1},
                    {q: {_id: 2}, limit: 1},
                ],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runDelete);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 0);
        }
    });
}
function doTestForBulkWrite(testRetryWriteFn) {
    // Registers the bulkWrite flavour of the test: two inserts, one update and one delete, so every
    // node must end up with 3 documents (2 seeded + 2 inserted - 1 deleted).
    it("retryable BULK write gets interrupted", () => {
        jsTest.log.info("Test retryable write for BULK write");

        // NOTE(review): runBulkWrite is handed to a Thread by the test drivers, so it is kept
        // self-contained (it only uses its arguments and shell globals).
        function runBulkWrite(primaryDB, collName) {
            // bulkWrite is issued against the admin database; the target namespace is given
            // through nsInfo and referenced by index 0 in each op.
            return primaryDB.adminCommand({
                bulkWrite: 1,
                ops: [
                    {insert: 0, document: {_id: 10}},
                    {insert: 0, document: {_id: 20}},
                    {update: 0, filter: {_id: 1}, updateMods: {$inc: {x: 1}}},
                    {delete: 0, filter: {_id: 2}},
                ],
                nsInfo: [{ns: primaryDB[collName].getFullName()}],
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runBulkWrite);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 3);
        }
    });
}
//
// Retryable write on the same node.
//
describe("Execute retried writes twice on the same node", function () {
/**
* Run a retryable write on the primary while doing a stepdown to interrupt the write.
*
* We first block both secondaries from applying oplogs, this will prevent majority writes
* from completing.
* We have two threads:
* 1. The "doRetryableWrite" thread will do the retryable write twice. The write will wait
* for write majority for 10sec. Before the write times out the other thread will attempt
* to step-down which will make the writes immediately fail with the
* InterruptedDueToReplStateChange error.
* 2. The main thread will wait for the "doRetryableWrite" thread to execute the write
* command, then it will attempt to step-down. The step-down will make the write fail and
* then it will timeout after 1sec (secondaryCatchUpPeriodSecs) since no secondaries can
* catch up with the primary.
*
* After the "doRetryableWrite" thread has attempted to execute the write twice, we join the
* thread. Then we unlock both secondaries and execute the write a third time, this attempt
* will succeed.
*
* @param {function (primaryDB, collName) -> WriteResult} runWrite - Run a retryable write
*/
function testRetryWriteOnPrimary(runWrite) {
    // Block secondaries from applying oplogs. The lock result is checked: if a secondary were not
    // actually locked, majority writes could complete and the expectations below would not hold.
    rst.getSecondaries().forEach((secondary) => assert.commandWorked(secondary.getDB(dbName).fsyncLock()));

    // Body of the writer thread. It opens its own connection to the primary, signals that it is
    // about to write, and expects both write attempts to be interrupted by a stepdown.
    // It only uses its arguments and shell globals because it is executed through a Thread.
    function doRetryableWrite(primaryPort, dbName, collName, runWrite, signalEvent, waitForEvent) {
        const newConn = new Mongo("localhost:" + primaryPort);
        assert(newConn);
        newConn.setSecondaryOk();
        newConn.setReadPref("primaryPreferred");
        const primaryDB = newConn.getDB(dbName);
        signalEvent(primaryDB, "readyForFirstStepDown");
        // First try:
        let writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("1st write result: " + tojson(writeRes));
        waitForEvent(primaryDB, "firstStepDownDone");
        // Second try:
        // Before SERVER-110728, the second try would fail with "WriteConcernTimeout" error
        // instead of "InterruptedDueToReplStateChange".
        writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("2nd write result: " + tojson(writeRes));
    }

    const writerThread = new Thread(
        doRetryableWrite,
        primaryConn.port,
        dbName,
        collName,
        runWrite,
        signalEvent,
        waitForEvent,
    );
    writerThread.start();

    // First stepdown attempt: it is expected to time out because no secondary can catch up, but
    // it must still interrupt the in-flight retryable write.
    waitForEvent(primaryDB, "readyForFirstStepDown");
    sleep(1000); // Give time to the other thread to execute the write attempt
    let result = assert.commandFailedWithCode(
        primaryConn.adminCommand({replSetStepDown: 5, secondaryCatchUpPeriodSecs: 1, force: false}),
        ErrorCodes.ExceededTimeLimit,
    );
    jsTest.log.info("1st stepdown, result: " + tojson(result));
    assert.soon(
        () =>
            rawMongoProgramOutput(
                "8562701.*Repl state change interrupted a thread.*" +
                    '"name":"conn[0-9]+".*"globalLockConflict":true.*"isRetryableWrite":true',
            ),
        "mongod did not log that it interrupted the retryable write due to the stepdown",
    );

    // Second stepdown attempt: the retried write is expected to be interrupted even though the log
    // line reports globalLockConflict:false, i.e. it is killed because it is a retryable write.
    signalEvent(primaryDB, "firstStepDownDone");
    sleep(1000); // Give time to the other thread to execute the write attempt
    result = assert.commandFailedWithCode(
        primaryConn.adminCommand({replSetStepDown: 5, secondaryCatchUpPeriodSecs: 1, force: false}),
        ErrorCodes.ExceededTimeLimit,
    );
    jsTest.log.info("2nd stepdown, result:" + tojson(result));
    assert.soon(
        () =>
            rawMongoProgramOutput(
                "8562701.*Repl state change interrupted a thread.*" +
                    '"name":"conn[0-9]+".*"globalLockConflict":false.*"isRetryableWrite":true',
            ),
        "mongod did not log that it interrupted the retryable write due to the stepdown",
    );

    assert.doesNotThrow(() => writerThread.join());

    // Unlock secondaries
    rst.getSecondaries().forEach((secondary) => assert.commandWorked(secondary.getDB(dbName).fsyncUnlock()));

    // Now the write should succeed
    assert.commandWorked(runWrite(primaryDB, collName));
    rst.awaitReplication();
}
doTestForInsert(testRetryWriteOnPrimary);
doTestForUpdate(testRetryWriteOnPrimary);
doTestForDelete(testRetryWriteOnPrimary);
doTestForBulkWrite(testRetryWriteOnPrimary);
}); // "Execute retried writes twice on the same node"
//
// Retryable write on different nodes.
//
describe("Execute retried writes on different nodes", function () {
let lockedSecondary = null;
let liveSecondary = null;
beforeEach(() => {
    // Pick which secondary will be fsync-locked by the test and which one stays live.
    const secondaries = rst.getSecondaries();
    assert.eq(secondaries.length, 2);
    lockedSecondary = secondaries[0];
    liveSecondary = secondaries[1];

    // Override the default write concern for this group of tests. The result is checked so that a
    // rejected command fails the setup instead of silently leaving the previous defaults in place.
    assert.commandWorked(
        primaryDB.adminCommand({
            "setDefaultRWConcern": 1,
            "defaultWriteConcern": {
                // Writes should be blocked even with one live secondary
                "w": 3,
                // High timeout to give time for the other thread to execute the stepdown command.
                "wtimeout": 10000,
            },
        }),
    );
});
/**
* Run a retryable write twice on a different primary while doing a stepdown to interrupt
* the write.
*
* We first block one of the secondaries from applying oplogs, this will prevent w:3 writes
* from completing, and will allow the step-down to succeed.
* We have two threads:
* 1. The "doRetryableWrite" thread will do the retryable write (with write concern w:3)
* twice. The write will wait to be replicated on all 3 nodes of the replset for 10sec.
* Before the write times out the other thread will do a step-down which will make the
* writes immediately fail with the InterruptedDueToReplStateChange error. After the
* first write has failed and the step-down is completed, this thread will re-try the
* write on the new primary. Again, the other thread will do another step-down before the
* write can time out, which will make it fail.
* 2. The main thread will wait for the "doRetryableWrite" thread to execute the write
* command, then it will send a step-down command on the current primary. The step-down
* will succeed and make the write fail. The first step-down is executed on "primary" and
* the second is executed on "liveSecondary", since this is the secondary node which will
* step-up on the first step-down.
*
* After the "doRetryableWrite" thread has attempted to execute the write twice, we join the
* thread. Then we unlock the secondary and execute the write a third time, this attempt
* will succeed.
*
* @param {function (primaryDB, collName) -> WriteResult} runWrite - Run a retryable write
*/
function testRetryWriteOnTwoNodes(runWrite) {
    // Block one secondary from applying oplogs. The lock result is checked: if the secondary were
    // not actually locked, w:3 writes could complete and the expectations below would not hold.
    assert.commandWorked(lockedSecondary.getDB(dbName).fsyncLock());

    // Body of the writer thread. It writes once on the original primary, then waits for the live
    // secondary to become primary and retries the same write there. Both attempts are expected to
    // be interrupted by a stepdown.
    // It only uses its arguments and shell globals because it is executed through a Thread.
    function doRetryableWrite(
        primaryPort,
        liveSecondaryPort,
        dbName,
        collName,
        runWrite,
        signalEvent,
        waitForEvent,
    ) {
        const primaryConn = new Mongo("localhost:" + primaryPort);
        assert(primaryConn);
        primaryConn.setSecondaryOk();
        primaryConn.setReadPref("primaryPreferred");
        const primaryDB = primaryConn.getDB(dbName);
        signalEvent(primaryDB, "readyForFirstStepDown");
        // First try:
        let writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("1st write result: " + tojson(writeRes));
        const newPrimaryConn = new Mongo("localhost:" + liveSecondaryPort);
        assert(newPrimaryConn);
        newPrimaryConn.setSecondaryOk();
        newPrimaryConn.setReadPref("primaryPreferred");
        assert.soon(
            () => newPrimaryConn.adminCommand("hello").isWritablePrimary,
            "Live secondary node never became a primary after the first step-down",
        );
        const newPrimaryDB = newPrimaryConn.getDB(dbName);
        waitForEvent(newPrimaryDB, "readyForSecondStepDown");
        // Second try:
        // Before SERVER-110728, the second try would fail with "PrimarySteppedDown" error
        // instead of "InterruptedDueToReplStateChange".
        writeRes = assert.commandFailedWithCode(
            runWrite(newPrimaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("2nd write result: " + tojson(writeRes));
    }

    const writerThread = new Thread(
        doRetryableWrite,
        primaryConn.port,
        liveSecondary.port,
        dbName,
        collName,
        runWrite,
        signalEvent,
        waitForEvent,
    );
    writerThread.start();

    // First stepdown, on the original primary: expected to succeed and interrupt the first write.
    waitForEvent(primaryDB, "readyForFirstStepDown");
    sleep(1000); // Give time to the other thread to execute the write attempt
    let result = assert.commandWorked(
        primaryConn.adminCommand({replSetStepDown: 10, secondaryCatchUpPeriodSecs: 5, force: false}),
    );
    jsTest.log.info("1st stepdown, result: " + tojson(result));
    assert.soon(
        () =>
            rawMongoProgramOutput(
                "8562701.*Repl state change interrupted a thread.*" +
                    '"name":"conn[0-9]+".*"globalLockConflict":true.*"isRetryableWrite":true',
            ),
        "mongod did not log that it interrupted the retryable write due to the stepdown",
    );
    assert.soon(
        () => liveSecondary.adminCommand("hello").isWritablePrimary,
        "Live secondary node never became a primary after the first step-down",
    );
    assert.soon(
        () => !primaryConn.adminCommand("hello").isWritablePrimary,
        "The old primary node never stepped-down",
    );
    const newPrimary = liveSecondary;

    // Wait for at least replSetStepDown time (10sec) so that the old primary can become primary again.
    sleep(10000);

    // Second stepdown, on the new primary: expected to succeed and interrupt the retried write.
    signalEvent(newPrimary.getDB(dbName), "readyForSecondStepDown");
    sleep(1000); // Give time to the other thread to execute the write attempt
    result = assert.commandWorked(
        newPrimary.adminCommand({replSetStepDown: 10, secondaryCatchUpPeriodSecs: 5, force: false}),
    );
    jsTest.log.info("2nd stepdown, result:" + tojson(result));
    assert.soon(
        () =>
            rawMongoProgramOutput(
                "8562701.*Repl state change interrupted a thread.*" +
                    '"name":"conn[0-9]+".*"globalLockConflict":false.*"isRetryableWrite":true',
            ),
        "mongod did not log that it interrupted the retryable write due to the stepdown",
    );

    assert.doesNotThrow(() => writerThread.join());

    // The original primary must be primary again before the final write is issued through it.
    assert.soon(
        () => primaryConn.adminCommand("hello").isWritablePrimary,
        "Old primary node never became a primary after the second step-down",
    );
    assert.soon(
        () => !liveSecondary.adminCommand("hello").isWritablePrimary,
        "Live secondary node never stepped down after the second step-down",
    );

    // Unlock secondary
    assert.commandWorked(lockedSecondary.getDB(dbName).fsyncUnlock());

    // Now the write should succeed
    assert.commandWorked(runWrite(primaryDB, collName));
    rst.awaitReplication();
}
doTestForInsert(testRetryWriteOnTwoNodes);
doTestForUpdate(testRetryWriteOnTwoNodes);
doTestForDelete(testRetryWriteOnTwoNodes);
doTestForBulkWrite(testRetryWriteOnTwoNodes);
}); // "Execute retried writes on different nodes"
});

View File

@ -1343,6 +1343,19 @@ mongo_cc_library(
],
)
# Unit-test target built from auto_get_rstl_for_stepup_stepdown_test.cpp; it depends on the
# :auto_get_rstl_for_stepup_stepdown library declared in this package.
mongo_cc_unit_test(
name = "auto_get_rstl_for_stepup_stepdown_test",
srcs = [
"auto_get_rstl_for_stepup_stepdown_test.cpp",
],
tags = ["mongo_unittest_third_group"],
deps = [
":auto_get_rstl_for_stepup_stepdown",
"//src/mongo/db:service_context_d_test_fixture",
"//src/mongo/db/session:logical_session_id_helpers",
],
)
mongo_cc_library(
name = "repl_coordinator_impl",
srcs = [

View File

@ -31,6 +31,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/shard_role/lock_manager/dump_lock_manager.h"
#include "mongo/logv2/log.h"
@ -126,26 +127,14 @@ void AutoGetRstlForStepUpStepDown::_killOpThreadFn(Date_t deadline) {
ClientOperationKillableByStepdown{false});
LOGV2(21343, "Starting to kill user operations");
auto uniqueOpCtx = cc().makeOperationContext();
OperationContext* opCtx = uniqueOpCtx.get();
// Set the reason for killing operations.
ErrorCodes::Error killReason = ErrorCodes::InterruptedDueToReplStateChange;
// This thread needs storage rollback to complete timely, so instruct the storage
// engine to not do any extra eviction for this thread, if supported.
shard_role_details::getRecoveryUnit(opCtx)->setNoEvictionAfterCommitOrRollback();
const OperationContext* rstlOpCtx = getOpCtx();
OpsAndSessionsKiller killer(rstlOpCtx->getServiceContext(),
ErrorCodes::InterruptedDueToReplStateChange,
std::vector<const OperationContext*>{rstlOpCtx},
deadline);
while (true) {
// Reset the value before killing operations as we only want to track the number
// of operations that's running after step down.
_totalOpsRunning = 0;
_killConflictingOpsOnStepUpAndStepDown(killReason);
// Destroy all stashed transaction resources, in order to release locks.
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
killSessionsAbortUnpreparedTransactions(opCtx, matcherAllSessions, killReason, deadline);
killer.killConflictingOpsAndSessionsOnStepUpAndStepDown();
// Operations (like batch insert) that have currently yielded the global lock during step
// down can reacquire global lock in IX mode when this node steps back up after a brief
@ -159,7 +148,7 @@ void AutoGetRstlForStepUpStepDown::_killOpThreadFn(Date_t deadline) {
lock, Milliseconds(10).toSystemDuration(), [this] { return _killSignaled; })) {
LOGV2(21344, "Stopped killing user operations");
_stepUpStepDownCoord->updateAndLogStateTransitionMetrics(
_stateTransition, getTotalOpsKilled(), getTotalOpsRunning());
_stateTransition, killer.getTotalOpsKilled(), killer.getTotalOpsRunning());
_killSignaled = false;
return;
}
@ -180,22 +169,6 @@ void AutoGetRstlForStepUpStepDown::_stopAndWaitForKillOpThread() {
_killOpThread.reset();
}
size_t AutoGetRstlForStepUpStepDown::getTotalOpsKilled() const {
return _totalOpsKilled;
}
void AutoGetRstlForStepUpStepDown::incrementTotalOpsKilled(size_t val) {
_totalOpsKilled += val;
}
size_t AutoGetRstlForStepUpStepDown::getTotalOpsRunning() const {
return _totalOpsRunning;
}
void AutoGetRstlForStepUpStepDown::incrementTotalOpsRunning(size_t val) {
_totalOpsRunning += val;
}
void AutoGetRstlForStepUpStepDown::rstlRelease() {
_rstlLock->release();
}
@ -214,54 +187,88 @@ void AutoGetRstlForStepUpStepDown::rstlReacquire() {
_rstlLock->reacquire();
}
void AutoGetRstlForStepUpStepDown::_killConflictingOpsOnStepUpAndStepDown(
ErrorCodes::Error reason) {
const OperationContext* rstlOpCtx = getOpCtx();
ServiceContext* serviceCtx = rstlOpCtx->getServiceContext();
invariant(serviceCtx);
for (ServiceContext::LockedClientsCursor cursor(serviceCtx); Client* client = cursor.next();) {
ClientLock lk(client);
OperationContext* toKill = client->getOperationContext();
// Don't kill step up/step down thread.
if (toKill && !toKill->isKillPending() && toKill->getOpID() != rstlOpCtx->getOpID()) {
auto& tracker = StorageExecutionContext::get(toKill)->getPrepareConflictTracker();
bool isWaitingOnPrepareConflict = tracker.isWaitingOnPrepareConflict();
if (client->canKillOperationInStepdown()) {
auto locker = shard_role_details::getLocker(toKill);
bool alwaysInterrupt = toKill->shouldAlwaysInterruptAtStepDownOrUp();
bool globalLockConfict = locker->wasGlobalLockTakenInModeConflictingWithWrites();
if (alwaysInterrupt || globalLockConfict || isWaitingOnPrepareConflict) {
serviceCtx->killOperation(lk, toKill, reason);
incrementTotalOpsKilled();
LOGV2(8562701,
"Repl state change interrupted a thread.",
"name"_attr = client->desc(),
"alwaysInterrupt"_attr = alwaysInterrupt,
"globalLockConflict"_attr = globalLockConfict,
"isWaitingOnPrepareConflict"_attr = isWaitingOnPrepareConflict);
} else {
incrementTotalOpsRunning();
}
} else if (isWaitingOnPrepareConflict) {
// All operations that hit a prepare conflict should be killable to prevent
// deadlocks with prepared transactions on replica set step up and step down.
LOGV2_FATAL(9699100,
"Repl state change encountered a non-killable thread blocked on a "
"prepare conflict.",
"name"_attr = client->desc(),
"conflictCount"_attr = tracker.getThisOpPrepareConflictCount(),
"conflictDurationMicros"_attr =
tracker.getThisOpPrepareConflictDuration());
}
}
}
}
const OperationContext* AutoGetRstlForStepUpStepDown::getOpCtx() const {
return _opCtx;
}
OpsAndSessionsKiller::OpsAndSessionsKiller(ServiceContext* serviceCtx,
                                           ErrorCodes::Error killReason,
                                           std::vector<const OperationContext*> opsToIgnore,
                                           Date_t deadline)
    : _serviceCtx(serviceCtx),
      _killReason(killReason),
      _opsToIgnore(std::move(opsToIgnore)),
      _deadline(deadline) {
    invariant(_serviceCtx);

    // Create the operation context owned by this killer on the current client and make sure the
    // killer never kills its own operation.
    _opCtx = cc().makeOperationContext();
    OperationContext* const ownOpCtx = _opCtx.get();
    _opsToIgnore.push_back(ownOpCtx);

    // This thread needs storage rollback to complete timely, so instruct the storage
    // engine to not do any extra eviction for this thread, if supported.
    shard_role_details::getRecoveryUnit(ownOpCtx)->setNoEvictionAfterCommitOrRollback();
}
void OpsAndSessionsKiller::killConflictingOpsAndSessionsOnStepUpAndStepDown() {
// Reset the value before killing operations as we only want to track the number of operations
// that's running after step down.
_totalOpsRunning = 0;
for (ServiceContext::LockedClientsCursor cursor(_serviceCtx); Client* client = cursor.next();) {
ClientLock lk(client);
OperationContext* toKill = client->getOperationContext();
if (!toKill || toKill->isKillPending()) {
continue;
}
// Don't kill ops to ignore.
if (std::any_of(_opsToIgnore.begin(),
_opsToIgnore.end(),
[&toKill](const OperationContext* opCtxToIgnore) {
return toKill->getOpID() == opCtxToIgnore->getOpID();
})) {
continue;
}
auto& tracker = StorageExecutionContext::get(toKill)->getPrepareConflictTracker();
const bool isWaitingOnPrepareConflict = tracker.isWaitingOnPrepareConflict();
if (client->canKillOperationInStepdown()) {
auto locker = shard_role_details::getLocker(toKill);
const bool alwaysInterrupt = toKill->shouldAlwaysInterruptAtStepDownOrUp();
const bool globalLockConfict = locker->wasGlobalLockTakenInModeConflictingWithWrites();
const bool isRetryableWrite = toKill->isRetryableWrite();
if (alwaysInterrupt || globalLockConfict || isWaitingOnPrepareConflict ||
isRetryableWrite) {
_serviceCtx->killOperation(lk, toKill, _killReason);
++_totalOpsKilled;
LOGV2(8562701,
"Repl state change interrupted a thread.",
"name"_attr = client->desc(),
"alwaysInterrupt"_attr = alwaysInterrupt,
"globalLockConflict"_attr = globalLockConfict,
"isWaitingOnPrepareConflict"_attr = isWaitingOnPrepareConflict,
"isRetryableWrite"_attr = isRetryableWrite);
} else {
++_totalOpsRunning;
}
} else if (isWaitingOnPrepareConflict) {
// All operations that hit a prepare conflict should be killable to prevent
// deadlocks with prepared transactions on replica set step up and step down.
LOGV2_FATAL(9699100,
"Repl state change encountered a non-killable thread blocked on a "
"prepare conflict.",
"name"_attr = client->desc(),
"conflictCount"_attr = tracker.getThisOpPrepareConflictCount(),
"conflictDuration"_attr = tracker.getThisOpPrepareConflictDuration());
}
}
// Destroy all stashed transaction resources, in order to release locks.
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx.get())});
killSessionsAbortUnpreparedTransactions(
_opCtx.get(), matcherAllSessions, _killReason, _deadline);
}
} // namespace repl
} // namespace mongo

View File

@ -32,16 +32,16 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/replication_state_transition_lock_guard.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/shard_role/shard_role.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
#include <vector>
namespace mongo {
namespace repl {
namespace MONGO_MOD_PRIVATE repl {
class StepUpStepDownCoordinator {
public:
/**
@ -82,37 +82,17 @@ public:
AutoGetRstlForStepUpStepDown(const AutoGetRstlForStepUpStepDown&) = delete;
AutoGetRstlForStepUpStepDown& operator=(const AutoGetRstlForStepUpStepDown&) = delete;
/*
/**
* Releases RSTL lock.
*/
void rstlRelease();
/*
/**
* Reacquires RSTL lock.
*/
void rstlReacquire();
/*
* Returns _totalOpsKilled value.
*/
size_t getTotalOpsKilled() const;
/*
* Increments _totalOpsKilled by val.
*/
void incrementTotalOpsKilled(size_t val = 1);
/*
* Returns _totalOpsRunning value.
*/
size_t getTotalOpsRunning() const;
/*
* Increments _totalOpsRunning by val.
*/
void incrementTotalOpsRunning(size_t val = 1);
/*
/**
* Returns the step up/step down opCtx.
*/
const OperationContext* getOpCtx() const;
@ -148,19 +128,13 @@ private:
* transactions.
* Terminates once killSignaled is set true.
*/
void _killOpThreadFn(Date_t deadline = Date_t::max());
void _killOpThreadFn(Date_t deadline);
/*
/**
* Signals killOpThread to stop killing operations.
*/
void _stopAndWaitForKillOpThread();
/**
* kill all conflicting operations that are blocked either on prepare conflict or have taken
* global lock not in MODE_IS. The conflicting operations can be either user or system
* operations marked as killable.
*/
void _killConflictingOpsOnStepUpAndStepDown(ErrorCodes::Error reason);
StepUpStepDownCoordinator* const _stepUpStepDownCoord; // not owned.
// step up/step down opCtx.
@ -170,10 +144,6 @@ private:
boost::optional<ReplicationStateTransitionLockGuard> _rstlLock;
// Thread that will run killOpThreadFn().
std::unique_ptr<stdx::thread> _killOpThread;
// Tracks number of operations killed on step up / step down.
size_t _totalOpsKilled = 0;
// Tracks number of operations left running on step up / step down.
size_t _totalOpsRunning = 0;
// Protects killSignaled and stopKillingOps cond. variable.
stdx::mutex _mutex;
// Signals thread about the change of killSignaled value.
@ -184,5 +154,59 @@ private:
// class.
ReplicationCoordinator::OpsKillingStateTransitionEnum _stateTransition;
};
} // namespace repl
/**
* Helper class to kill conflicting operations and sessions on step-up or step-down.
*
* The constructor creates an operation context on the current client and adds it to the list of
* operations that are never killed. Instances are not copyable.
*/
class MONGO_MOD_FILE_PRIVATE OpsAndSessionsKiller {
public:
/**
* @param serviceCtx service context whose clients are scanned; must not be null.
* @param killReason error code used when killing operations and sessions.
* @param opsToIgnore operations that must never be killed by this object.
* @param deadline deadline passed along when killing sessions.
*/
OpsAndSessionsKiller(ServiceContext* serviceCtx,
ErrorCodes::Error killReason,
std::vector<const OperationContext*> opsToIgnore,
Date_t deadline);
OpsAndSessionsKiller(const OpsAndSessionsKiller&) = delete;
OpsAndSessionsKiller& operator=(const OpsAndSessionsKiller&) = delete;
/**
* Kills conflicting operations and stashed transaction resources during a step-up or
* step-down, in order to release locks.
* Operations that are blocked on a prepare conflict, have taken the global lock not in
* MODE_IS, or are retryable writes are killed. The conflicting operations can be either
* user or system operations marked as killable.
*/
void killConflictingOpsAndSessionsOnStepUpAndStepDown();
/**
* Returns the total number of operations killed by this object, accumulated over all calls.
*/
size_t getTotalOpsKilled() const {
return _totalOpsKilled;
}
/**
* Returns the number of operations that were left running (not killed) by the last call to
* killConflictingOpsAndSessionsOnStepUpAndStepDown().
*/
size_t getTotalOpsRunning() const {
return _totalOpsRunning;
}
private:
// Service context whose clients are scanned for operations to kill. Not owned.
ServiceContext* _serviceCtx;
// The op context for this thread, created by the constructor.
ServiceContext::UniqueOperationContext _opCtx;
// Error code used when killing operations and sessions.
const ErrorCodes::Error _killReason;
// Operations that should not be killed (includes this object's own _opCtx).
std::vector<const OperationContext*> _opsToIgnore;
// Deadline to kill sessions.
Date_t _deadline;
// Tracks total number of operations killed; never reset.
size_t _totalOpsKilled = 0;
// Tracks number of operations left running; reset at the start of every kill pass.
size_t _totalOpsRunning = 0;
};
} // namespace MONGO_MOD_PRIVATE repl
} // namespace mongo

View File

@ -0,0 +1,352 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/auto_get_rstl_for_stepup_stepdown.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session/logical_session_id_helpers.h"
#include "mongo/db/shard_role/lock_manager/d_concurrency.h"
#include "mongo/db/shard_role/transaction_resources.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace repl {
/**
* Tests for repl::OpsAndSessionsKiller, the helper class declared alongside
* AutoGetRstlForStepUpStepDown in auto_get_rstl_for_stepup_stepdown.h.
*
* This test class uses ServiceContextMongoDTest to get a fully initialized service context with
* the storage infrastructure needed by the OpsAndSessionsKiller object.
*/
class OpsAndSessionsKillerTest : public ServiceContextMongoDTest {
public:
void setUp() override {
// No fixture-specific setup; only the base fixture's setUp() is run.
ServiceContextMongoDTest::setUp();
}
void tearDown() override {
    // Close every prepare conflict opened by the fixture before any opCtx is destroyed.
    for (auto* conflictedOpCtx : _prepareConflictOpCtxs) {
        auto& conflictTracker =
            StorageExecutionContext::get(conflictedOpCtx)->getPrepareConflictTracker();
        conflictTracker.endPrepareConflict(*getServiceContext()->getTickSource());
    }
    _prepareConflictOpCtxs.clear();

    // Destruction order matters: operation contexts are released first, then their clients.
    _opCtxs.clear();
    _clients.clear();

    ServiceContextMongoDTest::tearDown();
}
protected:
/**
* Creates a client and an associated operation context.
*/
auto makeClientAndOpCtx(std::string clientName, bool killable = true) {
    // Build a client on the fixture's service; 'killable' controls whether the client's
    // operations may be killed by stepdown.
    auto newClient = getServiceContext()->getService()->makeClient(
        std::move(clientName), nullptr, ClientOperationKillableByStepdown{killable});

    // The operation context is created from (and belongs to) the new client.
    auto newOpCtx = newClient->makeOperationContext();

    // Ownership of both objects is handed to the caller as a (client, opCtx) pair.
    return make_pair(std::move(newClient), std::move(newOpCtx));
}
/**
* Creates a client and an associated operation context, storing both internally
* to keep it alive for the lifetime of the test.
*/
OperationContext* makeOpCtx(std::string clientName, bool killable = true) {
    auto clientAndOpCtx = makeClientAndOpCtx(std::move(clientName), killable);

    // The fixture takes ownership of both objects; tearDown() releases them.
    _clients.push_back(std::move(clientAndOpCtx.first));
    _opCtxs.push_back(std::move(clientAndOpCtx.second));

    // Hand back a non-owning pointer to the operation context that was just stored.
    return _opCtxs.back().get();
}
/**
* Creates a client and operation context that is marked to always
* be interrupted during step up/step down.
*/
OperationContext* makeInterruptibleOpCtx(std::string clientName, bool killable = true) {
auto opCtx = makeOpCtx(std::move(clientName), killable);
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
return opCtx;
}
/**
* Creates a client and operation context that is marked as a retryable write.
* Retryable writes are killed during stepdown because they may have side effects
* that need to be rolled back.
*/
OperationContext* makeRetryableWriteOpCtx(std::string clientName, bool killable = true) {
auto opCtx = makeOpCtx(std::move(clientName), killable);
opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
opCtx->setTxnNumber(1);
ASSERT_TRUE(opCtx->isRetryableWrite());
return opCtx;
}
/**
* Creates a client and operation context that has taken a global lock in
* a mode that conflicts with writes. The lock is acquired and released immediately, but the
* wasGlobalLockTakenInModeConflictingWithWrites flag remains set.
*/
OperationContext* makeGlobalLockConflictOpCtx(std::string clientName, bool killable = true) {
auto opCtx = makeOpCtx(std::move(clientName), killable);
{
Lock::GlobalLock lock(opCtx, MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
ASSERT_TRUE(lock.isLocked());
}
ASSERT_TRUE(
shard_role_details::getLocker(opCtx)->wasGlobalLockTakenInModeConflictingWithWrites());
return opCtx;
}
/**
* Creates a client and operation context that is waiting on a prepare conflict.
* The prepare conflict is ended during tearDown.
*/
OperationContext* makePrepareConflictOpCtx(std::string clientName, bool killable = true) {
auto opCtx = makeOpCtx(std::move(clientName), killable);
auto& tracker = StorageExecutionContext::get(opCtx)->getPrepareConflictTracker();
tracker.beginPrepareConflict(*getServiceContext()->getTickSource());
ASSERT_TRUE(tracker.isWaitingOnPrepareConflict());
tracker.updatePrepareConflict(*getServiceContext()->getTickSource());
_prepareConflictOpCtxs.push_back(opCtx);
return opCtx;
}
/**
* Creates an OpsAndSessionsKiller object to be tested.
*/
std::unique_ptr<OpsAndSessionsKiller> makeKiller(
std::vector<const OperationContext*> opsToIgnore = {},
ErrorCodes::Error killReason = ErrorCodes::InterruptedDueToReplStateChange) {
return std::unique_ptr<OpsAndSessionsKiller>(new OpsAndSessionsKiller(
getServiceContext(), killReason, std::move(opsToIgnore), Date_t::max()));
}
private:
// Stores clients to keep them alive for the lifetime of the test.
std::vector<ServiceContext::UniqueClient> _clients;
// Stores opCtxs to keep them alive for the lifetime of the test.
std::vector<ServiceContext::UniqueOperationContext> _opCtxs;
// Tracks opCtxs with prepare conflicts for cleanup.
std::vector<OperationContext*> _prepareConflictOpCtxs;
};
// When the test creates no operations, a kill pass kills nothing and reports nothing running.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_NoOps) {
    auto opsKiller = makeKiller();

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 0);
}
// A plain operation context with no special state is left alone and counted as still running.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_DoesNotKillNormalOp) {
    auto opsKiller = makeKiller();
    auto* const plainOpCtx = makeOpCtx("targetClient");

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_FALSE(plainOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 1);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 0);
}
// An operation flagged to always be interrupted at step up/step down gets killed.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_KillsAlwaysInterruptOp) {
    auto opsKiller = makeKiller();
    auto* const alwaysInterruptOpCtx = makeInterruptibleOpCtx("targetClient");

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_TRUE(alwaysInterruptOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
}
// An operation whose locker recorded a global lock taken in a write-conflicting mode gets killed.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_KillsOpWithGlobalLockConflictingWithWrites) {
    auto opsKiller = makeKiller();
    auto* const globalLockOpCtx = makeGlobalLockConflictOpCtx("targetClient");

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_TRUE(globalLockOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
}
// An operation that is waiting on a prepare conflict gets killed.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_KillsOpWaitingOnPrepareConflict) {
    auto opsKiller = makeKiller();
    auto* const prepareConflictOpCtx = makePrepareConflictOpCtx("targetClient");

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_TRUE(prepareConflictOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
}
// An operation that identifies itself as a retryable write gets killed.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_KillsRetryableWrite) {
    auto opsKiller = makeKiller();
    auto* const retryableOpCtx = makeRetryableWriteOpCtx("retryableWriteClient");

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_TRUE(retryableOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
}
// An operation that would otherwise be killed is left untouched when it is on the killer's
// ignore list.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_IgnoresOpsInIgnoreList) {
    auto* const ignoredOpCtx = makeInterruptibleOpCtx("targetClient");
    auto opsKiller = makeKiller({ignoredOpCtx});

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_FALSE(ignoredOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 0);
    // Operations on the ignore list are not counted as running either.
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
}
// Mixes every kind of operation in one pass: five operations that must be killed with the
// requested reason, one ignored operation and one plain operation that must both survive.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_KillsMultipleOps) {
    auto* const alwaysInterruptOpA = makeInterruptibleOpCtx("targetClient1");
    auto* const alwaysInterruptOpB = makeInterruptibleOpCtx("targetClient2");
    auto* const retryableWriteOp = makeRetryableWriteOpCtx("targetClient3");
    auto* const prepareConflictOp = makePrepareConflictOpCtx("targetClient4");
    auto* const globalLockOp = makeGlobalLockConflictOpCtx("targetClient5");
    auto* const ignoredOp = makeGlobalLockConflictOpCtx("toIgnore");
    auto* const plainOp = makeOpCtx("notToKill");

    const auto killReason = ErrorCodes::InterruptedDueToReplStateChange;
    auto opsKiller = makeKiller({ignoredOp}, killReason);
    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    // Only the plain operation is still counted as running; the ignored one is not counted.
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 5);
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 1);

    const std::vector<OperationContext*> expectedKilled{
        alwaysInterruptOpA, alwaysInterruptOpB, retryableWriteOp, prepareConflictOp, globalLockOp};
    for (auto* killedOp : expectedKilled) {
        ASSERT_TRUE(killedOp->isKillPending());
    }
    for (auto* killedOp : expectedKilled) {
        ASSERT_EQ(killedOp->getKillStatus(), killReason);
    }

    ASSERT_FALSE(plainOp->isKillPending());
    ASSERT_FALSE(ignoredOp->isKillPending());
}
// An always-interrupt operation is spared when its client was created as not killable by
// stepdown.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_NonKillableOpNotKilled) {
    auto opsKiller = makeKiller();
    auto* const protectedOpCtx = makeInterruptibleOpCtx("nonKillableClient", false /* killable */);

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();

    ASSERT_FALSE(protectedOpCtx->isKillPending());
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 0);
    // Operations of non-killable clients are not counted as running either.
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
}
// Death tests reuse the regular fixture under a separate suite name.
using OpsAndSessionsKillerDeathTest = OpsAndSessionsKillerTest;

// A kill pass is expected to terminate the process (message matching "9699100") when an operation
// of a non-killable client is waiting on a prepare conflict.
DEATH_TEST_F(OpsAndSessionsKillerDeathTest,
             killConflictingOps_NonKillableOpWithPreparedConflict,
             "9699100") {
    auto opsKiller = makeKiller();
    makePrepareConflictOpCtx("nonKillableClient", false /* killable */);

    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();
}
// Repeated kill passes on the same killer: operations that are already kill-pending are not
// counted a second time, while operations created between passes are picked up.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_SkipsAlreadyKillPending) {
    auto opsKiller = makeKiller();

    // Runs one kill pass and checks the counters the killer reports afterwards.
    auto killAndExpectCounts = [&](int expectedKilled, int expectedRunning) {
        opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();
        ASSERT_EQ(opsKiller->getTotalOpsKilled(), expectedKilled);
        ASSERT_EQ(opsKiller->getTotalOpsRunning(), expectedRunning);
    };

    auto* const firstTarget = makeInterruptibleOpCtx("targetClient1");

    killAndExpectCounts(1, 0);
    ASSERT_TRUE(firstTarget->isKillPending());

    // A second pass must not bump the kill count: the operation is already pending kill.
    killAndExpectCounts(1, 0);
    ASSERT_TRUE(firstTarget->isKillPending());

    auto* const survivor = makeOpCtx("notToKill");
    auto* const secondTarget = makeInterruptibleOpCtx("targetClient2");

    // Operations created since the previous pass are taken into account.
    killAndExpectCounts(2, 1);
    ASSERT_TRUE(firstTarget->isKillPending());
    ASSERT_TRUE(secondTarget->isKillPending());
    ASSERT_FALSE(survivor->isKillPending());

    // One more pass without new operations leaves everything as it was.
    killAndExpectCounts(2, 1);
    ASSERT_TRUE(firstTarget->isKillPending());
    ASSERT_TRUE(secondTarget->isKillPending());
    ASSERT_FALSE(survivor->isKillPending());
}
// The running-operations count reflects the latest pass only: once the surviving operation has
// been destroyed, the next pass reports zero running operations while the kill total stays put.
TEST_F(OpsAndSessionsKillerTest, killConflictingOps_OpsRunningIsReset) {
    auto opsKiller = makeKiller();
    auto* const killedOpCtx = makeGlobalLockConflictOpCtx("toBeKilled");

    {
        // This client/opCtx pair is owned by the scope, not by the fixture, so that it is
        // destroyed before the final pass below.
        auto transient = makeClientAndOpCtx("notToKill");
        auto* const transientOpCtx = transient.second.get();

        opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();
        ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
        ASSERT_EQ(opsKiller->getTotalOpsRunning(), 1);
        ASSERT_TRUE(killedOpCtx->isKillPending());
        ASSERT_FALSE(transientOpCtx->isKillPending());

        // A second pass with the same set of operations changes nothing.
        opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();
        ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
        ASSERT_EQ(opsKiller->getTotalOpsRunning(), 1);
        ASSERT_TRUE(killedOpCtx->isKillPending());
        ASSERT_FALSE(transientOpCtx->isKillPending());
    }

    // With the transient operation gone, the running count drops back to 0.
    opsKiller->killConflictingOpsAndSessionsOnStepUpAndStepDown();
    ASSERT_EQ(opsKiller->getTotalOpsKilled(), 1);
    ASSERT_EQ(opsKiller->getTotalOpsRunning(), 0);
    ASSERT_TRUE(killedOpCtx->isKillPending());
}
} // namespace repl
} // namespace mongo