SERVER-113740 Support prepared transactions with chunk migrations in DSC (#46173)

GitOrigin-RevId: b91b9a5ee2b7d8017cd41e07f9e1960918ff7ec2
This commit is contained in:
Wenqin 2026-01-26 12:35:08 -05:00 committed by MongoDB Bot
parent 42fed9d4ab
commit 9f21d93c97
14 changed files with 192 additions and 140 deletions

1
.github/CODEOWNERS vendored
View File

@ -2232,6 +2232,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/db/modules/atlas/jstests/disagg_storage/OWNERS.yml
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/* @10gen/server-disagg-storage @svc-auto-approve-bot
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/sharding_basic.js @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/chunk_migration* @10gen/server-cluster-scalability @svc-auto-approve-bot
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/log_serv* @10gen/server-pali @svc-auto-approve-bot
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/page_serv* @10gen/server-pali @svc-auto-approve-bot

View File

@ -1,7 +1,6 @@
/**
* Test that a migration will:
* 1. Ignore multi-statement transaction prepare conflicts in the clone phase, and
* 2. Pick up the changes for prepared transactions in the transfer mods phase.
* Test that a migration will pick up the changes for prepared transactions in the transfer mods
* phase.
*
* @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
*/
@ -12,6 +11,7 @@ import {
moveChunkStepNames,
pauseMigrateAtStep,
unpauseMigrateAtStep,
waitForMigrateStep,
waitForMoveChunkStep,
} from "jstests/libs/chunk_manipulation_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
@ -51,6 +51,10 @@ let runTest = function (testMode) {
]),
);
pauseMigrateAtStep(st.shard1, migrateStepNames.cloned);
const joinMoveChunk = moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, "test.user", st.shard1.shardName);
const lsid = {id: UUID()};
const txnNumber = 0;
let stmtId = 0;
@ -132,20 +136,6 @@ let runTest = function (testMode) {
});
}
const joinMoveChunk = moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, "test.user", st.shard1.shardName);
pauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
// The donor shard only ignores prepare conflicts while scanning over the shard key index. We
// wait for donor shard to have finished buffering the RecordIds into memory from scanning over
// the shard key index before committing the transaction. Notably, the donor shard doesn't
// ignore prepare conflicts when fetching the full contents of the documents during calls to
// _migrateClone.
//
// TODO: SERVER-71028 Remove comment after making changes.
waitForMoveChunkStep(st.shard0, moveChunkStepNames.startedMoveChunk);
assert.commandWorked(
st.shard0.getDB(dbName).adminCommand(
Object.assign(
@ -160,7 +150,7 @@ let runTest = function (testMode) {
),
);
unpauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
unpauseMigrateAtStep(st.shard1, migrateStepNames.cloned);
joinMoveChunk();

View File

@ -631,21 +631,26 @@ public:
/**
* This method is called when a transaction transitions into prepare while it is not primary,
* e.g. during secondary oplog application or recoverying prepared transactions from the
* oplog after restart. The method explicitly requires a session id (i.e. does not use the
* session id attached to the opCtx) because transaction oplog application currently applies the
* oplog entries for each prepared transaction in multiple internal sessions acquired from the
* e.g. during secondary oplog application, recovering prepared transactions from the
* oplog after restart or recovering prepared transactions from a precise checkpoint after
* restart. The method explicitly requires a session id (i.e. does not use the session id
* attached to the opCtx) because transaction oplog application currently applies the oplog
* entries for each prepared transaction in multiple internal sessions acquired from the
* InternalSessionPool. Currently, those internal sessions are completely unrelated to the
* session for the transaction itself. For a non-retryable internal transaction, not using the
* transaction session id in the codepath here can cause the opTime for the transaction to
* show up in the chunk migration opTime buffer although the writes they correspond to are not
* retryable and therefore are discarded anyway.
*
* WARNING: This should only be used by chunk migration. Statements and prepareOpTime are not
* available when a prepared transaction is recovered from a precise checkpoint, and chunk
* migration has special handling for this case.
*/
virtual void onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) = 0;
virtual void onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) = 0;
/**
* The onTransactionAbort method is called when an atomic transaction aborts, before the

View File

@ -2353,11 +2353,6 @@ void OpObserverImpl::onTransactionPrepare(
}
}
void OpObserverImpl::onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) {}
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) {
invariant(opCtx->getTxnNumber());

View File

@ -275,10 +275,11 @@ public:
const std::vector<OplogSlot>& reservedSlots,
const TransactionOperations& transactionOperations) final {}
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) final;
void onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) final {};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;

View File

@ -262,10 +262,11 @@ public:
const std::vector<OplogSlot>& reservedSlots,
const TransactionOperations& transactionOperations) override {}
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) override {}
void onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}

View File

@ -579,13 +579,15 @@ public:
}
}
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) override {
void onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
observer->onTransactionPrepareNonPrimary(opCtx, lsid, statements, prepareOpTime);
observer->onTransactionPrepareNonPrimaryForChunkMigration(
opCtx, lsid, statements, prepareOpTime);
}
}

View File

@ -723,9 +723,10 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
txnParticipant.prepareTransaction(opCtx, prepareOp.getOpTime());
auto opObserver = opCtx->getServiceContext()->getOpObserver();
auto prepareOpTime = prepareOp.getOpTime();
invariant(opObserver);
opObserver->onTransactionPrepareNonPrimary(
opCtx, *prepareOp.getSessionId(), txnOps, prepareOp.getOpTime());
opObserver->onTransactionPrepareNonPrimaryForChunkMigration(
opCtx, *prepareOp.getSessionId(), txnOps, prepareOpTime);
// Prepare transaction success.
abortOnError.dismiss();
@ -926,10 +927,15 @@ void _recoverPreparedTransactionFromPreciseCheckpoint(
}
// Reload transaction participant state based on the transaction record.
const auto lsid = txnRecord.getSessionId();
txnParticipant.restorePreparedTxnFromPreciseCheckpoint(opCtx, std::move(txnRecord));
// TODO SERVER-113740: Trigger the op observers for chunk migrations once they support
// not having the full operations list. e.g. onTransactionPrepareNonPrimary()
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
// Statements and prepareOpTime are not recoverable from a precise checkpoint, so we don't
// pass them to the op-observer.
opObserver->onTransactionPrepareNonPrimaryForChunkMigration(
opCtx, lsid, boost::none /* statements */, boost::none /* prepareOpTime */);
// Stash the transaction so it yields its resources and can be resumed by future user
// operations.

View File

@ -330,20 +330,6 @@ Status MigrationChunkClonerSource::startClone(OperationContext* opCtx,
_sessionCatalogSource->fetchNextOplog(opCtx);
{
// Ignore prepare conflicts when we load ids of currently available documents. This is
// acceptable because we will track changes made by prepared transactions at transaction
// commit time.
auto originalPrepareConflictBehavior =
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior();
ON_BLOCK_EXIT([&] {
shard_role_details::getRecoveryUnit(opCtx)->setPrepareConflictBehavior(
originalPrepareConflictBehavior);
});
shard_role_details::getRecoveryUnit(opCtx)->setPrepareConflictBehavior(
PrepareConflictBehavior::kIgnoreConflicts);
auto storeCurrentRecordIdStatus = _storeCurrentRecordId(opCtx);
if (storeCurrentRecordIdStatus == ErrorCodes::ChunkTooBig && _forceJumbo) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
@ -662,6 +648,17 @@ void MigrationChunkClonerSource::_nextCloneBatchFromIndexScan(
ElapsedTracker tracker(&opCtx->fastClockSource(),
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
// Prepare conflicts should never be ignored when performing the shard key index scan to get
// Record IDs of documents to transfer as part of cloning. This is because there can be prepared
// transactions recovered from a precise checkpoint as part of startup recovery that can not set
// up an onCommit handler to send transaction operations to chunk migration's internal buffers.
// As a result, we need to wait on those recovered prepared transactions to complete and be
// transferred as part of cloning, otherwise we can lose data.
tassert(11374001,
"Expected to not ignore prepare conflicts when performing index scan during cloning",
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior() ==
PrepareConflictBehavior::kEnforce);
boost::optional<HandleTransactionResourcesFromStasher> scopedResourceHandler;
auto isFirstIteration = !_jumboChunkCloneState->clonerExec;
if (isFirstIteration) {
@ -1030,6 +1027,17 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
MigrationChunkClonerSource::_getIndexScanExecutor(OperationContext* opCtx,
const CollectionAcquisition& acquisition,
InternalPlanner::IndexScanOptions scanOption) {
// Prepare conflicts should never be ignored when performing the shard key index scan to get
// Record IDs of documents to transfer as part of cloning. This is because there can be prepared
// transactions recovered from a precise checkpoint as part of startup recovery that can not set
// up an onCommit handler to send transaction operations to chunk migration's internal buffers.
// As a result, we need to wait on those recovered prepared transactions to complete and be
// transferred as part of cloning, otherwise we can lose data.
tassert(11374002,
"Expected to not ignore prepare conflicts when performing index scan during cloning",
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior() ==
PrepareConflictBehavior::kEnforce);
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
const auto shardKeyIdx = findShardKeyPrefixedIndex(opCtx,

View File

@ -314,14 +314,25 @@ void MigrationChunkClonerSourceOpObserver::postTransactionPrepare(
*opCtx->getLogicalSessionId(), statements, prepareOpTime));
}
void MigrationChunkClonerSourceOpObserver::onTransactionPrepareNonPrimary(
void MigrationChunkClonerSourceOpObserver::onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) {
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) {
if (!statements || !prepareOpTime) {
// Statements or prepareOpTime not being available means that the prepared transaction was
// recovered from a precise checkpoint. In this case, we intentionally do not (and cannot)
// install a LogTransactionOperationsForShardingHandler here. During chunk migration, the
// donors cloning phase enforces prepare conflicts, so the shard key index scan will wait
// until this recovered prepared transaction is committed/aborted and its effects are
// visible. As a result, we do not need an onCommit hook to populate the donations internal
// buffers after cloning starts.
return;
}
shard_role_details::getRecoveryUnit(opCtx)->registerChange(
std::make_unique<LogTransactionOperationsForShardingHandler>(
lsid, statements, prepareOpTime));
lsid, *statements, *prepareOpTime));
}
void MigrationChunkClonerSourceOpObserver::onBatchedWriteCommit(

View File

@ -116,10 +116,11 @@ public:
const std::vector<OplogSlot>& reservedSlots,
const TransactionOperations& transactionOperations) final;
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
const LogicalSessionId& lsid,
const std::vector<repl::OplogEntry>& statements,
const repl::OpTime& prepareOpTime) final;
void onTransactionPrepareNonPrimaryForChunkMigration(
OperationContext* opCtx,
const LogicalSessionId& lsid,
boost::optional<const std::vector<repl::OplogEntry>&> statements,
boost::optional<const repl::OpTime&> prepareOpTime) final;
void onBatchedWriteCommit(OperationContext* opCtx,
WriteUnitOfWork::OplogEntryGroupType oplogGroupingFormat,

View File

@ -2107,10 +2107,6 @@ void TransactionParticipant::Participant::restorePreparedTxnFromPreciseCheckpoin
invariant(p().needToWriteAbortEntry);
invariant(p().autoCommit == boost::optional<bool>(false));
// TODO SERVER-113740: These will be unset and we need a way to ensure callers can handle that
// and won't introduce new dependencies on it.
// p().transactionOperations
{
stdx::lock_guard<Client> lg(*opCtx->getClient());
o(lg).txnState.transitionTo(TransactionState::kPrepared);
@ -2135,6 +2131,8 @@ void TransactionParticipant::Participant::restorePreparedTxnFromPreciseCheckpoin
p().activeTxnCommittedStatements = {};
o(lg).hasIncompleteHistory = true;
p().recoveredFromPreciseCheckpoint = true;
// This should be called after checking out the session without refresh, which already sets
// isValid.
invariant(o().isValid);
@ -2186,6 +2184,9 @@ TransactionOperations* TransactionParticipant::Participant::retrieveCompletedTra
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
str::stream() << "Current state: " << o().txnState);
// Prepared transactions recovered from a precise checkpoint do not have transactionOperations.
invariant(!p().recoveredFromPreciseCheckpoint);
return &(p().transactionOperations);
}

View File

@ -1365,6 +1365,8 @@ private:
// transaction.
bool needToWriteAbortEntry{false};
// Set to true if the transaction was recovered from a precise checkpoint.
bool recoveredFromPreciseCheckpoint{false};
} _p;
}; // class TransactionParticipant

View File

@ -7977,11 +7977,74 @@ TEST_F(TxnParticipantTest, CanAddPreciseCheckpointFieldsForRecoveryNoNamespaces)
ASSERT_EQ(namespaces.size(), 0);
}
TEST_F(TxnParticipantTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
//
// Generate a config.transactions entry for a prepared transaction and snapshot its initial
// TransactionParticipant state.
//
class TxnParticipantStartupRecoveryTest : public TxnParticipantTest {
protected:
void setUp() {
TxnParticipantTest::setUp();
//
// Generate a config.transactions entry for a prepared transaction and snapshot its initial
// TransactionParticipant state.
//
runFunctionFromDifferentOpCtx([this, sessionId = _sessionId, txnNumber = _txnNumber](
OperationContext* opCtx) {
const std::vector<NamespaceString> kNamespaces = {
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl1"),
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl2"),
NamespaceString::createNamespaceString_forTest("TestDB2", "TestColl1")};
std::vector<UUID> uuids;
for (const auto& nss : kNamespaces) {
AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X);
auto db = autoDb.ensureDbExists(opCtx);
ASSERT_TRUE(db);
WriteUnitOfWork wuow(opCtx);
CollectionOptions options;
auto collection = db->createCollection(opCtx, nss, options);
wuow.commit();
uuids.push_back(collection->uuid());
}
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
opCtx->setInMultiDocumentTransaction();
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant.beginOrContinue(opCtx,
{*opCtx->getTxnNumber()},
false /* autocommit */,
TransactionParticipant::TransactionActions::kStart);
txnParticipant.unstashTransactionResources(opCtx, "insert");
for (size_t collIndex = 0; collIndex < kNamespaces.size(); ++collIndex) {
auto operation = repl::DurableOplogEntry::makeInsertOperation(
kNamespaces[collIndex], uuids[collIndex], BSON("_id" << 0), BSON("_id" << 0));
txnParticipant.addTransactionOperation(opCtx, operation);
}
auto [timestamp, namespaces] = txnParticipant.prepareTransaction(opCtx, {});
SessionTxnRecord txnRecord;
txnRecord.setState(DurableTxnStateEnum::kPrepared);
txnRecord.setSessionId(*opCtx->getLogicalSessionId());
txnRecord.setTxnNum(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber());
txnRecord.setLastWriteOpTime(txnParticipant.getLastWriteOpTime());
txnRecord.setLastWriteDate(Date_t::now());
txnParticipant.addPreparedTransactionPreciseCheckpointRecoveryFields(txnRecord);
this->validatedTxnRecord =
std::make_unique<SessionTxnRecordForPrepareRecovery>(std::move(txnRecord));
this->expectedState.txnNumberAndRetryCounter =
txnParticipant.getActiveTxnNumberAndRetryCounter();
this->expectedState.lastWriteOpTime = txnParticipant.getLastWriteOpTime();
this->expectedState.prepareOpTime = txnParticipant.getPrepareOpTime();
});
// Reset the transaction participant catalog to simulate a process restart.
SessionCatalog::get(opCtx()->getServiceContext())->reset_forTest();
}
std::unique_ptr<SessionTxnRecordForPrepareRecovery> validatedTxnRecord;
struct TransactionParticipantStateSnapshot {
@ -7989,67 +8052,9 @@ TEST_F(TxnParticipantTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
repl::OpTime lastWriteOpTime;
repl::OpTime prepareOpTime;
} expectedState;
};
runFunctionFromDifferentOpCtx([&validatedTxnRecord,
&expectedState,
sessionId = _sessionId,
txnNumber = _txnNumber](OperationContext* opCtx) {
const std::vector<NamespaceString> kNamespaces = {
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl1"),
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl2"),
NamespaceString::createNamespaceString_forTest("TestDB2", "TestColl1")};
std::vector<UUID> uuids;
for (const auto& nss : kNamespaces) {
AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X);
auto db = autoDb.ensureDbExists(opCtx);
ASSERT_TRUE(db);
WriteUnitOfWork wuow(opCtx);
CollectionOptions options;
auto collection = db->createCollection(opCtx, nss, options);
wuow.commit();
uuids.push_back(collection->uuid());
}
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
opCtx->setInMultiDocumentTransaction();
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant.beginOrContinue(opCtx,
{*opCtx->getTxnNumber()},
false /* autocommit */,
TransactionParticipant::TransactionActions::kStart);
txnParticipant.unstashTransactionResources(opCtx, "insert");
for (size_t collIndex = 0; collIndex < kNamespaces.size(); ++collIndex) {
auto operation = repl::DurableOplogEntry::makeInsertOperation(
kNamespaces[collIndex], uuids[collIndex], BSON("_id" << 0), BSON("_id" << 0));
txnParticipant.addTransactionOperation(opCtx, operation);
}
auto [timestamp, namespaces] = txnParticipant.prepareTransaction(opCtx, {});
SessionTxnRecord txnRecord;
txnRecord.setState(DurableTxnStateEnum::kPrepared);
txnRecord.setSessionId(*opCtx->getLogicalSessionId());
txnRecord.setTxnNum(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber());
txnRecord.setLastWriteOpTime(txnParticipant.getLastWriteOpTime());
txnRecord.setLastWriteDate(Date_t::now());
txnParticipant.addPreparedTransactionPreciseCheckpointRecoveryFields(txnRecord);
validatedTxnRecord =
std::make_unique<SessionTxnRecordForPrepareRecovery>(std::move(txnRecord));
expectedState.txnNumberAndRetryCounter = txnParticipant.getActiveTxnNumberAndRetryCounter();
expectedState.lastWriteOpTime = txnParticipant.getLastWriteOpTime();
expectedState.prepareOpTime = txnParticipant.getPrepareOpTime();
});
// Reset the transaction participant catalog to simulate a process restart.
SessionCatalog::get(opCtx()->getServiceContext())->reset_forTest();
TEST_F(TxnParticipantStartupRecoveryTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
// Simulate running as a standby.
repl::UnreplicatedWritesBlock uwb(opCtx());
@ -8057,7 +8062,6 @@ TEST_F(TxnParticipantTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
// Run through the flow to reconstruct a prepared transaction after a restart with precise
// checkpoints then verify its state was correctly restored.
//
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx());
auto sessionCheckout = mongoDSessionCatalog->checkOutSessionWithoutRefresh(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
@ -8101,5 +8105,29 @@ TEST_F(TxnParticipantTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
ASSERT_FALSE(txnParticipant.getTxnResourceStashLockerForTest()->isLocked());
}
using TxnParticipantStartupRecoveryDeathTest = TxnParticipantStartupRecoveryTest;
DEATH_TEST_F(TxnParticipantStartupRecoveryDeathTest,
CannotRetrieveTransactionOperations,
"invariant") {
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx());
auto sessionCheckout = mongoDSessionCatalog->checkOutSessionWithoutRefresh(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
for (const auto& ns : validatedTxnRecord->getAffectedNamespaces()) {
(void)acquireCollection(opCtx(),
CollectionAcquisitionRequest(ns,
PlacementConcern::kPretendUnsharded,
repl::ReadConcernArgs::get(opCtx()),
AcquisitionPrerequisites::kWrite),
MODE_IX);
}
txnParticipant.restorePreparedTxnFromPreciseCheckpoint(opCtx(), std::move(*validatedTxnRecord));
txnParticipant.retrieveCompletedTransactionOperations(opCtx());
}
} // namespace
} // namespace mongo