SERVER-122159 Stop deadlock between FLE2 compact/cleanup and setFCV (#52342)

GitOrigin-RevId: 649405b59126badb7418c58c611f9352f640150c
This commit is contained in:
Enrico Golfieri 2026-05-26 15:23:38 +02:00 committed by MongoDB Bot
parent ec47150c40
commit 0c1abd37e1
10 changed files with 721 additions and 118 deletions

View File

@ -2064,6 +2064,18 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "fle_compact_cleanup_mutex",
srcs = [
"fle_compact_cleanup_mutex.cpp",
],
deps = [
":server_base",
":service_context",
"//src/mongo:base",
],
)
mongo_cc_library(
name = "fle_mocks",
srcs = [
@ -2089,6 +2101,19 @@ mongo_cc_library(
],
)
mongo_cc_unit_test(
name = "fle_compact_cleanup_mutex_test",
srcs = [
"fle_compact_cleanup_mutex_test.cpp",
],
tags = ["mongo_unittest_first_group"],
deps = [
":fle_compact_cleanup_mutex",
":service_context_d_test_fixture",
"//src/mongo/unittest",
],
)
mongo_cc_unit_test(
name = "fle_test",
srcs = [

View File

@ -172,6 +172,9 @@ filters:
approvers:
- 10gen/server-security
- 10gen/query-integration-features
- "fle_compact_cleanup_mutex*":
approvers:
- 10gen/server-security
- "/keyfile_option*":
approvers:
- 10gen/server-security

View File

@ -617,6 +617,7 @@ mongo_cc_library(
"//src/mongo/db:commands",
"//src/mongo/db:common",
"//src/mongo/db:dbdirectclient",
"//src/mongo/db:fle_compact_cleanup_mutex",
"//src/mongo/db:fle_crud",
"//src/mongo/db:mongohasher",
"//src/mongo/db:server_base",

View File

@ -47,6 +47,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/database_name.h"
#include "mongo/db/feature_flag.h"
#include "mongo/db/fle_compact_cleanup_mutex.h"
#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@ -89,6 +90,54 @@ namespace mongo {
namespace {
auto acquireAndValidateCollections(OperationContext* opCtx,
CollectionAcquisitionRequests requests) {
CollectionOrViewAcquisitionRequests acquisitionRequests;
for (auto& request : requests) {
if (request.nssOrUUID.isNamespaceString()) {
acquisitionRequests.emplace_back(request.nssOrUUID.nss(),
request.expectedUUID,
request.placementConcern,
request.readConcern,
request.operationType,
AcquisitionPrerequisites::ViewMode::kCanBeView,
request.lockAcquisitionDeadline);
} else {
acquisitionRequests.emplace_back(std::move(request.nssOrUUID),
request.placementConcern,
request.readConcern,
request.operationType,
AcquisitionPrerequisites::ViewMode::kCanBeView,
request.lockAcquisitionDeadline);
}
}
auto acquisitions =
makeAcquisitionMap(acquireCollectionsOrViews(opCtx, acquisitionRequests, MODE_IS));
for (const auto& [nss, acq] : acquisitions) {
uassert(ErrorCodes::CommandNotSupportedOnView,
"Cannot cleanup structured encryption data on a view",
!acq.isView());
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Collection '" << nss.toStringForErrorMsg() << "' does not exist",
acq.collectionExists());
}
return acquisitions;
}
EncryptedStateCollectionsNamespaces validateCleanupRequestAndGetNamespaces(
OperationContext* opCtx, const CleanupStructuredEncryptionData& request) {
const auto& edcNss = request.getNamespace();
auto collections = acquireAndValidateCollections(
opCtx,
{CollectionAcquisitionRequest::fromOpCtx(
opCtx, edcNss, AcquisitionPrerequisites::OperationType::kRead)});
const auto& edcCollection = collections.at(edcNss);
validateCleanupRequest(request, *edcCollection.getCollectionPtr().get());
return uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(
*edcCollection.getCollectionPtr().get()));
}
void createQEClusteredStateCollection(OperationContext* opCtx, const NamespaceString& nss) {
// Create QE state collection locally. This local collection creation is safe here because we
// instantiate a ScopedReplicaSetDDL object prior to this call. This object registers this DDL
@ -167,35 +216,12 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx,
const auto& edcNss = request.getNamespace();
AutoGetDb autoDb(opCtx, edcNss.dbName(), MODE_IX);
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Database '" << edcNss.dbName().toStringForErrorMsg()
<< "' does not exist",
autoDb.getDb());
Lock::CollectionLock edcLock(opCtx, edcNss, MODE_IS);
// Validate the request and acquire the relevant namespaces
EncryptedStateCollectionsNamespaces namespaces;
{
auto catalog = CollectionCatalog::get(opCtx);
// Check the data collection exists and is not a view
auto edc = catalog->lookupCollectionByNamespace(opCtx, edcNss);
if (!edc) {
uassert(ErrorCodes::CommandNotSupportedOnView,
"Cannot cleanup structured encryption data on a view",
!catalog->lookupView(opCtx, edcNss));
uasserted(ErrorCodes::NamespaceNotFound,
str::stream()
<< "Collection '" << edcNss.toStringForErrorMsg() << "' does not exist");
}
validateCleanupRequest(request, *edc);
namespaces =
uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(*edc));
}
auto [namespaces, scopedCompactCleanupMutex] =
acquireFLECompactCleanupMutexWithStableNamespaces(
opCtx, [&] { return validateCleanupRequestAndGetNamespaces(opCtx, request); });
// Register the DDL with the replica set tracker so any local QE state collection DDL remains
// visible to metadata synchronization, without taking the replica set DDL locks here.
ReplicaSetDDLTracker::ScopedReplicaSetDDL scopedReplicaSetDDL(
opCtx,
std::vector<NamespaceString>{namespaces.edcNss,
@ -204,9 +230,6 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx,
namespaces.ecocRenameNss,
namespaces.ecocLockNss});
// Acquire exclusive lock on the associated 'ecoc.lock' namespace to serialize calls
// to cleanup and compact on the same EDC namespace.
Lock::CollectionLock compactionLock(opCtx, namespaces.ecocLockNss, MODE_X);
LOGV2(7618805, "Cleaning up the encrypted compaction collection", logAttrs(edcNss));
@ -279,14 +302,18 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx,
createQEClusteredStateCollection(opCtx, namespaces.ecocNss);
}
FLECleanupESCDeleteQueue pq;
{
AutoGetCollection ecocCompact(opCtx, namespaces.ecocRenameNss, MODE_IS);
auto collections = acquireAndValidateCollections(
opCtx,
{CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead),
CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.edcNss, AcquisitionPrerequisites::OperationType::kRead)});
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Renamed encrypted compaction collection "
<< namespaces.ecocRenameNss.toStringForErrorMsg()
<< " no longer exists prior to cleanup",
*ecocCompact);
// Ensure the EDC collection is still valid
validateCleanupRequest(request,
*collections.at(namespaces.edcNss).getCollectionPtr().get());
auto pqMemLimit =
ServerParameterSet::getClusterParameterSet()
@ -295,22 +322,31 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx,
.getMaxAnchorCompactionSize();
// Clean up entries for each encrypted field in compactionTokens
auto pq = processFLECleanup(opCtx,
request,
&getTransactionWithRetriesForMongoD,
namespaces,
pqMemLimit,
&stats.getEsc(),
&stats.getEcoc());
// Delete the entries in 'C' from the ESC
cleanupESCNonAnchors(
opCtx, namespaces.escNss, escDeleteSet, tagsPerDelete, &stats.getEsc());
// Delete the entries in the priority queue of ESC anchors from the ESC
cleanupESCAnchors(opCtx, namespaces.escNss, pq, tagsPerDelete, &stats.getEsc());
// Stash any resource before starting the internal transaction and restore them after the
// transaction commits
StashTransactionResourcesForMultiDocumentTransaction stasher(opCtx);
pq = processFLECleanup(opCtx,
request,
&getTransactionWithRetriesForMongoD,
namespaces,
pqMemLimit,
&stats.getEsc(),
&stats.getEcoc());
stasher.restoreOnCommit();
validateCleanupRequest(request,
*collections.at(namespaces.edcNss).getCollectionPtr().get());
}
// processFLECleanup() has fully consumed 'ecoc.compact' and materialized the ESC cleanup work
// into 'escDeleteSet' and 'pq'. The remaining cleanup writes only to the ESC, so it does not
// need to keep the read acquisition on 'ecoc.compact'.
// Delete the entries in 'C' from the ESC
cleanupESCNonAnchors(opCtx, namespaces.escNss, escDeleteSet, tagsPerDelete, &stats.getEsc());
// Delete the entries in the priority queue of ESC anchors from the ESC
cleanupESCAnchors(opCtx, namespaces.escNss, pq, tagsPerDelete, &stats.getEsc());
// Drop the 'ecoc.compact' collection
dropQEStateCollection(opCtx, namespaces.ecocRenameNss);

View File

@ -60,7 +60,6 @@
#include "mongo/db/query/write_ops/write_ops.h"
#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/db/query/write_ops/write_ops_parsers.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/shard_role/shard_catalog/collection_options.h"
#include "mongo/db/transaction/transaction_api.h"
@ -75,7 +74,9 @@
#include <algorithm>
#include <array>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <tuple>

View File

@ -44,6 +44,7 @@
#include "mongo/util/modules.h"
#include <cstddef>
#include <memory>
#include <numeric>
#include <queue>
#include <vector>

View File

@ -45,6 +45,7 @@
#include "mongo/db/commands/fle2_compact_gen.h"
#include "mongo/db/curop.h"
#include "mongo/db/database_name.h"
#include "mongo/db/fle_compact_cleanup_mutex.h"
#include "mongo/db/fle_crud.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@ -63,6 +64,7 @@
#include "mongo/db/shard_role/shard_catalog/drop_collection.h"
#include "mongo/db/shard_role/shard_catalog/operation_sharding_state.h"
#include "mongo/db/shard_role/shard_catalog/rename_collection.h"
#include "mongo/db/shard_role/shard_role.h"
#include "mongo/db/tenant_id.h"
#include "mongo/db/topology/sharding_state.h"
#include "mongo/logv2/log.h"
@ -90,6 +92,54 @@ MONGO_FAIL_POINT_DEFINE(fleCompactSkipECOCDropUnsharded);
namespace mongo {
namespace {
auto acquireAndValidateCollections(OperationContext* opCtx,
CollectionAcquisitionRequests requests) {
CollectionOrViewAcquisitionRequests acquisitionRequests;
for (auto& request : requests) {
if (request.nssOrUUID.isNamespaceString()) {
acquisitionRequests.emplace_back(request.nssOrUUID.nss(),
request.expectedUUID,
request.placementConcern,
request.readConcern,
request.operationType,
AcquisitionPrerequisites::ViewMode::kCanBeView,
request.lockAcquisitionDeadline);
} else {
acquisitionRequests.emplace_back(std::move(request.nssOrUUID),
request.placementConcern,
request.readConcern,
request.operationType,
AcquisitionPrerequisites::ViewMode::kCanBeView,
request.lockAcquisitionDeadline);
}
}
auto acquisitions =
makeAcquisitionMap(acquireCollectionsOrViews(opCtx, acquisitionRequests, MODE_IS));
for (const auto& [nss, acq] : acquisitions) {
uassert(ErrorCodes::CommandNotSupportedOnView,
"Cannot compact structured encryption data on a view",
!acq.isView());
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Collection '" << nss.toStringForErrorMsg() << "' does not exist",
acq.collectionExists());
}
return acquisitions;
}
EncryptedStateCollectionsNamespaces validateCompactRequestAndGetNamespaces(
OperationContext* opCtx, const CompactStructuredEncryptionData& request) {
const auto& edcNss = request.getNamespace();
auto collections = acquireAndValidateCollections(
opCtx,
{CollectionAcquisitionRequest::fromOpCtx(
opCtx, edcNss, AcquisitionPrerequisites::OperationType::kRead)});
const auto& edcCollection = collections.at(edcNss);
validateCompactRequest(request, *edcCollection.getCollectionPtr().get());
return uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(
*edcCollection.getCollectionPtr().get()));
}
CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx,
const CompactStructuredEncryptionData& request) {
{
@ -113,31 +163,12 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx,
LOGV2(6319900, "Compacting the encrypted compaction collection", logAttrs(edcNss));
AutoGetDb autoDb(opCtx, edcNss.dbName(), MODE_IX);
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Database '" << edcNss.dbName().toStringForErrorMsg()
<< "' does not exist",
autoDb.getDb());
auto catalog = CollectionCatalog::get(opCtx);
Lock::CollectionLock edcLock(opCtx, edcNss, MODE_IS);
// Check the data collection exists and is not a view
auto edc = catalog->lookupCollectionByNamespace(opCtx, edcNss);
if (!edc) {
uassert(ErrorCodes::CommandNotSupportedOnView,
"Cannot compact structured encryption data on a view",
!catalog->lookupView(opCtx, edcNss));
uasserted(ErrorCodes::NamespaceNotFound,
str::stream() << "Collection '" << edcNss.toStringForErrorMsg()
<< "' does not exist");
}
validateCompactRequest(request, *edc);
auto namespaces =
uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(*edc));
auto [namespaces, scopedCompactCleanupMutex] =
acquireFLECompactCleanupMutexWithStableNamespaces(
opCtx, [&] { return validateCompactRequestAndGetNamespaces(opCtx, request); });
// Register the DDL with the replica set tracker so any local QE state collection DDL remains
// visible to metadata synchronization, without taking the replica set DDL locks here.
ReplicaSetDDLTracker::ScopedReplicaSetDDL scopedReplicaSetDDL(
opCtx,
std::vector<NamespaceString>{namespaces.edcNss,
@ -146,50 +177,61 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx,
namespaces.ecocRenameNss,
namespaces.ecocLockNss});
// Acquire exclusive lock on the associated 'ecoc.lock' namespace to serialize calls
// to cleanup and compact on the same EDC namespace
Lock::CollectionLock compactionLock(opCtx, namespaces.ecocLockNss, MODE_X);
// Step 1: rename the ECOC collection if it exists
catalog = CollectionCatalog::get(opCtx);
auto ecoc = catalog->lookupCollectionByNamespace(opCtx, namespaces.ecocNss);
auto ecocRename = catalog->lookupCollectionByNamespace(opCtx, namespaces.ecocRenameNss);
CompactStats stats({}, {});
FLECompactESCDeleteSet escDeleteSet;
bool createEcoc = false;
bool renameEcoc = false;
{
CollectionAcquisitionRequests acquisitionRequests = {
CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.ecocNss, AcquisitionPrerequisites::OperationType::kRead),
CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead)};
auto acquisitions =
makeAcquisitionMap(acquireCollectionsMaybeLockFree(opCtx, acquisitionRequests));
if (!ecoc && !ecocRename) {
// nothing to compact
LOGV2(6548306, "Skipping compaction as there is no ECOC collection to compact");
return stats;
} else if (ecoc && !ecocRename) {
// load the random set of ESC non-anchor entries to be deleted post-compact.
// This must be done before renaming the ECOC because if not, we can end up with
// ESC entries that have no corresponding ECOC entry in the renamed ECOC.
auto memoryLimit =
ServerParameterSet::getClusterParameterSet()
->get<ClusterParameterWithStorage<FLECompactionOptions>>("fleCompactionOptions")
->getValue(boost::none)
.getMaxCompactionSize();
const auto& ecoc = acquisitions.at(namespaces.ecocNss).getCollectionPtr();
const auto& ecocRename = acquisitions.at(namespaces.ecocRenameNss).getCollectionPtr();
escDeleteSet =
readRandomESCNonAnchorIds(opCtx, namespaces.escNss, memoryLimit, &stats.getEsc());
// Step 1: rename the ECOC collection if it exists
if (!ecoc && !ecocRename) {
// nothing to compact
LOGV2(6548306, "Skipping compaction as there is no ECOC collection to compact");
return stats;
} else if (ecoc && !ecocRename) {
// load the random set of ESC non-anchor entries to be deleted post-compact.
// This must be done before renaming the ECOC because if not, we can end up with
// ESC entries that have no corresponding ECOC entry in the renamed ECOC.
auto memoryLimit =
ServerParameterSet::getClusterParameterSet()
->get<ClusterParameterWithStorage<FLECompactionOptions>>("fleCompactionOptions")
->getValue(boost::none)
.getMaxCompactionSize();
LOGV2(7293603,
"Renaming the encrypted compaction collection",
"ecocNss"_attr = namespaces.ecocNss,
"ecocRenameNss"_attr = namespaces.ecocRenameNss);
escDeleteSet =
readRandomESCNonAnchorIds(opCtx, namespaces.escNss, memoryLimit, &stats.getEsc());
LOGV2(7293603,
"Renaming the encrypted compaction collection",
"ecocNss"_attr = namespaces.ecocNss,
"ecocRenameNss"_attr = namespaces.ecocRenameNss);
renameEcoc = createEcoc = true;
} else {
LOGV2(7293610,
"Resuming compaction from a stale ECOC collection",
logAttrs(namespaces.ecocRenameNss));
if (!ecoc) {
createEcoc = true;
}
}
}
if (renameEcoc) {
RenameCollectionOptions renameOpts;
validateAndRunRenameCollection(
opCtx, namespaces.ecocNss, namespaces.ecocRenameNss, renameOpts);
ecoc = nullptr;
} else {
LOGV2(7293610,
"Resuming compaction from a stale ECOC collection",
logAttrs(namespaces.ecocRenameNss));
}
if (!ecoc) {
if (createEcoc) {
if (MONGO_unlikely(fleCompactHangBeforeECOCCreateUnsharded.shouldFail())) {
LOGV2(7299601, "Hanging due to fleCompactHangBeforeECOCCreateUnsharded fail point");
fleCompactHangBeforeECOCCreateUnsharded.pauseWhileSet();
@ -222,15 +264,23 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx,
// Step 2: for each encrypted field in compactionTokens, get distinct set of entries 'C'
// from ECOC, and for each entry in 'C', compact ESC.
{
// acquire IS lock on the ecocRenameNss to prevent it from being dropped during compact
AutoGetCollection tempEcocColl(opCtx, namespaces.ecocRenameNss, MODE_IS);
// Acquire ecoc.compact and the EDC before stashing transaction resources. Restore will
// re-establish these acquisitions after the internal transaction commits and fail if either
// collection was dropped or recreated while the resources were stashed.
auto collections = acquireAndValidateCollections(
opCtx,
{CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead),
CollectionAcquisitionRequest::fromOpCtx(
opCtx, namespaces.edcNss, AcquisitionPrerequisites::OperationType::kRead)});
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Renamed encrypted compaction collection "
<< namespaces.ecocRenameNss.toStringForErrorMsg()
<< " no longer exists prior to compaction",
*tempEcocColl);
// Ensure the EDC collection is still valid
validateCompactRequest(request,
*collections.at(namespaces.edcNss).getCollectionPtr().get());
// Stash any resource before starting the internal transaction and restore them after the
// transaction commits
StashTransactionResourcesForMultiDocumentTransaction stasher(opCtx);
processFLECompactV2(opCtx,
request,
&getTransactionWithRetriesForMongoD,
@ -238,6 +288,9 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx,
getFLE2TaskExecutorForMongoD(opCtx),
&stats.getEsc(),
&stats.getEcoc());
stasher.restoreOnCommit();
validateCompactRequest(request,
*collections.at(namespaces.edcNss).getCollectionPtr().get());
}
auto tagsPerDelete =

View File

@ -0,0 +1,137 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/fle_compact_cleanup_mutex.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/assert_util.h"
#include <map>
#include <mutex>
namespace mongo {
class FLECompactCleanupMutex {
public:
std::mutex mutex;
stdx::condition_variable cv;
bool isLocked = false;
};
namespace {
class FLECompactCleanupMutexRegistry {
public:
std::shared_ptr<FLECompactCleanupMutex> getOrCreateMutexEntry(
const NamespaceString& ecocLockNss) {
std::lock_guard<std::mutex> lk(_mapMutex);
auto it = _mutexes.find(ecocLockNss);
if (it != _mutexes.end()) {
if (auto state = it->second.lock()) {
return state;
}
}
auto state = std::make_shared<FLECompactCleanupMutex>();
_mutexes[ecocLockNss] = state;
return state;
}
void eraseMutexEntryIfUnused(const NamespaceString& ecocLockNss,
const std::shared_ptr<FLECompactCleanupMutex>& state) {
std::lock_guard<std::mutex> lk(_mapMutex);
auto it = _mutexes.find(ecocLockNss);
if (it == _mutexes.end()) {
return;
}
auto current = it->second.lock();
if (!current) {
_mutexes.erase(it);
return;
}
// 'current' is a temporary strong reference from weak_ptr::lock(), so a use count of 2
// means the scoped object being destroyed is the only real owner.
if (current == state && state.use_count() == 2) {
_mutexes.erase(it);
}
}
size_t sizeForTest() const {
std::lock_guard<std::mutex> lk(_mapMutex);
return _mutexes.size();
}
private:
mutable std::mutex _mapMutex;
std::map<NamespaceString, std::weak_ptr<FLECompactCleanupMutex>> _mutexes;
};
const auto getFLECompactCleanupMutexRegistry =
ServiceContext::declareDecoration<FLECompactCleanupMutexRegistry>();
} // namespace
ScopedFLECompactCleanupMutex::ScopedFLECompactCleanupMutex(OperationContext* opCtx,
const NamespaceString& ecocLockNss)
: _opCtx(opCtx),
_ecocLockNss(ecocLockNss),
_mutex(getFLECompactCleanupMutexRegistry(opCtx->getServiceContext())
.getOrCreateMutexEntry(ecocLockNss)) {
try {
std::unique_lock<std::mutex> lk(_mutex->mutex);
opCtx->waitForConditionOrInterrupt(_mutex->cv, lk, [this] { return !_mutex->isLocked; });
_mutex->isLocked = true;
} catch (...) {
getFLECompactCleanupMutexRegistry(_opCtx->getServiceContext())
.eraseMutexEntryIfUnused(_ecocLockNss, _mutex);
throw;
}
}
ScopedFLECompactCleanupMutex::~ScopedFLECompactCleanupMutex() {
{
std::lock_guard<std::mutex> lk(_mutex->mutex);
invariant(_mutex->isLocked);
_mutex->isLocked = false;
}
_mutex->cv.notify_one();
getFLECompactCleanupMutexRegistry(_opCtx->getServiceContext())
.eraseMutexEntryIfUnused(_ecocLockNss, _mutex);
}
size_t getFLECompactCleanupMutexRegistrySizeForTest(ServiceContext* serviceContext) {
return getFLECompactCleanupMutexRegistry(serviceContext).sizeForTest();
}
} // namespace mongo

View File

@ -0,0 +1,83 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/namespace_string.h"
#include "mongo/util/modules.h"
#include <cstddef>
#include <memory>
#include <utility>
namespace mongo {
class FLECompactCleanupMutex;
class OperationContext;
class ServiceContext;
/**
* Serializes FLE compact and cleanup operations for the same EDC using an in-memory mutex keyed by
* the derived 'ecoc.lock' namespace. This mutex intentionally lives outside the locker/transaction
* resource machinery so it remains effective across the internal transaction stash/restore flow.
*/
class MONGO_MOD_PUB ScopedFLECompactCleanupMutex {
public:
ScopedFLECompactCleanupMutex(OperationContext* opCtx, const NamespaceString& ecocLockNss);
~ScopedFLECompactCleanupMutex();
ScopedFLECompactCleanupMutex(const ScopedFLECompactCleanupMutex&) = delete;
ScopedFLECompactCleanupMutex& operator=(const ScopedFLECompactCleanupMutex&) = delete;
private:
OperationContext* _opCtx;
NamespaceString _ecocLockNss;
std::shared_ptr<FLECompactCleanupMutex> _mutex;
};
MONGO_MOD_PUB size_t getFLECompactCleanupMutexRegistrySizeForTest(ServiceContext* serviceContext);
template <typename GetNamespaces>
MONGO_MOD_PUB auto acquireFLECompactCleanupMutexWithStableNamespaces(OperationContext* opCtx,
GetNamespaces getNamespaces) {
auto namespaces = getNamespaces();
while (true) {
auto scopedMutex =
std::make_unique<ScopedFLECompactCleanupMutex>(opCtx, namespaces.ecocLockNss);
auto latestNamespaces = getNamespaces();
if (latestNamespaces.ecocLockNss == namespaces.ecocLockNss) {
return std::pair{std::move(latestNamespaces), std::move(scopedMutex)};
}
namespaces = std::move(latestNamespaces);
}
}
} // namespace mongo

View File

@ -0,0 +1,263 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/fle_compact_cleanup_mutex.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/unittest.h"
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <thread>
namespace mongo {
namespace {
bool waitForPredicate(const std::function<bool()>& pred) {
constexpr auto kTimeout = std::chrono::seconds(5);
constexpr auto kSleepInterval = std::chrono::milliseconds(10);
const auto deadline = std::chrono::steady_clock::now() + kTimeout;
while (std::chrono::steady_clock::now() < deadline) {
if (pred()) {
return true;
}
std::this_thread::sleep_for(kSleepInterval);
}
return pred();
}
void markKilled(OperationContext* opCtx) {
std::lock_guard<Client> lk(*opCtx->getClient());
opCtx->markKilled(ErrorCodes::Interrupted);
}
void rethrowIfSet(const std::exception_ptr& ex) {
if (ex) {
std::rethrow_exception(ex);
}
}
class FLECompactCleanupMutexTest : public ServiceContextMongoDTest {
protected:
size_t registrySize() const {
return getFLECompactCleanupMutexRegistrySizeForTest(getServiceContext());
}
};
struct TestNamespaces {
NamespaceString ecocLockNss;
};
TEST_F(FLECompactCleanupMutexTest, StableNamespacesHelperReturnsPostLockNamespaces) {
auto opCtx = makeOperationContext();
std::vector<TestNamespaces> namespaceResults{
{NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock")},
{NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock")}};
size_t calls = 0;
auto [namespaces, scopedMutex] = acquireFLECompactCleanupMutexWithStableNamespaces(
opCtx.get(), [&] { return namespaceResults[calls++]; });
ASSERT_EQ(namespaceResults.back().ecocLockNss, namespaces.ecocLockNss);
ASSERT_EQ(2, calls);
ASSERT_EQ(1, registrySize());
scopedMutex.reset();
ASSERT_EQ(0, registrySize());
}
TEST_F(FLECompactCleanupMutexTest, StableNamespacesHelperRetriesWhenLockNamespaceChanges) {
auto opCtx = makeOperationContext();
auto firstNss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll1.ecoc.lock");
auto secondNss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll2.ecoc.lock");
std::vector<TestNamespaces> namespaceResults{{firstNss}, {secondNss}, {secondNss}};
size_t calls = 0;
auto [namespaces, scopedMutex] = acquireFLECompactCleanupMutexWithStableNamespaces(
opCtx.get(), [&] { return namespaceResults[calls++]; });
ASSERT_EQ(secondNss, namespaces.ecocLockNss);
ASSERT_EQ(3, calls);
ASSERT_EQ(1, registrySize());
scopedMutex.reset();
ASSERT_EQ(0, registrySize());
}
TEST_F(FLECompactCleanupMutexTest, ScopedAcquisitionsOnSameNamespaceSerialize) {
auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock");
auto holderOpCtx = makeOperationContext();
auto holder = std::make_unique<ScopedFLECompactCleanupMutex>(holderOpCtx.get(), nss);
std::promise<OperationContext*> waiterOpCtxPromise;
auto waiterOpCtxFuture = waiterOpCtxPromise.get_future();
std::promise<void> acquiredPromise;
auto acquiredFuture = acquiredPromise.get_future();
std::promise<void> releasePromise;
auto releaseFuture = releasePromise.get_future();
std::exception_ptr waiterException;
std::thread waiter([&] {
try {
auto client = getServiceContext()->getService()->makeClient("sameNamespaceWaiter");
auto opCtx = client->makeOperationContext();
waiterOpCtxPromise.set_value(opCtx.get());
ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss);
acquiredPromise.set_value();
releaseFuture.get();
} catch (...) {
waiterException = std::current_exception();
}
});
auto* waiterOpCtx = waiterOpCtxFuture.get();
const auto waiterBlocked =
waitForPredicate([&] { return waiterOpCtx->isWaitingForConditionOrInterrupt(); });
const auto acquiredBeforeRelease =
acquiredFuture.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready;
holder.reset();
const auto acquiredAfterRelease =
acquiredFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready;
if (!acquiredAfterRelease) {
markKilled(waiterOpCtx);
}
releasePromise.set_value();
waiter.join();
ASSERT_TRUE(waiterBlocked);
ASSERT_FALSE(acquiredBeforeRelease);
ASSERT_TRUE(acquiredAfterRelease);
rethrowIfSet(waiterException);
ASSERT_EQ(0, registrySize());
}
TEST_F(FLECompactCleanupMutexTest, ScopedAcquisitionsOnDifferentNamespacesDoNotBlock) {
auto nss1 = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll1.ecoc.lock");
auto nss2 = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll2.ecoc.lock");
auto holderOpCtx = makeOperationContext();
auto holder = std::make_unique<ScopedFLECompactCleanupMutex>(holderOpCtx.get(), nss1);
std::promise<void> acquiredPromise;
auto acquiredFuture = acquiredPromise.get_future();
std::promise<void> releasePromise;
auto releaseFuture = releasePromise.get_future();
std::exception_ptr waiterException;
std::thread waiter([&] {
try {
auto client = getServiceContext()->getService()->makeClient("differentNamespaceWaiter");
auto opCtx = client->makeOperationContext();
ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss2);
acquiredPromise.set_value();
releaseFuture.get();
} catch (...) {
waiterException = std::current_exception();
}
});
const auto acquiredWhileFirstNamespaceHeld =
acquiredFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready;
const auto registrySizeWhileBothAreHeld = registrySize();
holder.reset();
releasePromise.set_value();
waiter.join();
ASSERT_TRUE(acquiredWhileFirstNamespaceHeld);
ASSERT_EQ(2, registrySizeWhileBothAreHeld);
rethrowIfSet(waiterException);
ASSERT_EQ(0, registrySize());
}
TEST_F(FLECompactCleanupMutexTest, InterruptedWaiterDoesNotLeakRegistryEntry) {
auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock");
auto holderOpCtx = makeOperationContext();
auto holder = std::make_unique<ScopedFLECompactCleanupMutex>(holderOpCtx.get(), nss);
std::promise<OperationContext*> waiterOpCtxPromise;
auto waiterOpCtxFuture = waiterOpCtxPromise.get_future();
std::promise<void> interruptedPromise;
auto interruptedFuture = interruptedPromise.get_future();
std::exception_ptr waiterException;
std::thread waiter([&] {
try {
auto client = getServiceContext()->getService()->makeClient("interruptedWaiter");
auto opCtx = client->makeOperationContext();
waiterOpCtxPromise.set_value(opCtx.get());
ASSERT_THROWS_CODE(ScopedFLECompactCleanupMutex(opCtx.get(), nss),
DBException,
ErrorCodes::Interrupted);
interruptedPromise.set_value();
} catch (...) {
waiterException = std::current_exception();
}
});
auto* waiterOpCtx = waiterOpCtxFuture.get();
const auto waiterBlocked =
waitForPredicate([&] { return waiterOpCtx->isWaitingForConditionOrInterrupt(); });
markKilled(waiterOpCtx);
const auto interrupted =
interruptedFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready;
holder.reset();
waiter.join();
ASSERT_TRUE(waiterBlocked);
ASSERT_TRUE(interrupted);
rethrowIfSet(waiterException);
ASSERT_EQ(0, registrySize());
}
TEST_F(FLECompactCleanupMutexTest, RegistryEntriesAreRemovedAfterScopedObjectsAreDestroyed) {
auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock");
auto opCtx = makeOperationContext();
ASSERT_EQ(0, registrySize());
{
ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss);
ASSERT_EQ(1, registrySize());
}
ASSERT_EQ(0, registrySize());
}
} // namespace
} // namespace mongo