SERVER-112800: Make replWriterThreadCount and replWriterMinThreadCount settable at runtime (#50696)

GitOrigin-RevId: 144a08b7c1587d40bd6d10036823f0f93bee7765
This commit is contained in:
Pierre Turin 2026-04-02 04:00:41 -07:00 committed by MongoDB Bot
parent 87e234a41a
commit 8bcd27de30
25 changed files with 1584 additions and 125 deletions

View File

@ -1,36 +0,0 @@
// This test ensures that the replWriterThreadCount server parameter:
// 1) cannot be less than 1
// 2) cannot be greater than 256
// 3) is actually set to the passed in value
// 4) cannot be altered at run time
// too low a count
clearRawMongoProgramOutput();
assert.throws(() => MongoRunner.runMongod({setParameter: 'replWriterThreadCount=0'}));
assert(
rawMongoProgramOutput().match(
"Invalid value for parameter replWriterThreadCount: 0 is not greater than or equal to 1"),
"mongod started with too low a value for replWriterThreadCount");
// too high a count
clearRawMongoProgramOutput();
assert.throws(() => MongoRunner.runMongod({setParameter: 'replWriterThreadCount=257'}));
assert(
rawMongoProgramOutput().match(
"Invalid value for parameter replWriterThreadCount: 257 is not less than or equal to 256"),
"mongod started with too high a value for replWriterThreadCount");
// proper count
clearRawMongoProgramOutput();
let mongo = MongoRunner.runMongod({setParameter: 'replWriterThreadCount=24'});
assert.neq(null, mongo, "mongod failed to start with a suitable replWriterThreadCount value");
assert(!rawMongoProgramOutput().match("Invalid value for parameter replWriterThreadCount"),
"despite accepting the replWriterThreadCount value, mongod logged an error");
// getParameter to confirm the value was set
var result = mongo.getDB("admin").runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(24, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
// setParameter to ensure it is not possible
assert.commandFailed(mongo.getDB("admin").runCommand({setParameter: 1, replWriterThreadCount: 1}));
MongoRunner.stopMongod(mongo);

View File

@ -0,0 +1,591 @@
// This test ensures that the replWriterThreadCount server parameter:
// 1) cannot be less than 1
// 2) cannot be greater than 256
// 3) is actually set to the passed in value
// 4) can be altered at run time
import {Thread} from "jstests/libs/parallelTester.js";
function testSettingParameter() {
// too low a count
clearRawMongoProgramOutput();
const tooLowThreadCount = 0;
assert.throws(
() => MongoRunner.runMongod({
replSet: "rs0",
setParameter: "replWriterThreadCount=" + tooLowThreadCount.toString()
}),
);
assert.soon(
() => rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount"),
"mongod started with too low a value for replWriterThreadCount",
);
// too high a count
clearRawMongoProgramOutput();
const tooHighThreadCount = 257;
assert.throws(
() => MongoRunner.runMongod({
replSet: "rs0",
setParameter: "replWriterThreadCount=" + tooHighThreadCount.toString()
}),
);
assert.soon(
() => rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount"),
"mongod started with too high a value for replWriterThreadCount",
);
// proper counts
clearRawMongoProgramOutput();
const acceptableMinThreadCount = 4;
const acceptableThreadCount = 24;
const conn = MongoRunner.runMongod({
replSet: "rs0",
setParameter: {
replWriterMinThreadCount: acceptableMinThreadCount,
replWriterThreadCount: acceptableThreadCount,
},
});
assert.neq(null, conn, "mongod failed to start with a suitable replWriterThreadCount value");
// Initialize replica set
const adminDB = conn.getDB("admin");
assert.commandWorked(
adminDB.runCommand({
replSetInitiate: {
_id: "rs0",
members: [{_id: 0, host: conn.host}],
},
}),
);
const hostInfoRes = adminDB.runCommand({hostInfo: 1});
const numCores = hostInfoRes.system.numCores;
jsTest.log("System info: numCores=" + numCores);
const threadCountCap = numCores * 2;
const actualExpectedMaxThreads = Math.min(acceptableThreadCount, threadCountCap);
// Wait for repl set initialization to finish
assert.soon(
() => rawMongoProgramOutput("11280000.*Starting thread pool")
.match(
'ReplWriterWorkerThreadPool.*"minThreads":' +
acceptableMinThreadCount.toString() +
'.*"maxThreads":' + actualExpectedMaxThreads.toString(),
),
"ReplWriterWorker thread pool did not start",
);
assert(
!rawMongoProgramOutput().match("Invalid value for parameter replWriterThreadCount"),
"despite accepting the replWriterThreadCount value, mongod logged an error",
);
assert(
!rawMongoProgramOutput().match("Invalid value for parameter replWriterMinThreadCount"),
"despite accepting the replWriterMinThreadCount value, mongod logged an error",
);
// getParameter to confirm the server parameter value:
{
const result = adminDB.runCommand(
{getParameter: 1, replWriterThreadCount: 1, replWriterMinThreadCount: 1});
assert.eq(acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally");
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally",
);
}
// Make sure using setParameter with a wrong max thread count still fails:
{
// Can't set max to 0
assert.commandFailed(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: tooLowThreadCount}));
let result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
// Can't set max to less than min (4)
assert.commandFailed(
adminDB.runCommand(
{setParameter: 1, replWriterThreadCount: acceptableMinThreadCount - 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
// Can't set max to >256
assert.commandFailed(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: tooHighThreadCount}));
result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
}
// Make sure using setParameter with a wrong min thread count still fails:
{
// Can't set min to <0
assert.commandFailed(adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: -3}));
let result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
// Can't set min to >max
assert.commandFailed(
adminDB.runCommand(
{setParameter: 1, replWriterMinThreadCount: acceptableThreadCount + 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
// Can't set min to > pool size, even if max is higher
assert.commandFailed(
adminDB.runCommand(
{setParameter: 1, replWriterMinThreadCount: actualExpectedMaxThreads + 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
}
// decrease maximum thread count
{
jsTest.log("Decreasing max thread count");
const lowerMax = actualExpectedMaxThreads - 1;
// If this is not the case, we cannot continue with the test
assert(
lowerMax >= acceptableMinThreadCount,
"Must have at least 3 cores available to run this test (current number: " + numCores +
")",
);
assert.commandWorked(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: lowerMax}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
lowerMax, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
}
// increase maximum thread count
{
jsTest.log("Increasing max thread count");
assert.commandWorked(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: actualExpectedMaxThreads}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
actualExpectedMaxThreads,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally",
);
}
// increase minimum thread count
{
jsTest.log("Increasing min thread count");
const higherMin = acceptableMinThreadCount + 2;
// we have already ensured that there are at least 3 cores.
assert.commandWorked(
adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: higherMin}));
const result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(higherMin,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally");
}
// decrease minimum thread count
{
jsTest.log("Decreasing min thread count");
const lowerMin = acceptableMinThreadCount;
assert.commandWorked(
adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: lowerMin}));
const result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(lowerMin,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally");
}
// increase maximum thread count above number of cores * 2
{
jsTest.log("Increase maximum threads over number of cores * 2");
const maxAboveActualPoolMaxSize = threadCountCap + 4;
assert.commandWorked(adminDB.runCommand(
{setParameter: 1, replWriterThreadCount: maxAboveActualPoolMaxSize}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
maxAboveActualPoolMaxSize,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally",
);
const matchParameters = '.*"replWriterThreadCount":"' +
maxAboveActualPoolMaxSize.toString() + '.*"maxThreads":"' + threadCountCap.toString() +
'.*"numCores":"' + numCores.toString();
jsTest.log("looking for " + matchParameters + " in test output");
assert.soon(
() =>
rawMongoProgramOutput(
"11280003.*replWriterThreadCount is set to higher than the max number of threads",
)
.match(matchParameters),
"mongod did not warn the user that the value will not take effect due to exceeding the number of cores times two",
);
}
// Concurrent update
{
for (let i = 0; i < 10; i++) {
jsTest.log("Concurrent update start: " + i);
const initMaxThreadCount = 8;
const initMinThreadCount = 6;
assert.commandWorked(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: initMaxThreadCount}));
assert.commandWorked(adminDB.runCommand(
{setParameter: 1, replWriterMinThreadCount: initMinThreadCount}));
let result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(initMaxThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally");
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
initMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally",
);
function newConn(port) {
const mongo = new Mongo("localhost:" + port);
assert(mongo);
const db = mongo.getDB("admin");
assert(db);
return db;
}
function setMaxThreadCount(newConn, port, value) {
const db = newConn(port);
db.runCommand({setParameter: 1, replWriterThreadCount: value});
}
function setMinThreadCount(newConn, port, value) {
const db = newConn(port);
db.runCommand({setParameter: 1, replWriterMinThreadCount: value});
}
const newMaxThreadCount = 5;
const newMinThreadCount = 1;
const maxThread = new Thread(setMaxThreadCount, newConn, conn.port, newMaxThreadCount);
const minThread = new Thread(setMinThreadCount, newConn, conn.port, newMinThreadCount);
maxThread.start();
minThread.start();
maxThread.join();
minThread.join();
// We don't assert that both params were successfully set since setting the max might
// have happened before setting the min, which would make it fail.
result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
const resMaxThreadCount = result.replWriterThreadCount;
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
const resMinThreadCount = result.replWriterMinThreadCount;
assert.eq(
newMinThreadCount,
resMinThreadCount,
"replWriterMinThreadCount was not correctly set, " +
"replWriterMinThreadCount=" + resMinThreadCount +
"replWriterThreadCount=" + resMaxThreadCount,
);
assert(
resMaxThreadCount === initMaxThreadCount || resMaxThreadCount === newMaxThreadCount,
"replWriterThreadCount was incorrectly set, " +
"replWriterMinThreadCount=" + resMinThreadCount +
"replWriterThreadCount=" + resMaxThreadCount,
);
}
}
MongoRunner.stopMongod(conn);
}
function testOplogApplication() {
clearRawMongoProgramOutput();
const rst = new ReplSetTest({name: jsTest.name(), nodes: 2});
rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const secondaries = rst.getSecondaries();
const dbName = "testDB";
const collName = jsTest.name();
const primaryDB = primary.getDB(dbName);
const primaryColl = primaryDB[collName];
const hostInfoRes = primaryDB.runCommand({hostInfo: 1});
const numCores = hostInfoRes.system.numCores;
jsTest.log("System info: numCores=" + numCores);
const twoThreadsPerCore = numCores * 2;
jsTest.log("Set thread pool size to two threads per core");
for (const secondary of secondaries) {
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterThreadCount: twoThreadsPerCore}));
let result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(twoThreadsPerCore,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally");
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: twoThreadsPerCore}));
result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
twoThreadsPerCore,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally",
);
secondary.adminCommand({
setParameter: 1,
logComponentVerbosity: {replication: {verbosity: 2}},
});
secondary.adminCommand({
setParameter: 1,
logComponentVerbosity: {executor: {verbosity: 1}},
});
}
assert.commandWorked(primaryDB.createCollection(collName, {writeConcern: {w: "majority"}}));
const numWrites = 500;
assert.commandWorked(primaryColl.insert({_id: "writeAllDurable"}, {writeConcern: {w: 2}}));
for (let i = 0; i < numWrites; i++) {
assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
}
assert.soon(
() => rawMongoProgramOutput("11280004.*Number of workers for oplog application")
.match(
'.*"nWorkers":' + twoThreadsPerCore.toString(),
),
"mongod did not print the correct thread pool size (2*number of cores) when applying oplog",
);
assert(
!rawMongoProgramOutput().match("Reaping this thread as we are above maxThreads"),
"Threads were reaped unexpectedly before the pool size was modified",
);
clearRawMongoProgramOutput();
jsTest.log("Decrease thread pool size to one thread per core");
for (const secondary of secondaries) {
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: numCores}));
let result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(numCores,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally");
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterThreadCount: numCores}));
result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
numCores, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
}
// Ensure thread pool size has changed
for (let i = numWrites; i < numWrites * 2; i++) {
assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
}
assert.soon(
() => rawMongoProgramOutput("11280004.*Number of workers for oplog application")
.match(
'.*"nWorkers":' + numCores.toString(),
),
"mongod did not print the correct thread pool size (1*number of cores) when applying oplog",
);
// Check that threads above numCores were reaped
for (let i = numCores; i < twoThreadsPerCore; i++) {
const threadNum = i + 1;
assert.soon(
() => rawMongoProgramOutput("Reaping this thread as we are above maxThreads")
.match(
'"numThreads":' + threadNum + ',"maxThreads":' + numCores,
),
"the right number of threads were not reaped to meet maxThreads (" + threadNum +
" was not present)",
);
}
// Should not have reaped more than numCores threads
assert(
!rawMongoProgramOutput("Reaping this thread as we are above maxThreads")
.match(
'"numThreads":' + numCores + ',"maxThreads":' + numCores,
),
"the right number of threads were not reaped to meet maxThreads",
);
clearRawMongoProgramOutput();
jsTest.log("Increase thread pool size");
const newThreads = numCores + 2;
for (const secondary of secondaries) {
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterThreadCount: newThreads}));
let result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(newThreads,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally");
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: newThreads}));
result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(newThreads,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally");
}
// check that new threads were spawned to meet minThreads
for (let i = numCores; i < newThreads; i++) {
assert.soon(
() => rawMongoProgramOutput("Spawning new thread as we are below minThreads")
.match(
'"numThreads":' + i + ',"minThreads":' + newThreads,
),
"the right number of threads were not spawned to meet minThreads",
);
}
// Check that we did not launch any extra threads
assert(
!rawMongoProgramOutput("Spawning new thread as we are below minThreads")
.match(
'"numThreads":' + newThreads + ',"minThreads":' + newThreads,
),
"the right number of threads were not spawned to meet minThreads",
);
// Ensure thread pool size has changed
for (let i = numWrites * 2; i < numWrites * 3 + 1; i++) {
assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
}
assert.soon(
() => rawMongoProgramOutput("11280004.*Number of workers for oplog application")
.match(
'.*"nWorkers":' + newThreads.toString(),
),
"mongod did not print the correct thread pool size (1*number of cores) when applying oplog",
);
rst.stopSet();
}
function testPreparedTransactionWithThreadCountChange() {
jsTest.log(
"Testing prepared transaction with replWriterThreadCount change between prepare and commit");
clearRawMongoProgramOutput();
const rst = new ReplSetTest({name: jsTest.name() + "_preparedTxn", nodes: 2});
rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const secondary = rst.getSecondary();
const dbName = "testPreparedTxnDB";
const collName = "testColl";
const primaryDB = primary.getDB(dbName);
assert.commandWorked(primaryDB.createCollection(collName, {writeConcern: {w: "majority"}}));
// Set replWriterThreadCount to maximum possible value on the secondary.
const hostInfoRes = primaryDB.runCommand({hostInfo: 1});
const numCores = hostInfoRes.system.numCores;
jsTest.log("System info: numCores=" + numCores);
const twoThreadsPerCore = numCores * 2;
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterThreadCount: twoThreadsPerCore}));
let result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(twoThreadsPerCore,
result.replWriterThreadCount,
"replWriterThreadCount was not set to initial value");
// Prepare but do not commit a transaction.
const session = primary.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl = sessionDB.getCollection(collName);
session.startTransaction();
// Insert multiple documents to different collections/namespaces to increase the chance
// of using multiple writer threads.
const numInserts = twoThreadsPerCore * 10;
for (let i = 0; i < numInserts; i++) {
assert.commandWorked(sessionColl.insert({_id: i, data: "test" + i}));
}
// Prepare the transaction, storing the requesterIds based on the current writerVectors size (2
// * numCores).
const prepareTimestamp = assert
.commandWorked(
session.getDatabase("admin").adminCommand({
prepareTransaction: 1,
writeConcern: {w: "majority"},
}),
)
.prepareTimestamp;
jsTest.log("Transaction prepared with timestamp: " + tojson(prepareTimestamp));
// Wait for oplog application to the secondary.
rst.awaitReplication();
// Reduce the replWriterThreadCount on the secondary.
const reducedThreadCount = 1;
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: reducedThreadCount}));
assert.commandWorked(
secondary.adminCommand({setParameter: 1, replWriterThreadCount: reducedThreadCount}));
result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
reducedThreadCount, result.replWriterThreadCount, "replWriterThreadCount was not reduced");
jsTest.log("Reduced replWriterThreadCount from " + twoThreadsPerCore + " to " +
reducedThreadCount);
// Commit the transaction, to test that the server can handle a different number of writer
// vectors between prepare and commit.
const commitTimestamp = Timestamp(prepareTimestamp.getTime(), prepareTimestamp.getInc() + 1);
assert.commandWorked(
session.getDatabase("admin").adminCommand({
commitTransaction: 1,
commitTimestamp: commitTimestamp,
writeConcern: {w: "majority"},
}),
);
jsTest.log("Transaction committed successfully");
// Verify the data was replicated correctly.
rst.awaitReplication();
const secondaryDB = secondary.getDB(dbName);
const count = secondaryDB.getCollection(collName).count();
assert.eq(numInserts, count, "Expected " + numInserts + " documents on secondary after commit");
session.endSession();
rst.stopSet();
jsTest.log("testPreparedTransactionWithThreadCountChange passed");
}
testSettingParameter();
testOplogApplication();
testPreparedTransactionWithThreadCountChange();

View File

@ -641,7 +641,7 @@ mongo_cc_library(
"//src/mongo/db/repl:oplog_applier_batcher.cpp",
],
deps = [
":repl_server_parameters_idl",
":repl_writer_thread_pool_server_parameters",
"//src/mongo/db:server_base",
"//src/mongo/db/admission:execution_admission_context",
"//src/mongo/db/auth:authorization_manager_global",
@ -1237,6 +1237,30 @@ mongo_cc_library(
],
)
mongo_idl_library(
name = "repl_writer_thread_pool_server_parameters_idl",
src = "repl_writer_thread_pool_server_parameters.idl",
idl_deps = [
"//src/mongo/db:basic_types_idl_gen",
],
deps = [
"//src/mongo/db:basic_types_idl",
],
)
mongo_cc_library(
name = "repl_writer_thread_pool_server_parameters",
srcs = [
"repl_worker_pool_thread_count.cpp",
],
deps = [
":repl_coordinator_interface",
":repl_writer_thread_pool_server_parameters_idl",
"//src/mongo/util:processinfo",
"//src/mongo/util/concurrency:thread_pool",
],
)
mongo_cc_library(
name = "cloner_utils",
srcs = [
@ -1968,6 +1992,7 @@ mongo_cc_benchmark(
tags = ["repl_bm"],
deps = [
":repl_server_parameters_idl",
":repl_writer_thread_pool_server_parameters",
"//src/mongo/db:index_builds_coordinator_mongod",
"//src/mongo/db:server_base",
"//src/mongo/db:service_context_d",
@ -2032,6 +2057,20 @@ mongo_cc_library(
],
)
mongo_cc_unit_test(
name = "db_repl_worker_pool_thread_count_test",
srcs = [
"repl_worker_pool_thread_count_test.cpp",
],
tags = ["mongo_unittest_seventh_group"],
deps = [
":repl_writer_thread_pool_server_parameters",
"//src/mongo/stdx",
"//src/mongo/unittest",
"//src/mongo/util:processinfo",
],
)
mongo_cc_unit_test(
name = "db_repl_idempotency_test",
srcs = [
@ -2262,6 +2301,7 @@ mongo_idl_library(
"//src/mongo/db:basic_types_idl_gen",
],
deps = [
"//src/mongo/client:read_preference",
"//src/mongo/db:basic_types_idl",
],
)

View File

@ -30,10 +30,8 @@
#include <algorithm>
#include <boost/smart_ptr.hpp>
#include <functional>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>
#include <boost/move/utility_core.hpp>
@ -41,7 +39,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/repl_worker_pool_thread_count.h"
#include "mongo/logv2/log.h"
#include "mongo/logv2/log_attr.h"
#include "mongo/logv2/log_component.h"
@ -50,7 +48,6 @@
#include "mongo/util/debug_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future_impl.h"
#include "mongo/util/processinfo.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@ -151,34 +148,15 @@ const OplogApplier::Options& OplogApplier::getOptions() const {
return _options;
}
std::unique_ptr<ThreadPool> makeReplWorkerPool() {
// Reduce content pinned in cache by single oplog batch on small machines by reducing the number
// of threads of ReplWriter to reduce the number of concurrent open WT transactions.
if (replWriterThreadCount < replWriterMinThreadCount) {
LOGV2_FATAL_NOTRACE(
5605400,
"replWriterMinThreadCount must be less than or equal to replWriterThreadCount",
"replWriterMinThreadCount"_attr = replWriterMinThreadCount,
"replWriterThreadCount"_attr = replWriterThreadCount);
}
auto numberOfThreads =
std::min(replWriterThreadCount, 2 * static_cast<int>(ProcessInfo::getNumAvailableCores()));
return makeReplWorkerPool(numberOfThreads);
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount) {
return makeReplWorkerPool(threadCount, "ReplWriterWorker"_sd);
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount,
std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount,
StringData name,
bool isKillableByStepdown) {
ThreadPool::Options options;
options.threadNamePrefix = name + "-";
options.poolName = name + "ThreadPool";
options.minThreads =
replWriterMinThreadCount < threadCount ? replWriterMinThreadCount : threadCount;
options.maxThreads = static_cast<size_t>(threadCount);
options.minThreads = std::min(getMinThreadCountForReplWorkerPool(), threadCount);
options.maxThreads = threadCount;
options.onCreateThread = [isKillableByStepdown](const std::string&) {
Client::initThread(getThreadName(),
getGlobalServiceContext()->getService(ClusterRole::ShardServer));
@ -195,5 +173,13 @@ std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount,
return pool;
}
std::unique_ptr<ThreadPool> makeReplWorkerPool() {
return makeReplWorkerPool(getThreadCountForReplWorkerPool());
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount) {
return makeReplWorkerPool(threadCount, "ReplWriterWorker"_sd, false);
}
} // namespace repl
} // namespace mongo

View File

@ -37,7 +37,6 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_applier_batcher.h"
@ -47,7 +46,6 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/duration.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
@ -263,15 +261,11 @@ public:
extern NoopOplogApplierObserver noopOplogApplierObserver;
/**
* Creates the default thread pool for writer tasks.
* Creates the thread pool for writer tasks.
*/
std::unique_ptr<ThreadPool> makeReplWorkerPool();
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount);
/**
* Creates a thread pool suitable for writer tasks, with the specified name
*/
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount,
std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount);
std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount,
StringData name,
bool isKillableByStepdown = false);

View File

@ -639,7 +639,12 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// Increment the batch size stat.
oplogApplicationBatchSize.increment(ops.size());
std::vector<WorkerMultikeyPathInfo> multikeyVector(_workerPool->getStats().options.maxThreads);
// The worker pool's maxThreads can be updated at any point in time. So we record its current
// value here and use it to construct our different work vectors for the next batch of work to
// schedule.
const size_t nWorkers = _workerPool->getStats().options.maxThreads;
LOGV2_DEBUG(11280004, 2, "Number of workers for oplog application", "nWorkers"_attr = nWorkers);
std::vector<WorkerMultikeyPathInfo> multikeyVector(nWorkers);
{
// Each node records cumulative batch application stats for itself using this timer.
TimerHolder timer(&applyBatchStats);
@ -671,8 +676,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// and create a pseudo oplog.
std::vector<std::vector<OplogEntry>> derivedOps;
std::vector<std::vector<ApplierOperation>> writerVectors(
_workerPool->getStats().options.maxThreads);
std::vector<std::vector<ApplierOperation>> writerVectors(nWorkers);
_fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for oplog writes to finish.
@ -694,8 +698,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
_consistencyMarkers->getMinValid(opCtx) < ops.front().getOpTime();
{
std::vector<Status> statusVector(_workerPool->getStats().options.maxThreads,
Status::OK());
std::vector<Status> statusVector(nWorkers, Status::OK());
// Doles out all the work to the writer pool threads. writerVectors is not modified,
// but applyOplogBatchPerWorker will modify the vectors that it contains.
invariant(writerVectors.size() == statusVector.size());

View File

@ -392,7 +392,10 @@ void OplogApplierUtils::addDerivedCommitsOrAborts(
// When this commit refers to a split prepare, we split the commit and add them
// to the writers that have been assigned split prepare ops.
for (const auto& sessInfo : *sessionInfos) {
addToWriterVectorImpl(sessInfo.requesterId,
// The number of workers could have changed since the prepare phase: mod by list size to
// make sure we are still in bounds.
const auto idx = sessInfo.requesterId % writerVectors->size();
addToWriterVectorImpl(idx,
writerVectors,
commitOrAbortOp,
ApplicationInstruction::applySplitPreparedTxnOp,

View File

@ -81,6 +81,7 @@
#include "mongo/db/repl/oplog_writer_impl.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/repl_writer_thread_pool_server_parameters_gen.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"

View File

@ -33,6 +33,7 @@ global:
cpp_includes:
- "mongo/client/read_preference.h"
- "mongo/client/read_preference_validators.h"
- "mongo/db/repl/repl_worker_pool_thread_count.h"
imports:
- "mongo/db/basic_types.idl"
@ -277,29 +278,6 @@ server_parameters:
gte: 0
redact: false
# From oplog_applier.cpp
replWriterThreadCount:
description: The number of threads in the thread pool used to apply the oplog
set_at: startup
cpp_vartype: int
cpp_varname: replWriterThreadCount
default: 16
validator:
gte: 1
lte: 256
redact: false
replWriterMinThreadCount:
description: The minimum number of threads in the thread pool used to apply the oplog
set_at: startup
cpp_vartype: int
cpp_varname: replWriterMinThreadCount
default: 0
validator:
gte: 0
lte: 256
redact: false
replBatchLimitOperations:
description: The maximum number of operations to apply in a single batch
set_at: [ startup, runtime ]

View File

@ -0,0 +1,184 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/repl_worker_pool_thread_count.h"
#include "mongo/db/repl/repl_writer_thread_pool_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/processinfo.h"
#include <algorithm>
#include <boost/optional/optional.hpp>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
namespace mongo {
namespace repl {
size_t getMinThreadCountForReplWorkerPool() {
return static_cast<size_t>(replWriterMinThreadCount);
}
size_t getThreadCountForReplWorkerPool() {
return std::min(static_cast<size_t>(replWriterThreadCount),
static_cast<size_t>(2 * ProcessInfo::getNumAvailableCores()));
}
namespace {
/* Mutex used to prevent concurrent setting of replWriterThreadCount and replWriterMinThreadCount.
* This is the workflow of setting one of the parameters:
* 1. validateUpdateXXX() is called before setting the param value. We lock the mutex.
* 2. param is set.
* 3. onUpdateXXX() is called after setting the param value. We unlock the mutex.
*/
mongo::Mutex threadCountParamsMutex;
// We are using a boost::optional in order to hold the unique_lock between step 1 and 3.
boost::optional<stdx::unique_lock<mongo::Mutex>> threadCountParamsLocker;
} // namespace
Status validateUpdateReplWriterThreadCount(const int count, const boost::optional<TenantId>&) {
// This range check must be the same as in the repl_writer_thread_pool_server_parameters.idl
// file. We validate it here to avoid locking the mutex if the value is out of range.
if (count <= 0) {
return Status(ErrorCodes::BadValue,
str::stream() << "Invalid value for parameter replWriterThreadCount: "
"must be greater than 0");
}
if (count > 256) {
return Status(ErrorCodes::BadValue,
str::stream() << "Invalid value for parameter replWriterThreadCount: "
"must be less than or equal to 256");
}
stdx::unique_lock<mongo::Mutex> lk(threadCountParamsMutex);
if (count < replWriterMinThreadCount) {
return Status(ErrorCodes::BadValue,
str::stream() << "replWriterThreadCount must be greater or equal to '"
<< replWriterMinThreadCount
<< "', which is the current value of replWriterMinThreadCount");
}
size_t newCount = static_cast<size_t>(count);
size_t numCores = ProcessInfo::getNumAvailableCores();
size_t maxThreads = 2 * numCores;
if (newCount > maxThreads) {
LOGV2_WARNING(11280003,
"replWriterThreadCount is set to higher than the max number of threads for "
"the writer pool, which is 2 * the number of cores available. The pool size "
"will be capped at 2 * the number of cores.",
"replWriterThreadCount"_attr = std::to_string(newCount),
"maxThreads"_attr = std::to_string(maxThreads),
"numCores"_attr = std::to_string(numCores));
}
// Moving ownership of the lock while leaving the mutex locked
threadCountParamsLocker.emplace(std::move(lk));
return Status::OK();
}
Status validateUpdateReplWriterMinThreadCount(const int count, const boost::optional<TenantId>&) {
// This range check must be the same as in the repl_writer_thread_pool_server_parameters.idl
// file. We validate it here to avoid locking the mutex if the value is out of range.
if (count < 0) {
return Status(ErrorCodes::BadValue,
str::stream() << "Invalid value for parameter replWriterMinThreadCount: "
"must be greater or equal to 0");
}
if (count > 256) {
return Status(ErrorCodes::BadValue,
str::stream() << "Invalid value for parameter replWriterMinThreadCount: "
"must be less than or equal to 256");
}
stdx::unique_lock<mongo::Mutex> lk(threadCountParamsMutex);
size_t newCount = static_cast<size_t>(count);
// May be replWriterThreadCount, or may be capped by the number of CPUs
size_t poolActualSize = getThreadCountForReplWorkerPool();
if (newCount > poolActualSize) {
return Status(ErrorCodes::BadValue,
str::stream() << "replWriterMinThreadCount must be less than or equal to '"
<< poolActualSize
<< "', which is the current max threads for the thread pool");
}
// Moving ownership of the lock while leaving the mutex locked
threadCountParamsLocker.emplace(std::move(lk));
return Status::OK();
}
Status onUpdateReplWriterThreadCount(const int) {
// Reduce content pinned in cache by single oplog batch on small machines by reducing the number
// of threads of ReplWriter to reduce the number of concurrent open WT transactions.
// Here we adopt the ownership of the locker without locking the underlying mutex as it was
// previously locked by validateUpdateReplWriterThreadCount().
stdx::unique_lock<mongo::Mutex> lk(std::move(threadCountParamsLocker.get()));
threadCountParamsLocker.reset();
if (hasGlobalServiceContext()) {
// If the global service context is set, then we're past startup, so we need to update the
// repl worker thread pool.
auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext());
auto replWorkThreadPool = replCoord->getDbWorkThreadPool();
if (replWorkThreadPool) {
replWorkThreadPool->setMaxThreads(getThreadCountForReplWorkerPool());
}
}
return Status::OK();
}
Status onUpdateReplWriterMinThreadCount(const int) {
// Here we adopt the ownership of the locker without locking the underlying mutex as it was
// previously locked by validateUpdateReplWriterMinThreadCount().
stdx::unique_lock<mongo::Mutex> lk(std::move(threadCountParamsLocker.get()));
threadCountParamsLocker.reset();
if (hasGlobalServiceContext()) {
// If the global service context is set, then we're past startup, so we need to update the
// repl worker thread pool.
auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext());
auto replWorkThreadPool = replCoord->getDbWorkThreadPool();
if (replWorkThreadPool) {
replWorkThreadPool->setMinThreads(getMinThreadCountForReplWorkerPool());
}
}
return Status::OK();
}
} // namespace repl
} // namespace mongo

View File

@ -0,0 +1,49 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/base/status.h"
#include "mongo/db/tenant_id.h"
#include <boost/optional/optional.hpp>
namespace mongo {
namespace repl {
size_t getMinThreadCountForReplWorkerPool();
size_t getThreadCountForReplWorkerPool();
Status validateUpdateReplWriterThreadCount(int count, const boost::optional<TenantId>&);
Status onUpdateReplWriterThreadCount(int);
Status validateUpdateReplWriterMinThreadCount(int count, const boost::optional<TenantId>&);
Status onUpdateReplWriterMinThreadCount(int);
} // namespace repl
} // namespace mongo

View File

@ -0,0 +1,176 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/repl_worker_pool_thread_count.h"
#include "mongo/db/repl/repl_writer_thread_pool_server_parameters_gen.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/processinfo.h"
#include <algorithm>
namespace mongo {
namespace repl {
namespace {
class ReplWorkerPoolThreadCountParamsTest : public unittest::Test {
public:
ReplWorkerPoolThreadCountParamsTest()
: _threadCountCap(
std::min(static_cast<int>(2 * ProcessInfo::getNumAvailableCores()), 256)) {}
void setUp() override {
ASSERT_GTE(_threadCountCap, 2);
setMinThreadCount(0);
setMaxThreadCount(16);
}
protected:
void setMinThreadCount(int newValue) {
ASSERT_OK(validateUpdateReplWriterMinThreadCount(newValue, boost::none));
replWriterMinThreadCount = newValue;
ASSERT_OK(onUpdateReplWriterMinThreadCount(newValue));
ASSERT_EQ(newValue, getMinThreadCountForReplWorkerPool());
}
void setMaxThreadCount(int newValue) {
ASSERT_OK(validateUpdateReplWriterThreadCount(newValue, boost::none));
replWriterThreadCount = newValue;
ASSERT_OK(onUpdateReplWriterThreadCount(newValue));
ASSERT_EQ(std::min(newValue, _threadCountCap), getThreadCountForReplWorkerPool());
}
int _threadCountCap;
};
TEST_F(ReplWorkerPoolThreadCountParamsTest, SetGetSuccess) {
setMinThreadCount(0);
setMaxThreadCount(1);
setMaxThreadCount(2);
setMinThreadCount(2);
setMaxThreadCount(10);
setMaxThreadCount(100);
setMaxThreadCount(256);
setMinThreadCount(_threadCountCap);
setMinThreadCount(_threadCountCap - 1);
}
TEST_F(ReplWorkerPoolThreadCountParamsTest, SetFailure) {
setMaxThreadCount(16);
setMinThreadCount(2);
// max < min
ASSERT_NOT_OK(validateUpdateReplWriterThreadCount(1, boost::none));
// min > max
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(17, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(_threadCountCap + 1, boost::none));
// max out of bound
ASSERT_NOT_OK(validateUpdateReplWriterThreadCount(-1, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterThreadCount(0, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterThreadCount(257, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterThreadCount(1000, boost::none));
// min out of bound
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(-1, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(257, boost::none));
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(1000, boost::none));
}
TEST_F(ReplWorkerPoolThreadCountParamsTest, SetConcurrentSuccess) {
const int newMinValue = 1;
const int newMaxValue = _threadCountCap;
// Start setting the min (this takes the lock)
ASSERT_OK(validateUpdateReplWriterMinThreadCount(newMinValue, boost::none));
stdx::thread setMaxThread([&] {
// Set the max
// This should block until the min has finished being set and the lock has been released.
// This will take the lock.
ASSERT_OK(validateUpdateReplWriterThreadCount(newMaxValue, boost::none));
// Now the min should have finished being set.
ASSERT_EQ(newMinValue, getMinThreadCountForReplWorkerPool());
// Finish setting the max
replWriterThreadCount = newMaxValue;
// This will release the lock
ASSERT_OK(onUpdateReplWriterThreadCount(newMaxValue));
ASSERT_EQ(newMaxValue, getThreadCountForReplWorkerPool());
});
// Finish setting the min
replWriterMinThreadCount = newMinValue;
// This will release the lock
ASSERT_OK(onUpdateReplWriterMinThreadCount(newMinValue));
ASSERT_EQ(newMinValue, getMinThreadCountForReplWorkerPool());
// Now setting the max can go through
setMaxThread.join();
}
TEST_F(ReplWorkerPoolThreadCountParamsTest, SetConcurrentFailure) {
const int newMinValue = 3;
const int newMaxValue = 2;
// Start setting the max (this takes the lock)
ASSERT_OK(validateUpdateReplWriterThreadCount(newMaxValue, boost::none));
stdx::thread setMinThread([&] {
// Try setting the min
// This should block until the max has finished being set and the lock has been released.
// This will take the lock and immediately release it since the validation should fail.
// This should fail as min > max.
ASSERT_NOT_OK(validateUpdateReplWriterMinThreadCount(newMinValue, boost::none));
// Now the max should have been set.
ASSERT_EQ(newMaxValue, getThreadCountForReplWorkerPool());
});
// Finish setting the max
replWriterThreadCount = newMaxValue;
// This will release the lock
ASSERT_OK(onUpdateReplWriterThreadCount(newMaxValue));
ASSERT_EQ(newMaxValue, getThreadCountForReplWorkerPool());
// Now trying to set the min can go through
setMinThread.join();
}
} // namespace
} // namespace repl
} // namespace mongo

View File

@ -0,0 +1,69 @@
# Copyright (C) 2025-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
# as published by MongoDB, Inc.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Server Side Public License for more details.
#
# You should have received a copy of the Server Side Public License
# along with this program. If not, see
# <http://www.mongodb.com/licensing/server-side-public-license>.
#
# As a special exception, the copyright holders give permission to link the
# code of portions of this program with the OpenSSL library under certain
# conditions as described in each individual source file and distribute
# linked combinations including the program with the OpenSSL library. You
# must comply with the Server Side Public License in all respects for
# all of the code used other than as permitted herein. If you modify file(s)
# with this exception, you may extend this exception to your version of the
# file(s), but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
#
# Server parameters related to the replication thread pool used by oplog appliers/writers
global:
cpp_namespace: "mongo::repl"
cpp_includes:
- "mongo/db/repl/repl_worker_pool_thread_count.h"
imports:
- "mongo/db/basic_types.idl"
server_parameters:
# From repl_worker_pool_thread_count.cpp
replWriterThreadCount:
description: The number of threads in the thread pool used to apply the oplog
set_at: [startup, runtime]
cpp_vartype: int
cpp_varname: replWriterThreadCount
on_update: onUpdateReplWriterThreadCount
default: 16
validator:
# This range is also enforced in validateUpdateReplWriterThreadCount(),
# both should validate the same range.
gte: 1
lte: 256
callback: validateUpdateReplWriterThreadCount
redact: false
replWriterMinThreadCount:
description: The minimum number of threads in the thread pool used to apply the oplog
set_at: [startup, runtime]
cpp_vartype: int
cpp_varname: replWriterMinThreadCount
on_update: onUpdateReplWriterMinThreadCount
default: 0
validator:
# This range is also enforced in validateUpdateReplWriterMinThreadCount(),
# both should validate the same range.
gte: 0
lte: 256
callback: validateUpdateReplWriterMinThreadCount
redact: false

View File

@ -68,6 +68,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/platform/compiler.h"
#include "mongo/rpc/topology_version_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/duration.h"
#include "mongo/util/future.h"
#include "mongo/util/interruptible.h"
@ -693,6 +694,13 @@ public:
*/
virtual void appendSecondaryInfoData(BSONObjBuilder* result) = 0;
/**
* Returns the ThreadPool used by replication to apply the sync source's operations in parallel
* (in OplogApplier) and to clone the databases and collections during initial sync.
* Note: the returned pointer can be null if called before the replication logic was started.
*/
virtual ThreadPool* getDbWorkThreadPool() const = 0;
/**
* Returns a copy of the current ReplSetConfig.
*

View File

@ -103,7 +103,7 @@ public:
virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
/**
* Returns shared db worker thread pool for collection cloning.
* Returns shared db worker thread pool for collection cloning and oplog applier.
*/
virtual ThreadPool* getDbWorkThreadPool() const = 0;

View File

@ -272,7 +272,7 @@ private:
// Task executor used to run replication tasks.
std::shared_ptr<executor::TaskExecutor> _taskExecutor;
// Used by repl::applyOplogBatch() to apply the sync source's operations in parallel.
// Used by OplogApplier::applyOplogBatch() to apply the sync source's operations in parallel.
// Also used by database and collection cloners to perform storage operations.
// Cloners and oplog application run in separate phases of initial sync so it is fine to share
// this thread pool.

View File

@ -3667,6 +3667,10 @@ void ReplicationCoordinatorImpl::appendSecondaryInfoData(BSONObjBuilder* result)
_topCoord->fillMemberData(result);
}
ThreadPool* ReplicationCoordinatorImpl::getDbWorkThreadPool() const noexcept {
return _externalState->getDbWorkThreadPool();
}
ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
return _getReplSetConfig();
}

View File

@ -298,6 +298,8 @@ public:
void appendSecondaryInfoData(BSONObjBuilder* result) override;
ThreadPool* getDbWorkThreadPool() const noexcept override;
ReplSetConfig getConfig() const override;
ReplSetConfig getConfig(WithLock) const;

View File

@ -462,6 +462,10 @@ StatusWith<BSONObj> ReplicationCoordinatorMock::prepareReplSetUpdatePositionComm
return cmdBuilder.obj();
}
ThreadPool* ReplicationCoordinatorMock::getDbWorkThreadPool() const noexcept {
return nullptr;
}
ReplSetConfig ReplicationCoordinatorMock::getConfig() const {
stdx::lock_guard<Mutex> lk(_mutex);

View File

@ -254,6 +254,8 @@ public:
void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
ThreadPool* getDbWorkThreadPool() const noexcept override;
ReplSetConfig getConfig() const override;
ConnectionString getConfigConnectionString() const override;

View File

@ -313,6 +313,10 @@ void ReplicationCoordinatorNoOp::appendSecondaryInfoData(BSONObjBuilder*) {
MONGO_UNREACHABLE;
}
ThreadPool* ReplicationCoordinatorNoOp::getDbWorkThreadPool() const noexcept {
MONGO_UNREACHABLE;
}
ReplSetConfig ReplicationCoordinatorNoOp::getConfig() const {
MONGO_UNREACHABLE;
}

View File

@ -220,6 +220,8 @@ public:
void appendSecondaryInfoData(BSONObjBuilder*) final;
ThreadPool* getDbWorkThreadPool() const noexcept final;
ReplSetConfig getConfig() const final;
ConnectionString getConfigConnectionString() const final;

View File

@ -77,6 +77,26 @@ std::string threadIdToString(stdx::thread::id id) {
return oss.str();
}
/**
* Check the options limits, and fassert if they don't make sense.
*/
void checkOptionsLimits(const ThreadPool::Options& options) {
if (options.maxThreads < 1) {
LOGV2_FATAL(28702,
"Cannot configure pool with maximum number of threads less than 1",
"poolName"_attr = options.poolName,
"maxThreads"_attr = options.maxThreads);
}
if (options.minThreads > options.maxThreads) {
LOGV2_FATAL(28686,
"Cannot configure pool with minimum number of threads larger than the "
"maximum",
"poolName"_attr = options.poolName,
"minThreads"_attr = options.minThreads,
"maxThreads"_attr = options.maxThreads);
}
}
/**
* Sets defaults and checks bounds limits on "options", and returns it.
*
@ -89,20 +109,7 @@ ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) {
if (options.threadNamePrefix.empty()) {
options.threadNamePrefix = "{}-"_format(options.poolName);
}
if (options.maxThreads < 1) {
LOGV2_FATAL(28702,
"Cannot create pool with maximum number of threads less than 1",
"poolName"_attr = options.poolName,
"maxThreads"_attr = options.maxThreads);
}
if (options.minThreads > options.maxThreads) {
LOGV2_FATAL(28686,
"Cannot create pool with minimum number of threads larger than the "
"configured maximum",
"poolName"_attr = options.poolName,
"minThreads"_attr = options.minThreads,
"maxThreads"_attr = options.maxThreads);
}
checkOptionsLimits(options);
return {std::move(options)};
}
@ -120,6 +127,8 @@ public:
void schedule(Task task);
void waitForIdle();
Stats getStats() const;
void setMaxThreads(size_t maxThreads);
void setMinThreads(size_t minThreads);
private:
/**
@ -199,8 +208,8 @@ private:
*/
void _joinRetired_inlock();
// These are the options with which the pool was configured at construction time.
const Options _options;
// These are the options with which the pool is configured.
Options _options;
// Mutex guarding all non-const member variables.
mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex");
@ -253,7 +262,9 @@ ThreadPool::Impl::~Impl() {
}
if (_state != shutdownComplete) {
LOGV2_FATAL(28704, "Failed to shutdown pool during destruction");
LOGV2_FATAL(28704,
"Failed to shutdown pool during destruction",
"poolName"_attr = _options.poolName);
}
invariant(_threads.empty());
invariant(_pendingTasks.empty());
@ -266,6 +277,12 @@ void ThreadPool::Impl::startup() {
"Attempted to start pool that has already started",
"poolName"_attr = _options.poolName);
}
LOGV2(11280000,
"Starting thread pool",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads,
"maxThreads"_attr = _options.maxThreads);
_setState_inlock(running);
invariant(_threads.empty());
size_t numToStart = std::clamp(_pendingTasks.size(), _options.minThreads, _options.maxThreads);
@ -382,7 +399,7 @@ void ThreadPool::Impl::schedule(Task task) {
if (_state == preStart) {
return;
}
if (_numIdleThreads < _pendingTasks.size()) {
if (_numIdleThreads < _pendingTasks.size() && _threads.size() < _options.maxThreads) {
_startWorkerThread_inlock();
}
if (_numIdleThreads <= _pendingTasks.size()) {
@ -436,6 +453,20 @@ void ThreadPool::Impl::_workerThreadBody(const std::string& threadName) noexcept
void ThreadPool::Impl::_consumeTasks() {
stdx::unique_lock<Latch> lk(_mutex);
while (_state == running) {
if (_threads.size() > _options.maxThreads) {
LOGV2_DEBUG(23114,
1,
"Reaping this thread as we are above maxThreads",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"maxThreads"_attr = _options.maxThreads);
// Wake up someone else if there is work to do, as we will be exiting without doing it.
if (!_pendingTasks.empty()) {
_workAvailable.notify_one();
}
break;
}
if (!_pendingTasks.empty()) {
_doOneTask(&lk);
continue;
@ -459,6 +490,7 @@ void ThreadPool::Impl::_consumeTasks() {
LOGV2_DEBUG(23106,
1,
"Reaping this thread",
"poolName"_attr = _options.poolName,
"nextThreadRetirementDate"_attr =
_lastFullUtilizationDate + _options.maxIdleThreadAge);
break;
@ -467,6 +499,7 @@ void ThreadPool::Impl::_consumeTasks() {
LOGV2_DEBUG(23107,
3,
"Not reaping this thread",
"poolName"_attr = _options.poolName,
"nextThreadRetirementDate"_attr = nextRetirement);
waitDeadline = nextRetirement;
} else {
@ -477,6 +510,7 @@ void ThreadPool::Impl::_consumeTasks() {
LOGV2_DEBUG(23108,
3,
"Waiting for work",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads);
}
@ -513,7 +547,7 @@ void ThreadPool::Impl::_consumeTasks() {
"expectedState"_attr = static_cast<int32_t>(running));
}
// This thread is ending because it was idle for too long.
// This thread is ending because it was idle for too long, or we were over maxThreads.
// Move self from _threads to _retiredThreads.
auto selfId = stdx::this_thread::get_id();
auto pos = std::find_if(
@ -603,6 +637,47 @@ void ThreadPool::Impl::_setState_inlock(const LifecycleState newState) {
_stateChange.notify_all();
}
void ThreadPool::Impl::setMinThreads(size_t minThreads) {
stdx::lock_guard<Latch> lk(_mutex);
const auto oldMinThreads = _options.minThreads;
_options.minThreads = minThreads;
checkOptionsLimits(_options);
LOGV2(11280001,
"setting thread pool minThreads",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads,
"maxThreads"_attr = _options.maxThreads,
"old minThreads"_attr = oldMinThreads);
// Check if we need to create new threads
while (_threads.size() < _options.minThreads) {
LOGV2_DEBUG(1280005,
1,
"Spawning new thread as we are below minThreads",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads,
"maxThreads"_attr = _options.maxThreads);
_startWorkerThread_inlock();
}
}
void ThreadPool::Impl::setMaxThreads(size_t maxThreads) {
stdx::lock_guard<Latch> lk(_mutex);
const auto oldMaxThreads = _options.maxThreads;
_options.maxThreads = maxThreads;
checkOptionsLimits(_options);
LOGV2(11280002,
"setting thread pool maxThreads",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads,
"maxThreads"_attr = _options.maxThreads,
"old maxThreads"_attr = oldMaxThreads);
// Reaping extra threads will automatically be done in _consumeTasks().
}
// ========================================
// ThreadPool public functions that simply forward to the `_impl`.
@ -634,4 +709,12 @@ ThreadPool::Stats ThreadPool::getStats() const {
return _impl->getStats();
}
void ThreadPool::setMinThreads(size_t minThreads) {
_impl->setMinThreads(minThreads);
}
void ThreadPool::setMaxThreads(size_t maxThreads) {
_impl->setMaxThreads(maxThreads);
}
} // namespace mongo

View File

@ -175,6 +175,20 @@ public:
*/
Stats getStats() const;
/**
* Set the minimum number of threads for this ThreadPool.
* Calling this method will spin up new threads if the new minimum is greater than the current
* number of threads.
*/
void setMinThreads(size_t minThreads);
/**
* Set the maximum number of threads for this ThreadPool.
* Calling this method will cause threads to be reaped once they finish their tasks if more than
* the maximum are running.
*/
void setMaxThreads(size_t maxThreads);
private:
class Impl;
std::unique_ptr<Impl> _impl;

View File

@ -199,7 +199,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
DEATH_TEST_REGEX(ThreadPoolTest,
MaxThreadsTooFewDies,
"Cannot create pool.*with maximum number of threads.*less than 1") {
"Cannot configure pool.*with maximum number of threads.*less than 1") {
ThreadPool::Options options;
options.maxThreads = 0;
ThreadPool pool(options);
@ -208,7 +208,7 @@ DEATH_TEST_REGEX(ThreadPoolTest,
DEATH_TEST_REGEX(
ThreadPoolTest,
MinThreadsTooManyDies,
R"#(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)#") {
R"#(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":6,"maxThreads":5)#") {
ThreadPool::Options options;
options.maxThreads = 5;
options.minThreads = 6;
@ -327,7 +327,305 @@ TEST(ThreadPoolTest, JoinAllRetiredThreads) {
ASSERT_EQ(pool.getStats().numIdleThreads, 0);
}
TEST(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
DEATH_TEST_REGEX_F(
ThreadPoolTest,
ModifyMinThreadsGreaterThanMax,
R"re(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":7,"maxThreads":5)re") {
ThreadPool::Options options;
options.maxThreads = 5;
options.minThreads = 3;
auto& pool = makePool(options);
const size_t newMinThreads = 7;
pool.setMinThreads(newMinThreads);
}
DEATH_TEST_REGEX_F(
ThreadPoolTest,
ModifyMaxLessThanMin,
R"re(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":3,"maxThreads":2)re") {
ThreadPool::Options options;
options.maxThreads = 5;
options.minThreads = 3;
auto& pool = makePool(options);
const size_t newMaxThreads = 2;
pool.setMaxThreads(newMaxThreads);
}
DEATH_TEST_REGEX_F(ThreadPoolTest,
ModifyMaxToZero,
"Cannot configure pool.*with maximum number of threads.*less than 1") {
ThreadPool::Options options;
options.maxThreads = 5;
options.minThreads = 0;
auto& pool = makePool(options);
const size_t newMaxThreads = 0;
pool.setMaxThreads(newMaxThreads);
}
TEST_F(ThreadPoolTest, ModifyMinThreads) {
Atomic<int> retiredThreads(0);
ThreadPool::Options options;
options.minThreads = 4;
options.maxThreads = 8;
options.maxIdleThreadAge = Milliseconds(100);
options.onJoinRetiredThread = [&](const stdx::thread& t) {
retiredThreads.addAndFetch(1);
};
unittest::Barrier barrier(options.maxThreads + 1);
auto& pool = makePool(options);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
ASSERT_EQ(pool.getStats().numThreads, 0);
pool.startup();
barrier.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);
// Modify to lower value
// reset # of retired threads
retiredThreads.store(0);
// barrier was reset when counter reached 0
const size_t newMinThreads = 2;
pool.setMinThreads(newMinThreads);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
barrier.countDownAndWait();
while (pool.getStats().numThreads > newMinThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, newMinThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), options.maxThreads - newMinThreads);
// modify to higher value
// reset # of retired threads
retiredThreads.store(0);
// barrier was reset when counter reached 0
const size_t higherMinThreads = 6;
pool.setMinThreads(higherMinThreads);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
barrier.countDownAndWait();
while (pool.getStats().numThreads > higherMinThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, higherMinThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), options.maxThreads - higherMinThreads);
pool.shutdown();
pool.join();
}
TEST_F(ThreadPoolTest, DecreaseMaxThreadsAndDoLessWork) {
Atomic<int> retiredThreads(0);
ThreadPool::Options options;
const size_t originalMaxThreads = 8;
options.minThreads = 4;
options.maxThreads = originalMaxThreads;
options.maxIdleThreadAge = Milliseconds(1000);
options.onJoinRetiredThread = [&](const stdx::thread& t) {
retiredThreads.addAndFetch(1);
};
auto& pool = makePool(options);
ASSERT_EQ(pool.getStats().numThreads, 0);
pool.startup();
unittest::Barrier barrier(options.maxThreads + 1);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
barrier.countDownAndWait();
// No threads should have retired yet.
ASSERT_EQ(retiredThreads.load(), 0);
// Modify maxThreads to a lower value;
const size_t lowerMaxThreads = 4;
pool.setMaxThreads(lowerMaxThreads);
unittest::Barrier barrier2(2);
// Schedule only one task
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier2.countDownAndWait();
});
barrier2.countDownAndWait();
// Cleaning up the retired threads happens after task execution, so wait briefly for this to
// complete.
sleepFor(Milliseconds{100});
// Four threads should have retired due to lowering max from 8 to 4.
ASSERT_EQ(retiredThreads.load(), originalMaxThreads - lowerMaxThreads);
pool.shutdown();
pool.join();
}
TEST_F(ThreadPoolTest, ModifyMaxThreads) {
Atomic<int> retiredThreads(0);
ThreadPool::Options options;
options.minThreads = 4;
options.maxThreads = 8;
options.maxIdleThreadAge = Milliseconds(100);
options.onJoinRetiredThread = [&](const stdx::thread& t) {
retiredThreads.addAndFetch(1);
};
unittest::Barrier barrier(options.maxThreads + 1);
auto& pool = makePool(options);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
ASSERT_EQ(pool.getStats().numThreads, 0);
pool.startup();
barrier.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);
// Modify to higher value
// reset # of retired threads
retiredThreads.store(0);
const size_t newMaxThreads = 12;
pool.setMaxThreads(newMaxThreads);
// create new barrier to reflect new number of threads
unittest::Barrier barrier2(newMaxThreads + 1);
for (auto i = size_t{0}; i < newMaxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier2.countDownAndWait();
});
}
barrier2.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), newMaxThreads - options.minThreads);
// modify to lower value
// reset # of retired threads
retiredThreads.store(0);
const size_t lowerMaxThreads = 6;
pool.setMaxThreads(lowerMaxThreads);
// create new barrier to reflect new number of threads
unittest::Barrier barrier3(lowerMaxThreads + 1);
for (auto i = size_t{0}; i < lowerMaxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier3.countDownAndWait();
});
}
barrier3.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), lowerMaxThreads - options.minThreads);
pool.shutdown();
pool.join();
}
TEST_F(ThreadPoolTest, ModifyMaxAndMinThreads) {
Atomic<int> retiredThreads(0);
ThreadPool::Options options;
options.minThreads = 4;
options.maxThreads = 8;
options.maxIdleThreadAge = Milliseconds(100);
options.onJoinRetiredThread = [&](const stdx::thread& t) {
retiredThreads.addAndFetch(1);
};
unittest::Barrier barrier(options.maxThreads + 1);
auto& pool = makePool(options);
for (auto i = size_t{0}; i < options.maxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier.countDownAndWait();
});
}
ASSERT_EQ(pool.getStats().numThreads, 0);
pool.startup();
barrier.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);
// reset # of retired threads
retiredThreads.store(0);
const size_t newMaxThreads = 12;
const size_t newMinThreads = 2;
pool.setMaxThreads(newMaxThreads);
pool.setMinThreads(newMinThreads);
// create new barrier to reflect new number of threads
unittest::Barrier barrier2(newMaxThreads + 1);
for (auto i = size_t{0}; i < newMaxThreads; ++i) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
barrier2.countDownAndWait();
});
}
barrier2.countDownAndWait();
while (pool.getStats().numThreads > newMinThreads) {
sleepFor(Microseconds{1});
}
ASSERT_EQ(pool.getStats().numIdleThreads, newMinThreads);
sleepFor(Milliseconds{100});
ASSERT_EQ(retiredThreads.load(), newMaxThreads - newMinThreads);
pool.shutdown();
pool.join();
}
TEST_F(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
ThreadPool::Options options;
options.minThreads = 1;
options.maxThreads = 1;