SERVER-119347 Generalize TicketHolder to support pluggable concurrency primitives and policies (#48816)

GitOrigin-RevId: acccc50427584b03732f7835f1895583925deabd
This commit is contained in:
Pol Piñol Castuera 2026-03-02 15:29:36 +01:00 committed by MongoDB Bot
parent a6c76161ec
commit 7579e58fb3
11 changed files with 1186 additions and 248 deletions

View File

@ -417,6 +417,7 @@ admission:
- src/mongo/db/admission/
- src/mongo/db/flow_control_ticketholder*
- src/mongo/util/concurrency/admission_context*
- src/mongo/util/concurrency/*ticket_semaphore*
- src/mongo/util/concurrency/ticketholder*
workload_simulation:

View File

@ -45,6 +45,21 @@ idl_generator(
src = "ticketholder_parameters.idl",
)
mongo_cc_library(
name = "ticket_semaphore",
srcs = [
"unordered_ticket_semaphore.cpp",
],
hdrs = [
"ticket_semaphore.h",
],
deps = [
":admission_context",
"//src/mongo:base",
"//src/mongo/db:server_base",
],
)
mongo_cc_library(
name = "ticketholder",
srcs = [
@ -53,6 +68,7 @@ mongo_cc_library(
],
deps = [
":admission_context",
":ticket_semaphore",
"//src/mongo/db:server_base",
"//src/mongo/db:server_feature_flags",
"//src/mongo/db/admission/execution_control:execution_control_stats",
@ -154,3 +170,20 @@ mongo_cc_unit_test(
"//src/mongo/util/concurrency:ticketholder",
],
)
mongo_cc_unit_test(
name = "ticket_semaphore_test",
srcs = [
"ticket_semaphore_test.cpp",
],
tags = ["mongo_unittest_eighth_group"],
deps = [
":ticket_semaphore",
"//src/mongo/db:server_options",
"//src/mongo/db:service_context_test_fixture",
"//src/mongo/db/auth:authmocks",
"//src/mongo/db/shard_role:service_context_non_d",
"//src/mongo/unittest",
"//src/mongo/util:packaged_task",
],
)

View File

@ -3,7 +3,7 @@ filters:
- "*":
approvers:
- 10gen/server-programmability
- "*ticketholder*":
- "*ticket*":
approvers:
- 10gen/server-workload-resilience
- "admission_context.*":

View File

@ -125,21 +125,6 @@ public:
Priority getPriority() const;
/**
* Setters for queue statistics to be used in unit tests only
*/
MONGO_MOD_PUBLIC void setAdmission_forTest(int32_t admissions);
MONGO_MOD_PUBLIC void setTotalTimeQueuedMicros_forTest(int64_t micros);
protected:
friend class ScopedAdmissionPriorityBase;
friend class Ticket;
friend class TicketHolder;
friend class WaitingForAdmissionGuard;
AdmissionContext() = default;
void recordAdmission();
void recordLowAdmission();
@ -160,6 +145,19 @@ protected:
_holdingTicket.store(false);
}
/**
* Setters for queue statistics to be used in unit tests only
*/
MONGO_MOD_PUBLIC void setAdmission_forTest(int32_t admissions);
MONGO_MOD_PUBLIC void setTotalTimeQueuedMicros_forTest(int64_t micros);
protected:
friend class ScopedAdmissionPriorityBase;
friend class WaitingForAdmissionGuard;
AdmissionContext() = default;
constexpr static TickSource::Tick kNotQueueing = -1;
MONGO_MOD_PRIVATE Atomic<std::int32_t> _exemptedAdmissions{0};

View File

@ -0,0 +1,94 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
#include <cstdint>
namespace mongo {
/**
* Controls concurrent access to a finite pool of permits.
*
* Callers acquire a permit before proceeding with a unit of work, and release it when done. If no
* permits are available, callers either fail immediately (tryAcquire) or block until one becomes
* available (acquire).
*/
class TicketSemaphore {
public:
virtual ~TicketSemaphore() = default;
/**
* Non-blocking acquire. Returns true if a permit was consumed.
*/
virtual bool tryAcquire() = 0;
/**
* Blocks until a permit is acquired, 'until' expires, or the operation is interrupted.
* Returns true if acquired.
*
* Throws 'AdmissionQueueOverflow' if the wait queue is full.
*/
virtual bool acquire(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible) = 0;
/**
* Returns one permit to the sempahore, waking a queued waiter.
*/
virtual void release() = 0;
/**
* Adjusts the number of total permit count by 'delta'.
*/
virtual void resize(int delta) = 0;
/**
* Returns the instantaneous number of un-acquired permits (not checked out by an operation).
*/
virtual int available() const = 0;
/**
* Adjusts the maximum number of threads waiting.
*/
virtual void setMaxWaiters(int waiters) = 0;
/**
* Returns the instantaneous number of threads blocked in 'acquire'.
*/
virtual int waiters() const = 0;
};
} // namespace mongo

View File

@ -0,0 +1,679 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/util/concurrency/ticket_semaphore.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/concurrency/unordered_ticket_semaphore.h"
#include "mongo/util/duration.h"
#include "mongo/util/packaged_task.h"
#include <functional>
#include <memory>
#include <vector>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
namespace mongo {
using SemaphoreFactory = std::function<std::unique_ptr<TicketSemaphore>(int)>;
// Generous timeout so tests fail with a useful diagnostic instead of hanging.
constexpr auto kWaitTimeout = Minutes{1};
Date_t getDeadline() {
return Date_t::now() + kWaitTimeout;
}
/**
* Parameterized test fixture for TicketSemaphore implementations.
*/
class TicketSemaphoreTest : public ServiceContextTest,
public testing::WithParamInterface<SemaphoreFactory> {
public:
void setUp() override {
ServiceContextTest::setUp();
_client = getServiceContext()->getService()->makeClient("test");
_opCtx = _client->makeOperationContext();
ThreadPool::Options opts;
_pool = std::make_unique<ThreadPool>(opts);
_pool->startup();
}
void tearDown() override {
_pool->shutdown();
ServiceContextTest::tearDown();
}
std::unique_ptr<TicketSemaphore> makeSemaphore(int numPermits) {
return GetParam()(numPermits);
}
OperationContext* opCtx() {
return _opCtx.get();
}
/**
* Polls the semaphore's waiters() count until at least 'expectedWaiters' threads are
* queued. This is deterministic: once waiters() reaches the target, any subsequent
* release()/resize() is guaranteed to notify or leave permits for those threads.
*/
void waitUntilBlocked(TicketSemaphore* sem, int expectedWaiters) {
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
while (sem->waiters() < expectedWaiters) {
stdx::this_thread::yield();
}
});
}
template <std::invocable Callable>
auto spawn(Callable&& cb) -> Future<std::invoke_result_t<Callable>> {
using ReturnType = std::invoke_result_t<Callable>;
auto task = PackagedTask(std::forward<Callable>(cb));
auto taskFuture = task.getFuture();
_pool->schedule([runTask = std::move(task)](Status s) mutable {
invariant(s);
runTask();
});
return taskFuture;
}
protected:
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
private:
std::unique_ptr<ThreadPool> _pool;
};
TEST_P(TicketSemaphoreTest, TryAcquireSucceedsWhenAvailable) {
auto sem = makeSemaphore(3);
ASSERT_EQ(sem->available(), 3);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 2);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 1);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
}
TEST_P(TicketSemaphoreTest, TryAcquireFailsWhenNoneAvailable) {
auto sem = makeSemaphore(1);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
ASSERT_FALSE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
}
TEST_P(TicketSemaphoreTest, TryAcquireFailsWithZeroPermits) {
auto sem = makeSemaphore(0);
ASSERT_EQ(sem->available(), 0);
ASSERT_FALSE(sem->tryAcquire());
}
TEST_P(TicketSemaphoreTest, ReleaseIncrementsAvailable) {
auto sem = makeSemaphore(2);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
sem->release();
ASSERT_EQ(sem->available(), 1);
sem->release();
ASSERT_EQ(sem->available(), 2);
sem->release();
ASSERT_EQ(sem->available(), 3);
}
TEST_P(TicketSemaphoreTest, Resize) {
auto sem = makeSemaphore(5);
ASSERT_EQ(sem->available(), 5);
sem->resize(3);
ASSERT_EQ(sem->available(), 8);
sem->resize(-6);
ASSERT_EQ(sem->available(), 2);
sem->resize(-2);
ASSERT_EQ(sem->available(), 0);
ASSERT_FALSE(sem->tryAcquire());
}
TEST_P(TicketSemaphoreTest, ResizeNegativeBelowZero) {
auto sem = makeSemaphore(2);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
sem->resize(-3);
ASSERT_EQ(sem->available(), -3);
ASSERT_FALSE(sem->tryAcquire());
sem->release();
ASSERT_EQ(sem->available(), -2);
ASSERT_FALSE(sem->tryAcquire());
sem->release();
sem->release();
sem->release();
ASSERT_EQ(sem->available(), 1);
ASSERT_TRUE(sem->tryAcquire());
}
TEST_P(TicketSemaphoreTest, AcquireSucceedsWhenAvailable) {
auto sem = makeSemaphore(3);
MockAdmissionContext admCtx;
ASSERT_TRUE(sem->acquire(opCtx(), &admCtx, Date_t::max(), true /* interruptible */));
ASSERT_EQ(sem->available(), 2);
}
TEST_P(TicketSemaphoreTest, AcquireTimesOut) {
auto sem = makeSemaphore(0);
MockAdmissionContext admCtx;
auto deadline = Date_t::now() + Milliseconds{50};
ASSERT_FALSE(sem->acquire(opCtx(), &admCtx, deadline, true /* interruptible */));
MockAdmissionContext admCtx2;
auto deadline2 = Date_t::now() + Milliseconds{50};
ASSERT_FALSE(sem->acquire(opCtx(), &admCtx2, deadline2, false /* interruptible */));
}
TEST_P(TicketSemaphoreTest, AcquireOpCtxDeadlineThrows) {
auto sem = makeSemaphore(0);
auto client = getServiceContext()->getService()->makeClient("deadlineTest");
auto deadlineOpCtx = client->makeOperationContext();
deadlineOpCtx->setDeadlineAfterNowBy(Milliseconds{50}, ErrorCodes::MaxTimeMSExpired);
MockAdmissionContext admCtx;
ASSERT_THROWS_CODE(
sem->acquire(deadlineOpCtx.get(), &admCtx, Date_t::max(), true /* interruptible */),
DBException,
ErrorCodes::MaxTimeMSExpired);
}
TEST_P(TicketSemaphoreTest, AcquireExplicitDeadlineReturnsFalse) {
auto sem = makeSemaphore(0);
auto client = getServiceContext()->getService()->makeClient("deadlineTest");
auto deadlineOpCtx = client->makeOperationContext();
deadlineOpCtx->setDeadlineAfterNowBy(Minutes{5}, ErrorCodes::MaxTimeMSExpired);
MockAdmissionContext admCtx;
auto shortDeadline = Date_t::now() + Milliseconds{50};
ASSERT_FALSE(
sem->acquire(deadlineOpCtx.get(), &admCtx, shortDeadline, true /* interruptible */));
}
TEST_P(TicketSemaphoreTest, AcquireBlocksAndSucceedsWhenReleased) {
auto sem = makeSemaphore(1);
// Exhaust the single permit.
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
// Spawn a thread that waits to acquire.
auto* rawSem = sem.get();
Future<bool> acquireFuture = spawn([&, rawSem]() {
auto client = getServiceContext()->getService()->makeClient("waiter");
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext waiterAdmCtx;
return rawSem->acquire(waiterOpCtx.get(), &waiterAdmCtx, getDeadline(), true);
});
waitUntilBlocked(rawSem, 1);
// Release the permit -- the waiter should wake up and succeed.
rawSem->release();
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(acquireFuture).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
TEST_P(TicketSemaphoreTest, AcquireRespectsInterrupt) {
auto sem = makeSemaphore(0);
auto client = getServiceContext()->getService()->makeClient("interruptee");
auto interruptOpCtx = client->makeOperationContext();
auto* rawSem = sem.get();
auto* rawOpCtx = interruptOpCtx.get();
Future<void> acquireFuture = spawn([&, rawSem, rawOpCtx]() {
MockAdmissionContext admCtx;
rawSem->acquire(rawOpCtx, &admCtx, Date_t::max(), true /* interruptible */);
});
waitUntilBlocked(rawSem, 1);
// Kill the operation -- the waiter should throw.
interruptOpCtx->markKilled(ErrorCodes::Interrupted);
ASSERT_THROWS_CODE(([&] {
_opCtx->runWithDeadline(
getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
std::move(acquireFuture).get(_opCtx.get());
});
})(),
DBException,
ErrorCodes::Interrupted);
}
TEST_P(TicketSemaphoreTest, MultipleWaitersAllAcquire) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
constexpr int numWaiters = 5;
std::vector<Future<bool>> futures;
for (int i = 0; i < numWaiters; ++i) {
futures.push_back(spawn([&, i, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter" + std::to_string(i));
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
}));
}
waitUntilBlocked(rawSem, numWaiters);
for (int i = 0; i < numWaiters; ++i) {
rawSem->release();
}
for (auto& f : futures) {
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(f).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
ASSERT_EQ(rawSem->available(), 0);
}
TEST_P(TicketSemaphoreTest, ResizeWakesWaiters) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
// Spawn a waiter that blocks because there are 0 permits.
Future<bool> waiter = spawn([&, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter");
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
});
waitUntilBlocked(rawSem, 1);
// Resizing from 0 to 1 permit should wake the waiter.
rawSem->resize(1);
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(waiter).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
TEST_P(TicketSemaphoreTest, ResizeNegativeWhileWaitersAreWaiting) {
// Start with 2 permits, acquire both so available = 0, then spawn waiters.
auto sem = makeSemaphore(2);
auto* rawSem = sem.get();
ASSERT_TRUE(sem->tryAcquire());
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
constexpr int numWaiters = 2;
std::vector<Future<bool>> futures;
for (int i = 0; i < numWaiters; ++i) {
futures.push_back(spawn([&, i, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter" + std::to_string(i));
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
}));
}
waitUntilBlocked(rawSem, numWaiters);
// Resize permits into negative territory while waiters are blocked.
rawSem->resize(-3);
ASSERT_EQ(rawSem->available(), -3);
// Releasing the 2 held permits only brings available to -1; waiters stay blocked.
rawSem->release();
rawSem->release();
ASSERT_FALSE(futures[0].isReady());
ASSERT_FALSE(futures[1].isReady());
// Additional releases bring available positive so waiters can acquire.
rawSem->release();
rawSem->release();
rawSem->release();
for (auto& f : futures) {
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(f).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
}
TEST_P(TicketSemaphoreTest, MaxWaitersZeroRejectsWaiters) {
auto sem = makeSemaphore(2);
sem->setMaxWaiters(0);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 1);
// Acquire succeeds without queuing when a permit is available.
MockAdmissionContext admCtx;
ASSERT_TRUE(sem->acquire(opCtx(), &admCtx, getDeadline(), true /* interruptible */));
ASSERT_EQ(sem->available(), 0);
// But once no permits are available, it must queue — which is rejected at maxWaiters 0.
MockAdmissionContext admCtx2;
ASSERT_THROWS_CODE(sem->acquire(opCtx(), &admCtx2, getDeadline(), true /* interruptible */),
DBException,
ErrorCodes::AdmissionQueueOverflow);
}
TEST_P(TicketSemaphoreTest, MaxWaitersAllowsUpToLimit) {
auto sem = makeSemaphore(0);
sem->setMaxWaiters(2);
auto* rawSem = sem.get();
std::vector<Future<bool>> futures;
for (int i = 0; i < 2; ++i) {
futures.push_back(spawn([&, i, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter" + std::to_string(i));
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
}));
}
waitUntilBlocked(rawSem, 2);
// A 3rd waiter exceeds the limit.
MockAdmissionContext admCtx;
ASSERT_THROWS_CODE(sem->acquire(opCtx(), &admCtx, getDeadline(), true /* interruptible */),
DBException,
ErrorCodes::AdmissionQueueOverflow);
rawSem->release();
rawSem->release();
for (auto& f : futures) {
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(f).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
}
TEST_P(TicketSemaphoreTest, SetMaxWaitersDoesNotAffectExistingWaiters) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
Future<bool> waiter = spawn([&, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter");
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
});
waitUntilBlocked(rawSem, 1);
// Shrink maxWaiters to 0 while 1 waiter is already queued.
rawSem->setMaxWaiters(0);
// The existing waiter should still succeed when a permit becomes available.
rawSem->release();
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(waiter).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
TEST_P(TicketSemaphoreTest, TryAcquireBypassesMaxWaiters) {
auto sem = makeSemaphore(2);
sem->setMaxWaiters(0);
ASSERT_TRUE(sem->tryAcquire());
ASSERT_TRUE(sem->tryAcquire());
ASSERT_EQ(sem->available(), 0);
ASSERT_FALSE(sem->tryAcquire());
}
TEST_P(TicketSemaphoreTest, WaitersCountReflectsBlockedThreads) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
ASSERT_EQ(rawSem->waiters(), 0);
constexpr int numWaiters = 3;
std::vector<Future<bool>> futures;
for (int i = 0; i < numWaiters; ++i) {
futures.push_back(spawn([&, i, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter" + std::to_string(i));
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
return rawSem->acquire(waiterOpCtx.get(), &admCtx, getDeadline(), true);
}));
}
waitUntilBlocked(rawSem, numWaiters);
for (int i = 0; i < numWaiters; ++i) {
rawSem->release();
}
for (auto& f : futures) {
_opCtx->runWithDeadline(
getDeadline(), ErrorCodes::ExceededTimeLimit, [&] { std::move(f).get(_opCtx.get()); });
}
ASSERT_EQ(rawSem->waiters(), 0);
}
TEST_P(TicketSemaphoreTest, NonInterruptibleAcquireReturnsFalseOnKillAndTimeout) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
auto client = getServiceContext()->getService()->makeClient("survivor");
auto survivorOpCtx = client->makeOperationContext();
auto* rawOpCtx = survivorOpCtx.get();
auto deadline = Date_t::now() + Milliseconds{500};
Future<bool> acquireFuture = spawn([&, rawSem, rawOpCtx, deadline]() {
MockAdmissionContext admCtx;
return rawSem->acquire(rawOpCtx, &admCtx, deadline, false /* interruptible */);
});
waitUntilBlocked(rawSem, 1);
survivorOpCtx->markKilled(ErrorCodes::Interrupted);
bool result = true;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(acquireFuture).get(_opCtx.get());
});
ASSERT_FALSE(result);
}
TEST_P(TicketSemaphoreTest, NonInterruptibleAcquireWaitsForPermitAfterKill) {
auto sem = makeSemaphore(0);
auto* rawSem = sem.get();
auto client = getServiceContext()->getService()->makeClient("survivor");
auto survivorOpCtx = client->makeOperationContext();
auto* rawOpCtx = survivorOpCtx.get();
Future<bool> acquireFuture = spawn([&, rawSem, rawOpCtx]() {
MockAdmissionContext admCtx;
return rawSem->acquire(rawOpCtx, &admCtx, Date_t::max(), false /* interruptible */);
});
waitUntilBlocked(rawSem, 1);
survivorOpCtx->markKilled(ErrorCodes::Interrupted);
sleepFor(Milliseconds{500});
ASSERT_FALSE(acquireFuture.isReady());
rawSem->resize(1);
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(acquireFuture).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
TEST_P(TicketSemaphoreTest, WaiterCountDecreasesAfterTimeout) {
auto sem = makeSemaphore(0);
sem->setMaxWaiters(1);
auto* rawSem = sem.get();
MockAdmissionContext admCtx;
auto shortDeadline = Date_t::now() + Milliseconds{50};
ASSERT_FALSE(sem->acquire(opCtx(), &admCtx, shortDeadline, false /* interruptible */));
ASSERT_EQ(rawSem->waiters(), 0);
// The waiter slot was reclaimed, so a new waiter can queue without overflow.
Future<bool> waiter = spawn([&, rawSem, svcCtx = getServiceContext()]() {
auto client = svcCtx->getService()->makeClient("waiter");
auto waiterOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx2;
return rawSem->acquire(
waiterOpCtx.get(), &admCtx2, getDeadline(), true /* interruptible */);
});
waitUntilBlocked(rawSem, 1);
rawSem->release();
bool result = false;
_opCtx->runWithDeadline(getDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
result = std::move(waiter).get(_opCtx.get());
});
ASSERT_TRUE(result);
}
TEST_P(TicketSemaphoreTest, ConcurrentAcquireDoesNotOverbookOrLeak) {
constexpr int numPermits = 3;
constexpr int numThreads = 10;
constexpr int opsPerThread = 50;
auto sem = makeSemaphore(numPermits);
auto* rawSem = sem.get();
// Track maximum concurrent permit holders to verify no overbooking.
AtomicWord<int> concurrentHolders{0};
AtomicWord<int> maxConcurrentHolders{0};
std::vector<stdx::thread> threads;
for (int i = 0; i < numThreads; ++i) {
threads.emplace_back([&, i]() {
auto client =
getServiceContext()->getService()->makeClient("worker" + std::to_string(i));
auto workerOpCtx = client->makeOperationContext();
MockAdmissionContext admCtx;
PseudoRandom rng(SecureRandom().nextInt64());
for (int op = 0; op < opsPerThread; ++op) {
ASSERT_TRUE(rawSem->acquire(workerOpCtx.get(), &admCtx, getDeadline(), true));
auto current = concurrentHolders.fetchAndAdd(1) + 1;
// Update peak.
auto peak = maxConcurrentHolders.load();
while (current > peak && !maxConcurrentHolders.compareAndSwap(&peak, current)) {
}
// Simulate short work.
sleepFor(Milliseconds{rng.nextInt32(4)});
concurrentHolders.fetchAndSubtract(1);
rawSem->release();
}
});
}
for (auto& t : threads) {
t.join();
}
// No overbooking: peak concurrent holders must not exceed the number of permits.
ASSERT_LTE(maxConcurrentHolders.load(), numPermits);
// No leak: all permits are returned.
ASSERT_EQ(rawSem->available(), numPermits);
ASSERT_EQ(concurrentHolders.load(), 0);
}
INSTANTIATE_TEST_SUITE_P(UnorderedTicketSemaphore,
TicketSemaphoreTest,
testing::Values([](int numPermits) -> std::unique_ptr<TicketSemaphore> {
return std::make_unique<UnorderedTicketSemaphore>(
numPermits, static_cast<int>(DEFAULT_MAX_CONN));
}));
} // namespace mongo

View File

@ -33,6 +33,7 @@
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/ticketholder_parameters_gen.h"
#include "mongo/util/concurrency/unordered_ticket_semaphore.h"
#include "mongo/util/duration.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/tick_source.h"
@ -44,27 +45,28 @@ namespace mongo {
TicketHolder::TicketHolder(ServiceContext* serviceContext,
int numTickets,
bool trackPeakUsed,
std::int32_t maxQueueDepth,
int maxQueueDepth,
DelinquentCallback delinquentCallback,
AcquisitionCallback acquisitionCallback,
WaitedAcquisitionCallback waitedAcquisitionCallback,
ReleaseCallback releaseCallback,
ResizePolicy resizePolicy)
ResizePolicy resizePolicy,
std::unique_ptr<TicketSemaphore> semaphore)
: _trackPeakUsed(trackPeakUsed),
_resizePolicy(resizePolicy),
_serviceContext(serviceContext),
_tickets(numTickets),
_maxQueueDepth(maxQueueDepth),
_tickSource(serviceContext->getTickSource()),
_outof(numTickets),
_reportDelinquentOpCallback(delinquentCallback),
_reportAcquisitionOpCallback(acquisitionCallback),
_reportWaitedAcquisitionOpCallback(waitedAcquisitionCallback),
_reportReleaseOpCallback(releaseCallback) {
_semaphore = semaphore ? std::move(semaphore)
: std::make_unique<UnorderedTicketSemaphore>(numTickets, maxQueueDepth);
_enabledDelinquent = gFeatureFlagRecordDelinquentMetrics.isEnabled();
_delinquentMs = Milliseconds(gDelinquentAcquisitionIntervalMillis.load());
}
bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadline) {
bool TicketHolder::resize(OperationContext* opCtx, int newSize, Date_t deadline) {
stdx::lock_guard<stdx::mutex> lk(_resizeMutex);
auto difference = newSize - _outof.load();
MockAdmissionContext admCtx;
@ -75,7 +77,7 @@ bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadl
// Hand out tickets one-by-one until we've given them all out.
for (auto remaining = difference; remaining > 0; remaining--) {
// This call bypasses statistics reporting.
_releaseNormalPriorityTicket(&admCtx);
_semaphore->release();
_outof.fetchAndAdd(1);
}
} else {
@ -84,15 +86,22 @@ bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadl
opCtx);
// Make sure the operation isn't interrupted before waiting for tickets.
opCtx->checkForInterrupt();
auto effectiveDeadline =
opCtx->hasDeadline() ? std::min(deadline, opCtx->getDeadline()) : deadline;
// Take tickets one-by-one without releasing.
for (auto remaining = -difference; remaining > 0; remaining--) {
// This call bypasses statistics reporting.
auto ticket = _performWaitForTicketUntil(opCtx, &admCtx, deadline, true);
if (!ticket) {
if (!_semaphore->acquire(
opCtx, &admCtx, effectiveDeadline, true /* interruptible */)) {
// We timed out getting a ticket, fail the resize.
return false;
}
ticket->discard();
// Reducing capacity does not require issuing a ticket. The semaphore has
// already admitted the operation via waitForAdmission, and we simply remove it
// from the pool.
_outof.fetchAndSubtract(1);
}
}
@ -104,29 +113,14 @@ bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadl
MONGO_UNREACHABLE;
}
void TicketHolder::setMaxQueueDepth(int32_t size) {
_maxQueueDepth.store(size);
void TicketHolder::setMaxQueueDepth(int size) {
_semaphore->setMaxWaiters(size);
}
void TicketHolder::_immediateResize(WithLock, int32_t newSize) {
void TicketHolder::_immediateResize(WithLock, int newSize) {
auto oldSize = _outof.swap(newSize);
auto delta = newSize - oldSize;
auto oldAvailable = _tickets.fetchAndAdd(delta);
if ((oldAvailable <= 0) && ((oldAvailable + delta) > 0)) {
_tickets.notifyMany(oldAvailable + delta);
}
}
void TicketHolder::_releaseTicketUpdateStats(Ticket& ticket) {
if (ticket._priority == AdmissionContext::Priority::kExempt) {
_updateQueueStatsOnRelease(_exemptStats, ticket);
return;
}
ticket._admissionContext->markTicketReleased();
_updateQueueStatsOnRelease(_holderStats, ticket);
_releaseNormalPriorityTicket(ticket._admissionContext);
_semaphore->resize(delta);
}
Ticket TicketHolder::waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) {
@ -148,17 +142,24 @@ boost::optional<Ticket> TicketHolder::waitForTicketUntilNoInterrupt_DO_NOT_USE(
boost::optional<Ticket> TicketHolder::_waitForTicketUntilMaybeInterruptible(
OperationContext* opCtx, AdmissionContext* admCtx, Date_t until, bool interruptible) {
// Attempt a quick acquisition first.
if (auto ticket = tryAcquire(admCtx)) {
return ticket;
if (admCtx->getPriority() == AdmissionContext::Priority::kExempt) {
return _issueTicket(admCtx);
}
auto tickSource = _serviceContext->getTickSource();
if (_semaphore->tryAcquire()) {
return _issueTicket(admCtx);
}
auto startWaitTime = _tickSource->getTicks();
_holderStats.totalAddedQueue.fetchAndAddRelaxed(1);
ON_BLOCK_EXIT([&, startWaitTime = tickSource->getTicks()] {
auto waitDelta = tickSource->ticksTo<Microseconds>(tickSource->getTicks() - startWaitTime);
ON_BLOCK_EXIT([&] {
auto waitDelta =
_tickSource->ticksTo<Microseconds>(_tickSource->getTicks() - startWaitTime);
_holderStats.totalTimeQueuedMicros.fetchAndAddRelaxed(waitDelta.count());
_holderStats.totalRemovedQueue.fetchAndAddRelaxed(1);
if (_reportWaitedAcquisitionOpCallback) {
_reportWaitedAcquisitionOpCallback(admCtx, waitDelta);
}
@ -169,126 +170,34 @@ boost::optional<Ticket> TicketHolder::_waitForTicketUntilMaybeInterruptible(
_holderStats.totalCanceled.fetchAndAddRelaxed(1);
});
WaitingForAdmissionGuard waitForAdmission(admCtx, tickSource);
auto ticket = _performWaitForTicketUntil(opCtx, admCtx, until, interruptible);
if (ticket) {
cancelWait.dismiss();
_updateQueueStatsOnTicketAcquisition(admCtx, _holderStats, admCtx->getPriority());
_updatePeakUsed();
return ticket;
} else {
return boost::none;
}
}
boost::optional<Ticket> TicketHolder::_performWaitForTicketUntil(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible) {
// Cap 'until' to not exceed the operation's deadline.
if (interruptible && opCtx->hasDeadline()) {
until = std::min(until, opCtx->getDeadline());
}
auto nextDeadline = [&]() {
// Timed waits can be problematic if we have a large number of waiters, since each time we
// check for interrupt we risk waking up all waiting threads at the same time. We introduce
// some jitter here to try to reduce the impact of a thundering herd of waiters woken at
// the same time.
static int32_t baseIntervalMs = 500;
static double jitterFactor = 0.2;
static thread_local XorShift128 urbg(SecureRandom().nextInt64());
int32_t offset = std::uniform_int_distribution<int32_t>(
-jitterFactor * baseIntervalMs, baseIntervalMs * jitterFactor)(urbg);
return std::min(until, Date_t::now() + Milliseconds{baseIntervalMs + offset});
};
WaitingForAdmissionGuard waitForAdmission(admCtx, _tickSource);
bool hasStartedWaiting = false;
// Since uassert throws, we use raii to substract the waiter count
ON_BLOCK_EXIT([&] {
if (hasStartedWaiting) {
_waiterCount.fetchAndSubtract(1);
}
});
while (true) {
if (boost::optional<Ticket> maybeTicket = _tryAcquireNormalPriorityTicket(admCtx);
maybeTicket) {
return maybeTicket;
}
Date_t deadline = nextDeadline();
if (!hasStartedWaiting) {
const auto previousWaiterCount = _waiterCount.fetchAndAdd(1);
hasStartedWaiting = true;
if (previousWaiterCount >= _maxQueueDepth.loadRelaxed()) {
admCtx->recordOperationLoadShed();
uasserted(
ErrorCodes::AdmissionQueueOverflow,
"MongoDB is overloaded and cannot accept new operations. Try again later.");
}
}
_tickets.waitUntil(0, deadline);
if (interruptible) {
opCtx->checkForInterrupt();
}
if (deadline == until) {
if (!interruptible) {
// Uninterruptible wait - just return timeout
return boost::none;
}
// It's possible that system_clock (used by BasicWaitableAtomic::waitUntil)
// is slightly ahead of FastClock (used by OperationContext::checkForInterrupt).
// Handle this clock skew by explicitly checking the deadline against until,
// similar to OperationContext::waitForConditionOrInterruptNoAssertUntil.
opCtx->checkForDeadlineExpired(until);
// We only reach here when 'until' < opCtx->getDeadline(), meaning the caller
// provided an explicit deadline that is earlier than the operation's maxTimeMS.
// In this case, we return boost::none to indicate the caller's deadline expired
// (non-throwing), while the operation itself remains alive.
return boost::none;
}
if (_semaphore->acquire(opCtx, admCtx, until, interruptible)) {
cancelWait.dismiss();
return _issueTicket(admCtx);
}
return boost::none;
}
boost::optional<Ticket> TicketHolder::tryAcquire(AdmissionContext* admCtx) {
const auto currentPriority = admCtx->getPriority();
if (currentPriority == AdmissionContext::Priority::kExempt) {
_updateQueueStatsOnTicketAcquisition(admCtx, _exemptStats, currentPriority);
return Ticket{this, admCtx};
if (admCtx->getPriority() == AdmissionContext::Priority::kExempt) {
return _issueTicket(admCtx);
}
auto ticket = _tryAcquireNormalPriorityTicket(admCtx);
if (ticket) {
_updateQueueStatsOnTicketAcquisition(admCtx, _holderStats, currentPriority);
_updatePeakUsed();
if (_semaphore->tryAcquire()) {
return _issueTicket(admCtx);
}
return ticket;
return boost::none;
}
boost::optional<Ticket> TicketHolder::_tryAcquireNormalPriorityTicket(AdmissionContext* admCtx) {
int32_t available = _tickets.load();
while (true) {
if (available <= 0) {
return boost::none;
}
if (_tickets.compareAndSwap(&available, available - 1)) {
return _makeTicket(admCtx);
}
}
}
int32_t TicketHolder::getAndResetPeakUsed() {
int TicketHolder::getAndResetPeakUsed() {
invariant(_trackPeakUsed);
return _peakUsed.swap(used());
}
@ -339,26 +248,44 @@ void TicketHolder::_appendQueueStats(BSONObjBuilder& b, const QueueStats& stats)
b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
}
void TicketHolder::_updateQueueStatsOnRelease(TicketHolder::QueueStats& queueStats,
const Ticket& ticket) {
queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
auto tickSource = _serviceContext->getTickSource();
Ticket TicketHolder::_issueTicket(AdmissionContext* admCtx) {
if (admCtx->getPriority() != AdmissionContext::Priority::kExempt) {
admCtx->markTicketHeld();
}
_recordAcquisition(admCtx, admCtx->getPriority());
return Ticket{this, admCtx};
}
void TicketHolder::_releaseTicket(Ticket& ticket) {
auto delta =
tickSource->ticksTo<Microseconds>(tickSource->getTicks() - ticket._acquisitionTime);
_tickSource->ticksTo<Microseconds>(_tickSource->getTicks() - ticket._acquisitionTime);
auto& queueStats =
ticket._priority == AdmissionContext::Priority::kExempt ? _exemptStats : _holderStats;
queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count());
if (_enabledDelinquent && _reportDelinquentOpCallback && delta > _delinquentMs) {
_reportDelinquentOpCallback(ticket.getAdmissionContext(),
duration_cast<Milliseconds>(delta));
}
if (_reportReleaseOpCallback) {
_reportReleaseOpCallback(ticket._admissionContext, duration_cast<Microseconds>(delta));
}
if (ticket._priority == AdmissionContext::Priority::kExempt) {
return;
}
ticket._admissionContext->markTicketReleased();
_semaphore->release();
}
void TicketHolder::_updateQueueStatsOnTicketAcquisition(AdmissionContext* admCtx,
TicketHolder::QueueStats& queueStats,
AdmissionContext::Priority priority) {
void TicketHolder::_recordAcquisition(AdmissionContext* admCtx,
AdmissionContext::Priority priority) {
// The admission context is shared across both normal and low priority ticket holders, so we
// need to check the priority-specific counter rather than total admissions.
bool isNewAdmission = false;
@ -368,9 +295,13 @@ void TicketHolder::_updateQueueStatsOnTicketAcquisition(AdmissionContext* admCtx
isNewAdmission = admCtx->getAdmissions() == 0;
}
auto& queueStats =
priority == AdmissionContext::Priority::kExempt ? _exemptStats : _holderStats;
if (isNewAdmission) {
queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
}
queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
if (priority == AdmissionContext::Priority::kExempt) {
admCtx->recordExemptedAdmission();
@ -381,7 +312,11 @@ void TicketHolder::_updateQueueStatsOnTicketAcquisition(AdmissionContext* admCtx
}
admCtx->recordAdmission();
queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
if (priority != AdmissionContext::Priority::kExempt) {
_updatePeakUsed();
}
if (_reportAcquisitionOpCallback) {
_reportAcquisitionOpCallback(admCtx);
}
@ -391,38 +326,22 @@ int64_t TicketHolder::numFinishedProcessing() const {
return _holderStats.totalFinishedProcessing.load();
}
int64_t TicketHolder::queued() const {
auto removed = _holderStats.totalRemovedQueue.loadRelaxed();
auto added = _holderStats.totalAddedQueue.loadRelaxed();
return std::max(added - removed, (int64_t)0);
};
int32_t TicketHolder::available() const {
return _tickets.load();
int TicketHolder::queued() const {
return _semaphore->waiters();
}
void TicketHolder::_releaseNormalPriorityTicket(AdmissionContext* admCtx) {
// Notifying a futex costs a syscall. Since queued waiters guarantee that the `_waiterCount` is
// non-zero while they are waiting, we can avoid the needless cost if there are tickets and no
// queued waiters.
int32_t availableBeforeIncrementing = _tickets.fetchAndAdd(1);
if (availableBeforeIncrementing >= 0 && _waiterCount.load() > 0) {
_tickets.notifyOne();
}
int TicketHolder::available() const {
return _semaphore->available();
}
void TicketHolder::setNumFinishedProcessing_forTest(int32_t numFinishedProcessing) {
void TicketHolder::setNumFinishedProcessing_forTest(int64_t numFinishedProcessing) {
_holderStats.totalFinishedProcessing.store(numFinishedProcessing);
}
void TicketHolder::setPeakUsed_forTest(int32_t used) {
void TicketHolder::setPeakUsed_forTest(int used) {
_peakUsed.store(used);
}
int32_t TicketHolder::waiting_forTest() const {
return _waiterCount.load();
}
void TicketHolder::incrementDelinquencyStats(
const admission::execution_control::DelinquencyStats& newStats) {
_delinquencyStats += newStats;

View File

@ -32,19 +32,17 @@
#include "mongo/db/admission/execution_control/execution_control_stats.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/waitable_atomic.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/ticket_semaphore.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/modules.h"
#include "mongo/util/tick_source.h"
#include "mongo/util/time_support.h"
#include <cstdint>
#include <limits>
#include <boost/move/utility_core.hpp>
#include <boost/optional/optional.hpp>
namespace mongo {
@ -52,9 +50,13 @@ namespace mongo {
class Ticket;
/**
* Maintains and distributes tickets across operations from a limited pool of tickets. The ticketing
* mechanism is required for global lock acquisition to reduce contention on storage engine
* resources.
* Concurrent admission control mechanism that distributes a finite number of tickets to limit how
* many operations may proceed simultaneously. Operations acquire a ticket before performing work
* and release it when done/yield. If tickets are exhausted, incoming operations block until one
* becomes available.
*
* Additionally, it tracks queue and processing statistics (wait times, queue depth, cancellations,
* peak usage) and fires observer callbacks on acquisition, release, and delinquent operations.
*/
class MONGO_MOD_PUBLIC TicketHolder {
friend class Ticket;
@ -82,17 +84,18 @@ public:
* The default value for maxQueueDepth. It it set to the default max connection amount, which is
* practically infinite for the purpose of the ticket holder.
*/
static constexpr auto kDefaultMaxQueueDepth = static_cast<std::int32_t>(DEFAULT_MAX_CONN);
static constexpr int kDefaultMaxQueueDepth = static_cast<int>(DEFAULT_MAX_CONN);
TicketHolder(ServiceContext* serviceContext,
int numTickets,
bool trackPeakUsed,
std::int32_t maxQueueDepth,
int maxQueueDepth,
DelinquentCallback delinquentCallback = nullptr,
AcquisitionCallback acquisitionCallback = nullptr,
WaitedAcquisitionCallback waitedAcquisitionCallback = nullptr,
ReleaseCallback releaseCallback = nullptr,
ResizePolicy resizePolicy = ResizePolicy::kGradual);
ResizePolicy resizePolicy = ResizePolicy::kGradual,
std::unique_ptr<TicketSemaphore> semaphore = nullptr);
/**
* Adjusts the total number of tickets allocated for the ticket pool to 'newSize'.
@ -100,13 +103,13 @@ public:
* Returns 'true' if the resize completed without reaching the 'deadline', and 'false'
* otherwise.
*/
bool resize(OperationContext* opCtx, int32_t newSize, Date_t deadline = Date_t::max());
bool resize(OperationContext* opCtx, int newSize, Date_t deadline = Date_t::max());
/**
* Adjusts the maximum number of threads waiting for a ticket. Will not affect threads already
* waiting
*/
void setMaxQueueDepth(int32_t newSize);
void setMaxQueueDepth(int newSize);
/**
* Attempts to acquire a ticket without blocking. Returns a ticket if one is available,
@ -147,39 +150,33 @@ public:
/**
* The total number of tickets allotted to the ticket pool.
*/
int32_t outof() const {
int outof() const {
return _outof.loadRelaxed();
}
/**
* Instantaneous number of tickets that are checked out by an operation.
*/
int32_t used() const {
int used() const {
return outof() - available();
}
/**
* Instantaneous number of operations waiting in queue for a ticket.
* TODO SERVER-74082: Consider changing this metric to int32_t.
*/
int64_t queued() const;
int queued() const;
/**
* Peak number of tickets checked out at once since the previous time this function was called.
* Invariants that 'trackPeakUsed' has been passed to the TicketHolder,
*/
int32_t getAndResetPeakUsed();
/**
* Exposes the amount of waiting threads for testing purpose.
*/
int32_t waiting_forTest() const;
int getAndResetPeakUsed();
/**
* Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket
* pool.
*/
int32_t available() const;
int available() const;
/**
* The total number of operations that acquired a ticket, completed their work, and released the
@ -187,9 +184,9 @@ public:
*/
int64_t numFinishedProcessing() const;
MONGO_MOD_PRIVATE void setNumFinishedProcessing_forTest(int32_t numFinishedProcessing);
MONGO_MOD_PRIVATE void setNumFinishedProcessing_forTest(int64_t numFinishedProcessing);
MONGO_MOD_PRIVATE void setPeakUsed_forTest(int32_t used);
MONGO_MOD_PRIVATE void setPeakUsed_forTest(int used);
/**
* Appends all queue and delinquency stats.
@ -230,36 +227,31 @@ private:
AtomicWord<std::int64_t> totalTimeQueuedMicros{0};
};
/**
* Generates and returns a ticket to the caller.
*/
Ticket _issueTicket(AdmissionContext* admCtx);
/**
* Releases a ticket back into the ticket pool and updates queueing statistics. Tickets
* issued for exempt operations do not get deposited back to the pool.
* This function must not throw.
*/
void _releaseTicketUpdateStats(Ticket& ticket);
/**
* This function must not throw.
*/
void _releaseNormalPriorityTicket(AdmissionContext* admCtx);
boost::optional<Ticket> _tryAcquireNormalPriorityTicket(AdmissionContext* admCtx);
void _releaseTicket(Ticket& ticket);
boost::optional<Ticket> _waitForTicketUntilMaybeInterruptible(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible);
boost::optional<Ticket> _performWaitForTicketUntil(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible);
void _updatePeakUsed();
const bool _trackPeakUsed;
void _updateQueueStatsOnRelease(TicketHolder::QueueStats& queueStats, const Ticket& ticket);
void _updateQueueStatsOnTicketAcquisition(AdmissionContext* admCtx,
TicketHolder::QueueStats& queueStats,
AdmissionContext::Priority priority);
/**
* Updates statistics and notifies the observer for a ticket acquisition at the given priority.
*/
void _recordAcquisition(AdmissionContext* admCtx, AdmissionContext::Priority priority);
/**
* Appends the statistics stored in QueueStats to BSONObjBuilder b; We track statistics
@ -267,26 +259,18 @@ private:
*/
void _appendQueueStats(BSONObjBuilder& b, const QueueStats& stats) const;
void _immediateResize(WithLock, int32_t newSize);
/**
* Creates a ticket for a non-exempt admission.
*/
Ticket _makeTicket(AdmissionContext* admCtx);
void _immediateResize(WithLock, int newSize);
QueueStats _holderStats;
QueueStats _exemptStats;
ResizePolicy _resizePolicy;
ServiceContext* _serviceContext;
TickSource* _tickSource;
// Serializes updates to _outof to ensure only 1 thread can change the size of the ticket pool
// at a time. Reading _outof does not require holding the lock.
stdx::mutex _resizeMutex;
BasicWaitableAtomic<int32_t> _tickets;
Atomic<int32_t> _maxQueueDepth;
Atomic<int32_t> _waiterCount{0};
Atomic<int32_t> _outof;
Atomic<int32_t> _peakUsed;
Atomic<int> _outof;
Atomic<int> _peakUsed;
bool _enabledDelinquent{false};
Milliseconds _delinquentMs{0};
DelinquentCallback _reportDelinquentOpCallback{nullptr};
@ -294,6 +278,9 @@ private:
WaitedAcquisitionCallback _reportWaitedAcquisitionOpCallback{nullptr};
ReleaseCallback _reportReleaseOpCallback{nullptr};
mongo::admission::execution_control::DelinquencyStats _delinquencyStats;
// Synchronization mechanism for waiters.
std::unique_ptr<TicketSemaphore> _semaphore;
};
/**
@ -332,7 +319,7 @@ public:
~Ticket() {
if (_ticketholder) {
_ticketholder->_releaseTicketUpdateStats(*this);
_ticketholder->_releaseTicket(*this);
}
}
@ -358,7 +345,7 @@ private:
Ticket(TicketHolder* ticketHolder, AdmissionContext* admissionContext)
: _ticketholder(ticketHolder), _admissionContext(admissionContext) {
_priority = admissionContext->getPriority();
_acquisitionTime = ticketHolder->_serviceContext->getTickSource()->getTicks();
_acquisitionTime = ticketHolder->_tickSource->getTicks();
}
/**
@ -382,10 +369,4 @@ private:
TickSource::Tick _acquisitionTime;
};
inline Ticket TicketHolder::_makeTicket(AdmissionContext* admCtx) {
// TODO(SERVER-92647): Move this to the Ticket constructor so it also applies to exempt tickets
admCtx->markTicketHeld();
return Ticket{this, admCtx};
}
} // namespace mongo

View File

@ -947,7 +947,7 @@ TEST_F(TicketHolderImmediateResizeTest, WaitQueueMax1) {
// We wait until ticketFuture is actually waiting for the ticket or until timeout exceeded
_opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
waitUntilCanceled(*_opCtx, [&] { return holder->waiting_forTest() == 1; });
waitUntilCanceled(*_opCtx, [&] { return holder->queued() == 1; });
});
// Since the maximum amount of ticket is one, and one is already waiting, it will throw
@ -1014,7 +1014,7 @@ TEST_F(TicketHolderImmediateResizeTest, WaitQueueMaxChange) {
// We wait until ticketFuture is actually waiting for the ticket or until timeout exceeded
_opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
waitUntilCanceled(*_opCtx, [&] { return holder->waiting_forTest() == 1; });
waitUntilCanceled(*_opCtx, [&] { return holder->queued() == 1; });
});
// Change the max queue depth to one. Since one is already waiting, the next acquisition attempt

View File

@ -0,0 +1,153 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/util/concurrency/unordered_ticket_semaphore.h"
#include "mongo/db/operation_context.h"
namespace mongo {
namespace {
Date_t nextDeadline(Date_t until) {
// Timed waits can be problematic if we have a large number of waiters, since each time we
// check for interrupt we risk waking up all waiting threads at the same time. We introduce
// some jitter here to try to reduce the impact of a thundering herd of waiters woken at
// the same time.
constexpr auto kBaseInterval = 500;
constexpr double kJitterFactor = 0.2;
static thread_local XorShift128 urbg(SecureRandom().nextInt64());
int32_t offset = std::uniform_int_distribution<int32_t>(-kJitterFactor * kBaseInterval,
kBaseInterval * kJitterFactor)(urbg);
return std::min(until, Date_t::now() + Milliseconds{kBaseInterval + offset});
}
} // namespace
bool UnorderedTicketSemaphore::tryAcquire() {
int available = _permits.load();
while (available > 0) {
if (_permits.compareAndSwap(&available, available - 1)) {
return true;
}
}
return false;
}
bool UnorderedTicketSemaphore::acquire(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible) {
bool hasStartedWaiting = false;
ON_BLOCK_EXIT([&] {
if (hasStartedWaiting) {
_waiters.fetchAndSubtract(1);
}
});
while (true) {
if (tryAcquire()) {
return true;
}
Date_t deadline = nextDeadline(until);
if (!hasStartedWaiting) {
const auto previousWaiters = _waiters.fetchAndAdd(1);
hasStartedWaiting = true;
if (previousWaiters >= _maxWaiters.loadRelaxed()) {
admCtx->recordOperationLoadShed();
uasserted(
ErrorCodes::AdmissionQueueOverflow,
"MongoDB is overloaded and cannot accept new operations. Try again later.");
}
}
_permits.waitUntil(0, deadline);
if (interruptible) {
opCtx->checkForInterrupt();
}
if (deadline == until) {
if (!interruptible) {
// Uninterruptible wait - just return timeout
return false;
}
// It's possible that system_clock (used by BasicWaitableAtomic::waitUntil)
// is slightly ahead of FastClock (used by OperationContext::checkForInterrupt).
// Handle this clock skew by explicitly checking the deadline against until,
// similar to OperationContext::waitForConditionOrInterruptNoAssertUntil.
opCtx->checkForDeadlineExpired(until);
// We only reach here when 'until' < opCtx->getDeadline(), meaning the caller
// provided an explicit deadline that is earlier than the operation's maxTimeMS.
// In this case, we return false to indicate the caller's deadline expired
// (non-throwing), while the operation itself remains alive.
return false;
}
}
MONGO_UNREACHABLE;
}
void UnorderedTicketSemaphore::release() {
// Notifying a futex costs a syscall. Since queued waiters guarantee that the `_waiters` is
// non-zero while they are waiting, we can avoid the needless cost if there are permits and no
// queued waiters.
int availableBeforeIncrementing = _permits.fetchAndAdd(1);
if (availableBeforeIncrementing >= 0 && _waiters.load() > 0) {
_permits.notifyOne();
}
}
void UnorderedTicketSemaphore::resize(int delta) {
auto oldAvailable = _permits.fetchAndAdd(delta);
if ((oldAvailable <= 0) && ((oldAvailable + delta) > 0)) {
_permits.notifyMany(oldAvailable + delta);
}
}
int UnorderedTicketSemaphore::available() const {
return _permits.load();
}
void UnorderedTicketSemaphore::setMaxWaiters(int waiters) {
_maxWaiters.store(waiters);
}
int UnorderedTicketSemaphore::waiters() const {
return _waiters.load();
}
} // namespace mongo

View File

@ -0,0 +1,80 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/waitable_atomic.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/ticket_semaphore.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
#include <cstdint>
namespace mongo {
/**
* A Semaphore implementation where waiters compete for permits using a futex-based wait on an
* atomic permit counter.
*
* On each release, one sleeping waiter is woken to compete again against concurrent tryAcquire()
* callers. There is no fairness guarantee in this implementation; a newly arrived operation can
* claim a permit ahead of existing waiters.
*/
class UnorderedTicketSemaphore : public TicketSemaphore {
public:
UnorderedTicketSemaphore(int numPermits, int maxWaiters)
: _permits(numPermits), _maxWaiters(maxWaiters) {}
bool tryAcquire() override;
bool acquire(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
bool interruptible) override;
void release() override;
void resize(int delta) override;
int available() const override;
void setMaxWaiters(int waiters) override;
int waiters() const override;
private:
BasicWaitableAtomic<int> _permits;
Atomic<int> _waiters{0};
Atomic<int> _maxWaiters;
};
} // namespace mongo