Co-authored-by: auto-revert-processor <devprod-si-team@mongodb.com> GitOrigin-RevId: 534ccf175499fea09fe51cd5ea093981b6be6073
This commit is contained in:
parent
b5b63641b7
commit
be29a9a08f
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@ -2234,7 +2234,6 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
||||
# The following patterns are parsed from ./src/mongo/db/modules/atlas/jstests/disagg_storage/OWNERS.yml
|
||||
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/* @10gen/server-disagg-storage @svc-auto-approve-bot
|
||||
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/sharding_basic.js @10gen/server-catalog-and-routing @svc-auto-approve-bot
|
||||
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/chunk_migration* @10gen/server-cluster-scalability @svc-auto-approve-bot
|
||||
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/log_serv* @10gen/server-pali @svc-auto-approve-bot
|
||||
/src/mongo/db/modules/atlas/jstests/disagg_storage/**/page_serv* @10gen/server-pali @svc-auto-approve-bot
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
/**
|
||||
* Test that a migration will pick up the changes for prepared transactions in the transfer mods
|
||||
* phase.
|
||||
* Test that a migration will:
|
||||
* 1. Ignore multi-statement transaction prepare conflicts in the clone phase, and
|
||||
* 2. Pick up the changes for prepared transactions in the transfer mods phase.
|
||||
*
|
||||
* @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
|
||||
*/
|
||||
@ -11,7 +12,6 @@ import {
|
||||
moveChunkStepNames,
|
||||
pauseMigrateAtStep,
|
||||
unpauseMigrateAtStep,
|
||||
waitForMigrateStep,
|
||||
waitForMoveChunkStep,
|
||||
} from "jstests/libs/chunk_manipulation_util.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
@ -51,10 +51,6 @@ let runTest = function (testMode) {
|
||||
]),
|
||||
);
|
||||
|
||||
pauseMigrateAtStep(st.shard1, migrateStepNames.cloned);
|
||||
|
||||
const joinMoveChunk = moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, "test.user", st.shard1.shardName);
|
||||
|
||||
const lsid = {id: UUID()};
|
||||
const txnNumber = 0;
|
||||
let stmtId = 0;
|
||||
@ -136,6 +132,20 @@ let runTest = function (testMode) {
|
||||
});
|
||||
}
|
||||
|
||||
const joinMoveChunk = moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, "test.user", st.shard1.shardName);
|
||||
|
||||
pauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
|
||||
|
||||
// The donor shard only ignores prepare conflicts while scanning over the shard key index. We
|
||||
// wait for the donor shard to have finished buffering the RecordIds into memory from scanning over
|
||||
// the shard key index before committing the transaction. Notably, the donor shard doesn't
|
||||
// ignore prepare conflicts when fetching the full contents of the documents during calls to
|
||||
// _migrateClone.
|
||||
//
|
||||
// TODO: SERVER-71028 Remove comment after making changes.
|
||||
|
||||
waitForMoveChunkStep(st.shard0, moveChunkStepNames.startedMoveChunk);
|
||||
|
||||
assert.commandWorked(
|
||||
st.shard0.getDB(dbName).adminCommand(
|
||||
Object.assign(
|
||||
@ -150,7 +160,7 @@ let runTest = function (testMode) {
|
||||
),
|
||||
);
|
||||
|
||||
unpauseMigrateAtStep(st.shard1, migrateStepNames.cloned);
|
||||
unpauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
|
||||
|
||||
joinMoveChunk();
|
||||
|
||||
|
||||
@ -631,26 +631,21 @@ public:
|
||||
|
||||
/**
|
||||
* This method is called when a transaction transitions into prepare while it is not primary,
|
||||
* e.g. during secondary oplog application, recovering prepared transactions from the
|
||||
* oplog after restart or recovering prepared transactions from a precise checkpoint after
|
||||
* restart. The method explicitly requires a session id (i.e. does not use the session id
|
||||
* attached to the opCtx) because transaction oplog application currently applies the oplog
|
||||
* entries for each prepared transaction in multiple internal sessions acquired from the
|
||||
* e.g. during secondary oplog application or recovering prepared transactions from the
|
||||
* oplog after restart. The method explicitly requires a session id (i.e. does not use the
|
||||
* session id attached to the opCtx) because transaction oplog application currently applies the
|
||||
* oplog entries for each prepared transaction in multiple internal sessions acquired from the
|
||||
* InternalSessionPool. Currently, those internal sessions are completely unrelated to the
|
||||
* session for the transaction itself. For a non-retryable internal transaction, not using the
|
||||
* transaction session id in the codepath here can cause the opTime for the transaction to
|
||||
* show up in the chunk migration opTime buffer although the writes they correspond to are not
|
||||
* retryable and therefore are discarded anyway.
|
||||
*
|
||||
* WARNING: This should only be used by chunk migration. Statements and prepareOpTime are not
|
||||
* available when a prepared transaction is recovered from a precise checkpoint, and chunk
|
||||
* migration has special handling for this case.
|
||||
*/
|
||||
virtual void onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) = 0;
|
||||
virtual void onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) = 0;
|
||||
|
||||
/**
|
||||
* The onTransactionAbort method is called when an atomic transaction aborts, before the
|
||||
|
||||
@ -2348,6 +2348,11 @@ void OpObserverImpl::onTransactionPrepare(
|
||||
}
|
||||
}
|
||||
|
||||
void OpObserverImpl::onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) {}
|
||||
|
||||
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
|
||||
boost::optional<OplogSlot> abortOplogEntryOpTime) {
|
||||
invariant(opCtx->getTxnNumber());
|
||||
|
||||
@ -275,11 +275,10 @@ public:
|
||||
const std::vector<OplogSlot>& reservedSlots,
|
||||
const TransactionOperations& transactionOperations) final {}
|
||||
|
||||
void onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) final {};
|
||||
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) final;
|
||||
|
||||
void onTransactionAbort(OperationContext* opCtx,
|
||||
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
|
||||
|
||||
@ -262,11 +262,10 @@ public:
|
||||
const std::vector<OplogSlot>& reservedSlots,
|
||||
const TransactionOperations& transactionOperations) override {}
|
||||
|
||||
void onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) override {}
|
||||
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) override {}
|
||||
|
||||
void onTransactionAbort(OperationContext* opCtx,
|
||||
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
|
||||
|
||||
@ -579,15 +579,13 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) override {
|
||||
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& observer : _observers) {
|
||||
observer->onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
opCtx, lsid, statements, prepareOpTime);
|
||||
observer->onTransactionPrepareNonPrimary(opCtx, lsid, statements, prepareOpTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -723,10 +723,9 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
|
||||
txnParticipant.prepareTransaction(opCtx, prepareOp.getOpTime());
|
||||
|
||||
auto opObserver = opCtx->getServiceContext()->getOpObserver();
|
||||
auto prepareOpTime = prepareOp.getOpTime();
|
||||
invariant(opObserver);
|
||||
opObserver->onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
opCtx, *prepareOp.getSessionId(), txnOps, prepareOpTime);
|
||||
opObserver->onTransactionPrepareNonPrimary(
|
||||
opCtx, *prepareOp.getSessionId(), txnOps, prepareOp.getOpTime());
|
||||
|
||||
// Prepare transaction success.
|
||||
abortOnError.dismiss();
|
||||
@ -927,15 +926,10 @@ void _recoverPreparedTransactionFromPreciseCheckpoint(
|
||||
}
|
||||
|
||||
// Reload transaction participant state based on the transaction record.
|
||||
const auto lsid = txnRecord.getSessionId();
|
||||
txnParticipant.restorePreparedTxnFromPreciseCheckpoint(opCtx, std::move(txnRecord));
|
||||
|
||||
auto opObserver = opCtx->getServiceContext()->getOpObserver();
|
||||
invariant(opObserver);
|
||||
// Statements and prepareOpTime are not recoverable from a precise checkpoint, so we don't
|
||||
// pass them to the op-observer.
|
||||
opObserver->onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
opCtx, lsid, boost::none /* statements */, boost::none /* prepareOpTime */);
|
||||
// TODO SERVER-113740: Trigger the op observers for chunk migrations once they support
|
||||
// not having the full operations list, e.g. onTransactionPrepareNonPrimary()
|
||||
|
||||
// Stash the transaction so it yields its resources and can be resumed by future user
|
||||
// operations.
|
||||
|
||||
@ -330,6 +330,20 @@ Status MigrationChunkClonerSource::startClone(OperationContext* opCtx,
|
||||
_sessionCatalogSource->fetchNextOplog(opCtx);
|
||||
|
||||
{
|
||||
// Ignore prepare conflicts when we load ids of currently available documents. This is
|
||||
// acceptable because we will track changes made by prepared transactions at transaction
|
||||
// commit time.
|
||||
auto originalPrepareConflictBehavior =
|
||||
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior();
|
||||
|
||||
ON_BLOCK_EXIT([&] {
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setPrepareConflictBehavior(
|
||||
originalPrepareConflictBehavior);
|
||||
});
|
||||
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setPrepareConflictBehavior(
|
||||
PrepareConflictBehavior::kIgnoreConflicts);
|
||||
|
||||
auto storeCurrentRecordIdStatus = _storeCurrentRecordId(opCtx);
|
||||
if (storeCurrentRecordIdStatus == ErrorCodes::ChunkTooBig && _forceJumbo) {
|
||||
stdx::lock_guard<stdx::mutex> sl(_mutex);
|
||||
@ -648,17 +662,6 @@ void MigrationChunkClonerSource::_nextCloneBatchFromIndexScan(
|
||||
ElapsedTracker tracker(&opCtx->fastClockSource(),
|
||||
internalQueryExecYieldIterations.load(),
|
||||
Milliseconds(internalQueryExecYieldPeriodMS.load()));
|
||||
// Prepare conflicts should never be ignored when performing the shard key index scan to get
|
||||
// Record IDs of documents to transfer as part of cloning. This is because there can be prepared
|
||||
// transactions recovered from a precise checkpoint as part of startup recovery that can not set
|
||||
// up an onCommit handler to send transaction operations to chunk migration's internal buffers.
|
||||
// As a result, we need to wait on those recovered prepared transactions to complete and be
|
||||
// transferred as part of cloning, otherwise we can lose data.
|
||||
tassert(11374001,
|
||||
"Expected to not ignore prepare conflicts when performing index scan during cloning",
|
||||
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior() ==
|
||||
PrepareConflictBehavior::kEnforce);
|
||||
|
||||
boost::optional<HandleTransactionResourcesFromStasher> scopedResourceHandler;
|
||||
auto isFirstIteration = !_jumboChunkCloneState->clonerExec;
|
||||
if (isFirstIteration) {
|
||||
@ -1027,17 +1030,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
|
||||
MigrationChunkClonerSource::_getIndexScanExecutor(OperationContext* opCtx,
|
||||
const CollectionAcquisition& acquisition,
|
||||
InternalPlanner::IndexScanOptions scanOption) {
|
||||
// Prepare conflicts should never be ignored when performing the shard key index scan to get
|
||||
// Record IDs of documents to transfer as part of cloning. This is because there can be prepared
|
||||
// transactions recovered from a precise checkpoint as part of startup recovery that can not set
|
||||
// up an onCommit handler to send transaction operations to chunk migration's internal buffers.
|
||||
// As a result, we need to wait on those recovered prepared transactions to complete and be
|
||||
// transferred as part of cloning, otherwise we can lose data.
|
||||
tassert(11374002,
|
||||
"Expected to not ignore prepare conflicts when performing index scan during cloning",
|
||||
shard_role_details::getRecoveryUnit(opCtx)->getPrepareConflictBehavior() ==
|
||||
PrepareConflictBehavior::kEnforce);
|
||||
|
||||
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
|
||||
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
|
||||
const auto shardKeyIdx = findShardKeyPrefixedIndex(opCtx,
|
||||
|
||||
@ -314,25 +314,14 @@ void MigrationChunkClonerSourceOpObserver::postTransactionPrepare(
|
||||
*opCtx->getLogicalSessionId(), statements, prepareOpTime));
|
||||
}
|
||||
|
||||
void MigrationChunkClonerSourceOpObserver::onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
void MigrationChunkClonerSourceOpObserver::onTransactionPrepareNonPrimary(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) {
|
||||
if (!statements || !prepareOpTime) {
|
||||
// Statements or prepareOpTime not being available means that the prepared transaction was
|
||||
// recovered from a precise checkpoint. In this case, we intentionally do not (and cannot)
|
||||
// install a LogTransactionOperationsForShardingHandler here. During chunk migration, the
|
||||
// donor’s cloning phase enforces prepare conflicts, so the shard key index scan will wait
|
||||
// until this recovered prepared transaction is committed/aborted and its effects are
|
||||
// visible. As a result, we do not need an onCommit hook to populate the donation’s internal
|
||||
// buffers after cloning starts.
|
||||
return;
|
||||
}
|
||||
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) {
|
||||
shard_role_details::getRecoveryUnit(opCtx)->registerChange(
|
||||
std::make_unique<LogTransactionOperationsForShardingHandler>(
|
||||
lsid, *statements, *prepareOpTime));
|
||||
lsid, statements, prepareOpTime));
|
||||
}
|
||||
|
||||
void MigrationChunkClonerSourceOpObserver::onBatchedWriteCommit(
|
||||
|
||||
@ -116,11 +116,10 @@ public:
|
||||
const std::vector<OplogSlot>& reservedSlots,
|
||||
const TransactionOperations& transactionOperations) final;
|
||||
|
||||
void onTransactionPrepareNonPrimaryForChunkMigration(
|
||||
OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
boost::optional<const std::vector<repl::OplogEntry>&> statements,
|
||||
boost::optional<const repl::OpTime&> prepareOpTime) final;
|
||||
void onTransactionPrepareNonPrimary(OperationContext* opCtx,
|
||||
const LogicalSessionId& lsid,
|
||||
const std::vector<repl::OplogEntry>& statements,
|
||||
const repl::OpTime& prepareOpTime) final;
|
||||
|
||||
void onBatchedWriteCommit(OperationContext* opCtx,
|
||||
WriteUnitOfWork::OplogEntryGroupType oplogGroupingFormat,
|
||||
|
||||
@ -2107,6 +2107,10 @@ void TransactionParticipant::Participant::restorePreparedTxnFromPreciseCheckpoin
|
||||
invariant(p().needToWriteAbortEntry);
|
||||
invariant(p().autoCommit == boost::optional<bool>(false));
|
||||
|
||||
// TODO SERVER-113740: These will be unset and we need a way to ensure callers can handle that
|
||||
// and won't introduce new dependencies on them.
|
||||
// p().transactionOperations
|
||||
|
||||
{
|
||||
stdx::lock_guard<Client> lg(*opCtx->getClient());
|
||||
o(lg).txnState.transitionTo(TransactionState::kPrepared);
|
||||
@ -2131,8 +2135,6 @@ void TransactionParticipant::Participant::restorePreparedTxnFromPreciseCheckpoin
|
||||
p().activeTxnCommittedStatements = {};
|
||||
o(lg).hasIncompleteHistory = true;
|
||||
|
||||
p().recoveredFromPreciseCheckpoint = true;
|
||||
|
||||
// This should be called after checking out the session without refresh, which already sets
|
||||
// isValid.
|
||||
invariant(o().isValid);
|
||||
@ -2184,9 +2186,6 @@ TransactionOperations* TransactionParticipant::Participant::retrieveCompletedTra
|
||||
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
|
||||
str::stream() << "Current state: " << o().txnState);
|
||||
|
||||
// Prepared transactions recovered from a precise checkpoint do not have transactionOperations.
|
||||
invariant(!p().recoveredFromPreciseCheckpoint);
|
||||
|
||||
return &(p().transactionOperations);
|
||||
}
|
||||
|
||||
|
||||
@ -1365,8 +1365,6 @@ private:
|
||||
// transaction.
|
||||
bool needToWriteAbortEntry{false};
|
||||
|
||||
// Set to true if the transaction was recovered from a precise checkpoint.
|
||||
bool recoveredFromPreciseCheckpoint{false};
|
||||
} _p;
|
||||
}; // class TransactionParticipant
|
||||
|
||||
|
||||
@ -7977,74 +7977,11 @@ TEST_F(TxnParticipantTest, CanAddPreciseCheckpointFieldsForRecoveryNoNamespaces)
|
||||
ASSERT_EQ(namespaces.size(), 0);
|
||||
}
|
||||
|
||||
class TxnParticipantStartupRecoveryTest : public TxnParticipantTest {
|
||||
protected:
|
||||
void setUp() {
|
||||
TxnParticipantTest::setUp();
|
||||
|
||||
//
|
||||
// Generate a config.transactions entry for a prepared transaction and snapshot its initial
|
||||
// TransactionParticipant state.
|
||||
//
|
||||
runFunctionFromDifferentOpCtx([this, sessionId = _sessionId, txnNumber = _txnNumber](
|
||||
OperationContext* opCtx) {
|
||||
const std::vector<NamespaceString> kNamespaces = {
|
||||
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl1"),
|
||||
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl2"),
|
||||
NamespaceString::createNamespaceString_forTest("TestDB2", "TestColl1")};
|
||||
|
||||
std::vector<UUID> uuids;
|
||||
for (const auto& nss : kNamespaces) {
|
||||
AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X);
|
||||
auto db = autoDb.ensureDbExists(opCtx);
|
||||
ASSERT_TRUE(db);
|
||||
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
CollectionOptions options;
|
||||
auto collection = db->createCollection(opCtx, nss, options);
|
||||
wuow.commit();
|
||||
uuids.push_back(collection->uuid());
|
||||
}
|
||||
|
||||
opCtx->setLogicalSessionId(sessionId);
|
||||
opCtx->setTxnNumber(txnNumber);
|
||||
opCtx->setInMultiDocumentTransaction();
|
||||
|
||||
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
|
||||
auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx);
|
||||
auto txnParticipant = TransactionParticipant::get(opCtx);
|
||||
txnParticipant.beginOrContinue(opCtx,
|
||||
{*opCtx->getTxnNumber()},
|
||||
false /* autocommit */,
|
||||
TransactionParticipant::TransactionActions::kStart);
|
||||
|
||||
txnParticipant.unstashTransactionResources(opCtx, "insert");
|
||||
for (size_t collIndex = 0; collIndex < kNamespaces.size(); ++collIndex) {
|
||||
auto operation = repl::DurableOplogEntry::makeInsertOperation(
|
||||
kNamespaces[collIndex], uuids[collIndex], BSON("_id" << 0), BSON("_id" << 0));
|
||||
txnParticipant.addTransactionOperation(opCtx, operation);
|
||||
}
|
||||
auto [timestamp, namespaces] = txnParticipant.prepareTransaction(opCtx, {});
|
||||
|
||||
SessionTxnRecord txnRecord;
|
||||
txnRecord.setState(DurableTxnStateEnum::kPrepared);
|
||||
txnRecord.setSessionId(*opCtx->getLogicalSessionId());
|
||||
txnRecord.setTxnNum(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber());
|
||||
txnRecord.setLastWriteOpTime(txnParticipant.getLastWriteOpTime());
|
||||
txnRecord.setLastWriteDate(Date_t::now());
|
||||
txnParticipant.addPreparedTransactionPreciseCheckpointRecoveryFields(txnRecord);
|
||||
this->validatedTxnRecord =
|
||||
std::make_unique<SessionTxnRecordForPrepareRecovery>(std::move(txnRecord));
|
||||
|
||||
this->expectedState.txnNumberAndRetryCounter =
|
||||
txnParticipant.getActiveTxnNumberAndRetryCounter();
|
||||
this->expectedState.lastWriteOpTime = txnParticipant.getLastWriteOpTime();
|
||||
this->expectedState.prepareOpTime = txnParticipant.getPrepareOpTime();
|
||||
});
|
||||
|
||||
// Reset the transaction participant catalog to simulate a process restart.
|
||||
SessionCatalog::get(opCtx()->getServiceContext())->reset_forTest();
|
||||
}
|
||||
TEST_F(TxnParticipantTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
|
||||
//
|
||||
// Generate a config.transactions entry for a prepared transaction and snapshot its initial
|
||||
// TransactionParticipant state.
|
||||
//
|
||||
|
||||
std::unique_ptr<SessionTxnRecordForPrepareRecovery> validatedTxnRecord;
|
||||
struct TransactionParticipantStateSnapshot {
|
||||
@ -8052,9 +7989,67 @@ protected:
|
||||
repl::OpTime lastWriteOpTime;
|
||||
repl::OpTime prepareOpTime;
|
||||
} expectedState;
|
||||
};
|
||||
|
||||
TEST_F(TxnParticipantStartupRecoveryTest, CanRecoverPreparedTxnFromSessionTxnRecord) {
|
||||
runFunctionFromDifferentOpCtx([&validatedTxnRecord,
|
||||
&expectedState,
|
||||
sessionId = _sessionId,
|
||||
txnNumber = _txnNumber](OperationContext* opCtx) {
|
||||
const std::vector<NamespaceString> kNamespaces = {
|
||||
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl1"),
|
||||
NamespaceString::createNamespaceString_forTest("TestDB1", "TestColl2"),
|
||||
NamespaceString::createNamespaceString_forTest("TestDB2", "TestColl1")};
|
||||
|
||||
std::vector<UUID> uuids;
|
||||
for (const auto& nss : kNamespaces) {
|
||||
AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X);
|
||||
auto db = autoDb.ensureDbExists(opCtx);
|
||||
ASSERT_TRUE(db);
|
||||
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
CollectionOptions options;
|
||||
auto collection = db->createCollection(opCtx, nss, options);
|
||||
wuow.commit();
|
||||
uuids.push_back(collection->uuid());
|
||||
}
|
||||
|
||||
opCtx->setLogicalSessionId(sessionId);
|
||||
opCtx->setTxnNumber(txnNumber);
|
||||
opCtx->setInMultiDocumentTransaction();
|
||||
|
||||
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
|
||||
auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx);
|
||||
auto txnParticipant = TransactionParticipant::get(opCtx);
|
||||
txnParticipant.beginOrContinue(opCtx,
|
||||
{*opCtx->getTxnNumber()},
|
||||
false /* autocommit */,
|
||||
TransactionParticipant::TransactionActions::kStart);
|
||||
|
||||
txnParticipant.unstashTransactionResources(opCtx, "insert");
|
||||
for (size_t collIndex = 0; collIndex < kNamespaces.size(); ++collIndex) {
|
||||
auto operation = repl::DurableOplogEntry::makeInsertOperation(
|
||||
kNamespaces[collIndex], uuids[collIndex], BSON("_id" << 0), BSON("_id" << 0));
|
||||
txnParticipant.addTransactionOperation(opCtx, operation);
|
||||
}
|
||||
auto [timestamp, namespaces] = txnParticipant.prepareTransaction(opCtx, {});
|
||||
|
||||
SessionTxnRecord txnRecord;
|
||||
txnRecord.setState(DurableTxnStateEnum::kPrepared);
|
||||
txnRecord.setSessionId(*opCtx->getLogicalSessionId());
|
||||
txnRecord.setTxnNum(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber());
|
||||
txnRecord.setLastWriteOpTime(txnParticipant.getLastWriteOpTime());
|
||||
txnRecord.setLastWriteDate(Date_t::now());
|
||||
txnParticipant.addPreparedTransactionPreciseCheckpointRecoveryFields(txnRecord);
|
||||
validatedTxnRecord =
|
||||
std::make_unique<SessionTxnRecordForPrepareRecovery>(std::move(txnRecord));
|
||||
|
||||
expectedState.txnNumberAndRetryCounter = txnParticipant.getActiveTxnNumberAndRetryCounter();
|
||||
expectedState.lastWriteOpTime = txnParticipant.getLastWriteOpTime();
|
||||
expectedState.prepareOpTime = txnParticipant.getPrepareOpTime();
|
||||
});
|
||||
|
||||
// Reset the transaction participant catalog to simulate a process restart.
|
||||
SessionCatalog::get(opCtx()->getServiceContext())->reset_forTest();
|
||||
|
||||
// Simulate running as a standby.
|
||||
repl::UnreplicatedWritesBlock uwb(opCtx());
|
||||
|
||||
@ -8062,6 +8057,7 @@ TEST_F(TxnParticipantStartupRecoveryTest, CanRecoverPreparedTxnFromSessionTxnRec
|
||||
// Run through the flow to reconstruct a prepared transaction after a restart with precise
|
||||
// checkpoints then verify its state was correctly restored.
|
||||
//
|
||||
|
||||
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx());
|
||||
auto sessionCheckout = mongoDSessionCatalog->checkOutSessionWithoutRefresh(opCtx());
|
||||
auto txnParticipant = TransactionParticipant::get(opCtx());
|
||||
@ -8105,29 +8101,5 @@ TEST_F(TxnParticipantStartupRecoveryTest, CanRecoverPreparedTxnFromSessionTxnRec
|
||||
ASSERT_FALSE(txnParticipant.getTxnResourceStashLockerForTest()->isLocked());
|
||||
}
|
||||
|
||||
using TxnParticipantStartupRecoveryDeathTest = TxnParticipantStartupRecoveryTest;
|
||||
DEATH_TEST_F(TxnParticipantStartupRecoveryDeathTest,
|
||||
CannotRetrieveTransactionOperations,
|
||||
"invariant") {
|
||||
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx());
|
||||
auto sessionCheckout = mongoDSessionCatalog->checkOutSessionWithoutRefresh(opCtx());
|
||||
auto txnParticipant = TransactionParticipant::get(opCtx());
|
||||
|
||||
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
|
||||
|
||||
for (const auto& ns : validatedTxnRecord->getAffectedNamespaces()) {
|
||||
(void)acquireCollection(opCtx(),
|
||||
CollectionAcquisitionRequest(ns,
|
||||
PlacementConcern::kPretendUnsharded,
|
||||
repl::ReadConcernArgs::get(opCtx()),
|
||||
AcquisitionPrerequisites::kWrite),
|
||||
MODE_IX);
|
||||
}
|
||||
|
||||
txnParticipant.restorePreparedTxnFromPreciseCheckpoint(opCtx(), std::move(*validatedTxnRecord));
|
||||
|
||||
txnParticipant.retrieveCompletedTransactionOperations(opCtx());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
|
||||
Loading…
Reference in New Issue
Block a user