diff --git a/src/mongo/db/commands/shutdown_d.cpp b/src/mongo/db/commands/shutdown_d.cpp index 4f842ab644d..7d2fdd953d6 100644 --- a/src/mongo/db/commands/shutdown_d.cpp +++ b/src/mongo/db/commands/shutdown_d.cpp @@ -89,7 +89,7 @@ Status stepDownForShutdown(OperationContext* opCtx, // Even if the ReplicationCoordinator failed to step down, ensure we still interrupt the // TransactionCoordinatorService (see SERVER-45009). - TransactionCoordinatorService::get(opCtx)->interrupt(); + TransactionCoordinatorService::get(opCtx)->interruptForStepDown(); } return Status::OK(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index eb1e88ac6f3..fe91d72e5c5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -1008,12 +1008,12 @@ void ReplicationCoordinatorExternalStateImpl::onStepDownHook() { void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() { if (serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)) { PeriodicShardedIndexConsistencyChecker::get(_service).onStepDown(); - TransactionCoordinatorService::get(_service)->interrupt(); + TransactionCoordinatorService::get(_service)->interruptForStepDown(); } if (ShardingState::get(_service)->enabled()) { if (!serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)) { // Called earlier for config servers. - TransactionCoordinatorService::get(_service)->interrupt(); + TransactionCoordinatorService::get(_service)->interruptForStepDown(); } FilteringMetadataCache::get(_service)->onStepDown(); diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp index 33519c4d7a9..cd1a45b8ac1 100644 --- a/src/mongo/db/repl/wait_for_majority_service.cpp +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -82,11 +82,6 @@ std::unique_ptr makeThreadPool(StringData readOrWrite) { options.maxThreads = 2; return std::make_unique(options); } -inline Status waitUntilMajorityCanceledStatus() { - static StaticImmortal s = - Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"}; - return *s; -} } // namespace WaitForMajorityService::~WaitForMajorityService() { @@ -188,7 +183,8 @@ SemiFuture WaitForMajorityServiceImplBase::waitUntilMajority( } if (cancelToken.isCanceled()) { - return {SemiFuture::makeReady(waitUntilMajorityCanceledStatus())}; + return { + SemiFuture::makeReady(WaitForMajorityService::waitUntilMajorityCanceledStatus())}; } const bool wasEmpty = _queuedOpTimes.empty(); @@ -217,7 +213,7 @@ SemiFuture WaitForMajorityServiceImplBase::waitUntilMajority( } auto clientGuard = _waitForMajorityCancellationClient->bind(); if (!request->hasBeenProcessed.swap(true)) { - request->result.setError(waitUntilMajorityCanceledStatus()); + request->result.setError(WaitForMajorityService::waitUntilMajorityCanceledStatus()); stdx::lock_guard lk(_mutex); auto it = std::find_if( std::begin(_queuedOpTimes), diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h index f2f5a3c2a5f..e5ab376b2ba 100644 --- a/src/mongo/db/repl/wait_for_majority_service.h +++ b/src/mongo/db/repl/wait_for_majority_service.h @@ -181,6 +181,11 @@ public: static WaitForMajorityService& get(ServiceContext* service); + static Status waitUntilMajorityCanceledStatus() { + return Status{ErrorCodes::CallbackCanceled, + "WaitForMajorityService::waitUntilMajority canceled"}; + } + /** * Sets up the background thread pool responsible for waiting for opTimes to be majority * committed. diff --git a/src/mongo/db/s/balancer/auto_merger_policy_test.cpp b/src/mongo/db/s/balancer/auto_merger_policy_test.cpp index 74a0ad236c5..a64dd98623b 100644 --- a/src/mongo/db/s/balancer/auto_merger_policy_test.cpp +++ b/src/mongo/db/s/balancer/auto_merger_policy_test.cpp @@ -90,7 +90,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp index f79be89afb5..cd5870a87b8 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp @@ -164,7 +164,7 @@ protected: _skipUpdatingCardinalityParamFP->setMode(FailPoint::off); _skipBlockingDDLCoordinatorsDuringAddAndRemoveShardFP->setMode(FailPoint::off); WaitForMajorityService::get(getServiceContext()).shutDown(); - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp index 2f61813273f..afe2b7963bd 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp @@ -95,7 +95,7 @@ class ShardingCatalogManagerBumpCollectionPlacementVersionAndChangeMetadataTest } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp index ae0c2222c3a..c35b0f66292 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp @@ -95,7 +95,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); WaitForMajorityService::get(getServiceContext()).shutDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp index 72696152763..87f51f89a72 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp @@ -122,7 +122,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); WaitForMajorityService::get(getServiceContext()).shutDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp index 070d573805f..926ed4c7b25 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp @@ -101,7 +101,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } @@ -702,7 +702,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp index 30ee53f42b2..da4d3afc22e 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp @@ -126,7 +126,7 @@ protected: void tearDown() override { _skipUpdatingCardinalityParamFP->setMode(FailPoint::off); _skipBlockingDDLCoordinatorsDuringAddAndRemoveShardFP->setMode(FailPoint::off); - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp index abf052e5ebd..a7942a628d6 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp @@ -87,7 +87,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/placement_history_bm.cpp b/src/mongo/db/s/placement_history_bm.cpp index 7866665adce..8c941c18753 100644 --- a/src/mongo/db/s/placement_history_bm.cpp +++ b/src/mongo/db/s/placement_history_bm.cpp @@ -100,7 +100,7 @@ public: } ~BenchmarkConfigServerTestFixture() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); WaitForMajorityService::get(getServiceContext()).shutDown(); ShardingCatalogManager::get(operationContext())->shutDown(); ConfigServerTestFixture::tearDown(); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index 7950c4999e7..317fa1d81dc 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -311,7 +311,7 @@ public: void tearDown() override { globalFailPointRegistry().disableAllFailpoints(); - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); WaitForMajorityService::get(getServiceContext()).shutDown(); ConfigServerTestFixture::tearDown(); _registry->onShutdown(); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 26e901dcaea..ce6fb16d352 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -171,7 +171,7 @@ protected: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp b/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp index 09b1b532774..621e1c65bf1 100644 --- a/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp +++ b/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp @@ -112,7 +112,7 @@ public: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); WaitForMajorityService::get(getServiceContext()).shutDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/sharding_ddl_util_test.cpp b/src/mongo/db/s/sharding_ddl_util_test.cpp index 199e2ae01fc..e480e0a17e1 100644 --- a/src/mongo/db/s/sharding_ddl_util_test.cpp +++ b/src/mongo/db/s/sharding_ddl_util_test.cpp @@ -103,7 +103,7 @@ private: } void tearDown() override { - TransactionCoordinatorService::get(operationContext())->interrupt(); + TransactionCoordinatorService::get(operationContext())->interruptForStepDown(); ConfigServerTestFixture::tearDown(); } diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index d88c9f76f96..7ec1a04156c 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -244,7 +244,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) { std::move(opTime), _lsid, _txnNumberAndRetryCounter, - _cancellationSource.token()); + _waitForMajorityCancellationSourceOnStepDown.token()); }) .thenRunOn(_scheduler->getExecutor()) .then([this, self = shared_from_this(), apiParams] { @@ -354,13 +354,14 @@ void TransactionCoordinator::start(OperationContext* operationContext) { }) .then([this, self = shared_from_this()](repl::OpTime opTime) { setDecisionPromise(*_decision, _decisionPromise); - return waitForMajorityWithHangFailpoint(_serviceContext, - hangBeforeWaitingForDecisionWriteConcern, - "hangBeforeWaitingForDecisionWriteConcern", - std::move(opTime), - _lsid, - _txnNumberAndRetryCounter, - _cancellationSource.token()); + return waitForMajorityWithHangFailpoint( + _serviceContext, + hangBeforeWaitingForDecisionWriteConcern, + "hangBeforeWaitingForDecisionWriteConcern", + std::move(opTime), + _lsid, + _txnNumberAndRetryCounter, + _waitForMajorityCancellationSourceOnStepDown.token()); }) .then([this, self = shared_from_this(), apiParams] { { @@ -545,8 +546,8 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { "Transaction exceeded deadline or newer transaction started"}); } -void TransactionCoordinator::cancel() { - _cancellationSource.cancel(); +void TransactionCoordinator::cancelForStepDown() { + _waitForMajorityCancellationSourceOnStepDown.cancel(); } bool TransactionCoordinator::_reserveKickOffCommitPromise() { @@ -569,6 +570,18 @@ void TransactionCoordinator::_done(Status status) { << "Coordinator " << _lsid << ':' << _txnNumberAndRetryCounter.toBSON() << " stopped due to: " << status.reason()); + // The 'CallbackCanceled' error code may only terminate a TransactionCoordinator continuation if + // the coordinator is stepping down from primary while awaiting majority write concern. + if (status == ErrorCodes::CallbackCanceled) { + invariant(status.reason() == + WaitForMajorityService::waitUntilMajorityCanceledStatus().reason()); + invariant(_waitForMajorityCancellationSourceOnStepDown.token().isCanceled()); + status = Status(ErrorCodes::InterruptedDueToReplStateChange, + str::stream() + << "Coordinator " << _lsid << ':' << _txnNumberAndRetryCounter.toBSON() + << " stopped due to stepDown with: " << status.reason()); + } + LOGV2_DEBUG(22447, 3, "Two-phase commit completed", @@ -605,6 +618,20 @@ void TransactionCoordinator::_done(Status status) { } if (!status.isOK()) { + // If _participantsDurable is true, the TransactionCoordinator may have already sent a + // prepare command to at least one participant shard. + // Termination with an unexpected error (other than shutdown or stepping down) in this state + // could leave a prepared transaction stuck until the coordinator shuts down or steps down. + // In such cases, we issue a fatal assertion to ensure the problem is loudly reported and + // allow another coordinator to continue the commit path. + if (_participantsDurable && !status.isA() && + !status.isA()) { + LOGV2_FATAL(11353000, + "TransactionCoordinator encountered an unexpected termination error.", + "sessionId"_attr = _lsid, + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, + "error"_attr = status); + } _completionPromise.setError(status); } else { _completionPromise.setFrom(_decisionPromise.getFuture().getNoThrow().getStatus()); diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index ca5551f357a..8d3c148d271 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -165,7 +165,7 @@ public: * `WaitForMajority` invocations under this coordinator. typically invoked only by the * TransactionCoordinatorService during stepdown. */ - void cancel(); + void cancelForStepDown(); TxnRetryCounter getTxnRetryCounterForTest() const { return *_txnNumberAndRetryCounter.getTxnRetryCounter(); @@ -260,8 +260,8 @@ private: // The deadline for the TransactionCoordinator to reach a decision Date_t _deadline; - // The cancellation source for WaitForMajority. - CancellationSource _cancellationSource; + // The onStepDown cancellation source for WaitForMajority. + CancellationSource _waitForMajorityCancellationSourceOnStepDown; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index d955a3debed..2f326686206 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -360,7 +360,7 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx, } } -void TransactionCoordinatorService::interrupt() { +void TransactionCoordinatorService::interruptForStepDown() { std::vector> coordinatorsToCancel; { @@ -377,11 +377,11 @@ void TransactionCoordinatorService::interrupt() { } if (_catalogAndSchedulerToCleanup) { - _catalogAndSchedulerToCleanup->interrupt(); + _catalogAndSchedulerToCleanup->interruptForStepDown(); } for (auto& ptr : coordinatorsToCancel) { - ptr->cancel(); + ptr->cancelForStepDown(); } } @@ -390,7 +390,7 @@ void TransactionCoordinatorService::shutdown() { stdx::lock_guard lg(_mutex); _isShuttingDown = true; } - interrupt(); + interruptForStepDown(); _joinAndCleanup(); } @@ -423,7 +423,7 @@ void TransactionCoordinatorService::_joinAndCleanup() { schedulerToCleanup->join(); } -void TransactionCoordinatorService::CatalogAndScheduler::interrupt() { +void TransactionCoordinatorService::CatalogAndScheduler::interruptForStepDown() { scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Transaction coordinator service stepping down"}); catalog.onStepDown(); diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index 79663688b8d..0b8015a9cc4 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -130,7 +130,7 @@ public: * Interrupts the scheduler and marks the coordinator catalog as stepping down, which triggers * all the coordinators to stop. */ - void interrupt(); + void interruptForStepDown(); /** * Shuts down this service. This will no longer be usable once shutdown is called. @@ -147,7 +147,7 @@ protected: struct CatalogAndScheduler { CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} - void interrupt(); + void interruptForStepDown(); void join(); txn::AsyncWorkScheduler scheduler; diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp index 4736590df80..225b1bd4550 100644 --- a/src/mongo/db/s/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp @@ -253,7 +253,7 @@ class TransactionCoordinatorServiceInitializationTest : public TransactionCoordinatorServiceTestFixture { protected: void tearDown() override { - _testService.interrupt(); + _testService.interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); _testService.shutdown(); @@ -385,7 +385,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, InterruptBeforeInitializ // Should cancel all outstanding tasks (including the recovery task started by // initialize above, which has not yet run). - _testService.interrupt(); + _testService.interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); ASSERT_TRUE(_testService.pendingCleanup()); @@ -408,7 +408,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, InitializingTwiceForSame TEST_F(TransactionCoordinatorServiceInitializationTest, CannotInitializeForOldTermAfterInterruption) { _testService.initializeIfNeeded(operationContext(), /* term */ 1); - _testService.interrupt(); + _testService.interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); // Note: Replication machinery should not call initialize for an old term @@ -433,7 +433,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, InterruptedNodeCleanupsDuringNextInitialization) { _testService.initializeIfNeeded(operationContext(), /* term */ 1); - _testService.interrupt(); + _testService.interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); ASSERT_TRUE(_testService.pendingCleanup()); @@ -447,7 +447,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, TEST_F(TransactionCoordinatorServiceInitializationTest, OperationsFailsAfterServiceIsInterruptedUntilNextTerm) { _testService.initializeIfNeeded(operationContext(), /* term */ 1); - _testService.interrupt(); + _testService.interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); // Operations fails because service got interrupted. @@ -497,7 +497,7 @@ protected: } void tearDown() override { - service()->interrupt(); + service()->interruptForStepDown(); executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); service()->shutdown(); diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 7d2c92a9405..57c1cc5e96f 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -73,6 +73,7 @@ #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/platform/atomic_word.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/log_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -1570,6 +1571,48 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesEndOfTransactionOplogEntry) ASSERT_BSONOBJ_EQ(o2.Obj(), expectedO2); } +TEST_F(TransactionCoordinatorTest, + CoordinatorTerminatedWithUnexpectedErrorBeforeDurablyWritingDecision) { + // Create the coordinator. + auto aws = std::make_unique(getServiceContext()); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); + coordinator->start(operationContext()); + + // Wait until the coordinator is writing the participant list. + FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern"); + coordinator->runCommit(operationContext(), kTwoShardIdList); + + // Shut down the coordinator's AWS with unexpected error. + killClientOpCtx(getServiceContext(), + "hangBeforeWaitingForParticipantListWriteConcern", + ErrorCodes::InternalError); + + executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); + ASSERT_THROWS_CODE(coordinator->onCompletion().get(), DBException, ErrorCodes::InternalError); + coordinator->shutdown(); + executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); +} + +DEATH_TEST_REGEX_F(TransactionCoordinatorTest, + CoordinatorTerminatedWithUnexpectedErrorAfterDurablyWritingDecision, + "Fatal assertion.*11353000") { + // Create the coordinator. + auto aws = std::make_unique(getServiceContext()); + auto awsPtr = aws.get(); + auto coordinator = std::make_shared( + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); + coordinator->start(operationContext()); + coordinator->runCommit(operationContext(), kTwoShardIdList); + assertPrepareSentAndRespondWithRetryableError(); + + // Shut down the coordinator's AWS with unexpected error while waiting on prepare response. + awsPtr->shutdown({ErrorCodes::InternalError, "dummy"}); + executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations(); + coordinator->onCompletion().get(); +} + + class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase { protected: TransactionCoordinatorMetricsTest()