SERVER-103841: transaction coordinator lifetime (#35591)
GitOrigin-RevId: dd34f52ddbb0df6d213462c96573f1094345da87
This commit is contained in:
parent
332cd4e96d
commit
9c69faba5c
@ -55,6 +55,7 @@
|
||||
#include "mongo/db/s/single_transaction_coordinator_stats.h"
|
||||
#include "mongo/db/s/transaction_coordinator.h"
|
||||
#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
|
||||
#include "mongo/db/s/transaction_coordinator_service.h"
|
||||
#include "mongo/db/s/transaction_coordinator_util.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/db/vector_clock_mutable.h"
|
||||
@ -150,8 +151,7 @@ TransactionCoordinator::TransactionCoordinator(
|
||||
const LogicalSessionId& lsid,
|
||||
const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
|
||||
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
|
||||
Date_t deadline,
|
||||
const CancellationToken& cancelToken)
|
||||
Date_t deadline)
|
||||
: _serviceContext(operationContext->getServiceContext()),
|
||||
_lsid(lsid),
|
||||
_txnNumberAndRetryCounter(txnNumberAndRetryCounter),
|
||||
@ -159,8 +159,7 @@ TransactionCoordinator::TransactionCoordinator(
|
||||
_sendPrepareScheduler(_scheduler->makeChildScheduler()),
|
||||
_transactionCoordinatorMetricsObserver(
|
||||
std::make_unique<TransactionCoordinatorMetricsObserver>()),
|
||||
_deadline(deadline),
|
||||
_cancelToken(cancelToken) {
|
||||
_deadline(deadline) {
|
||||
invariant(_txnNumberAndRetryCounter.getTxnRetryCounter());
|
||||
}
|
||||
|
||||
@ -168,6 +167,11 @@ void TransactionCoordinator::start(OperationContext* operationContext) {
|
||||
invariant(_scheduler, "TransactionCoordinator shutdown before starting");
|
||||
invariant(_sendPrepareScheduler, "TransactionCoordinator shutdown before starting");
|
||||
|
||||
onCompletion().unsafeToInlineFuture().getAsync([this, self = shared_from_this()](auto) {
|
||||
auto* tcs = TransactionCoordinatorService::get(_serviceContext);
|
||||
tcs->notifyCoordinatorFinished(self);
|
||||
});
|
||||
|
||||
auto apiParams = APIParameters::get(operationContext);
|
||||
auto kickOffCommitPF = makePromiseFuture<void>();
|
||||
_kickOffCommitPromise = std::move(kickOffCommitPF.promise);
|
||||
@ -244,7 +248,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) {
|
||||
std::move(opTime),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
_cancelToken);
|
||||
_cancellationSource.token());
|
||||
})
|
||||
.thenRunOn(_scheduler->getExecutor())
|
||||
.then([this, self = shared_from_this(), apiParams] {
|
||||
@ -360,7 +364,7 @@ void TransactionCoordinator::start(OperationContext* operationContext) {
|
||||
std::move(opTime),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
_cancelToken);
|
||||
_cancellationSource.token());
|
||||
})
|
||||
.then([this, self = shared_from_this(), apiParams] {
|
||||
{
|
||||
@ -534,6 +538,10 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
|
||||
"Transaction exceeded deadline or newer transaction started"});
|
||||
}
|
||||
|
||||
void TransactionCoordinator::cancel() {
|
||||
_cancellationSource.cancel();
|
||||
}
|
||||
|
||||
bool TransactionCoordinator::_reserveKickOffCommitPromise() {
|
||||
stdx::lock_guard<stdx::mutex> lg(_mutex);
|
||||
if (_kickOffCommitPromiseSet)
|
||||
|
||||
@ -99,8 +99,7 @@ public:
|
||||
const LogicalSessionId& lsid,
|
||||
const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
|
||||
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
|
||||
Date_t deadline,
|
||||
const CancellationToken& cancelToken);
|
||||
Date_t deadline);
|
||||
|
||||
~TransactionCoordinator();
|
||||
|
||||
@ -160,6 +159,13 @@ public:
|
||||
*/
|
||||
void cancelIfCommitNotYetStarted();
|
||||
|
||||
/**
|
||||
* Cancels the owned cancellation token which interrupts/cancels all associated
|
||||
* `WaitForMajority` invocations under this coordinator. typically invoked only by the
|
||||
* TransactionCoordinatorService during stepdown.
|
||||
*/
|
||||
void cancel();
|
||||
|
||||
TxnRetryCounter getTxnRetryCounterForTest() const {
|
||||
return *_txnNumberAndRetryCounter.getTxnRetryCounter();
|
||||
}
|
||||
@ -253,8 +259,8 @@ private:
|
||||
// The deadline for the TransactionCoordinator to reach a decision
|
||||
Date_t _deadline;
|
||||
|
||||
// The cancellation token for WaitForMajority.
|
||||
const CancellationToken _cancelToken;
|
||||
// The cancellation source for WaitForMajority.
|
||||
CancellationSource _cancellationSource;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -78,15 +78,13 @@ protected:
|
||||
|
||||
void createCoordinatorInCatalog(OperationContext* opCtx,
|
||||
LogicalSessionId lsid,
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter,
|
||||
CancellationToken cancelToken) {
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter) {
|
||||
auto newCoordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(),
|
||||
lsid,
|
||||
txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
cancelToken);
|
||||
Date_t::max());
|
||||
newCoordinator->start(operationContext());
|
||||
|
||||
_coordinatorCatalog->insert(opCtx, lsid, txnNumberAndRetryCounter, newCoordinator);
|
||||
@ -107,9 +105,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinatorInCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
ASSERT(coordinatorInCatalog == nullptr);
|
||||
@ -120,9 +116,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinatorInCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
ASSERT(coordinatorInCatalog == nullptr);
|
||||
@ -131,9 +125,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator) {
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
auto coordinatorInCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
ASSERT(coordinatorInCatalog != nullptr);
|
||||
@ -146,10 +138,8 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
auto coordinator1InCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
@ -164,19 +154,16 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
// Can only create a new TransactionCoordinator after the previous TransactionCoordinator with
|
||||
// the same txnNumber has reached abort decision.
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinator1InCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList);
|
||||
assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none);
|
||||
coordinator1InCatalog->getDecision().wait();
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
coordinator1InCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
@ -201,11 +188,9 @@ DEATH_TEST_F(TransactionCoordinatorCatalogTest,
|
||||
"Invariant failure") {
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
// Re-creating w/ same session id and txn number should cause invariant failure
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
}
|
||||
|
||||
TEST_F(TransactionCoordinatorCatalogTest, GetLatestOnSessionWithNoCoordinatorsReturnsNone) {
|
||||
@ -220,10 +205,8 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
auto latestTxnNumberRetryCounterAndCoordinator =
|
||||
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
|
||||
@ -241,19 +224,17 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
// Can only create a new TransactionCoordinator after the previous TransactionCoordinator with
|
||||
// the same txnNumber has reached abort decision.
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinator1InCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList);
|
||||
assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none);
|
||||
coordinator1InCatalog->getDecision().wait();
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
auto latestTxnNumberRetryCounterAndCoordinator =
|
||||
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
|
||||
@ -275,9 +256,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalogWhenTheyComplete) {
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter);
|
||||
|
||||
coordinator->cancelIfCommitNotYetStarted();
|
||||
@ -298,11 +277,9 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{2, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
auto latestTxnNumberRetryCounterAndCoordinator =
|
||||
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
|
||||
@ -321,19 +298,17 @@ TEST_F(
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
// Can only create a new TransactionCoordinator after the previous TransactionCoordinator with
|
||||
// the same txnNumber has reached abort decision.
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinator1InCatalog =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
coordinator1InCatalog->runCommit(operationContext(), kOneShardIdList);
|
||||
assertCommandSentAndRespondWith("prepareTransaction", kNoSuchTransaction, boost::none);
|
||||
coordinator1InCatalog->getDecision().wait();
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter2);
|
||||
|
||||
auto latestTxnNumberRetryCounterAndCoordinator =
|
||||
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
|
||||
@ -358,10 +333,8 @@ TEST_F(
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinator1 =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
coordinator1->runCommit(operationContext(), kOneShardIdList);
|
||||
@ -371,8 +344,7 @@ TEST_F(
|
||||
lsid,
|
||||
txnNumberAndRetryCounter2,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
cancelToken);
|
||||
Date_t::max());
|
||||
coordinator2->start(operationContext());
|
||||
|
||||
ASSERT_THROWS_CODE(_coordinatorCatalog->insert(
|
||||
@ -399,10 +371,8 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter1{1, 0};
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter2{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1, cancelToken);
|
||||
createCoordinatorInCatalog(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
auto coordinator1 =
|
||||
_coordinatorCatalog->get(operationContext(), lsid, txnNumberAndRetryCounter1);
|
||||
coordinator1->runCommit(operationContext(), kOneShardIdList);
|
||||
@ -414,8 +384,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
lsid,
|
||||
txnNumberAndRetryCounter2,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
cancelToken);
|
||||
Date_t::max());
|
||||
coordinator2->start(operationContext());
|
||||
|
||||
ASSERT_THROWS_CODE(_coordinatorCatalog->insert(
|
||||
@ -438,8 +407,6 @@ TEST_F(TransactionCoordinatorCatalogTest,
|
||||
TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoCatalog) {
|
||||
LogicalSessionId lsid = makeLogicalSessionIdForTest();
|
||||
TxnNumberAndRetryCounter txnNumberAndRetryCounter{1, 0};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken cancelToken{cancelSource.token()};
|
||||
|
||||
txn::AsyncWorkScheduler aws(getServiceContext());
|
||||
TransactionCoordinatorCatalog catalog;
|
||||
@ -449,8 +416,7 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC
|
||||
lsid,
|
||||
txnNumberAndRetryCounter,
|
||||
aws.makeChildScheduler(),
|
||||
network()->now() + Seconds{5},
|
||||
cancelToken);
|
||||
network()->now() + Seconds{5});
|
||||
coordinator->start(operationContext());
|
||||
|
||||
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"});
|
||||
|
||||
@ -103,12 +103,12 @@ void TransactionCoordinatorService::createCoordinator(
|
||||
latestCoordinator->cancelIfCommitNotYetStarted();
|
||||
}
|
||||
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(opCtx,
|
||||
lsid,
|
||||
txnNumberAndRetryCounter,
|
||||
scheduler.makeChildScheduler(),
|
||||
commitDeadline,
|
||||
_cancelSource.token());
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
opCtx, lsid, txnNumberAndRetryCounter, scheduler.makeChildScheduler(), commitDeadline);
|
||||
{
|
||||
stdx::lock_guard<stdx::mutex> lock(_mutex);
|
||||
_activeTransactionCoordinators.insert(coordinator);
|
||||
}
|
||||
coordinator->start(opCtx);
|
||||
|
||||
try {
|
||||
@ -237,8 +237,7 @@ void TransactionCoordinatorService::_scheduleRecoveryTask(OperationContext* opCt
|
||||
_catalogAndScheduler->scheduler
|
||||
.scheduleWorkIn(
|
||||
recoveryDelay,
|
||||
[catalogAndScheduler = _catalogAndScheduler,
|
||||
cancelSource = _cancelSource](OperationContext* opCtx) {
|
||||
[catalogAndScheduler = _catalogAndScheduler](OperationContext* opCtx) {
|
||||
if (MONGO_unlikely(hangBeforeTxnCoordinatorOnStepUpWork.shouldFail())) {
|
||||
LOGV2(8288301, "Hit hangBeforeTxnCoordinatorOnStepUpWork failpoint");
|
||||
hangBeforeTxnCoordinatorOnStepUpWork.pauseWhileSet(opCtx);
|
||||
@ -300,8 +299,7 @@ void TransactionCoordinatorService::_scheduleRecoveryTask(OperationContext* opCt
|
||||
lsid,
|
||||
TxnNumberAndRetryCounter{txnNumber, txnRetryCounter},
|
||||
scheduler.makeChildScheduler(),
|
||||
clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load()),
|
||||
cancelSource.token());
|
||||
clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load()));
|
||||
coordinator->start(opCtx);
|
||||
|
||||
catalog.insert(opCtx,
|
||||
@ -347,7 +345,6 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx,
|
||||
|
||||
LOGV2(9307801, "Creating the transaction catalog and scheduler for primary node.");
|
||||
_catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext());
|
||||
_cancelSource = CancellationSource();
|
||||
_initTerm = term;
|
||||
|
||||
_scheduleRecoveryTask(opCtx, recoveryDelay);
|
||||
@ -363,16 +360,28 @@ void TransactionCoordinatorService::initializeIfNeeded(OperationContext* opCtx,
|
||||
}
|
||||
|
||||
void TransactionCoordinatorService::interrupt() {
|
||||
std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel;
|
||||
|
||||
{
|
||||
stdx::lock_guard<stdx::mutex> lg(_mutex);
|
||||
if (!_catalogAndScheduler)
|
||||
return;
|
||||
|
||||
_catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler);
|
||||
if (_catalogAndScheduler) {
|
||||
_catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler);
|
||||
}
|
||||
for (auto& ptr : _activeTransactionCoordinators) {
|
||||
if (auto shared = ptr.lock(); shared) {
|
||||
coordinatorsToCancel.emplace_back(std::move(shared));
|
||||
}
|
||||
}
|
||||
_activeTransactionCoordinators.clear();
|
||||
}
|
||||
|
||||
_cancelSource.cancel();
|
||||
_catalogAndSchedulerToCleanup->interrupt();
|
||||
if (_catalogAndSchedulerToCleanup) {
|
||||
_catalogAndSchedulerToCleanup->interrupt();
|
||||
}
|
||||
|
||||
for (auto& ptr : coordinatorsToCancel) {
|
||||
ptr->cancel();
|
||||
}
|
||||
}
|
||||
|
||||
void TransactionCoordinatorService::shutdown() {
|
||||
@ -425,4 +434,12 @@ void TransactionCoordinatorService::CatalogAndScheduler::join() {
|
||||
catalog.join();
|
||||
}
|
||||
|
||||
void TransactionCoordinatorService::notifyCoordinatorFinished(
|
||||
const std::shared_ptr<TransactionCoordinator> coordinator) {
|
||||
std::lock_guard<stdx::mutex> lock(_mutex);
|
||||
// we don't need to know or care if we actually erased this weak ptr. its valid for this
|
||||
// service to cancel and clear its set of coordinators and have already erased them when
|
||||
// this continuation executes. all we're trying to do is bound memory usage.
|
||||
_activeTransactionCoordinators.erase(coordinator);
|
||||
}
|
||||
} // namespace mongo
|
||||
|
||||
@ -137,6 +137,12 @@ public:
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Notifies this service that the provided TransactionCoordinator is finished and no longer
|
||||
* needs to be interrupted when the service is interrupted.
|
||||
*/
|
||||
void notifyCoordinatorFinished(std::shared_ptr<TransactionCoordinator>);
|
||||
|
||||
protected:
|
||||
struct CatalogAndScheduler {
|
||||
CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}
|
||||
@ -207,8 +213,14 @@ private:
|
||||
// Set to true during initialization to avoid multiple thread attempting to initialize at once.
|
||||
bool _isInitializing{false};
|
||||
|
||||
// Used to cancel WaitForMajority for TransactionCoordinator when this service gets interrupted.
|
||||
CancellationSource _cancelSource;
|
||||
// tracks active transactionCoordinators to be interrupted on step-down. previously they were
|
||||
// tracked implicitly through futures associated with the above CancellationSource, but that
|
||||
// does not provide a way to deregister those futures when sub sources complete.
|
||||
// NOTE: this must be an ordered container because std::weak_ptr only exposes an ordering
|
||||
// operator, it does not expose an address or other control block information that would allow
|
||||
// storage in a hash-based container.
|
||||
std::set<std::weak_ptr<TransactionCoordinator>, std::owner_less<>>
|
||||
_activeTransactionCoordinators;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -260,8 +260,6 @@ protected:
|
||||
|
||||
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
|
||||
TxnNumberAndRetryCounter _txnNumberAndRetryCounter{1, 1};
|
||||
CancellationSource cancelSource;
|
||||
CancellationToken _cancelToken{cancelSource.token()};
|
||||
};
|
||||
|
||||
class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase {
|
||||
@ -1244,8 +1242,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1271,8 +1268,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1300,8 +1296,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1329,8 +1324,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1357,8 +1351,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1384,8 +1377,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1418,8 +1410,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1449,8 +1440,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1485,8 +1475,7 @@ TEST_F(TransactionCoordinatorTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
auto commitDecisionFuture = coordinator->getDecision();
|
||||
@ -1518,8 +1507,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesEndOfTransactionOplogEntry)
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
coordinator->runCommit(operationContext(), kOneShardIdList);
|
||||
assertPrepareSentAndRespondWithSuccess();
|
||||
@ -1704,8 +1692,7 @@ protected:
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
@ -1886,8 +1873,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) {
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2063,8 +2049,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) {
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2110,12 +2095,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto awsPtr = aws.get();
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2162,12 +2143,8 @@ TEST_F(TransactionCoordinatorMetricsTest,
|
||||
expectedMetrics.totalCreated++;
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2234,12 +2211,8 @@ TEST_F(TransactionCoordinatorMetricsTest,
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto awsPtr = aws.get();
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2305,12 +2278,8 @@ TEST_F(TransactionCoordinatorMetricsTest,
|
||||
expectedMetrics.totalCreated++;
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2384,12 +2353,8 @@ TEST_F(TransactionCoordinatorMetricsTest,
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto awsPtr = aws.get();
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2462,12 +2427,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
|
||||
|
||||
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
|
||||
auto awsPtr = aws.get();
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::move(aws),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
auto coordinator = std::make_shared<TransactionCoordinator>(
|
||||
operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
const auto& stats =
|
||||
coordinator->getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
|
||||
@ -2547,8 +2508,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::now() + Seconds(1),
|
||||
_cancelToken);
|
||||
Date_t::now() + Seconds(1));
|
||||
coordinator->start(operationContext());
|
||||
|
||||
// Wait until the coordinator is writing the participant list.
|
||||
@ -2621,8 +2581,7 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
@ -2658,8 +2617,7 @@ TEST_F(
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
tickSource()->advance(Milliseconds(101));
|
||||
@ -2693,8 +2651,7 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) {
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
@ -2745,8 +2702,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
coordinator->runCommit(operationContext(), kTwoShardIdList);
|
||||
@ -2800,8 +2756,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
{
|
||||
@ -2909,8 +2864,7 @@ TEST_F(TransactionCoordinatorMetricsTest, RecoveryFromFailureIndicatedInReportSt
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
const auto assertRecoveryFlag = [opCtx = operationContext(),
|
||||
@ -2951,8 +2905,7 @@ TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState
|
||||
_lsid,
|
||||
_txnNumberAndRetryCounter,
|
||||
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
|
||||
Date_t::max(),
|
||||
_cancelToken);
|
||||
Date_t::max());
|
||||
coordinator->start(operationContext());
|
||||
|
||||
{
|
||||
|
||||
Loading…
Reference in New Issue
Block a user