From 8010c36590c83dedb10795f22061ba0287d35e57 Mon Sep 17 00:00:00 2001 From: seanzimm <102551488+seanzimm@users.noreply.github.com> Date: Tue, 5 May 2026 11:30:10 -0400 Subject: [PATCH] SERVER-124365: Extend Intent lifetime inside WUOW (#52491) GitOrigin-RevId: 42cf5a3e894215adc52c615b80ab01d797f25d92 --- src/mongo/db/mongod_main.cpp | 28 +- src/mongo/db/repl/BUILD.bazel | 3 + src/mongo/db/repl/intent_guard.cpp | 8 +- src/mongo/db/repl/intent_guard.h | 9 +- src/mongo/db/repl/intent_registry.cpp | 396 ++++++++++++++---- src/mongo/db/repl/intent_registry.h | 51 ++- src/mongo/db/repl/intent_registry_test.cpp | 66 ++- src/mongo/db/repl/local_oplog_info.cpp | 14 + src/mongo/db/repl/oplog.cpp | 19 - .../db/repl/replication_coordinator_impl.cpp | 16 +- ...ion_coordinator_impl_step_up_step_down.cpp | 1 - src/mongo/db/repl/rollback_impl.cpp | 19 +- .../shard_role/lock_manager/d_concurrency.cpp | 19 + .../transaction/transaction_participant.cpp | 11 + 14 files changed, 537 insertions(+), 123 deletions(-) diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 59fd2d6b4bd..088849f1886 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -185,6 +185,7 @@ #include "mongo/db/shard_role/shard_catalog/db_raii.h" #include "mongo/db/shard_role/shard_catalog/shard_catalog_history_cleanup.h" #include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h" +#include "mongo/db/shard_role/transaction_resources.h" #include "mongo/db/sharding_environment/config_server_op_observer.h" #include "mongo/db/sharding_environment/grid.h" #include "mongo/db/sharding_environment/shard_server_op_observer.h" @@ -1783,12 +1784,27 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { "Enqueuing the ReplicationStateTransitionLock for shutdown"); boost::optional rstg; if (gFeatureFlagIntentRegistration.isEnabled()) { - rstg.emplace(rss::consensus::IntentRegistry::get(serviceContext) - .killConflictingOperations( - rss::consensus::IntentRegistry::InterruptionType::Shutdown, - opCtx, - 0 /* no timeout */) - .get()); + rstg.emplace( + rss::consensus::IntentRegistry::get(serviceContext) + .killConflictingOperations( + rss::consensus::IntentRegistry::InterruptionType::Shutdown, + opCtx, + [svcCtx = serviceContext] { + // Kill unprepared transactions to release intents whose lifetimes were + // extended by the WUOW but do not belong to an active opCtx. + auto client = svcCtx->getService()->makeClient( + "KillSessionsForShutdown", Client::noSession()); + AlternativeClientRegion acr(client); + auto killOpCtx = cc().makeOperationContext(); + shard_role_details::getRecoveryUnit(killOpCtx.get()) + ->setNoEvictionAfterCommitOrRollback(); + SessionKiller::Matcher matcher(KillAllSessionsByPatternSet{ + makeKillAllSessionsByPattern(killOpCtx.get())}); + killSessionsAbortUnpreparedTransactions( + killOpCtx.get(), matcher, ErrorCodes::InterruptedAtShutdown); + }, + boost::optional{0} /* no timeout */) + .get()); } repl::ReplicationStateTransitionLockGuard rstl( opCtx, MODE_X, repl::ReplicationStateTransitionLockGuard::EnqueueOnly()); diff --git a/src/mongo/db/repl/BUILD.bazel b/src/mongo/db/repl/BUILD.bazel index 8fe59a092c6..e6a52a6ae51 100644 --- a/src/mongo/db/repl/BUILD.bazel +++ b/src/mongo/db/repl/BUILD.bazel @@ -1771,6 +1771,7 @@ mongo_cc_unit_test( tags = ["mongo_unittest_third_group"], deps = [ ":intent_registry_test_fixture", + "//src/mongo/idl:server_parameter_test_util", ], ) @@ -2374,10 +2375,12 @@ mongo_cc_library( "local_oplog_info.cpp", ], deps = [ + ":intent_registry", ":oplog_visibility_manager", ":optime", ":repl_coordinator_interface", "//src/mongo/db:server_base", + "//src/mongo/db:server_feature_flags", "//src/mongo/db/admission:flow_control", "//src/mongo/db/rss:replicated_storage_service", "//src/mongo/db/storage:oplog_truncate_markers", diff --git a/src/mongo/db/repl/intent_guard.cpp b/src/mongo/db/repl/intent_guard.cpp index 4cc733ffbad..881cb1d53d8 100644 --- a/src/mongo/db/repl/intent_guard.cpp +++ b/src/mongo/db/repl/intent_guard.cpp @@ -32,11 +32,13 @@ namespace mongo::rss::consensus { IntentGuard::IntentGuard(IntentRegistry::Intent intent, OperationContext* opctx) : _opCtx(opctx), - _token(IntentRegistry::get(_opCtx->getServiceContext()).registerIntent(intent, _opCtx)) {} + _svcCtx(_opCtx->getClient()->getServiceContext()), + _token(IntentRegistry::get(_svcCtx).registerIntent(intent, _opCtx)) {} void IntentGuard::reset() { - if (_opCtx) { - IntentRegistry::get(_opCtx->getServiceContext()).deregisterIntent(_token); + if (_svcCtx) { + IntentRegistry::get(_svcCtx).deregisterIntent(_token); + _svcCtx = nullptr; _opCtx = nullptr; } } diff --git a/src/mongo/db/repl/intent_guard.h b/src/mongo/db/repl/intent_guard.h index b8d4b7631f0..6be3e3eb3e8 100644 --- a/src/mongo/db/repl/intent_guard.h +++ b/src/mongo/db/repl/intent_guard.h @@ -51,11 +51,15 @@ public: IntentGuard(const IntentGuard&) = delete; IntentGuard& operator=(const IntentGuard&) = delete; - IntentGuard(IntentGuard&& o) noexcept : _opCtx(std::exchange(o._opCtx, {})), _token(o._token) {} + IntentGuard(IntentGuard&& o) noexcept + : _opCtx(std::exchange(o._opCtx, {})), + _svcCtx(std::exchange(o._svcCtx, {})), + _token(o._token) {} IntentGuard& operator=(IntentGuard&& o) noexcept { if (this != &o) { _opCtx = std::exchange(o._opCtx, {}); + _svcCtx = std::exchange(o._svcCtx, {}); _token = o._token; } return *this; @@ -84,6 +88,9 @@ public: private: OperationContext* _opCtx; + // Stored separately so reset() can find the registry even when _opCtx is stale (e.g., + // when this guard is deferred to WUOW end and the original opCtx has been destroyed). + ServiceContext* _svcCtx = nullptr; IntentRegistry::IntentToken _token; }; diff --git a/src/mongo/db/repl/intent_registry.cpp b/src/mongo/db/repl/intent_registry.cpp index 54910b3431c..be844718b75 100644 --- a/src/mongo/db/repl/intent_registry.cpp +++ b/src/mongo/db/repl/intent_registry.cpp @@ -30,6 +30,7 @@ #include "mongo/db/repl/intent_registry.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/session/logical_session_id_helpers.h" #include "mongo/db/shard_role/transaction_resources.h" #include "mongo/db/storage/storage_options.h" #include "mongo/logv2/log.h" @@ -44,10 +45,68 @@ namespace mongo { namespace rss { namespace consensus { + +// Decoration on Client. When a client is destroyed (e.g., connection closed) while its stashed +// WUOW still holds write-intent tokens, deregisters all tokens for that client before the client +// pointer is freed. This prevents _killOperationsByIntent from dereferencing a dangling client. +// +// Safety: deregisterTokensForClient acquires tokenMap.lock without holding ClientLock. When +// _killOperationsByIntent holds tokenMap.lock and calls ClientLock(client), this cleanup is +// blocked until the lock is released — at which point the client is still alive. Once this +// cleanup acquires the lock and removes the tokens, _killOperationsByIntent will no longer find +// entries for this client. +class ClientIntentCleanup { +public: + void init(IntentRegistry* registry, Client* client) { + if (!_registry) { + _registry = registry; + _client = client; + } + } + + ~ClientIntentCleanup() { + if (_registry) { + _registry->deregisterTokensForClient(_client); + } + } + +private: + IntentRegistry* _registry = nullptr; + Client* _client = nullptr; +}; + +// Decoration on OperationContext. When an opCtx with active write intents is destroyed (e.g., +// within a multi-document transaction where the lock release is deferred to the WUOW end), +// removes the opId-to-counter mapping from IntentRegistry so the deferred deregistration +// callback skips decrementing the counter of whatever new opCtx is at the same memory address. +class WriteIntentCleanup { +public: + void init(IntentRegistry* registry, uint64_t opId) { + if (!_registry) { + _registry = registry; + _opId = opId; + } + } + + ~WriteIntentCleanup() { + if (_registry) { + _registry->_unregisterWriteCountForOpId(_opId); + } + } + +private: + IntentRegistry* _registry = nullptr; + uint64_t _opId = 0; +}; + namespace { auto registryDecoration = ServiceContext::declareDecoration(); const auto writeIntentCountOnOpCtx = OperationContext::declareDecoration>(); +// Declared after writeIntentCountOnOpCtx so it destructs first (reverse order), ensuring the +// counter pointer is removed from the registry before the counter memory is freed. +const auto writeIntentCleanup = OperationContext::declareDecoration(); +const auto clientIntentCleanup = Client::declareDecoration(); } // namespace @@ -138,18 +197,39 @@ IntentRegistry::IntentToken IntentRegistry::registerIntent(IntentRegistry::Inten } IntentToken token(intent); - LOGV2_DEBUG(9945004, - 3, - "Register Intent", - "token"_attr = token.id(), - "intent"_attr = intentToString(intent), - "opCtx"_attr = opCtx->getOpID()); if (intent == Intent::Write || intent == Intent::BlockingWrite) { - writeIntentCountOnOpCtx(opCtx).fetchAndAdd(1); + auto newCount = writeIntentCountOnOpCtx(opCtx).addAndFetch(1); + LOGV2_DEBUG(12436500, + 3, + "Register Intent", + "token"_attr = token.id(), + "intent"_attr = intentToString(intent), + "opCtx"_attr = opCtx->getOpID(), + "writeIntentCount"_attr = newCount); + // Register the opId -> counter mapping so deregisterIntent can correctly + // skip the decrement if this opCtx is destroyed before the WUOW ends. + { + std::lock_guard opIdLock(_opIdMutex); + _opIdToWriteCountPtr.try_emplace(opCtx->getOpID(), &writeIntentCountOnOpCtx(opCtx)); + } + writeIntentCleanup(opCtx).init(this, opCtx->getOpID()); + clientIntentCleanup(opCtx->getClient()).init(this, opCtx->getClient()); + } else { + LOGV2_DEBUG(12436501, + 3, + "Register Intent", + "token"_attr = token.id(), + "intent"_attr = intentToString(intent), + "opCtx"_attr = opCtx->getOpID()); } { std::unique_lock lockTokenMap(tokenMap.lock); - tokenMap.map.insert({token.id(), opCtx}); + tokenMap.map.insert({token.id(), + {opCtx, + opCtx->getClient(), + opCtx->getServiceContext(), + opCtx->getOpID(), + opCtx->getLogicalSessionId()}}); } return token; @@ -159,7 +239,36 @@ void IntentRegistry::deregisterIntent(IntentRegistry::IntentToken token) { std::lock_guard lock(tokenMap.lock); if (token.intent() == Intent::Write || token.intent() == Intent::BlockingWrite) { - writeIntentCountOnOpCtx(tokenMap.map[token.id()]).fetchAndSubtract(1); + auto it = tokenMap.map.find(token.id()); + if (it != tokenMap.map.end()) { + const uint64_t opId = it->second.opId; + // Use the opId (captured at registration time) to find the counter. If the opCtx was + // destroyed before this callback fired (e.g., deferred lock release in a multi-document + // transaction), WriteIntentCleanup will have already removed the entry, and we skip + // the decrement to avoid corrupting a new opCtx allocated at the same address. + std::lock_guard opIdLock(_opIdMutex); + auto countIt = _opIdToWriteCountPtr.find(opId); + if (countIt != _opIdToWriteCountPtr.end()) { + auto newCount = countIt->second->subtractAndFetch(1); + LOGV2_DEBUG(12436502, + 3, + "Deregister Intent", + "token"_attr = token.id(), + "intent"_attr = intentToString(token.intent()), + "opId"_attr = opId, + "writeIntentCount"_attr = newCount); + if (newCount == 0) { + _opIdToWriteCountPtr.erase(countIt); + } + } else { + LOGV2_DEBUG(12436503, + 3, + "Deregister Intent skipped (opCtx already destroyed)", + "token"_attr = token.id(), + "intent"_attr = intentToString(token.intent()), + "opId"_attr = opId); + } + } } (void)tokenMap.map.erase(token.id()); if (tokenMap.map.empty()) { @@ -167,6 +276,61 @@ void IntentRegistry::deregisterIntent(IntentRegistry::IntentToken token) { } } +void IntentRegistry::_unregisterWriteCountForOpId(uint64_t opId) { + std::lock_guard opIdLock(_opIdMutex); + _opIdToWriteCountPtr.erase(opId); +} + +void IntentRegistry::deregisterTokensForClient(Client* client) { + for (size_t i = 0; i < _tokenMaps.size(); i++) { + auto intent = static_cast(i); + auto& tokenMap = _tokenMaps[i]; + + std::vector tokensToDeregister; + { + std::lock_guard lock(tokenMap.lock); + for (auto& [id, entry] : tokenMap.map) { + if (entry.client == client) { + tokensToDeregister.push_back(IntentToken(intent, id)); + } + } + } + + for (auto& token : tokensToDeregister) { + deregisterIntent(token); + } + } +} + +void IntentRegistry::deregisterTokensForSession(OperationContext* opCtx, + const LogicalSessionId& lsid) { + // Match on either the child lsid directly or its parent lsid for internal transactions. + auto parentLsid = getParentSessionId(lsid); + uint64_t currentOpId = opCtx->getOpID(); + + for (size_t i = 0; i < _tokenMaps.size(); i++) { + auto intent = static_cast(i); + auto& tokenMap = _tokenMaps[i]; + + std::vector tokensToDeregister; + { + std::lock_guard lock(tokenMap.lock); + for (auto& [id, entry] : tokenMap.map) { + bool lsidMatch = entry.lsid && + (*entry.lsid == lsid || (parentLsid && *entry.lsid == *parentLsid)); + bool opIdMatch = entry.opId == currentOpId; + if (lsidMatch || opIdMatch) { + tokensToDeregister.push_back(IntentToken(intent, id)); + } + } + } + + for (auto& token : tokensToDeregister) { + deregisterIntent(token); + } + } +} + bool IntentRegistry::canDeclareIntent(Intent intent, OperationContext* opCtx) { invariant(intent < Intent::_NumDistinctIntents_); invariant(opCtx); @@ -196,6 +360,7 @@ bool IntentRegistry::canDeclareIntent(Intent intent, OperationContext* opCtx) { std::future IntentRegistry::killConflictingOperations( IntentRegistry::InterruptionType interrupt, OperationContext* opCtx, + std::function postInterruptionCallback, boost::optional timeout_sec) { LOGV2(9945003, "Intent Registry killConflictingOperations", "interrupt"_attr = interrupt); _pendingStateChange.fetchAndAdd(1); @@ -203,7 +368,8 @@ std::future IntentRegistry::killConflictingOper timeout_sec ? *timeout_sec : repl::fassertOnLockTimeoutForStepUpDown.load()); _waitForDrain(Intent::BlockingWrite, - std::chrono::duration_cast(timeOutSec)); + std::chrono::duration_cast(timeOutSec), + interrupt); { std::unique_lock lock(_stateMutex); if (_interruptionCtx) { @@ -216,60 +382,73 @@ std::future IntentRegistry::killConflictingOper } // NOLINTNEXTLINE - return std::async(std::launch::async, [&, interrupt, timeOutSec] { - const std::vector* intents = nullptr; - switch (interrupt) { - case InterruptionType::Rollback: { - static const std::vector rollbackIntents = {Intent::Write, Intent::Read}; - intents = &rollbackIntents; - } break; - case InterruptionType::Shutdown: { - static const std::vector shutdownIntents = { - Intent::Write, Intent::Read, Intent::LocalWrite}; - intents = &shutdownIntents; - } break; - case InterruptionType::StepDown: { - static const std::vector stepdownIntents = {Intent::Write}; - intents = &stepdownIntents; - } break; - case InterruptionType::StepUp: - break; - default: - break; - } - - if (intents) { - for (auto intent : *intents) { - _killOperationsByIntent(intent, interrupt); + return std::async( + std::launch::async, [&, interrupt, timeOutSec, cb = std::move(postInterruptionCallback)] { + const std::vector* intents = nullptr; + switch (interrupt) { + case InterruptionType::Rollback: { + static const std::vector rollbackIntents = {Intent::Write, + Intent::Read}; + intents = &rollbackIntents; + } break; + case InterruptionType::Shutdown: { + static const std::vector shutdownIntents = { + Intent::Write, Intent::Read, Intent::LocalWrite}; + intents = &shutdownIntents; + } break; + case InterruptionType::StepDown: { + static const std::vector stepdownIntents = {Intent::Write}; + intents = &stepdownIntents; + } break; + case InterruptionType::StepUp: + break; + default: + break; } - Timer timer; - auto timeout = std::chrono::duration_cast(timeOutSec); - for (auto intent : *intents) { - _waitForDrain(intent, timeout); - // Negative duration to cv::wait_for can cause undefined behavior - // Since timeout == 0 is a special case to enable untimed wait we prevent a - // non-zero timeout to ever drop to 0 by setting it to at least to 1ms - if (timeout.count()) { - timeout -= std::min( - std::chrono::milliseconds(durationCount(timer.elapsed())), - timeout - 1ms); + + if (intents) { + for (auto intent : *intents) { + _killOperationsByIntent(intent, interrupt); + } + + if (cb) { + try { + cb(); + } catch (const DBException& e) { + LOGV2_WARNING(12436505, + "postInterruptionCallback threw during intent drain", + "error"_attr = e.toStatus()); + } + } + + Timer timer; + auto timeout = std::chrono::duration_cast(timeOutSec); + for (auto intent : *intents) { + _waitForDrain(intent, timeout, interrupt); + // Negative duration to cv::wait_for can cause undefined behavior + // Since timeout == 0 is a special case to enable untimed wait we prevent a + // non-zero timeout to ever drop to 0 by setting it to at least to 1ms + if (timeout.count()) { + timeout -= std::min( + std::chrono::milliseconds(durationCount(timer.elapsed())), + timeout - 1ms); + } } } - } - updateAndLogStateTransitionMetrics(interrupt, _totalOpsKilled); - _totalOpsKilled = 0; + updateAndLogStateTransitionMetrics(interrupt, _totalOpsKilled); + _totalOpsKilled = 0; - return ReplicationStateTransitionGuard([&]() { - std::lock_guard lock(_stateMutex); - _interruptionCtx = nullptr; - _lastInterruption = InterruptionType::None; - _activeInterruptionCV.notify_one(); - if (_pendingStateChange.subtractAndFetch(1) == 0) { - _pendingStateChangeCV.notify_all(); - } + return ReplicationStateTransitionGuard([&]() { + std::lock_guard lock(_stateMutex); + _interruptionCtx = nullptr; + _lastInterruption = InterruptionType::None; + _activeInterruptionCV.notify_one(); + if (_pendingStateChange.subtractAndFetch(1) == 0) { + _pendingStateChangeCV.notify_all(); + } + }); }); - }); } void IntentRegistry::updateAndLogStateTransitionMetrics(IntentRegistry::InterruptionType interrupt, @@ -323,9 +502,10 @@ void IntentRegistry::_killOperationsByIntent(IntentRegistry::Intent intent, InterruptionType interruption) { auto& tokenMap = _tokenMaps[(size_t)intent]; std::lock_guard lock(tokenMap.lock); - for (auto& [token, toKill] : tokenMap.map) { - auto serviceCtx = toKill->getServiceContext(); - auto client = toKill->getClient(); + for (auto& [token, entry] : tokenMap.map) { + auto* client = entry.client; + auto* svcCtx = entry.svcCtx; + if (interruption == InterruptionType::StepDown && !client->canKillOperationInStepdown()) { LOGV2(10336502, "Skipping killing intent for stepdown due to unkillable client", @@ -333,55 +513,119 @@ void IntentRegistry::_killOperationsByIntent(IntentRegistry::Intent intent, "registered_token"_attr = token); continue; } + + ClientLock clientLock(client); + + auto* currentOpCtx = client->getOperationContext(); + if (!currentOpCtx) { + // No active opCtx — token belongs to a stashed WUOW or a normal WUOW whose opCtx + // was temporarily released. Skip it; the postInterruptionCallback will abort stashed + // transactions, causing their WUOWs to deregister via the normal path. + LOGV2(12436506, + "Skipping intent token: client has no active opCtx", + "name"_attr = client->desc(), + "registered_token"_attr = token, + "registered_opId"_attr = entry.opId); + continue; + } + + // If the opId differs, the original opCtx was destroyed while the WUOW was stashed in + // the session (e.g., a multi-document transaction between statements). The current opCtx + // is an unrelated new operation on the same client — killing it would cause collateral + // damage without releasing the stashed WUOW. Skip it; the postInterruptionCallback will + // abort the stashed transaction via killSessionsAbortUnpreparedTransactions. + if (currentOpCtx->getOpID() != entry.opId) { + LOGV2(12436507, + "Skipping token: current opCtx differs from registrant (stashed WUOW)", + "name"_attr = client->desc(), + "registered_token"_attr = token, + "registered_opId"_attr = entry.opId, + "current_opId"_attr = currentOpCtx->getOpID()); + continue; + } + // Do not kill opCtx's that are inside an UninterruptibleLockGuard. - if (toKill->uninterruptibleLocksRequested_DO_NOT_USE()) { // NOLINT + if (currentOpCtx->uninterruptibleLocksRequested_DO_NOT_USE()) { // NOLINT LOGV2(10336500, "Skipping killing intent due to UninterruptibleLockGuard", "name"_attr = client->desc(), "registered_token"_attr = token); continue; } - ClientLock lock(client); + + if (currentOpCtx->getKillStatus() != ErrorCodes::OK) { + continue; + } + if (interruption == InterruptionType::Shutdown) { - serviceCtx->killOperation(lock, toKill, ErrorCodes::InterruptedAtShutdown); + svcCtx->killOperation(clientLock, currentOpCtx, ErrorCodes::InterruptedAtShutdown); } else { - serviceCtx->killOperation(lock, toKill, ErrorCodes::InterruptedDueToReplStateChange); + svcCtx->killOperation( + clientLock, currentOpCtx, ErrorCodes::InterruptedDueToReplStateChange); } _totalOpsKilled += 1; LOGV2(9795400, "Repl state change interrupted a thread.", "name"_attr = client->desc(), "registered token"_attr = token, - "killcode"_attr = toKill->getKillStatus()); + "killcode"_attr = currentOpCtx->getKillStatus()); } } void IntentRegistry::_waitForDrain(IntentRegistry::Intent intent, - std::chrono::milliseconds timeout) { + std::chrono::milliseconds timeout, + InterruptionType interruption) { + static constexpr auto kRetryKillInterval = std::chrono::milliseconds(100); + auto& tokenMap = _tokenMaps[(size_t)intent]; std::unique_lock lock(tokenMap.lock); - if (timeout.count() && - !tokenMap.cv.wait_for(lock, timeout, [&tokenMap] { return tokenMap.map.empty(); })) { + + auto logAndFassert = [&]() { LOGV2( 9795403, "There are still registered intents", "Intent"_attr = intentToString(intent)); - for (auto& [token, opCtx] : tokenMap.map) { + for (auto& [token, entry] : tokenMap.map) { LOGV2(9795402, "Registered token:", "token_id"_attr = token, - "client"_attr = opCtx->getClient()->desc()); + "client"_attr = entry.client->desc()); } LOGV2_FATAL_CONTINUE(9795404, "Timeout while waiting on intent queue to drain, printing stack " "traces then calling abort() to allow the cluster to progress."); - #if defined(MONGO_STACKTRACE_CAN_DUMP_ALL_THREADS) - // Dump the stack of each thread. printAllThreadStacksBlocking(); #endif - fasserted(9795401); - } else if (!timeout.count()) { - tokenMap.cv.wait(lock, [&tokenMap] { return tokenMap.map.empty(); }); + }; + + if (timeout.count()) { + auto deadline = std::chrono::steady_clock::now() + timeout; + while (!tokenMap.map.empty()) { + auto now = std::chrono::steady_clock::now(); + if (now >= deadline) { + logAndFassert(); + return; + } + auto remaining = std::chrono::duration_cast(deadline - now); + auto waitTime = std::min(remaining, kRetryKillInterval); + tokenMap.cv.wait_for(lock, waitTime, [&tokenMap] { return tokenMap.map.empty(); }); + + if (!tokenMap.map.empty()) { + lock.unlock(); + _killOperationsByIntent(intent, interruption); + lock.lock(); + } + } + } else { + while (!tokenMap.map.empty()) { + tokenMap.cv.wait_for( + lock, kRetryKillInterval, [&tokenMap] { return tokenMap.map.empty(); }); + if (!tokenMap.map.empty()) { + lock.unlock(); + _killOperationsByIntent(intent, interruption); + lock.lock(); + } + } } } diff --git a/src/mongo/db/repl/intent_registry.h b/src/mongo/db/repl/intent_registry.h index dbb1457b484..cbfa143c6e2 100644 --- a/src/mongo/db/repl/intent_registry.h +++ b/src/mongo/db/repl/intent_registry.h @@ -35,10 +35,12 @@ #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/rwmutex.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/modules.h" #include +#include #include #include #include @@ -83,8 +85,20 @@ public: } }; +// Defined in intent_registry.cpp. Removes the per-opCtx write-intent counter from IntentRegistry +// when an OperationContext is destroyed so deferred deregistrations cannot corrupt a new opCtx +// that has been allocated at the same memory address. +class WriteIntentCleanup; + +// Defined in intent_registry.cpp. Deregisters all tokens for a Client when that Client is +// destroyed while a stashed WUOW still holds write-intent tokens, preventing +// _killOperationsByIntent from dereferencing the freed client pointer. +class ClientIntentCleanup; + class IntentRegistry { friend class IntentRegistryTest; + friend class WriteIntentCleanup; + friend class ClientIntentCleanup; public: enum class Intent { @@ -116,6 +130,7 @@ public: Intent intent() const; private: + IntentToken(Intent intent, idType id) : _intent(intent), _id(id) {} static inline AtomicWord _currentTokenId = {}; Intent _intent; idType _id; @@ -179,10 +194,14 @@ public: * that conflict with the ongoing state transtion from registering their * intent, except those that originate from the same OperationContext to allow transition * threads to perform necessary work. + * + * postInterruptionCallback, if provided, is invoked on the async drain thread after active + * conflicting operations are killed, before waiting for the drain to complete. */ std::future killConflictingOperations( InterruptionType interruption, OperationContext* opCtx, + std::function postInterruptionCallback = nullptr, boost::optional timeout_sec = boost::none); /** @@ -212,16 +231,39 @@ public: std::vector getTotalIntentsDeclared() const; + /** + * Deregisters all intent tokens registered by the given client. + */ + void deregisterTokensForClient(Client* client); + + // Deregisters all intent tokens associated with the given session. Called from + // prepareTransaction so prepared transactions don't block the stepdown/shutdown drain. + void deregisterTokensForSession(OperationContext* opCtx, const LogicalSessionId& lsid); + private: + struct TokenMapEntry { + OperationContext* opCtx; + Client* client; + ServiceContext* svcCtx; + uint64_t opId; + boost::optional lsid; + }; + struct tokenMap { mutable std::mutex lock; stdx::condition_variable cv; - absl::flat_hash_map map; + absl::flat_hash_map map; }; bool _validIntent(Intent intent) const; void _killOperationsByIntent(Intent intent, InterruptionType interruption); - void _waitForDrain(Intent intent, std::chrono::milliseconds timeout); + void _waitForDrain(Intent intent, + std::chrono::milliseconds timeout, + InterruptionType interruption); + + // Called by WriteIntentCleanup when an opCtx with active write intents is destroyed. + // Removes the mapping so deferred deregistration callbacks skip the decrement. + void _unregisterWriteCountForOpId(uint64_t opId); bool _enabled = true; RWMutex _stateMutex; @@ -232,6 +274,11 @@ private: Atomic _pendingStateChange = 0; stdx::condition_variable _pendingStateChangeCV; + // Maps opCtx opId -> write intent counter pointer. Removed when the opCtx is destroyed + // (via WriteIntentCleanup) so deferred callbacks don't corrupt a new opCtx's counter. + mutable std::mutex _opIdMutex; + absl::flat_hash_map*> _opIdToWriteCountPtr; + // Tracks number of operations killed on state transition. size_t _totalOpsKilled = 0; }; diff --git a/src/mongo/db/repl/intent_registry_test.cpp b/src/mongo/db/repl/intent_registry_test.cpp index d098f34c91d..11c99a5db87 100644 --- a/src/mongo/db/repl/intent_registry_test.cpp +++ b/src/mongo/db/repl/intent_registry_test.cpp @@ -29,6 +29,10 @@ #include "mongo/db/repl/intent_guard.h" #include "mongo/db/repl/intent_registry_test_fixture.h" +#include "mongo/db/server_feature_flags_gen.h" +#include "mongo/db/shard_role/lock_manager/d_concurrency.h" +#include "mongo/db/shard_role/transaction_resources.h" +#include "mongo/idl/server_parameter_test_controller.h" #include "mongo/stdx/thread.h" #include @@ -171,7 +175,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsStepUp) { // killConflictingOperations with a StepUp interruption should reject any attempts to register a // Write Intent while the interruption is ongoing. auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::StepUp, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::StepUp, opCtx.get(), nullptr, timeout_sec); std::this_thread::sleep_for(std::chrono::milliseconds(kPostInterruptSleepMs)); kill.get(); @@ -218,7 +222,7 @@ DEATH_TEST_F(IntentRegistryTestDeathTest, KillConflictingOperationsDrainTimeout, // killConflictingOperations will timeout if there is an existing kill and the intents are not // deregistered within the drain timeout. auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); kill.get(); } @@ -257,7 +261,7 @@ DEATH_TEST_F(IntentRegistryTestDeathTest, auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); // total deregister time 2.1s > 2s std::this_thread::sleep_for(5s); @@ -299,12 +303,12 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsReleaseGuard) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::StepUp, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::StepUp, opCtx.get(), nullptr, timeout_sec); auto int_guard = kill.get(); int_guard.release(); kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); guards.clear(); kill.get(); } @@ -336,7 +340,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsBackToBack) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto killsd = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::StepDown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::StepDown, opCtx.get(), nullptr, timeout_sec); // Killing all writes to let stepdown kill finish in separate thread stdx::thread killwrites = stdx::thread([&] { for (auto& guard : guards) { @@ -350,7 +354,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsBackToBack) { }); // Another call for kill conflicting ops, will block till above thread finishes; auto killsh = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); guards.clear(); (void)killsh.get(); killwrites.join(); @@ -384,14 +388,14 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsDestroyGuard) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::StepUp, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::StepUp, opCtx.get(), nullptr, timeout_sec); { // Get a guard and immediately destroy to enable additional interrupt auto int_guard = kill.get(); } kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); guards.clear(); kill.get(); } @@ -423,7 +427,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsShutdown) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); std::this_thread::sleep_for(std::chrono::milliseconds(kPostInterruptSleepMs)); @@ -504,7 +508,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsSameOpCtxCanDeclareIntents) auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Shutdown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Shutdown, opCtx.get(), nullptr, timeout_sec); std::this_thread::sleep_for(std::chrono::milliseconds(kPostInterruptSleepMs)); @@ -572,7 +576,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsRollback) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::Rollback, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::Rollback, opCtx.get(), nullptr, timeout_sec); std::this_thread::sleep_for(std::chrono::milliseconds(kPostInterruptSleepMs)); // Any attempt to register an intent during a Rollback interruption should throw an @@ -657,7 +661,7 @@ TEST_F(IntentRegistryTest, KillConflictingOperationsStepDown) { auto client = serviceContext->getService()->makeClient(std::to_string(client_i++)); auto opCtx = client->makeOperationContext(); auto kill = _intentRegistry.killConflictingOperations( - IntentRegistry::InterruptionType::StepDown, opCtx.get(), timeout_sec); + IntentRegistry::InterruptionType::StepDown, opCtx.get(), nullptr, timeout_sec); std::this_thread::sleep_for(std::chrono::milliseconds(kPostInterruptSleepMs)); { auto clientWritePrepared = @@ -748,5 +752,41 @@ TEST_F(IntentRegistryTest, IntegrityRegistryEnableDisable) { executePerIntent(createGuardDuringDisable); } +// Verifies that a Write intent acquired through GlobalLock(MODE_IX) stays live until the +// enclosing WriteUnitOfWork ends, not merely until the GlobalLock object is destroyed. +TEST_F(IntentRegistryTest, WriteIntentLifetimeExtendedThroughWriteUnitOfWork) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagIntentRegistration", true}; + _intentRegistry.enable(); + + auto client = getServiceContext()->getService()->makeClient("test-wuow-intent"); + auto opCtx = client->makeOperationContext(); + + // No write intent at the start. + ASSERT_FALSE(_intentRegistry.hasWriteIntentDeclared(opCtx.get())); + + { + WriteUnitOfWork wuow(opCtx.get()); + + { + // Acquiring a GlobalLock in MODE_IX registers a Write intent. + Lock::GlobalLock globalLock( + opCtx.get(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + ASSERT_TRUE(globalLock.isLocked()); + ASSERT_TRUE(_intentRegistry.hasWriteIntentDeclared(opCtx.get())); + + // GlobalLock destructs here. The underlying lock is deferred by the WUOW two-phase + // locking mechanism, so the Write intent must be deferred too. + } + + // The GlobalLock object is gone, but the Write intent should still be registered because + // the WUOW has not ended yet. + ASSERT_TRUE(_intentRegistry.hasWriteIntentDeclared(opCtx.get())); + + wuow.commit(); + } + + // Once the WUOW ends, both the lock and the intent are released. + ASSERT_FALSE(_intentRegistry.hasWriteIntentDeclared(opCtx.get())); +} } // namespace mongo diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp index ea231fdf0ae..bc5fe78d503 100644 --- a/src/mongo/db/repl/local_oplog_info.cpp +++ b/src/mongo/db/repl/local_oplog_info.cpp @@ -35,10 +35,13 @@ // IWYU pragma: no_include "ext/alloc_traits.h" #include "mongo/db/admission/flow_control.h" #include "mongo/db/logical_time.h" +#include "mongo/db/repl/always_allow_non_local_writes.h" +#include "mongo/db/repl/intent_registry.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/rss/replicated_storage_service.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/shard_role/transaction_resources.h" #include "mongo/db/storage/oplog_truncate_marker_parameters_gen.h" #include "mongo/db/storage/record_store.h" @@ -53,6 +56,8 @@ #include #include +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + namespace mongo { namespace { @@ -143,6 +148,15 @@ void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& n std::vector LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, std::size_t count, std::size_t opTimeOffset) { + if (gFeatureFlagIntentRegistration.isEnabled() && + !rss::consensus::IntentRegistry::get(opCtx->getServiceContext()) + .hasWriteIntentDeclared(opCtx) && + !repl::alwaysAllowNonLocalWrites(opCtx)) { + LOGV2_FATAL(12436504, + "Attempted to reserve optime without a declared write intent", + "opCtx"_attr = opCtx->getOpID()); + } + auto replCoord = repl::ReplicationCoordinator::get(opCtx); long long term = repl::OpTime::kUninitializedTerm; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 7e3bea59d68..cfcf9c4cb10 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -73,7 +73,6 @@ #include "mongo/db/repl/create_oplog_entry_gen.h" #include "mongo/db/repl/dbcheck/dbcheck.h" #include "mongo/db/repl/image_collection_entry_gen.h" -#include "mongo/db/repl/intent_guard.h" #include "mongo/db/repl/local_oplog_info.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/optime.h" @@ -148,7 +147,6 @@ #include "mongo/util/file.h" #include "mongo/util/processinfo.h" #include "mongo/util/serialization_context.h" -#include "mongo/util/stacktrace.h" #include "mongo/util/str.h" #include "mongo/util/string_map.h" #include "mongo/util/version/releases.h" @@ -578,23 +576,6 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { WriteUnitOfWork wuow(opCtx); if (slot.isNull()) { - // Declaring Write intent ensures we are the primary node and this operation will be - // interrupted by StepDown. Only a primary node should be able to allocate optimes for new - // entries in the oplog. - boost::optional writeGuard; - if (gFeatureFlagIntentRegistration.isEnabled() && - !rss::consensus::IntentRegistry::get(opCtx->getServiceContext()) - .hasWriteIntentDeclared(opCtx) && - !replCoord->isOplogDisabledFor(opCtx, oplogEntry->getNss()) && - !alwaysAllowNonLocalWrites(opCtx)) { - printStackTrace(); - LOGV2_ERROR(11006000, - "Could not acquire write intent when trying to reserve optime", - "opCtx"_attr = opCtx->getOpID(), - "nss"_attr = oplogEntry->getNss().toStringForErrorMsg(), - "opType"_attr = idl::serialize(oplogEntry->getOpType())); - } - slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; // It would be better to make the oplogEntry a const reference. But because in some cases, a // new OpTime needs to be assigned within the WUOW as explained earlier, we instead pass diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 99c20afd648..293b11b1b99 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3902,7 +3902,21 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx, rss::consensus::ReplicationStateTransitionGuard ReplicationCoordinatorImpl::_killConflictingOperations( rss::consensus::IntentRegistry::InterruptionType interrupt, OperationContext* opCtx) { - return _intentRegistry.killConflictingOperations(interrupt, opCtx).get(); + auto svcCtx = opCtx->getServiceContext(); + auto callback = [svcCtx] { + // Kill unprepared transactions to release intents whose lifetimes were extended by the WUOW + // but do not belong to an active opCtx. + auto client = + svcCtx->getService()->makeClient("KillSessionsForStepDown", Client::noSession()); + AlternativeClientRegion acr(client); + auto killOpCtx = cc().makeOperationContext(); + shard_role_details::getRecoveryUnit(killOpCtx.get())->setNoEvictionAfterCommitOrRollback(); + SessionKiller::Matcher matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(killOpCtx.get())}); + killSessionsAbortUnpreparedTransactions( + killOpCtx.get(), matcher, ErrorCodes::InterruptedDueToReplStateChange); + }; + return _intentRegistry.killConflictingOperations(interrupt, opCtx, std::move(callback)).get(); } void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_impl_step_up_step_down.cpp b/src/mongo/db/repl/replication_coordinator_impl_step_up_step_down.cpp index b318b96ecd5..42b16124a54 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_step_up_step_down.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_step_up_step_down.cpp @@ -36,7 +36,6 @@ #include "mongo/db/repl/replication_metrics.h" #include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/session/kill_sessions_local.h" -#include "mongo/db/session/session_killer.h" #include "mongo/db/shard_role/lock_manager/dump_lock_manager.h" #include "mongo/db/storage/execution_context.h" #include "mongo/db/storage/prepare_conflict_tracker.h" diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 09a00ad1157..630b43aff61 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -398,7 +398,24 @@ Status RollbackImpl::_transitionToRollback(OperationContext* opCtx) { .killConflictingOperations( rss::consensus::IntentRegistry::InterruptionType::Rollback, opCtx, - 0 /* no timeout */) + [svcCtx = opCtx->getServiceContext()] { + // Kill unprepared transactions to release intents whose + // lifetimes were extended by the WUOW but do not belong to + // an active opCtx. + auto client = svcCtx->getService()->makeClient( + "KillSessionsForRollback", Client::noSession()); + AlternativeClientRegion acr(client); + auto killOpCtx = cc().makeOperationContext(); + shard_role_details::getRecoveryUnit(killOpCtx.get()) + ->setNoEvictionAfterCommitOrRollback(); + SessionKiller::Matcher matcher(KillAllSessionsByPatternSet{ + makeKillAllSessionsByPattern(killOpCtx.get())}); + killSessionsAbortUnpreparedTransactions( + killOpCtx.get(), + matcher, + ErrorCodes::InterruptedDueToReplStateChange); + }, + boost::optional{0} /* no timeout */) .get()); } rstlLock.emplace(opCtx, MODE_X, ReplicationStateTransitionLockGuard::EnqueueOnly()); diff --git a/src/mongo/db/shard_role/lock_manager/d_concurrency.cpp b/src/mongo/db/shard_role/lock_manager/d_concurrency.cpp index d3bab4a4692..1bdd5fa51c5 100644 --- a/src/mongo/db/shard_role/lock_manager/d_concurrency.cpp +++ b/src/mongo/db/shard_role/lock_manager/d_concurrency.cpp @@ -222,10 +222,29 @@ Lock::GlobalLock::~GlobalLock() { // prevent lock release. const bool willReleaseLock = !locker->isGlobalLockedRecursively() && !locker->inAWriteUnitOfWork(); + + // The lock will be two-phase deferred when we are in a WUOW and this is not a recursive + // unlock. Computed before _unlock() modifies lock state. + const bool lockWillBeDeferred = + !locker->isGlobalLockedRecursively() && locker->inAWriteUnitOfWork(); + if (willReleaseLock) { shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); } _unlock(); + + // Mirror the lock deferral for the intent: keep the IntentGuard alive until the WUOW + // ends so the intent lifetime matches the global lock lifetime. The guard is wrapped in a + // shared_ptr because the Change lambdas must be copyable. + if (lockWillBeDeferred && _guard) { + auto sharedGuard = std::make_shared(std::move(*_guard)); + _guard = boost::none; + shard_role_details::getRecoveryUnit(_opCtx)->registerChange( + [sharedGuard](OperationContext*, boost::optional) noexcept { + sharedGuard->reset(); + }, + [sharedGuard](OperationContext*) noexcept { sharedGuard->reset(); }); + } } if (!_skipRSTLLock && (lockResult == LOCK_OK || lockResult == LOCK_WAITING)) { diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index b6c48bbc175..41dd50ad29f 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -2088,6 +2088,9 @@ TransactionParticipant::Participant::prepareTransaction( // RSTL. const bool unlocked = shard_role_details::getLocker(opCtx)->unlockRSTLforPrepare(); invariant(unlocked || gFeatureFlagIntentRegistration.isEnabled()); + if (gFeatureFlagIntentRegistration.isEnabled()) { + rss::consensus::IntentRegistry::get(opCtx).deregisterTokensForSession(opCtx, _sessionId()); + } return {prepareOplogSlot.getTimestamp(), o().affectedNamespaces}; } @@ -2145,6 +2148,9 @@ void TransactionParticipant::Participant::restorePreparedTxnFromPreciseCheckpoin const bool unlocked = shard_role_details::getLocker(opCtx)->unlockRSTLforPrepare(); invariant(unlocked || gFeatureFlagIntentRegistration.isEnabled()); + if (gFeatureFlagIntentRegistration.isEnabled()) { + rss::consensus::IntentRegistry::get(opCtx).deregisterTokensForSession(opCtx, _sessionId()); + } } void TransactionParticipant::Participant::setPrepareOpTimeForRecovery(OperationContext* opCtx, @@ -2222,6 +2228,11 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC "commitTransaction must provide commitTimestamp to prepared transaction.", !o().txnState.isPrepared()); + boost::optional txnGuard; + if (gFeatureFlagIntentRegistration.isEnabled()) { + txnGuard.emplace(rss::consensus::IntentRegistry::Intent::Write, opCtx); + } + auto* txnOps = retrieveCompletedTransactionOperations(opCtx); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver);