SERVER-101373 unify ThreadPoolTest fixtures (#40753)

GitOrigin-RevId: 9d0740ec17e82103ba0ebdb2c5f407f370ad881a
This commit is contained in:
Billy Donahue 2025-09-04 14:34:10 -04:00 committed by MongoDB Bot
parent 4c395445a9
commit f6fd3943dc
6 changed files with 253 additions and 584 deletions

View File

@ -104,19 +104,6 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "thread_pool_test_fixture",
srcs = [
"thread_pool_test_common.cpp",
"thread_pool_test_fixture.cpp",
"//src/mongo/util/concurrency:thread_pool_test_common.h",
"//src/mongo/util/concurrency:thread_pool_test_fixture.h",
],
deps = [
"//src/mongo/unittest",
],
)
mongo_cc_unit_test(
name = "util_concurrency_test",
srcs = [
@ -131,11 +118,11 @@ mongo_cc_unit_test(
deps = [
":spin_lock",
":thread_pool",
":thread_pool_test_fixture",
":ticketholder",
"//src/mongo/db:service_context_non_d",
"//src/mongo/db:service_context_test_fixture",
"//src/mongo/db/auth:authmocks",
"//src/mongo/unittest",
"//src/mongo/util:packaged_task",
],
)

View File

@ -28,15 +28,12 @@
*/
#include <boost/move/utility_core.hpp>
#include <boost/optional/optional.hpp>
#include <fmt/format.h>
// IWYU pragma: no_include "cxxabi.h"
#include "mongo/base/init.h" // IWYU pragma: keep
#include "mongo/base/initializer.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/atomic.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@ -44,64 +41,116 @@
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/concurrency/thread_pool_test_common.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
#include <boost/optional/optional.hpp>
#include <fmt/format.h>
namespace mongo {
namespace {
using namespace mongo;
MONGO_INITIALIZER(ThreadPoolCommonTests)(InitializerContext*) {
addTestsForThreadPool("ThreadPoolCommon",
[]() { return std::make_unique<ThreadPool>(ThreadPool::Options()); });
}
class ThreadPoolTest : public unittest::Test {
protected:
/**
* Manages blocking work launched by the test.
* Starts off in the blocked state.
*/
class BlockingWorkControl {
public:
~BlockingWorkControl() {
unblock();
}
void block() {
stdx::unique_lock lk(_mutex);
_workBlocked = true;
_cv.notify_all();
}
void unblock() {
stdx::unique_lock lk(_mutex);
_workBlocked = false;
_cv.notify_all();
}
void onWorkStart() {
stdx::unique_lock lk(_mutex);
++_workStarted;
_cv.notify_all();
}
void pauseWhileBlocked() {
stdx::unique_lock lk(_mutex);
_cv.wait(lk, [&] { return !_workBlocked; });
}
int pauseUntilWorkStartedCountAtLeast(int count) {
stdx::unique_lock lk(_mutex);
_cv.wait(lk, [&] { return _workStarted >= count; });
return _workStarted;
}
private:
mutable stdx::mutex _mutex;
stdx::condition_variable _cv;
int _workStarted = 0;
int _workBlocked = true;
};
ThreadPool& makePool(ThreadPool::Options options) {
ASSERT(!_pool);
_pool.emplace(std::move(options));
return *_pool;
}
ThreadPool& pool() {
void destroyPool() {
_pool.reset();
}
ThreadPool& getPool() {
ASSERT(_pool);
return *_pool;
}
void blockingWork() {
stdx::unique_lock<stdx::mutex> lk(mutex);
++count1;
cv1.notify_all();
while (!flag2) {
cv2.wait(lk);
void waitForReapTo(ThreadPool& pool, size_t upperBound) {
Timer reapTimer;
while (pool.getStats().numThreads > upperBound) {
Microseconds elapsed{reapTimer.micros()};
ASSERT_LT(elapsed, Seconds{5})
<< fmt::format("Failed to reap excess threads after {}", elapsed);
sleepFor(Milliseconds{50});
}
}
stdx::mutex mutex;
stdx::condition_variable cv1;
stdx::condition_variable cv2;
size_t count1 = 0U;
bool flag2 = false;
private:
void tearDown() override {
stdx::unique_lock<stdx::mutex> lk(mutex);
flag2 = true;
cv2.notify_all();
lk.unlock();
auto blockingWorkCallback(boost::optional<int> taskNumber = {}) {
return [&, taskNumber](auto status) {
ASSERT_OK(status) << (taskNumber ? std::to_string(*taskNumber) : std::string{});
blockingWorkControl().onWorkStart();
blockingWorkControl().pauseWhileBlocked();
};
}
BlockingWorkControl& blockingWorkControl() {
return _blockingWorkControl;
}
private:
boost::optional<ThreadPool> _pool;
BlockingWorkControl _blockingWorkControl;
};
using ThreadPoolDeathTest = ThreadPoolTest;
TEST_F(ThreadPoolTest, MinPoolSize0) {
ThreadPool::Options options;
@ -110,121 +159,99 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
options.maxIdleThreadAge = Milliseconds(100);
auto& pool = makePool(options);
pool.startup();
ASSERT_EQ(0U, pool.getStats().numThreads);
stdx::unique_lock<stdx::mutex> lk(mutex);
pool.schedule([this](auto status) {
ASSERT_OK(status);
blockingWork();
});
while (count1 != 1U) {
cv1.wait(lk);
ASSERT_EQ(pool.getStats().numThreads, 0);
for (int i = 0; i < 3; ++i) {
pool.schedule(blockingWorkCallback());
// First task is in progress, the others are pending.
blockingWorkControl().pauseUntilWorkStartedCountAtLeast(1);
{
auto stats = pool.getStats();
ASSERT_EQ(stats.numThreads, 1);
ASSERT_EQ(stats.numIdleThreads, 0);
ASSERT_EQ(stats.numPendingTasks, i);
}
}
auto stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numPendingTasks);
pool.schedule([](auto status) { ASSERT_OK(status); });
stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numIdleThreads);
ASSERT_EQUALS(1U, stats.numPendingTasks);
flag2 = true;
cv2.notify_all();
lk.unlock();
Timer reapTimer;
for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) {
sleepmillis(100);
blockingWorkControl().unblock();
waitForReapTo(pool, 0);
blockingWorkControl().block();
// This one additional task will start on the one available thread.
pool.schedule(blockingWorkCallback());
blockingWorkControl().pauseUntilWorkStartedCountAtLeast(4);
{
auto stats = pool.getStats();
ASSERT_EQ(stats.numThreads, 1);
ASSERT_EQ(stats.numIdleThreads, 0);
ASSERT_EQ(stats.numPendingTasks, 0);
}
const Microseconds reapTime(reapTimer.micros());
ASSERT_EQ(options.minThreads, stats.numThreads)
<< "Failed to reap all threads after " << durationCount<Milliseconds>(reapTime) << "ms";
lk.lock();
flag2 = false;
count1 = 0;
pool.schedule([this](auto status) {
ASSERT_OK(status);
blockingWork();
});
while (count1 == 0) {
cv1.wait(lk);
blockingWorkControl().unblock();
waitForReapTo(pool, 0);
{
auto stats = pool.getStats();
ASSERT_EQ(stats.numThreads, 0);
ASSERT_EQ(stats.numIdleThreads, 0);
ASSERT_EQ(stats.numPendingTasks, 0);
}
stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numIdleThreads);
ASSERT_EQUALS(0U, stats.numPendingTasks);
flag2 = true;
cv2.notify_all();
lk.unlock();
}
TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
static const size_t extra = 10;
ThreadPool::Options options;
options.minThreads = 15;
options.maxThreads = 20;
options.maxIdleThreadAge = Milliseconds(100);
auto& pool = makePool(options);
pool.startup();
stdx::unique_lock<stdx::mutex> lk(mutex);
for (size_t i = 0U; i < 30U; ++i) {
pool.schedule([this, i](auto status) {
ASSERT_OK(status) << i;
blockingWork();
});
for (size_t i = 0; i < options.maxThreads + extra; ++i)
pool.schedule(blockingWorkCallback(i));
ASSERT_EQ(blockingWorkControl().pauseUntilWorkStartedCountAtLeast(options.maxThreads),
options.maxThreads);
sleepFor(Milliseconds(100));
{
auto stats = pool.getStats();
ASSERT_EQ(stats.numThreads, options.maxThreads);
ASSERT_EQ(stats.numIdleThreads, 0);
ASSERT_EQ(stats.numPendingTasks, extra);
}
while (count1 < 20U) {
cv1.wait(lk);
}
ASSERT_EQ(20U, count1);
auto stats = pool.getStats();
ASSERT_EQ(20U, stats.numThreads);
ASSERT_EQ(0U, stats.numIdleThreads);
ASSERT_EQ(10U, stats.numPendingTasks);
flag2 = true;
cv2.notify_all();
while (count1 < 30U) {
cv1.wait(lk);
}
lk.unlock();
stats = pool.getStats();
ASSERT_EQ(0U, stats.numPendingTasks);
Timer reapTimer;
for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) {
sleepmillis(50);
}
const Microseconds reapTime(reapTimer.micros());
ASSERT_EQ(options.minThreads, stats.numThreads)
<< "Failed to reap excess threads after " << durationCount<Milliseconds>(reapTime) << "ms";
blockingWorkControl().unblock();
blockingWorkControl().pauseUntilWorkStartedCountAtLeast(options.maxThreads + extra);
ASSERT_EQ(pool.getStats().numPendingTasks, 0);
waitForReapTo(pool, options.minThreads);
}
DEATH_TEST_REGEX(ThreadPoolDeathTest,
MaxThreadsTooFewDies,
"Cannot create pool.*with maximum number of threads.*less than 1") {
DEATH_TEST_REGEX_F(ThreadPoolDeathTest,
MaxThreadsTooFewDies,
"Cannot create pool.*with maximum number of threads.*less than 1") {
ThreadPool::Options options;
options.maxThreads = 0;
ThreadPool pool(options);
makePool(options);
}
DEATH_TEST_REGEX(
DEATH_TEST_REGEX_F(
ThreadPoolDeathTest,
MinThreadsTooManyDies,
R"#(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)#") {
R"re(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)re") {
ThreadPool::Options options;
options.maxThreads = 5;
options.minThreads = 6;
ThreadPool pool(options);
makePool(options);
}
TEST(SimpleThreadPoolTest, LivePoolCleanedByDestructor) {
ThreadPool pool((ThreadPool::Options()));
TEST_F(ThreadPoolTest, LivePoolCleanedByDestructor) {
auto& pool = makePool({});
pool.startup();
while (pool.getStats().numThreads == 0) {
sleepmillis(50);
sleepFor(Milliseconds{50});
}
// Destructor should reap leftover threads.
}
DEATH_TEST_REGEX(ThreadPoolDeathTest,
DestructionDuringJoinDies,
"Attempted to join pool.*more than once.*DoubleJoinPool") {
DEATH_TEST_REGEX_F(ThreadPoolDeathTest,
DestructionDuringJoinDies,
"Attempted to join pool.*more than once.*DoubleJoinPool") {
// This test ensures that the ThreadPool destructor runs while some thread is blocked
// running ThreadPool::join, to see that double-join is fatal in the pool destructor.
stdx::mutex mutex;
@ -232,51 +259,49 @@ DEATH_TEST_REGEX(ThreadPoolDeathTest,
options.minThreads = 1;
options.maxThreads = 1;
options.poolName = "DoubleJoinPool";
boost::optional<ThreadPool> pool;
pool.emplace(options);
pool->startup();
ASSERT_EQ(1U, pool->getStats().numThreads);
auto& pool = makePool(options);
pool.startup();
ASSERT_EQ(pool.getStats().numThreads, 1);
stdx::unique_lock<stdx::mutex> lk(mutex);
stdx::unique_lock lk(mutex);
// Schedule 2 tasks to ensure that independent thread join() is blocked draining the tasks and
// causing the ThreadPool destructor join to fail due to double-join.
pool->schedule([&mutex](auto status) {
pool.schedule([&mutex](auto status) {
ASSERT_OK(status);
stdx::lock_guard<stdx::mutex> lk(mutex);
stdx::lock_guard lk(mutex);
});
pool->schedule([&mutex](auto status) {
pool.schedule([&mutex](auto status) {
ASSERT_OK(status);
stdx::lock_guard<stdx::mutex> lk(mutex);
stdx::lock_guard lk(mutex);
});
stdx::thread t;
ScopeGuard onExitGuard([&] {
stdx::thread t{[&] {
pool.shutdown();
pool.join();
}};
ScopeGuard onExitGuard = [&] {
lk.unlock();
if (t.joinable()) {
t.join();
}
});
t = stdx::thread([&pool] {
pool->shutdown();
pool->join();
});
};
ThreadPool::Stats stats;
while ((stats = pool->getStats()).numPendingTasks != 0U) {
sleepmillis(10);
while ((stats = pool.getStats()).numPendingTasks != 0) {
sleepFor(Milliseconds{10});
}
// Accounts for cleanup and regular worker thread.
ASSERT_EQ(2U, stats.numThreads);
ASSERT_EQ(0U, stats.numIdleThreads);
pool.reset();
ASSERT_EQ(stats.numThreads, 2);
ASSERT_EQ(stats.numIdleThreads, 0);
destroyPool();
MONGO_UNREACHABLE;
}
TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) {
unittest::Barrier barrier(2U);
unittest::Barrier barrier(2);
std::string journal;
ThreadPool::Options options;
options.threadNamePrefix = "mythread";
options.maxThreads = 1U;
options.maxThreads = 1;
options.onCreateThread = [&](const std::string& threadName) {
journal.append(fmt::format("[onCreate({})]", threadName));
};
@ -288,11 +313,11 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
barrier.countDownAndWait();
});
barrier.countDownAndWait();
ASSERT_EQUALS(journal, "[onCreate(mythread0)][Call(OK)]");
ASSERT_EQ(journal, "[onCreate(mythread0)][Call(OK)]");
}
TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) {
AtomicWord<unsigned long> retiredThreads(0);
TEST_F(ThreadPoolTest, JoinAllRetiredThreads) {
Atomic<int> retiredThreads(0);
ThreadPool::Options options;
options.minThreads = 4;
options.maxThreads = 8;
@ -302,7 +327,7 @@ TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) {
};
unittest::Barrier barrier(options.maxThreads + 1);
ThreadPool pool(options);
auto& pool = makePool(options);
for (auto i = options.maxThreads; i > 0; i--) {
pool.schedule([&](auto status) {
ASSERT_OK(status);
@ -314,22 +339,21 @@ TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) {
barrier.countDownAndWait();
while (pool.getStats().numThreads > options.minThreads) {
sleepmillis(100);
sleepFor(Microseconds{1});
}
pool.shutdown();
pool.join();
const auto expectedRetiredThreads = options.maxThreads - options.minThreads;
ASSERT_EQ(retiredThreads.load(), expectedRetiredThreads);
ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);
ASSERT_EQ(pool.getStats().numIdleThreads, 0);
}
TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
TEST_F(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
ThreadPool::Options options;
options.minThreads = 1;
options.maxThreads = 1;
ThreadPool pool(options);
auto& pool = makePool(options);
unittest::Barrier barrier(2);
pool.schedule([&](Status) {
barrier.countDownAndWait();
@ -337,7 +361,7 @@ TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
// ThreadPool::shutdown(). Introducing the following sleep increases the chances of such an
// ordering. However, this is a best-effort, and ThreadPool::shutdown() may still precede
// ThreadPool::waitForIdle on slow machines.
sleepmillis(10);
sleepFor(Milliseconds{10});
});
pool.schedule([&](Status) { pool.shutdown(); });
pool.startup();
@ -345,4 +369,79 @@ TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
pool.waitForIdle();
}
TEST_F(ThreadPoolTest, UnusedPool) {
makePool({});
}
TEST_F(ThreadPoolTest, CannotScheduleAfterShutdown) {
auto& pool = makePool({});
pool.shutdown();
pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); });
}
TEST_F(ThreadPoolTest, PoolDestructorExecutesRemainingTasks) {
auto& pool = makePool({});
bool executed = false;
pool.schedule([&](auto status) {
ASSERT_OK(status);
executed = true;
});
destroyPool();
ASSERT_TRUE(executed);
}
TEST_F(ThreadPoolTest, PoolJoinExecutesRemainingTasks) {
auto& pool = makePool({});
bool executed = false;
pool.schedule([&](auto status) {
ASSERT_OK(status);
executed = true;
});
pool.shutdown();
pool.join();
ASSERT_TRUE(executed);
}
TEST_F(ThreadPoolTest, RepeatedScheduleDoesntSmashStack) {
auto& pool = makePool({});
auto finished = makePromiseFuture<void>();
auto func = [&](auto&& self, int depth) -> void {
if (depth) {
pool.schedule([&, depth](auto status) {
ASSERT_OK(status);
self(self, depth - 1);
});
} else {
pool.shutdown();
finished.promise.emplaceValue();
}
};
func(func, 10'000);
pool.startup();
pool.join();
finished.future.get();
}
DEATH_TEST_F(ThreadPoolDeathTest, DieOnDoubleStartUp, "already started") {
auto& pool = makePool({});
pool.startup();
pool.startup();
}
DEATH_TEST_F(ThreadPoolDeathTest, DieWhenExceptionBubblesUp, "task oops") {
auto& pool = makePool({});
pool.startup();
pool.schedule([](auto&&) { uassertStatusOK(Status({ErrorCodes::BadValue, "task oops"})); });
pool.shutdown();
pool.join();
}
DEATH_TEST_F(ThreadPoolDeathTest, DieOnDoubleJoin, "Attempted to join pool") {
auto& pool = makePool({});
pool.shutdown();
pool.join();
pool.join();
}
} // namespace
} // namespace mongo

View File

@ -1,257 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include <absl/container/node_hash_map.h>
#include <fmt/format.h>
// IWYU pragma: no_include "cxxabi.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/status.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/concurrency/thread_pool_test_common.h"
#include "mongo/util/concurrency/thread_pool_test_fixture.h"
#include "mongo/util/str.h"
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
namespace {
using ThreadPoolFactory = std::function<std::unique_ptr<ThreadPoolInterface>()>;
class CommonThreadPoolTestFixture : public ThreadPoolTest {
public:
CommonThreadPoolTestFixture(ThreadPoolFactory makeThreadPool)
: _makeThreadPool(std::move(makeThreadPool)) {}
private:
std::unique_ptr<ThreadPoolInterface> makeThreadPool() override {
return _makeThreadPool();
}
ThreadPoolFactory _makeThreadPool;
};
using ThreadPoolTestCaseFactory =
std::function<std::unique_ptr<::mongo::unittest::Test>(ThreadPoolFactory)>;
using ThreadPoolTestCaseMap = stdx::unordered_map<std::string, ThreadPoolTestCaseFactory>;
static ThreadPoolTestCaseMap& threadPoolTestCaseRegistry() {
static ThreadPoolTestCaseMap registry;
return registry;
}
class TptRegistrationAgent {
TptRegistrationAgent(const TptRegistrationAgent&) = delete;
TptRegistrationAgent& operator=(const TptRegistrationAgent&) = delete;
public:
TptRegistrationAgent(const std::string& name, ThreadPoolTestCaseFactory makeTest) {
auto& entry = threadPoolTestCaseRegistry()[name];
if (entry) {
LOGV2_FATAL(34355, "Multiple attempts to register ThreadPoolTest", "name"_attr = name);
}
entry = std::move(makeTest);
}
};
template <typename T>
class TptDeathRegistrationAgent {
TptDeathRegistrationAgent(const TptDeathRegistrationAgent&) = delete;
TptDeathRegistrationAgent& operator=(const TptDeathRegistrationAgent&) = delete;
public:
TptDeathRegistrationAgent(const std::string& name, ThreadPoolTestCaseFactory makeTest) {
auto& entry = threadPoolTestCaseRegistry()[name];
if (entry) {
LOGV2_FATAL(
34356, "Multiple attempts to register ThreadPoolDeathTest", "name"_attr = name);
}
entry = [makeTest](ThreadPoolFactory makeThreadPool) {
return std::make_unique<::mongo::unittest::DeathTest<T>>(std::move(makeThreadPool));
};
}
};
#define COMMON_THREAD_POOL_TEST(TEST_NAME) \
class TPT_##TEST_NAME : public CommonThreadPoolTestFixture { \
public: \
TPT_##TEST_NAME(ThreadPoolFactory makeThreadPool) \
: CommonThreadPoolTestFixture(std::move(makeThreadPool)) {} \
\
private: \
void _doTest() override; \
static inline const TptRegistrationAgent _agent{ \
#TEST_NAME, [](ThreadPoolFactory makeThreadPool) { \
return std::make_unique<TPT_##TEST_NAME>(std::move(makeThreadPool)); \
}}; \
}; \
void TPT_##TEST_NAME::_doTest()
#define COMMON_THREAD_POOL_DEATH_TEST(TEST_NAME, MATCH_EXPR) \
class TPT_##TEST_NAME : public CommonThreadPoolTestFixture { \
public: \
TPT_##TEST_NAME(ThreadPoolFactory makeThreadPool) \
: CommonThreadPoolTestFixture(std::move(makeThreadPool)) {} \
static std::string getPattern() { \
return MATCH_EXPR; \
} \
static bool isRegex() { \
return false; \
} \
static int getLine() { \
return __LINE__; \
} \
static std::string getFile() { \
return __FILE__; \
} \
\
\
private: \
void _doTest() override; \
static inline const TptDeathRegistrationAgent<TPT_##TEST_NAME> _agent{ \
#TEST_NAME, [](ThreadPoolFactory makeThreadPool) { \
return std::make_unique<TPT_##TEST_NAME>(std::move(makeThreadPool)); \
}}; \
}; \
void TPT_##TEST_NAME::_doTest()
COMMON_THREAD_POOL_TEST(UnusedPool) {
getThreadPool();
}
COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) {
auto& pool = getThreadPool();
pool.shutdown();
pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); });
}
COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "already started") {
auto& pool = getThreadPool();
pool.startup();
pool.startup();
}
namespace {
constexpr auto kExceptionMessage = "No good very bad exception";
}
COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) {
auto& pool = getThreadPool();
pool.startup();
pool.schedule(
[](auto status) { uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage})); });
pool.shutdown();
pool.join();
}
COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") {
auto& pool = getThreadPool();
pool.shutdown();
pool.join();
pool.join();
}
COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
pool.schedule([&executed](auto status) {
ASSERT_OK(status);
executed = true;
});
deleteThreadPool();
ASSERT_EQ(executed, true);
}
COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
pool.schedule([&executed](auto status) {
ASSERT_OK(status);
executed = true;
});
pool.shutdown();
pool.join();
ASSERT_EQ(executed, true);
}
COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) {
const std::size_t depth = 10000ul;
auto& pool = getThreadPool();
std::function<void()> func;
std::size_t n = 0;
stdx::mutex mutex;
stdx::condition_variable condvar;
func = [&pool, &n, &func, &condvar, &mutex, depth]() {
stdx::unique_lock<stdx::mutex> lk(mutex);
if (n < depth) {
n++;
lk.unlock();
pool.schedule([&](auto status) {
ASSERT_OK(status);
func();
});
} else {
pool.shutdown();
condvar.notify_one();
}
};
func();
pool.startup();
pool.join();
stdx::unique_lock<stdx::mutex> lk(mutex);
condvar.wait(lk, [&n, depth] { return n == depth; });
}
} // namespace
void addTestsForThreadPool(const std::string& suiteName, ThreadPoolFactory makeThreadPool) {
auto& suite = unittest::Suite::getSuite(suiteName);
for (const auto& testCase : threadPoolTestCaseRegistry()) {
suite.add(str::stream() << suiteName << "::" << testCase.first,
__FILE__,
[testCase, makeThreadPool] { testCase.second(makeThreadPool)->run(); });
}
}
} // namespace mongo

View File

@ -1,48 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <functional>
#include <memory>
#include <string>
namespace mongo {
class ThreadPoolInterface;
/**
* Sets up a unit test suite named "suiteName" that runs a battery of unit
* tests against thread pools returned by "makeThreadPool". These tests should
* work against any implementation of ThreadPoolInterface.
*/
void addTestsForThreadPool(const std::string& suiteName,
std::function<std::unique_ptr<ThreadPoolInterface>()> makeThreadPool);
} // namespace mongo

View File

@ -1,40 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/util/concurrency/thread_pool_test_fixture.h"
namespace mongo {
void ThreadPoolTest::setUp() {
_threadPool = makeThreadPool();
}
void ThreadPoolTest::tearDown() {}
} // namespace mongo

View File

@ -1,72 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include <memory>
namespace mongo {
/**
* Test fixture for tests that require a ThreadPool backed by a NetworkInterfaceMock.
*/
class ThreadPoolTest : public unittest::Test {
protected:
~ThreadPoolTest() override = default;
ThreadPoolInterface& getThreadPool() {
return *_threadPool;
}
void deleteThreadPool() {
_threadPool.reset();
}
/**
* Initializes both the NetworkInterfaceMock and ThreadPool but does not start the threadPool.
*/
void setUp() override;
/**
* Destroys the replication threadPool.
*
* Shuts down and joins the running threadPool.
*/
void tearDown() override;
private:
virtual std::unique_ptr<ThreadPoolInterface> makeThreadPool() = 0;
std::unique_ptr<ThreadPoolInterface> _threadPool;
};
} // namespace mongo