SERVER-126239 Improve unit testing for ConnectionPool failure, expiry and drop (#53560)

GitOrigin-RevId: f2c64ce876f3975943ae8b06109120ea12b8d85e
This commit is contained in:
Cheahuychou Mao 2026-05-21 12:09:01 -04:00 committed by MongoDB Bot
parent 84ce383729
commit 8f1a18bdbd

View File

@ -286,6 +286,9 @@ class ConnectionPoolSpawningTest : public ConnectionPoolTest {};
class ConnectionPoolSetupTest : public ConnectionPoolTest {};
class ConnectionPoolReturnAndRefreshTest : public ConnectionPoolTest {};
class ConnectionPoolLeasingTest : public ConnectionPoolTest {};
class ConnectionPoolFailureTest : public ConnectionPoolTest {};
class ConnectionPoolExpiryTest : public ConnectionPoolTest {};
class ConnectionPoolDropTest : public ConnectionPoolTest {};
/**
* Verify that a request is rejected immediately when the pending request queue is at capacity.
@ -618,9 +621,9 @@ TEST_F(ConnectionPoolReturnAndRefreshTest, ConnectionsNotUsedRecentlyArePurged)
}
/**
* Verify that a failed connection isn't returned to the pool
* Verify that a failed connection isn't returned to the pool.
*/
TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
TEST_F(ConnectionPoolFailureTest, ConnectionMarkedFailedIsDroppedOnReturn) {
auto pool = makePool();
// Grab the first connection and indicate that it failed
@ -650,10 +653,10 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
}
/**
* Verify that a connection returned with an error indicating the remote
* is unavailable drops current generation connections to that remote.
* Verify that a connection returned with an error indicating the host is unavailable drops
* all connections to that host.
*/
TEST_F(ConnectionPoolTest, FailedHostDropsConns) {
TEST_F(ConnectionPoolFailureTest, FailedHostErrorsDropConnections) {
auto pool = makePool();
ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), 0U);
@ -721,10 +724,10 @@ TEST_F(ConnectionPoolTest, FailedHostDropsConns) {
}
/**
* Verify that a connection returned with an error that does _not_ indicate
* the remote is unavailable does _not_ drop current generation connections to that remote.
* Verify that a connection returned with an error that does not indicate the host is
* unavailable does not drop other connections to that host.
*/
TEST_F(ConnectionPoolTest, OtherErrorsDontDropConns) {
TEST_F(ConnectionPoolFailureTest, NonFailedHostErrorsDontDropConnections) {
auto pool = makePool();
ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), 0U);
@ -1582,10 +1585,9 @@ TEST_F(ConnectionPoolSpawningTest, MinConnections) {
/**
* Verify that the hostTimeout is respected. This implies that an idle
* hostAndPort drops it's connections.
* Verify that an idle pool's connections are dropped after the host timeout elapses.
*/
TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
TEST_F(ConnectionPoolExpiryTest, IdlePoolExpiresAfterHostTimeout) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
@ -1637,10 +1639,9 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
/**
* Verify that the hostTimeout happens, but that continued gets delay
* activation.
* Verify that the host timeout is delayed as long as there are pending checkout requests.
*/
TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
TEST_F(ConnectionPoolExpiryTest, IdlePoolExpiryIsDelayedByOutstandingRequests) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
@ -1727,10 +1728,9 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
/**
* Verify that the hostTimeout happens and that having a connection checked out
* delays things
* Verify that the host timeout is delayed while connections are checked out.
*/
TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
TEST_F(ConnectionPoolExpiryTest, IdlePoolExpiryIsDelayedByCheckedOutConnections) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
@ -1818,9 +1818,121 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
}
/**
* Verify that drop connections works
* Verify that pool expiry is delayed while a leased connection is outstanding.
*/
TEST_F(ConnectionPoolTest, dropConnections) {
TEST_F(ConnectionPoolExpiryTest, IdlePoolExpiryIsDelayedByLeasedConnections) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Milliseconds(1000);
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
ConnectionPool::ConnectionHandle leasedConn;
size_t leasedConnId = 0;
// Lease a connection.
ConnectionImpl::pushSetup(Status::OK());
unittest::threadAssertionMonitoredTest([&](auto& monitor) {
pool->lease_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
monitor.exec([&]() {
leasedConnId = verifyAndGetId(swConn);
leasedConn = std::move(swConn.getValue());
});
});
});
ASSERT(leasedConnId);
// Advance past hostTimeout: pool must NOT expire while the lease is outstanding.
PoolImpl::setNow(now + Milliseconds(1000));
ASSERT_EQ(1u, getStats(pool).totalLeased);
// Release the lease and advance another hostTimeout so the pool expires.
doneWith(leasedConn);
PoolImpl::setNow(now + Milliseconds(2000));
// A new checkout must spawn a fresh connection, confirming the old pool expired.
size_t newConnId = 0;
ConnectionImpl::pushSetup(Status::OK());
unittest::threadAssertionMonitoredTest([&](auto& monitor) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
monitor.exec([&]() {
newConnId = verifyAndGetId(swConn);
ASSERT_NE(leasedConnId, newConnId);
doneWith(swConn.getValue());
});
});
});
ASSERT(newConnId);
}
/**
* Verify that a pool is not destroyed while a connection handle is outstanding.
*/
TEST_F(ConnectionPoolExpiryTest, PoolIsDestroyedAfterAllHandlesAreReleased) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Milliseconds(1000);
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
ConnectionPool::ConnectionHandle conn;
size_t connId = 0;
// Check out a connection.
ConnectionImpl::pushSetup(Status::OK());
unittest::threadAssertionMonitoredTest([&](auto& monitor) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
monitor.exec([&]() {
connId = verifyAndGetId(swConn);
conn = std::move(swConn.getValue());
});
});
});
ASSERT(connId);
// The pool must not expire while the connection is checked out.
PoolImpl::setNow(now + Milliseconds(1000));
ASSERT_EQ(1u, getStats(pool).totalInUse);
// Return the connection and advance past another hostTimeout so the pool expires.
doneWith(conn);
PoolImpl::setNow(now + Milliseconds(2000));
// A subsequent checkout spawns a fresh connection, confirming the pool expired.
size_t newConnId = 0;
ConnectionImpl::pushSetup(Status::OK());
unittest::threadAssertionMonitoredTest([&](auto& monitor) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
monitor.exec([&]() {
newConnId = verifyAndGetId(swConn);
ASSERT_NE(connId, newConnId);
doneWith(swConn.getValue());
});
});
});
ASSERT(newConnId);
}
/**
* Verify that dropping connections for a specific host fails pending requests, causes returned
* checked-out connections to be discarded, and does not block subsequent checkouts on a stale
* in-flight refresh.
*/
TEST_F(ConnectionPoolDropTest, DropConnectionsForHost) {
ConnectionPool::Options options;
// ensure that only 1 connection is floating around
@ -1928,7 +2040,10 @@ TEST_F(ConnectionPoolTest, dropConnections) {
ASSERT(reachedB);
}
TEST_F(ConnectionPoolTest, DropAllConnectionsWithKeepOpen) {
/**
* Verify that a global dropConnections() skips pools marked keepOpen.
*/
TEST_F(ConnectionPoolDropTest, DropConnectionsForAllSkipsKeepOpenPools) {
ConnectionPool::Options options;
options.maxConnections = 2;
@ -2462,7 +2577,11 @@ void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr<ConnectionPool> con
ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap4));
}
TEST_F(ConnectionPoolTest, DropConnections) {
/**
* Verify that a global dropConnections() removes connections across all host pools, respecting
* the keepOpen flag to protect individual pools from the drop.
*/
TEST_F(ConnectionPoolDropTest, DropConnectionsViaPool) {
ConnectionPool::Options options;
options.minConnections = 0;
auto pool = makePool(options);
@ -2470,7 +2589,11 @@ TEST_F(ConnectionPoolTest, DropConnections) {
dropConnectionsTest(pool, pool);
}
TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
/**
* Verify that dropping connections via EgressConnectionCloserManager produces the same behavior
* as calling dropConnections() directly on the pool.
*/
TEST_F(ConnectionPoolDropTest, DropConnectionsViaManager) {
EgressConnectionCloserManager manager;
ConnectionPool::Options options;
options.minConnections = 0;
@ -2480,6 +2603,83 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
dropConnectionsTest(pool, &manager);
}
/**
* Verify that dropping connections fails all pending requests in the queue with
* PooledConnectionsDropped.
*/
TEST_F(ConnectionPoolDropTest, DropConnectionsFailsPendingRequests) {
ConnectionPool::Options options;
options.maxConnections = 1;
auto pool = makePool(options);
// Check out the only allowed connection so subsequent requests must queue.
ConnectionPool::ConnectionHandle handle;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
handle = std::move(swConn.getValue());
});
ASSERT(handle);
// Queue two more requests that cannot be served while the connection is checked out.
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1, conn2;
pool->get_forTest(
HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1 = std::move(swConn); });
pool->get_forTest(
HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2 = std::move(swConn); });
ASSERT(!conn1);
ASSERT(!conn2);
// Dropping connections fails both queued requests.
pool->dropConnections(HostAndPort());
ASSERT(conn1);
ASSERT(!conn1->isOK());
ASSERT_EQ(conn1->getStatus().code(), ErrorCodes::PooledConnectionsDropped);
ASSERT(conn2);
ASSERT(!conn2->isOK());
ASSERT_EQ(conn2->getStatus().code(), ErrorCodes::PooledConnectionsDropped);
doneWith(handle);
}
/**
* Verify that a checked-out connection returned after a drop is discarded rather than recycled
* into the pool.
*/
TEST_F(ConnectionPoolDropTest, CheckedOutConnectionReturnedAfterDropIsDiscarded) {
ConnectionPool::Options options;
options.maxConnections = 1;
auto pool = makePool(options);
// Check out a connection and hold it.
ConnectionPool::ConnectionHandle handle;
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
handle = std::move(swConn.getValue());
});
ASSERT(handle);
// Drop connections for the host.
pool->dropConnections(HostAndPort());
// Return the now-stale handle; it should be discarded, not recycled.
doneWith(handle);
// The stale connection must not end up in the ready pool.
ASSERT_EQ(0u, getStats(pool).totalAvailable);
ASSERT_EQ(1u, getStats(pool).totalCreated);
}
/**
* Verify that a request queued while the pool is at capacity is fulfilled once the checked-out
* connection is returned.
@ -3072,7 +3272,11 @@ TEST_F(ConnectionPoolSetupTest, SetupFailureWithEstablishedConnectionsDoesNotCau
}
}
TEST_F(ConnectionPoolTest, WhenARefreshFailsNewConnectionsAreSpawnedAfterHostTimeout) {
/**
* Verify that after a refresh failure, the pool spawns a replacement connection once
* kHostRetryTimeout has elapsed.
*/
TEST_F(ConnectionPoolFailureTest, RefreshFailureSpawnsNewConnectionsAfterRetryTimeout) {
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> connToTriggerSetup;
@ -3147,7 +3351,11 @@ TEST_F(ConnectionPoolTest, WhenARefreshFailsNewConnectionsAreSpawnedAfterHostTim
}
}
TEST_F(ConnectionPoolTest, WhenARefreshFailsNewConnectionsAreSpawnedOnNewConnectionRequest) {
/**
* Verify that after a refresh failure, the pool spawns a replacement connection when the next
* checkout request arrives, even before kHostRetryTimeout has elapsed.
*/
TEST_F(ConnectionPoolFailureTest, RefreshFailureSpawnsNewConnectionOnNextCheckout) {
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> connToTriggerSetup;
@ -3235,7 +3443,10 @@ TEST_F(ConnectionPoolTest, WhenARefreshFailsNewConnectionsAreSpawnedOnNewConnect
doneWith(conn1->getValue());
}
TEST_F(ConnectionPoolTest, HealthyPoolReportsHealthyState) {
/**
* Verify that a pool with no failures reports a healthy state.
*/
TEST_F(ConnectionPoolFailureTest, HealthyPoolReportsHealthyState) {
auto pool = makePool();
ConnectionImpl::pushSetup(Status::OK());
@ -3250,7 +3461,11 @@ TEST_F(ConnectionPoolTest, HealthyPoolReportsHealthyState) {
static_cast<int>(ConnectionPoolState::kHealthy));
}
TEST_F(ConnectionPoolTest, FailedPoolReportsFailedState) {
/**
* Verify that a pool that received a network error from a returned connection reports a failed
* state.
*/
TEST_F(ConnectionPoolFailureTest, FailedPoolReportsFailedState) {
auto pool = makePool();
auto now = Date_t::now();
@ -3519,14 +3734,13 @@ private:
stdx::unordered_map<PoolId, PoolData> _poolData;
};
// Reproduces a bug where a SpecificPool whose host has timed out bypasses the kHostRetryTimeout
// backoff after a connection failure. The sequence is:
// 1. processFailure() sets _state = kFailed
// 2. updateHealth() sees the pool is idle past hostTimeout and overwrites kFailed with kExpired
// 3. The controller returns canShutdown = false (other RS members are healthy)
// 4. spawnConnections() does not check for kExpired, so it spawns a new connection immediately
// 5. That connection fails → repeat from (1), creating a tight retry loop
TEST_F(ConnectionPoolTest, FailedExpiredPoolDoesNotRetryWithoutBackoff) {
/**
* Verify that a pool that expires while in a failed state does not immediately retry connections
* without waiting for kHostRetryTimeout. Regression test for a bug where expiring an
* already-failed pool caused the kHostRetryTimeout backoff to be bypassed, creating a tight
* retry loop.
*/
TEST_F(ConnectionPoolFailureTest, FailedPoolExpiredDoesNotRetryWithoutBackoff) {
ConnectionPool::Options options;
options.hostTimeout = Milliseconds(1);
options.refreshRequirement = Milliseconds(5);
@ -3577,6 +3791,49 @@ TEST_F(ConnectionPoolTest, FailedExpiredPoolDoesNotRetryWithoutBackoff) {
ASSERT_EQ(1u, ConnectionImpl::setupQueueDepth());
}
/**
* Verify that the kHostRetryTimeout backoff is re-enforced on each subsequent failure, not just
* the first. After a second failure the pool must again wait a full kHostRetryTimeout before
* spawning new connections.
*/
TEST_F(ConnectionPoolFailureTest, FailedPoolEnforcesBackoffOnEachSubsequentFailureCycle) {
auto pool = makePool();
auto now = Date_t::now();
PoolImpl::setNow(now);
// Request a connection to trigger a setup.
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn;
pool->get_forTest(
HostAndPort(), Seconds(60), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn = std::move(swConn);
});
// First setup fails, putting the pool in a failed state.
ConnectionImpl::pushSetup(Status(ErrorCodes::HostUnreachable, "host unreachable"));
ASSERT(conn);
ASSERT(!conn->isOK());
// No new setup before the first kHostRetryTimeout.
PoolImpl::setNow(now + ConnectionPool::kHostRetryTimeout - Milliseconds{1});
ASSERT_EQ(0u, ConnectionImpl::setupQueueDepth());
// At kHostRetryTimeout the pool recovers and spawns a new setup.
PoolImpl::setNow(now + ConnectionPool::kHostRetryTimeout);
ASSERT_EQ(1u, ConnectionImpl::setupQueueDepth());
// Second setup fails, putting the pool in a failed state again.
ConnectionImpl::pushSetup(Status(ErrorCodes::HostUnreachable, "host unreachable"));
// No new setup before the second kHostRetryTimeout (measured from the second failure).
PoolImpl::setNow(now + ConnectionPool::kHostRetryTimeout * 2 - Milliseconds{1});
ASSERT_EQ(0u, ConnectionImpl::setupQueueDepth());
// At the second kHostRetryTimeout another new setup is spawned.
PoolImpl::setNow(now + ConnectionPool::kHostRetryTimeout * 2);
ASSERT_EQ(1u, ConnectionImpl::setupQueueDepth());
}
// Reproduces a use-after-free where the cancellation callback registered in
// SpecificPool::getConnection captures a raw `this` pointer without a shared_from_this() anchor.
// The sequence is:
@ -3711,12 +3968,67 @@ private:
std::set<HostAndPort> _removedHosts;
};
/**
* Verify that a network failure on one host in a host group does not affect the other host's pool.
*/
TEST_F(ConnectionPoolFailureTest, HostGroupFailureOnOneHostDoesNotAffectOtherHosts) {
const HostAndPort primary("primary:27017");
const HostAndPort secondary("secondary:27017");
auto controller = std::make_shared<HostGroupLimitController>(primary, secondary);
ConnectionPool::Options options;
options.controllerFactory =
[controller]() -> std::shared_ptr<ConnectionPool::ControllerInterface> {
return controller;
};
auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
// Establish and return a connection to each host.
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(
primary, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(
secondary, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
// Return a primary connection with a network error, triggering pool failure.
pool->get_forTest(
primary, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWithError(swConn.getValue(), {ErrorCodes::HostUnreachable, "error"});
});
auto stats = getStats(pool);
ASSERT_EQ(static_cast<int>(stats.statsByHost.at(primary).poolState),
static_cast<int>(ConnectionPoolState::kFailed));
// The secondary pool should remain healthy with its connection available.
ASSERT_EQ(static_cast<int>(stats.statsByHost.at(secondary).poolState),
static_cast<int>(ConnectionPoolState::kHealthy));
ASSERT_EQ(1u, stats.statsByHost.at(secondary).available);
// A request to the secondary is served from the ready pool without a new setup.
pool->get_forTest(
secondary, Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT_OK(swConn.getStatus());
doneWith(swConn.getValue());
});
ASSERT_EQ(0u, ConnectionImpl::setupQueueDepth());
}
// Idle pools have no incoming requests or returning connections to trigger a state update.
// Instead, a repeating timer fires periodically, and that timer is the only mechanism by which
// an idle pool can detect that it has expired and shut itself down.
//
// Verify that the timer respects the configured host timeout.
TEST_F(ConnectionPoolTest, HostGroupPoolExpiresAfterHostTimeout) {
TEST_F(ConnectionPoolExpiryTest, HostGroupPoolExpiresAfterHostTimeout) {
const HostAndPort primary("primary:27017");
const HostAndPort secondary("secondary:27017");