SERVER-117320 Make chunk migration support fetching findAndModify pre/post-images from snapshot (#48025)

GitOrigin-RevId: d55eae7473536f80576412c6c4f2bb343e063589
This commit is contained in:
Cheahuychou Mao 2026-02-17 16:11:52 -05:00 committed by MongoDB Bot
parent 0ea819057c
commit 312e78c94a
10 changed files with 395 additions and 154 deletions

View File

@ -0,0 +1,111 @@
/**
* Tests that the oplog history for a retryable findAndModify can be fetched and migrated correctly
* during chunk migration critical section.
*
* 1. Pause migration before entering the critical section.
* 2. Pause the session oplog fetching by pausing the _getNextSessionMods command on the donor.
* 3. Execute a findAndModify as a retryable write.
* 4. Unpause migration. Wait for it to enter the critical section.
* 5. Resume the session oplog fetching.
* 6. Wait for migration to complete.
* 7. Retry the findAndModify and verify the response is identical and the write was not
* re-executed.
*
* @tags: [
* requires_fcv_83
* ]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const st = new ShardingTest({shards: 2});
const dbName = "testDb";
const collName = "testColl";
const ns = dbName + "." + collName;
const testDB = st.s.getDB(dbName);
const testColl = testDB.getCollection(collName);
// Create a sharded collection with shard0 as the primary shard.
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
assert.commandWorked(testColl.insert({_id: 1, x: 1, counter: 0}));
const donorPrimary = st.rs0.getPrimary();
const pauseSessionFetcherFp = configureFailPoint(donorPrimary, "pauseChunkMigrationSessionOplogFetching");
const hangBeforeCriticalSectionFp = configureFailPoint(donorPrimary, "hangBeforeEnteringCriticalSection");
jsTest.log("Starting moveChunk");
const moveChunkThread = new Thread(
(mongosHost, ns, toShard) => {
const mongos = new Mongo(mongosHost);
return mongos.adminCommand({moveChunk: ns, find: {x: 1}, to: toShard});
},
st.s.host,
ns,
st.shard1.shardName,
);
moveChunkThread.start();
// Perform an insert so there is an oplog entry for the donor to fetch and send to the recipient.
assert.commandWorked(testDB.runCommand({insert: collName, documents: [{_id: 2, x: 1, counter: 0}]}));
jsTest.log("Waiting for the session oplog fetcher on the donor to pause");
pauseSessionFetcherFp.wait();
jsTest.log("Waiting for migration to pause before entering critical section");
hangBeforeCriticalSectionFp.wait();
jsTest.log("Performing findAndModify as a retryable write");
const lsid = {id: UUID()};
const txnNumber = NumberLong(1);
const findAndModifyCmd = {
findAndModify: collName,
query: {_id: 1},
update: {$inc: {counter: 1}},
new: true,
lsid: lsid,
txnNumber: txnNumber,
};
const initialRes = assert.commandWorked(testDB.runCommand(findAndModifyCmd));
jsTest.log("Initial findAndModify response: " + tojson(initialRes));
assert.eq(initialRes.lastErrorObject.n, 1, initialRes);
assert.eq(initialRes.lastErrorObject.updatedExisting, true, initialRes);
assert.eq(initialRes.value.counter, 1, initialRes);
assert.eq(initialRes.value._id, 1, initialRes);
const cacheColl = donorPrimary.getCollection("config.cache.collections");
const collDocBefore = cacheColl.findOne({_id: ns});
const criticalSectionCounterBefore = collDocBefore ? collDocBefore.enterCriticalSectionCounter || 0 : 0;
jsTest.log("Unpause migration and wait for the critical section to start");
hangBeforeCriticalSectionFp.off();
assert.soon(() => {
const collDocAfter = cacheColl.findOne({_id: ns});
const criticalSectionCounterAfter = collDocAfter ? collDocAfter.enterCriticalSectionCounter || 0 : 0;
return criticalSectionCounterAfter > criticalSectionCounterBefore;
}, "Critical section did not start (enterCriticalSectionCounter did not increment)");
jsTest.log("Unpause the session oplog fetching");
pauseSessionFetcherFp.off();
jsTest.log("Waiting for moveChunk to complete");
moveChunkThread.join();
assert.commandWorked(moveChunkThread.returnData());
// Retry the findAndModify. Verify the response is identical and the write was not re-executed.
const retriedRes = assert.commandWorked(testDB.runCommand(findAndModifyCmd));
jsTest.log("Retried findAndModify response: " + tojson(retriedRes));
assert.docEq(retriedRes.lastErrorObject, initialRes.lastErrorObject, {retriedRes, initialRes});
assert.docEq(retriedRes.value, initialRes.value, {retriedRes, initialRes});
// Verify the document was only updated once.
const doc = testColl.findOne({_id: 1});
assert.eq(doc.counter, 1, {doc});
st.stop();

View File

@ -57,6 +57,7 @@
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
#include "mongo/util/uuid.h"
@ -85,6 +86,10 @@ repl::OplogEntry makeOplogEntry(
boost::optional<repl::OpTime> preImageOpTime = boost::none,
boost::optional<repl::OpTime> postImageOpTime = boost::none,
boost::optional<repl::RetryImageEnum> needsRetryImage = boost::none) {
// Set the mock oplog entry's wall clock time to less than the current time so that tests can
// verify later that the forged image noop oplog entry's wall clock time clock time is set to
// the current time instead of the findAndModify oplog entry's wall clock time.
auto wallClockTime = Date_t::now() - Milliseconds(1);
return {
repl::DurableOplogEntry(opTime, // optime
opType, // opType
@ -98,8 +103,8 @@ repl::OplogEntry makeOplogEntry(
o2Field, // o2
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
stmtIds, // statement ids
wallClockTime,
stmtIds, // statement ids
repl::OpTime(), // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime
@ -358,19 +363,19 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
const auto uuid = UUID::gen();
// Define a findAndModify/update oplog entry with the 'needsRetryImage' field set.
const auto oplogEntryBson = makeOplogEntry(opTime,
repl::OpTypeEnum::kUpdate,
nss,
uuid,
BSON("$set" << BSON("a" << 1)),
BSON("_id" << 1),
sessionInfo,
{stmtId},
boost::none /* preImageOpTime */,
boost::none /* postImageOpTime */,
imageType)
.getEntry()
.toBSON();
const auto oplogEntry = makeOplogEntry(opTime,
repl::OpTypeEnum::kUpdate,
nss,
uuid,
BSON("$set" << BSON("a" << 1)),
BSON("_id" << 1),
sessionInfo,
{stmtId},
boost::none /* preImageOpTime */,
boost::none /* postImageOpTime */,
imageType)
.getEntry();
const auto oplogEntryBson = oplogEntry.toBSON();
auto mock = exec::agg::MockStage::createForTest(Document(oplogEntryBson), getExpCtx());
imageLookupStage->setSource(mock.get());
@ -394,6 +399,9 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
ASSERT_EQUALS(stmtId, stmtIds.front());
ASSERT_EQUALS(ts - 1, forgedImageEntry.getTimestamp());
ASSERT_EQUALS(1, forgedImageEntry.getTerm().value());
// Check that the wall clock time is set to the current time which should be larger than the
// the findAndModify oplog entry's wall clock time.
ASSERT_LT(oplogEntry.getWallClockTime(), forgedImageEntry.getWallClockTime());
// The next doc should be the doc for the original findAndModify oplog entry with the
// 'needsRetryImage' field removed and 'preImageOpTime'/'postImageOpTime' field appended.
@ -608,16 +616,8 @@ TEST_F(FindAndModifyImageLookupTest,
BSONObjBuilder applyOpsWithoutNeedsRetryImageBuilder;
applyOpsWithoutNeedsRetryImageBuilder.append(
"applyOps", BSON_ARRAY(insertOp.toBSON() << updateOpWithoutNeedsRetryImage.toBSON()));
auto expectedOplogEntryBson = makeOplogEntry(applyOpsOpTime,
repl::OpTypeEnum::kCommand,
{},
oplogEntryUUID,
applyOpsWithoutNeedsRetryImageBuilder.obj(),
sessionInfo,
{})
.getEntry()
.toBSON()
.addFields(BSON(commitTxnTsFieldName << commitTxnTs));
auto expectedOplogEntryBson = oplogEntryBson.addFields(BSON(
repl::OplogEntry::kObjectFieldName << applyOpsWithoutNeedsRetryImageBuilder.obj()));
ASSERT_BSONOBJ_EQ(expectedOplogEntryBson, downConvertedOplogEntryBson);

View File

@ -35,6 +35,7 @@
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/rss/replicated_storage_service.h"
#include "mongo/util/time_support.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@ -154,15 +155,31 @@ boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(OperationContext* opC
forgedNoop.setTxnNumber(*oplogEntry.getTxnNumber());
forgedNoop.setObject(*image);
forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
forgedNoop.setWallClockTime(oplogEntry.getWallClockTime());
// The wall clock time for migrated oplog entries may not get overwritten on the recipient, and
// currently replication lag is calculated based on the oplog wall clock time. For this reason,
// set the forged noop oplog entry's wall clock time to the current time instead of the
// findAndModify oplog entry's wall clock time.
forgedNoop.setWallClockTime(Date_t::now());
forgedNoop.setNss(oplogEntry.getNss());
forgedNoop.setUuid(oplogEntry.getUuid());
forgedNoop.setStatementIds(oplogEntry.getStatementIds());
// Set the opTime to be the findAndModify timestamp - 1. We guarantee that there will be no
// collisions because we always reserve an extra oplog slot when writing the retryable
// findAndModify entry on the primary.
// The op time for migrated oplog entries do get overwritten on the recipient anyway when they
// get written to the oplog. However, we set the forged noop oplog entry's op time to the
// findAndModify oplog entry's op time - 1 for the following reasons.
// 1. The oplog fetching in resharding uses the oplog timestamp as the _id for the documents in
// the oplog buffer collection and as the resume id. So the op time for each forged noop oplog
// entry must be valid and unique. Therefore:
// - When writing a retryable findAndModify oplog entry on the primary, we reserve an
// additional oplog slot right before it.
// - When forging pre/post-image noop oplog entry, we set its timestamp to that of the
// findAndModify's oplog entry minus 1.
// 2. The oplog fetching in chunk migration does not have such requirements since there is no
// resumability. However, for consistency we still set the timestamp the same way.
forgedNoop.setOpTime(repl::OpTime(oplogEntry.getTimestamp() - 1, *oplogEntry.getTerm()));
return repl::OplogEntry{forgedNoop.toBSON()};
}

View File

@ -59,8 +59,9 @@ MONGO_MOD_PUBLIC boost::optional<BSONObj> fetchPreOrPostImageFromSnapshot(
* collection or by performing a snapshot read against the collection the findAndModify wrote to.
* Returns a forged noop oplog entry containing the image. Returns none if no image is found.
*/
boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(OperationContext* opCtx,
const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc);
MONGO_MOD_PUBLIC boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(
OperationContext* opCtx,
const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc);
} // namespace mongo

View File

@ -191,16 +191,17 @@ LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingH
_stmts(stmts),
_prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {}
void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx,
boost::optional<Timestamp>) noexcept {
void LogTransactionOperationsForShardingHandler::commit(
OperationContext* opCtx, boost::optional<Timestamp> commitTimestamp) noexcept {
std::set<NamespaceString> namespacesTouchedByTransaction;
// Inform the session migration subsystem that a transaction has committed for the given
// namespace.
auto addToSessionMigrationOptimeQueueIfNeeded =
[&namespacesTouchedByTransaction, lsid = _lsid](MigrationChunkClonerSource* const cloner,
const NamespaceString& nss,
const repl::OpTime opTime) {
[&namespacesTouchedByTransaction, lsid = _lsid, commitTimestamp](
MigrationChunkClonerSource* const cloner,
const NamespaceString& nss,
const repl::OpTime opTime) {
if (isInternalSessionForNonRetryableWrite(lsid)) {
// Transactions inside internal sessions for non-retryable writes are not
// retryable so there is no need to transfer the write history to the
@ -208,8 +209,10 @@ void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx,
return;
}
if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
cloner->_addToSessionMigrationOptimeQueue(
opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
cloner->_addToSessionMigrationQueue(
{opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
commitTimestamp});
namespacesTouchedByTransaction.emplace(nss);
}
@ -595,11 +598,10 @@ void MigrationChunkClonerSource::onDeleteOp(OperationContext* opCtx,
_decrementOutstandingOperationTrackRequests();
}
void MigrationChunkClonerSource::_addToSessionMigrationOptimeQueue(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) {
if (!opTime.isNull()) {
_sessionCatalogSource->notifyNewWriteOpTime(opTime, entryAtOpTimeType);
void MigrationChunkClonerSource::_addToSessionMigrationQueue(
SessionCatalogMigrationSource::OpTimeBundle opTimeBundle) {
if (!opTimeBundle.opTime.isNull()) {
_sessionCatalogSource->notifyNewWriteOpTime(opTimeBundle);
}
}
@ -626,8 +628,8 @@ void MigrationChunkClonerSource::_addToTransferModsQueue(const BSONObj& idObj,
MONGO_UNREACHABLE;
}
_addToSessionMigrationOptimeQueue(
opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
_addToSessionMigrationQueue(
{opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
}
bool MigrationChunkClonerSource::_addedOperationToOutstandingOperationTrackRequests() {
@ -1624,8 +1626,8 @@ void LogRetryableApplyOpsForShardingHandler::commit(OperationContext* opCtx,
auto cloner = MigrationSourceManager::getCurrentCloner(*scopedCss);
if (cloner) {
for (const auto& opTime : _opTimes) {
cloner->_addToSessionMigrationOptimeQueue(
opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
cloner->_addToSessionMigrationQueue(
{opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
}
}
}

View File

@ -623,13 +623,7 @@ private:
* Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
* part of session migration.
*/
void _addToSessionMigrationOptimeQueue(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);
void _addToSessionMigrationOptimeQueueForTransactionCommit(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);
void _addToSessionMigrationQueue(SessionCatalogMigrationSource::OpTimeBundle opTimeBundle);
/*
* Appends the relevant document changes to the appropriate internal data structures (known

View File

@ -58,6 +58,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/str.h"
#include <memory>
@ -77,6 +78,8 @@
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(pauseChunkMigrationSessionOplogFetching);
/**
* Shortcut class to perform the appropriate checks and acquire the cloner associated with the
* currently active migration. Uses the currently registered migration for this shard and ensures
@ -337,6 +340,8 @@ public:
OperationContext* opCtx,
const MigrationSessionId& migrationSessionId,
BSONArrayBuilder* arrBuilder) {
pauseChunkMigrationSessionOplogFetching.pauseWhileSet(opCtx);
boost::optional<repl::OpTime> opTime;
std::shared_ptr<Notification<bool>> newOplogNotification;

View File

@ -44,6 +44,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/query/find_command.h"
#include "mongo/db/query/write_ops/find_and_modify_image_lookup_util.h"
#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
@ -53,6 +54,8 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/session_catalog_migration.h"
#include "mongo/db/scoped_read_concern.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/session/logical_session_id_gen.h"
@ -73,6 +76,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/decorable.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/namespace_string_util.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
@ -90,58 +94,58 @@
#include <boost/optional.hpp>
#include <boost/optional/optional.hpp>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(failChunkMigrationFindAndModifyImageLookupFindOneWithSnapshotTooOld);
struct LastTxnSession {
LogicalSessionId sessionId;
TxnNumber txnNumber;
};
boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection(
OperationContext* opCtx, const repl::OplogEntry retryableFindAndModifyOplogEntry) {
invariant(retryableFindAndModifyOplogEntry.getNeedsRetryImage());
DBDirectClient client(opCtx);
BSONObj imageObj =
client.findOne(NamespaceString::kConfigImagesNamespace,
BSON("_id" << retryableFindAndModifyOplogEntry.getSessionId()->toBSON()));
if (imageObj.isEmpty()) {
return boost::none;
}
auto image = repl::ImageEntry::parse(imageObj, IDLParserContext("image entry"));
if (image.getTxnNumber() != retryableFindAndModifyOplogEntry.getTxnNumber()) {
// In our snapshot, fetch the current transaction number for a session. If that transaction
// number doesn't match what's found on the image lookup, it implies that the image is not
// the correct version for this oplog entry. We will not forge a noop from it.
return boost::none;
}
repl::MutableOplogEntry forgedNoop;
forgedNoop.setSessionId(image.get_id());
forgedNoop.setTxnNumber(image.getTxnNumber());
forgedNoop.setObject(image.getImage());
forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
// The wallclock and namespace are not directly available on the txn document when
// forging the noop image document.
forgedNoop.setWallClockTime(Date_t::now());
forgedNoop.setNss(retryableFindAndModifyOplogEntry.getNss());
forgedNoop.setUuid(retryableFindAndModifyOplogEntry.getUuid());
// The OpTime is probably the last write time, but the destination will overwrite this
// anyways. Just set an OpTime to satisfy the IDL constraints for calling `toBSON`.
repl::OpTimeBase opTimeBase(Timestamp::min());
opTimeBase.setTerm(-1);
forgedNoop.setOpTimeBase(opTimeBase);
forgedNoop.setStatementIds(retryableFindAndModifyOplogEntry.getStatementIds());
forgedNoop.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp::min(), -1));
return repl::OplogEntry::parse(forgedNoop.toBSON()).getValue();
}
boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
repl::OplogEntry* oplog) {
if (oplog->getNeedsRetryImage()) {
auto ret = forgeNoopEntryFromImageCollection(opCtx, *oplog);
auto findOneLocallyFunc = [&](const NamespaceString& nss,
const BSONObj& filter,
const boost::optional<repl::ReadConcernArgs>& readConcern)
-> boost::optional<BSONObj> {
if (MONGO_unlikely(
failChunkMigrationFindAndModifyImageLookupFindOneWithSnapshotTooOld.shouldFail(
[&](const BSONObj& data) {
return data.getStringField("nss") == nss.toString_forTest();
}))) {
tassert(11732001,
"Expected the findOne to have readConcern 'snapshot'",
readConcern->getLevel() ==
repl::ReadConcernLevelEnum::kSnapshotReadConcern);
uasserted(ErrorCodes::SnapshotTooOld,
"Failing findOne during findAndModify image lookup");
}
boost::optional<ScopedReadConcern> scopedReadConcern;
if (readConcern) {
scopedReadConcern.emplace(opCtx, *readConcern);
}
FindCommandRequest findRequest(nss);
findRequest.setFilter(filter);
if (gFeatureFlagAllBinariesSupportRawDataOperations
.isEnabledUseLatestFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
// This must be set for the request to work against a timeseries collection.
findRequest.setRawData(true);
}
DBDirectClient client(opCtx);
auto cursor = client.find(findRequest);
return cursor->more() ? boost::make_optional(cursor->next()) : boost::none;
};
auto ret = forgeNoopImageOplogEntry(opCtx, *oplog, findOneLocallyFunc);
if (ret == boost::none) {
// No pre/post image was found. Defensively strip the `needsRetryImage` value to remove
// any notion this operation was a retryable findAndModify. If the request is retried on
@ -573,6 +577,9 @@ void SessionCatalogMigrationSource::_extractOplogEntriesForRetryableApplyOps(
continue;
}
if (auto commitTimestamp = applyOpsOplogEntry.getCommitTransactionTimestamp()) {
unrolledOplogEntry.setCommitTransactionTimestamp(*commitTimestamp);
}
oplogBuffer->emplace_back(unrolledOplogEntry);
skippedEntryTracker.dismiss();
@ -715,9 +722,7 @@ void SessionCatalogMigrationSource::_tryFetchNextNewWriteOplog(stdx::unique_lock
// The oplog buffer is empty. Peek the next opTime and fetch its oplog entry while not
// holding the mutex. We cannot dequeue the opTime upfront since the the read can fail
// with a WriteConflictException error.
repl::OpTime opTimeToFetch;
EntryAtOpTimeType entryAtOpTimeType;
std::tie(opTimeToFetch, entryAtOpTimeType) = _newWriteOpTimeList.front();
auto [opTimeToFetch, entryAtOpTimeType, commitTimestamp] = _newWriteOpTimeList.front();
lk.unlock();
DBDirectClient client(opCtx);
@ -747,7 +752,8 @@ void SessionCatalogMigrationSource::_tryFetchNextNewWriteOplog(stdx::unique_lock
prevOpTime && !prevOpTime->isNull()) {
// Add the opTime for the previous applyOps oplog entry in the transaction
// to the queue.
_notifyNewWriteOpTime(lk, *prevOpTime, EntryAtOpTimeType::kTransaction);
_notifyNewWriteOpTime(
lk, {*prevOpTime, EntryAtOpTimeType::kTransaction, commitTimestamp});
}
}
};
@ -780,6 +786,11 @@ void SessionCatalogMigrationSource::_tryFetchNextNewWriteOplog(stdx::unique_lock
invariant(nextNewWriteOplog.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
const auto sessionId = *nextNewWriteOplog.getSessionId();
uassert(11732000,
"Expected a transaction oplog entry to have a commit timestamp",
commitTimestamp);
nextNewWriteOplog.setCommitTransactionTimestamp(*commitTimestamp);
if (isInternalSessionForNonRetryableWrite(sessionId)) {
dassert(0,
str::stream() << "Cannot add op time for a non-retryable "
@ -852,16 +863,13 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
return true;
}
void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime,
EntryAtOpTimeType entryAtOpTimeType) {
void SessionCatalogMigrationSource::notifyNewWriteOpTime(OpTimeBundle opTimeBundle) {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
_notifyNewWriteOpTime(lk, opTime, entryAtOpTimeType);
_notifyNewWriteOpTime(lk, opTimeBundle);
}
void SessionCatalogMigrationSource::_notifyNewWriteOpTime(WithLock,
repl::OpTime opTime,
EntryAtOpTimeType entryAtOpTimeType) {
_newWriteOpTimeList.emplace_back(opTime, entryAtOpTimeType);
void SessionCatalogMigrationSource::_notifyNewWriteOpTime(WithLock, OpTimeBundle opTimeBundle) {
_newWriteOpTimeList.emplace_back(opTimeBundle);
if (_newOplogNotification) {
_newOplogNotification->set(false);
@ -885,8 +893,11 @@ SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
return _record.getState() ? EntryType::kNonRetryableTransaction
: EntryType::kRetryableWrite;
}()) {
_writeHistoryIterator =
std::make_unique<TransactionHistoryIterator>(_record.getLastWriteOpTime());
_writeHistoryIterator = std::make_unique<TransactionHistoryIterator>(
_record.getLastWriteOpTime(),
true /* permitYield */,
_record.getState() ? TransactionHistoryIterator::IncludeCommitTimestamp::kYes
: TransactionHistoryIterator::IncludeCommitTimestamp::kNo);
}
boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::SessionOplogIterator::getNext(

View File

@ -155,10 +155,16 @@ public:
*/
OplogResult getLastFetchedOplog();
struct OpTimeBundle {
repl::OpTime opTime;
EntryAtOpTimeType entryAtOpTimeType;
boost::optional<Timestamp> commitTimestamp = boost::none;
};
/**
* Remembers the oplog timestamp of a new write that just occurred.
*/
void notifyNewWriteOpTime(repl::OpTime opTimestamp, EntryAtOpTimeType entryAtOpTimeType);
void notifyNewWriteOpTime(OpTimeBundle opTimeBundle);
/**
* Returns the rollback ID recorded at the beginning of session migration.
@ -304,9 +310,7 @@ private:
/**
* Same as notifyNewWriteOpTime but must be called while holding the _newOplogMutex.
*/
void _notifyNewWriteOpTime(WithLock,
repl::OpTime opTimestamp,
EntryAtOpTimeType entryAtOpTimeType);
void _notifyNewWriteOpTime(WithLock, OpTimeBundle opTimeBundle);
/*
* Derives retryable write oplog entries from the given retryable internal transaction applyOps
@ -358,7 +362,7 @@ private:
uint64_t _averageSessionDocSize{0};
// Stores oplog opTime of new writes that are coming in.
std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList;
std::list<OpTimeBundle> _newWriteOpTimeList;
// Used to store the last fetched and processed oplog entry from _newWriteOpTimeList. This
// enables calling get() multiple times.

View File

@ -58,6 +58,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
#include "mongo/util/uuid.h"
@ -90,10 +91,12 @@ const ChunkRange kNestedChunkRange(BSON("x.y" << 0), BSON("x.y" << 100));
const LogicalSessionId kMigrationLsid;
enum class TransactionType { kUnprepared, kPrepared };
enum class PrePostImageLocation { kSideCollection, kOplog, kSnapshot };
struct PrePostImageTestCase {
repl::RetryImageEnum imageType;
boost::optional<TransactionType> txnType;
boost::optional<PrePostImageLocation> imageLocation;
BSONObj toBSON() const {
BSONObjBuilder b;
@ -101,11 +104,26 @@ struct PrePostImageTestCase {
if (txnType) {
b.append("txnType", *txnType == TransactionType::kPrepared ? "prepared" : "unprepared");
}
if (imageLocation) {
b.append("imageLocation", *imageLocation);
}
return b.obj();
}
};
class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {};
class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {
public:
void setUpFeatureFlags(const PrePostImageTestCase& testCase) {
_disallowImageCollectionFeatureFlag.reset();
if (testCase.imageLocation != PrePostImageLocation::kSideCollection) {
_disallowImageCollectionFeatureFlag.emplace(
"featureFlagDisallowFindAndModifyImageCollection", true);
}
}
private:
boost::optional<RAIIServerParameterControllerForTest> _disallowImageCollectionFeatureFlag;
};
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
@ -777,13 +795,17 @@ TEST_F(SessionCatalogMigrationSourceTest, ForgeImageEntriesWhenFetchingEntriesWi
DBDirectClient client(opCtx());
client.insert(NamespaceString::kConfigImagesNamespace, imageEntry.toBSON());
// Set the findAndModify oplog entry's wall clock time to less than the current time so that
// we can verify later that the forged image noop oplog entry's wall clock time is set to
// the current time instead of the findAndModify oplog entry's wall clock time.
auto entryWallClockTime = Date_t::now() - Milliseconds(1);
// Insert an oplog entry with a non-null needsRetryImage field.
auto entry = makeOplogEntry(
repl::OpTime(Timestamp(52, 346), 2), // optime
repl::OpTypeEnum::kDelete, // op type
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time,
entryWallClockTime, // wall clock time,
sessionId,
txnNumber,
{1}, // statement id
@ -818,6 +840,9 @@ TEST_F(SessionCatalogMigrationSourceTest, ForgeImageEntriesWhenFetchingEntriesWi
for (size_t i = 0; i < entry.getStatementIds().size(); i++) {
ASSERT_EQ(entry.getStatementIds()[i], nextOplogResult.oplog->getStatementIds()[i]);
}
// Check that the wall clock time is set to the current time which should be larger than the
// the findAndModify oplog entry's wall clock time.
ASSERT_LT(entry.getWallClockTime(), nextOplogResult.oplog->getWallClockTime());
// The next oplog entry should be the original entry that generated the image entry.
ASSERT_TRUE(migrationSource.hasMoreOplog());
@ -955,9 +980,9 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
migrationSource.notifyNewWriteOpTime(
entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
@ -995,8 +1020,8 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
repl::OpTime(Timestamp(100, 3), 1),
SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{repl::OpTime(Timestamp(100, 3), 1),
SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException);
}
@ -1022,7 +1047,9 @@ TEST_F(SessionCatalogMigrationSourceTest,
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry.getOpTime().getTimestamp()});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
@ -1064,7 +1091,9 @@ DEATH_TEST_F(SessionCatalogMigrationSourceTestDeathTest,
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry.getOpTime().getTimestamp()});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_EQ(migrationSource.getSessionOplogEntriesToBeMigratedSoFar(), 0);
@ -1150,7 +1179,9 @@ TEST_F(SessionCatalogMigrationSourceTest,
numOps += 2;
migrationSource.notifyNewWriteOpTime(
entry4.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry4.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry4.getOpTime().getTimestamp()});
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@ -1265,7 +1296,9 @@ TEST_F(SessionCatalogMigrationSourceTest,
numOps += 2;
migrationSource.notifyNewWriteOpTime(
entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry3.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry3.getOpTime().getTimestamp()});
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@ -1386,7 +1419,9 @@ TEST_F(SessionCatalogMigrationSourceTest,
numOps += 2;
migrationSource.notifyNewWriteOpTime(
entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry3.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry3.getOpTime().getTimestamp()});
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@ -1445,7 +1480,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
@ -1472,7 +1507,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
@ -1498,7 +1533,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
@ -1760,11 +1795,10 @@ TEST_F(SessionCatalogMigrationSourceTest,
repl::OpTime lastWriteOpTime;
if (isPrepared) {
auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1);
auto entry4 = makeCommandOplogEntry(commitOpTime,
entry2.getOpTime(),
BSON("commitTransaction" << 1),
sessionId,
txnNumber);
CommitTransactionOplogObject commandObj;
commandObj.setCommitTimestamp(commitOpTime.getTimestamp());
auto entry4 = makeCommandOplogEntry(
commitOpTime, entry2.getOpTime(), commandObj.toBSON(), sessionId, txnNumber);
insertOplogEntry(entry4);
lastWriteOpTime = commitOpTime;
} else {
@ -1886,11 +1920,10 @@ TEST_F(SessionCatalogMigrationSourceTest,
repl::OpTime lastWriteOpTime;
if (isPrepared) {
auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 5), 1);
auto entry3 = makeCommandOplogEntry(commitOpTime,
entry2.getOpTime(),
BSON("commitTransaction" << 1),
sessionId,
txnNumber);
CommitTransactionOplogObject commandObj;
commandObj.setCommitTimestamp(commitOpTime.getTimestamp());
auto entry3 = makeCommandOplogEntry(
commitOpTime, entry2.getOpTime(), commandObj.toBSON(), sessionId, txnNumber);
insertOplogEntry(entry3);
lastWriteOpTime = commitOpTime;
} else {
@ -2133,11 +2166,10 @@ TEST_F(
repl::OpTime lastWriteOpTime;
if (testCase.txnType == TransactionType::kPrepared) {
auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1);
auto entry3 = makeCommandOplogEntry(commitOpTime,
entry2.getOpTime(),
BSON("commitTransaction" << 1),
sessionId,
txnNumber);
CommitTransactionOplogObject commandObj;
commandObj.setCommitTimestamp(commitOpTime.getTimestamp());
auto entry3 = makeCommandOplogEntry(
commitOpTime, entry2.getOpTime(), commandObj.toBSON(), sessionId, txnNumber);
insertOplogEntry(entry3);
lastWriteOpTime = commitOpTime;
} else {
@ -2200,7 +2232,10 @@ TEST_F(
TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForRetryableWrite) {
std::vector<PrePostImageTestCase> testCases;
for (auto imageType : {repl::RetryImageEnum::kPreImage, repl::RetryImageEnum::kPostImage}) {
testCases.emplace_back(imageType);
for (auto imageLocation :
{PrePostImageLocation::kSideCollection, PrePostImageLocation::kSnapshot}) {
testCases.push_back({.imageType = imageType, .imageLocation = imageLocation});
}
}
for (const auto& testCase : testCases) {
@ -2208,6 +2243,18 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForRetryableWrite)
"Running case",
"test"_attr = unittest::getTestName(),
"testCase"_attr = testCase.toBSON());
setUpFeatureFlags(testCase);
boost::optional<FailPoint*> fp;
boost::optional<int> timesEnteredBefore;
if (testCase.imageLocation == PrePostImageLocation::kSnapshot) {
// The image should be fetched from the snapshot rather than the image collection. Force
// the snapshot read to fail with SnapshotTooOld.
fp = globalFailPointRegistry().find(
"failChunkMigrationFindAndModifyImageLookupFindOneWithSnapshotTooOld");
timesEnteredBefore =
fp.get()->setMode(FailPoint::alwaysOn, 0, BSON("nss" << kNs.toString_forTest()));
}
const auto sessionId = makeLogicalSessionIdForTest();
const auto txnNumber = TxnNumber{1};
@ -2257,6 +2304,11 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForRetryableWrite)
ASSERT_EQ(migrationSource.getSessionOplogEntriesToBeMigratedSoFar(), 1);
ASSERT_EQ(migrationSource.getSessionOplogEntriesSkippedSoFarLowerBound(), 0);
if (fp) {
auto timesEnteredAfter = fp.get()->setMode(FailPoint::off);
ASSERT_EQ(timesEnteredAfter, *timesEnteredBefore + 1);
}
client.remove(NamespaceString::kSessionTransactionsTableNamespace, sessionRecord.toBSON());
}
}
@ -2264,7 +2316,10 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForRetryableWrite)
TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForNewCommittedInternalTransaction) {
std::vector<PrePostImageTestCase> testCases;
for (auto imageType : {repl::RetryImageEnum::kPreImage, repl::RetryImageEnum::kPostImage}) {
testCases.emplace_back(imageType);
for (auto imageLocation :
{PrePostImageLocation::kSideCollection, PrePostImageLocation::kSnapshot}) {
testCases.push_back({.imageType = imageType, .imageLocation = imageLocation});
}
}
auto opTimeSecs = 250;
@ -2273,6 +2328,18 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForNewCommittedInt
"Running case",
"test"_attr = unittest::getTestName(),
"testCase"_attr = testCase.toBSON());
setUpFeatureFlags(testCase);
boost::optional<FailPoint*> fp;
boost::optional<int> timesEnteredBefore;
if (testCase.imageLocation == PrePostImageLocation::kSnapshot) {
// The image should be fetched from the snapshot rather than the image collection. Force
// the snapshot read to fail with SnapshotTooOld.
fp = globalFailPointRegistry().find(
"failChunkMigrationFindAndModifyImageLookupFindOneWithSnapshotTooOld");
timesEnteredBefore =
fp.get()->setMode(FailPoint::alwaysOn, 0, BSON("nss" << kNs.toString_forTest()));
}
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
const auto txnNumber = TxnNumber{1};
@ -2331,7 +2398,9 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForNewCommittedInt
numOps += 2;
migrationSource.notifyNewWriteOpTime(
entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry3.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry3.getOpTime().getTimestamp()});
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@ -2359,6 +2428,11 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForNewCommittedInt
ASSERT_EQ(migrationSource.getSessionOplogEntriesSkippedSoFarLowerBound(),
numOps - expectedOps.size());
if (fp) {
auto timesEnteredAfter = fp.get()->setMode(FailPoint::off);
ASSERT_EQ(timesEnteredAfter, *timesEnteredBefore + 1);
}
opTimeSecs++;
}
}
@ -2368,8 +2442,9 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForCommittedIntern
std::vector<PrePostImageTestCase> testCases;
for (auto imageType : {repl::RetryImageEnum::kPreImage, repl::RetryImageEnum::kPostImage}) {
for (auto txnType : {TransactionType::kUnprepared, TransactionType::kPrepared}) {
testCases.emplace_back(imageType, txnType);
for (auto imageLocation :
{PrePostImageLocation::kSideCollection, PrePostImageLocation::kSnapshot}) {
testCases.emplace_back(imageType, TransactionType::kUnprepared, imageLocation);
}
}
@ -2378,6 +2453,18 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForCommittedIntern
"Running case",
"test"_attr = unittest::getTestName(),
"testCase"_attr = testCase.toBSON());
setUpFeatureFlags(testCase);
boost::optional<FailPoint*> fp;
boost::optional<int> timesEnteredBefore;
if (testCase.imageLocation == PrePostImageLocation::kSnapshot) {
// The image should be fetched from the snapshot rather than the image collection. Force
// the snapshot read to fail with SnapshotTooOld.
fp = globalFailPointRegistry().find(
"failChunkMigrationFindAndModifyImageLookupFindOneWithSnapshotTooOld");
timesEnteredBefore =
fp.get()->setMode(FailPoint::alwaysOn, 0, BSON("nss" << kNs.toString_forTest()));
}
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
const auto txnNumber = TxnNumber{1};
@ -2469,6 +2556,11 @@ TEST_F(SessionCatalogMigrationSourceTest, MissingImageDocumentForCommittedIntern
ASSERT_EQ(migrationSource.getSessionOplogEntriesToBeMigratedSoFar(), expectedOps.size());
ASSERT_EQ(migrationSource.getSessionOplogEntriesSkippedSoFarLowerBound(), 0);
if (fp) {
auto timesEnteredAfter = fp.get()->setMode(FailPoint::off);
ASSERT_EQ(timesEnteredAfter, *timesEnteredBefore + 1);
}
opTimeSecs++;
client.remove(NamespaceString::kSessionTransactionsTableNamespace, txnRecord.toBSON());
}
@ -2633,7 +2725,9 @@ TEST_F(SessionCatalogMigrationSourceTest,
numOps += 2;
migrationSource.notifyNewWriteOpTime(
entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
{entry3.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction,
entry3.getOpTime().getTimestamp()});
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@ -2952,8 +3046,9 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreAbortedTransaction) {
insertOplogEntry(entry1);
auto abortOpTime = repl::OpTime(Timestamp(opTimeSecs, 2), 1);
AbortTransactionOplogObject commandObj;
auto entry2 = makeCommandOplogEntry(
abortOpTime, applyOpsOpTime, BSON("abortTransaction" << 1), sessionId, txnNumber);
abortOpTime, applyOpsOpTime, commandObj.toBSON(), sessionId, txnNumber);
insertOplogEntry(entry2);
SessionTxnRecord txnRecord;
@ -3457,7 +3552,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit
// Test inCatchupPhase() and untransferredCatchUpDataSize() with new writes.
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.inCatchupPhase());
@ -3465,7 +3560,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.inCatchupPhase());
@ -3500,7 +3595,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithNoCommittedWr
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(
entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.inCatchupPhase());
@ -3861,7 +3956,8 @@ TEST_F(SessionCatalogMigrationSourceTest, DiscardOplogEntryForNonRetryableWrite)
insertOplogEntry(updateOplog);
migrationSource.notifyNewWriteOpTime(
updateOplog.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
{updateOplog.getOpTime(),
SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite});
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));