SERVER-126237 Improve unit testing for ConnectionPool checkout, setup, return and refresh (#53558)

GitOrigin-RevId: c07d39923c62a680103fbdd4b08f2ad95160643e
This commit is contained in:
Cheahuychou Mao 2026-05-20 14:14:16 -04:00 committed by MongoDB Bot
parent 158253ad8c
commit 1eb7a7c36e
3 changed files with 383 additions and 48 deletions

View File

@ -117,6 +117,15 @@ protected:
ExecutorFuture(_executor).getAsync([conn = std::move(conn)](auto) {});
}
/**
* Destroys a connection handle without indicating success or failure by scheduling its
* destruction onto the executor. Use this instead of destroying the handle directly to avoid
* deadlocking with the inline test executor.
*/
void dropHandle(ConnectionPool::ConnectionHandle& conn) {
ExecutorFuture(_executor).getAsync([conn = std::move(conn)](auto) {});
}
using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
auto getId(const ConnectionPool::ConnectionHandle& conn) {
@ -270,7 +279,17 @@ private:
logv2::LogComponent::kConnectionPool, logv2::LogSeverity::Debug(5)};
};
TEST_F(ConnectionPoolTest, CheckRejectedConnectionRequest) {
// Fixtures are listed in execution-flow order. Tests within each fixture run in definition order.
class ConnectionPoolCheckoutTest : public ConnectionPoolTest {};
class ConnectionPoolQueuingTest : public ConnectionPoolTest {};
class ConnectionPoolSpawningTest : public ConnectionPoolTest {};
class ConnectionPoolSetupTest : public ConnectionPoolTest {};
class ConnectionPoolReturnAndRefreshTest : public ConnectionPoolTest {};
/**
* Verify that a request is rejected immediately when the pending request queue is at capacity.
*/
TEST_F(ConnectionPoolQueuingTest, RequestRejectedWhenQueueDepthExceeded) {
ConnectionPool::Options opts;
opts.connectionRequestsMaxQueueDepth = 1;
auto pool = makePool(opts);
@ -286,10 +305,10 @@ TEST_F(ConnectionPoolTest, CheckRejectedConnectionRequest) {
}
/**
* Verify that the limit on the size of connection requests queue
* is enforced properly.
* Verify that a request is rejected immediately before it enters the queue, without needing the
* queue to be at capacity.
*/
TEST_F(ConnectionPoolTest, CheckRejectedConnectionRequestBasic) {
TEST_F(ConnectionPoolQueuingTest, RequestRejectedBeforeQueuing) {
auto pool = makePool();
FailPointEnableBlock fpb("connectionPoolRejectsConnectionRequests");
auto fut = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
@ -347,7 +366,7 @@ TEST_F(ConnectionPoolTest, StatsTest) {
* Verify that we get the same connection if we grab one, return it and grab
* another.
*/
TEST_F(ConnectionPoolTest, SameConn) {
TEST_F(ConnectionPoolCheckoutTest, ReturnedConnectionIsReusedOnNextCheckout) {
auto pool = makePool();
// Grab and stash an id for the first request
@ -379,7 +398,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
/**
* Verify that connections are obtained in MRU order.
*/
TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
TEST_F(ConnectionPoolCheckoutTest, ConnectionsAreAcquiredInMRUOrder) {
auto pool = makePool();
std::random_device rd;
std::mt19937 rng(rd());
@ -479,9 +498,9 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
}
/**
* Verify that recently used connections are not purged.
* Verify that recently used connections are not purged, while connections not used recently are.
*/
TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
TEST_F(ConnectionPoolReturnAndRefreshTest, ConnectionsNotUsedRecentlyArePurged) {
ConnectionPool::Options options;
options.minConnections = 0;
options.refreshRequirement = Milliseconds(1000);
@ -781,10 +800,9 @@ TEST_F(ConnectionPoolTest, OtherErrorsDontDropConns) {
}
/**
* Verify that providing different host and ports gives you different
* connections.
* Verify that requests for different hosts get different connections.
*/
TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
TEST_F(ConnectionPoolCheckoutTest, DifferentHostsDifferentConnections) {
auto pool = makePool();
// Conn 1 from port 30000
@ -814,9 +832,9 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
}
/**
* Verify that not returning handle's to the pool spins up new connections.
* Verify that a checked-out connection is not reused for a new request for the same host.
*/
TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
TEST_F(ConnectionPoolCheckoutTest, CheckedOutConnectionIsNotReusedForNewRequest) {
auto pool = makePool();
// Get the first connection, move it out rather than letting it return
@ -856,29 +874,102 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
}
/**
* When the timeout duration comes from the parameter, the connection timeout status should
* always be `PooledConnectionAcquisitionExceededTimeLimit`.
* Verify that a reused connection has its status reset to a clean state before being
* handed to the next caller.
*/
TEST_F(ConnectionPoolTest, TimeoutOnAquisitionTimeout) {
TEST_F(ConnectionPoolCheckoutTest, CheckedOutConnectionStatusIsResetToUnknown) {
auto pool = makePool();
// Check out a connection and indicate success so it returns to the ready pool with a
// non-unknown status.
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
doneWith(swConn.getValue());
});
// Check out the same connection again and drop it without indicating success or failure.
// If the status reset on checkout is working, the pool treats this as a non-network error
// and drops the connection rather than recycling it.
ConnectionPool::ConnectionHandle conn;
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn = std::move(swConn.getValue());
});
dropHandle(conn);
// The connection is discarded rather than recycled, so the ready pool is empty and a new setup
// spawns.
ASSERT_EQ(0, getStats(pool).totalAvailable);
ASSERT_EQ(static_cast<int>(ConnectionPoolState::kHealthy),
static_cast<int>(getStats(pool).statsByHost.at(HostAndPort()).poolState));
ASSERT_EQ(1, ConnectionImpl::setupQueueDepth());
}
/**
* Verify that an unhealthy connection in the ready pool is dropped at checkout and a fresh
* connection is established instead.
*/
TEST_F(ConnectionPoolCheckoutTest, UnhealthyReadyConnectionIsDroppedOnCheckout) {
auto pool = makePool();
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn1Id = getId(swConn.getValue());
dynamic_cast<ConnectionImpl*>(swConn.getValue().get())->setUnhealthy();
doneWith(swConn.getValue());
});
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn2Id = getId(swConn.getValue());
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
ASSERT(conn2Id);
ASSERT_NE(conn1Id, conn2Id);
}
/**
* Verify that when the caller's acquisition timeout fires first, the error returned is
* PooledConnectionAcquisitionExceededTimeLimit.
*/
TEST_F(ConnectionPoolQueuingTest,
AcquisitionTimeoutBeforePendingTimeoutReturnsExceededTimeLimitError) {
assertTimeoutHelper(
/* timeout duration */ Milliseconds{100},
/* expected timeout codes */ ErrorCodes::PooledConnectionAcquisitionExceededTimeLimit);
}
/**
* When the timeout duration comes from controller refresh, the connection timeout status should
* always be `HostUnreachable`.
* Verify that when the pool's pending connection timeout fires first, the error returned is
* HostUnreachable.
*/
TEST_F(ConnectionPoolTest, TimeoutOnControllerRefresh) {
TEST_F(ConnectionPoolQueuingTest,
PendingTimeoutBeforeAcquisitionTimeoutReturnsHostUnreachableError) {
assertTimeoutHelper(
/* timeout duration */ Milliseconds{500},
/* expected timeout codes */ ErrorCodes::HostUnreachable);
}
/**
* Verify that refresh callbacks happen at the appropriate moments.
* Verify that an idle connection is refreshed after the refresh requirement timeout elapses.
*/
TEST_F(ConnectionPoolTest, refreshHappens) {
TEST_F(ConnectionPoolReturnAndRefreshTest,
IdleConnectionIsRefreshedAfterRefreshRequirementTimeout) {
bool refreshedA = false;
bool refreshedB = false;
ConnectionImpl::pushRefresh([&]() {
@ -930,9 +1021,9 @@ TEST_F(ConnectionPoolTest, refreshHappens) {
}
/**
* Verify that refresh can timeout.
* Verify that refresh can time out.
*/
TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
TEST_F(ConnectionPoolReturnAndRefreshTest, RefreshTimesOut) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(1000);
options.refreshTimeout = Milliseconds(2000);
@ -1017,9 +1108,9 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
}
/**
* Verify that requests are served in expiration order, not insertion order
* Verify that requests are served in expiration order, not insertion order.
*/
TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
TEST_F(ConnectionPoolQueuingTest, RequestsAreServedInExpirationOrder) {
auto pool = makePool();
bool reachedA = false;
@ -1068,9 +1159,9 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
}
/**
* Verify that we respect maxConnections
* Verify that the pool does not create more connections than the configured maximum.
*/
TEST_F(ConnectionPoolTest, maxPoolRespected) {
TEST_F(ConnectionPoolSpawningTest, MaxConnections) {
ConnectionPool::Options options;
options.minConnections = 1;
options.maxConnections = 2;
@ -1140,9 +1231,9 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
}
/**
* Verify that we respect maxConnecting
* Verify that new setups are blocked when the concurrent setup limit is reached.
*/
TEST_F(ConnectionPoolTest, maxConnectingRespected) {
TEST_F(ConnectionPoolSpawningTest, MaxConnectingLimitCapsNewSetups) {
ConnectionPool::Options options;
options.minConnections = 1;
options.maxConnecting = 2;
@ -1213,10 +1304,9 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) {
}
/**
* Verify that refresh callbacks block new connections, then trigger new connection spawns after
* they return
* Verify that in-progress refreshes count toward the concurrent setup limit and block new setups.
*/
TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
TEST_F(ConnectionPoolSpawningTest, MaxConnectingLimitRefreshBlocksNewSetup) {
ConnectionPool::Options options;
options.maxConnecting = 1;
options.refreshRequirement = Milliseconds(1000);
@ -1270,9 +1360,9 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
}
/**
* Verify that refreshes block new connects, but don't themselves respect maxConnecting
* Verify that in-progress refreshes are not themselves subject to the concurrent setup limit.
*/
TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
TEST_F(ConnectionPoolSpawningTest, MaxConnectingLimitDoesNotApplyToRefreshes) {
ConnectionPool::Options options;
options.maxConnecting = 2;
options.minConnections = 3;
@ -1374,9 +1464,9 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
}
/**
* Verify that minConnections is respected
* Verify that the pool maintains at least the configured minimum number of connections.
*/
TEST_F(ConnectionPoolTest, minPoolRespected) {
TEST_F(ConnectionPoolSpawningTest, MinConnections) {
ConnectionPool::Options options;
options.minConnections = 2;
options.maxConnections = 3;
@ -1910,7 +2000,7 @@ TEST_F(ConnectionPoolTest, DropAllConnectionsWithKeepOpen) {
* out host as Unknown. Therefore, pending connections may be dropped in this layer as
* a reaction to setup timeout.
*/
TEST_F(ConnectionPoolTest, SetupTimeoutsFailOtherPendingRequestsWhenPoolIsEmpty) {
TEST_F(ConnectionPoolSetupTest, SetupTimeoutsFailOtherPendingRequestsWhenPoolIsEmpty) {
ConnectionPool::Options options;
options.maxConnections = 1;
@ -1952,7 +2042,11 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsFailOtherPendingRequestsWhenPoolIsEmpty)
ASSERT_EQ(conn2->getStatus(), ErrorCodes::HostUnreachable);
}
TEST_F(ConnectionPoolTest, SetupTimeoutsDontFailOtherPendingRequestsWhenPoolIsNotEmpty) {
/**
* Verify that a setup timeout does not fail other pending requests when established connections
* exist. Those requests wait until their own acquisition timeout expires instead.
*/
TEST_F(ConnectionPoolSetupTest, SetupTimeoutsDontFailOtherPendingRequestsWhenPoolIsNotEmpty) {
auto refreshTimeout = Seconds{2};
auto acquisitionTimeout = Seconds{20};
@ -2010,9 +2104,9 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontFailOtherPendingRequestsWhenPoolIsNo
}
/**
* Verify that timeouts during refresh time out other pending requests.
* Verify that a refresh timeout fails all pending requests for the same host.
*/
TEST_F(ConnectionPoolTest, RefreshTimeoutsFailPendingRequests) {
TEST_F(ConnectionPoolReturnAndRefreshTest, RefreshTimeoutFailsPendingRequests) {
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> connToTriggerSetup;
auto [pool, inUseConnections] = setupConnectionPool(
@ -2079,7 +2173,10 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsFailPendingRequests) {
ASSERT_EQ(conn1->getStatus(), ErrorCodes::HostUnreachable);
}
TEST_F(ConnectionPoolTest, RefreshTimeoutsDropAvailableConnections) {
/**
* Verify that a refresh timeout drops available connections for the same host.
*/
TEST_F(ConnectionPoolReturnAndRefreshTest, RefreshTimeoutDropsAvailableConnections) {
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> connToTriggerSetup;
const auto refreshTimeout = Seconds{2};
@ -2146,6 +2243,162 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDropAvailableConnections) {
}
}
/**
* Verify that dropping a handle without indicating success or failure drops only that single
* connection, leaves the pool healthy, and spawns a replacement.
*/
TEST_F(ConnectionPoolReturnAndRefreshTest,
DroppingHandleWithoutIndicatingDropsSingleConnectionOnly) {
ConnectionPool::Options options;
options.minConnections = 2;
options.maxConnections = 2;
auto pool = makePool(options);
// Check out both connections.
ConnectionPool::ConnectionHandle conn1;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn1 = std::move(swConn.getValue());
});
ConnectionPool::ConnectionHandle conn2;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn2 = std::move(swConn.getValue());
});
// Drop conn1 without indicating success or failure.
dropHandle(conn1);
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(1, connStats.totalInUse);
auto hostStats = connStats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kHealthy));
ASSERT_EQ(1, ConnectionImpl::setupQueueDepth());
doneWith(conn2);
}
/**
* Verify that a ConnectionError during refresh drops only that single connection and leaves
* the pool healthy, unlike other errors that fail the entire pool.
*/
TEST_F(ConnectionPoolReturnAndRefreshTest,
RefreshFailureWithConnectionErrorDropsSingleConnectionOnly) {
ConnectionPool::Options options;
options.minConnections = 0;
options.refreshRequirement = Milliseconds(1000);
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
// Check out one connection, keeping it checked out so no ready-pool timer is armed.
ConnectionPool::ConnectionHandle conn;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn = std::move(swConn.getValue());
});
// Advance past the refresh requirement while the connection is checked out.
PoolImpl::setNow(now + Milliseconds(1000));
// Return the stale connection so it is queued for refresh.
doneWith(conn);
// Fail the refresh with a ConnectionError.
ConnectionImpl::pushRefresh(Status(ErrorCodes::ConnectionError, "connection error"));
// With minConnections=0, no replacement is spawned after the drop.
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(0, connStats.totalRefreshing);
ASSERT_EQ(1, connStats.totalCreated);
auto hostStats = connStats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kHealthy));
}
/**
* Verify that a connection that completes refresh after a failure event is discarded rather than
* added to the pool.
*/
TEST_F(ConnectionPoolReturnAndRefreshTest, InFlightRefreshCompletedAfterProcessFailureIsDiscarded) {
ConnectionPool::Options options;
options.minConnections = 2;
options.maxConnections = 2;
options.maxConnecting = 2;
options.refreshRequirement = Milliseconds(1000);
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
// Establish both connections (both start checked out).
ConnectionPool::ConnectionHandle conn1;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn1 = std::move(swConn.getValue());
});
ConnectionPool::ConnectionHandle conn2;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
conn2 = std::move(swConn.getValue());
});
// Advance past the refresh requirement while both connections are checked out.
PoolImpl::setNow(now + Milliseconds(1000));
// Return both stale connections so they are queued for refresh.
doneWith(conn1);
doneWith(conn2);
{
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(2, connStats.totalRefreshing);
}
// Fail conn1's refresh with HostUnreachable, putting the pool in a failed state.
ConnectionImpl::pushRefresh(Status(ErrorCodes::HostUnreachable, "host unreachable"));
{
auto stats = getStats(pool);
auto hostStats = stats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kFailed));
}
// Fire conn2's refresh successfully. It is discarded because the pool has already failed.
ConnectionImpl::pushRefresh(Status::OK());
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(0, connStats.totalRefreshing);
auto hostStats = connStats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kFailed));
}
template <typename Ptr>
void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) {
auto now = Date_t::now();
@ -2226,7 +2479,11 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
dropConnectionsTest(pool, &manager);
}
TEST_F(ConnectionPoolTest, AsyncGet) {
/**
* Verify that a request queued while the pool is at capacity is fulfilled once the checked-out
* connection is returned.
*/
TEST_F(ConnectionPoolQueuingTest, QueuedRequestIsServedWhenConnectionBecomesAvailable) {
ConnectionPool::Options options;
options.maxConnections = 1;
auto pool = makePool(options);
@ -2305,7 +2562,11 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
}
}
TEST_F(ConnectionPoolTest, NegativeTimeout) {
/**
* Verify that a pending request whose timeout has already elapsed is rejected cleanly even
* when the remaining time is negative.
*/
TEST_F(ConnectionPoolQueuingTest, RequestWithNegativeTimeoutIsRejectedImmediately) {
ConnectionPool::Options options;
options.maxConnections = 1;
auto pool = makePool(options);
@ -2623,7 +2884,10 @@ TEST_F(ConnectionPoolTest, EnsureReasonIsLogged) {
ASSERT_EQ(1ul, msgCounter);
}
TEST_F(ConnectionPoolTest, SetupFailuresShouldNotDropOpenConnections) {
/**
* Verify that a setup failure with established connections does not cause pool failure.
*/
TEST_F(ConnectionPoolSetupTest, SetupFailureWithEstablishedConnectionsDoesNotCausePoolFailure) {
// None of these errors should drop available connections. The pool spawns replacement
// connections for each failure.
std::vector<ErrorCodes::Error> setupFailures = {
@ -2876,7 +3140,10 @@ TEST_F(ConnectionPoolTest, FailedPoolReportsFailedState) {
static_cast<int>(ConnectionPoolState::kFailed));
}
TEST_F(ConnectionPoolTest, SetupFailureWithNoEstablishedConnectionsCausesPoolFailure) {
/**
* Verify that a setup failure with no established connections causes pool failure.
*/
TEST_F(ConnectionPoolSetupTest, SetupFailureWithNoEstablishedConnectionsCausesPoolFailure) {
auto pool = makePool();
auto now = Date_t::now();
@ -2900,7 +3167,12 @@ TEST_F(ConnectionPoolTest, SetupFailureWithNoEstablishedConnectionsCausesPoolFai
static_cast<int>(ConnectionPoolState::kFailed));
}
TEST_F(ConnectionPoolTest, SetupFailureWithMultiplePendingAndNoEstablishedCausesPoolFailure) {
/**
* Verify that a setup failure with multiple in-flight setups and no established connections causes
* pool failure.
*/
TEST_F(ConnectionPoolSetupTest,
SetupFailureWithMultipleInFlightSetupsAndNoEstablishedCausesPoolFailure) {
ConnectionPool::Options options;
options.minConnections = 3;
options.maxConnections = 3;
@ -2936,7 +3208,11 @@ TEST_F(ConnectionPoolTest, SetupFailureWithMultiplePendingAndNoEstablishedCauses
static_cast<int>(ConnectionPoolState::kFailed));
}
TEST_F(ConnectionPoolTest, MultipleConsecutiveSetupFailuresDoNotBlockNewConnections) {
/**
* Verify that consecutive setup failures do not block new connection attempts. The pool
* continues to spawn replacements after each failure.
*/
TEST_F(ConnectionPoolSetupTest, MultipleConsecutiveSetupFailuresDoNotBlockNewConnections) {
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> connToTriggerSetup;
auto [pool, inUseConnections] = setupConnectionPool(
@ -2996,6 +3272,58 @@ TEST_F(ConnectionPoolTest, MultipleConsecutiveSetupFailuresDoNotBlockNewConnecti
static_cast<int>(ConnectionPoolState::kHealthy));
}
/**
* Verify that a connection that completes setup after a failure event is discarded rather than
* added to the pool.
*/
TEST_F(ConnectionPoolSetupTest, InFlightSetupCompletedAfterProcessFailureIsDiscarded) {
ConnectionPool::Options options;
options.minConnections = 3;
options.maxConnecting = 3;
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1;
pool->get_forTest(
HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn1 = std::move(swConn);
});
{
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(0, connStats.totalInUse);
ASSERT_EQ(3, connStats.totalRefreshing);
}
// One setup failure puts the pool in a failed state with two remaining in-flight setups.
ConnectionImpl::pushSetup(Status(ErrorCodes::HostUnreachable, "host unreachable"));
ASSERT(conn1);
ASSERT(!conn1->isOK());
{
auto stats = getStats(pool);
auto hostStats = stats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kFailed));
}
// The two remaining in-flight setups complete successfully but are discarded because the
// pool has already failed.
ConnectionImpl::pushSetup(Status::OK());
ConnectionImpl::pushSetup(Status::OK());
auto connStats = getStats(pool);
ASSERT_EQ(0, connStats.totalAvailable);
ASSERT_EQ(0, connStats.totalRefreshing);
auto hostStats = connStats.statsByHost.at(HostAndPort());
ASSERT_EQ(static_cast<int>(hostStats.poolState),
static_cast<int>(ConnectionPoolState::kFailed));
}
/**
* A controller that behaves like LimitController but always returns canShutdown = false.
* This simulates the ShardingTaskExecutorPoolController behavior where other members of a replica

View File

@ -117,7 +117,11 @@ const HostAndPort& ConnectionImpl::getHostAndPort() const {
}
bool ConnectionImpl::isHealthy() {
return true;
return _healthy;
}
void ConnectionImpl::setUnhealthy() {
_healthy = false;
}
void ConnectionImpl::clear() {

View File

@ -111,6 +111,8 @@ public:
bool isHealthy() override;
void setUnhealthy();
// Dump all connection callbacks
static void clear();
@ -139,6 +141,7 @@ private:
static void processRefresh();
HostAndPort _hostAndPort;
bool _healthy{true};
SetupCallback _setupCallback;
RefreshCallback _refreshCallback;
TimerImpl _timer;