SERVER-126240 Improve unit testing for ConnectionPool cancellation, shutdown and metrics (#53561)
GitOrigin-RevId: 9695bbd81679351d39aa030fccdc82d34bd6e69b
This commit is contained in:
parent
2b4b703432
commit
59c88643b6
@ -289,6 +289,9 @@ class ConnectionPoolLeasingTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolFailureTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolExpiryTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolDropTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolCancellationTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolShutdownTest : public ConnectionPoolTest {};
|
||||
class ConnectionPoolMetricsTest : public ConnectionPoolTest {};
|
||||
|
||||
/**
|
||||
* Verify that a request is rejected immediately when the pending request queue is at capacity.
|
||||
@ -321,7 +324,11 @@ TEST_F(ConnectionPoolQueuingTest, RequestRejectedBeforeQueuing) {
|
||||
std::move(fut).get(), DBException, ErrorCodes::PooledConnectionAcquisitionRejected);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, StatsTest) {
|
||||
/**
|
||||
* Verify that connection stats (totalCreated, per-host created) are accumulated correctly across
|
||||
* pool drops and reconnections, and that dropping connections does not reset the created count.
|
||||
*/
|
||||
TEST_F(ConnectionPoolMetricsTest, ConnectionStatsAreReportedCorrectly) {
|
||||
constexpr auto numConnections = 3;
|
||||
auto hosts = std::vector<HostAndPort>(
|
||||
{HostAndPort("host1:123"), HostAndPort("host2:456"), HostAndPort("host3:789")});
|
||||
@ -2790,19 +2797,152 @@ TEST_F(ConnectionPoolQueuingTest, RequestWithNegativeTimeoutIsRejectedImmediatel
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
|
||||
TEST_F(ConnectionPoolShutdownTest, ReturnAfterShutdownIsSafe) {
|
||||
auto pool = makePool();
|
||||
|
||||
// Grab a connection and hold it to end of scope
|
||||
auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
auto conn = std::move(connFuture).get();
|
||||
doneWith(conn);
|
||||
|
||||
pool->shutdown();
|
||||
doneWith(conn);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, TotalConnUseTimeIncreasedForCheckedOutConnection) {
|
||||
TEST_F(ConnectionPoolShutdownTest, GetAfterShutdownReturnsShutdownError) {
|
||||
auto pool = makePool();
|
||||
|
||||
// Establish a ready connection so the pool is in a healthy state before shutdown.
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(
|
||||
HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); });
|
||||
|
||||
pool->shutdown();
|
||||
|
||||
// get() after shutdown resolves immediately with ShutdownInProgress; no setup is spawned.
|
||||
Status gotStatus = Status::OK();
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
gotStatus = swConn.getStatus();
|
||||
});
|
||||
EXPECT_EQ(gotStatus.code(), ErrorCodes::ShutdownInProgress);
|
||||
EXPECT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolShutdownTest,
|
||||
LeaseAfterShutdownReturnsShutdownErrorWithoutSpawningConnection) {
|
||||
auto pool = makePool();
|
||||
|
||||
// Establish a ready connection so the pool is in a healthy state before shutdown.
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(
|
||||
HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); });
|
||||
|
||||
pool->shutdown();
|
||||
|
||||
Status gotStatus = Status::OK();
|
||||
pool->lease_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
gotStatus = swConn.getStatus();
|
||||
});
|
||||
EXPECT_EQ(gotStatus.code(), ErrorCodes::ShutdownInProgress);
|
||||
EXPECT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolShutdownTest, ShutdownFailsPendingConnectionRequests) {
|
||||
ConnectionPool::Options options;
|
||||
options.maxConnections = 1;
|
||||
auto pool = makePool(options);
|
||||
|
||||
// Check out the only allowed connection so the pool is at capacity.
|
||||
ConnectionPool::ConnectionHandle conn;
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
ASSERT_OK(swConn.getStatus());
|
||||
conn = std::move(swConn.getValue());
|
||||
});
|
||||
ASSERT(conn);
|
||||
|
||||
// Issue a second get(). The pool is at maxConnections so no new setup is spawned. The request
|
||||
// stays queued in _requests waiting for capacity.
|
||||
Status gotStatus = Status::OK();
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
gotStatus = swConn.getStatus();
|
||||
});
|
||||
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
|
||||
|
||||
// shutdown() fails all pending (capacity-blocked) requests with ShutdownInProgress.
|
||||
pool->shutdown();
|
||||
|
||||
ASSERT_EQ(gotStatus.code(), ErrorCodes::ShutdownInProgress);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolShutdownTest, ShutdownDiscardsInFlightSetupAndFailsPendingRequests) {
|
||||
ConnectionPool::Options options;
|
||||
options.maxConnections = 1;
|
||||
auto pool = makePool(options);
|
||||
|
||||
// Start a get() — a setup is spawned but not yet completed.
|
||||
Status gotStatus = Status::OK();
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
gotStatus = swConn.getStatus();
|
||||
});
|
||||
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
|
||||
|
||||
pool->shutdown();
|
||||
EXPECT_EQ(gotStatus.code(), ErrorCodes::ShutdownInProgress);
|
||||
auto stats = getStats(pool);
|
||||
EXPECT_EQ(0u, stats.totalAvailable);
|
||||
EXPECT_EQ(0u, stats.totalRefreshing);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolShutdownTest, ShutdownDiscardsInFlightRefresh) {
|
||||
ConnectionPool::Options options;
|
||||
options.maxConnections = 1;
|
||||
options.refreshRequirement = Milliseconds(1000);
|
||||
options.refreshTimeout = Milliseconds(5000);
|
||||
auto pool = makePool(options);
|
||||
|
||||
auto now = Date_t::now();
|
||||
PoolImpl::setNow(now);
|
||||
|
||||
// Establish a connection and check it out.
|
||||
ConnectionPool::ConnectionHandle conn;
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
conn = std::move(swConn.getValue());
|
||||
});
|
||||
ASSERT(conn);
|
||||
|
||||
// Advance past refreshRequirement while the connection is checked out so it is stale on
|
||||
// return (advancing time here does not fire any refresh timers since the pool is empty).
|
||||
PoolImpl::setNow(now + Milliseconds(1500));
|
||||
|
||||
// Return the stale connection — it enters refresh.
|
||||
doneWith(conn);
|
||||
ASSERT_EQ(1u, getStats(pool).totalRefreshing);
|
||||
|
||||
pool->shutdown();
|
||||
auto stats = getStats(pool);
|
||||
EXPECT_EQ(0u, stats.totalAvailable);
|
||||
EXPECT_EQ(0u, stats.totalRefreshing);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolMetricsTest, SingleCheckoutContributesToTotalUsageTime) {
|
||||
constexpr Milliseconds checkOutLength = Milliseconds(10);
|
||||
auto pool = makePool();
|
||||
|
||||
@ -2828,7 +2968,7 @@ TEST_F(ConnectionPoolTest, TotalConnUseTimeIncreasedForCheckedOutConnection) {
|
||||
ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, OverlappingCheckoutsAdditivelyContributeToTotalUsageTime) {
|
||||
TEST_F(ConnectionPoolMetricsTest, OverlappingCheckoutsAdditivelyContributeToTotalUsageTime) {
|
||||
constexpr Milliseconds checkOutLength = Milliseconds(10);
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3085,7 +3225,7 @@ TEST_F(ConnectionPoolLeasingTest, LeasedConnectionCountsTowardsMaxConnections) {
|
||||
doneWith(conn);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CancelGetBeforeCallDoesntPullConnection) {
|
||||
TEST_F(ConnectionPoolCancellationTest, CancelGetPreCancelledRejectsRequest) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3096,7 +3236,7 @@ TEST_F(ConnectionPoolTest, CancelGetBeforeCallDoesntPullConnection) {
|
||||
ASSERT_THROWS_CODE(connFuture.get(), DBException, ErrorCodes::CallbackCanceled);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CancelGetEarlyDoesntPullConnection) {
|
||||
TEST_F(ConnectionPoolCancellationTest, CancelGetWhilePendingRejectsRequest) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3109,7 +3249,7 @@ TEST_F(ConnectionPoolTest, CancelGetEarlyDoesntPullConnection) {
|
||||
ASSERT_THROWS_CODE(connFuture.get(), DBException, ErrorCodes::CallbackCanceled);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CancelGetEarlyWithReadyConnectionDoesntPullConnection) {
|
||||
TEST_F(ConnectionPoolCancellationTest, CancelGetEarlySkipsReadyConnection) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3132,7 +3272,7 @@ TEST_F(ConnectionPoolTest, CancelGetEarlyWithReadyConnectionDoesntPullConnection
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CancelGetLatePullsConnection) {
|
||||
TEST_F(ConnectionPoolCancellationTest, CancelGetLateGetsConnection) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3147,7 +3287,11 @@ TEST_F(ConnectionPoolTest, CancelGetLatePullsConnection) {
|
||||
doneWith(connFuture.get());
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CancelGetAfterDestruction) {
|
||||
/**
|
||||
* Verify that cancelling a token after the pool has been destroyed is safe and does not
|
||||
* crash or access freed memory.
|
||||
*/
|
||||
TEST_F(ConnectionPoolCancellationTest, CancelGetAfterDestruction) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3166,7 +3310,8 @@ TEST_F(ConnectionPoolTest, CancelGetAfterDestruction) {
|
||||
source.cancel();
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, DismissBeforeCancelGet) {
|
||||
TEST_F(ConnectionPoolCancellationTest,
|
||||
CancellationSourceDestroyedWithoutCancelDoesNotAffectRequest) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
@ -3182,7 +3327,7 @@ TEST_F(ConnectionPoolTest, DismissBeforeCancelGet) {
|
||||
doneWith(connFuture.get());
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, EnsureReasonIsLogged) {
|
||||
TEST_F(ConnectionPoolMetricsTest, ConnectionFailureReasonIsLogged) {
|
||||
ConnectionPool::Options options;
|
||||
options.minConnections = 0;
|
||||
auto pool = makePool(options);
|
||||
@ -3213,6 +3358,66 @@ TEST_F(ConnectionPoolTest, EnsureReasonIsLogged) {
|
||||
ASSERT_EQ(1ul, msgCounter);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolMetricsTest, ConnectionAcquisitionWaitTimeIsTrackedPerHost) {
|
||||
auto pool = makePool();
|
||||
|
||||
auto now = Date_t::now();
|
||||
PoolImpl::setNow(now);
|
||||
|
||||
// Issue a get() without completing setup so the request waits in the queue.
|
||||
bool gotConn = false;
|
||||
ConnectionPool::ConnectionHandle conn;
|
||||
pool->get_forTest(HostAndPort(),
|
||||
Milliseconds(5000),
|
||||
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
if (!swConn.isOK())
|
||||
return;
|
||||
gotConn = true;
|
||||
conn = std::move(swConn.getValue());
|
||||
});
|
||||
|
||||
// Advance the virtual clock before completing setup so that wait time is non-zero.
|
||||
PoolImpl::setNow(now + Milliseconds(10));
|
||||
|
||||
// Setup completes and the elapsed wait time is recorded in pool stats.
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
ASSERT_TRUE(gotConn);
|
||||
|
||||
doneWith(conn);
|
||||
|
||||
auto stats = getStats(pool);
|
||||
ASSERT_GTE(stats.totalConnectionAcquisitionWaitTime, Milliseconds(10));
|
||||
auto hostStats = stats.statsByHost.at(HostAndPort());
|
||||
ASSERT_GTE(hostStats.connectionAcquisitionWaitTime, Milliseconds(10));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolMetricsTest, ConnectionStatsCoverAllHosts) {
|
||||
const HostAndPort host1("host1:27017");
|
||||
const HostAndPort host2("host2:27017");
|
||||
auto pool = makePool();
|
||||
|
||||
// Establish and return one connection to each host.
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(
|
||||
host1, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
doneWith(swConn.getValue());
|
||||
});
|
||||
ConnectionImpl::pushSetup(Status::OK());
|
||||
pool->get_forTest(
|
||||
host2, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
|
||||
doneWith(swConn.getValue());
|
||||
});
|
||||
|
||||
auto stats = getStats(pool);
|
||||
EXPECT_EQ(2u, stats.totalCreated);
|
||||
EXPECT_EQ(2u, stats.totalAvailable);
|
||||
EXPECT_EQ(2u, stats.statsByHost.size());
|
||||
EXPECT_EQ(1u, stats.statsByHost.at(host1).created);
|
||||
EXPECT_EQ(1u, stats.statsByHost.at(host1).available);
|
||||
EXPECT_EQ(1u, stats.statsByHost.at(host2).created);
|
||||
EXPECT_EQ(1u, stats.statsByHost.at(host2).available);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that a setup failure with established connections does not cause pool failure.
|
||||
*/
|
||||
@ -3847,7 +4052,7 @@ TEST_F(ConnectionPoolFailureTest, FailedPoolEnforcesBackoffOnEachSubsequentFailu
|
||||
// scheduled tasks are queued. By cancelling the token and shutting down the pool inside one
|
||||
// executor task, we guarantee the onCancel callback is deferred until after the SpecificPool
|
||||
// is destroyed.
|
||||
TEST_F(ConnectionPoolTest, CancellationCallbackSurvivesPoolDestruction) {
|
||||
TEST_F(ConnectionPoolCancellationTest, CancellationCallbackSurvivesPoolDestruction) {
|
||||
CancellationSource source;
|
||||
auto pool = makePool();
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user