SERVER-124943 Add queuing support to the IngressRequestRateLimiter (#53599)

GitOrigin-RevId: ca1d45dd0cea41643ebe69dbe94bd889feb47c49
This commit is contained in:
Matt Broadstone 2026-05-20 10:09:25 -04:00 committed by MongoDB Bot
parent c14866f57c
commit d00374ee45
23 changed files with 1891 additions and 85 deletions

View File

@ -0,0 +1,624 @@
/**
* Tests for the ingressRequestAdmissionMaxQueueDepth server parameter and end-to-end queuing
* behavior of the ingress request rate limiter.
*
* @tags: [requires_fcv_80]
*/
import {Thread} from "jstests/libs/parallelTester.js";
import {
getRateLimiterStats,
kSlowestRefreshRateSecs,
makeAuthConn,
measureQueueStats,
runTestReplSet,
runTestSharded,
runTestStandalone,
withForcedQueueing,
withRateLimitingDisabled,
} from "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js";
// The test intentionally sets an extremely slow admission rate, so a queued find can otherwise
// wait for a very long time. Capping the amount of time a queued find can wait prevents the test
// from hanging indefinitely if the killAllSessions command fails. Keep this timeout conservatively
// high for slower TSAN/debug builds.
const kQueuedFindMaxTimeMS = 30000;
// Burst capacity, in seconds, computed as the rounded reciprocal of kSlowestRefreshRateSecs.
// NOTE(review): presumably chosen so the token bucket holds a single token at the slowest rate —
// confirm against the helper's definition of kSlowestRefreshRateSecs.
const kBurstOneSecs = Math.round(1.0 / kSlowestRefreshRateSecs);
// Server parameters handed to each topology runner as `startupParams`: the rate limiter is enabled
// and its burst capacity is set to kBurstOneSecs.
const kSlowRateBurstOneParams = {
ingressRequestAdmissionBurstCapacitySecs: kBurstOneSecs,
ingressRequestRateLimiterEnabled: true,
};
/**
 * Re-applies ingressRequestAdmissionBurstCapacitySecs = kBurstOneSecs through `exemptConn` so the
 * rate limiter's token bucket is re-clamped to burst=1 at the slow rate. Asserts that the
 * setParameter command succeeds.
 */
function forceSlowRateBurstOne(exemptConn) {
    const setBurstCmd = {setParameter: 1, ingressRequestAdmissionBurstCapacitySecs: kBurstOneSecs};
    const reply = exemptConn.adminCommand(setBurstCmd);
    assert.commandWorked(reply);
}
/**
 * Switches the "ingressRequestRateLimiterFractionalRateOverride" failpoint to "alwaysOn", passing
 * `rate` as the failpoint data, for deterministic slow-rate queuing. Asserts that the command
 * succeeds.
 */
function enableFractionalRateOverride(exemptConn, rate) {
    const failPointCmd = {
        configureFailPoint: "ingressRequestRateLimiterFractionalRateOverride",
        mode: "alwaysOn",
        data: {rate: rate},
    };
    assert.commandWorked(exemptConn.adminCommand(failPointCmd));
}
/**
 * Switches the "ingressRequestRateLimiterFractionalRateOverride" failpoint off so the configured
 * server rate is used again. Asserts that the command succeeds.
 */
function disableFractionalRateOverride(exemptConn) {
    const failPointCmd = {
        configureFailPoint: "ingressRequestRateLimiterFractionalRateOverride",
        mode: "off",
    };
    const reply = exemptConn.adminCommand(failPointCmd);
    assert.commandWorked(reply);
}
/**
 * Runs $currentOp (all users, local ops) on the admin database and returns, as an array, the
 * operations whose command comment equals `comment` and whose current queue is "ingress".
 */
function getQueuedOpsWithComment(exemptConn, comment) {
    const currentOpStage = {$currentOp: {allUsers: true, localOps: true}};
    const queuedWithCommentStage = {$match: {"command.comment": comment, "currentQueue.name": "ingress"}};
    const adminDb = exemptConn.getDB("admin");
    const cursor = adminDb.aggregate([currentOpStage, queuedWithCommentStage]);
    return cursor.toArray();
}
/** Returns how many operations getQueuedOpsWithComment() reports for `comment`. */
function countQueuedOpsWithComment(exemptConn, comment) {
    const queuedOps = getQueuedOpsWithComment(exemptConn, comment);
    return queuedOps.length;
}
/**
 * Polls (every 200ms, for up to 30s, retrying on network errors) until at least one op tagged
 * with `comment` is reported in the ingress queue, and returns the first such op.
 */
function waitForQueuedOp(exemptConn, comment) {
    let firstQueuedOp;
    const pollForQueuedOp = function () {
        const matches = getQueuedOpsWithComment(exemptConn, comment);
        if (matches.length > 0) {
            firstQueuedOp = matches[0];
            return true;
        }
        return false;
    };
    assert.soonRetryOnNetworkErrors(
        pollForQueuedOp,
        "expected the queued request to appear in $currentOp with currentQueue.name == 'ingress'",
        30000,
        200,
    );
    return firstQueuedOp;
}
/**
 * Kills all queued ops and waits for the ingress queue to fully drain.
 *
 * Each poll (every 200ms, up to 30s, retrying on network errors) re-issues killAllSessions, kills
 * lsid-less endSessions ops individually, and then reports the queue as drained once the number
 * of requests added since `beforeStats` equals the number removed since `beforeStats`.
 */
function killQueuedOpsAndWaitForDrain(exemptConn, beforeStats) {
    const killAndCheckDrained = function () {
        assert.commandWorked(exemptConn.adminCommand({killAllSessions: []}));

        // killAllSessions cannot reach ops that carry no lsid (e.g. endSessions commands issued
        // by the JS GC finalizer). Such ops can be stuck in the rate limiter on the local node,
        // sleeping indefinitely at the slow rate, so they are killed one by one via killOp.
        const currentOps = exemptConn
            .getDB("admin")
            .aggregate([{$currentOp: {allUsers: true, localOps: true}}])
            .toArray();
        currentOps
            .filter((op) => !op.lsid && op.opid && op.command && op.command.endSessions)
            .forEach((op) => {
                // Best effort: the reply is intentionally not checked.
                exemptConn.adminCommand({killOp: 1, op: op.opid});
            });

        const stats = getRateLimiterStats(exemptConn);
        const addedSinceBaseline = stats.addedToQueue - beforeStats.addedToQueue;
        const removedSinceBaseline = stats.removedFromQueue - beforeStats.removedFromQueue;
        return addedSinceBaseline === removedSinceBaseline;
    };

    assert.soonRetryOnNetworkErrors(
        killAndCheckDrained,
        "timed out waiting for queued ingress requests to drain after killAllSessions",
        30000,
        200,
    );
}
/**
 * Parks a single find in the ingress queue (via withForcedQueueing) and checks that it is
 * reported by $currentOp and by serverStatus. Afterwards the queue is drained with rate limiting
 * disabled and the add/remove queue counters are checked to balance.
 */
function testCurrentOpAndServerStatusReportIngressQueue(conn, exemptConn) {
    const kComment = "testCurrentOpAndServerStatusReportIngressQueue";
    const readServerStatus = () => exemptConn.getDB("admin").serverStatus();
    const ingressQueueOf = (status) => (status.queues ? status.queues.ingress : undefined);

    assert.commandWorked(exemptConn.adminCommand({setParameter: 1, ingressRequestAdmissionMaxQueueDepth: 5}));

    // Baselines captured before any request is queued.
    const statsBefore = getRateLimiterStats(exemptConn);
    const ingressBefore = ingressQueueOf(readServerStatus());

    const queuedFind = new Thread(
        async function (host, maxTimeMS, comment) {
            const helper = await import(
                "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js"
            );
            const threadConn = helper.makeAuthConn(host);
            const findCmd = {find: "col", filter: {}, maxTimeMS, comment};
            return threadConn.getDB("test").runCommand(findCmd);
        },
        conn.host,
        kQueuedFindMaxTimeMS,
        kComment,
    );

    withForcedQueueing(exemptConn, () => {
        // With hangInRateLimiter active the thread's find is guaranteed to take the queueing
        // path, so it can be observed deterministically in $currentOp and in serverStatus.
        queuedFind.start();

        // (1) $currentOp reports the op in the "ingress" queue, with operation-level queue-time
        // stats under queues.ingress.
        const queuedOp = waitForQueuedOp(exemptConn, kComment);
        assert.eq(queuedOp.currentQueue.name, "ingress", "queued op should report 'ingress' as currentQueue.name", {
            op: queuedOp,
        });
        assert(queuedOp.queues && queuedOp.queues.ingress, "queued op should include queues.ingress metrics", {
            op: queuedOp,
        });
        assert.gte(
            queuedOp.queues.ingress.totalTimeQueuedMicros,
            queuedOp.currentQueue.timeQueuedMicros,
            "queues.ingress.totalTimeQueuedMicros should include at least currentQueue.timeQueuedMicros",
            {op: queuedOp},
        );

        // The reported queue time must keep growing for as long as the op stays queued.
        const firstObservedMicros = queuedOp.currentQueue.timeQueuedMicros;
        assert.soonRetryOnNetworkErrors(
            () => {
                const stillQueued = getQueuedOpsWithComment(exemptConn, kComment);
                return stillQueued.length > 0 && stillQueued[0].currentQueue.timeQueuedMicros > firstObservedMicros;
            },
            "expected queued op currentQueue.timeQueuedMicros to increase while queued",
            30000,
            200,
        );

        // (2) serverStatus.network.ingressRequestRateLimiter reflects the queued request in its
        // general queueing counters.
        const statsAfter = getRateLimiterStats(exemptConn);
        const counterSnapshot = () => ({before: statsBefore, after: statsAfter});
        assert.gte(
            statsAfter.addedToQueue - statsBefore.addedToQueue,
            1,
            "serverStatus should report the queued request via ingressRequestRateLimiter.addedToQueue",
            counterSnapshot(),
        );
        assert.gte(
            statsAfter.attemptedAdmissions - statsBefore.attemptedAdmissions,
            1,
            "expected at least one attempted admission for the queued request",
            counterSnapshot(),
        );
        assert.gte(
            statsAfter.attemptedAdmissions - statsBefore.attemptedAdmissions,
            statsAfter.addedToQueue - statsBefore.addedToQueue,
            "attempted admissions should be at least the number of queued admissions",
            counterSnapshot(),
        );
        assert.eq(
            statsAfter.rejectedAdmissions - statsBefore.rejectedAdmissions,
            0,
            "queued request should not be rejected while maxQueueDepth has headroom",
            counterSnapshot(),
        );

        // (3) serverStatus.queues.ingress still exposes the ingress queue stats fields.
        const statusWithQueuedOp = readServerStatus();
        const queuesIngress = ingressQueueOf(statusWithQueuedOp);
        assert.neq(queuesIngress, undefined, "expected serverStatus.queues.ingress", {
            queues: statusWithQueuedOp.queues,
        });
        assert(queuesIngress.normalPriority, "serverStatus.queues.ingress should include normalPriority stats", {
            queuesIngress,
        });
        const normalPriorityStats = queuesIngress.normalPriority;

        assert.neq(ingressBefore, undefined, "expected baseline serverStatus.queues.ingress");
        assert(ingressBefore.normalPriority, "expected baseline serverStatus.queues.ingress.normalPriority", {
            beforeQueuesIngress: ingressBefore,
        });
        assert(
            normalPriorityStats.hasOwnProperty("totalTimeQueuedMicros"),
            "expected serverStatus.queues.ingress.normalPriority.totalTimeQueuedMicros",
            {normalPriorityStats},
        );
        assert(
            ingressBefore.normalPriority.hasOwnProperty("totalTimeQueuedMicros"),
            "expected baseline serverStatus.queues.ingress.normalPriority.totalTimeQueuedMicros",
            {beforeQueuesIngress: ingressBefore},
        );
        assert.gte(
            normalPriorityStats.totalTimeQueuedMicros,
            ingressBefore.normalPriority.totalTimeQueuedMicros,
            "expected non-decreasing serverStatus.queues.ingress.normalPriority.totalTimeQueuedMicros",
            {beforeQueuesIngress: ingressBefore, queuesIngress},
        );
    });

    // Cleanup: kill whatever is still queued and reap the worker thread.
    withRateLimitingDisabled(exemptConn, () => {
        killQueuedOpsAndWaitForDrain(exemptConn, statsBefore);
        queuedFind.join();
    });

    // Post-drain invariant: every request added to the queue has also been removed from it.
    const statsDrained = getRateLimiterStats(exemptConn);
    assert.eq(
        statsDrained.addedToQueue - statsBefore.addedToQueue,
        statsDrained.removedFromQueue - statsBefore.removedFromQueue,
        "expected queued requests to be fully removed after cleanup",
        {before: statsBefore, drained: statsDrained},
    );
}
// ---------------------------------------------------------------------------
// Test: ingressRequestAdmissionMaxQueueDepth parameter validation
//
// A negative depth must fail with BadValue; zero (queueing disabled) and positive depths must be
// accepted.
// ---------------------------------------------------------------------------
function testMaxQueueDepthParameterValidation(conn, exemptConn) {
    const trySetDepth = (depth) =>
        exemptConn.adminCommand({setParameter: 1, ingressRequestAdmissionMaxQueueDepth: depth});

    assert.commandFailedWithCode(trySetDepth(-1), ErrorCodes.BadValue, "negative queue depth should be rejected");

    const acceptedDepths = [
        [0, "zero queue depth (disabled) should be accepted"],
        [100, "positive queue depth should be accepted"],
    ];
    for (const [depth, message] of acceptedDepths) {
        assert.commandWorked(trySetDepth(depth), message);
    }
}
// ---------------------------------------------------------------------------
// Test: With maxQueueDepth = 0 (default), excess requests are rejected immediately and
// addedToQueue stays zero.
// ---------------------------------------------------------------------------
function testQueueDisabledRejectsImmediately(conn, exemptConn) {
    const disableQueueCmd = {setParameter: 1, ingressRequestAdmissionMaxQueueDepth: 0};
    assert.commandWorked(exemptConn.adminCommand(disableQueueCmd));
    forceSlowRateBurstOne(exemptConn);

    // The client is unauthenticated during connection setup and therefore exempt from the rate
    // limiter, so the multi-command auth handshake completes even with burst=1.
    const limitedConn = makeAuthConn(conn.host);
    const runFind = () => limitedConn.getDB("test").runCommand({find: "col", filter: {}});

    // The first find uses up the single burst token.
    assert.commandWorked(runFind());

    const {addedToQueue, rejectedAdmissions} = measureQueueStats(exemptConn, function () {
        assert.commandFailedWithCode(
            runFind(),
            ErrorCodes.IngressRequestRateLimitExceeded,
            "request should be rejected when queue is disabled",
        );
    });

    assert.eq(addedToQueue, 0, "no requests should enter the queue when maxQueueDepth=0");
    assert.gte(rejectedAdmissions, 1, "rejected count should increment");
}
// ---------------------------------------------------------------------------
// Test: Concurrent requests that exceed burst capacity queue and eventually succeed.
// Verifies addedToQueue and removedFromQueue metrics are consistent.
//
// Disables the slow-rate failpoint and uses a moderate rate so queued leases drain quickly
// and threads can complete without being killed.
// ---------------------------------------------------------------------------
function testConcurrentRequestsQueueAndSucceed(conn, exemptConn) {
    const kNumThreads = 5;
    const setParam = (param) => assert.commandWorked(exemptConn.adminCommand({setParameter: 1, ...param}));

    setParam({ingressRequestAdmissionMaxQueueDepth: 20});

    // Turn off the slow-rate failpoint that the helper applies at startup.
    disableFractionalRateOverride(exemptConn);

    // 10 req/sec with burst=1: kNumThreads concurrent threads each issuing a find will reliably
    // end up queuing. burstCapacitySecs is set first so the intermediate state still has
    // burst >= 1.
    setParam({ingressRequestAdmissionBurstCapacitySecs: 0.1});
    setParam({ingressRequestAdmissionRatePerSec: 10});

    const workers = Array.from(
        {length: kNumThreads},
        () =>
            new Thread(async function (host) {
                const helper = await import(
                    "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js"
                );
                const workerConn = helper.makeAuthConn(host);
                assert.commandWorked(workerConn.getDB("test").runCommand({find: "col", filter: {}}));
            }, conn.host),
    );

    // Start every worker before joining any of them so the finds overlap.
    const delta = measureQueueStats(exemptConn, () => {
        workers.forEach((worker) => worker.start());
        workers.forEach((worker) => worker.join());
    });

    assert.gte(delta.addedToQueue, 1, "at least some requests should have queued");
    assert.eq(delta.addedToQueue, delta.removedFromQueue, "every queued request should have been dequeued on success");
    assert.eq(delta.interruptedInQueue, 0, "no requests should have been interrupted");
}
// ---------------------------------------------------------------------------
// Test: When the queue is at capacity, requests beyond the limit are rejected.
//
// kMaxQueueDepth and numThreads are the only tunables: the number of requests expected to be
// queued and the number expected to be rejected are both derived from them.
// ---------------------------------------------------------------------------
function testQueueAtCapacityRejectsExcess(conn, exemptConn) {
    const kMaxQueueDepth = 2;
    const numThreads = 5;
    // Every request beyond the queue capacity is expected to be rejected.
    const kExpectedRejections = numThreads - kMaxQueueDepth;
    const kComment = "testQueueAtCapacityRejectsExcess";

    assert.commandWorked(
        exemptConn.adminCommand({setParameter: 1, ingressRequestAdmissionMaxQueueDepth: kMaxQueueDepth}),
    );
    const before = getRateLimiterStats(exemptConn);

    const threads = [];
    for (let i = 0; i < numThreads; i++) {
        const t = new Thread(
            async function (host, maxTimeMS, comment) {
                const {makeAuthConn} = await import(
                    "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js"
                );
                const authConn = makeAuthConn(host);
                const res = authConn.getDB("test").runCommand({find: "col", filter: {}, maxTimeMS, comment});
                if (!res.ok) {
                    // Requests rejected at queue capacity get IngressRequestRateLimitExceeded;
                    // requests that queued and were then killed (by the end of test drain) get
                    // Interrupted.
                    assert.commandFailedWithCode(
                        res,
                        [ErrorCodes.IngressRequestRateLimitExceeded, ErrorCodes.Interrupted],
                        "request should be rejected when queue is at capacity, or interrupted at test cleanup",
                    );
                }
                return res;
            },
            conn.host,
            kQueuedFindMaxTimeMS,
            kComment,
        );
        threads.push(t);
    }

    withForcedQueueing(exemptConn, () => {
        // hangInRateLimiter forces every admit attempt through the queueing path; the first
        // kMaxQueueDepth attempts enqueue and the remaining ones hit the queue-full check and
        // get IngressRequestRateLimitExceeded.
        for (const t of threads) {
            t.start();
        }

        // Wait until exactly kMaxQueueDepth of our ops are queued and the rest have been rejected.
        assert.soonRetryOnNetworkErrors(
            () => {
                const numQueuedOps = countQueuedOpsWithComment(exemptConn, kComment);
                const s = getRateLimiterStats(exemptConn);
                return (
                    numQueuedOps === kMaxQueueDepth &&
                    s.rejectedAdmissions - before.rejectedAdmissions >= kExpectedRejections
                );
            },
            "expected queue to fill and excess requests to be rejected",
            30000,
            200,
        );

        const after = getRateLimiterStats(exemptConn);
        assert.lte(
            after.addedToQueue - before.addedToQueue,
            kMaxQueueDepth,
            "at most maxQueueDepth requests should have been enqueued",
        );
        assert.gte(
            after.rejectedAdmissions - before.rejectedAdmissions,
            1,
            "at least one request should be rejected when the queue is full",
        );
    });

    // Cleanup: kill the requests still parked in the queue, then reap every worker thread.
    withRateLimitingDisabled(exemptConn, () => {
        killQueuedOpsAndWaitForDrain(exemptConn, before);
        for (const t of threads) {
            t.join();
        }
    });
}
// ---------------------------------------------------------------------------
// Test: Requests interrupted while waiting in the queue increment interruptedInQueue.
// ---------------------------------------------------------------------------
function testInterruptedQueuedRequestsIncrementCounter(conn, exemptConn) {
    const kComment = "testInterruptedQueuedRequestsIncrementCounter";
    const numThreads = 3;

    assert.commandWorked(exemptConn.adminCommand({setParameter: 1, ingressRequestAdmissionMaxQueueDepth: 10}));
    const statsBefore = getRateLimiterStats(exemptConn);

    const workers = Array.from(
        {length: numThreads},
        () =>
            new Thread(
                async function (host, maxTimeMS, comment) {
                    const helper = await import(
                        "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js"
                    );
                    const workerConn = helper.makeAuthConn(host);
                    // This find is expected to queue and then be killed.
                    return workerConn.getDB("test").runCommand({find: "col", filter: {}, maxTimeMS, comment});
                },
                conn.host,
                kQueuedFindMaxTimeMS,
                kComment,
            ),
    );

    withForcedQueueing(exemptConn, () => {
        workers.forEach((worker) => worker.start());

        // Only kill once every worker has a request visible in currentOp; otherwise a request
        // might not be counted in interruptedInQueue.
        assert.soonRetryOnNetworkErrors(
            () => countQueuedOpsWithComment(exemptConn, kComment) >= numThreads,
            "expected all threads to have a request in the queue",
            30000,
            200,
        );

        // Interrupt everything that is queued, then wait for our tagged ops to leave currentOp.
        assert.commandWorked(exemptConn.adminCommand({killAllSessions: []}));
        assert.soonRetryOnNetworkErrors(
            () => countQueuedOpsWithComment(exemptConn, kComment) === 0,
            "expected all queued threads to disappear from currentOp after killAllSessions",
            30000,
            200,
        );
    });

    withRateLimitingDisabled(exemptConn, () => {
        killQueuedOpsAndWaitForDrain(exemptConn, statsBefore);
        workers.forEach((worker) => worker.join());
    });

    const statsAfter = getRateLimiterStats(exemptConn);
    const interruptedDelta = statsAfter.interruptedInQueue - statsBefore.interruptedInQueue;
    assert.gte(interruptedDelta, numThreads, "every thread's queued request should appear in interruptedInQueue");

    const addedDelta = statsAfter.addedToQueue - statsBefore.addedToQueue;
    const removedDelta = statsAfter.removedFromQueue - statsBefore.removedFromQueue;
    assert.eq(addedDelta, removedDelta, "every queued request should be dequeued on interruption");
}
// ---------------------------------------------------------------------------
// Test: Dynamic update of ingressRequestAdmissionMaxQueueDepth takes effect immediately.
// Increasing the queue depth allows more requests to queue; decreasing to 0 causes
// immediate rejections again.
// ---------------------------------------------------------------------------
function testDynamicQueueDepthUpdate(conn, exemptConn) {
    const kComment = "testDynamicQueueDepthUpdate";
    const setMaxQueueDepth = (depth) =>
        assert.commandWorked(
            exemptConn.adminCommand({setParameter: 1, ingressRequestAdmissionMaxQueueDepth: depth}),
        );

    setMaxQueueDepth(0);
    forceSlowRateBurstOne(exemptConn);

    const limitedConn = makeAuthConn(conn.host);
    const runFind = () => limitedConn.getDB("test").runCommand({find: "col", filter: {}});

    // Phase 1: use up the burst token, then confirm rejection while queueing is disabled.
    assert.commandWorked(runFind());
    assert.commandFailedWithCode(
        runFind(),
        ErrorCodes.IngressRequestRateLimitExceeded,
        "should reject when queue disabled",
    );

    // Phase 2: enable queueing; a request that would have been rejected now queues instead.
    setMaxQueueDepth(5);
    const statsBefore = getRateLimiterStats(exemptConn);

    const queuedFind = new Thread(
        async function (host, maxTimeMS, comment) {
            const helper = await import(
                "jstests/noPassthrough/admission/libs/ingress_request_rate_limiter_helper.js"
            );
            const threadConn = helper.makeAuthConn(host);
            return threadConn.getDB("test").runCommand({find: "col", filter: {}, maxTimeMS, comment});
        },
        conn.host,
        kQueuedFindMaxTimeMS,
        kComment,
    );
    queuedFind.start();

    assert.soonRetryOnNetworkErrors(
        () => countQueuedOpsWithComment(exemptConn, kComment) >= 1,
        "expected request to enter queue after maxQueueDepth was increased",
        30000,
        200,
    );

    withRateLimitingDisabled(exemptConn, () => {
        killQueuedOpsAndWaitForDrain(exemptConn, statsBefore);
        queuedFind.join();
    });

    // Phase 3: disable queueing again; excess requests go back to being rejected.
    setMaxQueueDepth(0);
    assert.commandFailedWithCode(
        runFind(),
        ErrorCodes.IngressRequestRateLimitExceeded,
        "should reject again after queue disabled",
    );
}
/**
 * Returns the rate limiter to the baseline each test starts from: drains any queued requests,
 * waits for at least one token to be available at a fast rate, then re-enables the slow
 * fractional-rate override with burst set to kBurstOneSecs and queueing disabled
 * (maxQueueDepth = 0).
 */
function resetTestState(exemptConn) {
    const applyParams = (params) => assert.commandWorked(exemptConn.adminCommand({setParameter: 1, ...params}));

    const statsAtEntry = getRateLimiterStats(exemptConn);
    withRateLimitingDisabled(exemptConn, () => killQueuedOpsAndWaitForDrain(exemptConn, statsAtEntry));

    // Make sure at least one token is available before the slow-rate behavior is restored.
    disableFractionalRateOverride(exemptConn);
    applyParams({
        ingressRequestAdmissionRatePerSec: 1000,
        ingressRequestAdmissionBurstCapacitySecs: 1,
        ingressRequestRateLimiterEnabled: true,
    });
    const hasToken = () => getRateLimiterStats(exemptConn).totalAvailableTokens >= 1;
    assert.soon(hasToken, "timed out waiting for ingress token refill in test preamble", 5000, 100);

    // Restore the deterministic baseline these tests rely on.
    enableFractionalRateOverride(exemptConn, kSlowestRefreshRateSecs);
    applyParams({
        ingressRequestAdmissionRatePerSec: 1,
        ingressRequestAdmissionBurstCapacitySecs: kBurstOneSecs,
        ingressRequestAdmissionMaxQueueDepth: 0,
        ingressRequestRateLimiterEnabled: true,
    });
}
// Topologies the suite runs against, each paired with the helper that stands it up.
const kTopologies = [
    {name: "standalone", runner: runTestStandalone},
    {name: "replset", runner: runTestReplSet},
    {name: "sharded", runner: runTestSharded},
];

// Test cases, executed in this order on every topology.
const kTests = [
    testCurrentOpAndServerStatusReportIngressQueue,
    testMaxQueueDepthParameterValidation,
    testQueueDisabledRejectsImmediately,
    testConcurrentRequestsQueueAndSucceed,
    testQueueAtCapacityRejectsExcess,
    testInterruptedQueuedRequestsIncrementCounter,
    testDynamicQueueDepthUpdate,
];

// Run the whole suite once per topology, resetting the rate limiter before every test case.
kTopologies.forEach(({name, runner}) => {
    jsTest.log(`Running queuing suite on topology: ${name}`);
    const runnerOptions = {startupParams: kSlowRateBurstOneParams, auth: true};
    runner(runnerOptions, (conn, exemptConn) => {
        kTests.forEach((testFn) => {
            jsTest.log(`Running ${testFn.name} on topology: ${name}`);
            resetTestState(exemptConn);
            testFn(conn, exemptConn);
        });
    });
});

View File

@ -59,6 +59,24 @@ export function authenticateConnection(conn) {
admin.auth(kUser, kPass);
}
/**
 * Opens a fresh connection to `host`, authenticates it with authenticateConnection(), and returns
 * it. The connection is not exempt from the rate limiter.
 */
export function makeAuthConn(host) {
    const authedConn = new Mongo(host);
    authenticateConnection(authedConn);
    return authedConn;
}
/**
 * Opens a fresh connection to `host` whose appName is kRateLimiterExemptAppName (making it an
 * exempt connection), authenticates it with authenticateConnection(), and returns it.
 */
export function makeExemptConn(host) {
    const exemptUri = `mongodb://${host}/?appName=${kRateLimiterExemptAppName}`;
    const exemptConn = new Mongo(exemptUri);
    authenticateConnection(exemptConn);
    return exemptConn;
}
/**
* Returns the stats for the ingress request rate limiter.
*/
@ -68,6 +86,50 @@ export function getRateLimiterStats(exemptConn) {
return status.network.ingressRequestRateLimiter;
}
/**
 * Snapshots the ingress rate limiter stats, runs `operationFn`, snapshots again, and returns the
 * change (after minus before) in addedToQueue, removedFromQueue, interruptedInQueue and
 * rejectedAdmissions.
 */
export function measureQueueStats(exemptConn, operationFn) {
    const kDeltaFields = ["addedToQueue", "removedFromQueue", "interruptedInQueue", "rejectedAdmissions"];

    const baseline = getRateLimiterStats(exemptConn);
    operationFn();
    const current = getRateLimiterStats(exemptConn);

    const deltas = {};
    for (const field of kDeltaFields) {
        deltas[field] = current[field] - baseline[field];
    }
    return deltas;
}
/**
 * Sets ingressRequestRateLimiterEnabled to 0, calls `fn`, and then sets the parameter back to 1.
 * The parameter is restored even when `fn` throws.
 */
export function withRateLimitingDisabled(exemptConn, fn) {
    const setLimiterEnabled = (flag) => {
        assert.commandWorked(exemptConn.adminCommand({setParameter: 1, ingressRequestRateLimiterEnabled: flag}));
    };

    setLimiterEnabled(0);
    try {
        fn();
    } finally {
        setLimiterEnabled(1);
    }
}
/**
 * Runs `fn` with the `hangInRateLimiter` failpoint set to "alwaysOn", then turns the failpoint
 * off again (also when `fn` throws). While active, the failpoint forces every non-exempt request
 * through the rate limiter to queue with a long nap time, which lets tests observe queueing
 * deterministically instead of depending on token-bucket state (racy under parallel admit
 * attempts).
 */
export function withForcedQueueing(exemptConn, fn) {
    const setHangFailPoint = (mode) => {
        assert.commandWorked(exemptConn.adminCommand({configureFailPoint: "hangInRateLimiter", mode: mode}));
    };

    setHangFailPoint("alwaysOn");
    try {
        fn();
    } finally {
        setHangFailPoint("off");
    }
}
/**
* Expected error labels present in command responses when requests are rejected by the
* ingress request rate limiter.

View File

@ -49,6 +49,8 @@ const testKillOnClientDisconnectOpts = {
ingressConnectionEstablishmentRatePerSec: 1,
ingressConnectionEstablishmentBurstCapacitySecs: 1,
ingressConnectionEstablishmentMaxQueueDepth: maxQueueSize,
// TODO(SERVER-125073): Remove `ingressRequestRateLimiterEnabled:false` once we resolve how to hang specific rate limiters.
ingressRequestRateLimiterEnabled: false,
};
runTestStandaloneParamsSetAtStartup(testKillOnClientDisconnectOpts, testKillOnClientDisconnect);
runTestStandaloneParamsSetAtRuntime(testKillOnClientDisconnectOpts, testKillOnClientDisconnect);

View File

@ -101,6 +101,8 @@ const testRateLimiterStatsOpts = {
ingressConnectionEstablishmentRatePerSec: 1,
ingressConnectionEstablishmentBurstCapacitySecs: 1,
ingressConnectionEstablishmentMaxQueueDepth: maxQueueSize,
// TODO(SERVER-125073): Remove `ingressRequestRateLimiterEnabled:false` once we resolve how to hang specific rate limiters.
ingressRequestRateLimiterEnabled: false,
};
runTestStandaloneParamsSetAtStartup(testRateLimiterStatsOpts, testRateLimiterStats);
runTestStandaloneParamsSetAtRuntime(testRateLimiterStatsOpts, testRateLimiterStats);

View File

@ -63,10 +63,12 @@ mongo_cc_library(
],
deps = [
":app_name_exemption_matcher",
":ingress_admission_context",
":rate_limiter",
"//src/mongo:base",
"//src/mongo/db:server_feature_flags",
"//src/mongo/db:service_context",
"//src/mongo/db/admission/ticketing:admission_context",
"//src/mongo/db/auth",
"//src/mongo/rpc:client_metadata",
"//src/mongo/transport:cidr_range_list_parameter",

View File

@ -198,6 +198,11 @@ The rate limiter is controlled by the following server parameters:
- `ingressRequestRateLimiterExemptions` (document, default: {}): A document containing a list of
CIDR ranges to be exempted from ingress request rate limiting. Acceptable values here follow the
same format as the `maxIncomingConnectionsOverride`.
- `ingressRequestRateLimiterApplicationExemptions` (document, default: `{appNames: []}`): A document
containing application/driver names exempt from ingress request rate limiting.
- `ingressRequestAdmissionMaxQueueDepth` (int64, default: 0): Maximum number of requests that may
queue waiting for a token. A value of 0 disables queueing, so requests that exceed rate+burst are
rejected immediately.
## Admission Token Acquisition
@ -215,10 +220,31 @@ circumstances:
Because token acquisition currently only takes place in the `SessionWorkflow`, internal requests
are never subject to rate limiting.
If the thread is not considered exempt, it will attempt to acquire a token from the rate limiter. If
it is able to do so, it proceeds as normal. Otherwise, the request is rejected with an error labeled
with `SystemOverloaded`. Clients will observe this label and interpret the server as being
overloaded, modifying their routing and retry logic accordingly.
If the thread is not considered exempt, it attempts to acquire a token from the rate limiter. If a
token is immediately available, the request proceeds as normal. If not, behavior depends on
`ingressRequestAdmissionMaxQueueDepth`:
- If queueing is disabled (`0`) or the queue is full, the request is rejected with an error labeled
`SystemOverloaded`. Clients will observe this label and interpret the server as being overloaded,
modifying their routing and retry logic accordingly.
- If queueing is enabled and capacity exists, the request reserves a queue position and later blocks
in the service entry point until that reserved position becomes valid (or until it is interrupted).
### Admission Token Exemption
Token acquisition happens in two stages. `SessionWorkflow` calls `admitRequest` immediately after
reading the message, before an `OperationContext` exists. The service entry point later calls
`waitForAdmission` once the command has been parsed and the exemption signal is known
(`isExemptFromAdmissionControl`). Because that signal is unavailable at the first stage, the
rate-limiter-level exemption can only rescue requests that successfully reserved a queue slot:
- Rate exceeded, queueing disabled: rejected at `admitRequest`.
- Rate exceeded, queue full: rejected at `admitRequest`.
- Rate exceeded, queue has room: queued at `admitRequest`, then exempted at `waitForAdmission`.
This asymmetry is intentional. Determining exemption status from `admitRequest` would require
constructing or partially evaluating the command before deciding to reject, which would make
rejections more expensive.
## Metrics
@ -232,6 +258,12 @@ The following `serverStatus` metrics are emitted by the `IngressRequestRateLimit
- `rejectedAdmissions`: the total number of requests that were rejected by the rate limiter.
- `exemptedAdmissions`: the total number of requests that bypassed the rate limiter due to one of
the conditions described above.
- `addedToQueue`: the total number of requests that entered the ingress request rate limiter queue.
- `removedFromQueue`: the total number of requests removed from the ingress request rate limiter
queue (admitted, interrupted, or exempted after queue reservation).
- `interruptedInQueue`: the number of queued requests interrupted before admission.
- `averageTimeQueuedMicros`: moving average of the queue wait time for queued requests that were
successfully admitted.
- `totalAvailableTokens`: the current capacity of the underlying token bucket.
# Data-Node Ingress Admission Control

View File

@ -33,7 +33,9 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
#include "mongo/db/admission/app_name_exemption_matcher.h"
#include "mongo/db/admission/ingress_admission_context.h"
#include "mongo/db/admission/ingress_request_rate_limiter_gen.h"
#include "mongo/db/admission/ticketing/admission_context.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/service_context.h"
#include "mongo/rpc/metadata/client_metadata.h"
@ -44,6 +46,7 @@
namespace mongo {
namespace admission {
namespace {
@ -53,7 +56,9 @@ VersionedValue<CIDRList> ingressRequestRateLimiterIPExemptions;
VersionedValue<std::vector<std::string>> ingressRequestRateLimiterAppExemptions;
const auto getIngressRequestRateLimiter =
ServiceContext::declareDecoration<boost::optional<IngressRequestRateLimiter>>();
ServiceContext::declareDecoration<boost::optional<admission::IngressRequestRateLimiter>>();
const auto getDeferredAdmissionToken =
Client::declareDecoration<boost::optional<admission::RateLimiter::DeferredToken>>();
const ConstructorActionRegistererType<ServiceContext> onServiceContextCreate{
"InitIngressRequestRateLimiter", [](ServiceContext* ctx) {
@ -173,7 +178,7 @@ Status IngressRequestRateLimiterAppExemptions::setFromString(StringData str,
IngressRequestRateLimiter::IngressRequestRateLimiter()
: _rateLimiter{static_cast<double>(gIngressRequestRateLimiterRatePerSec.load()),
gIngressRequestRateLimiterBurstCapacitySecs.load(),
0,
gIngressRequestAdmissionMaxQueueDepth.load(),
"ingressRequestRateLimiter"} {
if (const auto scopedFp = ingressRequestRateLimiterFractionalRateOverride.scoped();
MONGO_unlikely(scopedFp.isActive())) {
@ -193,13 +198,59 @@ Status IngressRequestRateLimiter::admitRequest(Client* client) {
return Status::OK();
}
auto rateLimitResult = _rateLimiter.tryAcquireToken();
if (MONGO_unlikely(rateLimitResult == admission::RateLimiter::kRejectedErrorCode)) {
auto tokenResult = _rateLimiter.acquireToken();
if (MONGO_likely(tokenResult.isOK())) {
if (!tokenResult.getValue().isReady()) {
// The rate limiter issued a non-ready DeferredToken, store it on the client to resolve
// later in the request pipeline.
dassert(!getDeferredAdmissionToken(client).has_value(),
"Client already has a deferred admission token");
getDeferredAdmissionToken(client).emplace(std::move(tokenResult.getValue()));
}
// The rate limiter's DeferredToken is ready now, the token was already consumed and stats
// were recorded. Dropping the DeferredToken here is intentional, the destructor is a no-op
// for ready DeferredTokens.
return Status::OK();
}
if (tokenResult.getStatus() == RateLimiter::kRejectedErrorCode) {
return Status{ErrorCodes::IngressRequestRateLimitExceeded,
"Request rejected: ingress request rate limit exceeded"};
}
return tokenResult.getStatus();
}
return rateLimitResult;
// Resolves the deferred admission token (if any) that admitRequest() parked on the operation's
// client. The token is always detached from the client first, so it is resolved at most once.
//
//  - No pending token: nothing to wait for, returns OK.
//  - Exempt request: the exemption is recorded and the token is dropped without waiting.
//  - Otherwise: blocks in DeferredToken::get() and returns its status. A
//    WaitingForAdmissionGuard is held for the duration of that wait.
Status IngressRequestRateLimiter::waitForAdmission(OperationContext* opCtx,
                                                   bool isExemptFromAdmissionControl) {
    auto& clientSlot = getDeferredAdmissionToken(opCtx->getClient());
    auto pendingToken = std::exchange(clientSlot, boost::none);

    if (pendingToken && isExemptFromAdmissionControl) {
        // Only the stat is recorded here; the token goes out of scope when this function returns.
        std::move(*pendingToken).recordExemption();
    } else if (pendingToken) {
        WaitingForAdmissionGuard waitingGuard(&IngressAdmissionContext::get(opCtx),
                                              opCtx->getServiceContext()->getTickSource());
        return std::move(*pendingToken).get(opCtx);
    }

    return Status::OK();
}
// Discards whatever deferred admission token is currently parked on `client`, leaving the
// client's decoration empty. Harmless when no token is present.
void IngressRequestRateLimiter::clearDeferredAdmissionToken(Client* client) {
    getDeferredAdmissionToken(client).reset();
}
// Test-only helper: parks `deferredToken` on `client` as its pending deferred admission token,
// mimicking what admitRequest() does for a queued (non-ready) request.
void IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
Client* client, RateLimiter::DeferredToken deferredToken) {
// A client holds at most one pending token; seeding over an existing one is a test bug.
invariant(!getDeferredAdmissionToken(client).has_value(),
"Client already has a deferred admission token");
getDeferredAdmissionToken(client).emplace(std::move(deferredToken));
}
// Test-only helper: reports whether `client` currently has a deferred admission token parked on
// it.
bool IngressRequestRateLimiter::hasDeferredAdmissionToken_forTest(Client* client) {
    const auto& pendingToken = getDeferredAdmissionToken(client);
    return static_cast<bool>(pendingToken);
}
// This function is only for testing, but the _forTest name append makes the module linter
@ -241,18 +292,22 @@ Status IngressRequestRateLimiter::onUpdateAdmissionBurstCapacitySecs(double burs
return Status::OK();
}
void IngressRequestRateLimiter::appendStats(BSONObjBuilder* bob) const {
// First we get the stats in a separate object in order to not mutate bob
auto rateLimiterBob = BSONObjBuilder{};
_rateLimiter.appendStats(&rateLimiterBob);
auto const rateLimiterStats = rateLimiterBob.obj();
// Then we copy elements one by one to avoid copying queueing stats
bob->append(rateLimiterStats.getField("rejectedAdmissions"));
bob->append(rateLimiterStats.getField("successfulAdmissions"));
bob->append(rateLimiterStats.getField("exemptedAdmissions"));
bob->append(rateLimiterStats.getField("attemptedAdmissions"));
bob->append(rateLimiterStats.getField("totalAvailableTokens"));
// Forwards the new maximum queue depth to the underlying RateLimiter.
void IngressRequestRateLimiter::updateMaxQueueDepth(std::int64_t maxQueueDepth) {
_rateLimiter.setMaxQueueDepth(maxQueueDepth);
}
// Server-parameter update hook for ingressRequestAdmissionMaxQueueDepth. Pushes the new depth
// into the rate limiter owned by the current client's ServiceContext. When there is no current
// client the update is skipped. Always returns OK.
Status IngressRequestRateLimiter::onUpdateAdmissionMaxQueueDepth(std::int64_t maxQueueDepth) {
    auto currentClient = Client::getCurrent();
    if (!currentClient) {
        return Status::OK();
    }

    auto& limiter = getIngressRequestRateLimiter(currentClient->getServiceContext());
    limiter->updateMaxQueueDepth(maxQueueDepth);
    return Status::OK();
}
// Appends every stat reported by the underlying RateLimiter directly into `bob`, with no
// filtering or renaming at this layer.
void IngressRequestRateLimiter::appendStats(BSONObjBuilder* bob) const {
_rateLimiter.appendStats(bob);
}
} // namespace admission
} // namespace mongo

View File

@ -33,10 +33,12 @@
#include "mongo/db/admission/rate_limiter.h"
#include "mongo/util/modules.h"
#include <cstddef>
#include <cstdint>
#include <boost/optional.hpp>
namespace mongo {
namespace admission {
class MONGO_MOD_PUBLIC IngressRequestRateLimiter {
public:
@ -45,20 +47,43 @@ public:
* Returns the reference to IngressRequestRateLimiter associated with the operation's service
* context.
*/
static IngressRequestRateLimiter& get(ServiceContext* opCtx);
static IngressRequestRateLimiter& get(ServiceContext* svcCtx);
/**
* Attempt to receive admission into the system. If the current rate of request admissions has
* exceeded the configured rate limit and consumed the burst size, the operation will be
* rejected with an error in the SystemOverloaded category.
* Attempts to admit a request into the system. Returns an error status if the rate limit and
* burst capacity are exceeded AND the queue is at capacity (if configured).
*
* If an admission is queued a DeferredToken is stored on a client decoration which will be
* resolved later in the request pipeline.
*/
Status admitRequest(Client* client);
/**
* Waits for admission to be granted. If there is no deferred token then this is a no-op,
* otherwise it resolves the deferred token.
*
* If isExemptFromAdmissionControl is true, the deferred token is marked exempt instead of
* waited on, returning the borrowed token to the bucket and releasing its queue slot.
*
* Exemption is determined here (rather than in admitRequest) because it
* depends on command-level state that is not available at the point of message read. As a
* consequence, the exemption can only take effect for requests that successfully reserved a
* queue slot in admitRequest. A rate-exceeded request that was rejected up front because
* queueing was disabled or the queue was full will not reach this function. See README.md for
* the rationale.
*/
static Status waitForAdmission(OperationContext* opCtx, bool isExemptFromAdmissionControl);
/**
* Adjusts the refresh rate and burst capacity of the rate limiter.
*/
void updateRateParameters(double refreshRatePerSec, double burstCapacitySecs);
/**
* Sets the maximum number of requests that may be queued waiting for a token.
*/
void updateMaxQueueDepth(std::int64_t maxQueueDepth);
/**
* Called automatically when the value of the server parameter
* ingressRequestAdmissionRatePerSec changes value.
@ -71,6 +96,12 @@ public:
*/
MONGO_MOD_PRIVATE static Status onUpdateAdmissionBurstCapacitySecs(double burstCapacitySecs);
/**
* Called automatically when the value of the server parameter
* ingressRequestAdmissionMaxQueueDepth changes value.
*/
MONGO_MOD_PRIVATE static Status onUpdateAdmissionMaxQueueDepth(std::int64_t maxQueueDepth);
/**
* Reports the ingress admission rate limiter metrics.
*/
@ -86,8 +117,16 @@ public:
*/
static bool isAppNameExempted(Client* client);
/** Clears any pending deferred admission token stored on the client. */
static void clearDeferredAdmissionToken(Client* client);
/** Test-only helper to seed a pending DeferredToken on a client. */
static void setDeferredAdmissionToken_forTest(Client* client, RateLimiter::DeferredToken token);
/** Test-only helper to check if a client has a deferred admission token. */
static bool hasDeferredAdmissionToken_forTest(Client* client);
private:
admission::RateLimiter _rateLimiter;
RateLimiter _rateLimiter;
};
} // namespace admission
} // namespace mongo

View File

@ -28,7 +28,7 @@
global:
mod_visibility: public
cpp_namespace: "mongo"
cpp_namespace: "mongo::admission"
cpp_includes:
- "mongo/db/admission/ingress_request_rate_limiter.h"
- "mongo/db/server_options.h"
@ -87,3 +87,17 @@ server_parameters:
# Expects the payload to be an instance of `ApplicationExemptionListParameters`
override_set: true
redact: false
ingressRequestAdmissionMaxQueueDepth:
description: >-
The maximum number of requests that may be queued waiting for an admission token when
the ingress request rate limit is exceeded. Requests that arrive when the queue is at
capacity are rejected immediately. A value of 0 disables queueing (requests are always
rejected immediately when the rate limit is exceeded).
set_at: [startup, runtime]
cpp_varname: gIngressRequestAdmissionMaxQueueDepth
cpp_vartype: Atomic<long long>
on_update: IngressRequestRateLimiter::onUpdateAdmissionMaxQueueDepth
default: 0
validator: {gte: 0}
redact: false

View File

@ -59,7 +59,7 @@ public:
executionBuilder.done();
}
if (gIngressAdmissionControlEnabled.load() && role.has(ClusterRole::ShardServer)) {
if (gIngressAdmissionControlEnabled.load() || gFeatureFlagIngressRateLimiting.isEnabled()) {
BSONObjBuilder ingressBuilder(admissionBuilder.subobjStart("ingress"));
auto& controller = IngressAdmissionController::get(opCtx);
controller.appendStats(ingressBuilder);

View File

@ -168,6 +168,73 @@ private:
folly::TokenBucket _tokenBucket;
};
// Constructs a token bound to the limiter state `impl`.
//  - napTime: how long the holder must wait before the reservation becomes valid; zero marks the
//    token as ready (see isReady()).
//  - numTokens: number of tokens this reservation accounts for; this is the amount handed back
//    to the bucket if the reservation is abandoned or interrupted.
RateLimiter::DeferredToken::DeferredToken(RateLimiterPrivate* impl,
Milliseconds napTime,
double numTokens)
: _impl(impl), _napTime(napTime), _numTokens(numTokens) {}
// Releases the reservation if the token is destroyed while still holding one.
// Nothing happens for moved-from or already-redeemed tokens (_impl is null) and for ready
// tokens, which never held a queue slot.
RateLimiter::DeferredToken::~DeferredToken() {
if (!_impl || isReady()) {
return;
}
// Unconsumed non-ready deferred token: return the borrowed token and release the queue slot.
_impl->readScopedTokenBucket().returnTokens(_numTokens);
_impl->queued.fetchAndSubtract(1);
_impl->stats.removedFromQueue.incrementRelaxed();
}
// Redeems the token. Rvalue-qualified: the token is consumed by this call.
//
// Ready token: returns OK immediately without touching any stats.
// Queued token: sleeps on `opCtx` until now + _napTime, then records a successful admission.
// If the sleep throws a DBException, the interruption is counted, the borrowed tokens are
// returned to the bucket, and the exception's status (with added context) is returned.
//
// In every case _impl is nulled before returning, so the destructor will not release the
// reservation a second time.
Status RateLimiter::DeferredToken::get(OperationContext* opCtx) && {
invariant(_impl);
if (isReady()) {
// Token was immediately available. No need to update stats, they were already recorded in
// acquireToken().
_impl = nullptr;
return Status::OK();
}
// Take ownership of the limiter pointer; from here on this function, not the destructor, is
// responsible for releasing the queue slot.
auto* impl = std::exchange(_impl, nullptr);
// Queue-slot accounting is released when this scope exits, on both the success and the
// interrupted path.
ON_BLOCK_EXIT([impl] {
impl->stats.removedFromQueue.incrementRelaxed();
impl->queued.fetchAndSubtract(1);
});
Date_t deadline = opCtx->getServiceContext()->getPreciseClockSource()->now() + _napTime;
try {
LOGV2_DEBUG(10550200,
4,
"Going to sleep waiting for token acquisition",
"rateLimiterName"_attr = impl->name,
"napTimeMillis"_attr = _napTime.toString());
opCtx->sleepUntil(deadline);
} catch (const DBException& e) {
impl->stats.interruptedInQueue.incrementRelaxed();
LOGV2_DEBUG(10440800,
4,
"Interrupted while waiting in rate limiter queue",
"rateLimiterName"_attr = impl->name,
"exception"_attr = e.toString());
// The request never ran, so hand the borrowed tokens back to the bucket.
impl->readScopedTokenBucket().returnTokens(_numTokens);
return e.toStatus().withContext(fmt::format(
"Interrupted while waiting in rate limiter queue. rateLimiterName={}", impl->name));
}
impl->stats.successfulAdmissions.incrementRelaxed();
// The sample is the planned nap time, not a measured elapsed time.
impl->stats.averageTimeQueuedMicros.addSample(
static_cast<double>(durationCount<Microseconds>(_napTime)));
return Status::OK();
}
// Counts this queued request as exempt from admission control. Only the exemptedAdmissions stat
// is touched here; _impl is left set so that the destructor returns the borrowed tokens and
// releases the queue slot.
void RateLimiter::DeferredToken::recordExemption() && {
invariant(_impl);
invariant(_napTime > Milliseconds{0}); // Exemptions are only supported for queued requests.
// This method only records the exemption, token/queue cleanup is covered by the destructor.
_impl->stats.exemptedAdmissions.incrementRelaxed();
}
RateLimiter::RateLimiter(double refreshRatePerSec,
double burstCapacitySecs,
int64_t maxQueueDepth,
@ -186,14 +253,22 @@ RateLimiter::RateLimiter(double refreshRatePerSec,
RateLimiter::~RateLimiter() = default;
Status RateLimiter::acquireToken(OperationContext* opCtx, double numTokensToConsume) {
StatusWith<RateLimiter::DeferredToken> RateLimiter::acquireToken(double numTokensToConsume) {
const bool hangInLimiter = hangInRateLimiter.shouldFail();
const auto maxQueueDepth = _impl->maxQueueDepth.loadRelaxed();
if (!hangInLimiter && (maxQueueDepth <= 0 || _impl->queued.load() >= maxQueueDepth)) {
// Queueing unavailable (disabled or currently full): use try-acquire semantics.
if (auto status = tryAcquireToken(numTokensToConsume); !status.isOK()) {
return status;
}
_impl->stats.averageTimeQueuedMicros.addSample(0);
return DeferredToken(_impl.get(), Milliseconds{0}, numTokensToConsume);
}
_impl->stats.attemptedAdmissions.incrementRelaxed();
// The consumeWithBorrowNonBlocking API consumes a token (possibly leading to a negative
// bucket balance), and returns how long the consumer should nap until their token
// reservation becomes valid.
double waitForTokenSecs;
if (MONGO_unlikely(hangInRateLimiter.shouldFail())) {
if (hangInLimiter) {
waitForTokenSecs = 60 * 60; // 1 hour
} else {
waitForTokenSecs =
@ -202,45 +277,30 @@ Status RateLimiter::acquireToken(OperationContext* opCtx, double numTokensToCons
.value_or(0);
}
if (auto napTime = doubleToMillis(waitForTokenSecs); napTime > Milliseconds{0}) {
// Calculate the deadline before incrementing the queued metric to ensure that unit tests
// don't advance the mock clock before the sleep deadline is calculated.
Date_t deadline = opCtx->getServiceContext()->getPreciseClockSource()->now() + napTime;
auto napTime = doubleToMillis(waitForTokenSecs);
if (napTime > Milliseconds{0}) {
// Token not immediately available: reserve a queue slot.
if (auto status = _impl->enqueue(); !status.isOK()) {
_impl->readScopedTokenBucket().returnTokens(numTokensToConsume);
_impl->stats.rejectedAdmissions.incrementRelaxed();
return status;
}
_impl->stats.addedToQueue.incrementRelaxed();
ON_BLOCK_EXIT([&] {
_impl->stats.removedFromQueue.incrementRelaxed();
_impl->queued.fetchAndSubtract(1);
});
try {
LOGV2_DEBUG(10550200,
4,
"Going to sleep waiting for token acquisition",
"rateLimiterName"_attr = _impl->name,
"napTimeMillis"_attr = napTime.toString());
opCtx->sleepUntil(deadline);
} catch (const DBException& e) {
_impl->stats.interruptedInQueue.incrementRelaxed();
LOGV2_DEBUG(10440800,
4,
"Interrupted while waiting in rate limiter queue",
"rateLimiterName"_attr = _impl->name,
"exception"_attr = e.toString());
_impl->readScopedTokenBucket().returnTokens(numTokensToConsume);
return e.toStatus().withContext(
fmt::format("Interrupted while waiting in rate limiter queue. rateLimiterName={}",
_impl->name));
}
return DeferredToken(_impl.get(), napTime, numTokensToConsume);
}
// Token immediately available.
_impl->stats.successfulAdmissions.incrementRelaxed();
_impl->stats.averageTimeQueuedMicros.addSample(waitForTokenSecs * 1'000'000);
_impl->stats.averageTimeQueuedMicros.addSample(0);
return DeferredToken(_impl.get(), Milliseconds{0}, numTokensToConsume);
}
return Status::OK();
// Blocking convenience overload: reserves a token via acquireToken(numTokensToConsume) and, if
// the reservation succeeded, immediately redeems it on `opCtx`. A failed reservation's status is
// returned unchanged.
Status RateLimiter::acquireToken(OperationContext* opCtx, double numTokensToConsume) {
    auto swDeferredToken = acquireToken(numTokensToConsume);
    if (swDeferredToken.isOK()) {
        return std::move(swDeferredToken.getValue()).get(opCtx);
    }
    return swDeferredToken.getStatus();
}
Status RateLimiter::tryAcquireToken(double numTokensToConsume) {
@ -314,4 +374,8 @@ int64_t RateLimiter::queued() const {
return _impl->queued.load();
}
// Returns the currently configured maximum queue depth (relaxed atomic load).
int64_t RateLimiter::maxQueueDepth() const {
return _impl->maxQueueDepth.loadRelaxed();
}
} // namespace mongo::admission

View File

@ -30,8 +30,11 @@
#pragma once
#include "mongo/base/counter.h"
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/operation_context.h"
#include "mongo/util/duration.h"
#include "mongo/util/modules.h"
#include "mongo/util/moving_average.h"
#include "mongo/util/system_tick_source.h"
@ -44,7 +47,68 @@ namespace MONGO_MOD_PUBLIC admission {
* interruptibility, maximum queue depth, and metrics.
*/
class MONGO_MOD_PUBLIC RateLimiter {
class RateLimiterPrivate;
public:
/**
* A DeferredToken represents an atomically pre-reserved position in the rate limiter queue. It
* encapsulates logic for waiting until the reserved position becomes valid or for abandoning
* the reservation. See acquireToken() for details on how deferred tokens are issued and used.
*
* A DeferredToken is expected to be consumed exactly once via get() before it is destroyed,
* unless recordExemption() is called first to mark the request as not subject to admission
* control. A queued (non-ready) token that is destroyed without get() does not leak: its
* destructor returns the borrowed tokens to the bucket and releases the queue slot.
*
* The type is move-constructible only; copying and all assignment are disabled.
*/
class MONGO_MOD_PUBLIC DeferredToken {
public:
DeferredToken(const DeferredToken&) = delete;
DeferredToken& operator=(const DeferredToken&) = delete;
DeferredToken& operator=(DeferredToken&&) = delete;
// The _impl member serves as the consumed/moved-from sentinel, it is nulled when ownership
// transfers out (move operation) or when the deferred token is redeemed (get). The default
// move constructor would copy the pointer without nulling the source, so we define it here
// to perform the necessary std::exchange.
DeferredToken(DeferredToken&& other) noexcept
: _impl(std::exchange(other._impl, nullptr)),
_napTime(other._napTime),
_numTokens(other._numTokens) {}
~DeferredToken();
/**
* Returns true if the token was immediately available when acquireToken() was called.
* For ready deferred tokens, get() returns without sleeping.
*/
bool isReady() const {
return _napTime == Milliseconds{0};
}
/**
* Waits until the pre-reserved token slot becomes valid, or until the opCtx is
* interrupted. For ready deferred tokens, this method returns immediately.
*
* Must be called exactly once, the deferred token is consumed on return.
*/
Status get(OperationContext* opCtx) &&;
/**
* Records that this request is not subject to admission control.
*
* This method is valid only for queued (non-ready) deferred tokens. It only records the
* exemption, token/queue cleanup is covered by the destructor.
*/
void recordExemption() &&;
private:
// Only RateLimiter may mint tokens (the constructor below is private).
friend class RateLimiter;
DeferredToken(RateLimiterPrivate* impl, Milliseconds napTime, double numTokens);
// Limiter state this token is bound to; null once moved-from or redeemed.
RateLimiterPrivate* _impl{nullptr};
// How long the holder must wait for the reservation to become valid; zero means ready.
Milliseconds _napTime{0};
// Number of tokens this reservation accounts for.
double _numTokens{1.0};
};
struct Stats {
/**
* addedToQueue is the count of acquireToken calls that involved entering a sleep.
@ -104,10 +168,19 @@ public:
~RateLimiter();
/**
* Acquire a token or block until one becomes available. Returns an error status if
* the operationContext is interrupted or the maxQueueDepth is exceeded.
* Atomically reserves a token position and returns a DeferredToken. The deferred token is
* either ready (the token was immediately available) or queued (the token was not immediately
* available and the caller must wait for the slot to become valid).
*
* Returns an error if the max queue depth is exceeded.
*/
Status acquireToken(OperationContext*, double numTokensToConsume = 1.0);
StatusWith<DeferredToken> acquireToken(double numTokensToConsume = 1.0);
/**
* Convenience method that acquires a token and blocks until it is ready. This is equivalent to
* calling acquireToken() and then get(opCtx) on the returned deferred token.
*/
Status acquireToken(OperationContext* opCtx, double numTokensToConsume = 1.0);
/**
* Attempts to acquire a token without queuing. Returns an error status if the rate limit
@ -160,8 +233,10 @@ public:
/** Returns the number of sessions that are sleeping in acquireToken(...). **/
int64_t queued() const;
/** Returns the configured maximum number of sessions that may sleep in acquireToken(...). **/
int64_t maxQueueDepth() const;
private:
class RateLimiterPrivate;
std::unique_ptr<RateLimiterPrivate> _impl;
};
} // namespace MONGO_MOD_PUBLIC admission

View File

@ -220,6 +220,25 @@ TEST_F(RateLimiterWithMockClockTest, QueueingDisabled) {
});
}
// Verify that a negative max queue depth takes the try-acquire path: requests that would queue are
// rejected immediately.
TEST_F(RateLimiterWithMockClockTest, NegativeMaxQueueDepthDisablesQueueing) {
RateLimiter rateLimiter = makeRateLimiter("NegativeMaxQueueDepthDisablesQueueing",
/*refreshRate=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/-1);
auto opCtx = makeOperationContext();
// Consume the initial burst token so the next request must queue.
ASSERT_OK(rateLimiter.acquireToken(opCtx.get()));
// With queueing disabled the non-blocking overload must reject rather than hand back a queued
// token.
auto tokenResult = rateLimiter.acquireToken();
ASSERT_EQ(tokenResult.getStatus(), Status(RateLimiter::kRejectedErrorCode, ""));
// The rejection is counted and no queue slot was ever taken.
ASSERT_EQ(rateLimiter.stats().addedToQueue.get(), 0);
ASSERT_EQ(rateLimiter.stats().rejectedAdmissions.get(), 1);
ASSERT_EQ(rateLimiter.queued(), 0);
}
// Verify that if a client disconnects while their session thread is asleep in the rate limiter,
// the rate limiter wakes up the thread and returns the appropriate error status.
TEST_F(RateLimiterWithMockClockTest, InterruptedDueToOperationKilled) {
@ -560,6 +579,123 @@ TEST_F(RateLimiterWithMockClockTest, InterruptedDueToOperationDeadline) {
});
}
// Verify that calling recordExemption() on a non-ready DeferredToken records the exemption stat,
// and that DeferredToken destruction then returns the borrowed token and releases the queue slot.
TEST_F(RateLimiterWithMockClockTest, DeferredTokenRecordExemptionAndReleaseQueueSlot) {
RateLimiter rateLimiter = makeRateLimiter("DeferredTokenRecordExemptionAndReleaseQueueSlot");
auto opCtx = makeOperationContext();
// Exhaust the burst so the next token must queue.
ASSERT_OK(rateLimiter.acquireToken(opCtx.get()));
ASSERT_EQ(rateLimiter.tokenBalance(), 0);
// Inner scope bounds the DeferredToken's lifetime so the destructor's effects can be asserted
// afterwards.
{
auto tokenResult = rateLimiter.acquireToken();
ASSERT(tokenResult.isOK());
auto deferredToken = std::move(tokenResult.getValue());
ASSERT_FALSE(deferredToken.isReady());
// Token is borrowed (bucket goes negative).
ASSERT_EQ(rateLimiter.tokenBalance(), -1);
ASSERT_EQ(rateLimiter.queued(), 1);
ASSERT_EQ(rateLimiter.stats().addedToQueue.get(), 1);
// Mark as exempt: records the stat, queued-token cleanup runs in the DeferredToken
// destructor.
std::move(deferredToken).recordExemption();
ASSERT_EQ(rateLimiter.stats().exemptedAdmissions.get(), 1);
}
// Destructor ran: the borrowed token is back in the bucket and the queue slot is free.
ASSERT_EQ(rateLimiter.tokenBalance(), 0);
ASSERT_EQ(rateLimiter.queued(), 0);
ASSERT_EQ(rateLimiter.stats().removedFromQueue.get(), 1);
// The exempted DeferredToken was never admitted successfully.
// (The single success counted here is the initial burst-consuming acquireToken above.)
ASSERT_EQ(rateLimiter.stats().successfulAdmissions.get(), 1);
}
// Verify that dropping a non-ready DeferredToken without consuming it causes the destructor
// to return the borrowed token and release the queue slot, preventing resource leaks.
TEST_F(RateLimiterWithMockClockTest, DeferredTokenDestructorCleansUpDroppedNonReadyToken) {
RateLimiter rateLimiter =
makeRateLimiter("DeferredTokenDestructorCleansUpDroppedNonReadyToken");
auto opCtx = makeOperationContext();
// Exhaust the burst so the next token must queue.
ASSERT_OK(rateLimiter.acquireToken(opCtx.get()));
ASSERT_EQ(rateLimiter.tokenBalance(), 0);
{
auto tokenResult = rateLimiter.acquireToken();
ASSERT(tokenResult.isOK());
auto deferredToken = std::move(tokenResult.getValue());
ASSERT_FALSE(deferredToken.isReady());
ASSERT_EQ(rateLimiter.tokenBalance(), -1);
ASSERT_EQ(rateLimiter.queued(), 1);
// Drop DeferredToken without calling get() or recordExemption().
}
// The destructor alone must have undone the reservation.
ASSERT_EQ(rateLimiter.tokenBalance(), 0);
ASSERT_EQ(rateLimiter.queued(), 0);
ASSERT_EQ(rateLimiter.stats().addedToQueue.get(), 1);
ASSERT_EQ(rateLimiter.stats().removedFromQueue.get(), 1);
// Only the initial burst-consuming acquire counts as a successful admission.
ASSERT_EQ(rateLimiter.stats().successfulAdmissions.get(), 1);
}
// Verify that dropping a ready DeferredToken without calling get() is a no-op: the token was
// already permanently consumed by acquireToken() and successfulAdmissions was already recorded.
TEST_F(RateLimiterWithMockClockTest, DeferredTokenReadyDestructorIsNoOp) {
RateLimiter rateLimiter = makeRateLimiter("DeferredTokenReadyDestructorIsNoOp");
// Captured up front so the assertions below are relative to whatever the fixture's default
// burst is.
const double initialBalance = rateLimiter.tokenBalance();
{
auto tokenResult = rateLimiter.acquireToken();
ASSERT(tokenResult.isOK());
auto deferredToken = std::move(tokenResult.getValue());
ASSERT_TRUE(deferredToken.isReady());
// For ready DeferredTokens, acquireToken() already incremented successfulAdmissions.
ASSERT_EQ(rateLimiter.stats().successfulAdmissions.get(), 1);
ASSERT_EQ(rateLimiter.tokenBalance(), initialBalance - 1);
// Destructor runs here — should be a no-op for ready DeferredTokens.
}
// Token remains consumed; no return to bucket.
ASSERT_EQ(rateLimiter.tokenBalance(), initialBalance - 1);
// A ready token never entered the queue, so neither queue counter moved.
ASSERT_EQ(rateLimiter.stats().addedToQueue.get(), 0);
ASSERT_EQ(rateLimiter.stats().removedFromQueue.get(), 0);
}
// Verify that moving a DeferredToken nulls the source (making its destructor a no-op) and
// that exactly one cleanup occurs when the destination goes out of scope.
TEST_F(RateLimiterWithMockClockTest, DeferredTokenMoveSemantics) {
RateLimiter rateLimiter = makeRateLimiter("DeferredTokenMoveSemantics");
auto opCtx = makeOperationContext();
// Exhaust burst so the next token queues.
ASSERT_OK(rateLimiter.acquireToken(opCtx.get()));
// tokenResult outlives the inner scope; after the move below it only holds a moved-from token.
auto tokenResult = rateLimiter.acquireToken();
ASSERT(tokenResult.isOK());
{
auto deferredToken1 = std::move(tokenResult.getValue());
ASSERT_FALSE(deferredToken1.isReady());
ASSERT_EQ(rateLimiter.queued(), 1);
// Move-construct into deferredToken2.
auto deferredToken2 = std::move(deferredToken1);
ASSERT_FALSE(deferredToken2.isReady());
// Queue count is unchanged; the move transferred ownership, not the slot.
ASSERT_EQ(rateLimiter.queued(), 1);
// deferredToken1 destructs (moved-from, _impl is null — no-op).
// deferredToken2 destructs (active — returns token and releases queue slot).
}
// Exactly one release happened even though three DeferredToken objects were involved.
ASSERT_EQ(rateLimiter.queued(), 0);
ASSERT_EQ(rateLimiter.tokenBalance(), 0);
ASSERT_EQ(rateLimiter.stats().addedToQueue.get(), 1);
ASSERT_EQ(rateLimiter.stats().removedFromQueue.get(), 1);
}
TEST_F(RateLimiterWithMockClockTest, ReturnTokens) {
RateLimiter rateLimiter = makeRateLimiter("RateLimiterReturnTokens",
/*refreshRate=*/100,

View File

@ -91,7 +91,7 @@ public:
void appendIngressRequestRateLimiterStats(BSONObjBuilder* b, ServiceContext* service) const {
auto ingressRequestRateLimiterBuilder =
BSONObjBuilder{b->subobjStart("ingressRequestRateLimiter")};
const auto& ingressRequestRateLimiter = IngressRequestRateLimiter::get(service);
const auto& ingressRequestRateLimiter = admission::IngressRequestRateLimiter::get(service);
ingressRequestRateLimiter.appendStats(&ingressRequestRateLimiterBuilder);
ingressRequestRateLimiterBuilder.done();
}

View File

@ -190,9 +190,12 @@ bool ErrorLabelBuilder::isResumableChangeStreamError() const {
}
bool ErrorLabelBuilder::isOperationIdempotent() const {
// TODO: SERVER-108898 When OperationContext support marking an operation as idempotent, check
// for idempotency to apply the error label
return false;
// TODO(SERVER-108898): When OperationContext supports marking an operation as idempotent,
// check for idempotency to apply the error label.
// Ingress request rate-limiter rejection happens before command execution. The operation can
// always be retried safely because no command side effects occurred.
return _code && *_code == ErrorCodes::IngressRequestRateLimitExceeded;
}
bool ErrorLabelBuilder::isErrorWithNoWritesPerformed() const {
@ -227,8 +230,11 @@ void ErrorLabelBuilder::build(BSONArrayBuilder& labels) const {
labels << ErrorLabel::kNoWritesPerformed;
}
} else if (isOperationIdempotent()) {
// TODO SERVER-108898: apply the NoWritesPerformed error label here too, if appropriate.
labels << ErrorLabel::kRetryableError;
// TODO(SERVER-108898): Today `isOperationIdempotent` only checks for
// IngressRequestRateLimitExceeded, which means no writes were ever performed.
// Additional logic will be needed if we add additional conditions.
labels << ErrorLabel::kNoWritesPerformed;
}
}

View File

@ -40,6 +40,7 @@
#include "mongo/db/admission/ingress_admission_control_gen.h"
#include "mongo/db/admission/ingress_admission_controller.h"
#include "mongo/db/admission/ingress_request_rate_limiter.h"
#include "mongo/db/admission/ingress_request_rate_limiter_gen.h"
#include "mongo/db/admission/ticketing/admission_context.h"
#include "mongo/db/admission/ticketing/ticketholder.h"
#include "mongo/db/api_parameters.h"
@ -1880,13 +1881,20 @@ void ExecCommandDatabase::_initiateCommand() {
// Respect the ingressRequestRateLimiterApplicationExemptions list so that internal
// clients are exempt from the failpoint.
if (IngressRequestRateLimiter::isAppNameExempted(opCtx->getClient())) {
if (admission::IngressRequestRateLimiter::isAppNameExempted(opCtx->getClient())) {
return false;
}
return true;
});
if (gFeatureFlagIngressRateLimiting.isEnabled()) {
const bool exemptFromIngressRateLimit =
isExemptFromAdmissionControl || !admission::gIngressRequestRateLimiterEnabled.load();
uassertStatusOK(admission::IngressRequestRateLimiter::waitForAdmission(
opCtx, exemptFromIngressRateLimit));
}
if (gIngressAdmissionControlEnabled.load()) {
// The way ingress admission works, one ticket should cover all the work for the operation.
// Therefore, if the operation has already been admitted by IngressAdmissionController, all

View File

@ -32,6 +32,9 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/admission/ingress_admission_context.h"
#include "mongo/db/admission/ingress_request_rate_limiter.h"
#include "mongo/db/admission/rate_limiter.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/operation_context.h"
@ -40,18 +43,37 @@
#include "mongo/db/rss/replicated_storage_service.h"
#include "mongo/db/service_context.h"
#include "mongo/db/topology/cluster_role.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/rpc/message.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_entry_point_test_fixture.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/duration.h"
#include "mongo/util/tick_source_mock.h"
namespace mongo {
namespace mongo::admission {
namespace {
MONGO_REGISTER_COMMAND(TestCmdProcessInternalCommand).testOnly().forShard();
MONGO_REGISTER_COMMAND(TestCmdProcessInternalSucceedCommand).testOnly().forShard();
// Test-only command that always succeeds and declares itself subject to ingress admission
// control, so tests can drive a request down the non-exempt admission path.
class TestCmdShardIngressSubject final : public TestCmdBase {
public:
static constexpr auto kCommandName = "testShardIngressSubject";
TestCmdShardIngressSubject() : TestCmdBase(kCommandName) {}
// Opt in to ingress admission control.
bool isSubjectToIngressAdmissionControl() const override {
return true;
}
// No work to do; report success without writing anything to the reply builder.
bool runWithBuilderOnly(BSONObjBuilder&) override {
return true;
}
};
MONGO_REGISTER_COMMAND(TestCmdShardIngressSubject).testOnly().forShard();
class ServiceEntryPointShardRoleTest : public ServiceEntryPointTestFixture {
public:
void setUp() override {
@ -284,6 +306,211 @@ TEST_F(ServiceEntryPointShardServerTest, TestWriteConcernClientUnspecifiedWithDe
testWriteConcernClientUnspecifiedWithDefault();
}
// Verifies that a deferred token parked on the client is resolved as part of handling a
// command: after a successful "ping" the token's queue slot is released and the request is
// counted as exempted.
TEST_F(ServiceEntryPointShardServerTest, WaitForAdmissionResolvesDeferredTokenBeforeInvocation) {
RAIIServerParameterControllerForTest rateLimiterEnabled{"ingressRequestRateLimiterEnabled",
true};
// A standalone limiter is used only to mint a queued token; it is not the ServiceContext's
// ingress limiter.
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/2,
"WaitForAdmissionResolvesDeferredTokenBeforeInvocation");
auto opCtx = makeOperationContext();
// Consume the burst so the next acquire yields a queued (non-ready) token.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
opCtx->getClient(), std::move(queuedTokenResult.getValue()));
// Mark this client as a direct client so that waitForAdmission exempts it rather than sleeping
// on the mock clock. We are just testing that waitForAdmission resolves the deferred token and
// allows the command to proceed, not that it waits for the token to be available.
opCtx->getClient()->setInDirectClient(true);
runCommandTestWithResponse(BSON("ping" << 1), opCtx.get(), Status::OK());
ASSERT_EQ(limiterForDeferredToken.queued(), 0);
ASSERT_EQ(limiterForDeferredToken.stats().exemptedAdmissions.get(), 1);
}
// Verifies that when ingressRequestRateLimiterEnabled is false, a deferred token already parked
// on the client is still consumed while handling a request: it is counted as exempted and its
// queue slot is released instead of being waited on.
TEST_F(ServiceEntryPointShardServerTest,
PendingIngressDeferredTokenIsConsumedWhenRateLimitingDisabled) {
RAIIServerParameterControllerForTest rateLimiterEnabled{"ingressRequestRateLimiterEnabled",
false};
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/1,
"PendingIngressDeferredTokenIsConsumedWhenRateLimitingDisabled");
auto opCtx = makeOperationContext();
// Consume the burst so the next acquire yields a queued (non-ready) token.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
opCtx->getClient(), std::move(queuedTokenResult.getValue()));
auto msg = constructMessage(BSON(TestCmdSucceeds::kCommandName << 1), opCtx.get());
ASSERT_OK(handleRequest(msg, opCtx.get()));
// One enqueue, one matching dequeue, one exemption, and nothing left in the queue.
ASSERT_EQ(limiterForDeferredToken.stats().addedToQueue.get(), 1);
ASSERT_EQ(limiterForDeferredToken.stats().removedFromQueue.get(), 1);
ASSERT_EQ(limiterForDeferredToken.stats().exemptedAdmissions.get(), 1);
ASSERT_EQ(limiterForDeferredToken.queued(), 0);
}
// Verifies that killing an operation while it is waiting on a queued deferred token makes the
// command fail with Interrupted and bumps the limiter's interruptedInQueue stat.
TEST_F(ServiceEntryPointShardServerTest, QueuedAdmissionInterrupted) {
RAIIServerParameterControllerForTest rateLimiterEnabled{"ingressRequestRateLimiterEnabled",
true};
auto opCtx = makeOperationContext();
auto* client = opCtx->getClient();
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/2,
"QueuedAdmissionInterrupted");
// Consume the burst so the next acquire yields a queued (non-ready) token.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
client, std::move(queuedTokenResult.getValue()));
// handleRequest blocks in waitForAdmission while the queued token's napTime elapses. A
// background thread waits until the opCtx is blocking there, then kills it to trigger
// Interrupted.
auto msg = constructMessage(BSON(TestCmdShardIngressSubject::kCommandName << 1), opCtx.get());
// NOTE(review): the polling loop below has no timeout. If handleRequest ever returns without
// blocking on the opCtx, this thread spins forever and join() never returns, so the test
// would hang rather than fail.
stdx::thread interrupter([&] {
while (!opCtx->isWaitingForConditionOrInterrupt()) {
sleepmillis(1);
}
opCtx->markKilled(ErrorCodes::Interrupted);
});
auto swDbResponse = handleRequest(msg, opCtx.get());
interrupter.join();
// The request itself is handled; the interruption surfaces as an error inside the command
// reply.
ASSERT_OK(swDbResponse);
const auto response = dbResponseToBSON(swDbResponse.getValue());
const auto status = getStatusFromCommandResult(response);
ASSERT_EQ(status.code(), ErrorCodes::Interrupted);
ASSERT_EQ(limiterForDeferredToken.stats().interruptedInQueue.get(), 1);
}
// Verifies that a request waiting on a deferred (not-ready) admission token honors maxTimeMS:
// with maxTimeMS=5 and the mock clocks advanced by 6ms while the request is blocked, the command
// response carries MaxTimeMSExpired and the limiter counts one interruptedInQueue.
TEST_F(ServiceEntryPointShardServerTest, QueuedAdmissionRespectsMaxTimeMS) {
gFeatureFlagIngressRateLimiting.setForServerParameter(true);
RAIIServerParameterControllerForTest requestLimiterEnabled{"ingressRequestRateLimiterEnabled",
true};
// NOTE(review): these static_casts are unchecked; they assume the fixture installed a
// ClockSourceMock and a TickSourceMock<Milliseconds> on the global ServiceContext -- confirm.
auto* clockSource =
static_cast<ClockSourceMock*>(getGlobalServiceContext()->getFastClockSource());
auto* tickSource =
static_cast<TickSourceMock<Milliseconds>*>(getGlobalServiceContext()->getTickSource());
auto opCtx = makeOperationContext();
auto* client = opCtx->getClient();
// ServiceEntryPointShardRole has a contract guard ensuring presence of an AuthorizationSession.
AuthorizationSession::get(client);
// Create the limiter with the mock tick source so clock advancement controls token
// availability.
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/2,
"QueuedAdmissionRespectsMaxTimeMS",
tickSource);
// The first acquisition succeeds immediately; the second is asserted to be not ready.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
// Attach the not-ready token to the client so that request handling has to resolve it.
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
client, std::move(queuedTokenResult.getValue()));
// handleRequest parses maxTimeMS from the message and sets the opCtx deadline. A background
// thread waits until the opCtx is blocking in waitForAdmission, then advances the mock clock
// past the 5ms deadline (well under the ~1000ms napTime) to trigger MaxTimeMSExpired.
stdx::thread clockAdvancer([&] {
while (!opCtx->isWaitingForConditionOrInterrupt()) {
sleepmillis(1);
}
// Both mock sources are moved forward by the same amount, clock source first.
clockSource->advance(Milliseconds(6));
tickSource->advance(Milliseconds(6));
});
auto msg = constructMessage(BSON(TestCmdShardIngressSubject::kCommandName
<< 1 << GenericArguments::kMaxTimeMSFieldName << 5),
opCtx.get());
auto swDbResponse = handleRequest(msg, opCtx.get());
clockAdvancer.join();
// The deadline expiry is reported inside the command result, not as a failed handleRequest.
ASSERT_OK(swDbResponse);
const auto response = dbResponseToBSON(swDbResponse.getValue());
const auto status = getStatusFromCommandResult(response);
ASSERT_EQ(status.code(), ErrorCodes::MaxTimeMSExpired);
ASSERT_EQ(limiterForDeferredToken.stats().interruptedInQueue.get(), 1);
}
// A request that has to wait on a deferred (not-ready) admission token succeeds when its
// maxTimeMS (60s) comfortably exceeds the wait: once the mock clocks move 1001ms forward the
// command completes OK and the limiter reports two successful admissions.
TEST_F(ServiceEntryPointShardServerTest, QueuedAdmissionWithLargeMaxTimeMSSucceeds) {
    gFeatureFlagIngressRateLimiting.setForServerParameter(true);
    RAIIServerParameterControllerForTest limiterEnabled{"ingressRequestRateLimiterEnabled", true};

    auto* const svcCtx = getGlobalServiceContext();
    auto* mockClock = static_cast<ClockSourceMock*>(svcCtx->getFastClockSource());
    auto* mockTicks = static_cast<TickSourceMock<Milliseconds>*>(svcCtx->getTickSource());

    auto opCtx = makeOperationContext();
    auto* client = opCtx->getClient();
    // ServiceEntryPointShardRole has a contract guard ensuring presence of an AuthorizationSession.
    AuthorizationSession::get(client);

    // Driven by the mock tick source, so the test decides when the token becomes available.
    RateLimiter limiter(
        /*refreshRatePerSec=*/1.0,
        /*burstCapacitySecs=*/1.0,
        /*maxQueueDepth=*/2,
        "QueuedAdmissionWithLargeMaxTimeMSSucceeds",
        mockTicks);

    // First acquisition succeeds immediately; the second is asserted to be not ready.
    ASSERT_OK(limiter.acquireToken(opCtx.get()));
    auto swPendingToken = limiter.acquireToken();
    ASSERT_OK(swPendingToken.getStatus());
    ASSERT_FALSE(swPendingToken.getValue().isReady());
    IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
        client, std::move(swPendingToken.getValue()));

    // handleRequest blocks in waitForAdmission for the token's napTime (~1000ms at 1 token/sec).
    // This helper thread polls until the opCtx is blocked, then pushes both mock clocks past the
    // napTime so the token is released and the command can run.
    stdx::thread advancer([&] {
        while (!opCtx->isWaitingForConditionOrInterrupt()) {
            sleepmillis(1);
        }
        const Milliseconds pastNapTime(1001);
        mockClock->advance(pastNapTime);
        mockTicks->advance(pastNapTime);
    });

    auto request =
        constructMessage(BSON(TestCmdShardIngressSubject::kCommandName
                              << 1 << GenericArguments::kMaxTimeMSFieldName << (60 * 1000)),
                         opCtx.get());
    auto swReply = handleRequest(request, opCtx.get());
    advancer.join();

    ASSERT_OK(swReply);
    const auto replyBody = dbResponseToBSON(swReply.getValue());
    ASSERT_EQ(getStatusFromCommandResult(replyBody), Status::OK());
    ASSERT_EQ(limiter.stats().successfulAdmissions.get(), 2);
}
#ifdef MONGO_CONFIG_OTEL
TEST_F(ServiceEntryPointShardServerTest, TelemetryContextDeserializedFromRequest) {
testTelemetryContextDeserializedFromRequest();
@ -419,4 +646,4 @@ TEST_F(ServiceEntryPointReplicaSetTest, TelemetryContextNotSetWhenNotInRequest)
#endif
} // namespace
} // namespace mongo
} // namespace mongo::admission

View File

@ -96,12 +96,16 @@ public:
getServer(), DatabaseName::kAdmin, std::move(cmd), BSONObj(), nullptr, timeout);
}
void enableRateLimiter(int maxQueueDepth) {
void enableRateLimiter(int maxQueueDepth, double burstCapacitySecs = 1.0) {
// TODO(SERVER-125073): Remove `ingressRequestRateLimiterEnabled:false` once we resolve how
// to hang specific rate limiters.
runSetupCommandSync(
DatabaseName::kAdmin,
BSON("setParameter" << 1 << "ingressConnectionEstablishmentRateLimiterEnabled" << true
<< "ingressRequestRateLimiterEnabled" << false
<< "ingressConnectionEstablishmentRatePerSec" << 1
<< "ingressConnectionEstablishmentBurstCapacitySecs" << 1
<< "ingressConnectionEstablishmentBurstCapacitySecs"
<< burstCapacitySecs
<< "ingressConnectionEstablishmentMaxQueueDepth" << maxQueueDepth));
}
@ -224,6 +228,15 @@ public:
context);
}
/**
 * Asserts, via assertConnectionStats, that the pool's connection statistics report zero
 * connections in total across the inUse, available and leased counters. The gRPC statistics
 * predicate passed alongside always returns false. `context` is forwarded unchanged.
 */
void assertPoolHasNoEstablishedConnection(StringData context) {
    assertConnectionStats(
        getFactory(),
        getServer(),
        [](const ConnectionStatsPer& poolStats) {
            const auto established = poolStats.inUse + poolStats.available + poolStats.leased;
            return established == 0;
        },
        [](const GRPCConnectionStats&) { return false; },
        context);
}
void assertAllRequestsSucceeded(std::vector<Future<RemoteCommandResponse>>& futures) {
for (size_t i = 0; i < futures.size(); ++i) {
auto res = futures[i].getNoThrow(interruptible());
@ -442,8 +455,12 @@ TEST_F(EgressPoolRateLimiterResilienceTest, RejectionWithNoEstablishedConnection
auto baseline = getRateLimiterStats();
auto baselineRejected = baseline.isEmpty() ? 0 : baseline["rejected"].numberLong();
enableRateLimiter(/*maxQueueDepth=*/1);
// Keep initial token balance below one token so the first request cannot race through and
// establish a connection before rejections propagate.
enableRateLimiter(/*maxQueueDepth=*/1, /*burstCapacitySecs=*/0.1);
auto hangFP = configureFailPoint("hangInRateLimiter", BSONObj());
assertPoolHasNoEstablishedConnection(
"Pool must have zero established connections before no-established rejection test");
auto futures = sendPings(6, Seconds{30});
@ -467,8 +484,11 @@ TEST_F(EgressPoolRateLimiterResilienceTest, TimeoutWithNoEstablishedConnection)
auto baselineInterrupted =
baseline.isEmpty() ? 0 : baseline["interruptedDueToClientDisconnect"].numberLong();
enableRateLimiter(/*maxQueueDepth=*/10);
// Keep initial token balance below one token so requests have to queue in this test.
enableRateLimiter(/*maxQueueDepth=*/10, /*burstCapacitySecs=*/0.1);
auto hangFP = configureFailPoint("hangInRateLimiter", BSONObj());
assertPoolHasNoEstablishedConnection(
"Pool must have zero established connections before no-established timeout test");
auto futures = sendPings(3, Seconds{30});

View File

@ -39,6 +39,8 @@
#include "mongo/bson/bsontypes.h"
#include "mongo/bson/timestamp.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/admission/ingress_request_rate_limiter.h"
#include "mongo/db/admission/ingress_request_rate_limiter_gen.h"
#include "mongo/db/api_parameters.h"
#include "mongo/db/auth/cluster_umc_error_with_write_concern_error_info.h"
#include "mongo/db/client.h"
@ -401,6 +403,7 @@ public:
private:
Status _setup();
void _maybeWaitForAdmission();
ParseAndRunCommand* const _parc;
@ -609,6 +612,24 @@ void ParseAndRunCommand::_parseCommand() {
}
}
void ParseAndRunCommand::RunInvocation::_maybeWaitForAdmission() {
const auto opCtx = _parc->_rec->getOpCtx();
if (!gFeatureFlagIngressRateLimiting.isEnabled()) {
return;
}
// Deferred admission must run after maxTimeMS is applied to the opCtx so that queued requests
// respect the deadline.
// NOTE: All cluster commands are subject to admission control at this time.
const auto isProcessInternalCommand = isProcessInternalClient(*opCtx->getClient());
// TODO(SERVER-125863): Integrate whether the cluster command is subject to admission control
// here once we audit the list.
const bool exemptFromIngressRateLimit =
isProcessInternalCommand || !admission::gIngressRequestRateLimiterEnabled.load();
uassertStatusOK(
admission::IngressRequestRateLimiter::waitForAdmission(opCtx, exemptFromIngressRateLimit));
}
/**
 * Returns true when the operation's client has a session attached and that client reports
 * itself as an internal client.
 */
bool isInternalClient(OperationContext* opCtx) {
    auto* const client = opCtx->getClient();
    if (!client->session()) {
        return false;
    }
    return client->isInternalClient();
}
@ -632,6 +653,8 @@ Status ParseAndRunCommand::RunInvocation::_setup() {
opCtx->setUsesDefaultMaxTimeMS(usesDefaultMaxTimeMS);
}
_maybeWaitForAdmission();
if (MONGO_unlikely(
hangBeforeCheckingMongosShutdownInterrupt.shouldFail([&](const BSONObj& data) {
if (data.hasField("cmdName") && data.hasField("ns")) {
@ -1141,6 +1164,7 @@ void ParseAndRunCommand::RunInvocation::run() {
void ParseAndRunCommand::run() {
try {
_parseCommand();
RunInvocation runner(this);
runner.run();
} catch (const DBException& ex) {

View File

@ -30,15 +30,38 @@
#include "mongo/s/service_entry_point_router_role.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/admission/ingress_request_rate_limiter.h"
#include "mongo/db/admission/rate_limiter.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_entry_point_test_fixture.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/tick_source_mock.h"
namespace mongo {
namespace {
using namespace admission;
/**
 * Test-only command named "testRouterIngressSubject". It reports itself as subject to ingress
 * admission control and does no work: running it simply returns true.
 */
class TestCmdRouterIngressSubject final : public TestCmdBase {
public:
    static constexpr auto kCommandName = "testRouterIngressSubject";

    TestCmdRouterIngressSubject() : TestCmdBase(kCommandName) {}

    // No-op body; the response builder is left untouched.
    bool runWithBuilderOnly(BSONObjBuilder&) override {
        return true;
    }

    // Opts this command in to ingress admission control.
    bool isSubjectToIngressAdmissionControl() const override {
        return true;
    }
};
MONGO_REGISTER_COMMAND(TestCmdRouterIngressSubject).testOnly().forRouter();
class ServiceEntryPointRouterRoleTest : public virtual service_context_test::RouterRoleOverride,
public ServiceEntryPointTestFixture {
public:
@ -81,6 +104,201 @@ TEST_F(ServiceEntryPointRouterRoleTest, TestCommandFailsRunInvocationWithExcepti
testCommandFailsRunInvocationWithException("Exception thrown while processing command");
}
// A deferred (not-ready) admission token attached to the client is resolved before the command
// is invoked. The client is flagged as a direct client so the token is resolved as an exemption
// rather than by waiting for it to become available.
TEST_F(ServiceEntryPointRouterRoleTest, WaitForAdmissionResolvesDeferredTokenBeforeInvocation) {
    RAIIServerParameterControllerForTest limiterEnabled{"ingressRequestRateLimiterEnabled", true};

    RateLimiter limiter(
        /*refreshRatePerSec=*/1.0,
        /*burstCapacitySecs=*/1.0,
        /*maxQueueDepth=*/2,
        "WaitForAdmissionResolvesDeferredTokenBeforeInvocation");
    auto opCtx = makeOperationContext();

    // First acquisition succeeds immediately; the second is asserted to be not ready.
    ASSERT_OK(limiter.acquireToken(opCtx.get()));
    auto swPendingToken = limiter.acquireToken();
    ASSERT_OK(swPendingToken.getStatus());
    ASSERT_FALSE(swPendingToken.getValue().isReady());

    auto* const client = opCtx->getClient();
    IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
        client, std::move(swPendingToken.getValue()));

    // Mark this client as a direct client so that waitForAdmission exempts it rather than
    // sleeping on the mock clock. Only the "deferred token gets resolved and the command
    // proceeds" behavior is under test here, not the waiting itself.
    client->setInDirectClient(true);

    runCommandTestWithResponse(BSON("ping" << 1), opCtx.get(), Status::OK());

    ASSERT_EQ(limiter.queued(), 0);
    ASSERT_EQ(limiter.stats().exemptedAdmissions.get(), 1);
}
// With ingressRequestRateLimiterEnabled=false, a deferred (not-ready) admission token that is
// still attached to the client must be resolved as an exempted admission when the request is
// handled, leaving nothing queued in the limiter that issued it.
TEST_F(ServiceEntryPointRouterRoleTest,
       PendingIngressDeferredTokenIsConsumedWhenRateLimitingDisabled) {
    RAIIServerParameterControllerForTest limiterDisabled{"ingressRequestRateLimiterEnabled",
                                                         false};

    RateLimiter limiter(
        /*refreshRatePerSec=*/1.0,
        /*burstCapacitySecs=*/1.0,
        /*maxQueueDepth=*/1,
        "PendingIngressDeferredTokenIsConsumedWhenRateLimitingDisabled");
    auto opCtx = makeOperationContext();

    // First acquisition succeeds immediately; the second is asserted to be not ready.
    ASSERT_OK(limiter.acquireToken(opCtx.get()));
    auto swPendingToken = limiter.acquireToken();
    ASSERT_OK(swPendingToken.getStatus());
    ASSERT_FALSE(swPendingToken.getValue().isReady());

    // Park the pending token on the client, as if an earlier admission step had queued it.
    auto* const client = opCtx->getClient();
    IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
        client, std::move(swPendingToken.getValue()));

    auto request = constructMessage(BSON(TestCmdSucceeds::kCommandName << 1), opCtx.get());
    ASSERT_OK(handleRequest(request, opCtx.get()));

    // One entry went into the queue and came back out, accounted for as an exemption.
    ASSERT_EQ(limiter.stats().addedToQueue.get(), 1);
    ASSERT_EQ(limiter.stats().removedFromQueue.get(), 1);
    ASSERT_EQ(limiter.stats().exemptedAdmissions.get(), 1);
    ASSERT_EQ(limiter.queued(), 0);
}
// Verifies that a router request blocked on a deferred (not-ready) admission token reports
// ErrorCodes::Interrupted in its command response when its OperationContext is killed, and that
// the limiter which issued the token counts exactly one interruptedInQueue.
TEST_F(ServiceEntryPointRouterRoleTest, QueuedAdmissionInterrupted) {
gFeatureFlagIngressRateLimiting.setForServerParameter(true);
RAIIServerParameterControllerForTest requestLimiterEnabled{"ingressRequestRateLimiterEnabled",
true};
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/2,
"QueuedAdmissionInterrupted");
auto opCtx = makeOperationContext();
// The first acquisition succeeds immediately; the second is asserted to be not ready, i.e. it
// has to wait on the limiter.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
// Attach the not-ready token to the client so that request handling has to resolve it.
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
opCtx->getClient(), std::move(queuedTokenResult.getValue()));
// handleRequest blocks in waitForAdmission while the queued token's napTime elapses. A
// background thread waits until the opCtx is blocking there, then kills it to trigger
// Interrupted.
auto msg = constructMessage(BSON(TestCmdRouterIngressSubject::kCommandName << 1), opCtx.get());
// NOTE(review): the polling loop only exits once the opCtx reports that it is waiting. If
// handleRequest were to return without ever blocking, join() below would not return -- confirm
// that relying on the test-runner timeout is acceptable here.
stdx::thread interrupter([&] {
while (!opCtx->isWaitingForConditionOrInterrupt()) {
sleepmillis(1);
}
opCtx->markKilled(ErrorCodes::Interrupted);
});
auto swDbResponse = handleRequest(msg, opCtx.get());
interrupter.join();
// The request itself completes with a response; the interruption is carried inside the
// command result rather than as a failed handleRequest status.
ASSERT_OK(swDbResponse);
auto response = dbResponseToBSON(swDbResponse.getValue());
auto status = getStatusFromCommandResult(response);
ASSERT_EQ(status.code(), ErrorCodes::Interrupted);
ASSERT_EQ(limiterForDeferredToken.stats().interruptedInQueue.get(), 1);
}
// Verifies that a router request waiting on a deferred (not-ready) admission token honors
// maxTimeMS: with maxTimeMS=5 and the mock clocks advanced by 6ms while the request is blocked,
// the command response carries MaxTimeMSExpired and the limiter counts one interruptedInQueue.
TEST_F(ServiceEntryPointRouterRoleTest, QueuedAdmissionRespectsMaxTimeMS) {
gFeatureFlagIngressRateLimiting.setForServerParameter(true);
RAIIServerParameterControllerForTest requestLimiterEnabled{"ingressRequestRateLimiterEnabled",
true};
// NOTE(review): these static_casts are unchecked; they assume the fixture installed a
// ClockSourceMock and a TickSourceMock<Milliseconds> on the global ServiceContext -- confirm.
auto* clockSource =
static_cast<ClockSourceMock*>(getGlobalServiceContext()->getFastClockSource());
auto* tickSource =
static_cast<TickSourceMock<Milliseconds>*>(getGlobalServiceContext()->getTickSource());
// Create the limiter with the mock tick source so clock advancement controls token
// availability.
RateLimiter limiterForDeferredToken(
/*refreshRatePerSec=*/1.0,
/*burstCapacitySecs=*/1.0,
/*maxQueueDepth=*/2,
"QueuedAdmissionRespectsMaxTimeMS",
tickSource);
auto opCtx = makeOperationContext();
// The first acquisition succeeds immediately; the second is asserted to be not ready.
ASSERT_OK(limiterForDeferredToken.acquireToken(opCtx.get()));
auto queuedTokenResult = limiterForDeferredToken.acquireToken();
ASSERT_OK(queuedTokenResult.getStatus());
ASSERT_FALSE(queuedTokenResult.getValue().isReady());
// Attach the not-ready token to the client so that request handling has to resolve it.
IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
opCtx->getClient(), std::move(queuedTokenResult.getValue()));
// handleRequest parses maxTimeMS from the message and sets the opCtx deadline before calling
// waitForAdmission. A background thread waits until the opCtx is blocking in waitForAdmission,
// then advances the mock clock past the 5ms deadline (well under the ~1000ms napTime) to
// trigger MaxTimeMSExpired.
stdx::thread clockAdvancer([&] {
while (!opCtx->isWaitingForConditionOrInterrupt()) {
sleepmillis(1);
}
// Both mock sources are moved forward by the same amount, clock source first.
clockSource->advance(Milliseconds(6));
tickSource->advance(Milliseconds(6));
});
auto msg = constructMessage(BSON(TestCmdRouterIngressSubject::kCommandName
<< 1 << GenericArguments::kMaxTimeMSFieldName << 5),
opCtx.get());
auto swDbResponse = handleRequest(msg, opCtx.get());
clockAdvancer.join();
// The deadline expiry is reported inside the command result, not as a failed handleRequest.
ASSERT_OK(swDbResponse);
const auto response = dbResponseToBSON(swDbResponse.getValue());
const auto status = getStatusFromCommandResult(response);
ASSERT_EQ(status.code(), ErrorCodes::MaxTimeMSExpired);
ASSERT_EQ(limiterForDeferredToken.stats().interruptedInQueue.get(), 1);
}
// A router request that has to wait on a deferred (not-ready) admission token succeeds when its
// maxTimeMS (60s) comfortably exceeds the wait: once the mock clocks move 1001ms forward the
// command completes OK and the limiter reports two successful admissions.
TEST_F(ServiceEntryPointRouterRoleTest, QueuedAdmissionWithLargeMaxTimeMSSucceeds) {
    gFeatureFlagIngressRateLimiting.setForServerParameter(true);
    RAIIServerParameterControllerForTest limiterEnabled{"ingressRequestRateLimiterEnabled", true};

    auto* const svcCtx = getGlobalServiceContext();
    auto* mockClock = static_cast<ClockSourceMock*>(svcCtx->getFastClockSource());
    auto* mockTicks = static_cast<TickSourceMock<Milliseconds>*>(svcCtx->getTickSource());

    // Driven by the mock tick source, so the test decides when the token becomes available.
    RateLimiter limiter(
        /*refreshRatePerSec=*/1.0,
        /*burstCapacitySecs=*/1.0,
        /*maxQueueDepth=*/2,
        "QueuedAdmissionWithLargeMaxTimeMSSucceeds",
        mockTicks);
    auto opCtx = makeOperationContext();

    // First acquisition succeeds immediately; the second is asserted to be not ready.
    ASSERT_OK(limiter.acquireToken(opCtx.get()));
    auto swPendingToken = limiter.acquireToken();
    ASSERT_OK(swPendingToken.getStatus());
    ASSERT_FALSE(swPendingToken.getValue().isReady());
    IngressRequestRateLimiter::setDeferredAdmissionToken_forTest(
        opCtx->getClient(), std::move(swPendingToken.getValue()));

    // handleRequest blocks in waitForAdmission for the token's napTime (~1000ms at 1 token/sec).
    // This helper thread polls until the opCtx is blocked, then pushes both mock clocks past the
    // napTime so the token is released and the command can run.
    stdx::thread advancer([&] {
        while (!opCtx->isWaitingForConditionOrInterrupt()) {
            sleepmillis(1);
        }
        const Milliseconds pastNapTime(1001);
        mockClock->advance(pastNapTime);
        mockTicks->advance(pastNapTime);
    });

    auto request =
        constructMessage(BSON(TestCmdRouterIngressSubject::kCommandName
                              << 1 << GenericArguments::kMaxTimeMSFieldName << (60 * 1000)),
                         opCtx.get());
    auto swReply = handleRequest(request, opCtx.get());
    advancer.join();

    ASSERT_OK(swReply);
    const auto replyBody = dbResponseToBSON(swReply.getValue());
    ASSERT_EQ(getStatusFromCommandResult(replyBody), Status::OK());
    ASSERT_EQ(limiter.stats().successfulAdmissions.get(), 2);
}
// Runs the shared fixture scenario testHandleRequestException for the router role.
// NOTE(review): 5745706 is presumably the error/log code the fixture expects the router entry
// point to report for this path -- confirm against the fixture's definition.
TEST_F(ServiceEntryPointRouterRoleTest, HandleRequestException) {
testHandleRequestException(5745706);
}

View File

@ -487,13 +487,16 @@ private:
};
struct IterationFrame {
explicit IterationFrame(const Impl& impl) : metrics{} {
explicit IterationFrame(const Impl& impl) : client(impl.client()), metrics{} {
metrics.start();
}
~IterationFrame() {
metrics.finish();
// Clean up any stale deferred admission token on error paths or at end of iteration.
admission::IngressRequestRateLimiter::clearDeferredAdmissionToken(client);
}
Client* client;
metrics_detail::Metrics metrics;
};
@ -753,11 +756,12 @@ void SessionWorkflow::Impl::_sendResponse() {
}
Status SessionWorkflow::Impl::_rateLimit() const {
if (!gFeatureFlagIngressRateLimiting.isEnabled() || !gIngressRequestRateLimiterEnabled.load()) {
if (!gFeatureFlagIngressRateLimiting.isEnabled() ||
!admission::gIngressRequestRateLimiterEnabled.load()) {
return Status::OK();
}
auto& admissionRateLimiter = IngressRequestRateLimiter::get(_serviceContext);
auto& admissionRateLimiter = admission::IngressRequestRateLimiter::get(_serviceContext);
return admissionRateLimiter.admitRequest(client());
}

View File

@ -307,7 +307,7 @@ private:
class SessionWorkflowRateLimitAcceptAllBm : public SessionWorkflowBm {
void doConfigureServerParameters(ServiceContext* sc) override {
gFeatureFlagIngressRateLimiting.setForServerParameter(true);
gIngressRequestRateLimiterEnabled.store(true);
admission::gIngressRequestRateLimiterEnabled.store(true);
}
};
@ -318,9 +318,9 @@ class SessionWorkflowRateLimitAcceptAllBm : public SessionWorkflowBm {
class SessionWorkflowRateLimitRejectAllBm : public SessionWorkflowBm {
void doConfigureServerParameters(ServiceContext* sc) override {
gFeatureFlagIngressRateLimiting.setForServerParameter(true);
gIngressRequestRateLimiterEnabled.store(true);
admission::gIngressRequestRateLimiterEnabled.store(true);
auto& rateLimiter = IngressRequestRateLimiter::get(getGlobalServiceContext());
auto& rateLimiter = admission::IngressRequestRateLimiter::get(getGlobalServiceContext());
auto const verySlowRatePerSec = 5e-6;
auto const smallestPossibleBurstSize = 1.0;

View File

@ -42,11 +42,13 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/admission/ingress_request_rate_limiter.h"
#include "mongo/db/admission/rate_limiter.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/baton.h"
#include "mongo/db/client.h"
#include "mongo/db/client_strand.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/error_labels.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_test_fixture.h"
@ -55,6 +57,7 @@
#include "mongo/otel/metrics/metrics_service.h"
#include "mongo/otel/metrics/metrics_test_util.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/legacy_reply_builder.h"
#include "mongo/rpc/message.h"
#include "mongo/rpc/op_compressed.h"
@ -105,6 +108,9 @@
namespace mongo::transport {
namespace {
using namespace admission;
const Status kClosedSessionError{ErrorCodes::SocketException, "Session is closed"};
const Status kNetworkError{ErrorCodes::HostUnreachable, "Someone is unreachable"};
const Status kShutdownError{ErrorCodes::ShutdownInProgress, "Something is shutting down"};
@ -330,6 +336,24 @@ public:
return std::move(pf->future);
}
// Overload that accepts a full callback to use as the mock response for event `e`.
//
// This function itself does not block: it injects a wrapper around `cb` and returns a future.
// The wrapper fulfils that future only after `cb` has returned (for non-void events, after the
// result has been captured and just before it is handed back), so a caller that waits on the
// returned future -- as the callback overload of expect() does -- sees everything `cb` wrote
// through its captures.
template <Event e>
Future<void> asyncExpect(unique_function<EventSigT<e>> cb) {
// Shared between the injected wrapper and this function so the promise outlives this call.
auto pf = std::make_shared<PromiseAndFuture<void>>();
injectMockResponse<e>([cb = std::move(cb), pf](auto&&... args) mutable -> EventResultT<e> {
if constexpr (std::is_void_v<EventResultT<e>>) {
cb(std::forward<decltype(args)>(args)...);
pf->promise.emplaceValue();
} else {
// Hold on to the result so the future can be fulfilled before returning it.
auto result = cb(std::forward<decltype(args)>(args)...);
pf->promise.emplaceValue();
return result;
}
});
return std::move(pf->future);
}
template <Event e>
void expect(EventResultT<e> r) {
asyncExpect<e>(std::move(r)).get();
@ -340,6 +364,28 @@ public:
asyncExpect<e>().get();
}
// Callback overload: installs `cb` as the mock response for event `e` and blocks until the
// future returned by asyncExpect is ready, i.e. until the callback has run and returned.
template <Event e>
void expect(unique_function<EventSigT<e>> cb) {
asyncExpect<e>(std::move(cb)).get();
}
/**
 * Injects a sepHandleRequest mock that forwards the OperationContext and Message to `cb` and
 * returns whatever `cb` produces. This call does not return until that mock has actually run.
 * Waiting synchronously means the expectation slot is empty again once this helper returns, so
 * later expect<>/inject calls can safely push.
 */
void runSepHandleRequest(unique_function<DbResponse(OperationContext*, const Message&)> cb) {
    // Shared with the injected mock so the promise stays alive until the mock fires.
    auto fired = std::make_shared<PromiseAndFuture<void>>();

    auto mock = [handler = std::move(cb), fired](OperationContext* opCtx,
                                                 const Message& request) mutable {
        auto reply = handler(opCtx, request);
        // Signal completion only after the handler has produced its response.
        fired->promise.emplaceValue();
        return reply;
    };
    injectMockResponse<Event::sepHandleRequest>(std::move(mock));

    fired->future.get();
}
std::function<void(Client*)> onClientDisconnectCb;
@ -747,6 +793,8 @@ public:
std::int32_t successfulAdmissions;
std::int32_t exemptedAdmissions;
std::int32_t attemptedAdmissions;
std::int32_t addedToQueue;
std::int32_t removedFromQueue;
double totalAvailableTokens;
};
@ -783,6 +831,8 @@ public:
.successfulAdmissions = stats["successfulAdmissions"].numberInt(),
.exemptedAdmissions = stats["exemptedAdmissions"].numberInt(),
.attemptedAdmissions = stats["attemptedAdmissions"].numberInt(),
.addedToQueue = stats["addedToQueue"].numberInt(),
.removedFromQueue = stats["removedFromQueue"].numberInt(),
.totalAvailableTokens = stats["totalAvailableTokens"].numberDouble(),
};
}
@ -793,6 +843,18 @@ public:
return uassertStatusOK(compressorManager.compressMessage(message, &cid));
}
static bool hasErrorLabels(const BSONObj& body,
std::initializer_list<StringData> expectedLabels) {
if (!body.hasField(kErrorLabelsFieldName))
return false;
auto labels = body[kErrorLabelsFieldName].Array();
return std::all_of(expectedLabels.begin(), expectedLabels.end(), [&](StringData expected) {
return std::any_of(labels.begin(), labels.end(), [&](const BSONElement& e) {
return e.String() == expected;
});
});
}
private:
double _convertBurstSizeToBurstCapacitySecs(double refreshRate, double burstSize) {
// Rounding needed to ensure that the conversion back to burstSize won't be incorrect due
@ -868,6 +930,135 @@ TEST_F(IngressRequestRateLimiterTest, FireAndForgetResponseCompressed) {
ASSERT_LT(stats.totalAvailableTokens, 1);
}
// Verifies that SessionWorkflow both sets the deferred admission token on the client and clears it
// via IterationFrame's destructor at the end of each iteration.
TEST_F(IngressRequestRateLimiterTest, IterationFrameClearsDeferredAdmissionTokenBetweenIterations) {
    enableRateOverrideBehaviorWithSpecifiedBurstSize(1.0);
    // With a non-zero queue depth the post-burst request is queued instead of rejected.
    RAIIServerParameterControllerForTest maxQueueDepth{"ingressRequestAdmissionMaxQueueDepth", 4};
    startSession();

    // First iteration: the burst token is still there, so admission is immediate and the
    // handler must not see a deferred token on the client.
    expect<Event::sessionSourceMessage>(setMoreToCome(makeOpMsg()));
    runSepHandleRequest([](OperationContext* opCtx, const Message&) {
        auto* const client = opCtx->getClient();
        ASSERT_FALSE(IngressRequestRateLimiter::hasDeferredAdmissionToken_forTest(client));
        return makeResponse(Message{});
    });

    // Second iteration: the burst is used up, so admission queues and leaves a deferred token
    // on the client. The handler sees it; IterationFrame's destructor is then responsible for
    // clearing it once the iteration is over.
    expect<Event::sessionSourceMessage>(setMoreToCome(makeOpMsg()));
    runSepHandleRequest([](OperationContext* opCtx, const Message&) {
        auto* const client = opCtx->getClient();
        ASSERT_TRUE(IngressRequestRateLimiter::hasDeferredAdmissionToken_forTest(client));
        return makeResponse(Message{});
    });

    expect<Event::sessionSourceMessage>(kClosedSessionError);
    expect<Event::sepEndSession>();
    joinSessions();

    // Expected accounting: two attempts, one immediate success (iteration 1), and one queue
    // entry (iteration 2) that was removed again when its unconsumed deferred token was torn
    // down.
    const auto limiterStats = getRateLimiterStats();
    ASSERT_EQ(limiterStats.attemptedAdmissions, 2);
    ASSERT_EQ(limiterStats.successfulAdmissions, 1);
    ASSERT_EQ(limiterStats.rejectedAdmissions, 0);
    ASSERT_EQ(limiterStats.addedToQueue, 1);
    ASSERT_EQ(limiterStats.removedFromQueue, 1);
}
// With queueing left at its default (maxQueueDepth 0), a request arriving after the burst is
// used up is rejected straight away with IngressRequestRateLimitExceeded, and the rejection
// carries the SystemOverloadedError, RetryableError and NoWritesPerformed labels.
TEST_F(IngressRequestRateLimiterTest, ImmediateRejectionHasExpectedErrorLabels) {
    enableRateOverrideBehaviorWithSpecifiedBurstSize(1.0);
    startSession();

    // First iteration: spend the burst on a fire-and-forget request.
    expect<Event::sessionSourceMessage>(setMoreToCome(makeOpMsg()));
    runSepHandleRequest([](OperationContext*, const Message&) { return makeResponse(Message{}); });

    // Second iteration: no burst left, so the limiter rejects without ever reaching
    // sepHandleRequest. The callback overload of expect() waits for the sink handler to return,
    // which means `rejectionBody` is filled in before the next expect() call.
    expect<Event::sessionSourceMessage>(makeOpMsg());
    BSONObj rejectionBody;
    expect<Event::sessionSinkMessage>([&rejectionBody](const Message& sunk) {
        rejectionBody = OpMsg::parse(sunk).body.getOwned();
        return Status::OK();
    });

    expect<Event::sessionSourceMessage>(kClosedSessionError);
    expect<Event::sepEndSession>();
    joinSessions();

    const auto rejectionStatus = getStatusFromCommandResult(rejectionBody);
    ASSERT_EQ(rejectionStatus.code(), ErrorCodes::IngressRequestRateLimitExceeded);
    ASSERT_TRUE(hasErrorLabels(rejectionBody,
                               {ErrorLabel::kSystemOverloadedError,
                                ErrorLabel::kRetryableError,
                                ErrorLabel::kNoWritesPerformed}));

    const auto limiterStats = getRateLimiterStats();
    ASSERT_EQ(limiterStats.attemptedAdmissions, 2);
    ASSERT_EQ(limiterStats.successfulAdmissions, 1);
    ASSERT_EQ(limiterStats.rejectedAdmissions, 1);
    ASSERT_EQ(limiterStats.addedToQueue, 0);
}
// With maxQueueDepth=1 and the only queue slot already taken by another client, the session's
// next request is rejected with IngressRequestRateLimitExceeded and the rejection carries the
// SystemOverloadedError, RetryableError and NoWritesPerformed labels.
TEST_F(IngressRequestRateLimiterTest, QueueDepthExceededRejectionHasExpectedErrorLabels) {
    enableRateOverrideBehaviorWithSpecifiedBurstSize(1.0);
    RAIIServerParameterControllerForTest maxQueueDepth{"ingressRequestAdmissionMaxQueueDepth", 1};
    startSession();

    // First iteration: spend the burst on a fire-and-forget request.
    expect<Event::sessionSourceMessage>(setMoreToCome(makeOpMsg()));
    runSepHandleRequest([](OperationContext*, const Message&) { return makeResponse(Message{}); });

    // Occupy the single queue slot from a separate client so the session's second request finds
    // the queue at capacity. The session workflow is parked in sessionSourceMessage.pop() until
    // a handler is pushed, so doing this here cannot race with it.
    auto fillerClient =
        getServiceContext()->getService()->makeClient("queue-depth-rejection-test");
    auto& rateLimiter = IngressRequestRateLimiter::get(getServiceContext());
    ASSERT_OK(rateLimiter.admitRequest(fillerClient.get()));
    ASSERT_TRUE(IngressRequestRateLimiter::hasDeferredAdmissionToken_forTest(fillerClient.get()));

    // Second iteration: the queue is full, so the request is rejected right away. The callback
    // overload of expect() waits for the sink handler to return before moving on.
    expect<Event::sessionSourceMessage>(makeOpMsg());
    BSONObj rejectionBody;
    expect<Event::sessionSinkMessage>([&rejectionBody](const Message& sunk) {
        rejectionBody = OpMsg::parse(sunk).body.getOwned();
        return Status::OK();
    });

    expect<Event::sessionSourceMessage>(kClosedSessionError);
    expect<Event::sepEndSession>();
    joinSessions();

    // Give back the queue slot the filler client was holding.
    IngressRequestRateLimiter::clearDeferredAdmissionToken(fillerClient.get());

    const auto rejectionStatus = getStatusFromCommandResult(rejectionBody);
    ASSERT_EQ(rejectionStatus.code(), ErrorCodes::IngressRequestRateLimitExceeded);
    ASSERT_TRUE(hasErrorLabels(rejectionBody,
                               {ErrorLabel::kSystemOverloadedError,
                                ErrorLabel::kRetryableError,
                                ErrorLabel::kNoWritesPerformed}));

    const auto limiterStats = getRateLimiterStats();
    ASSERT_EQ(limiterStats.attemptedAdmissions, 3);  // iter1 + queue-filler + iter2
    ASSERT_EQ(limiterStats.successfulAdmissions, 1);
    ASSERT_EQ(limiterStats.rejectedAdmissions, 1);
    ASSERT_EQ(limiterStats.addedToQueue, 1);
}
class StepRunnerSessionWorkflowTest : public SessionWorkflowTest {
public:
/**
@ -1214,5 +1405,6 @@ DEATH_TEST_F(SessionWorkflowTestDeathTest,
expect<Event::sepHandleRequest>(makeResponse(makeMessageWithOpcode(dbCompressed)));
joinSessions();
}
} // namespace
} // namespace mongo::transport