SERVER-127310 Remove blocking work on the connections server status section (#54357)
GitOrigin-RevId: 2bec2fd3ac7d4052b16341a4d62c89214ca1a8be
This commit is contained in:
parent
9a51c55912
commit
ca9ea08b23
@ -41,6 +41,7 @@
|
||||
#include "mongo/util/functional.h"
|
||||
#include "mongo/util/out_of_line_executor.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
@ -84,7 +85,7 @@ void ServiceExecutorReserved::start() {
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(_mutex);
|
||||
_stillRunning.store(true);
|
||||
_numStartingThreads = _reservedThreads;
|
||||
_numStartingThreads.store(_reservedThreads);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < _reservedThreads; i++) {
|
||||
@ -102,8 +103,8 @@ Status ServiceExecutorReserved::_startWorker() {
|
||||
_shutdownCondition.notify_one();
|
||||
});
|
||||
|
||||
_numStartingThreads--;
|
||||
_numReadyThreads++;
|
||||
_numStartingThreads.subtractAndFetch(1);
|
||||
_numReadyThreads.addAndFetch(1);
|
||||
|
||||
while (_stillRunning.load()) {
|
||||
_threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); });
|
||||
@ -118,10 +119,11 @@ Status ServiceExecutorReserved::_startWorker() {
|
||||
|
||||
auto task = std::move(_readyTasks.front());
|
||||
_readyTasks.pop_front();
|
||||
_numReadyThreads -= 1;
|
||||
_numReadyThreads.subtractAndFetch(1);
|
||||
bool launchReplacement = false;
|
||||
if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
|
||||
_numStartingThreads++;
|
||||
// lk is still needed here to prevent a toctou bug.
|
||||
if (_numReadyThreads.load() + _numStartingThreads.load() < _reservedThreads) {
|
||||
_numStartingThreads.addAndFetch(1);
|
||||
launchReplacement = true;
|
||||
}
|
||||
|
||||
@ -133,9 +135,7 @@ Status ServiceExecutorReserved::_startWorker() {
|
||||
LOGV2_WARNING(22981,
|
||||
"Could not start new reserve worker thread",
|
||||
"error"_attr = threadStartStatus);
|
||||
lk.lock();
|
||||
_numStartingThreads--;
|
||||
lk.unlock();
|
||||
_numStartingThreads.subtractAndFetch(1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -146,10 +146,10 @@ Status ServiceExecutorReserved::_startWorker() {
|
||||
}
|
||||
|
||||
lk.lock();
|
||||
if (_numReadyThreads + 1 > _reservedThreads) {
|
||||
if (_numReadyThreads.load() + 1 > _reservedThreads) {
|
||||
break;
|
||||
} else {
|
||||
_numReadyThreads += 1;
|
||||
_numReadyThreads.addAndFetch(1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -208,9 +208,11 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
|
||||
};
|
||||
|
||||
auto statlet = [&] {
|
||||
std::lock_guard lk(_mutex);
|
||||
auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
|
||||
auto total = static_cast<int>(threads - _numReadyThreads - _numStartingThreads);
|
||||
auto ready = static_cast<int>(_numReadyThreads.loadRelaxed());
|
||||
auto starting = static_cast<int>(_numStartingThreads.loadRelaxed());
|
||||
// Clamp to 0 in case there is a race condition where ready + starting > threads.
|
||||
auto total = std::max(threads - ready - starting, 0);
|
||||
auto running = total;
|
||||
auto waiting = 0;
|
||||
return Statlet{threads, total, running, waiting};
|
||||
|
||||
@ -98,8 +98,8 @@ private:
|
||||
std::deque<Task> _readyTasks;
|
||||
|
||||
AtomicWord<unsigned> _numRunningWorkerThreads{0};
|
||||
size_t _numReadyThreads{0};
|
||||
size_t _numStartingThreads{0};
|
||||
Atomic<size_t> _numReadyThreads{0};
|
||||
Atomic<size_t> _numStartingThreads{0};
|
||||
|
||||
const std::string _name;
|
||||
const size_t _reservedThreads;
|
||||
|
||||
@ -434,8 +434,7 @@ bool SessionManagerCommon::waitForNoSessions(Milliseconds timeout) {
|
||||
}
|
||||
|
||||
std::size_t SessionManagerCommon::numOpenSessions() const {
|
||||
auto sync = _sessions->sync();
|
||||
return sync.size();
|
||||
return _sessions->size();
|
||||
}
|
||||
|
||||
std::vector<std::pair<SessionId, std::string>> SessionManagerCommon::getOpenSessionIDs() const {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user