SERVER-116168 Ensure Express executor doesn't hold locks while sleeping (#52990)

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
GitOrigin-RevId: 382a517562ab015d423537e265d65898e5f84303
This commit is contained in:
Kyle Burgess 2026-05-20 20:16:54 +02:00 committed by MongoDB Bot
parent 1eb7a7c36e
commit 4c0895b375
5 changed files with 499 additions and 26 deletions

View File

@ -1,11 +1,18 @@
/**
* TODO: Rename test to remove the word "update"
* This test checks that an update operation does not hold write locks while sleeping for backoff after a
* write conflict.
* This test checks that write operations do not hold write locks while sleeping for backoff after a
* write conflict. Specifically, it tests:
* 1. A batched delete (classic executor) does not hold write tickets while sleeping.
* 2. An express update does not hold write tickets while sleeping.
* 3. An express delete does not hold write tickets while sleeping.
* 4. An express update DOES hold the write ticket when the MaxReleaseTicketCycles fallback
* threshold is crossed.
* 5. An express update does not hold write tickets while sleeping for TemporarilyUnavailable
* backoff (WaitingForBackoff path).
* @tags: [requires_fcv_83]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {isExpress} from "jstests/libs/query/analyze_plan.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {Thread} from "jstests/libs/parallelTester.js";
@ -38,6 +45,16 @@ assert.commandWorked(
),
);
// Precondition for Test 1: explain must NOT report the express executor for the multi-document
// delete, so that test exercises the classic executor.
{
    const batchedDeleteExplain = assert.commandWorked(
        db.runCommand({
            explain: {delete: coll.getName(), deletes: [{q: {a: "foo"}, limit: 0}]},
            verbosity: "queryPlanner",
        }),
    );
    const plannedAsExpress = isExpress(db, batchedDeleteExplain);
    assert(!plannedAsExpress, "Batched delete should not use express executor: " + tojson(batchedDeleteExplain));
}
// Enable the failpoint to hang before logging and backoff.
const hangFp = configureFailPoint(primary, "planExecutorHangBeforeLogAndBackoff");
@ -56,8 +73,6 @@ const deleteThread = new Thread(function (host) {
deletes: [{q: {a: "foo"}, limit: 0}],
});
// TODO: SERVER-116168 We should also do this test with an express update.
return result;
}, primary.host);
@ -82,7 +97,7 @@ assert.commandWorked(
db.runCommand({
insert: coll.getName(),
documents: [{_id: numDocs + 1, a: "bar", b: 100}],
maxTimeMS: 10000,
maxTimeMS: 30000,
}),
);
@ -98,5 +113,388 @@ deleteThread.join();
const result = deleteThread.returnData();
jsTest.log("Delete thread completed with result: " + tojson(result));
// =====================================================================================
// Test 2: An express update gives up its write ticket before sleeping for write-conflict
// backoff.
// =====================================================================================

// Start from a large ticket pool so the setup writes below are not throttled.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 100}),
);

// Seed a dedicated target document, since the batched delete above may have removed all of the
// original documents.
const expressTestDocId = numDocs + 10;
assert.commandWorked(coll.insertOne({_id: expressTestDocId, a: "foo", b: 10}));

// Precondition: explain must report the express executor for an update keyed on _id.
{
    const updateExplain = assert.commandWorked(
        db.runCommand({
            explain: {
                update: coll.getName(),
                updates: [{q: {_id: expressTestDocId}, u: {$set: {a: "bar"}}}],
            },
            verbosity: "queryPlanner",
        }),
    );
    assert(isExpress(db, updateExplain), "Update by _id should use express executor: " + tojson(updateExplain));
}

// Arm both failpoints before the writer starts: one pauses the express executor just before it
// logs and backs off, the other makes express writes throw write conflicts.
const expressHangFp = configureFailPoint(primary, "expressExecutorHangBeforeLogAndBackoff");
const expressWriteConflictFp = configureFailPoint(primary, "throwWriteConflictExceptionInExpressWrite");

// Writer thread: a single-document update by _id, which is the shape the express executor handles.
const expressUpdateThread = new Thread(
    function (host, docId) {
        const testDb = new Mongo(host).getDB("test");
        const targetColl = testDb.concurrent_update_test;
        return testDb.runCommand({
            update: targetColl.getName(),
            updates: [{q: {_id: docId}, u: {$set: {a: "bar"}}}],
        });
    },
    primary.host,
    expressTestDocId,
);
expressUpdateThread.start();

// Block until the writer is parked on the pre-backoff failpoint.
expressHangFp.wait();
jsTestLog("Express update thread has hit the expressExecutorHangBeforeLogAndBackoff failpoint");

// Shrink the pool to a single write ticket. If the parked writer still owned a ticket, no other
// write could be admitted.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 1}),
);

// The probe write succeeding proves the parked express writer is not holding a write ticket.
{
    const probeInsert = {
        insert: coll.getName(),
        documents: [{_id: numDocs + 11, a: "bar", b: 200}],
        maxTimeMS: 30000,
    };
    assert.commandWorked(db.runCommand(probeInsert));
}
jsTestLog("Successfully inserted document, confirming express thread is not holding write tickets");

// Release the writer: lift the hang first, then stop injecting write conflicts.
expressHangFp.off();
expressWriteConflictFp.off();
jsTestLog("Failpoints disabled, allowing blocked express operation to proceed");

// Let the writer finish and report what it returned.
expressUpdateThread.join();
jsTest.log("Express update thread completed with result: " + tojson(expressUpdateThread.returnData()));
// =====================================================================================
// Test 3: An express delete gives up its write ticket before sleeping for write-conflict
// backoff.
// =====================================================================================

// Widen the ticket pool again so the setup writes below are not throttled.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 100}),
);

// Seed a dedicated target document for the delete.
const expressDeleteDocId = numDocs + 20;
assert.commandWorked(coll.insertOne({_id: expressDeleteDocId, a: "foo", b: 30}));

// Precondition: explain must report the express executor for a delete keyed on _id.
{
    const deleteExplain = assert.commandWorked(
        db.runCommand({
            explain: {
                delete: coll.getName(),
                deletes: [{q: {_id: expressDeleteDocId}, limit: 1}],
            },
            verbosity: "queryPlanner",
        }),
    );
    assert(isExpress(db, deleteExplain), "Delete by _id should use express executor: " + tojson(deleteExplain));
}

// Arm both failpoints before the writer starts: one pauses the express executor just before it
// logs and backs off, the other makes express writes throw write conflicts.
const expressDeleteHangFp = configureFailPoint(primary, "expressExecutorHangBeforeLogAndBackoff");
const expressDeleteWriteConflictFp = configureFailPoint(primary, "throwWriteConflictExceptionInExpressWrite");

// Writer thread: a single-document delete by _id, which is the shape the express executor handles.
const expressDeleteThread = new Thread(
    function (host, docId) {
        const testDb = new Mongo(host).getDB("test");
        const targetColl = testDb.concurrent_update_test;
        return testDb.runCommand({
            delete: targetColl.getName(),
            deletes: [{q: {_id: docId}, limit: 1}],
        });
    },
    primary.host,
    expressDeleteDocId,
);
expressDeleteThread.start();

// Block until the writer is parked on the pre-backoff failpoint.
expressDeleteHangFp.wait();
jsTestLog("Express delete thread has hit the expressExecutorHangBeforeLogAndBackoff failpoint");

// Shrink the pool to a single write ticket. If the parked writer still owned a ticket, the probe
// write below could not be admitted.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 1}),
);

// The probe write succeeding proves the parked express delete is not holding a write ticket.
{
    const probeInsert = {
        insert: coll.getName(),
        documents: [{_id: numDocs + 21, a: "bar", b: 300}],
        maxTimeMS: 30000,
    };
    assert.commandWorked(db.runCommand(probeInsert));
}
jsTestLog("Successfully inserted document, confirming express delete thread is not holding write tickets");

// Release the writer: lift the hang first, then stop injecting write conflicts.
expressDeleteHangFp.off();
expressDeleteWriteConflictFp.off();
jsTestLog("Failpoints disabled, allowing blocked express delete operation to proceed");

// Let the writer finish and report what it returned.
expressDeleteThread.join();
jsTest.log("Express delete thread completed with result: " + tojson(expressDeleteThread.returnData()));
// =====================================================================================
// Test 4: Express executor HOLDS the write ticket during backoff once the
// MaxReleaseTicketCycles fallback threshold is reached.
//
// The executor takes the hold-ticket path when numAttempts >= maxReleaseTicketCycles, where
// numAttempts is the number of consecutive write conflicts seen before the current one. With
// maxReleaseTicketCycles=0 that condition is already true for the first WCE (numAttempts=0), so
// every backoff in this section sleeps while holding the ticket. We use skip=1 on the hang
// failpoint so the first backoff runs to completion and the thread is caught at the second one
// (numAttempts=1), i.e. on a repeated conflict, then verify that a concurrent write cannot
// acquire the ticket while the thread is paused there.
// =====================================================================================

// Reset the write ticket limit before starting the fallback test.
assert.commandWorked(
    primary.adminCommand({
        setParameter: 1,
        executionControlConcurrentWriteTransactions: 100,
    }),
);

// Lower the fallback threshold to 0 so the hold-ticket path is taken for every WCE backoff.
assert.commandWorked(
    primary.adminCommand({
        setParameter: 1,
        internalQueryWriteConflictBackoffMaxReleaseTicketCycles: 0,
    }),
);

// Insert a fresh document for the fallback test.
const fallbackTestDocId = numDocs + 30;
assert.commandWorked(coll.insertOne({_id: fallbackTestDocId, a: "foo", b: 40}));

// Verify the update-by-_id uses the express executor.
{
    const explainCmd = {
        explain: {
            update: coll.getName(),
            updates: [{q: {_id: fallbackTestDocId}, u: {$set: {a: "bar"}}}],
        },
        verbosity: "queryPlanner",
    };
    const explain = assert.commandWorked(db.runCommand(explainCmd));
    assert(isExpress(db, explain), "Update by _id should use express executor: " + tojson(explain));
}

// Configure the hang failpoint with skip=1: let the first backoff (numAttempts=0) proceed and
// only pause at the second one (numAttempts=1). Both are hold-ticket backoffs with threshold 0.
const fallbackHangFp = configureFailPoint(primary, "expressExecutorHangBeforeLogAndBackoff", {}, {"skip": 1});

// Enable the WCE failpoint so every express write attempt conflicts.
const fallbackWCEFp = configureFailPoint(primary, "throwWriteConflictExceptionInExpressWrite");

// Start an express update thread.
const fallbackThread = new Thread(
    function (host, docId) {
        const conn = new Mongo(host);
        const db = conn.getDB("test");
        const coll = db.concurrent_update_test;
        const result = db.runCommand({
            update: coll.getName(),
            updates: [{q: {_id: docId}, u: {$set: {a: "baz"}}}],
        });
        return result;
    },
    primary.host,
    fallbackTestDocId,
);
fallbackThread.start();

// Wait for the second hang activation — the thread is now in the hold-ticket fallback path.
fallbackHangFp.wait();
jsTestLog("Express fallback thread is paused in the hold-ticket backoff path");

// Reduce available tickets to 1. The express thread is holding the only ticket, so any
// concurrent writer must wait.
assert.commandWorked(
    primary.adminCommand({
        setParameter: 1,
        executionControlConcurrentWriteTransactions: 1,
    }),
);

// A concurrent write should fail to acquire the ticket within maxTimeMS because the express
// thread is holding it while paused (the hold-ticket fallback path).
assert.commandFailedWithCode(
    db.runCommand({
        insert: coll.getName(),
        documents: [{_id: numDocs + 31, a: "bar", b: 400}],
        maxTimeMS: 3000,
    }),
    [ErrorCodes.MaxTimeMSExpired, ErrorCodes.LockTimeout],
);
jsTestLog("Concurrent write correctly failed — express fallback thread is holding the ticket");

// Disable the failpoints and allow the express thread to finish.
fallbackHangFp.off();
fallbackWCEFp.off();
jsTestLog("Failpoints disabled, allowing blocked express fallback operation to proceed");
fallbackThread.join();
jsTest.log("Express fallback thread completed with result: " + tojson(fallbackThread.returnData()));

// Restore the fallback threshold to its default (2147483647, i.e. fallback effectively disabled)
// so that anything running after this section sees the stock release-ticket behavior.
assert.commandWorked(
    primary.adminCommand({
        setParameter: 1,
        internalQueryWriteConflictBackoffMaxReleaseTicketCycles: 2147483647,
    }),
);
// =====================================================================================
// Test 5: An express update gives up its write ticket before sleeping for
// TemporarilyUnavailable backoff (the WaitingForBackoff path).
// =====================================================================================

// Widen the ticket pool again so the setup writes below are not throttled.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 100}),
);

// Seed a dedicated target document for this scenario.
const tuTestDocId = numDocs + 40;
assert.commandWorked(coll.insertOne({_id: tuTestDocId, a: "foo", b: 50}));

// Precondition: explain must report the express executor for an update keyed on _id.
{
    const updateExplain = assert.commandWorked(
        db.runCommand({
            explain: {
                update: coll.getName(),
                updates: [{q: {_id: tuTestDocId}, u: {$set: {a: "bar"}}}],
            },
            verbosity: "queryPlanner",
        }),
    );
    assert(isExpress(db, updateExplain), "Update by _id should use express executor: " + tojson(updateExplain));
}

// Arm the hang failpoint that sits inside the WaitingForBackoff yield callback (reached after
// the ticket has been released and before the backoff sleep), then the failpoint that injects a
// TemporarilyUnavailableException into express writes.
const tuHangFp = configureFailPoint(primary, "expressExecutorHangBeforeTemporarilyUnavailableBackoff");
const tuFp = configureFailPoint(primary, "throwTemporarilyUnavailableExceptionInExpressWrite");

// Writer thread: a single-document update by _id, which is the shape the express executor handles.
const tuThread = new Thread(
    function (host, docId) {
        const testDb = new Mongo(host).getDB("test");
        const targetColl = testDb.concurrent_update_test;
        return testDb.runCommand({
            update: targetColl.getName(),
            updates: [{q: {_id: docId}, u: {$set: {a: "bar"}}}],
        });
    },
    primary.host,
    tuTestDocId,
);
tuThread.start();

// Block until the writer is parked inside the WaitingForBackoff callback; by then
// temporarilyReleaseResourcesAndYield has already released the ticket.
tuHangFp.wait();
jsTestLog(
    "Express TemporarilyUnavailable thread has hit the expressExecutorHangBeforeTemporarilyUnavailableBackoff failpoint",
);

// Shrink the pool to a single write ticket. A writer that still owned the ticket would make the
// probe insert below time out.
assert.commandWorked(
    primary.adminCommand({setParameter: 1, executionControlConcurrentWriteTransactions: 1}),
);

// The probe write succeeding proves the express writer released its ticket before sleeping.
{
    const probeInsert = {
        insert: coll.getName(),
        documents: [{_id: numDocs + 41, a: "bar", b: 500}],
        maxTimeMS: 30000,
    };
    assert.commandWorked(db.runCommand(probeInsert));
}
jsTestLog(
    "Successfully inserted document, confirming express TemporarilyUnavailable thread is not holding write tickets",
);

// Release the writer: lift the hang first, then stop injecting TemporarilyUnavailable errors.
tuHangFp.off();
tuFp.off();
jsTestLog("Failpoints disabled, allowing blocked express TemporarilyUnavailable operation to proceed");

// Let the writer finish and report what it returned.
tuThread.join();
jsTest.log("Express TemporarilyUnavailable thread completed with result: " + tojson(tuThread.returnData()));
// Cleanup.
rst.stopSet();

View File

@ -30,12 +30,31 @@
#include "mongo/db/exec/express/express_plan.h"
#include "mongo/db/shard_role/transaction_resources.h"
#include "mongo/db/storage/exceptions.h"
#include "mongo/util/fail_point.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo {
MONGO_FAIL_POINT_DEFINE(throwWriteConflictExceptionInExpressWrite);
MONGO_FAIL_POINT_DEFINE(throwTemporarilyUnavailableExceptionInExpressWrite);
namespace express {
/**
 * Test hook for express writes: when the 'throwWriteConflictExceptionInExpressWrite' failpoint
 * reports shouldFail(), calls throwWriteConflictException(); otherwise returns without effect.
 */
void throwIfExpressWriteConflictFailpointEnabled() {
    if (!MONGO_unlikely(throwWriteConflictExceptionInExpressWrite.shouldFail())) {
        return;
    }
    throwWriteConflictException("Failpoint: throwWriteConflictExceptionInExpressWrite");
}
/**
 * Test hook for express writes: when the 'throwTemporarilyUnavailableExceptionInExpressWrite'
 * failpoint reports shouldFail(), calls throwTemporarilyUnavailableException(); otherwise returns
 * without effect.
 */
void throwIfExpressTemporarilyUnavailableFailpointEnabled() {
    if (!MONGO_unlikely(throwTemporarilyUnavailableExceptionInExpressWrite.shouldFail())) {
        return;
    }
    throwTemporarilyUnavailableException(
        "Failpoint: throwTemporarilyUnavailableExceptionInExpressWrite");
}
// Empty body: does nothing with the given ScopedCollectionFilter.
void releaseShardFilterResources(ScopedCollectionFilter&) {}
// Empty body: does nothing with the given ScopedCollectionFilter.
void restoreShardFilterResources(ScopedCollectionFilter&) {}

View File

@ -87,6 +87,9 @@ void logRecordNotFound(OperationContext* opCtx,
const BSONObj& keyPattern,
const NamespaceString& ns);
void throwIfExpressWriteConflictFailpointEnabled();
void throwIfExpressTemporarilyUnavailableFailpointEnabled();
/**
* The 'PlanProgress' variant (defined below) represents the possible return values from an
* execution step in 'ExpressPlan' execution. It holds one of the following results.
@ -929,6 +932,8 @@ public:
exceptionRecoveryPolicy,
UpdateOperation::name,
[&]() {
throwIfExpressWriteConflictFailpointEnabled();
throwIfExpressTemporarilyUnavailableFailpointEnabled();
mutablebson::Document doc{};
bool docWasModified;
std::tie(newObj, docWasModified) =
@ -1010,6 +1015,8 @@ public:
exceptionRecoveryPolicy,
DeleteOperation::name,
[&]() {
throwIfExpressWriteConflictFailpointEnabled();
throwIfExpressTemporarilyUnavailableFailpointEnabled();
bool noWarn = false;
WriteUnitOfWork wunit(opCtx);
collection_internal::deleteDocument(opCtx,

View File

@ -67,6 +67,7 @@
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/logv2/log_component.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include <memory>
#include <utility>
@ -78,6 +79,10 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo {
MONGO_FAIL_POINT_DEFINE(expressExecutorHangBeforeLogAndBackoff);
MONGO_FAIL_POINT_DEFINE(expressExecutorHangBeforeTemporarilyUnavailableBackoff);
namespace {
class DoNotRecoverPolicy final : public express::ExceptionRecoveryPolicy {
public:
@ -439,35 +444,64 @@ template <class Plan>
// Overload selected by the express::WaitingForYield tag: performs write-conflict logging/backoff
// and yields the plan's resources via temporarilyReleaseResourcesAndYield.
// 'numWriteConflictYieldsSinceLastSuccess' is post-incremented here;
// 'numUnavailabilityYieldsSinceLastSuccess' is not referenced in this overload.
void PlanExecutorExpress<Plan>::readyPlanExecution(express::WaitingForYield,
size_t& numUnavailabilityYieldsSinceLastSuccess,
size_t& numWriteConflictYieldsSinceLastSuccess) {
// Capture count before incrementing so the lambda sees the pre-increment value.
// The write conflict metric is not incremented here because it was already incremented before
// this point, in ExceptionRecoveryPolicy::recoverFromNonFatalWriteException.
logWriteConflictAndBackoff(numWriteConflictYieldsSinceLastSuccess++,
"plan execution",
"write contention during express execution"_sd,
NamespaceStringOrUUID(_nss));
auto numAttempts = numWriteConflictYieldsSinceLastSuccess++;
// TODO SERVER-116168: Is this the desired behavior?
_plan.temporarilyReleaseResourcesAndYield(_opCtx, []() {
// No-op.
});
// When release-ticket backoff is enabled, we yield the ticket before sleeping so other
// writers can make progress. However, after
// 'gInternalQueryWriteConflictBackoffMaxReleaseTicketCycles' consecutive conflicts we fall back
// to sleeping while holding the ticket. Holding the ticket throttles concurrent writers from
// entering the conflict zone, which helps break a write-conflict storm.
const bool releaseTicketEnabled = internalQueryEnableWriteConflictBackoffWithoutTicket.load();
// NOTE(review): the comparison is '>=', so with a threshold of 0 this is already true for
// numAttempts == 0 (the first conflict). Confirm that is the intended boundary.
const bool fallbackToHoldTicket = releaseTicketEnabled &&
static_cast<int64_t>(numAttempts) >=
gInternalQueryWriteConflictBackoffMaxReleaseTicketCycles.load();
if (releaseTicketEnabled && !fallbackToHoldTicket) {
// Release path: the failpoint pause and logWriteConflictAndBackoff run inside the callback
// handed to temporarilyReleaseResourcesAndYield.
_plan.temporarilyReleaseResourcesAndYield(_opCtx, [this, numAttempts]() {
if (MONGO_unlikely(expressExecutorHangBeforeLogAndBackoff.shouldFail())) {
expressExecutorHangBeforeLogAndBackoff.pauseWhileSet(_opCtx);
}
logWriteConflictAndBackoff(numAttempts,
"plan execution",
"write contention during express execution"_sd,
NamespaceStringOrUUID(_nss));
});
} else {
// Hold path: the failpoint pause and logWriteConflictAndBackoff run first, before the
// yield; the yield that follows is given an empty callback.
if (MONGO_unlikely(expressExecutorHangBeforeLogAndBackoff.shouldFail())) {
expressExecutorHangBeforeLogAndBackoff.pauseWhileSet(_opCtx);
}
logWriteConflictAndBackoff(numAttempts,
"plan execution",
"write contention during express execution"_sd,
NamespaceStringOrUUID(_nss));
_plan.temporarilyReleaseResourcesAndYield(_opCtx, []() {});
}
}
template <class Plan>
// Overload selected by the express::WaitingForBackoff tag: runs
// handleTemporarilyUnavailableException and yields the plan's resources via
// temporarilyReleaseResourcesAndYield. 'numUnavailabilityYieldsSinceLastSuccess' is
// post-incremented here; 'numWriteConflictYieldsSinceLastSuccess' is passed through to the
// handler.
void PlanExecutorExpress<Plan>::readyPlanExecution(express::WaitingForBackoff,
size_t& numUnavailabilityYieldsSinceLastSuccess,
size_t& numWriteConflictYieldsSinceLastSuccess) {
handleTemporarilyUnavailableException(_opCtx,
numUnavailabilityYieldsSinceLastSuccess++,
"plan executor",
NamespaceStringOrUUID(_nss),
Status(ErrorCodes::TemporarilyUnavailable,
"resource contention during express execution"_sd),
numWriteConflictYieldsSinceLastSuccess);
// TODO SERVER-116168: Is this the desired behavior?
_plan.temporarilyReleaseResourcesAndYield(_opCtx, []() {
// No-op.
});
// Capture count before incrementing so the lambda sees the pre-increment value.
auto numUnavailabilityAttempts = numUnavailabilityYieldsSinceLastSuccess++;
// The failpoint pause and the TemporarilyUnavailable handler run inside the callback handed to
// temporarilyReleaseResourcesAndYield. The write-conflict counter is captured by reference,
// presumably so the handler can update the caller's value; verify against the handler.
_plan.temporarilyReleaseResourcesAndYield(
_opCtx, [this, numUnavailabilityAttempts, &numWriteConflictYieldsSinceLastSuccess]() {
if (MONGO_unlikely(
expressExecutorHangBeforeTemporarilyUnavailableBackoff.shouldFail())) {
expressExecutorHangBeforeTemporarilyUnavailableBackoff.pauseWhileSet(_opCtx);
}
handleTemporarilyUnavailableException(
_opCtx,
numUnavailabilityAttempts,
"plan executor",
NamespaceStringOrUUID(_nss),
Status(ErrorCodes::TemporarilyUnavailable,
"resource contention during express execution"_sd),
numWriteConflictYieldsSinceLastSuccess);
});
}
template <class Plan>

View File

@ -1009,6 +1009,21 @@ server_parameters:
default: false
redact: false
internalQueryWriteConflictBackoffMaxReleaseTicketCycles:
description: >-
When internalQueryEnableWriteConflictBackoffWithoutTicket is true, release-ticket WCE
backoff is used for up to this many WriteConflicts-in-a-row per operation. After the
threshold is crossed, the operation falls back to sleeping while holding its ticket.
Holding the ticket throttles concurrent writers in a write-conflict storm. A very
large value disables the fallback (pure release-ticket behavior).
set_at: [startup, runtime]
cpp_varname: gInternalQueryWriteConflictBackoffMaxReleaseTicketCycles
cpp_vartype: AtomicWord<int>
default: 2147483647
validator:
gte: 0
redact: false
internalChangeStreamRespectsReadPreference:
description: "Enables change stream cursors to respect readPreference after replica set elections."
set_at: [startup, runtime]