mongo/jstests/replsets/retryable_write_interrupted_on_stepdown.js
Pierre Turin 98f9b607bc SERVER-110728 Kill retryable writes ops on stepdown (#46583)
GitOrigin-RevId: d08b0b2ca80865894573c63c908c75626b75bf82
2026-03-03 18:22:21 +00:00

470 lines
20 KiB
JavaScript

/**
* Tests that retried writes are correctly interrupted on stepdown on the first and second try (see: SERVER-110728).
*/
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
describe("Tests that retried writes are correctly interrupted on stepdown on the first and second try", function () {
// Number of nodes the replica set is started with.
const kNodes = 3;
// Database used for the test collection and for the 'coordinationColl' collection.
const dbName = "test_db";
// Name of the collection the retryable writes target (taken from jsTest.name()).
const collName = jsTest.name();
// Per-test fixtures; all three are (re)assigned by the beforeEach hook.
let rst = null;
let primaryConn = null;
let primaryDB = null;
// Per-test setup: start a fresh replica set, install the default read/write concerns the tests
// rely on, and seed the test collection with two documents that are replicated to every node.
beforeEach(() => {
    // Reset the captured program output; the tests grep it with rawMongoProgramOutput().
    clearRawMongoProgramOutput();

    rst = new ReplSetTest({
        nodes: kNodes,
        settings: {chainingAllowed: false},
    });
    rst.startSet({});
    rst.initiate();

    primaryConn = rst.getPrimary();
    primaryDB = primaryConn.getDB(dbName);

    // The retryable writes in the tests do not specify a write concern themselves, so they only
    // block (and can only be interrupted) if these defaults are really in place: fail fast when
    // the command is rejected instead of silently running the tests with other defaults.
    assert.commandWorked(
        primaryDB.adminCommand({
            "setDefaultRWConcern": 1,
            "defaultWriteConcern": {
                "w": "majority",
                // High timeout to give time for the other thread to execute the stepdown command.
                "wtimeout": 10000,
            },
            "defaultReadConcern": {
                "level": "local",
            },
        }),
    );

    // Collection used by signalEvent()/waitForEvent() to coordinate the threads.
    assert.commandWorked(primaryDB.createCollection("coordinationColl", {writeConcern: {w: 1}}));

    primaryDB.getMongo().setSecondaryOk();
    primaryDB.getMongo().setReadPref("primaryPreferred");

    assert.commandWorked(primaryDB.createCollection(collName));
    assert.commandWorked(
        primaryDB[collName].insertMany([
            {_id: 1, x: 1},
            {_id: 2, x: 2},
        ]),
    );
    assert.eq(primaryDB[collName].countDocuments({}), 2);

    // Every test starts from the same state on all nodes.
    rst.awaitReplication();
    rst.getSecondaries().forEach((secondary) => assert.eq(secondary.getDB(dbName)[collName].countDocuments({}), 2));
});
// Per-test teardown: stop the replica set that beforeEach started.
afterEach(function () {
    rst.stopSet();
});
// Functions used to signal that an event has happened.
// Used to coordinate writes and stepdown attempts between threads.
/**
 * Signals that the event `eventName` has happened by inserting a document whose _id is the event
 * name into 'coordinationColl' (with write concern w:1).
 *
 * NOTE(review): this function is handed to a Thread as an argument by the tests, so it presumably
 * has to stay self-contained (parameters and shell globals only) - confirm before changing that.
 *
 * @param {DB} primaryDB - Database that owns 'coordinationColl'
 * @param {string} eventName - Name of the event to signal
 */
function signalEvent(primaryDB, eventName) {
    const eventDoc = {_id: eventName};
    const insertOptions = {writeConcern: {w: 1}};
    const insertResult = primaryDB.coordinationColl.insertOne(eventDoc, insertOptions);
    assert.commandWorked(insertResult);
}
/**
 * Blocks (through assert.soon) until a document whose _id is `eventName` can be found in
 * 'coordinationColl', i.e. until the event has been signaled.
 *
 * NOTE(review): this function is handed to a Thread as an argument by the tests, so it presumably
 * has to stay self-contained (parameters and shell globals only) - confirm before changing that.
 *
 * @param {DB} primaryDB - Database that owns 'coordinationColl'
 * @param {string} eventName - Name of the event to wait for
 */
function waitForEvent(primaryDB, eventName) {
    const notSignaledMsg = `Did not find '${eventName}' document in 'coordinationColl', the event was never signaled`;
    const findEventDoc = function () {
        return primaryDB.coordinationColl.findOne({_id: eventName});
    };
    assert.soon(findEventDoc, notSignaledMsg);
}
// Functions used to run the test with different write commands:
/**
 * Registers the INSERT test case: the scenario driver runs a retryable insert of two documents,
 * after which every node is expected to hold 4 documents (the 2 seeded ones plus the 2 inserted).
 *
 * @param {function} testRetryWriteFn - Scenario driver, called with the function that runs the write
 */
function doTestForInsert(testRetryWriteFn) {
    it("retryable INSERT write gets interrupted", function () {
        jsTest.log.info("Test retryable write for INSERT");

        // NOTE(review): the scenario drivers pass this function to a Thread, so it presumably must
        // only use its parameters and shell globals - confirm before referencing outer variables.
        // The fixed lsid/txnNumber pair is what makes every execution a retry of the same write.
        function runInsert(db, targetColl) {
            return db.runCommand({
                insert: targetColl,
                documents: [{_id: 10}, {_id: 20}],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runInsert);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 4);
        }
    });
}
/**
 * Registers the UPDATE test case: the scenario driver runs a retryable update made of an in-place
 * update, a replacement and an upsert, after which every node is expected to hold 3 documents
 * (the 2 seeded ones plus the upserted one).
 *
 * @param {function} testRetryWriteFn - Scenario driver, called with the function that runs the write
 */
function doTestForUpdate(testRetryWriteFn) {
    it("retryable UPDATE write gets interrupted", function () {
        jsTest.log.info("Test retryable write for UPDATE");

        // NOTE(review): the scenario drivers pass this function to a Thread, so it presumably must
        // only use its parameters and shell globals - confirm before referencing outer variables.
        // The fixed lsid/txnNumber pair is what makes every execution a retry of the same write.
        function runUpdate(db, targetColl) {
            const inPlaceUpdate = {q: {_id: 1}, u: {$inc: {x: 1}}};
            const replacementUpdate = {q: {_id: 2}, u: {z: 1}};
            const upsertUpdate = {q: {_id: 3}, u: {$inc: {y: 1}}, upsert: true};
            return db.runCommand({
                update: targetColl,
                updates: [inPlaceUpdate, replacementUpdate, upsertUpdate],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runUpdate);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 3);
        }
    });
}
/**
 * Registers the DELETE test case: the scenario driver runs a retryable delete that removes both
 * seeded documents, after which every node is expected to hold 0 documents.
 *
 * @param {function} testRetryWriteFn - Scenario driver, called with the function that runs the write
 */
function doTestForDelete(testRetryWriteFn) {
    it("retryable DELETE write gets interrupted", function () {
        jsTest.log.info("Test retryable write for DELETE");

        // NOTE(review): the scenario drivers pass this function to a Thread, so it presumably must
        // only use its parameters and shell globals - confirm before referencing outer variables.
        // The fixed lsid/txnNumber pair is what makes every execution a retry of the same write.
        function runDelete(db, targetColl) {
            const deleteByField = {q: {x: 1}, limit: 1};
            const deleteById = {q: {_id: 2}, limit: 1};
            return db.runCommand({
                delete: targetColl,
                deletes: [deleteByField, deleteById],
                ordered: false,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runDelete);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 0);
        }
    });
}
/**
 * Registers the BULK write test case: the scenario driver runs a retryable bulkWrite made of two
 * inserts, one update and one delete, after which every node is expected to hold 3 documents
 * (2 seeded + 2 inserted - 1 deleted).
 *
 * @param {function} testRetryWriteFn - Scenario driver, called with the function that runs the write
 */
function doTestForBulkWrite(testRetryWriteFn) {
    it("retryable BULK write gets interrupted", function () {
        jsTest.log.info("Test retryable write for BULK write");

        // NOTE(review): the scenario drivers pass this function to a Thread, so it presumably must
        // only use its parameters and shell globals - confirm before referencing outer variables.
        // The fixed lsid/txnNumber pair is what makes every execution a retry of the same write.
        function runBulkWrite(db, targetColl) {
            // Every op refers to the namespace at index 0 of nsInfo.
            const operations = [
                {insert: 0, document: {_id: 10}},
                {insert: 0, document: {_id: 20}},
                {update: 0, filter: {_id: 1}, updateMods: {$inc: {x: 1}}},
                {delete: 0, filter: {_id: 2}},
            ];
            const namespaces = [{ns: db[targetColl].getFullName()}];
            // bulkWrite is sent through adminCommand(), unlike the other write commands.
            return db.adminCommand({
                bulkWrite: 1,
                ops: operations,
                nsInfo: namespaces,
                lsid: {id: UUID("867dee52-c331-484e-92d1-c56479b8e67e")},
                txnNumber: NumberLong(42),
            });
        }

        assert.eq(primaryDB[collName].countDocuments({}), 2);
        testRetryWriteFn(runBulkWrite);
        for (const node of rst.nodes) {
            assert.eq(node.getDB(dbName)[collName].countDocuments({}), 3);
        }
    });
}
//
// Retryable write on the same node.
//
describe("Execute retried writes twice on the same node", function () {
/**
 * Run a retryable write on the primary while doing a stepdown to interrupt the write.
 *
 * We first block both secondaries from applying oplogs (fsyncLock), this will prevent majority
 * writes from completing.
 * We have two threads:
 * 1. The "doRetryableWrite" thread will do the retryable write twice. The write will wait for
 *    write majority for 10sec. Before the write times out the other thread will attempt to
 *    step-down, which will make the write immediately fail with the
 *    InterruptedDueToReplStateChange error.
 * 2. The main thread will wait for the "doRetryableWrite" thread to execute the write command,
 *    then it will attempt to step-down. The step-down will make the write fail and then it will
 *    time out after 1sec (secondaryCatchUpPeriodSecs) since no secondaries can catch up with the
 *    primary, so the node remains primary. After each attempt the main thread waits for mongod to
 *    log (id 8562701) that it interrupted a retryable write.
 *
 * After the "doRetryableWrite" thread has attempted to execute the write twice, we join the
 * thread. Then we unlock both secondaries and execute the write a third time, this attempt will
 * succeed. The secondaries are unlocked even when an assertion fails, so that a failing test does
 * not leave fsync-locked nodes behind.
 *
 * @param {function (primaryDB, collName) -> WriteResult} runWrite - Run a retryable write
 */
function testRetryWriteOnPrimary(runWrite) {
    // Waits until mongod logs that a repl state change interrupted a retryable write, with the
    // given value for "globalLockConflict".
    function awaitInterruptionLog(globalLockConflict) {
        assert.soon(
            () =>
                rawMongoProgramOutput(
                    "8562701.*Repl state change interrupted a thread.*" +
                        `"name":"conn[0-9]+".*"globalLockConflict":${globalLockConflict}.*"isRetryableWrite":true`,
                ),
            "mongod did not log that it interrupted the retryable write due to the stepdown",
        );
    }

    // Attempts a step-down that is expected to time out because no secondary can catch up.
    function attemptStepDown() {
        return assert.commandFailedWithCode(
            primaryConn.adminCommand({replSetStepDown: 5, secondaryCatchUpPeriodSecs: 1, force: false}),
            ErrorCodes.ExceededTimeLimit,
        );
    }

    // Body of the writer thread. Everything it needs is received as a parameter.
    function doRetryableWrite(primaryPort, dbName, collName, runWrite, signalEvent, waitForEvent) {
        const newConn = new Mongo("localhost:" + primaryPort);
        assert(newConn);
        newConn.setSecondaryOk();
        newConn.setReadPref("primaryPreferred");
        const primaryDB = newConn.getDB(dbName);
        signalEvent(primaryDB, "readyForFirstStepDown");
        // First try:
        let writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("1st write result: " + tojson(writeRes));
        waitForEvent(primaryDB, "firstStepDownDone");
        // Second try:
        // Before SERVER-110728, the second try would fail with "WriteConcernTimeout" error
        // instead of "InterruptedDueToReplStateChange".
        writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("2nd write result: " + tojson(writeRes));
    }

    // Block secondaries from applying oplogs. Only the nodes that were actually locked are
    // remembered, so that the cleanup below never unlocks a node that is not locked.
    const lockedSecondaries = [];
    try {
        for (const secondary of rst.getSecondaries()) {
            assert.commandWorked(secondary.getDB(dbName).fsyncLock());
            lockedSecondaries.push(secondary);
        }

        const writerThread = new Thread(
            doRetryableWrite,
            primaryConn.port,
            dbName,
            collName,
            runWrite,
            signalEvent,
            waitForEvent,
        );
        writerThread.start();

        waitForEvent(primaryDB, "readyForFirstStepDown");
        sleep(1000); // Give time to the other thread to execute the write attempt
        let result = attemptStepDown();
        jsTest.log.info("1st stepdown, result: " + tojson(result));
        awaitInterruptionLog(true);

        signalEvent(primaryDB, "firstStepDownDone");
        sleep(1000); // Give time to the other thread to execute the write attempt
        result = attemptStepDown();
        jsTest.log.info("2nd stepdown, result:" + tojson(result));
        awaitInterruptionLog(false);

        assert.doesNotThrow(() => writerThread.join());
    } finally {
        // Unlock secondaries
        lockedSecondaries.forEach((secondary) => assert.commandWorked(secondary.getDB(dbName).fsyncUnlock()));
    }

    // Now the write should succeed
    assert.commandWorked(runWrite(primaryDB, collName));
    rst.awaitReplication();
}
// Register one test case per write command; each of them is driven by testRetryWriteOnPrimary.
doTestForInsert(testRetryWriteOnPrimary);
doTestForUpdate(testRetryWriteOnPrimary);
doTestForDelete(testRetryWriteOnPrimary);
doTestForBulkWrite(testRetryWriteOnPrimary);
}); // "Execute retried writes twice on the same node"
//
// Retryable write on different nodes.
//
describe("Execute retried writes on different nodes", function () {
// Secondaries of the current replica set, (re)assigned before every test of this group:
// 'lockedSecondary' is the one the scenario fsync-locks, 'liveSecondary' is the one that stays
// available and is expected to become primary after the first step-down.
let lockedSecondary = null;
let liveSecondary = null;

// Runs before every test of this group: picks the two secondaries and replaces the default write
// concern with w:3.
beforeEach(() => {
    const secondaries = rst.getSecondaries();
    assert.eq(secondaries.length, 2);
    lockedSecondary = secondaries[0];
    liveSecondary = secondaries[1];

    // The scenario relies on writes being blocked while one secondary is locked, which is only
    // the case if this default is really applied: fail fast when the command is rejected.
    assert.commandWorked(
        primaryDB.adminCommand({
            "setDefaultRWConcern": 1,
            "defaultWriteConcern": {
                // Writes should be blocked even with one live secondary
                "w": 3,
                // High timeout to give time for the other thread to execute the stepdown command.
                "wtimeout": 10000,
            },
        }),
    );
});
/**
 * Run a retryable write twice, each time on a different primary, while doing a stepdown to
 * interrupt the write.
 *
 * We first block one of the secondaries from applying oplogs (fsyncLock), this will prevent w:3
 * writes from completing, and will allow the step-down to succeed.
 * We have two threads:
 * 1. The "doRetryableWrite" thread will do the retryable write (with write concern w:3) twice.
 *    The write will wait to be replicated on all 3 nodes of the replset for 10sec. Before the
 *    write times out the other thread will do a step-down which will make the write immediately
 *    fail with the InterruptedDueToReplStateChange error. After the first write has failed and
 *    the step-down is completed, this thread will re-try the write on the new primary. Again, the
 *    other thread will do another step-down before the write can time out, which will make it
 *    fail.
 * 2. The main thread will wait for the "doRetryableWrite" thread to execute the write command,
 *    then it will send a step-down command to the current primary. The step-down will succeed and
 *    make the write fail. The first step-down is executed on "primary" and the second is executed
 *    on "liveSecondary", since this is the secondary node which will step-up on the first
 *    step-down. After each step-down the main thread waits for mongod to log (id 8562701) that it
 *    interrupted a retryable write.
 *
 * After the "doRetryableWrite" thread has attempted to execute the write twice, we join the
 * thread. Then we unlock the secondary and execute the write a third time, this attempt will
 * succeed. The secondary is unlocked even when an assertion fails, so that a failing test does not
 * leave an fsync-locked node behind.
 *
 * @param {function (primaryDB, collName) -> WriteResult} runWrite - Run a retryable write
 */
function testRetryWriteOnTwoNodes(runWrite) {
    // Waits until mongod logs that a repl state change interrupted a retryable write, with the
    // given value for "globalLockConflict".
    function awaitInterruptionLog(globalLockConflict) {
        assert.soon(
            () =>
                rawMongoProgramOutput(
                    "8562701.*Repl state change interrupted a thread.*" +
                        `"name":"conn[0-9]+".*"globalLockConflict":${globalLockConflict}.*"isRetryableWrite":true`,
                ),
            "mongod did not log that it interrupted the retryable write due to the stepdown",
        );
    }

    // Steps down `node`; the step-down is expected to succeed.
    function stepDown(node) {
        return assert.commandWorked(
            node.adminCommand({replSetStepDown: 10, secondaryCatchUpPeriodSecs: 5, force: false}),
        );
    }

    // Body of the writer thread. Everything it needs is received as a parameter.
    function doRetryableWrite(
        primaryPort,
        liveSecondaryPort,
        dbName,
        collName,
        runWrite,
        signalEvent,
        waitForEvent,
    ) {
        const primaryConn = new Mongo("localhost:" + primaryPort);
        assert(primaryConn);
        primaryConn.setSecondaryOk();
        primaryConn.setReadPref("primaryPreferred");
        const primaryDB = primaryConn.getDB(dbName);
        signalEvent(primaryDB, "readyForFirstStepDown");
        // First try:
        let writeRes = assert.commandFailedWithCode(
            runWrite(primaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("1st write result: " + tojson(writeRes));
        const newPrimaryConn = new Mongo("localhost:" + liveSecondaryPort);
        assert(newPrimaryConn);
        newPrimaryConn.setSecondaryOk();
        newPrimaryConn.setReadPref("primaryPreferred");
        assert.soon(
            () => newPrimaryConn.adminCommand("hello").isWritablePrimary,
            "Live secondary node never became a primary after the first step-down",
        );
        const newPrimaryDB = newPrimaryConn.getDB(dbName);
        waitForEvent(newPrimaryDB, "readyForSecondStepDown");
        // Second try:
        // Before SERVER-110728, the second try would fail with "PrimarySteppedDown" error
        // instead of "InterruptedDueToReplStateChange".
        writeRes = assert.commandFailedWithCode(
            runWrite(newPrimaryDB, collName),
            ErrorCodes.InterruptedDueToReplStateChange,
        );
        jsTest.log.info("2nd write result: " + tojson(writeRes));
    }

    // Block one secondary from applying oplogs
    assert.commandWorked(lockedSecondary.getDB(dbName).fsyncLock());
    try {
        const writerThread = new Thread(
            doRetryableWrite,
            primaryConn.port,
            liveSecondary.port,
            dbName,
            collName,
            runWrite,
            signalEvent,
            waitForEvent,
        );
        writerThread.start();

        waitForEvent(primaryDB, "readyForFirstStepDown");
        sleep(1000); // Give time to the other thread to execute the write attempt
        let result = stepDown(primaryConn);
        jsTest.log.info("1st stepdown, result: " + tojson(result));
        awaitInterruptionLog(true);
        assert.soon(
            () => liveSecondary.adminCommand("hello").isWritablePrimary,
            "Live secondary node never became a primary after the first step-down",
        );
        assert.soon(
            () => !primaryConn.adminCommand("hello").isWritablePrimary,
            "The old primary node never stepped-down",
        );
        const newPrimary = liveSecondary;

        // Wait for at least replSetStepDown time (10sec) so that the old primary can become primary again.
        sleep(10000);
        signalEvent(newPrimary.getDB(dbName), "readyForSecondStepDown");
        sleep(1000); // Give time to the other thread to execute the write attempt
        result = stepDown(newPrimary);
        jsTest.log.info("2nd stepdown, result:" + tojson(result));
        awaitInterruptionLog(false);

        assert.doesNotThrow(() => writerThread.join());
        assert.soon(
            () => primaryConn.adminCommand("hello").isWritablePrimary,
            "Old primary node never became a primary after the second step-down",
        );
        assert.soon(
            () => !liveSecondary.adminCommand("hello").isWritablePrimary,
            "Live secondary node never stepped down after the second step-down",
        );
    } finally {
        // Unlock secondary
        assert.commandWorked(lockedSecondary.getDB(dbName).fsyncUnlock());
    }

    // Now the write should succeed
    assert.commandWorked(runWrite(primaryDB, collName));
    rst.awaitReplication();
}
// Register one test case per write command; each of them is driven by testRetryWriteOnTwoNodes.
doTestForInsert(testRetryWriteOnTwoNodes);
doTestForUpdate(testRetryWriteOnTwoNodes);
doTestForDelete(testRetryWriteOnTwoNodes);
doTestForBulkWrite(testRetryWriteOnTwoNodes);
}); // "Execute retried writes on different nodes"
});