SERVER-113530 Address fragility in the transaction coordinator (#44211) (#48925)

GitOrigin-RevId: aac96b3a0a1cd6d2588dc070823eeadd3e001d08
This commit is contained in:
Moustafa Maher 2026-03-05 13:32:38 -08:00 committed by MongoDB Bot
parent 590a3ee8e5
commit 1613813459
23 changed files with 121 additions and 50 deletions

View File

@ -89,7 +89,7 @@ Status stepDownForShutdown(OperationContext* opCtx,
// Even if the ReplicationCoordinator failed to step down, ensure we still interrupt the
// TransactionCoordinatorService (see SERVER-45009).
TransactionCoordinatorService::get(opCtx)->interrupt();
TransactionCoordinatorService::get(opCtx)->interruptForStepDown();
}
return Status::OK();
}

View File

@ -1008,12 +1008,12 @@ void ReplicationCoordinatorExternalStateImpl::onStepDownHook() {
void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() {
if (serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)) {
PeriodicShardedIndexConsistencyChecker::get(_service).onStepDown();
TransactionCoordinatorService::get(_service)->interrupt();
TransactionCoordinatorService::get(_service)->interruptForStepDown();
}
if (ShardingState::get(_service)->enabled()) {
if (!serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)) {
// Called earlier for config servers.
TransactionCoordinatorService::get(_service)->interrupt();
TransactionCoordinatorService::get(_service)->interruptForStepDown();
}
FilteringMetadataCache::get(_service)->onStepDown();

View File

@ -82,11 +82,6 @@ std::unique_ptr<ThreadPool> makeThreadPool(StringData readOrWrite) {
options.maxThreads = 2;
return std::make_unique<ThreadPool>(options);
}
inline Status waitUntilMajorityCanceledStatus() {
static StaticImmortal s =
Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"};
return *s;
}
} // namespace
WaitForMajorityService::~WaitForMajorityService() {
@ -188,7 +183,8 @@ SemiFuture<void> WaitForMajorityServiceImplBase::waitUntilMajority(
}
if (cancelToken.isCanceled()) {
return {SemiFuture<void>::makeReady(waitUntilMajorityCanceledStatus())};
return {
SemiFuture<void>::makeReady(WaitForMajorityService::waitUntilMajorityCanceledStatus())};
}
const bool wasEmpty = _queuedOpTimes.empty();
@ -217,7 +213,7 @@ SemiFuture<void> WaitForMajorityServiceImplBase::waitUntilMajority(
}
auto clientGuard = _waitForMajorityCancellationClient->bind();
if (!request->hasBeenProcessed.swap(true)) {
request->result.setError(waitUntilMajorityCanceledStatus());
request->result.setError(WaitForMajorityService::waitUntilMajorityCanceledStatus());
stdx::lock_guard lk(_mutex);
auto it = std::find_if(
std::begin(_queuedOpTimes),

View File

@ -181,6 +181,11 @@ public:
static WaitForMajorityService& get(ServiceContext* service);
/**
 * Returns the status used to report a canceled waitUntilMajority request: error code
 * CallbackCanceled with a fixed reason string. A new Status is constructed on every call.
 * Exposed publicly so that callers can compare the reason() of an error they received against
 * this one to recognize this specific cancellation.
 */
static Status waitUntilMajorityCanceledStatus() {
return Status{ErrorCodes::CallbackCanceled,
"WaitForMajorityService::waitUntilMajority canceled"};
}
/**
* Sets up the background thread pool responsible for waiting for opTimes to be majority
* committed.

View File

@ -90,7 +90,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -164,7 +164,7 @@ protected:
_skipUpdatingCardinalityParamFP->setMode(FailPoint::off);
_skipBlockingDDLCoordinatorsDuringAddAndRemoveShardFP->setMode(FailPoint::off);
WaitForMajorityService::get(getServiceContext()).shutDown();
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -95,7 +95,7 @@ class ShardingCatalogManagerBumpCollectionPlacementVersionAndChangeMetadataTest
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -95,7 +95,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
WaitForMajorityService::get(getServiceContext()).shutDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -122,7 +122,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
WaitForMajorityService::get(getServiceContext()).shutDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -101,7 +101,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}
@ -702,7 +702,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -126,7 +126,7 @@ protected:
void tearDown() override {
_skipUpdatingCardinalityParamFP->setMode(FailPoint::off);
_skipBlockingDDLCoordinatorsDuringAddAndRemoveShardFP->setMode(FailPoint::off);
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -87,7 +87,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -100,7 +100,7 @@ public:
}
~BenchmarkConfigServerTestFixture() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
WaitForMajorityService::get(getServiceContext()).shutDown();
ShardingCatalogManager::get(operationContext())->shutDown();
ConfigServerTestFixture::tearDown();

View File

@ -311,7 +311,7 @@ public:
void tearDown() override {
globalFailPointRegistry().disableAllFailpoints();
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
WaitForMajorityService::get(getServiceContext()).shutDown();
ConfigServerTestFixture::tearDown();
_registry->onShutdown();

View File

@ -171,7 +171,7 @@ protected:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -112,7 +112,7 @@ public:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
WaitForMajorityService::get(getServiceContext()).shutDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -103,7 +103,7 @@ private:
}
void tearDown() override {
TransactionCoordinatorService::get(operationContext())->interrupt();
TransactionCoordinatorService::get(operationContext())->interruptForStepDown();
ConfigServerTestFixture::tearDown();
}

View File

@ -244,7 +244,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) {
std::move(opTime),
_lsid,
_txnNumberAndRetryCounter,
_cancellationSource.token());
_waitForMajorityCancellationSourceOnStepDown.token());
})
.thenRunOn(_scheduler->getExecutor())
.then([this, self = shared_from_this(), apiParams] {
@ -354,13 +354,14 @@ void TransactionCoordinator::start(OperationContext* operationContext) {
})
.then([this, self = shared_from_this()](repl::OpTime opTime) {
setDecisionPromise(*_decision, _decisionPromise);
return waitForMajorityWithHangFailpoint(_serviceContext,
hangBeforeWaitingForDecisionWriteConcern,
"hangBeforeWaitingForDecisionWriteConcern",
std::move(opTime),
_lsid,
_txnNumberAndRetryCounter,
_cancellationSource.token());
return waitForMajorityWithHangFailpoint(
_serviceContext,
hangBeforeWaitingForDecisionWriteConcern,
"hangBeforeWaitingForDecisionWriteConcern",
std::move(opTime),
_lsid,
_txnNumberAndRetryCounter,
_waitForMajorityCancellationSourceOnStepDown.token());
})
.then([this, self = shared_from_this(), apiParams] {
{
@ -545,8 +546,8 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
"Transaction exceeded deadline or newer transaction started"});
}
void TransactionCoordinator::cancel() {
_cancellationSource.cancel();
void TransactionCoordinator::cancelForStepDown() {
_waitForMajorityCancellationSourceOnStepDown.cancel();
}
bool TransactionCoordinator::_reserveKickOffCommitPromise() {
@ -569,6 +570,18 @@ void TransactionCoordinator::_done(Status status) {
<< "Coordinator " << _lsid << ':' << _txnNumberAndRetryCounter.toBSON()
<< " stopped due to: " << status.reason());
// The 'CallbackCanceled' error code may only terminate a TransactionCoordinator continuation if
// the coordinator is stepping down from primary while awaiting majority write concern.
if (status == ErrorCodes::CallbackCanceled) {
invariant(status.reason() ==
WaitForMajorityService::waitUntilMajorityCanceledStatus().reason());
invariant(_waitForMajorityCancellationSourceOnStepDown.token().isCanceled());
status = Status(ErrorCodes::InterruptedDueToReplStateChange,
str::stream()
<< "Coordinator " << _lsid << ':' << _txnNumberAndRetryCounter.toBSON()
<< " stopped due to stepDown with: " << status.reason());
}
LOGV2_DEBUG(22447,
3,
"Two-phase commit completed",
@ -605,6 +618,20 @@ void TransactionCoordinator::_done(Status status) {
}
if (!status.isOK()) {
// If _participantsDurable is true, the TransactionCoordinator may have already sent a
// prepare command to at least one participant shard.
// Termination with an unexpected error (other than shutdown or stepping down) in this state
// could leave a prepared transaction stuck until the coordinator shuts down or steps down.
// In such cases, we issue a fatal assertion to ensure the problem is loudly reported and
// allow another coordinator to continue the commit path.
if (_participantsDurable && !status.isA<ErrorCategory::NotPrimaryError>() &&
!status.isA<ErrorCategory::ShutdownError>()) {
LOGV2_FATAL(11353000,
"TransactionCoordinator encountered an unexpected termination error.",
"sessionId"_attr = _lsid,
"txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter,
"error"_attr = status);
}
_completionPromise.setError(status);
} else {
_completionPromise.setFrom(_decisionPromise.getFuture().getNoThrow().getStatus());

View File

@ -165,7 +165,7 @@ public:
* `WaitForMajority` invocations under this coordinator. Typically invoked only by the
* TransactionCoordinatorService during stepdown.
*/
void cancel();
void cancelForStepDown();
TxnRetryCounter getTxnRetryCounterForTest() const {
return *_txnNumberAndRetryCounter.getTxnRetryCounter();
@ -260,8 +260,8 @@ private:
// The deadline for the TransactionCoordinator to reach a decision
Date_t _deadline;
// The cancellation source for WaitForMajority.
CancellationSource _cancellationSource;
// The onStepDown cancellation source for WaitForMajority.
CancellationSource _waitForMajorityCancellationSourceOnStepDown;
};
} // namespace mongo

View File

@ -360,7 +360,7 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx,
}
}
void TransactionCoordinatorService::interrupt() {
void TransactionCoordinatorService::interruptForStepDown() {
std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel;
{
@ -377,11 +377,11 @@ void TransactionCoordinatorService::interrupt() {
}
if (_catalogAndSchedulerToCleanup) {
_catalogAndSchedulerToCleanup->interrupt();
_catalogAndSchedulerToCleanup->interruptForStepDown();
}
for (auto& ptr : coordinatorsToCancel) {
ptr->cancel();
ptr->cancelForStepDown();
}
}
@ -390,7 +390,7 @@ void TransactionCoordinatorService::shutdown() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
_isShuttingDown = true;
}
interrupt();
interruptForStepDown();
_joinAndCleanup();
}
@ -423,7 +423,7 @@ void TransactionCoordinatorService::_joinAndCleanup() {
schedulerToCleanup->join();
}
void TransactionCoordinatorService::CatalogAndScheduler::interrupt() {
void TransactionCoordinatorService::CatalogAndScheduler::interruptForStepDown() {
scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown,
"Transaction coordinator service stepping down"});
catalog.onStepDown();

View File

@ -130,7 +130,7 @@ public:
* Interrupts the scheduler and marks the coordinator catalog as stepping down, which triggers
* all the coordinators to stop.
*/
void interrupt();
void interruptForStepDown();
/**
* Shuts down this service. This will no longer be usable once shutdown is called.
@ -147,7 +147,7 @@ protected:
struct CatalogAndScheduler {
CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}
void interrupt();
void interruptForStepDown();
void join();
txn::AsyncWorkScheduler scheduler;

View File

@ -253,7 +253,7 @@ class TransactionCoordinatorServiceInitializationTest
: public TransactionCoordinatorServiceTestFixture {
protected:
void tearDown() override {
_testService.interrupt();
_testService.interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
_testService.shutdown();
@ -385,7 +385,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, InterruptBeforeInitializ
// Should cancel all outstanding tasks (including the recovery task started by
// initialize above, which has not yet run).
_testService.interrupt();
_testService.interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
ASSERT_TRUE(_testService.pendingCleanup());
@ -408,7 +408,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest, InitializingTwiceForSame
TEST_F(TransactionCoordinatorServiceInitializationTest,
CannotInitializeForOldTermAfterInterruption) {
_testService.initializeIfNeeded(operationContext(), /* term */ 1);
_testService.interrupt();
_testService.interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
// Note: Replication machinery should not call initialize for an old term
@ -433,7 +433,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest,
InterruptedNodeCleanupsDuringNextInitialization) {
_testService.initializeIfNeeded(operationContext(), /* term */ 1);
_testService.interrupt();
_testService.interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
ASSERT_TRUE(_testService.pendingCleanup());
@ -447,7 +447,7 @@ TEST_F(TransactionCoordinatorServiceInitializationTest,
TEST_F(TransactionCoordinatorServiceInitializationTest,
OperationsFailsAfterServiceIsInterruptedUntilNextTerm) {
_testService.initializeIfNeeded(operationContext(), /* term */ 1);
_testService.interrupt();
_testService.interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
// Operations fail because the service got interrupted.
@ -497,7 +497,7 @@ protected:
}
void tearDown() override {
service()->interrupt();
service()->interruptForStepDown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
service()->shutdown();

View File

@ -73,6 +73,7 @@
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@ -1570,6 +1571,48 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesEndOfTransactionOplogEntry)
ASSERT_BSONOBJ_EQ(o2.Obj(), expectedO2);
}
// Verifies that an unexpected error (InternalError) injected while the coordinator is hung on the
// "hangBeforeWaitingForParticipantListWriteConcern" failpoint is surfaced through onCompletion()
// with that same error code, i.e. the coordinator terminates without a fatal assertion.
// NOTE(review): presumably this is non-fatal because the participant list is not yet considered
// durable at this point -- confirm against TransactionCoordinator::_done.
TEST_F(TransactionCoordinatorTest,
CoordinatorTerminatedWithUnexpectedErrorBeforeDurablyWritingDecision) {
// Create the coordinator. Ownership of the scheduler is moved into it.
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto coordinator = std::make_shared<TransactionCoordinator>(
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
coordinator->start(operationContext());
// Enable the failpoint for the rest of this scope, then kick off the commit so that the
// coordinator hangs before waiting for the participant list write concern.
FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern");
coordinator->runCommit(operationContext(), kTwoShardIdList);
// Interrupt the hung wait via killClientOpCtx with an unexpected error code (InternalError).
killClientOpCtx(getServiceContext(),
"hangBeforeWaitingForParticipantListWriteConcern",
ErrorCodes::InternalError);
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
// The coordinator must complete with the injected error code.
ASSERT_THROWS_CODE(coordinator->onCompletion().get(), DBException, ErrorCodes::InternalError);
coordinator->shutdown();
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
}
// Death test: expects the process to terminate with fatal assertion 11353000 when the
// coordinator's scheduler is shut down with an unexpected error (InternalError) after a prepare
// has been sent to the participants.
DEATH_TEST_REGEX_F(TransactionCoordinatorTest,
CoordinatorTerminatedWithUnexpectedErrorAfterDurablyWritingDecision,
"Fatal assertion.*11353000") {
// Create the coordinator.
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
// Keep a raw pointer to the scheduler; ownership moves into the coordinator below.
auto awsPtr = aws.get();
auto coordinator = std::make_shared<TransactionCoordinator>(
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
coordinator->start(operationContext());
coordinator->runCommit(operationContext(), kTwoShardIdList);
assertPrepareSentAndRespondWithRetryableError();
// Shut down the coordinator's AWS with an unexpected error while it is waiting on the prepare
// response.
awsPtr->shutdown({ErrorCodes::InternalError, "dummy"});
executor::NetworkInterfaceMock::InNetworkGuard(network())->runReadyNetworkOperations();
// Wait for completion; the test only passes if the fatal assertion matching the regex above
// fires.
coordinator->onCompletion().get();
}
class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase {
protected:
TransactionCoordinatorMetricsTest()