SERVER-123853 Interrupt CSS waiters in the event of a clear (#54201)

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Mathias Stearn <mathias@mongodb.com>
GitOrigin-RevId: 7c7d4cf06d589dd2d7b0a78cd15590cafca324ac
This commit is contained in:
Jordi Olivares Provencio 2026-05-26 12:15:46 +02:00 committed by MongoDB Bot
parent 876d27d03e
commit 5d8deadeb6
5 changed files with 157 additions and 63 deletions

View File

@ -447,6 +447,10 @@ void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx,
_placementVersionInRecoverOrRefresh->cancellationSource.cancel();
}
_shardVersionWaiters.cancelWaiters(Status{ErrorCodes::CallbackCanceled,
"Filtering metadata got cleared, cancelling callback "
"to re-evaluate if it's still necessary"});
if (_nss.isNamespaceAlwaysUntracked()) {
// The namespace is always marked as untracked thus there is no need to clear anything.
return;

View File

@ -115,6 +115,7 @@ MONGO_FAIL_POINT_DEFINE(hangInEnsureChunkVersionIsGreaterThanInterruptible);
MONGO_FAIL_POINT_DEFINE(hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible);
MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread);
MONGO_FAIL_POINT_DEFINE(hangBeforePlacementVersionCriticalSectionWait);
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForConfigTimeOrChunkVersionChange);
MONGO_FAIL_POINT_DEFINE(avoidTassertForInconsistentMetadata);
const auto getDecoration = ServiceContext::declareDecoration<FilteringMetadataCache>();
@ -1196,8 +1197,10 @@ void FilteringMetadataCache::_recoverCollectionMetadataFromDisk(
<< nss.toStringForErrorMsg() << "; last retry reason: " << lastRetryReason);
}
void FilteringMetadataCache::_waitForConfigTimeOrChunkVersionChange(
OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& chunkVersion) {
FilteringMetadataCache::WasWaitInterrupted
FilteringMetadataCache::_waitForConfigTimeOrChunkVersionChange(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& chunkVersion) {
// Use configTime as the upper bound for the wait. Once configTime is reached with majority
// read concern, all DDL critical sections that could have changed the shard version have been
@ -1225,8 +1228,11 @@ void FilteringMetadataCache::_waitForConfigTimeOrChunkVersionChange(
->registerWaiterForChunkVersion(opCtx, ShardVersionFactory::make(chunkVersion));
const auto fixedExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
// The argument order matters for unit tests: there the majorityFuture is fulfilled immediately,
// and whenAny sets up its inputs in argument order, so versionFuture is passed first.
auto waitForEither = [&] {
whenAny(majorityFuture.thenRunOn(fixedExecutor), versionFuture.thenRunOn(fixedExecutor))
return whenAny(versionFuture.thenRunOn(fixedExecutor),
majorityFuture.thenRunOn(fixedExecutor))
.get(opCtx);
};
@ -1236,7 +1242,17 @@ void FilteringMetadataCache::_waitForConfigTimeOrChunkVersionChange(
// above.
makeNoopWriteToAdvanceClusterTime(opCtx, configTime).ignore();
waitForEither();
hangBeforeWaitingForConfigTimeOrChunkVersionChange.pauseWhileSet(opCtx);
auto status = waitForEither().result;
if (!status.isOK()) {
// clearFilteringMetadata cancels the CSS version waiter with CallbackCanceled. When that
// happens the metadata we recovered is now stale, so the caller must retry the recovery.
if (status.code() == ErrorCodes::CallbackCanceled) {
return WasWaitInterrupted::kYes;
}
uassertStatusOK(status);
}
auto& stats = ShardingStatistics::get(opCtx).authoritativeCollectionMetadataStatistics;
if (versionFuture.isReady()) {
@ -1244,6 +1260,7 @@ void FilteringMetadataCache::_waitForConfigTimeOrChunkVersionChange(
} else {
stats.registerPostRecoveryWaitResolvedByConfigTime();
}
return WasWaitInterrupted::kNo;
}
bool FilteringMetadataCache::_isRecoveredShardVersionSufficient(
@ -1358,68 +1375,77 @@ void FilteringMetadataCache::_onCollectionPlacementVersionMismatchAuthoritative(
logAttrs(nss),
"shardVersionReceived"_attr = receivedShardVersion);
// Step 1: Before reading from disk, join any ongoing placement version operations. If the
// metadata is already known and satisfies the received version, we can skip disk recovery
// entirely.
if (receivedShardVersion &&
_isRecoveredShardVersionSufficient(opCtx, nss, *receivedShardVersion)) {
ShardingStatistics::get(opCtx)
.authoritativeCollectionMetadataStatistics.registerVersionResolvedBeforeRecovery();
while (true) {
// Step 1: Before reading from disk, join any ongoing placement version operations. If the
// metadata is already known and satisfies the received version, we can skip disk recovery
// entirely.
if (receivedShardVersion &&
_isRecoveredShardVersionSufficient(opCtx, nss, *receivedShardVersion)) {
ShardingStatistics::get(opCtx)
.authoritativeCollectionMetadataStatistics.registerVersionResolvedBeforeRecovery();
LOGV2(12307911,
LOGV2(12307911,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "versionAlreadySufficientBeforeRecovery",
"durationMillis"_attr = t.millis());
return;
}
// Step 2: Recover the shard's current metadata from the authoritative on-disk catalog. This
// ensures the CSS reflects the current disk durable state.
_recoverCollectionMetadataFromDisk(opCtx, nss, receivedShardVersion);
// No specific version requested. The caller just wanted a forced recovery from disk, which
// has already completed above.
if (!receivedShardVersion) {
LOGV2(12307912,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "forcedDiskRecovery",
"durationMillis"_attr = t.millis());
return;
}
// Step 3: If both the router's version and the shard's version are comparable (both tracked
// or both untracked), we can use the partial ordering to determine whether the shard is
// already up to date or the router is stale. In either case the caller can return to the
// retry loop.
if (_isRecoveredShardVersionSufficient(opCtx, nss, *receivedShardVersion)) {
ShardingStatistics::get(opCtx)
.authoritativeCollectionMetadataStatistics.registerVersionResolvedAfterRecovery();
LOGV2(12307913,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "versionMatchedAfterDiskRecovery",
"durationMillis"_attr = t.millis());
return;
}
// Step 4: The versions are not comparable (e.g. one is tracked and the other is untracked).
// We cannot determine ordering from the version tuple alone, so we rely on the oplog
// timeline: wait until either the shard's version matches the router's (via oplog
// application) or until configTime is reached with majority read concern. configTime is an
// upper bound because all DDL critical sections that could change the shard version have
// been applied by that point. On a primary this wait is an instant no-op.
//
// Once done, the shard is guaranteed to be authoritative: if the versions still don't match
// after configTime, the router is stale.
auto result = _waitForConfigTimeOrChunkVersionChange(opCtx, nss, *receivedShardVersion);
if (result == WasWaitInterrupted::kYes) {
// Waits can get interrupted if there's a clearFilteringMetadata on the CSR. At this
// point what was recovered is now stale so it should be recovered again.
continue;
}
LOGV2(12307914,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "versionAlreadySufficientBeforeRecovery",
"outcome"_attr = "shardVersionMatchOrConfigTimeWait",
"durationMillis"_attr = t.millis());
return;
}
// Step 2: Recover the shard's current metadata from the authoritative on-disk catalog. This
// ensures the CSS reflects the current disk durable state.
_recoverCollectionMetadataFromDisk(opCtx, nss, receivedShardVersion);
// No specific version requested. The caller just wanted a forced recovery from disk, which has
// already completed above.
if (!receivedShardVersion) {
LOGV2(12307912,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "forcedDiskRecovery",
"durationMillis"_attr = t.millis());
return;
}
// Step 3: If both the router's version and the shard's version are comparable (both tracked or
// both untracked), we can use the partial ordering to determine whether the shard is already up
// to date or the router is stale. In either case the caller can return to the retry loop.
if (_isRecoveredShardVersionSufficient(opCtx, nss, *receivedShardVersion)) {
ShardingStatistics::get(opCtx)
.authoritativeCollectionMetadataStatistics.registerVersionResolvedAfterRecovery();
LOGV2(12307913,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "versionMatchedAfterDiskRecovery",
"durationMillis"_attr = t.millis());
return;
}
// Step 4: The versions are not comparable (e.g. one is tracked and the other is untracked).
// We cannot determine ordering from the version tuple alone, so we rely on the oplog timeline:
// wait until either the shard's version matches the router's (via oplog application) or until
// configTime is reached with majority read concern. configTime is an upper bound because all
// DDL critical sections that could change the shard version have been applied by that point.
// On a primary this wait is an instant no-op.
//
// Once done, the shard is guaranteed to be authoritative: if the versions still don't match
// after configTime, the router is stale.
_waitForConfigTimeOrChunkVersionChange(opCtx, nss, *receivedShardVersion);
LOGV2(12307914,
"Authoritative collection metadata recovery completed successfully",
logAttrs(nss),
"outcome"_attr = "shardVersionMatchOrConfigTimeWait",
"durationMillis"_attr = t.millis());
}
SharedSemiFuture<void> FilteringMetadataCache::_recoverRefreshCollectionPlacementVersion(

View File

@ -241,6 +241,7 @@ private:
const NamespaceString& nss,
const ChunkVersion& receivedShardVersion);
// Outcome of _waitForConfigTimeOrChunkVersionChange: kYes when the wait ended with
// ErrorCodes::CallbackCanceled, kNo when it completed normally.
enum class WasWaitInterrupted { kNo, kYes };
/**
* Handles the case where router's shard version (receivedShardVersion) and shard's shard
* version (wantedShardVersion) are not directly comparable (e.g. one tracked, one untracked).
@ -253,9 +254,9 @@ private:
* have changed the version have already been applied. On a primary this wait is an instant
* no-op since it has already applied all oplog entries.
*/
void _waitForConfigTimeOrChunkVersionChange(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& chunkVersion);
WasWaitInterrupted _waitForConfigTimeOrChunkVersionChange(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& chunkVersion);
SharedSemiFuture<void> _recoverRefreshCollectionPlacementVersion(
ServiceContext* serviceContext,

View File

@ -454,6 +454,58 @@ TEST_F(AuthoritativeRefreshFixture, CriticalSectionBlocksRecoveryThenProceeds) {
ASSERT_EQ(metadataOpt->getCollPlacementVersion(), chunks.back().getVersion());
}
// Verifies that clearing the filtering metadata while recovery is parked just before the Step 4
// wait still lets onShardVersionMismatch succeed, leaves the CSR holding the on-disk placement
// version, and results in at least two disk recoveries being recorded.
TEST_F(AuthoritativeRefreshFixture, ClearFilteringMetadataDuringPostRecoveryWaitTriggersRetry) {
RAIIServerParameterControllerForTest featureFlag("featureFlagAuthoritativeShardsCRUD", true);
auto* opCtx = operationContext();
const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, 5, kMyShardName);
populateDiskCatalog(opCtx, collType, chunks);
// Clear the filtering metadata up front.
// NOTE(review): presumably so the mismatch handling below has to recover from disk - confirm.
{
auto csr = CollectionShardingRuntime::acquireExclusive(opCtx, kTestNss);
csr->clearFilteringMetadata_authoritative(opCtx);
}
// Hang the recovery thread inside _waitForConfigTimeOrChunkVersionChange after the version
// and majority waiters are registered but before whenAny is scheduled, so that we have a
// deterministic window to clear the filtering metadata and cancel the version waiter.
auto* fp = globalFailPointRegistry().find("hangBeforeWaitingForConfigTimeOrChunkVersionChange");
// Keep the value returned by setMode so we can wait for exactly one new entry below.
auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
// Router reports UNTRACKED while the shard's on-disk metadata is tracked: the post-recovery
// comparison cannot order the two, so recovery falls into Step 4's wait.
stdx::thread recoveryThread([&] {
// The recovery runs on its own client and operation context, separate from the test's opCtx.
auto bgClient = getGlobalServiceContext()->getService()->makeClient("bgInterrupt");
auto bgOpCtx = bgClient->makeOperationContext();
ASSERT_OK(onShardVersionMismatch(bgOpCtx.get(), kTestNss, ChunkVersion::UNTRACKED()));
});
// Block until the recovery thread has entered the failpoint once more than the initial count.
fp->waitForTimesEntered(initialTimesEntered + 1);
// Cancel the registered version waiter and clear the CSS metadata. The wait should observe
// CallbackCanceled and return kYes so the outer recovery loop performs another disk
// recovery iteration.
{
auto csr = CollectionShardingRuntime::acquireExclusive(opCtx, kTestNss);
csr->clearFilteringMetadata_authoritative(opCtx);
}
// Disable the failpoint so the parked recovery thread can proceed, then wait for it to finish.
fp->setMode(FailPoint::off);
recoveryThread.join();
auto csr = CollectionShardingRuntime::acquireShared(opCtx, kTestNss);
auto metadataOpt = csr->getCurrentMetadataIfKnown();
ASSERT_TRUE(metadataOpt.has_value());
ASSERT_EQ(metadataOpt->getCollPlacementVersion(), chunks.back().getVersion());
// The retry path must have run disk recovery a second time: once for the original wait that was
// interrupted, and once for the retried iteration that completed.
auto stats = getStatistics(opCtx);
ASSERT_GTE(stats.getIntField("diskRecoveriesPerformed"), 2)
<< "Expected the wait interrupt to trigger a second disk recovery";
}
TEST_F(AuthoritativeRefreshFixture, RecoveryCreatesExactlyOneRecoverer) {
RAIIServerParameterControllerForTest featureFlag("featureFlagAuthoritativeShardsCRUD", true);
auto* opCtx = operationContext();

View File

@ -90,6 +90,17 @@ public:
}
}
/**
 * Fails every waiter currently held in `_waiters` with the given `error` status and then
 * empties `_waiters`. The whole operation runs while holding `_mutex`.
 */
void cancelWaiters(const Status& error) {
    std::lock_guard guard(_mutex);
    for (auto& [unused, promise] : _waiters) {
        promise.setError(error);
    }
    _waiters.clear();
}
private:
std::mutex _mutex;
boost::optional<Key> _currentKeyValue;