From f6fd3943dc5e86a69879e7ca6586c74825c1a4a1 Mon Sep 17 00:00:00 2001 From: Billy Donahue Date: Thu, 4 Sep 2025 14:34:10 -0400 Subject: [PATCH] SERVER-101373 unify ThreadPoolTest fixtures (#40753) GitOrigin-RevId: 9d0740ec17e82103ba0ebdb2c5f407f370ad881a --- src/mongo/util/concurrency/BUILD.bazel | 15 +- .../util/concurrency/thread_pool_test.cpp | 405 +++++++++++------- .../concurrency/thread_pool_test_common.cpp | 257 ----------- .../concurrency/thread_pool_test_common.h | 48 --- .../concurrency/thread_pool_test_fixture.cpp | 40 -- .../concurrency/thread_pool_test_fixture.h | 72 ---- 6 files changed, 253 insertions(+), 584 deletions(-) delete mode 100644 src/mongo/util/concurrency/thread_pool_test_common.cpp delete mode 100644 src/mongo/util/concurrency/thread_pool_test_common.h delete mode 100644 src/mongo/util/concurrency/thread_pool_test_fixture.cpp delete mode 100644 src/mongo/util/concurrency/thread_pool_test_fixture.h diff --git a/src/mongo/util/concurrency/BUILD.bazel b/src/mongo/util/concurrency/BUILD.bazel index 4b16181cbe2..82476794e54 100644 --- a/src/mongo/util/concurrency/BUILD.bazel +++ b/src/mongo/util/concurrency/BUILD.bazel @@ -104,19 +104,6 @@ mongo_cc_library( ], ) -mongo_cc_library( - name = "thread_pool_test_fixture", - srcs = [ - "thread_pool_test_common.cpp", - "thread_pool_test_fixture.cpp", - "//src/mongo/util/concurrency:thread_pool_test_common.h", - "//src/mongo/util/concurrency:thread_pool_test_fixture.h", - ], - deps = [ - "//src/mongo/unittest", - ], -) - mongo_cc_unit_test( name = "util_concurrency_test", srcs = [ @@ -131,11 +118,11 @@ mongo_cc_unit_test( deps = [ ":spin_lock", ":thread_pool", - ":thread_pool_test_fixture", ":ticketholder", "//src/mongo/db:service_context_non_d", "//src/mongo/db:service_context_test_fixture", "//src/mongo/db/auth:authmocks", + "//src/mongo/unittest", "//src/mongo/util:packaged_task", ], ) diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp index 2aa63477e8b..aeb69c1b782 100644 --- a/src/mongo/util/concurrency/thread_pool_test.cpp +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -28,15 +28,12 @@ */ -#include -#include -#include -// IWYU pragma: no_include "cxxabi.h" -#include "mongo/base/init.h" // IWYU pragma: keep -#include "mongo/base/initializer.h" +#include "mongo/util/concurrency/thread_pool.h" + +#include "mongo/base/error_codes.h" #include "mongo/base/status.h" #include "mongo/base/string_data.h" -#include "mongo/platform/atomic_word.h" +#include "mongo/platform/atomic.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -44,64 +41,116 @@ #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/concurrency/thread_pool_test_common.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/future.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/str.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" +#include +#include +#include #include #include +#include #include -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - +#include +#include +namespace mongo { namespace { -using namespace mongo; - -MONGO_INITIALIZER(ThreadPoolCommonTests)(InitializerContext*) { - addTestsForThreadPool("ThreadPoolCommon", - []() { return std::make_unique(ThreadPool::Options()); }); -} - class ThreadPoolTest : public unittest::Test { protected: + /** + * Manages blocking work launched by the test. + * Starts off in the blocked state. + */ + class BlockingWorkControl { + public: + ~BlockingWorkControl() { + unblock(); + } + + void block() { + stdx::unique_lock lk(_mutex); + _workBlocked = true; + _cv.notify_all(); + } + + void unblock() { + stdx::unique_lock lk(_mutex); + _workBlocked = false; + _cv.notify_all(); + } + + void onWorkStart() { + stdx::unique_lock lk(_mutex); + ++_workStarted; + _cv.notify_all(); + } + + void pauseWhileBlocked() { + stdx::unique_lock lk(_mutex); + _cv.wait(lk, [&] { return !_workBlocked; }); + } + + int pauseUntilWorkStartedCountAtLeast(int count) { + stdx::unique_lock lk(_mutex); + _cv.wait(lk, [&] { return _workStarted >= count; }); + return _workStarted; + } + + private: + mutable stdx::mutex _mutex; + stdx::condition_variable _cv; + int _workStarted = 0; + int _workBlocked = true; + }; + ThreadPool& makePool(ThreadPool::Options options) { ASSERT(!_pool); _pool.emplace(std::move(options)); return *_pool; } - ThreadPool& pool() { + void destroyPool() { + _pool.reset(); + } + + ThreadPool& getPool() { ASSERT(_pool); return *_pool; } - void blockingWork() { - stdx::unique_lock lk(mutex); - ++count1; - cv1.notify_all(); - while (!flag2) { - cv2.wait(lk); + void waitForReapTo(ThreadPool& pool, size_t upperBound) { + Timer reapTimer; + while (pool.getStats().numThreads > upperBound) { + Microseconds elapsed{reapTimer.micros()}; + ASSERT_LT(elapsed, Seconds{5}) + << fmt::format("Failed to reap excess threads after {}", elapsed); + sleepFor(Milliseconds{50}); } } - stdx::mutex mutex; - stdx::condition_variable cv1; - stdx::condition_variable cv2; - size_t count1 = 0U; - bool flag2 = false; - -private: - void tearDown() override { - stdx::unique_lock lk(mutex); - flag2 = true; - cv2.notify_all(); - lk.unlock(); + auto blockingWorkCallback(boost::optional taskNumber = {}) { + return [&, taskNumber](auto status) { + ASSERT_OK(status) << (taskNumber ? std::to_string(*taskNumber) : std::string{}); + blockingWorkControl().onWorkStart(); + blockingWorkControl().pauseWhileBlocked(); + }; } + BlockingWorkControl& blockingWorkControl() { + return _blockingWorkControl; + } + +private: boost::optional _pool; + BlockingWorkControl _blockingWorkControl; }; +using ThreadPoolDeathTest = ThreadPoolTest; TEST_F(ThreadPoolTest, MinPoolSize0) { ThreadPool::Options options; @@ -110,121 +159,99 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { options.maxIdleThreadAge = Milliseconds(100); auto& pool = makePool(options); pool.startup(); - ASSERT_EQ(0U, pool.getStats().numThreads); - stdx::unique_lock lk(mutex); - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); - while (count1 != 1U) { - cv1.wait(lk); + ASSERT_EQ(pool.getStats().numThreads, 0); + for (int i = 0; i < 3; ++i) { + pool.schedule(blockingWorkCallback()); + // First task is in progress, the others are pending. + blockingWorkControl().pauseUntilWorkStartedCountAtLeast(1); + { + auto stats = pool.getStats(); + ASSERT_EQ(stats.numThreads, 1); + ASSERT_EQ(stats.numIdleThreads, 0); + ASSERT_EQ(stats.numPendingTasks, i); + } } - auto stats = pool.getStats(); - ASSERT_EQUALS(1U, stats.numThreads); - ASSERT_EQUALS(0U, stats.numPendingTasks); - pool.schedule([](auto status) { ASSERT_OK(status); }); - stats = pool.getStats(); - ASSERT_EQUALS(1U, stats.numThreads); - ASSERT_EQUALS(0U, stats.numIdleThreads); - ASSERT_EQUALS(1U, stats.numPendingTasks); - flag2 = true; - cv2.notify_all(); - lk.unlock(); - Timer reapTimer; - for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) { - sleepmillis(100); + + blockingWorkControl().unblock(); + waitForReapTo(pool, 0); + + blockingWorkControl().block(); + // This one additional task will start on the one available thread. + pool.schedule(blockingWorkCallback()); + blockingWorkControl().pauseUntilWorkStartedCountAtLeast(4); + { + auto stats = pool.getStats(); + ASSERT_EQ(stats.numThreads, 1); + ASSERT_EQ(stats.numIdleThreads, 0); + ASSERT_EQ(stats.numPendingTasks, 0); } - const Microseconds reapTime(reapTimer.micros()); - ASSERT_EQ(options.minThreads, stats.numThreads) - << "Failed to reap all threads after " << durationCount(reapTime) << "ms"; - lk.lock(); - flag2 = false; - count1 = 0; - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); - while (count1 == 0) { - cv1.wait(lk); + + blockingWorkControl().unblock(); + waitForReapTo(pool, 0); + { + auto stats = pool.getStats(); + ASSERT_EQ(stats.numThreads, 0); + ASSERT_EQ(stats.numIdleThreads, 0); + ASSERT_EQ(stats.numPendingTasks, 0); } - stats = pool.getStats(); - ASSERT_EQUALS(1U, stats.numThreads); - ASSERT_EQUALS(0U, stats.numIdleThreads); - ASSERT_EQUALS(0U, stats.numPendingTasks); - flag2 = true; - cv2.notify_all(); - lk.unlock(); } TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) { + static const size_t extra = 10; ThreadPool::Options options; options.minThreads = 15; options.maxThreads = 20; options.maxIdleThreadAge = Milliseconds(100); auto& pool = makePool(options); pool.startup(); - stdx::unique_lock lk(mutex); - for (size_t i = 0U; i < 30U; ++i) { - pool.schedule([this, i](auto status) { - ASSERT_OK(status) << i; - blockingWork(); - }); + for (size_t i = 0; i < options.maxThreads + extra; ++i) + pool.schedule(blockingWorkCallback(i)); + ASSERT_EQ(blockingWorkControl().pauseUntilWorkStartedCountAtLeast(options.maxThreads), + options.maxThreads); + sleepFor(Milliseconds(100)); + { + auto stats = pool.getStats(); + ASSERT_EQ(stats.numThreads, options.maxThreads); + ASSERT_EQ(stats.numIdleThreads, 0); + ASSERT_EQ(stats.numPendingTasks, extra); } - while (count1 < 20U) { - cv1.wait(lk); - } - ASSERT_EQ(20U, count1); - auto stats = pool.getStats(); - ASSERT_EQ(20U, stats.numThreads); - ASSERT_EQ(0U, stats.numIdleThreads); - ASSERT_EQ(10U, stats.numPendingTasks); - flag2 = true; - cv2.notify_all(); - while (count1 < 30U) { - cv1.wait(lk); - } - lk.unlock(); - stats = pool.getStats(); - ASSERT_EQ(0U, stats.numPendingTasks); - Timer reapTimer; - for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) { - sleepmillis(50); - } - const Microseconds reapTime(reapTimer.micros()); - ASSERT_EQ(options.minThreads, stats.numThreads) - << "Failed to reap excess threads after " << durationCount(reapTime) << "ms"; + + blockingWorkControl().unblock(); + blockingWorkControl().pauseUntilWorkStartedCountAtLeast(options.maxThreads + extra); + ASSERT_EQ(pool.getStats().numPendingTasks, 0); + waitForReapTo(pool, options.minThreads); } -DEATH_TEST_REGEX(ThreadPoolDeathTest, - MaxThreadsTooFewDies, - "Cannot create pool.*with maximum number of threads.*less than 1") { +DEATH_TEST_REGEX_F(ThreadPoolDeathTest, + MaxThreadsTooFewDies, + "Cannot create pool.*with maximum number of threads.*less than 1") { ThreadPool::Options options; options.maxThreads = 0; - ThreadPool pool(options); + makePool(options); } -DEATH_TEST_REGEX( +DEATH_TEST_REGEX_F( ThreadPoolDeathTest, MinThreadsTooManyDies, - R"#(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)#") { + R"re(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)re") { ThreadPool::Options options; options.maxThreads = 5; options.minThreads = 6; - ThreadPool pool(options); + makePool(options); } -TEST(SimpleThreadPoolTest, LivePoolCleanedByDestructor) { - ThreadPool pool((ThreadPool::Options())); +TEST_F(ThreadPoolTest, LivePoolCleanedByDestructor) { + auto& pool = makePool({}); pool.startup(); while (pool.getStats().numThreads == 0) { - sleepmillis(50); + sleepFor(Milliseconds{50}); } // Destructor should reap leftover threads. } -DEATH_TEST_REGEX(ThreadPoolDeathTest, - DestructionDuringJoinDies, - "Attempted to join pool.*more than once.*DoubleJoinPool") { +DEATH_TEST_REGEX_F(ThreadPoolDeathTest, + DestructionDuringJoinDies, + "Attempted to join pool.*more than once.*DoubleJoinPool") { // This test ensures that the ThreadPool destructor runs while some thread is blocked // running ThreadPool::join, to see that double-join is fatal in the pool destructor. stdx::mutex mutex; @@ -232,51 +259,49 @@ DEATH_TEST_REGEX(ThreadPoolDeathTest, options.minThreads = 1; options.maxThreads = 1; options.poolName = "DoubleJoinPool"; - boost::optional pool; - pool.emplace(options); - pool->startup(); - ASSERT_EQ(1U, pool->getStats().numThreads); + auto& pool = makePool(options); + pool.startup(); + ASSERT_EQ(pool.getStats().numThreads, 1); - stdx::unique_lock lk(mutex); + stdx::unique_lock lk(mutex); // Schedule 2 tasks to ensure that independent thread join() is blocked draining the tasks and // causing the ThreadPool destructor join to fail due to double-join. - pool->schedule([&mutex](auto status) { + pool.schedule([&mutex](auto status) { ASSERT_OK(status); - stdx::lock_guard lk(mutex); + stdx::lock_guard lk(mutex); }); - pool->schedule([&mutex](auto status) { + pool.schedule([&mutex](auto status) { ASSERT_OK(status); - stdx::lock_guard lk(mutex); + stdx::lock_guard lk(mutex); }); - stdx::thread t; - ScopeGuard onExitGuard([&] { + stdx::thread t{[&] { + pool.shutdown(); + pool.join(); + }}; + ScopeGuard onExitGuard = [&] { lk.unlock(); if (t.joinable()) { t.join(); } - }); - t = stdx::thread([&pool] { - pool->shutdown(); - pool->join(); - }); + }; ThreadPool::Stats stats; - while ((stats = pool->getStats()).numPendingTasks != 0U) { - sleepmillis(10); + while ((stats = pool.getStats()).numPendingTasks != 0) { + sleepFor(Milliseconds{10}); } // Accounts for cleanup and regular worker thread. - ASSERT_EQ(2U, stats.numThreads); - ASSERT_EQ(0U, stats.numIdleThreads); - pool.reset(); + ASSERT_EQ(stats.numThreads, 2); + ASSERT_EQ(stats.numIdleThreads, 0); + destroyPool(); MONGO_UNREACHABLE; } TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) { - unittest::Barrier barrier(2U); + unittest::Barrier barrier(2); std::string journal; ThreadPool::Options options; options.threadNamePrefix = "mythread"; - options.maxThreads = 1U; + options.maxThreads = 1; options.onCreateThread = [&](const std::string& threadName) { journal.append(fmt::format("[onCreate({})]", threadName)); }; @@ -288,11 +313,11 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) barrier.countDownAndWait(); }); barrier.countDownAndWait(); - ASSERT_EQUALS(journal, "[onCreate(mythread0)][Call(OK)]"); + ASSERT_EQ(journal, "[onCreate(mythread0)][Call(OK)]"); } -TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) { - AtomicWord retiredThreads(0); +TEST_F(ThreadPoolTest, JoinAllRetiredThreads) { + Atomic retiredThreads(0); ThreadPool::Options options; options.minThreads = 4; options.maxThreads = 8; @@ -302,7 +327,7 @@ TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) { }; unittest::Barrier barrier(options.maxThreads + 1); - ThreadPool pool(options); + auto& pool = makePool(options); for (auto i = options.maxThreads; i > 0; i--) { pool.schedule([&](auto status) { ASSERT_OK(status); @@ -314,22 +339,21 @@ TEST(SimpleThreadPoolTest, JoinAllRetiredThreads) { barrier.countDownAndWait(); while (pool.getStats().numThreads > options.minThreads) { - sleepmillis(100); + sleepFor(Microseconds{1}); } pool.shutdown(); pool.join(); - const auto expectedRetiredThreads = options.maxThreads - options.minThreads; - ASSERT_EQ(retiredThreads.load(), expectedRetiredThreads); + ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads); ASSERT_EQ(pool.getStats().numIdleThreads, 0); } -TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) { +TEST_F(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) { ThreadPool::Options options; options.minThreads = 1; options.maxThreads = 1; - ThreadPool pool(options); + auto& pool = makePool(options); unittest::Barrier barrier(2); pool.schedule([&](Status) { barrier.countDownAndWait(); @@ -337,7 +361,7 @@ TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) { // ThreadPool::shutdown(). Introducing the following sleep increases the chances of such an // ordering. However, this is a best-effort, and ThreadPool::shutdown() may still precede // ThreadPool::waitForIdle on slow machines. - sleepmillis(10); + sleepFor(Milliseconds{10}); }); pool.schedule([&](Status) { pool.shutdown(); }); pool.startup(); @@ -345,4 +369,79 @@ TEST(SimpleThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) { pool.waitForIdle(); } +TEST_F(ThreadPoolTest, UnusedPool) { + makePool({}); +} + +TEST_F(ThreadPoolTest, CannotScheduleAfterShutdown) { + auto& pool = makePool({}); + pool.shutdown(); + pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); }); +} + +TEST_F(ThreadPoolTest, PoolDestructorExecutesRemainingTasks) { + auto& pool = makePool({}); + bool executed = false; + pool.schedule([&](auto status) { + ASSERT_OK(status); + executed = true; + }); + destroyPool(); + ASSERT_TRUE(executed); +} + +TEST_F(ThreadPoolTest, PoolJoinExecutesRemainingTasks) { + auto& pool = makePool({}); + bool executed = false; + pool.schedule([&](auto status) { + ASSERT_OK(status); + executed = true; + }); + pool.shutdown(); + pool.join(); + ASSERT_TRUE(executed); +} + +TEST_F(ThreadPoolTest, RepeatedScheduleDoesntSmashStack) { + auto& pool = makePool({}); + auto finished = makePromiseFuture(); + auto func = [&](auto&& self, int depth) -> void { + if (depth) { + pool.schedule([&, depth](auto status) { + ASSERT_OK(status); + self(self, depth - 1); + }); + } else { + pool.shutdown(); + finished.promise.emplaceValue(); + } + }; + func(func, 10'000); + pool.startup(); + pool.join(); + finished.future.get(); +} + +DEATH_TEST_F(ThreadPoolDeathTest, DieOnDoubleStartUp, "already started") { + auto& pool = makePool({}); + pool.startup(); + pool.startup(); +} + +DEATH_TEST_F(ThreadPoolDeathTest, DieWhenExceptionBubblesUp, "task oops") { + auto& pool = makePool({}); + pool.startup(); + pool.schedule([](auto&&) { uassertStatusOK(Status({ErrorCodes::BadValue, "task oops"})); }); + pool.shutdown(); + pool.join(); +} + +DEATH_TEST_F(ThreadPoolDeathTest, DieOnDoubleJoin, "Attempted to join pool") { + auto& pool = makePool({}); + pool.shutdown(); + pool.join(); + pool.join(); +} + } // namespace +} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp deleted file mode 100644 index c889179d95a..00000000000 --- a/src/mongo/util/concurrency/thread_pool_test_common.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include -#include -// IWYU pragma: no_include "cxxabi.h" -#include "mongo/base/error_codes.h" -#include "mongo/base/status.h" -#include "mongo/logv2/log.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/unittest/death_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/concurrency/thread_pool_interface.h" -#include "mongo/util/concurrency/thread_pool_test_common.h" -#include "mongo/util/concurrency/thread_pool_test_fixture.h" -#include "mongo/util/str.h" - -#include -#include -#include -#include - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault - - -namespace mongo { -namespace { - -using ThreadPoolFactory = std::function()>; - -class CommonThreadPoolTestFixture : public ThreadPoolTest { -public: - CommonThreadPoolTestFixture(ThreadPoolFactory makeThreadPool) - : _makeThreadPool(std::move(makeThreadPool)) {} - -private: - std::unique_ptr makeThreadPool() override { - return _makeThreadPool(); - } - - ThreadPoolFactory _makeThreadPool; -}; - -using ThreadPoolTestCaseFactory = - std::function(ThreadPoolFactory)>; -using ThreadPoolTestCaseMap = stdx::unordered_map; - -static ThreadPoolTestCaseMap& threadPoolTestCaseRegistry() { - static ThreadPoolTestCaseMap registry; - return registry; -} - -class TptRegistrationAgent { - TptRegistrationAgent(const TptRegistrationAgent&) = delete; - TptRegistrationAgent& operator=(const TptRegistrationAgent&) = delete; - -public: - TptRegistrationAgent(const std::string& name, ThreadPoolTestCaseFactory makeTest) { - auto& entry = threadPoolTestCaseRegistry()[name]; - if (entry) { - LOGV2_FATAL(34355, "Multiple attempts to register ThreadPoolTest", "name"_attr = name); - } - entry = std::move(makeTest); - } -}; - -template -class TptDeathRegistrationAgent { - TptDeathRegistrationAgent(const TptDeathRegistrationAgent&) = delete; - TptDeathRegistrationAgent& operator=(const TptDeathRegistrationAgent&) = delete; - -public: - TptDeathRegistrationAgent(const std::string& name, ThreadPoolTestCaseFactory makeTest) { - auto& entry = threadPoolTestCaseRegistry()[name]; - if (entry) { - LOGV2_FATAL( - 34356, "Multiple attempts to register ThreadPoolDeathTest", "name"_attr = name); - } - entry = [makeTest](ThreadPoolFactory makeThreadPool) { - return std::make_unique<::mongo::unittest::DeathTest>(std::move(makeThreadPool)); - }; - } -}; - -#define COMMON_THREAD_POOL_TEST(TEST_NAME) \ - class TPT_##TEST_NAME : public CommonThreadPoolTestFixture { \ - public: \ - TPT_##TEST_NAME(ThreadPoolFactory makeThreadPool) \ - : CommonThreadPoolTestFixture(std::move(makeThreadPool)) {} \ - \ - private: \ - void _doTest() override; \ - static inline const TptRegistrationAgent _agent{ \ - #TEST_NAME, [](ThreadPoolFactory makeThreadPool) { \ - return std::make_unique(std::move(makeThreadPool)); \ - }}; \ - }; \ - void TPT_##TEST_NAME::_doTest() - -#define COMMON_THREAD_POOL_DEATH_TEST(TEST_NAME, MATCH_EXPR) \ - class TPT_##TEST_NAME : public CommonThreadPoolTestFixture { \ - public: \ - TPT_##TEST_NAME(ThreadPoolFactory makeThreadPool) \ - : CommonThreadPoolTestFixture(std::move(makeThreadPool)) {} \ - static std::string getPattern() { \ - return MATCH_EXPR; \ - } \ - static bool isRegex() { \ - return false; \ - } \ - static int getLine() { \ - return __LINE__; \ - } \ - static std::string getFile() { \ - return __FILE__; \ - } \ - \ - \ - private: \ - void _doTest() override; \ - static inline const TptDeathRegistrationAgent _agent{ \ - #TEST_NAME, [](ThreadPoolFactory makeThreadPool) { \ - return std::make_unique(std::move(makeThreadPool)); \ - }}; \ - }; \ - void TPT_##TEST_NAME::_doTest() - -COMMON_THREAD_POOL_TEST(UnusedPool) { - getThreadPool(); -} - -COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) { - auto& pool = getThreadPool(); - pool.shutdown(); - pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); }); -} - -COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "already started") { - auto& pool = getThreadPool(); - pool.startup(); - pool.startup(); -} - -namespace { -constexpr auto kExceptionMessage = "No good very bad exception"; -} - -COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) { - auto& pool = getThreadPool(); - pool.startup(); - pool.schedule( - [](auto status) { uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage})); }); - pool.shutdown(); - pool.join(); -} - -COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") { - auto& pool = getThreadPool(); - pool.shutdown(); - pool.join(); - pool.join(); -} - -COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { - auto& pool = getThreadPool(); - bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); - deleteThreadPool(); - ASSERT_EQ(executed, true); -} - -COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) { - auto& pool = getThreadPool(); - bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); - pool.shutdown(); - pool.join(); - ASSERT_EQ(executed, true); -} - -COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) { - const std::size_t depth = 10000ul; - auto& pool = getThreadPool(); - std::function func; - std::size_t n = 0; - stdx::mutex mutex; - stdx::condition_variable condvar; - func = [&pool, &n, &func, &condvar, &mutex, depth]() { - stdx::unique_lock lk(mutex); - if (n < depth) { - n++; - lk.unlock(); - pool.schedule([&](auto status) { - ASSERT_OK(status); - func(); - }); - } else { - pool.shutdown(); - condvar.notify_one(); - } - }; - func(); - pool.startup(); - pool.join(); - - stdx::unique_lock lk(mutex); - condvar.wait(lk, [&n, depth] { return n == depth; }); -} - -} // namespace - -void addTestsForThreadPool(const std::string& suiteName, ThreadPoolFactory makeThreadPool) { - auto& suite = unittest::Suite::getSuite(suiteName); - for (const auto& testCase : threadPoolTestCaseRegistry()) { - suite.add(str::stream() << suiteName << "::" << testCase.first, - __FILE__, - [testCase, makeThreadPool] { testCase.second(makeThreadPool)->run(); }); - } -} - -} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool_test_common.h b/src/mongo/util/concurrency/thread_pool_test_common.h deleted file mode 100644 index abd9cdc5025..00000000000 --- a/src/mongo/util/concurrency/thread_pool_test_common.h +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include -#include -#include - -namespace mongo { - -class ThreadPoolInterface; - -/** - * Sets up a unit test suite named "suiteName" that runs a battery of unit - * tests against thread pools returned by "makeThreadPool". These tests should - * work against any implementation of ThreadPoolInterface. - */ -void addTestsForThreadPool(const std::string& suiteName, - std::function()> makeThreadPool); - -} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool_test_fixture.cpp b/src/mongo/util/concurrency/thread_pool_test_fixture.cpp deleted file mode 100644 index 7ef24936c42..00000000000 --- a/src/mongo/util/concurrency/thread_pool_test_fixture.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/util/concurrency/thread_pool_test_fixture.h" - -namespace mongo { - -void ThreadPoolTest::setUp() { - _threadPool = makeThreadPool(); -} - -void ThreadPoolTest::tearDown() {} - -} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool_test_fixture.h b/src/mongo/util/concurrency/thread_pool_test_fixture.h deleted file mode 100644 index 7f251bd33e9..00000000000 --- a/src/mongo/util/concurrency/thread_pool_test_fixture.h +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/thread_pool_interface.h" - -#include - -namespace mongo { - -/** - * Test fixture for tests that require a ThreadPool backed by a NetworkInterfaceMock. - */ -class ThreadPoolTest : public unittest::Test { -protected: - ~ThreadPoolTest() override = default; - - ThreadPoolInterface& getThreadPool() { - return *_threadPool; - } - - void deleteThreadPool() { - _threadPool.reset(); - } - - /** - * Initializes both the NetworkInterfaceMock and ThreadPool but does not start the threadPool. - */ - void setUp() override; - - /** - * Destroys the replication threadPool. - * - * Shuts down and joins the running threadPool. - */ - void tearDown() override; - -private: - virtual std::unique_ptr makeThreadPool() = 0; - - std::unique_ptr _threadPool; -}; - -} // namespace mongo