From 9c69faba5cbb52201b2bd014b561c4f3f3f02f22 Mon Sep 17 00:00:00 2001 From: Myles <1308306+my1es@users.noreply.github.com> Date: Tue, 6 May 2025 17:30:12 -0500 Subject: [PATCH] SERVER-103841: transaction coordinator lifetime (#35591) GitOrigin-RevId: dd34f52ddbb0df6d213462c96573f1094345da87 --- src/mongo/db/s/transaction_coordinator.cpp | 20 +++- src/mongo/db/s/transaction_coordinator.h | 14 ++- .../transaction_coordinator_catalog_test.cpp | 84 ++++--------- .../db/s/transaction_coordinator_service.cpp | 51 +++++--- .../db/s/transaction_coordinator_service.h | 16 ++- .../db/s/transaction_coordinator_test.cpp | 113 +++++------------- 6 files changed, 130 insertions(+), 168 deletions(-) diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index e0cdb544c2f..4fbf6792c59 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -55,6 +55,7 @@ #include "mongo/db/s/single_transaction_coordinator_stats.h" #include "mongo/db/s/transaction_coordinator.h" #include "mongo/db/s/transaction_coordinator_metrics_observer.h" +#include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/s/transaction_coordinator_util.h" #include "mongo/db/server_options.h" #include "mongo/db/vector_clock_mutable.h" @@ -150,8 +151,7 @@ TransactionCoordinator::TransactionCoordinator( const LogicalSessionId& lsid, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, std::unique_ptr scheduler, - Date_t deadline, - const CancellationToken& cancelToken) + Date_t deadline) : _serviceContext(operationContext->getServiceContext()), _lsid(lsid), _txnNumberAndRetryCounter(txnNumberAndRetryCounter), @@ -159,8 +159,7 @@ TransactionCoordinator::TransactionCoordinator( _sendPrepareScheduler(_scheduler->makeChildScheduler()), _transactionCoordinatorMetricsObserver( std::make_unique()), - _deadline(deadline), - _cancelToken(cancelToken) { + _deadline(deadline) { invariant(_txnNumberAndRetryCounter.getTxnRetryCounter()); } @@ -168,6 +167,11 @@ void TransactionCoordinator::start(OperationContext* operationContext) { invariant(_scheduler, "TransactionCoordinator shutdown before starting"); invariant(_sendPrepareScheduler, "TransactionCoordinator shutdown before starting"); + onCompletion().unsafeToInlineFuture().getAsync([this, self = shared_from_this()](auto) { + auto* tcs = TransactionCoordinatorService::get(_serviceContext); + tcs->notifyCoordinatorFinished(self); + }); + auto apiParams = APIParameters::get(operationContext); auto kickOffCommitPF = makePromiseFuture(); _kickOffCommitPromise = std::move(kickOffCommitPF.promise); @@ -244,7 +248,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) { std::move(opTime), _lsid, _txnNumberAndRetryCounter, - _cancelToken); + _cancellationSource.token()); }) .thenRunOn(_scheduler->getExecutor()) .then([this, self = shared_from_this(), apiParams] { @@ -360,7 +364,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) { std::move(opTime), _lsid, _txnNumberAndRetryCounter, - _cancelToken); + _cancellationSource.token()); }) .then([this, self = shared_from_this(), apiParams] { { @@ -534,6 +538,10 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { "Transaction exceeded deadline or newer transaction started"}); } +void TransactionCoordinator::cancel() { + _cancellationSource.cancel(); +} + bool TransactionCoordinator::_reserveKickOffCommitPromise() { stdx::lock_guard lg(_mutex); if (_kickOffCommitPromiseSet) diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index ed7d48bae27..a84896bb117 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -99,8 +99,7 @@ public: const LogicalSessionId& lsid, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, std::unique_ptr scheduler, - Date_t deadline, - const CancellationToken& cancelToken); + Date_t deadline); ~TransactionCoordinator(); @@ -160,6 +159,13 @@ public: */ void cancelIfCommitNotYetStarted(); + /** + * Cancels the owned cancellation token which interrupts/cancels all associated + * `WaitForMajority` invocations under this coordinator. typically invoked only by the + * TransactionCoordinatorService during stepdown. + */ + void cancel(); + TxnRetryCounter getTxnRetryCounterForTest() const { return *_txnNumberAndRetryCounter.getTxnRetryCounter(); } @@ -253,8 +259,8 @@ private: // The deadline for the TransactionCoordinator to reach a decision Date_t _deadline; - // The cancellation token for WaitForMajority. - const CancellationToken _cancelToken; + // The cancellation source for WaitForMajority. + CancellationSource _cancellationSource; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index a60e53f11b3..c1a05d815fd 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -78,15 +78,13 @@ protected: void createCoordinatorInCatalog(OperationContext* opCtx, LogicalSessionId lsid, - TxnNumberAndRetryCounter txnNumberAndRetryCounter, - CancellationToken cancelToken) { + TxnNumberAndRetryCounter txnNumberAndRetryCounter) { auto newCoordinator = std::make_shared( operationContext(), lsid, txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - cancelToken); + Date_t::max()); newCoordinator->start(operationContext()); _coordinatorCatalog->insert(opCtx, lsid, txnNumberAndRetryCounter, newCoordinator); @@ -107,9 +105,7 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter2); ASSERT(coordinatorInCatalog == nullptr); @@ -120,9 +116,7 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter2); ASSERT(coordinatorInCatalog == nullptr); @@ -131,9 +125,7 @@ TEST_F(TransactionCoordinatorCatalogTest, TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter); auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter); ASSERT(coordinatorInCatalog != nullptr); @@ -146,10 +138,8 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); @@ -164,19 +154,16 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - // Can only create a new TransactionCoordinator after the previous TransactionCoordinator with // the same txnNumber has reached abort decision. - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList); assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none); coordinator1InCatalog->getDecision().wait(); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); @@ -201,11 +188,9 @@ DEATH_TEST_F(TransactionCoordinatorCatalogTest, "Invariant failure") { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter); // Re-creating w/ same session id and txn number should cause invariant failure - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter); } TEST_F(TransactionCoordinatorCatalogTest, GetLatestOnSessionWithNoCoordinatorsReturnsNone) { @@ -220,10 +205,8 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); auto latestTxnNumberRetryCounterAndCoordinator = _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); @@ -241,19 +224,17 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; // Can only create a new TransactionCoordinator after the previous TransactionCoordinator with // the same txnNumber has reached abort decision. - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList); assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none); coordinator1InCatalog->getDecision().wait(); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); auto latestTxnNumberRetryCounterAndCoordinator = _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); @@ -275,9 +256,7 @@ TEST_F(TransactionCoordinatorCatalogTest, TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalogWhenTheyComplete) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter); auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter); coordinator->cancelIfCommitNotYetStarted(); @@ -298,11 +277,9 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); auto latestTxnNumberRetryCounterAndCoordinator = _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); @@ -321,19 +298,17 @@ TEST_F( LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; // Can only create a new TransactionCoordinator after the previous TransactionCoordinator with // the same txnNumber has reached abort decision. - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList); assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none); coordinator1InCatalog->getDecision().wait(); - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2); auto latestTxnNumberRetryCounterAndCoordinator = _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); @@ -358,10 +333,8 @@ TEST_F( LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinator1 = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); coordinator1->runCommit(operationContext(), kOneShardIdList); @@ -371,8 +344,7 @@ TEST_F( lsid, txnNumberAndRetryCounter2, std::make_unique(getServiceContext()), - Date_t::max(), - cancelToken); + Date_t::max()); coordinator2->start(operationContext()); ASSERT_THROWS_CODE(_coordinatorCatalog->insert( @@ -399,10 +371,8 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0}; TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; - createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken); + createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1); auto coordinator1 = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1); coordinator1->runCommit(operationContext(), kOneShardIdList); @@ -414,8 +384,7 @@ TEST_F(TransactionCoordinatorCatalogTest, lsid, txnNumberAndRetryCounter2, std::make_unique(getServiceContext()), - Date_t::max(), - cancelToken); + Date_t::max()); coordinator2->start(operationContext()); ASSERT_THROWS_CODE(_coordinatorCatalog->insert( @@ -438,8 +407,6 @@ TEST_F(TransactionCoordinatorCatalogTest, TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoCatalog) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0}; - CancellationSource cancelSource; - CancellationToken cancelToken{cancelSource.token()}; txn::AsyncWorkScheduler aws(getServiceContext()); TransactionCoordinatorCatalog catalog; @@ -449,8 +416,7 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC lsid, txnNumberAndRetryCounter, aws.makeChildScheduler(), - network()->now() + Seconds{5}, - cancelToken); + network()->now() + Seconds{5}); coordinator->start(operationContext()); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"}); diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 922fb9857c0..7f33ce38fe1 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -103,12 +103,12 @@ void TransactionCoordinatorService::createCoordinator( latestCoordinator->cancelIfCommitNotYetStarted(); } - auto coordinator = std::make_shared(opCtx, - lsid, - txnNumberAndRetryCounter, - scheduler.makeChildScheduler(), - commitDeadline, - _cancelSource.token()); + auto coordinator = std::make_shared( + opCtx, lsid, txnNumberAndRetryCounter, scheduler.makeChildScheduler(), commitDeadline); + { + stdx::lock_guard lock(_mutex); + _activeTransactionCoordinators.insert(coordinator); + } coordinator->start(opCtx); try { @@ -237,8 +237,7 @@ void TransactionCoordinatorService::_scheduleRecoveryTask(OperationContext* opCt _catalogAndScheduler->scheduler .scheduleWorkIn( recoveryDelay, - [catalogAndScheduler = _catalogAndScheduler, - cancelSource = _cancelSource](OperationContext* opCtx) { + [catalogAndScheduler = _catalogAndScheduler](OperationContext* opCtx) { if (MONGO_unlikely(hangBeforeTxnCoordinatorOnStepUpWork.shouldFail())) { LOGV2(8288301, "Hit hangBeforeTxnCoordinatorOnStepUpWork failpoint"); hangBeforeTxnCoordinatorOnStepUpWork.pauseWhileSet(opCtx); @@ -300,8 +299,7 @@ void TransactionCoordinatorService::_scheduleRecoveryTask(OperationContext* opCt lsid, TxnNumberAndRetryCounter{txnNumber, txnRetryCounter}, scheduler.makeChildScheduler(), - clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load()), - cancelSource.token()); + clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load())); coordinator->start(opCtx); catalog.insert(opCtx, @@ -347,7 +345,6 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx, LOGV2(9307801, "Creating the transaction catalog and scheduler for primary node."); _catalogAndScheduler = std::make_shared(opCtx->getServiceContext()); - _cancelSource = CancellationSource(); _initTerm = term; _scheduleRecoveryTask(opCtx, recoveryDelay); @@ -363,16 +360,28 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx, } void TransactionCoordinatorService::interrupt() { + std::vector> coordinatorsToCancel; + { stdx::lock_guard lg(_mutex); - if (!_catalogAndScheduler) - return; - - _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler); + if (_catalogAndScheduler) { + _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler); + } + for (auto& ptr : _activeTransactionCoordinators) { + if (auto shared = ptr.lock(); shared) { + coordinatorsToCancel.emplace_back(std::move(shared)); + } + } + _activeTransactionCoordinators.clear(); } - _cancelSource.cancel(); - _catalogAndSchedulerToCleanup->interrupt(); + if (_catalogAndSchedulerToCleanup) { + _catalogAndSchedulerToCleanup->interrupt(); + } + + for (auto& ptr : coordinatorsToCancel) { + ptr->cancel(); + } } void TransactionCoordinatorService::shutdown() { @@ -425,4 +434,12 @@ void TransactionCoordinatorService::CatalogAndScheduler::join() { catalog.join(); } +void TransactionCoordinatorService::notifyCoordinatorFinished( + const std::shared_ptr coordinator) { + std::lock_guard lock(_mutex); + // we don't need to know or care if we actually erased this weak ptr. its valid for this + // service to cancel and clear its set of coordinators and have already erased them when + // this continuation executes. all we're trying to do is bound memory usage. + _activeTransactionCoordinators.erase(coordinator); +} } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index 4e6dd0daad5..e5278e5d054 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -137,6 +137,12 @@ public: */ void shutdown(); + /** + * Notifies this service that the provided TransactionCoordinator is finished and no longer + * needs to be interrupted when the service is interrupted. + */ + void notifyCoordinatorFinished(std::shared_ptr); + protected: struct CatalogAndScheduler { CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} @@ -207,8 +213,14 @@ private: // Set to true during initialization to avoid multiple thread attempting to initialize at once. bool _isInitializing{false}; - // Used to cancel WaitForMajority for TransactionCoordinator when this service gets interrupted. - CancellationSource _cancelSource; + // tracks active transactionCoordinators to be interrupted on step-down. previously they were + // tracked implicitly through futures associated with the above CancellationSource, but that + // does not provide a way to deregister those futures when sub sources complete. + // NOTE: this must be an ordered container because std::weak_ptr only exposes an ordering + // operator, it does not expose an address or other control block information that would allow + // storage in a hash-based container. + std::set, std::owner_less<>> + _activeTransactionCoordinators; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index d0d9aaecc35..af15899d8c7 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -260,8 +260,6 @@ protected: LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumberAndRetryCounter _txnNumberAndRetryCounter{1, 1}; - CancellationSource cancelSource; - CancellationToken _cancelToken{cancelSource.token()}; }; class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase { @@ -1244,8 +1242,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1271,8 +1268,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1300,8 +1296,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1329,8 +1324,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1357,8 +1351,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1384,8 +1377,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1418,8 +1410,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1449,8 +1440,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1485,8 +1475,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator->getDecision(); @@ -1518,8 +1507,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesEndOfTransactionOplogEntry) _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kOneShardIdList); assertPrepareSentAndRespondWithSuccess(); @@ -1704,8 +1692,7 @@ protected: _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); @@ -1886,8 +1873,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) { _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2063,8 +2049,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) { _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2110,12 +2095,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique(getServiceContext()); auto awsPtr = aws.get(); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2162,12 +2143,8 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalCreated++; auto aws = std::make_unique(getServiceContext()); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2234,12 +2211,8 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique(getServiceContext()); auto awsPtr = aws.get(); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2305,12 +2278,8 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalCreated++; auto aws = std::make_unique(getServiceContext()); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2384,12 +2353,8 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique(getServiceContext()); auto awsPtr = aws.get(); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2462,12 +2427,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique(getServiceContext()); auto awsPtr = aws.get(); - auto coordinator = std::make_shared(operationContext(), - _lsid, - _txnNumberAndRetryCounter, - std::move(aws), - Date_t::max(), - _cancelToken); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); coordinator->start(operationContext()); const auto& stats = coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2547,8 +2508,7 @@ TEST_F(TransactionCoordinatorMetricsTest, _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::now() + Seconds(1), - _cancelToken); + Date_t::now() + Seconds(1)); coordinator->start(operationContext()); // Wait until the coordinator is writing the participant list. @@ -2621,8 +2581,7 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); @@ -2658,8 +2617,7 @@ TEST_F( _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); tickSource()->advance(Milliseconds(101)); @@ -2693,8 +2651,7 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) { _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); @@ -2745,8 +2702,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); coordinator->runCommit(operationContext(), kTwoShardIdList); @@ -2800,8 +2756,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); { @@ -2909,8 +2864,7 @@ TEST_F(TransactionCoordinatorMetricsTest, RecoveryFromFailureIndicatedInReportSt _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); const auto assertRecoveryFlag = [opCtx = operationContext(), @@ -2951,8 +2905,7 @@ TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState _lsid, _txnNumberAndRetryCounter, std::make_unique(getServiceContext()), - Date_t::max(), - _cancelToken); + Date_t::max()); coordinator->start(operationContext()); {