diff --git a/src/mongo/db/BUILD.bazel b/src/mongo/db/BUILD.bazel index 7b0227a4458..3d8de448058 100644 --- a/src/mongo/db/BUILD.bazel +++ b/src/mongo/db/BUILD.bazel @@ -2064,6 +2064,18 @@ mongo_cc_library( ], ) +mongo_cc_library( + name = "fle_compact_cleanup_mutex", + srcs = [ + "fle_compact_cleanup_mutex.cpp", + ], + deps = [ + ":server_base", + ":service_context", + "//src/mongo:base", + ], +) + mongo_cc_library( name = "fle_mocks", srcs = [ @@ -2089,6 +2101,19 @@ mongo_cc_library( ], ) +mongo_cc_unit_test( + name = "fle_compact_cleanup_mutex_test", + srcs = [ + "fle_compact_cleanup_mutex_test.cpp", + ], + tags = ["mongo_unittest_first_group"], + deps = [ + ":fle_compact_cleanup_mutex", + ":service_context_d_test_fixture", + "//src/mongo/unittest", + ], +) + mongo_cc_unit_test( name = "fle_test", srcs = [ diff --git a/src/mongo/db/OWNERS.yml b/src/mongo/db/OWNERS.yml index c384a9ca4ff..9ac0c63d6df 100644 --- a/src/mongo/db/OWNERS.yml +++ b/src/mongo/db/OWNERS.yml @@ -172,6 +172,9 @@ filters: approvers: - 10gen/server-security - 10gen/query-integration-features + - "fle_compact_cleanup_mutex*": + approvers: + - 10gen/server-security - "/keyfile_option*": approvers: - 10gen/server-security diff --git a/src/mongo/db/commands/BUILD.bazel b/src/mongo/db/commands/BUILD.bazel index 897b75861fe..39137b614d5 100644 --- a/src/mongo/db/commands/BUILD.bazel +++ b/src/mongo/db/commands/BUILD.bazel @@ -617,6 +617,7 @@ mongo_cc_library( "//src/mongo/db:commands", "//src/mongo/db:common", "//src/mongo/db:dbdirectclient", + "//src/mongo/db:fle_compact_cleanup_mutex", "//src/mongo/db:fle_crud", "//src/mongo/db:mongohasher", "//src/mongo/db:server_base", diff --git a/src/mongo/db/commands/fle2_cleanup_cmd.cpp b/src/mongo/db/commands/fle2_cleanup_cmd.cpp index 65ffe7763f7..904a8d18bc1 100644 --- a/src/mongo/db/commands/fle2_cleanup_cmd.cpp +++ b/src/mongo/db/commands/fle2_cleanup_cmd.cpp @@ -47,6 +47,7 @@ #include "mongo/db/curop.h" #include "mongo/db/database_name.h" #include "mongo/db/feature_flag.h" +#include "mongo/db/fle_compact_cleanup_mutex.h" #include "mongo/db/fle_crud.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" @@ -89,6 +90,54 @@ namespace mongo { namespace { +auto acquireAndValidateCollections(OperationContext* opCtx, + CollectionAcquisitionRequests requests) { + CollectionOrViewAcquisitionRequests acquisitionRequests; + for (auto& request : requests) { + if (request.nssOrUUID.isNamespaceString()) { + acquisitionRequests.emplace_back(request.nssOrUUID.nss(), + request.expectedUUID, + request.placementConcern, + request.readConcern, + request.operationType, + AcquisitionPrerequisites::ViewMode::kCanBeView, + request.lockAcquisitionDeadline); + } else { + acquisitionRequests.emplace_back(std::move(request.nssOrUUID), + request.placementConcern, + request.readConcern, + request.operationType, + AcquisitionPrerequisites::ViewMode::kCanBeView, + request.lockAcquisitionDeadline); + } + } + + auto acquisitions = + makeAcquisitionMap(acquireCollectionsOrViews(opCtx, acquisitionRequests, MODE_IS)); + for (const auto& [nss, acq] : acquisitions) { + uassert(ErrorCodes::CommandNotSupportedOnView, + "Cannot cleanup structured encryption data on a view", + !acq.isView()); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection '" << nss.toStringForErrorMsg() << "' does not exist", + acq.collectionExists()); + } + return acquisitions; +} + +EncryptedStateCollectionsNamespaces validateCleanupRequestAndGetNamespaces( + OperationContext* opCtx, const CleanupStructuredEncryptionData& request) { + const auto& edcNss = request.getNamespace(); + auto collections = acquireAndValidateCollections( + opCtx, + {CollectionAcquisitionRequest::fromOpCtx( + opCtx, edcNss, AcquisitionPrerequisites::OperationType::kRead)}); + const auto& edcCollection = collections.at(edcNss); + validateCleanupRequest(request, *edcCollection.getCollectionPtr().get()); + return uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection( + *edcCollection.getCollectionPtr().get())); +} + void createQEClusteredStateCollection(OperationContext* opCtx, const NamespaceString& nss) { // Create QE state collection locally. This local collection creation is safe here because we // instantiate a ScopedReplicaSetDDL object prior to this call. This object registers this DDL @@ -167,35 +216,12 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx, const auto& edcNss = request.getNamespace(); - AutoGetDb autoDb(opCtx, edcNss.dbName(), MODE_IX); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Database '" << edcNss.dbName().toStringForErrorMsg() - << "' does not exist", - autoDb.getDb()); - Lock::CollectionLock edcLock(opCtx, edcNss, MODE_IS); - - // Validate the request and acquire the relevant namespaces - EncryptedStateCollectionsNamespaces namespaces; - { - auto catalog = CollectionCatalog::get(opCtx); - - // Check the data collection exists and is not a view - auto edc = catalog->lookupCollectionByNamespace(opCtx, edcNss); - if (!edc) { - uassert(ErrorCodes::CommandNotSupportedOnView, - "Cannot cleanup structured encryption data on a view", - !catalog->lookupView(opCtx, edcNss)); - uasserted(ErrorCodes::NamespaceNotFound, - str::stream() - << "Collection '" << edcNss.toStringForErrorMsg() << "' does not exist"); - } - - validateCleanupRequest(request, *edc); - - namespaces = - uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(*edc)); - } + auto [namespaces, scopedCompactCleanupMutex] = + acquireFLECompactCleanupMutexWithStableNamespaces( + opCtx, [&] { return validateCleanupRequestAndGetNamespaces(opCtx, request); }); + // Register the DDL with the replica set tracker so any local QE state collection DDL remains + // visible to metadata synchronization, without taking the replica set DDL locks here. ReplicaSetDDLTracker::ScopedReplicaSetDDL scopedReplicaSetDDL( opCtx, std::vector{namespaces.edcNss, @@ -204,9 +230,6 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx, namespaces.ecocRenameNss, namespaces.ecocLockNss}); - // Acquire exclusive lock on the associated 'ecoc.lock' namespace to serialize calls - // to cleanup and compact on the same EDC namespace. - Lock::CollectionLock compactionLock(opCtx, namespaces.ecocLockNss, MODE_X); LOGV2(7618805, "Cleaning up the encrypted compaction collection", logAttrs(edcNss)); @@ -279,14 +302,18 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx, createQEClusteredStateCollection(opCtx, namespaces.ecocNss); } + FLECleanupESCDeleteQueue pq; { - AutoGetCollection ecocCompact(opCtx, namespaces.ecocRenameNss, MODE_IS); + auto collections = acquireAndValidateCollections( + opCtx, + {CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead), + CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.edcNss, AcquisitionPrerequisites::OperationType::kRead)}); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Renamed encrypted compaction collection " - << namespaces.ecocRenameNss.toStringForErrorMsg() - << " no longer exists prior to cleanup", - *ecocCompact); + // Ensure the EDC collection is still valid + validateCleanupRequest(request, + *collections.at(namespaces.edcNss).getCollectionPtr().get()); auto pqMemLimit = ServerParameterSet::getClusterParameterSet() @@ -295,22 +322,31 @@ CleanupStats cleanupEncryptedCollection(OperationContext* opCtx, .getMaxAnchorCompactionSize(); // Clean up entries for each encrypted field in compactionTokens - auto pq = processFLECleanup(opCtx, - request, - &getTransactionWithRetriesForMongoD, - namespaces, - pqMemLimit, - &stats.getEsc(), - &stats.getEcoc()); - - // Delete the entries in 'C' from the ESC - cleanupESCNonAnchors( - opCtx, namespaces.escNss, escDeleteSet, tagsPerDelete, &stats.getEsc()); - - // Delete the entries in the priority queue of ESC anchors from the ESC - cleanupESCAnchors(opCtx, namespaces.escNss, pq, tagsPerDelete, &stats.getEsc()); + // Stash any resource before starting the internal transaction and restore them after the + // transaction commits + StashTransactionResourcesForMultiDocumentTransaction stasher(opCtx); + pq = processFLECleanup(opCtx, + request, + &getTransactionWithRetriesForMongoD, + namespaces, + pqMemLimit, + &stats.getEsc(), + &stats.getEcoc()); + stasher.restoreOnCommit(); + validateCleanupRequest(request, + *collections.at(namespaces.edcNss).getCollectionPtr().get()); } + // processFLECleanup() has fully consumed 'ecoc.compact' and materialized the ESC cleanup work + // into 'escDeleteSet' and 'pq'. The remaining cleanup writes only to the ESC, so it does not + // need to keep the read acquisition on 'ecoc.compact'. + + // Delete the entries in 'C' from the ESC + cleanupESCNonAnchors(opCtx, namespaces.escNss, escDeleteSet, tagsPerDelete, &stats.getEsc()); + + // Delete the entries in the priority queue of ESC anchors from the ESC + cleanupESCAnchors(opCtx, namespaces.escNss, pq, tagsPerDelete, &stats.getEsc()); + // Drop the 'ecoc.compact' collection dropQEStateCollection(opCtx, namespaces.ecocRenameNss); diff --git a/src/mongo/db/commands/fle2_compact.cpp b/src/mongo/db/commands/fle2_compact.cpp index 2505883d108..423601fb913 100644 --- a/src/mongo/db/commands/fle2_compact.cpp +++ b/src/mongo/db/commands/fle2_compact.cpp @@ -60,7 +60,6 @@ #include "mongo/db/query/write_ops/write_ops.h" #include "mongo/db/query/write_ops/write_ops_gen.h" #include "mongo/db/query/write_ops/write_ops_parsers.h" -#include "mongo/db/service_context.h" #include "mongo/db/session/logical_session_id.h" #include "mongo/db/shard_role/shard_catalog/collection_options.h" #include "mongo/db/transaction/transaction_api.h" @@ -75,7 +74,9 @@ #include #include #include +#include #include +#include #include #include #include diff --git a/src/mongo/db/commands/fle2_compact.h b/src/mongo/db/commands/fle2_compact.h index 071b74cba96..7d2ec7b6f7b 100644 --- a/src/mongo/db/commands/fle2_compact.h +++ b/src/mongo/db/commands/fle2_compact.h @@ -44,6 +44,7 @@ #include "mongo/util/modules.h" #include +#include #include #include #include diff --git a/src/mongo/db/commands/fle2_compact_cmd.cpp b/src/mongo/db/commands/fle2_compact_cmd.cpp index 4c90ec23e27..4a2d93133ec 100644 --- a/src/mongo/db/commands/fle2_compact_cmd.cpp +++ b/src/mongo/db/commands/fle2_compact_cmd.cpp @@ -45,6 +45,7 @@ #include "mongo/db/commands/fle2_compact_gen.h" #include "mongo/db/curop.h" #include "mongo/db/database_name.h" +#include "mongo/db/fle_compact_cleanup_mutex.h" #include "mongo/db/fle_crud.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" @@ -63,6 +64,7 @@ #include "mongo/db/shard_role/shard_catalog/drop_collection.h" #include "mongo/db/shard_role/shard_catalog/operation_sharding_state.h" #include "mongo/db/shard_role/shard_catalog/rename_collection.h" +#include "mongo/db/shard_role/shard_role.h" #include "mongo/db/tenant_id.h" #include "mongo/db/topology/sharding_state.h" #include "mongo/logv2/log.h" @@ -90,6 +92,54 @@ MONGO_FAIL_POINT_DEFINE(fleCompactSkipECOCDropUnsharded); namespace mongo { namespace { +auto acquireAndValidateCollections(OperationContext* opCtx, + CollectionAcquisitionRequests requests) { + CollectionOrViewAcquisitionRequests acquisitionRequests; + for (auto& request : requests) { + if (request.nssOrUUID.isNamespaceString()) { + acquisitionRequests.emplace_back(request.nssOrUUID.nss(), + request.expectedUUID, + request.placementConcern, + request.readConcern, + request.operationType, + AcquisitionPrerequisites::ViewMode::kCanBeView, + request.lockAcquisitionDeadline); + } else { + acquisitionRequests.emplace_back(std::move(request.nssOrUUID), + request.placementConcern, + request.readConcern, + request.operationType, + AcquisitionPrerequisites::ViewMode::kCanBeView, + request.lockAcquisitionDeadline); + } + } + + auto acquisitions = + makeAcquisitionMap(acquireCollectionsOrViews(opCtx, acquisitionRequests, MODE_IS)); + for (const auto& [nss, acq] : acquisitions) { + uassert(ErrorCodes::CommandNotSupportedOnView, + "Cannot compact structured encryption data on a view", + !acq.isView()); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection '" << nss.toStringForErrorMsg() << "' does not exist", + acq.collectionExists()); + } + return acquisitions; +} + +EncryptedStateCollectionsNamespaces validateCompactRequestAndGetNamespaces( + OperationContext* opCtx, const CompactStructuredEncryptionData& request) { + const auto& edcNss = request.getNamespace(); + auto collections = acquireAndValidateCollections( + opCtx, + {CollectionAcquisitionRequest::fromOpCtx( + opCtx, edcNss, AcquisitionPrerequisites::OperationType::kRead)}); + const auto& edcCollection = collections.at(edcNss); + validateCompactRequest(request, *edcCollection.getCollectionPtr().get()); + return uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection( + *edcCollection.getCollectionPtr().get())); +} + CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx, const CompactStructuredEncryptionData& request) { { @@ -113,31 +163,12 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx, LOGV2(6319900, "Compacting the encrypted compaction collection", logAttrs(edcNss)); - AutoGetDb autoDb(opCtx, edcNss.dbName(), MODE_IX); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Database '" << edcNss.dbName().toStringForErrorMsg() - << "' does not exist", - autoDb.getDb()); - - auto catalog = CollectionCatalog::get(opCtx); - Lock::CollectionLock edcLock(opCtx, edcNss, MODE_IS); - - // Check the data collection exists and is not a view - auto edc = catalog->lookupCollectionByNamespace(opCtx, edcNss); - if (!edc) { - uassert(ErrorCodes::CommandNotSupportedOnView, - "Cannot compact structured encryption data on a view", - !catalog->lookupView(opCtx, edcNss)); - uasserted(ErrorCodes::NamespaceNotFound, - str::stream() << "Collection '" << edcNss.toStringForErrorMsg() - << "' does not exist"); - } - - validateCompactRequest(request, *edc); - - auto namespaces = - uassertStatusOK(EncryptedStateCollectionsNamespaces::createFromDataCollection(*edc)); + auto [namespaces, scopedCompactCleanupMutex] = + acquireFLECompactCleanupMutexWithStableNamespaces( + opCtx, [&] { return validateCompactRequestAndGetNamespaces(opCtx, request); }); + // Register the DDL with the replica set tracker so any local QE state collection DDL remains + // visible to metadata synchronization, without taking the replica set DDL locks here. ReplicaSetDDLTracker::ScopedReplicaSetDDL scopedReplicaSetDDL( opCtx, std::vector{namespaces.edcNss, @@ -146,50 +177,61 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx, namespaces.ecocRenameNss, namespaces.ecocLockNss}); - // Acquire exclusive lock on the associated 'ecoc.lock' namespace to serialize calls - // to cleanup and compact on the same EDC namespace - Lock::CollectionLock compactionLock(opCtx, namespaces.ecocLockNss, MODE_X); - - // Step 1: rename the ECOC collection if it exists - catalog = CollectionCatalog::get(opCtx); - auto ecoc = catalog->lookupCollectionByNamespace(opCtx, namespaces.ecocNss); - auto ecocRename = catalog->lookupCollectionByNamespace(opCtx, namespaces.ecocRenameNss); - CompactStats stats({}, {}); FLECompactESCDeleteSet escDeleteSet; + bool createEcoc = false; + bool renameEcoc = false; + { + CollectionAcquisitionRequests acquisitionRequests = { + CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.ecocNss, AcquisitionPrerequisites::OperationType::kRead), + CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead)}; + auto acquisitions = + makeAcquisitionMap(acquireCollectionsMaybeLockFree(opCtx, acquisitionRequests)); - if (!ecoc && !ecocRename) { - // nothing to compact - LOGV2(6548306, "Skipping compaction as there is no ECOC collection to compact"); - return stats; - } else if (ecoc && !ecocRename) { - // load the random set of ESC non-anchor entries to be deleted post-compact. - // This must be done before renaming the ECOC because if not, we can end up with - // ESC entries that have no corresponding ECOC entry in the renamed ECOC. - auto memoryLimit = - ServerParameterSet::getClusterParameterSet() - ->get>("fleCompactionOptions") - ->getValue(boost::none) - .getMaxCompactionSize(); + const auto& ecoc = acquisitions.at(namespaces.ecocNss).getCollectionPtr(); + const auto& ecocRename = acquisitions.at(namespaces.ecocRenameNss).getCollectionPtr(); - escDeleteSet = - readRandomESCNonAnchorIds(opCtx, namespaces.escNss, memoryLimit, &stats.getEsc()); + // Step 1: rename the ECOC collection if it exists + if (!ecoc && !ecocRename) { + // nothing to compact + LOGV2(6548306, "Skipping compaction as there is no ECOC collection to compact"); + return stats; + } else if (ecoc && !ecocRename) { + // load the random set of ESC non-anchor entries to be deleted post-compact. + // This must be done before renaming the ECOC because if not, we can end up with + // ESC entries that have no corresponding ECOC entry in the renamed ECOC. + auto memoryLimit = + ServerParameterSet::getClusterParameterSet() + ->get>("fleCompactionOptions") + ->getValue(boost::none) + .getMaxCompactionSize(); - LOGV2(7293603, - "Renaming the encrypted compaction collection", - "ecocNss"_attr = namespaces.ecocNss, - "ecocRenameNss"_attr = namespaces.ecocRenameNss); + escDeleteSet = + readRandomESCNonAnchorIds(opCtx, namespaces.escNss, memoryLimit, &stats.getEsc()); + + LOGV2(7293603, + "Renaming the encrypted compaction collection", + "ecocNss"_attr = namespaces.ecocNss, + "ecocRenameNss"_attr = namespaces.ecocRenameNss); + renameEcoc = createEcoc = true; + } else { + LOGV2(7293610, + "Resuming compaction from a stale ECOC collection", + logAttrs(namespaces.ecocRenameNss)); + if (!ecoc) { + createEcoc = true; + } + } + } + + if (renameEcoc) { RenameCollectionOptions renameOpts; validateAndRunRenameCollection( opCtx, namespaces.ecocNss, namespaces.ecocRenameNss, renameOpts); - ecoc = nullptr; - } else { - LOGV2(7293610, - "Resuming compaction from a stale ECOC collection", - logAttrs(namespaces.ecocRenameNss)); } - - if (!ecoc) { + if (createEcoc) { if (MONGO_unlikely(fleCompactHangBeforeECOCCreateUnsharded.shouldFail())) { LOGV2(7299601, "Hanging due to fleCompactHangBeforeECOCCreateUnsharded fail point"); fleCompactHangBeforeECOCCreateUnsharded.pauseWhileSet(); @@ -222,15 +264,23 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx, // Step 2: for each encrypted field in compactionTokens, get distinct set of entries 'C' // from ECOC, and for each entry in 'C', compact ESC. { - // acquire IS lock on the ecocRenameNss to prevent it from being dropped during compact - AutoGetCollection tempEcocColl(opCtx, namespaces.ecocRenameNss, MODE_IS); + // Acquire ecoc.compact and the EDC before stashing transaction resources. Restore will + // re-establish these acquisitions after the internal transaction commits and fail if either + // collection was dropped or recreated while the resources were stashed. + auto collections = acquireAndValidateCollections( + opCtx, + {CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.ecocRenameNss, AcquisitionPrerequisites::OperationType::kRead), + CollectionAcquisitionRequest::fromOpCtx( + opCtx, namespaces.edcNss, AcquisitionPrerequisites::OperationType::kRead)}); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Renamed encrypted compaction collection " - << namespaces.ecocRenameNss.toStringForErrorMsg() - << " no longer exists prior to compaction", - *tempEcocColl); + // Ensure the EDC collection is still valid + validateCompactRequest(request, + *collections.at(namespaces.edcNss).getCollectionPtr().get()); + // Stash any resource before starting the internal transaction and restore them after the + // transaction commits + StashTransactionResourcesForMultiDocumentTransaction stasher(opCtx); processFLECompactV2(opCtx, request, &getTransactionWithRetriesForMongoD, @@ -238,6 +288,9 @@ CompactStats compactEncryptedCompactionCollection(OperationContext* opCtx, getFLE2TaskExecutorForMongoD(opCtx), &stats.getEsc(), &stats.getEcoc()); + stasher.restoreOnCommit(); + validateCompactRequest(request, + *collections.at(namespaces.edcNss).getCollectionPtr().get()); } auto tagsPerDelete = diff --git a/src/mongo/db/fle_compact_cleanup_mutex.cpp b/src/mongo/db/fle_compact_cleanup_mutex.cpp new file mode 100644 index 00000000000..e53fe06d22f --- /dev/null +++ b/src/mongo/db/fle_compact_cleanup_mutex.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/fle_compact_cleanup_mutex.h" + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/assert_util.h" + +#include +#include + +namespace mongo { + +class FLECompactCleanupMutex { +public: + std::mutex mutex; + stdx::condition_variable cv; + bool isLocked = false; +}; + +namespace { + +class FLECompactCleanupMutexRegistry { +public: + std::shared_ptr getOrCreateMutexEntry( + const NamespaceString& ecocLockNss) { + std::lock_guard lk(_mapMutex); + auto it = _mutexes.find(ecocLockNss); + if (it != _mutexes.end()) { + if (auto state = it->second.lock()) { + return state; + } + } + + auto state = std::make_shared(); + _mutexes[ecocLockNss] = state; + return state; + } + + void eraseMutexEntryIfUnused(const NamespaceString& ecocLockNss, + const std::shared_ptr& state) { + std::lock_guard lk(_mapMutex); + auto it = _mutexes.find(ecocLockNss); + if (it == _mutexes.end()) { + return; + } + + auto current = it->second.lock(); + if (!current) { + _mutexes.erase(it); + return; + } + + // 'current' is a temporary strong reference from weak_ptr::lock(), so a use count of 2 + // means the scoped object being destroyed is the only real owner. + if (current == state && state.use_count() == 2) { + _mutexes.erase(it); + } + } + + size_t sizeForTest() const { + std::lock_guard lk(_mapMutex); + return _mutexes.size(); + } + +private: + mutable std::mutex _mapMutex; + std::map> _mutexes; +}; + +const auto getFLECompactCleanupMutexRegistry = + ServiceContext::declareDecoration(); + +} // namespace + +ScopedFLECompactCleanupMutex::ScopedFLECompactCleanupMutex(OperationContext* opCtx, + const NamespaceString& ecocLockNss) + : _opCtx(opCtx), + _ecocLockNss(ecocLockNss), + _mutex(getFLECompactCleanupMutexRegistry(opCtx->getServiceContext()) + .getOrCreateMutexEntry(ecocLockNss)) { + try { + std::unique_lock lk(_mutex->mutex); + opCtx->waitForConditionOrInterrupt(_mutex->cv, lk, [this] { return !_mutex->isLocked; }); + _mutex->isLocked = true; + } catch (...) { + getFLECompactCleanupMutexRegistry(_opCtx->getServiceContext()) + .eraseMutexEntryIfUnused(_ecocLockNss, _mutex); + throw; + } +} + +ScopedFLECompactCleanupMutex::~ScopedFLECompactCleanupMutex() { + { + std::lock_guard lk(_mutex->mutex); + invariant(_mutex->isLocked); + _mutex->isLocked = false; + } + _mutex->cv.notify_one(); + + getFLECompactCleanupMutexRegistry(_opCtx->getServiceContext()) + .eraseMutexEntryIfUnused(_ecocLockNss, _mutex); +} + +size_t getFLECompactCleanupMutexRegistrySizeForTest(ServiceContext* serviceContext) { + return getFLECompactCleanupMutexRegistry(serviceContext).sizeForTest(); +} + +} // namespace mongo diff --git a/src/mongo/db/fle_compact_cleanup_mutex.h b/src/mongo/db/fle_compact_cleanup_mutex.h new file mode 100644 index 00000000000..3088d3d10ec --- /dev/null +++ b/src/mongo/db/fle_compact_cleanup_mutex.h @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/util/modules.h" + +#include +#include +#include + +namespace mongo { + +class FLECompactCleanupMutex; +class OperationContext; +class ServiceContext; + +/** + * Serializes FLE compact and cleanup operations for the same EDC using an in-memory mutex keyed by + * the derived 'ecoc.lock' namespace. This mutex intentionally lives outside the locker/transaction + * resource machinery so it remains effective across the internal transaction stash/restore flow. + */ +class MONGO_MOD_PUB ScopedFLECompactCleanupMutex { +public: + ScopedFLECompactCleanupMutex(OperationContext* opCtx, const NamespaceString& ecocLockNss); + ~ScopedFLECompactCleanupMutex(); + + ScopedFLECompactCleanupMutex(const ScopedFLECompactCleanupMutex&) = delete; + ScopedFLECompactCleanupMutex& operator=(const ScopedFLECompactCleanupMutex&) = delete; + +private: + OperationContext* _opCtx; + NamespaceString _ecocLockNss; + std::shared_ptr _mutex; +}; + +MONGO_MOD_PUB size_t getFLECompactCleanupMutexRegistrySizeForTest(ServiceContext* serviceContext); + +template +MONGO_MOD_PUB auto acquireFLECompactCleanupMutexWithStableNamespaces(OperationContext* opCtx, + GetNamespaces getNamespaces) { + auto namespaces = getNamespaces(); + while (true) { + auto scopedMutex = + std::make_unique(opCtx, namespaces.ecocLockNss); + + auto latestNamespaces = getNamespaces(); + if (latestNamespaces.ecocLockNss == namespaces.ecocLockNss) { + return std::pair{std::move(latestNamespaces), std::move(scopedMutex)}; + } + + namespaces = std::move(latestNamespaces); + } +} + +} // namespace mongo diff --git a/src/mongo/db/fle_compact_cleanup_mutex_test.cpp b/src/mongo/db/fle_compact_cleanup_mutex_test.cpp new file mode 100644 index 00000000000..2ddaec7a218 --- /dev/null +++ b/src/mongo/db/fle_compact_cleanup_mutex_test.cpp @@ -0,0 +1,263 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/fle_compact_cleanup_mutex.h" + +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/unittest/unittest.h" + +#include +#include +#include +#include +#include +#include + +namespace mongo { +namespace { + +bool waitForPredicate(const std::function& pred) { + constexpr auto kTimeout = std::chrono::seconds(5); + constexpr auto kSleepInterval = std::chrono::milliseconds(10); + + const auto deadline = std::chrono::steady_clock::now() + kTimeout; + while (std::chrono::steady_clock::now() < deadline) { + if (pred()) { + return true; + } + std::this_thread::sleep_for(kSleepInterval); + } + return pred(); +} + +void markKilled(OperationContext* opCtx) { + std::lock_guard lk(*opCtx->getClient()); + opCtx->markKilled(ErrorCodes::Interrupted); +} + +void rethrowIfSet(const std::exception_ptr& ex) { + if (ex) { + std::rethrow_exception(ex); + } +} + +class FLECompactCleanupMutexTest : public ServiceContextMongoDTest { +protected: + size_t registrySize() const { + return getFLECompactCleanupMutexRegistrySizeForTest(getServiceContext()); + } +}; + +struct TestNamespaces { + NamespaceString ecocLockNss; +}; + +TEST_F(FLECompactCleanupMutexTest, StableNamespacesHelperReturnsPostLockNamespaces) { + auto opCtx = makeOperationContext(); + std::vector namespaceResults{ + {NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock")}, + {NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock")}}; + size_t calls = 0; + + auto [namespaces, scopedMutex] = acquireFLECompactCleanupMutexWithStableNamespaces( + opCtx.get(), [&] { return namespaceResults[calls++]; }); + + ASSERT_EQ(namespaceResults.back().ecocLockNss, namespaces.ecocLockNss); + ASSERT_EQ(2, calls); + ASSERT_EQ(1, registrySize()); + + scopedMutex.reset(); + ASSERT_EQ(0, registrySize()); +} + +TEST_F(FLECompactCleanupMutexTest, StableNamespacesHelperRetriesWhenLockNamespaceChanges) { + auto opCtx = makeOperationContext(); + auto firstNss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll1.ecoc.lock"); + auto secondNss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll2.ecoc.lock"); + std::vector namespaceResults{{firstNss}, {secondNss}, {secondNss}}; + size_t calls = 0; + + auto [namespaces, scopedMutex] = acquireFLECompactCleanupMutexWithStableNamespaces( + opCtx.get(), [&] { return namespaceResults[calls++]; }); + + ASSERT_EQ(secondNss, namespaces.ecocLockNss); + ASSERT_EQ(3, calls); + ASSERT_EQ(1, registrySize()); + + scopedMutex.reset(); + ASSERT_EQ(0, registrySize()); +} + +TEST_F(FLECompactCleanupMutexTest, ScopedAcquisitionsOnSameNamespaceSerialize) { + auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock"); + auto holderOpCtx = makeOperationContext(); + auto holder = std::make_unique(holderOpCtx.get(), nss); + + std::promise waiterOpCtxPromise; + auto waiterOpCtxFuture = waiterOpCtxPromise.get_future(); + std::promise acquiredPromise; + auto acquiredFuture = acquiredPromise.get_future(); + std::promise releasePromise; + auto releaseFuture = releasePromise.get_future(); + std::exception_ptr waiterException; + + std::thread waiter([&] { + try { + auto client = getServiceContext()->getService()->makeClient("sameNamespaceWaiter"); + auto opCtx = client->makeOperationContext(); + waiterOpCtxPromise.set_value(opCtx.get()); + + ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss); + acquiredPromise.set_value(); + releaseFuture.get(); + } catch (...) { + waiterException = std::current_exception(); + } + }); + + auto* waiterOpCtx = waiterOpCtxFuture.get(); + const auto waiterBlocked = + waitForPredicate([&] { return waiterOpCtx->isWaitingForConditionOrInterrupt(); }); + const auto acquiredBeforeRelease = + acquiredFuture.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready; + + holder.reset(); + const auto acquiredAfterRelease = + acquiredFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready; + if (!acquiredAfterRelease) { + markKilled(waiterOpCtx); + } + releasePromise.set_value(); + waiter.join(); + + ASSERT_TRUE(waiterBlocked); + ASSERT_FALSE(acquiredBeforeRelease); + ASSERT_TRUE(acquiredAfterRelease); + rethrowIfSet(waiterException); + ASSERT_EQ(0, registrySize()); +} + +TEST_F(FLECompactCleanupMutexTest, ScopedAcquisitionsOnDifferentNamespacesDoNotBlock) { + auto nss1 = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll1.ecoc.lock"); + auto nss2 = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll2.ecoc.lock"); + auto holderOpCtx = makeOperationContext(); + auto holder = std::make_unique(holderOpCtx.get(), nss1); + + std::promise acquiredPromise; + auto acquiredFuture = acquiredPromise.get_future(); + std::promise releasePromise; + auto releaseFuture = releasePromise.get_future(); + std::exception_ptr waiterException; + + std::thread waiter([&] { + try { + auto client = getServiceContext()->getService()->makeClient("differentNamespaceWaiter"); + auto opCtx = client->makeOperationContext(); + + ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss2); + acquiredPromise.set_value(); + releaseFuture.get(); + } catch (...) { + waiterException = std::current_exception(); + } + }); + + const auto acquiredWhileFirstNamespaceHeld = + acquiredFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready; + const auto registrySizeWhileBothAreHeld = registrySize(); + + holder.reset(); + releasePromise.set_value(); + waiter.join(); + + ASSERT_TRUE(acquiredWhileFirstNamespaceHeld); + ASSERT_EQ(2, registrySizeWhileBothAreHeld); + rethrowIfSet(waiterException); + ASSERT_EQ(0, registrySize()); +} + +TEST_F(FLECompactCleanupMutexTest, InterruptedWaiterDoesNotLeakRegistryEntry) { + auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock"); + auto holderOpCtx = makeOperationContext(); + auto holder = std::make_unique(holderOpCtx.get(), nss); + + std::promise waiterOpCtxPromise; + auto waiterOpCtxFuture = waiterOpCtxPromise.get_future(); + std::promise interruptedPromise; + auto interruptedFuture = interruptedPromise.get_future(); + std::exception_ptr waiterException; + + std::thread waiter([&] { + try { + auto client = getServiceContext()->getService()->makeClient("interruptedWaiter"); + auto opCtx = client->makeOperationContext(); + waiterOpCtxPromise.set_value(opCtx.get()); + + ASSERT_THROWS_CODE(ScopedFLECompactCleanupMutex(opCtx.get(), nss), + DBException, + ErrorCodes::Interrupted); + interruptedPromise.set_value(); + } catch (...) { + waiterException = std::current_exception(); + } + }); + + auto* waiterOpCtx = waiterOpCtxFuture.get(); + const auto waiterBlocked = + waitForPredicate([&] { return waiterOpCtx->isWaitingForConditionOrInterrupt(); }); + markKilled(waiterOpCtx); + const auto interrupted = + interruptedFuture.wait_for(std::chrono::seconds(5)) == std::future_status::ready; + + holder.reset(); + waiter.join(); + + ASSERT_TRUE(waiterBlocked); + ASSERT_TRUE(interrupted); + rethrowIfSet(waiterException); + ASSERT_EQ(0, registrySize()); +} + +TEST_F(FLECompactCleanupMutexTest, RegistryEntriesAreRemovedAfterScopedObjectsAreDestroyed) { + auto nss = NamespaceString::createNamespaceString_forTest("test.enxcol_.coll.ecoc.lock"); + auto opCtx = makeOperationContext(); + + ASSERT_EQ(0, registrySize()); + { + ScopedFLECompactCleanupMutex scoped(opCtx.get(), nss); + ASSERT_EQ(1, registrySize()); + } + ASSERT_EQ(0, registrySize()); +} + +} // namespace +} // namespace mongo