SERVER-117319 Make resharding support fetching findAndModify pre/post-images from snapshot (#46626)

GitOrigin-RevId: 5def981d4bcfa12b28b2952692e60bfe04177f10
This commit is contained in:
Cheahuychou Mao 2026-02-12 13:07:32 -05:00 committed by MongoDB Bot
parent 4cecc4d765
commit 180a8e2623
12 changed files with 622 additions and 292 deletions

View File

@ -0,0 +1,110 @@
/**
* Tests that the oplog history for a retryable findAndModify can be fetched and migrated correctly
* during resharding critical section.
*
* 1. Pause resharding before entering the critical section.
* 2. Pause the oplog fetcher on the recipient.
* 3. Execute a findAndModify as a retryable write.
* 4. Unpause resharding. Wait for it to enter the critical section.
* 5. Resume the oplog fetcher.
* 6. Wait for resharding to complete.
* 7. Retry the findAndModify and verify the response is identical and the write was not
* re-executed.
*
* @tags: [
* requires_fcv_83
* ]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const st = new ShardingTest({shards: 2});
const dbName = "testDb";
const collName = "testColl";
const ns = dbName + "." + collName;
const testDB = st.s.getDB(dbName);
const testColl = testDB.getCollection(collName);
// Make shard0 the primary shard for the test database.
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
// Create the test collection and insert a document.
assert.commandWorked(testColl.insert({_id: 1, x: 1, counter: 0}));
const configPrimary = st.configRS.getPrimary();
const donorPrimary = st.rs0.getPrimary();
const recipientPrimary = st.rs1.getPrimary();
const pauseBeforeStartingCriticalSectionFp = configureFailPoint(
configPrimary,
"reshardingPauseCoordinatorBeforeBlockingWrites",
);
jsTest.log("Starting moveCollection");
const moveCollThread = new Thread(
(mongosHost, ns, toShard) => {
const mongos = new Mongo(mongosHost);
return mongos.adminCommand({moveCollection: ns, toShard: toShard});
},
st.s.host,
ns,
st.shard1.shardName,
);
moveCollThread.start();
jsTest.log("Waiting for resharding to pause before blocking writes");
pauseBeforeStartingCriticalSectionFp.wait();
jsTest.log("Waiting for the oplog fetcher on the recipient to pause after fetching some oplog entries");
const pauseOplogFetcherFp = configureFailPoint(recipientPrimary, "pauseReshardingOplogFetcherAfterConsuming");
// Perform an insert so there is an oplog entry for the recipient to fetch.
assert.commandWorked(testDB.runCommand({insert: collName, documents: [{_id: 2, x: 2, counter: 0}]}));
pauseOplogFetcherFp.wait();
jsTest.log("Performing findAndModify as a retryable write");
const lsid = {id: UUID()};
const txnNumber = NumberLong(1);
const findAndModifyCmd = {
findAndModify: collName,
query: {_id: 1},
update: {$inc: {counter: 1}},
new: true,
lsid: lsid,
txnNumber: txnNumber,
};
const initialRes = assert.commandWorked(testDB.runCommand(findAndModifyCmd));
jsTest.log("Initial findAndModify response: " + tojson(initialRes));
assert.eq(initialRes.lastErrorObject.n, 1, initialRes);
assert.eq(initialRes.lastErrorObject.updatedExisting, true, initialRes);
assert.eq(initialRes.value.counter, 1, initialRes);
assert.eq(initialRes.value._id, 1, initialRes);
jsTest.log("Unpause resharding and wait for the critical section to start");
pauseBeforeStartingCriticalSectionFp.off();
assert.soon(() => {
const docs = donorPrimary.getCollection("config.collection_critical_sections").find({_id: ns}).toArray();
return docs.length > 0;
}, "Critical section did not start");
jsTest.log("Unpause the oplog fetcher on the recipient");
pauseOplogFetcherFp.off();
jsTest.log("Waiting for moveCollection to complete");
moveCollThread.join();
assert.commandWorked(moveCollThread.returnData());
// Retry the findAndModify. Verify the response is identical and the write was not re-executed.
const retriedRes = assert.commandWorked(testDB.runCommand(findAndModifyCmd));
jsTest.log("Retried findAndModify response: " + tojson(retriedRes));
assert.docEq(retriedRes.lastErrorObject, initialRes.lastErrorObject, {retriedRes, initialRes});
assert.docEq(retriedRes.value, initialRes.value, {retriedRes, initialRes});
const doc = testColl.findOne({_id: 1});
assert.eq(doc.counter, 1, {doc});
st.stop();

View File

@ -42,17 +42,24 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_find_and_modify_image_lookup.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
#include "mongo/db/query/write_ops/find_and_modify_image_lookup_util.h"
#include "mongo/db/read_concern.h"
#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/rss/replicated_storage_service.h"
#include "mongo/db/scoped_read_concern.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/session/logical_session_id_gen.h"
#include "mongo/db/session/logical_session_id_helpers.h"
#include "mongo/db/shard_role/shard_catalog/raw_data_operation.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/str.h"
#include "mongo/util/time_support.h"
@ -73,79 +80,56 @@ namespace {
using OplogEntry = repl::OplogEntryBase;
MONGO_FAIL_POINT_DEFINE(failFindAndModifyImageLookupStageFindOneWithSnapshotTooOld);
/**
* Fetches the pre- or post-image entry for the given 'findAndModify' oplog entry or for the given
* inner op in the given 'applyOps' oplog entry from the findAndModify image collection, and returns
* a forged noop oplog entry containing the image. Returns none if no matching image entry is not
* found.
* Perform a local findOne using the given read concern to find the document that matches in the
* given filter in the given collection. Returns none if no document is found.
*/
boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(
const boost::intrusive_ptr<ExpressionContext> pExpCtx,
const repl::OplogEntry& oplogEntry,
boost::optional<repl::DurableReplOperation> innerOp = boost::none) {
invariant(!innerOp ||
(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps));
const auto sessionId = *oplogEntry.getSessionId();
auto localImageCollInfo = pExpCtx->getMongoProcessInterface()->getCollectionOptions(
pExpCtx->getOperationContext(), NamespaceString::kConfigImagesNamespace);
// Extract the UUID from the collection information. We should always have a valid uuid here.
auto imageCollUUID = invariantStatusOK(UUID::parse(localImageCollInfo["uuid"]));
const auto& readConcernBson =
repl::ReadConcernArgs::get(pExpCtx->getOperationContext()).toBSON();
auto imageDoc = pExpCtx->getMongoProcessInterface()->lookupSingleDocument(
pExpCtx,
NamespaceString::kConfigImagesNamespace,
imageCollUUID,
Document{BSON("_id" << sessionId.toBSON())},
readConcernBson);
if (!imageDoc) {
// If no image document with the corresponding 'sessionId' is found, we skip forging the
// no-op and rely on the retryable write mechanism to catch that no pre- or post- image
// exists.
LOGV2_DEBUG(580602,
2,
"Not forging no-op image oplog entry because no image document found with "
"sessionId",
"sessionId"_attr = sessionId);
return boost::none;
boost::optional<BSONObj> findOneLocally(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
const NamespaceString& nss,
const BSONObj& filter,
const boost::optional<repl::ReadConcernArgs>& readConcern) {
if (MONGO_unlikely(failFindAndModifyImageLookupStageFindOneWithSnapshotTooOld.shouldFail(
[&](const BSONObj& data) {
return data.getStringField("nss") == nss.toString_forTest();
}))) {
tassert(11731903,
"Expected the findOne to have readConcern 'snapshot'",
readConcern->getLevel() == repl::ReadConcernLevelEnum::kSnapshotReadConcern);
uasserted(ErrorCodes::SnapshotTooOld, "Failing findOne during findAndModify image lookup");
}
auto image = repl::ImageEntry::parse(imageDoc->toBson(), IDLParserContext("image entry"));
if (image.getTxnNumber() != oplogEntry.getTxnNumber()) {
// In our snapshot, fetch the current transaction number for a session. If that
// transaction number doesn't match what's found on the image lookup, it implies that
// the image is not the correct version for this oplog entry. We will not forge a noop
// from it.
LOGV2_DEBUG(
580603,
2,
"Not forging no-op image oplog entry because image document has a different txnNum",
"sessionId"_attr = oplogEntry.getSessionId(),
"expectedTxnNum"_attr = oplogEntry.getTxnNumber(),
"actualTxnNum"_attr = image.getTxnNumber());
return boost::none;
const bool isRawData = isRawDataOperation(pExpCtx->getOperationContext());
ON_BLOCK_EXIT([&] { isRawDataOperation(pExpCtx->getOperationContext()) = isRawData; });
if (gFeatureFlagAllBinariesSupportRawDataOperations.isEnabledUseLatestFCVWhenUninitialized(
VersionContext::getDecoration(pExpCtx->getOperationContext()),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
// This must be set for the request to work against a timeseries collection.
isRawDataOperation(pExpCtx->getOperationContext()) = true;
}
// Forge a no-op image entry to be returned.
repl::MutableOplogEntry forgedNoop;
forgedNoop.setSessionId(sessionId);
forgedNoop.setTxnNumber(*oplogEntry.getTxnNumber());
forgedNoop.setObject(image.getImage());
forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
forgedNoop.setWallClockTime(oplogEntry.getWallClockTime());
forgedNoop.setNss(innerOp ? innerOp->getNss() : oplogEntry.getNss());
forgedNoop.setUuid(innerOp ? innerOp->getUuid() : *oplogEntry.getUuid());
forgedNoop.setStatementIds(innerOp ? innerOp->getStatementIds() : oplogEntry.getStatementIds());
boost::optional<ScopedReadConcern> scopedReadConcern;
if (readConcern) {
scopedReadConcern.emplace(pExpCtx->getOperationContext(), *readConcern);
// Set the opTime to be the findAndModify timestamp - 1. We guarantee that there will be no
// collisions because we always reserve an extra oplog slot when writing the retryable
// findAndModify entry on the primary.
forgedNoop.setOpTime(repl::OpTime(oplogEntry.getTimestamp() - 1, *oplogEntry.getTerm()));
return repl::OplogEntry{forgedNoop.toBSON()};
auto status = mongo::waitForReadConcern(pExpCtx->getOperationContext(),
*readConcern,
nss.dbName(),
true /* allowAfterClusterTime */);
if (!status.isOK()) {
LOGV2_WARNING(11731902,
"Failed to wait for read concern before doing a local find",
"nss"_attr = nss,
"readConcern"_attr = readConcern,
"status"_attr = status);
return boost::none;
}
}
auto doc = pExpCtx->getMongoProcessInterface()->lookupSingleDocumentLocally(
pExpCtx, nss, Document{filter});
return doc ? boost::make_optional(doc->toBson()) : boost::none;
}
} // namespace
@ -227,12 +211,19 @@ Document FindAndModifyImageLookupStage::downConvertIfNeedsRetryImage(Document in
return inputDoc;
}
auto findOneLocallyFunc = [&](const NamespaceString& nss,
const BSONObj& filter,
const boost::optional<repl::ReadConcernArgs>& readConcern) {
return findOneLocally(pExpCtx, nss, filter, readConcern);
};
if (inputOplogEntry.isCrudOpType() && inputOplogEntry.getNeedsRetryImage()) {
// Strip the needsRetryImage field if set.
MutableDocument downConvertedDoc{inputDoc};
downConvertedDoc.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName);
if (const auto forgedNoopOplogEntry = forgeNoopImageOplogEntry(pExpCtx, inputOplogEntry)) {
if (const auto forgedNoopOplogEntry = forgeNoopImageOplogEntry(
pExpCtx->getOperationContext(), inputOplogEntry, findOneLocallyFunc)) {
const auto imageType = inputOplogEntry.getNeedsRetryImage();
const auto imageOpTime = forgedNoopOplogEntry->getOpTime();
downConvertedDoc.setField(
@ -268,8 +259,42 @@ Document FindAndModifyImageLookupStage::downConvertIfNeedsRetryImage(Document in
continue;
}
const auto forgedNoopOplogEntry =
forgeNoopImageOplogEntry(pExpCtx, inputOplogEntry, op);
auto mutableOp = uassertStatusOK(
repl::MutableOplogEntry::parse(inputOplogEntry.getEntry().toBSON()));
mutableOp.setMultiOpType(boost::none);
mutableOp.setDurableReplOperation(op);
auto findAndModifyOplogEntry = repl::OplogEntry(mutableOp.toBSON());
const auto forgedNoopOplogEntry = [&]() -> boost::optional<repl::OplogEntry> {
auto& rss = rss::ReplicatedStorageService::get(pExpCtx->getOperationContext());
if (!rss.getPersistenceProvider().supportsFindAndModifyImageCollection()) {
// The commitTimestamp is only needed when fetching the image from the snapshot.
// Despite the name, 'commitTxnTs' is actually the timestamp of the last
// oplog entry in the oplog chain for the transaction.
// - For unprepared transaction, it is the timestamp for the terminal oplog
// entry which is also the commit timestamp for the transaction.
// - For prepared transaction, it is the timestamp for the commitTransaction
// oplog entry which is not the same as the commit timestamp for the
// transaction.
auto lastOplogEntryDoc = findOneLocally(pExpCtx,
NamespaceString::kRsOplogNamespace,
BSON("ts" << *commitTxnTs),
boost::none /* readConcern */);
if (!lastOplogEntryDoc) {
// The commit oplog entry is no longer available.
return boost::none;
}
auto lastOplogEntry =
uassertStatusOK(repl::OplogEntry::parse(*lastOplogEntryDoc));
auto commitTimestamp =
uassertStatusOK(lastOplogEntry.extractCommitTransactionTimestamp());
findAndModifyOplogEntry.setCommitTransactionTimestamp(commitTimestamp);
}
return forgeNoopImageOplogEntry(
pExpCtx->getOperationContext(), findAndModifyOplogEntry, findOneLocallyFunc);
}();
// Downcovert the document for this applyOps oplog entry by downcoverting this
// operation.

View File

@ -539,6 +539,7 @@ mongo_cc_library(
"//src/mongo/db:mongohasher",
"//src/mongo/db:query_expressions",
"//src/mongo/db:query_matcher",
"//src/mongo/db:rw_concern_d",
"//src/mongo/db:service_context",
"//src/mongo/db:shard_filterer",
"//src/mongo/db:shard_role_api",
@ -578,6 +579,7 @@ mongo_cc_library(
"//src/mongo/db/query/stage_memory_limit_knobs",
"//src/mongo/db/query/util:query_string_util",
"//src/mongo/db/query/util:rank_fusion_util",
"//src/mongo/db/query/write_ops:find_and_modify_image_lookup_util",
"//src/mongo/db/repl:apply_ops_command_info",
"//src/mongo/db/repl:image_collection_entry",
"//src/mongo/db/repl:oplog_entry",
@ -1454,6 +1456,7 @@ mongo_cc_unit_test(
"//src/mongo/db/query/search:mongot_cursor",
"//src/mongo/db/query/search:mongot_options",
"//src/mongo/db/repl:image_collection_entry",
"//src/mongo/db/repl:mock_repl_coord_server_fixture",
"//src/mongo/db/repl:replmocks",
"//src/mongo/db/repl:storage_interface_impl",
"//src/mongo/db/sharding_environment:shard_server_test_fixture",

View File

@ -33,6 +33,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/agg/document_source_to_stage_registry.h"
#include "mongo/db/exec/agg/mock_stage.h"
#include "mongo/db/exec/document_value/document.h"
@ -42,6 +44,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h"
#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
@ -51,6 +54,7 @@
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/session/logical_session_id_gen.h"
#include "mongo/db/version_context.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/time_support.h"
@ -75,6 +79,7 @@ repl::OplogEntry makeOplogEntry(
NamespaceString nss,
UUID uuid,
BSONObj oField,
boost::optional<BSONObj> o2Field,
OperationSessionInfo sessionInfo,
std::vector<StmtId> stmtIds,
boost::optional<repl::OpTime> preImageOpTime = boost::none,
@ -90,12 +95,12 @@ repl::OplogEntry makeOplogEntry(
boost::none, // versionContext
repl::OplogEntry::kOplogVersion, // version
oField, // o
boost::none, // o2
o2Field, // o2
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
stmtIds, // statement ids
boost::none, // optime of previous write within same transaction
repl::OpTime(), // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime
boost::none, // ShardId of resharding recipient
@ -103,21 +108,41 @@ repl::OplogEntry makeOplogEntry(
needsRetryImage)}; // needsRetryImage
}
/**
* Creates OplogEntry with given field values.
*/
repl::OplogEntry makeOplogEntry(
repl::OpTime opTime,
repl::OpTypeEnum opType,
NamespaceString nss,
UUID uuid,
BSONObj oField,
OperationSessionInfo sessionInfo,
std::vector<StmtId> stmtIds,
boost::optional<repl::OpTime> preImageOpTime = boost::none,
boost::optional<repl::OpTime> postImageOpTime = boost::none,
boost::optional<repl::RetryImageEnum> needsRetryImage = boost::none) {
return makeOplogEntry(opTime,
opType,
nss,
uuid,
oField,
boost::none /* o2Field */,
sessionInfo,
stmtIds,
preImageOpTime,
postImageOpTime,
needsRetryImage);
}
struct MockMongoInterface final : public StubMongoProcessInterface {
MockMongoInterface(std::vector<Document> documentsForLookup = {})
: _documentsForLookup{std::move(documentsForLookup)} {}
BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override {
static const UUID* oplog_uuid = new UUID(UUID::gen());
return BSON("uuid" << *oplog_uuid);
}
boost::optional<Document> lookupSingleDocument(
boost::optional<Document> lookupSingleDocumentLocally(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
boost::optional<UUID> collectionUUID,
const Document& documentKey,
boost::optional<BSONObj> readConcern) final {
const Document& documentKey) final {
Matcher matcher(documentKey.toBson(), expCtx);
auto it =
std::find_if(_documentsForLookup.begin(),
@ -133,7 +158,30 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
};
// This provides access to getExpCtx(), but we'll use a different name for this test suite.
using FindAndModifyImageLookupTest = AggregationContextFixture;
class FindAndModifyImageLookupTest : public AggregationContextFixture {
public:
FindAndModifyImageLookupTest() : AggregationContextFixture() {}
void mockImageDocument(const LogicalSessionId sessionId,
TxnNumber txnNum,
Timestamp ts,
repl::RetryImageEnum imageType,
BSONObj image) {
repl::ImageEntry imageEntry;
imageEntry.set_id(sessionId);
imageEntry.setTxnNumber(txnNum);
imageEntry.setTs(ts);
imageEntry.setImageKind(imageType);
imageEntry.setImage(image);
getExpCtx()->setMongoProcessInterface(std::make_unique<MockMongoInterface>(
std::vector<Document>{Document{imageEntry.toBSON()}}));
}
private:
boost::intrusive_ptr<ExpressionContext> _expCtx;
RAIIServerParameterControllerForTest _featureFlagController{
"featureFlagDisallowFindAndModifyImageCollection", false};
};
TEST_F(FindAndModifyImageLookupTest, NoopWhenEntryDoesNotHaveNeedsRetryImageField) {
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx());
@ -172,55 +220,85 @@ TEST_F(FindAndModifyImageLookupTest, NoopWhenEntryDoesNotHaveNeedsRetryImageFiel
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
}
TEST_F(FindAndModifyImageLookupTest, ShouldNotForgeImageEntryWhenImageDocMissing) {
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx());
auto imageLookupStage = exec::agg::buildStage(documentSourceImageLookup);
const auto sessionId = makeLogicalSessionIdForTest();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(1);
const auto stmtId = 1;
const auto opTime = repl::OpTime(Timestamp(2, 1), 1);
const auto oplogEntryDoc =
Document(makeOplogEntry(opTime,
repl::OpTypeEnum::kUpdate,
NamespaceString::createNamespaceString_forTest("test.foo"),
UUID::gen(),
BSON("a" << 1),
sessionInfo,
{stmtId},
boost::none /* preImageOpTime */,
boost::none /* postImageOpTime */,
repl::RetryImageEnum::kPreImage)
.getEntry()
.toBSON());
auto mock = exec::agg::MockStage::createForTest(oplogEntryDoc, getExpCtx());
imageLookupStage->setSource(mock.get());
TEST_F(FindAndModifyImageLookupTest,
ShouldNotForgeImageEntryWhenMatchingImageDocOrSnapshotIsNotFoundCrudOp) {
for (auto disallowImageCollection : {true, false}) {
LOGV2(11731900,
"Running case",
"test"_attr = unittest::getTestName(),
"disallowImageCollection"_attr = disallowImageCollection);
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(
std::make_unique<MockMongoInterface>(std::vector<Document>{}));
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagDisallowFindAndModifyImageCollection", disallowImageCollection);
auto nss = NamespaceString::createNamespaceString_forTest("test.foo");
auto next = imageLookupStage->getNext();
ASSERT_TRUE(next.isAdvanced());
// The needsRetryImage field should have been stripped even though we are not forging an image
// entry.
MutableDocument expected{oplogEntryDoc};
expected.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName);
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected.freeze());
boost::optional<FailPoint*> fp;
boost::optional<int> timesEnteredBefore;
if (disallowImageCollection) {
// The image should be fetched from the snapshot rather than the image collection. Force
// the snapshot read to fail with SnapshotTooOld.
fp = globalFailPointRegistry().find(
"failFindAndModifyImageLookupStageFindOneWithSnapshotTooOld");
timesEnteredBefore =
fp.get()->setMode(FailPoint::alwaysOn, 0, BSON("nss" << nss.toString_forTest()));
}
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
auto documentSourceImageLookup =
DocumentSourceFindAndModifyImageLookup::create(getExpCtx());
auto imageLookupStage = exec::agg::buildStage(documentSourceImageLookup);
const auto sessionId = makeLogicalSessionIdForTest();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(1);
const auto stmtId = 1;
const auto opTime = repl::OpTime(Timestamp(2, 1), 1);
const auto oplogEntryDoc = Document(makeOplogEntry(opTime,
repl::OpTypeEnum::kUpdate,
nss,
UUID::gen(),
BSON("$set" << BSON("a" << 1)),
BSON("_id" << 1),
sessionInfo,
{stmtId},
boost::none /* preImageOpTime */,
boost::none /* postImageOpTime */,
repl::RetryImageEnum::kPreImage)
.getEntry()
.toBSON());
auto mock = exec::agg::MockStage::createForTest(oplogEntryDoc, getExpCtx());
imageLookupStage->setSource(mock.get());
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(
std::make_unique<MockMongoInterface>(std::vector<Document>{}));
auto next = imageLookupStage->getNext();
ASSERT_TRUE(next.isAdvanced());
// The needsRetryImage field should have been stripped even though we are not forging an
// image entry.
MutableDocument expected{oplogEntryDoc};
expected.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName);
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected.freeze());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
if (fp) {
auto timesEnteredAfter = fp.get()->setMode(FailPoint::off);
ASSERT_EQ(timesEnteredAfter, *timesEnteredBefore + 1);
}
}
}
TEST_F(FindAndModifyImageLookupTest, ShouldNotForgeImageEntryWhenImageDocHasDifferentTxnNumber) {
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx());
auto imageLookupStage = exec::agg::buildStage(documentSourceImageLookup);
const auto sessionId = makeLogicalSessionIdForTest();
const auto txnNum = 1LL;
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(1);
sessionInfo.setTxnNumber(txnNum);
const auto stmtId = 1;
const auto ts = Timestamp(2, 1);
const auto opTime = repl::OpTime(ts, 1);
@ -242,15 +320,7 @@ TEST_F(FindAndModifyImageLookupTest, ShouldNotForgeImageEntryWhenImageDocHasDiff
// Create an 'ImageEntry' with a higher 'txnNumber'.
const auto preImage = BSON("a" << 2);
repl::ImageEntry imageEntry;
imageEntry.set_id(sessionId);
imageEntry.setTxnNumber(2);
imageEntry.setTs(ts);
imageEntry.setImageKind(repl::RetryImageEnum::kPreImage);
imageEntry.setImage(preImage);
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(
std::make_unique<MockMongoInterface>(std::vector<Document>{Document{imageEntry.toBSON()}}));
mockImageDocument(sessionId, txnNum + 1, ts, repl::RetryImageEnum::kPreImage, preImage);
auto next = imageLookupStage->getNext();
ASSERT_TRUE(next.isAdvanced());
@ -270,7 +340,8 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
repl::RetryImageEnum::kPostImage};
for (auto imageType : cases) {
LOGV2(5806002,
"ForgeImageEntryTestCase",
"Running case",
"test"_attr = unittest::getTestName(),
"imageType"_attr = repl::RetryImage_serializer(imageType));
auto documentSourceImageLookup =
DocumentSourceFindAndModifyImageLookup::create(getExpCtx());
@ -291,7 +362,8 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
repl::OpTypeEnum::kUpdate,
nss,
uuid,
BSON("a" << 1),
BSON("$set" << BSON("a" << 1)),
BSON("_id" << 1),
sessionInfo,
{stmtId},
boost::none /* preImageOpTime */,
@ -304,15 +376,7 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
imageLookupStage->setSource(mock.get());
const auto prePostImage = BSON("a" << 2);
repl::ImageEntry imageEntry;
imageEntry.set_id(sessionId);
imageEntry.setTxnNumber(txnNum);
imageEntry.setTs(ts);
imageEntry.setImageKind(imageType);
imageEntry.setImage(prePostImage);
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(std::make_unique<MockMongoInterface>(
std::vector<Document>{Document{imageEntry.toBSON()}}));
mockImageDocument(sessionId, txnNum, ts, imageType, prePostImage);
// The next doc should be the doc for the forged image oplog entry.
auto next = imageLookupStage->getNext();
@ -357,7 +421,8 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
repl::RetryImageEnum::kPostImage};
for (auto imageType : cases) {
LOGV2(6344105,
"ForgeImageEntryTestCase",
"Running case",
"test"_attr = unittest::getTestName(),
"imageType"_attr = repl::RetryImage_serializer(imageType));
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(
getExpCtx(), true /* includeCommitTransactionTimestamp */);
@ -400,15 +465,7 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
imageLookupStage->setSource(mock.get());
const auto prePostImage = BSON("_id" << 1);
repl::ImageEntry imageEntry;
imageEntry.set_id(sessionId);
imageEntry.setTxnNumber(txnNum);
imageEntry.setTs(applyOpsTs);
imageEntry.setImageKind(imageType);
imageEntry.setImage(prePostImage);
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(std::make_unique<MockMongoInterface>(
std::vector<Document>{Document{imageEntry.toBSON()}}));
mockImageDocument(sessionId, txnNum, applyOpsTs, imageType, prePostImage);
// The next doc should be the doc for the forged image oplog entry and it should contain the
// commit transaction timestamp.
@ -467,86 +524,119 @@ TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIs
}
}
TEST_F(FindAndModifyImageLookupTest,
ShouldNotForgeImageEntryWhenMatchingImageDocIsNotFoundApplyOpsOp) {
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(
getExpCtx(), true /* includeCommitTransactionTimestamp */);
auto imageLookupStage = exec::agg::buildStage(documentSourceImageLookup);
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
const auto txnNum = 1LL;
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(txnNum);
const auto stmtId = 1;
const auto applyOpsTs = Timestamp(2, 1);
const auto applyOpsOpTime = repl::OpTime(applyOpsTs, 1);
const auto commitTxnTs = Timestamp(3, 1);
const auto commitTxnTsFieldName = CommitTransactionOplogObject::kCommitTimestampFieldName;
const auto nss = NamespaceString::createNamespaceString_forTest("test.foo");
const auto uuid = UUID::gen();
ShouldNotForgeImageEntryWhenMatchingImageDocOrSnapshotIsNotFoundApplyOpsOp) {
for (auto disallowImageCollection : {true, false}) {
LOGV2(11731901,
"Running case",
"test"_attr = unittest::getTestName(),
"disallowImageCollection"_attr = disallowImageCollection);
// Define an applyOps oplog entry containing a findAndModify/update operation entry with
// the 'needsRetryImage' field set.
auto insertOp = repl::MutableOplogEntry::makeInsertOperation(
nss, uuid, BSON("_id" << 0 << "a" << 0), BSON("_id" << 0));
auto updateOp = repl::MutableOplogEntry::makeUpdateOperation(
nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 1));
updateOp.setStatementIds({stmtId});
updateOp.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
BSONObjBuilder applyOpsBuilder;
applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON() << updateOp.toBSON()));
auto oplogEntryUUID = UUID::gen();
auto oplogEntryBson = makeOplogEntry(applyOpsOpTime,
repl::OpTypeEnum::kCommand,
{},
oplogEntryUUID,
applyOpsBuilder.obj(),
sessionInfo,
{})
.getEntry()
.toBSON()
.addFields(BSON(commitTxnTsFieldName << commitTxnTs));
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagDisallowFindAndModifyImageCollection", disallowImageCollection);
auto nss = NamespaceString::createNamespaceString_forTest("test.foo");
auto mock = exec::agg::MockStage::createForTest(Document(oplogEntryBson), getExpCtx());
imageLookupStage->setSource(mock.get());
boost::optional<FailPoint*> fp;
boost::optional<int> timesEnteredBefore;
if (disallowImageCollection) {
// The image should be fetched from the snapshot rather than the image collection. Force
// the snapshot read to fail with SnapshotTooOld.
fp = globalFailPointRegistry().find(
"failFindAndModifyImageLookupStageFindOneWithSnapshotTooOld");
timesEnteredBefore =
fp.get()->setMode(FailPoint::alwaysOn, 0, BSON("nss" << nss.toString_forTest()));
}
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(
std::make_unique<MockMongoInterface>(std::vector<Document>{Document{}}));
auto documentSourceImageLookup = DocumentSourceFindAndModifyImageLookup::create(
getExpCtx(), true /* includeCommitTransactionTimestamp */);
auto imageLookupStage = exec::agg::buildStage(documentSourceImageLookup);
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
const auto txnNum = 1LL;
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(txnNum);
const auto stmtId = 1;
const auto applyOpsTs = Timestamp(2, 1);
const auto applyOpsOpTime = repl::OpTime(applyOpsTs, 1);
const auto commitTxnTs = applyOpsTs;
const auto commitTxnTsFieldName = CommitTransactionOplogObject::kCommitTimestampFieldName;
const auto uuid = UUID::gen();
// The next doc should be the doc for original applyOps oplog entry but the
// findAndModify/update operation entry should have 'needsRetryImage' field removed.
auto next = imageLookupStage->getNext();
const auto downConvertedOplogEntryBson = next.releaseDocument().toBson();
// Define an applyOps oplog entry containing a findAndModify/update operation entry with
// the 'needsRetryImage' field set.
auto insertOp = repl::MutableOplogEntry::makeInsertOperation(
nss, uuid, BSON("_id" << 0 << "a" << 0), BSON("_id" << 0));
auto updateOp = repl::MutableOplogEntry::makeUpdateOperation(
nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 1));
updateOp.setStatementIds({stmtId});
updateOp.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
BSONObjBuilder applyOpsBuilder;
applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON() << updateOp.toBSON()));
auto oplogEntryUUID = UUID::gen();
auto oplogEntryBson = makeOplogEntry(applyOpsOpTime,
repl::OpTypeEnum::kCommand,
{},
oplogEntryUUID,
applyOpsBuilder.obj(),
sessionInfo,
{})
.getEntry()
.toBSON()
.addFields(BSON(commitTxnTsFieldName << commitTxnTs));
auto updateOpWithoutNeedsRetryImage = repl::MutableOplogEntry::makeUpdateOperation(
nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 1));
updateOpWithoutNeedsRetryImage.setStatementIds({stmtId});
BSONObjBuilder applyOpsWithoutNeedsRetryImageBuilder;
applyOpsWithoutNeedsRetryImageBuilder.append(
"applyOps", BSON_ARRAY(insertOp.toBSON() << updateOpWithoutNeedsRetryImage.toBSON()));
auto expectedOplogEntryBson = makeOplogEntry(applyOpsOpTime,
repl::OpTypeEnum::kCommand,
{},
oplogEntryUUID,
applyOpsWithoutNeedsRetryImageBuilder.obj(),
sessionInfo,
{})
.getEntry()
.toBSON()
.addFields(BSON(commitTxnTsFieldName << commitTxnTs));
auto mock = exec::agg::MockStage::createForTest(Document(oplogEntryBson), getExpCtx());
imageLookupStage->setSource(mock.get());
ASSERT_BSONOBJ_EQ(expectedOplogEntryBson, downConvertedOplogEntryBson);
if (disallowImageCollection) {
// Mock the oplog document since fetching the image from the snapshot involves
// extracting the commit timestamp from the last oplog entry in the oplog chain.
getExpCtx()->setMongoProcessInterface(std::make_unique<MockMongoInterface>(
std::vector<Document>{Document{oplogEntryBson.removeField(commitTxnTsFieldName)}}));
} else {
// Mock out the foreign collection.
getExpCtx()->setMongoProcessInterface(
std::make_unique<MockMongoInterface>(std::vector<Document>{Document{}}));
}
auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(
downConvertedOplogEntryBson.getObjectField(repl::OplogEntry::kObjectFieldName));
auto operationDocs = applyOpsInfo.getOperations();
ASSERT_EQ(operationDocs.size(), 2U);
// The next doc should be the doc for original applyOps oplog entry but the
// findAndModify/update operation entry should have 'needsRetryImage' field removed.
auto next = imageLookupStage->getNext();
const auto downConvertedOplogEntryBson = next.releaseDocument().toBson();
ASSERT_BSONOBJ_EQ(operationDocs[0], insertOp.toBSON());
auto updateOpWithoutNeedsRetryImage = repl::MutableOplogEntry::makeUpdateOperation(
nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 1));
updateOpWithoutNeedsRetryImage.setStatementIds({stmtId});
BSONObjBuilder applyOpsWithoutNeedsRetryImageBuilder;
applyOpsWithoutNeedsRetryImageBuilder.append(
"applyOps", BSON_ARRAY(insertOp.toBSON() << updateOpWithoutNeedsRetryImage.toBSON()));
auto expectedOplogEntryBson = makeOplogEntry(applyOpsOpTime,
repl::OpTypeEnum::kCommand,
{},
oplogEntryUUID,
applyOpsWithoutNeedsRetryImageBuilder.obj(),
sessionInfo,
{})
.getEntry()
.toBSON()
.addFields(BSON(commitTxnTsFieldName << commitTxnTs));
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_BSONOBJ_EQ(expectedOplogEntryBson, downConvertedOplogEntryBson);
auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(
downConvertedOplogEntryBson.getObjectField(repl::OplogEntry::kObjectFieldName));
auto operationDocs = applyOpsInfo.getOperations();
ASSERT_EQ(operationDocs.size(), 2U);
ASSERT_BSONOBJ_EQ(operationDocs[0], insertOp.toBSON());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
ASSERT_TRUE(imageLookupStage->getNext().isEOF());
if (fp) {
auto timesEnteredAfter = fp.get()->setMode(FailPoint::off);
ASSERT_EQ(timesEnteredAfter, *timesEnteredBefore + 1);
}
}
}
} // namespace
} // namespace mongo

View File

@ -1313,17 +1313,44 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocumentLoca
const NamespaceString& nss,
const Document& documentKey) {
OperationContext* opCtx = expCtx->getOperationContext();
// This helper is only meant to be used for accessing a collection locally. It is only allowed
// to be used against a user collection when specifying read concern "snapshot" with
// "atClusterTime", and the caller must guarantee that the chunk containing the document is
// owned by the mongod at at timestamp.
if (!nss.isNamespaceAlwaysUntracked()) {
const auto& readConcern = repl::ReadConcernArgs::get(opCtx);
tassert(11731904,
str::stream() << "Trying to look up a document in a user collection locally "
"without doing placement version check with read concern "
<< readConcern.toBSONInner(),
readConcern.getLevel() == repl::ReadConcernLevelEnum::kSnapshotReadConcern &&
readConcern.getArgsAtClusterTime().has_value());
}
// Using kPretendUnsharded (and skipping version check) as this helper is only used to access
// the config.system.preimages which is always present and local.
// a collection locally.
const auto acquisition = acquireCollectionMaybeLockFree(
opCtx,
CollectionAcquisitionRequest(nss,
PlacementConcern::kPretendUnsharded,
repl::ReadConcernArgs::get(opCtx),
AcquisitionPrerequisites::kRead));
auto documentKeyBson = documentKey.toBson();
BSONObj document;
if (!Helpers::findById(expCtx->getOperationContext(), nss, documentKey.toBson(), document)) {
return boost::none;
if (nss == NamespaceString::kRsOplogNamespace) {
dassert(!documentKey.getField("ts").missing(),
"Expected the document key for the oplog collection to contain 'ts' field");
if (!Helpers::findOne(opCtx, acquisition, documentKeyBson, document)) {
return boost::none;
}
} else {
dassert(!documentKey.getField("_id").missing(),
"Expected the document key to contain '_id' field");
if (!Helpers::findById(expCtx->getOperationContext(), nss, documentKeyBson, document)) {
return boost::none;
}
}
return Document(document).getOwned();
}

View File

@ -613,10 +613,15 @@ public:
boost::optional<BSONObj> readConcern) = 0;
/**
* Returns zero or one document with the document _id being equal to 'documentKey'. The document
* is looked up only on the current node. Returns boost::none if no matching documents were
* found, including cases where the given namespace does not exist. It is illegal to call this
* method on nodes other than mongod.
* Returns zero or one document with the given 'documentKey'. If the collection is the oplog
* collection, 'documentKey' must be the "ts" field. Otherwise, it must be the "_id" field.
* The document is looked up only on the current node. Returns boost::none if no matching
* documents were found, including cases where the given namespace does not exist.
* - It is illegal to call this method on nodes other than mongod.
* - On a shardsvr mongod, this method is only allowed to be called against a user collection
* when specifying read concern "snapshot" with "atClusterTime", and the caller must
* guarantee that the chunk containing the document (if exists) is owned by the mongod at
* at timestamp.
*/
virtual boost::optional<Document> lookupSingleDocumentLocally(
const boost::intrusive_ptr<ExpressionContext>& expCtx,

View File

@ -90,12 +90,11 @@ mongo_cc_library(
],
deps = [
"//src/mongo:base",
"//src/mongo/db:dbdirectclient",
"//src/mongo/db:local_executor",
"//src/mongo/db:mongod_options",
"//src/mongo/db:server_feature_flags",
"//src/mongo/db:service_context",
"//src/mongo/db/query:command_request_response",
"//src/mongo/db/repl:image_collection_entry",
"//src/mongo/db/repl:oplog_entry",
],
)
@ -112,6 +111,7 @@ mongo_cc_library(
deps = [
":find_and_modify_image_lookup_util",
"//src/mongo:base",
"//src/mongo/db:local_executor",
"//src/mongo/db:mongod_options",
"//src/mongo/db:profile_collection",
"//src/mongo/db:query_exec",

View File

@ -31,12 +31,10 @@
#include "mongo/db/query/write_ops/find_and_modify_image_lookup_util.h"
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/find_command.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/rss/replicated_storage_service.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@ -67,10 +65,54 @@ BSONObj extractFindAndModifyIdFilter(const repl::OplogEntry& oplogEntry) {
!idField.eoo());
return idField.wrap();
}
/**
* Fetches the pre- or post-image for the given findAndModify operation from the image collection.
*/
boost::optional<BSONObj> fetchPreOrPostImageFromImageCollection(
const repl::OplogEntry oplogEntry, FindOneLocallyFunc findOneLocallyFunc) {
auto imageDoc = findOneLocallyFunc(NamespaceString::kConfigImagesNamespace,
BSON("_id" << oplogEntry.getSessionId()->toBSON()),
boost::none /* readConcern */);
if (!imageDoc) {
return boost::none;
}
auto image = repl::ImageEntry::parse(*imageDoc, IDLParserContext("image entry"));
if (image.getTxnNumber() != oplogEntry.getTxnNumber()) {
// In our snapshot, fetch the current transaction number for a session. If that transaction
// number doesn't match what's found on the image lookup, it implies that the image is not
// the correct version for this oplog entry. We will not forge a noop from it.
LOGV2_DEBUG(
580603,
2,
"Not forging no-op image oplog entry because image document has a different txnNum",
"sessionId"_attr = oplogEntry.getSessionId(),
"expectedTxnNum"_attr = oplogEntry.getTxnNumber(),
"actualTxnNum"_attr = image.getTxnNumber());
return boost::none;
}
return image.getImage();
}
/**
* Fetches the pre- or post-image for the given findAndModify operation.
*/
boost::optional<BSONObj> fetchPreOrPostImage(OperationContext* opCtx,
const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc) {
auto& rss = rss::ReplicatedStorageService::get(opCtx);
if (rss.getPersistenceProvider().supportsFindAndModifyImageCollection()) {
return fetchPreOrPostImageFromImageCollection(oplogEntry, findOneLocallyFunc);
}
return fetchPreOrPostImageFromSnapshot(oplogEntry, findOneLocallyFunc);
}
} // namespace
boost::optional<BSONObj> fetchPreOrPostImageFromSnapshot(OperationContext* opCtx,
const repl::OplogEntry& oplogEntry) {
boost::optional<BSONObj> fetchPreOrPostImageFromSnapshot(const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc) {
invariant(oplogEntry.getNeedsRetryImage());
auto idFilter = extractFindAndModifyIdFilter(oplogEntry);
@ -78,47 +120,50 @@ boost::optional<BSONObj> fetchPreOrPostImageFromSnapshot(OperationContext* opCtx
? *oplogEntry.getCommitTransactionTimestamp()
: oplogEntry.getTimestamp();
// Set up a separate OperationContext since waiting for read concern is not supported running a
// transaction and the caller may be handling a retry in a retryable internal transaction.
auto newClient = opCtx->getService()->makeClient("fetchPreOrPostImageFromSnapshot");
auto executor = getLocalExecutor(opCtx);
AlternativeClientRegion acr(newClient);
CancelableOperationContext newOpCtx(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
repl::ReadConcernArgs snapshotReadConcern(repl::ReadConcernLevel::kSnapshotReadConcern);
snapshotReadConcern.setArgsAtClusterTimeForSnapshot(
oplogEntry.getNeedsRetryImage() == repl::RetryImageEnum::kPostImage ? opTimestamp
: opTimestamp - 1);
repl::ReadConcernArgs::get(newOpCtx.get()) = snapshotReadConcern;
DBDirectClient client(newOpCtx.get());
try {
FindCommandRequest findRequest(oplogEntry.getNss());
findRequest.setFilter(idFilter);
if (gFeatureFlagAllBinariesSupportRawDataOperations.isEnabledUseLatestFCVWhenUninitialized(
VersionContext::getDecoration(newOpCtx.get()),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
// This must be set for the request to work against a timeseries collection.
findRequest.setRawData(true);
}
auto cursor = client.find(findRequest);
auto doc = findOneLocallyFunc(oplogEntry.getNss(), idFilter, snapshotReadConcern);
tassert(11730902,
str::stream() << "Could not find the document that the findAndModify operation "
"wrote to in the snapshot for "
<< oplogEntry.getTimestamp(),
cursor->more());
auto doc = cursor->next();
tassert(
11730903,
str::stream() << "Found multiple documents with _id that the findAndModify operation "
"wrote to in the snapshot for "
<< oplogEntry.getTimestamp(),
!cursor->more());
doc);
return doc;
} catch (const ExceptionFor<ErrorCategory::SnapshotError>&) {
return boost::none;
}
}
boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(OperationContext* opCtx,
const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc) {
invariant(oplogEntry.getNeedsRetryImage());
auto image = fetchPreOrPostImage(opCtx, oplogEntry, findOneLocallyFunc);
if (!image) {
return boost::none;
}
repl::MutableOplogEntry forgedNoop;
forgedNoop.setSessionId(*oplogEntry.getSessionId());
forgedNoop.setTxnNumber(*oplogEntry.getTxnNumber());
forgedNoop.setObject(*image);
forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
forgedNoop.setWallClockTime(oplogEntry.getWallClockTime());
forgedNoop.setNss(oplogEntry.getNss());
forgedNoop.setUuid(oplogEntry.getUuid());
forgedNoop.setStatementIds(oplogEntry.getStatementIds());
// Set the opTime to be the findAndModify timestamp - 1. We guarantee that there will be no
// collisions because we always reserve an extra oplog slot when writing the retryable
// findAndModify entry on the primary.
forgedNoop.setOpTime(repl::OpTime(oplogEntry.getTimestamp() - 1, *oplogEntry.getTerm()));
return repl::OplogEntry{forgedNoop.toBSON()};
}
} // namespace mongo

View File

@ -36,16 +36,31 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/util/modules.h"
namespace mongo {
using FindOneLocallyFunc = std::function<boost::optional<BSONObj>(
const NamespaceString& nss,
const BSONObj& filter,
const boost::optional<repl::ReadConcernArgs>& readConcern)>;
/**
* Fetches and returns the pre- or post-image of the findAndModify operation by performing a
* snapshot read against the collection wrote to. If the operation was executed in a transaction,
* the oplog entry must have the commit timestamp.
*/
MONGO_MOD_PUBLIC boost::optional<BSONObj> fetchPreOrPostImageFromSnapshot(
OperationContext* opCtx, const repl::OplogEntry& oplogEntry);
const repl::OplogEntry& oplogEntry, FindOneLocallyFunc findOneLocallyFunc);
/**
* Fetches the pre- or post-image for the given findAndModify operation either from the image
* collection or by performing a snapshot read against the collection the findAndModify wrote to.
* Returns a forged noop oplog entry containing the image. Returns none if no image is found.
*/
boost::optional<repl::OplogEntry> forgeNoopImageOplogEntry(OperationContext* opCtx,
const repl::OplogEntry& oplogEntry,
FindOneLocallyFunc findOneLocallyFunc);
} // namespace mongo

View File

@ -34,15 +34,18 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/basic_types.h"
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/write_ops/find_and_modify_image_lookup_util.h"
#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/rss/replicated_storage_service.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/session/logical_session_id_gen.h"
#include "mongo/idl/idl_parser.h"
@ -289,7 +292,38 @@ BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& o
return fetchPreOrPostImageFromImageCollection(opCtx, oplog);
}
auto image = fetchPreOrPostImageFromSnapshot(opCtx, oplog);
auto findOneLocallyFunc = [&](const NamespaceString& nss,
const BSONObj& filter,
const boost::optional<repl::ReadConcernArgs>& readConcern)
-> boost::optional<BSONObj> {
// Set up a separate OperationContext since waiting for read concern is not
// supported when running in a transaction and the retry might be running in a
// retryable internal transaction.
auto newClient = opCtx->getService()->makeClient("extractPreOrPostImage");
auto executor = getLocalExecutor(opCtx);
AlternativeClientRegion acr(newClient);
CancelableOperationContext newOpCtx(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
FindCommandRequest findRequest(nss);
findRequest.setFilter(filter);
if (gFeatureFlagAllBinariesSupportRawDataOperations
.isEnabledUseLatestFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
// This must be set for the request to work against a timeseries collection.
findRequest.setRawData(true);
}
repl::ReadConcernArgs::get(newOpCtx.get()) =
readConcern ? *readConcern : repl::ReadConcernArgs::get(opCtx);
DBDirectClient client(newOpCtx.get());
auto cursor = client.find(findRequest);
return cursor->more() ? boost::make_optional(cursor->next()) : boost::none;
};
auto image = fetchPreOrPostImageFromSnapshot(oplog, findOneLocallyFunc);
uassert(ErrorCodes::IncompleteTransactionHistory,
makePreOrPostImageNotFoundErrorMessage(oplog),
image);

View File

@ -907,9 +907,8 @@ void OplogEntry::setCommitTransactionTimestamp(boost::optional<mongo::Timestamp>
StatusWith<Timestamp> OplogEntry::extractCommitTransactionTimestamp() const {
if (!isInTransaction()) {
return Status{mongo::ErrorCodes::Error(11730800),
str::stream() << "Expected a transaction oplog entry but found op type "
<< repl::OpType_serializer(getOpType()) << " with session id "
<< getSessionId() << " and txn number " << getTxnNumber()};
str::stream() << "Expected a transaction oplog entry but found "
<< redact(toBSONForLogging())};
}
if (isTerminalApplyOps()) {
return getTimestamp();

View File

@ -193,20 +193,10 @@ public:
new MockTransactionHistoryIterator(_mockResults, time));
}
BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override {
auto optionIter = _collectionOptions.find(nss);
invariant(optionIter != _collectionOptions.end(),
str::stream() << nss.toStringForErrorMsg() << " was not registered");
return optionIter->second;
}
boost::optional<Document> lookupSingleDocument(
boost::optional<Document> lookupSingleDocumentLocally(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
boost::optional<UUID> collectionUUID,
const Document& documentKey,
boost::optional<BSONObj> readConcern) override {
const Document& documentKey) override {
DBDirectClient client(expCtx->getOperationContext());
auto result = client.findOne(nss, documentKey.toBson());
if (result.isEmpty()) {
@ -216,13 +206,8 @@ public:
return Document(result.getOwned());
}
void setCollectionOptions(const NamespaceString& nss, const BSONObj option) {
_collectionOptions[nss] = option;
}
private:
std::deque<DocumentSource::GetNextResult> _mockResults;
std::map<NamespaceString, BSONObj> _collectionOptions;
};
repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
@ -1533,10 +1518,6 @@ TEST_F(ReshardingAggWithStorageTest, RetryableFindAndModifyWithImageLookup) {
{
auto mockMongoInterface = std::make_shared<MockMongoInterface>(pipelineSource);
// Register a dummy uuid just to not make test crash. The stub for findSingleDoc ignores
// the UUID so it doesn't matter what the value here is.
mockMongoInterface->setCollectionOptions(NamespaceString::kConfigImagesNamespace,
BSON("uuid" << UUID::gen()));
expCtx->setMongoProcessInterface(std::move(mockMongoInterface));
}
@ -1635,10 +1616,6 @@ TEST_F(ReshardingAggWithStorageTest,
expCtx->setNamespaceString(NamespaceString::kRsOplogNamespace);
{
auto mockMongoInterface = std::make_shared<MockMongoInterface>(pipelineSource);
// Register a dummy uuid just to not make test crash. The stub for findSingleDoc ignores
// the UUID so it doesn't matter what the value here is.
mockMongoInterface->setCollectionOptions(NamespaceString::kConfigImagesNamespace,
BSON("uuid" << UUID::gen()));
expCtx->setMongoProcessInterface(std::move(mockMongoInterface));
}