SERVER-109034 In $merge when possible try insert and only use update as a fallback (#44991)

GitOrigin-RevId: a374e0532cc7076bb8e2003c4c3b2b5c8792aa4f
This commit is contained in:
Ivan Fefer 2026-01-02 12:36:21 +01:00 committed by MongoDB Bot
parent 4d39f0eb11
commit 4b466c9483
22 changed files with 574 additions and 153 deletions

View File

@ -135,19 +135,66 @@ const pipeline = [mergeStage];
return;
}
dropWithoutImplicitRecreate(source.getName());
assert.commandWorked(source.insert({_id: 4, a: 1}));
assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
const error = assert.throws(() => source.aggregate(pipeline));
assert.commandFailedWithCode(error, ErrorCodes.DuplicateKey);
assertArrayEq({
actual: target.find().toArray(),
expected: [
{_id: 1, a: 1, b: "a"},
{_id: 2, a: 2, b: "b"},
],
});
assert.commandWorked(target.dropIndex({a: 1}));
{
dropWithoutImplicitRecreate(source.getName());
assert.commandWorked(source.insert({_id: 4, a: 1}));
assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
const error = assert.throws(() => source.aggregate(pipeline));
assert.commandFailedWithCode(error, ErrorCodes.DuplicateKey);
assertArrayEq({
actual: target.find().toArray(),
expected: [
{_id: 1, a: 1, b: "a"},
{_id: 2, a: 2, b: "b"},
],
});
assert.commandWorked(target.dropIndex({a: 1}));
}
{
dropWithoutImplicitRecreate(source.getName());
dropWithoutImplicitRecreate(target.getName());
const document = {_id: 0, a: 1, b: 2};
assert.commandWorked(source.insert(document));
assert.commandWorked(target.insert(document));
assert.commandWorked(target.createIndex({a: 1}, {unique: true}));
assert.commandWorked(target.createIndex({b: 1}, {unique: true}));
const mergeOnAStage = {
$merge: {
into: target.getName(),
on: "a",
whenMatched: "keepExisting",
whenNotMatched: "insert",
},
};
const mergeOnBStage = {
$merge: {
into: target.getName(),
on: "b",
whenMatched: "keepExisting",
whenNotMatched: "insert",
},
};
const assertMerge = function (mergeStage) {
assert.doesNotThrow(() => source.aggregate([mergeStage]).itcount());
assertArrayEq({
actual: target.find().toArray(),
expected: [document],
});
};
assertMerge(mergeStage);
assertMerge(mergeOnAStage);
assertMerge(mergeOnBStage);
assert.commandWorked(target.dropIndex({a: 1}));
assert.commandWorked(target.dropIndex({b: 1}));
}
})();
// Test $merge fails if it cannot find an index to verify that the 'on' fields will be unique.

View File

@ -20,12 +20,16 @@ export function waitForCurOpByFilter(db, filter, options = {}) {
return results;
}
export function getCurOpFilterForFailPoint(failPointName) {
return {$or: [{failpointMsg: failPointName}, {msg: failPointName}]};
}
// Wait until the current operation reaches the fail point "failPoint" for the given namespace
// "nss". Accepts an optional filter to apply alongside the "failpointMsg". Returns the resulting
// array of operations.
export function waitForCurOpByFailPoint(db, nss, failPoint, filter = {}, options = {}) {
const adjustedFilter = {
$and: [{ns: nss}, filter, {$or: [{failpointMsg: failPoint}, {msg: failPoint}]}],
$and: [{ns: nss}, filter, getCurOpFilterForFailPoint(failPoint)],
};
return waitForCurOpByFilter(db, adjustedFilter, options);
}
@ -33,7 +37,7 @@ export function waitForCurOpByFailPoint(db, nss, failPoint, filter = {}, options
// Wait until the current operation reaches the fail point "failPoint" with no namespace. Returns
// the resulting array of operations.
export function waitForCurOpByFailPointNoNS(db, failPoint, filter = {}, options = {}) {
const adjustedFilter = {$and: [filter, {$or: [{failpointMsg: failPoint}, {msg: failPoint}]}]};
const adjustedFilter = {$and: [filter, getCurOpFilterForFailPoint(failPoint)]};
return waitForCurOpByFilter(db, adjustedFilter, options);
}

View File

@ -6,9 +6,10 @@
* ]
*/
import {withEachMergeMode} from "jstests/aggregation/extras/merge_helpers.js";
import {waitForCurOpByFailPointNoNS} from "jstests/libs/curop_helpers.js";
import {getCurOpFilterForFailPoint, waitForCurOpByFilter} from "jstests/libs/curop_helpers.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
const kDBName = "test";
const kSourceCollName = "merge_max_time_ms_source";
@ -32,7 +33,7 @@ function insertDocs(coll) {
function forceAggregationToHangAndCheckMaxTimeMsExpires(
whenMatched,
whenNotMatched,
failPointName,
failPointNames,
conn,
failPointConn,
maxTimeMsConn,
@ -45,20 +46,16 @@ function forceAggregationToHangAndCheckMaxTimeMsExpires(
// Enable a failPoint so that the write will hang. 'shouldCheckForInterrupt' is set to true
// so that maxTimeMS expiration can occur while the $merge operation's thread is hanging on
// this failpoiint.
const failpointCommand = {
configureFailPoint: failPointName,
mode: "alwaysOn",
data: {nss: kDBName + "." + kDestCollName, shouldCheckForInterrupt: true},
};
assert.commandWorked(failPointConn.getDB("admin").runCommand(failpointCommand));
const failPoints = failPointNames.map((failPointName) =>
configureFailPoint(failPointConn, failPointName, {
nss: kDBName + "." + kDestCollName,
shouldCheckForInterrupt: true,
}),
);
// Make sure we don't run out of time on either of the involved nodes before the failpoint is
// hit.
assert.commandWorked(conn.getDB("admin").runCommand({configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
assert.commandWorked(
maxTimeMsConn.getDB("admin").runCommand({configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}),
);
const maxTimeFailPoints = [conn, maxTimeMsConn].map((conn) => configureFailPoint(conn, "maxTimeNeverTimeOut"));
// Build the parallel shell function.
let shellStr = `const testDB = db.getSiblingDB('${kDBName}');`;
@ -83,11 +80,10 @@ function forceAggregationToHangAndCheckMaxTimeMsExpires(
shellStr += `(${runAggregate.toString()})();`;
const awaitShell = startParallelShell(shellStr, conn.port);
waitForCurOpByFailPointNoNS(failPointConn.getDB("admin"), failPointName, {}, {allUsers: true});
const curOpFilter = {$or: failPointNames.map((fp) => getCurOpFilterForFailPoint(fp))};
waitForCurOpByFilter(failPointConn.getDB("admin"), curOpFilter, {allUsers: true});
assert.commandWorked(
maxTimeMsConn.getDB("admin").runCommand({configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}),
);
maxTimeFailPoints.forEach((fp) => fp.off());
// The aggregation running in the parallel shell will hang on the failpoint, burning
// its time. Wait until the maxTimeMS has definitely expired.
@ -95,7 +91,7 @@ function forceAggregationToHangAndCheckMaxTimeMsExpires(
// Now drop the failpoint, allowing the aggregation to proceed. It should hit an
// interrupt check and terminate immediately.
assert.commandWorked(failPointConn.getDB("admin").runCommand({configureFailPoint: failPointName, mode: "off"}));
failPoints.forEach((fp) => fp.off());
// Wait for the parallel shell to finish.
assert.eq(awaitShell(), 0);
@ -133,14 +129,14 @@ function runUnshardedTest(whenMatched, whenNotMatched, conn, primaryConn, maxTim
assert.commandWorked(destColl.remove({}));
// Force the aggregation to hang while the batch is being written. The failpoint changes
// depending on the mode. If 'whenMatched' is set to "fail" then the implementation will end
// up issuing insert commands instead of updates.
const kFailPointName = whenMatched === "fail" ? "hangDuringBatchInsert" : "hangDuringBatchUpdate";
// Force the aggregation to hang while the batch is being written. Depending on mode, $merge
// can perform inserts or updates. To make the test resiliant to $merge mode changing, we hang
// on both possible code paths.
const kFailPointNames = ["hangDuringBatchInsert", "hangDuringBatchUpdate"];
forceAggregationToHangAndCheckMaxTimeMsExpires(
whenMatched,
whenNotMatched,
kFailPointName,
kFailPointNames,
conn,
primaryConn,
maxTimeMsConn,
@ -152,7 +148,7 @@ function runUnshardedTest(whenMatched, whenNotMatched, conn, primaryConn, maxTim
forceAggregationToHangAndCheckMaxTimeMsExpires(
whenMatched,
whenNotMatched,
"hangWhileBuildingDocumentSourceMergeBatch",
["hangWhileBuildingDocumentSourceMergeBatch"],
conn,
conn,
conn,

View File

@ -56,10 +56,12 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
.itcount(),
);
assert.eq(whenNotMatchedMode === "discard" ? 0 : 2, outColl.find().itcount());
if (whenMatchedMode === "fail") {
assert.eq(1, primary.system.profile.find({"op": "insert", "command.comment": commentStr}).itcount());
const insertCount = primary.system.profile.find({"op": "insert", "command.comment": commentStr}).itcount();
const updateCount = primary.system.profile.find({"op": "update", "command.comment": commentStr}).itcount();
if (insertCount !== 0) {
assert.eq(1, insertCount);
} else {
assert.eq(2, primary.system.profile.find({"op": "update", "command.comment": commentStr}).itcount());
assert.eq(2, updateCount);
}
outColl.drop();
});

View File

@ -10,6 +10,7 @@
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
const kDBName = "out_merge_on_secondary_db";
@ -31,7 +32,9 @@ const outColl = primaryDB.getCollection("outColl");
assert.commandWorked(inputColl.insert({_id: 0, a: 1}, {writeConcern: {w: 2}}));
assert.commandWorked(inputColl.insert({_id: 1, a: 2}, {writeConcern: {w: 2}}));
const mergeFailpoint = "hangDuringBatchUpdate";
const mergeFailpoint = FeatureFlagUtil.isPresentAndEnabled(secondaryDB, "MergeStageInsertWithUpdateBackup")
? "hangDuringBatchInsert"
: "hangDuringBatchUpdate";
const outFailpoint = "hangDuringBatchInsert";
/**

View File

@ -191,7 +191,7 @@ testEpochChangeDuringAgg({
expectedError: ErrorCodes.QueryPlanKilled,
});
testEpochChangeDuringAgg({
mergeSpec: {into: target.getName(), whenMatched: "replace", whenNotMatched: "insert"},
mergeSpec: {into: target.getName(), whenMatched: "replace", whenNotMatched: "discard"},
failpoint: "hangDuringBatchUpdate",
failpointData: {nss: target.getFullName()},
expectedError: ErrorCodes.QueryPlanKilled,

View File

@ -45,7 +45,7 @@ function testWriteConcernError(rs) {
cursor: {},
});
jsTestLog("Testing Mode: " + tojson(whenMatchedMode) + tojson(whenNotMatchedMode));
jsTestLog("Testing Mode: " + tojson(whenMatchedMode) + " " + tojson(whenNotMatchedMode));
jsTestLog("Target collection after $merge: " + tojson(target.find().toArray()));
let oplogEntries = shard0

View File

@ -68,10 +68,14 @@ std::pair<MergeStage::BatchObject, int> MergeStage::makeBatchObject(Document doc
return {std::move(batchObject), size};
}
bool MergeStage::shouldFlush(size_t currentBatchSize) const {
return _mergeProcessor->shouldFlush(currentBatchSize);
}
void MergeStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) {
try {
DocumentSourceWriteBlock writeBlock(pExpCtx->getOperationContext());
_mergeProcessor->flush(_outputNs, std::move(bcr), std::move(batch));
_mergeProcessor->flush(_outputNs, *_mergeOnFields, std::move(bcr), std::move(batch));
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$merge failed to update the matching document, did you "

View File

@ -84,6 +84,8 @@ public:
BatchedCommandRequest makeBatchedWriteRequest() const override;
std::pair<BatchObject, int> makeBatchObject(Document doc) const override;
bool shouldFlush(size_t batchSize) const final;
private:
void flush(BatchedCommandRequest bcr, BatchedObjects batch) override;
void waitWhileFailPointEnabled() override;

View File

@ -288,9 +288,9 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) {
auto targetEpoch = boost::none;
if (_timeseries) {
for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insertTimeseries(
for (const auto& writeError : pExpCtx->getMongoProcessInterface()->insertTimeseries(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) {
uassertStatusOK(insertStatus);
uassertStatusOK(writeError.getStatus());
}
} else {
// Use the UUID to catch a mismatch if the temp collection was dropped and recreated.
@ -302,9 +302,9 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) {
insertCommand->getWriteCommandRequestBase().setCollectionUUID(_tempNsUUID);
}
try {
for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insert(
for (const auto& writeError : pExpCtx->getMongoProcessInterface()->insert(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) {
uassertStatusOK(insertStatus);
uassertStatusOK(writeError.getStatus());
}
} catch (ExceptionFor<ErrorCodes::CollectionUUIDMismatch>& ex) {

View File

@ -141,7 +141,7 @@ protected:
bufferedBytes += objSize;
if (!batch.empty() &&
(bufferedBytes > maxBatchSizeBytes ||
batch.size() >= write_ops::kMaxWriteBatchSize)) {
batch.size() >= write_ops::kMaxWriteBatchSize || shouldFlush(batch.size()))) {
flush(std::move(batchWrite), std::move(batch));
batch.clear();
batchWrite = makeBatchedWriteRequest();
@ -218,6 +218,10 @@ protected:
*/
virtual std::pair<B, int> makeBatchObject(Document doc) const = 0;
virtual bool shouldFlush(size_t currentBatchSize) const {
return false;
}
/**
* A subclass may override this method to enable a fail point right after a next input element
* has been retrieved, but not processed yet.

View File

@ -89,7 +89,10 @@ const auto kDefaultPipelineLet = BSON("new" << "$$ROOT");
* Checks if a pair of whenMatched/whenNotMatched merge modes is supported.
*/
bool isSupportedMergeMode(WhenMatched whenMatched, WhenNotMatched whenNotMatched) {
return getMergeStrategyDescriptors().count({whenMatched, whenNotMatched}) > 0;
// AllowInsertWithUpdateBackupStrategies doesn't affect the set of supported modes, so we can
// pass any value.
return getMergeStrategyDescriptors(MergeProcessor::AllowInsertWithUpdateBackupStrategies{false})
.count({whenMatched, whenNotMatched}) > 0;
}
/**
@ -209,8 +212,12 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
PrivilegeVector DocumentSourceMerge::LiteParsed::requiredPrivileges(
bool isMongos, bool bypassDocumentValidation) const {
tassert(11282974, "Missing foreignNss", _foreignNss);
auto actions =
ActionSet{getMergeStrategyDescriptors().at({_whenMatched, _whenNotMatched}).actions};
// AllowInsertWithUpdateBackupStrategies doesn't affect required priviledges, so we can pass any
// value.
auto actions = ActionSet{
getMergeStrategyDescriptors(MergeProcessor::AllowInsertWithUpdateBackupStrategies{false})
.at({_whenMatched, _whenNotMatched})
.actions};
if (bypassDocumentValidation) {
actions.addAction(ActionType::bypassDocumentValidation);
}
@ -218,15 +225,17 @@ PrivilegeVector DocumentSourceMerge::LiteParsed::requiredPrivileges(
return {{ResourcePattern::forExactNamespace(*_foreignNss), actions}};
}
DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WhenMatched whenMatched,
WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues)
DocumentSourceMerge::DocumentSourceMerge(
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WhenMatched whenMatched,
WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues,
MergeProcessor::AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies)
: DocumentSourceWriter(kStageName.data(), std::move(outputNs), expCtx),
_mergeOnFields(std::make_shared<std::set<FieldPath>>(std::move(mergeOnFields))),
_mergeOnFieldsIncludesId(_mergeOnFields->count("_id") == 1),
@ -236,7 +245,9 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
std::move(letVariables),
std::move(pipeline),
std::move(collectionPlacementVersion),
allowMergeOnNullishValues)) {};
allowMergeOnNullishValues,
allowInsertWithUpdateBackupStrategies)) {};
boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
NamespaceString outputNs,
@ -248,6 +259,31 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues) {
return DocumentSourceMerge::create(
std::move(outputNs),
expCtx,
whenMatched,
whenNotMatched,
std::move(letVariables),
std::move(pipeline),
std::move(mergeOnFields),
std::move(collectionPlacementVersion),
allowMergeOnNullishValues,
MergeProcessor::AllowInsertWithUpdateBackupStrategies{
feature_flags::gFeatureFlagMergeStageInsertWithUpdateBackup.isEnabled()});
}
boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WhenMatched whenMatched,
WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues,
MergeProcessor::AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies) {
uassert(51189,
fmt::format("Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' "
"is not supported",
@ -306,7 +342,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
std::move(pipeline),
std::move(mergeOnFields),
std::move(collectionPlacementVersion),
allowMergeOnNullishValues);
allowMergeOnNullishValues,
allowInsertWithUpdateBackupStrategies);
}
boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(

View File

@ -179,7 +179,8 @@ public:
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final;
/**
* Creates a new $merge stage from the given arguments.
* Creates a new $merge stage from the given arguments. AllowInsertWithUpdateBackupStrategies is
* set according to the feature flag.
*/
static boost::intrusive_ptr<DocumentSource> create(
NamespaceString outputNs,
@ -192,6 +193,22 @@ public:
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues);
/**
* Creates a new $merge stage from the given arguments.
*/
static boost::intrusive_ptr<DocumentSource> create(
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
MergeStrategyDescriptor::WhenMatched whenMatched,
MergeStrategyDescriptor::WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues,
MergeProcessor::AllowInsertWithUpdateBackupStrategies
allowInsertWithUpdateBackupStrategies);
/**
* Parses a $merge stage from the user-supplied BSON.
*/
@ -210,6 +227,9 @@ public:
}
}
const std::set<FieldPath>& getMergeOnFields() const {
return *_mergeOnFields;
}
private:
friend boost::intrusive_ptr<exec::agg::Stage> documentSourceMergeToStageFn(
@ -229,7 +249,9 @@ private:
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues);
bool allowMergeOnNullishValues,
MergeProcessor::AllowInsertWithUpdateBackupStrategies
allowInsertWithUpdateBackupStrategies);
// Holds the fields used for uniquely identifying documents. There must exist a unique index

View File

@ -1175,5 +1175,19 @@ TEST_F(DocumentSourceMergeTest, QueryShape) {
<< "Expected [" << expectedBson << "] but found [" << result << "]";
}
TEST_F(DocumentSourceMergeTest, AllowInsertWithUpdateBackupStrategiesSameModesAndPrivileges) {
const auto& mapFalse =
getMergeStrategyDescriptors(MergeProcessor::AllowInsertWithUpdateBackupStrategies{false});
const auto& mapTrue =
getMergeStrategyDescriptors(MergeProcessor::AllowInsertWithUpdateBackupStrategies{false});
ASSERT_EQ(mapFalse.size(), mapTrue.size());
for (const auto& [mode, descriptorFalse] : mapFalse) {
const auto& descriptorTrue = mapTrue.at(mode);
ASSERT_EQ(descriptorFalse.actions, descriptorTrue.actions);
}
}
} // namespace
} // namespace mongo

View File

@ -32,6 +32,8 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/variable_validation.h"
#include "mongo/db/pipeline/writer_util.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@ -50,6 +52,8 @@ using BatchTransform = MergeStrategyDescriptor::BatchTransform;
using UpdateModification = write_ops::UpdateModification;
using UpsertType = MongoProcessInterface::UpsertType;
enum AllowDuplicateKeyErrorsFromMergeIndex : bool {};
BatchedCommandGenerator makeInsertCommandGenerator() {
return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest {
return makeInsertCommand(ns, expCtx->getBypassDocumentValidation());
@ -107,11 +111,13 @@ std::vector<write_ops::UpdateOpEntry> constructUpdateEntries(
MergeStrategy makeUpdateStrategy() {
return [](const auto& expCtx,
const auto& ns,
const auto& mergeOnFields,
const auto& wc,
auto epoch,
auto&& batch,
auto&& bcr,
UpsertType upsert) {
UpsertType upsert,
InsertStrategyStatistics& _) {
constexpr auto multi = false;
auto updateCommand = bcr.extractUpdateRequest();
updateCommand->setUpdates(constructUpdateEntries(expCtx, std::move(batch), upsert, multi));
@ -130,11 +136,13 @@ MergeStrategy makeUpdateStrategy() {
MergeStrategy makeStrictUpdateStrategy() {
return [](const auto& expCtx,
const auto& ns,
const auto& mergeOnFields,
const auto& wc,
auto epoch,
auto&& batch,
auto&& bcr,
UpsertType upsert) {
UpsertType upsert,
InsertStrategyStatistics& _) {
const int64_t batchSize = batch.size();
constexpr auto multi = false;
auto updateCommand = bcr.extractUpdateRequest();
@ -148,17 +156,78 @@ MergeStrategy makeStrictUpdateStrategy() {
};
}
bool collatorsMatch(const ExpressionContext* expCtx, const BSONObj& indexCollator) {
if (CollatorInterface::isSimpleCollator(expCtx->getCollator())) {
return indexCollator.isEmpty() || indexCollator.woCompare(CollationSpec::kSimpleSpec) == 0;
} else {
auto indexCollatorInterface = uassertStatusOK(
CollatorFactoryInterface::get(expCtx->getOperationContext()->getServiceContext())
->makeFromBSON(indexCollator));
return CollatorInterface::collatorsMatch(expCtx->getCollator(),
indexCollatorInterface.get());
}
}
bool keyPatternNamesExactPaths(const BSONObj& keyPattern,
const std::set<FieldPath>& uniqueKeyPaths) {
size_t nFieldsMatched = 0;
for (auto&& elem : keyPattern) {
if (!elem.isNumber()) {
return false;
}
if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) {
return false;
}
++nFieldsMatched;
}
return nFieldsMatched == uniqueKeyPaths.size();
}
bool ignoreDuplicateKeyError(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const std::set<FieldPath>& mergeOnFields,
const DuplicateKeyErrorInfo& errorInfo) {
if (!collatorsMatch(expCtx.get(), errorInfo.getCollation())) {
return false;
}
const auto& keyPattern = errorInfo.getKeyPattern();
if (keyPatternNamesExactPaths(keyPattern, mergeOnFields)) {
return true;
}
if (keyPattern.nFields() == 1 && keyPattern.firstElementFieldNameStringData() == "_id" &&
mergeOnFields.contains("_id")) {
return true;
}
return false;
}
template <AllowDuplicateKeyErrorsFromMergeIndex allowDuplicateKeyErrorsFromMergeIndex>
inline bool canIgnoreInsertStatus(const Status& status,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const std::set<FieldPath>& mergeOnFields) {
if constexpr (!allowDuplicateKeyErrorsFromMergeIndex) {
return false;
} else {
if (status.code() == ErrorCodes::DuplicateKey) {
const auto& extraInfo = *status.template extraInfo<DuplicateKeyErrorInfo>();
return ignoreDuplicateKeyError(expCtx, mergeOnFields, extraInfo);
}
return false;
}
}
/**
* Creates a merge strategy which uses insert semantics to perform a merge operation.
*/
MergeStrategy makeInsertStrategy() {
return [](const auto& expCtx,
const auto& ns,
return [](const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
const std::set<FieldPath>& mergeOnFields,
const auto& wc,
auto epoch,
auto&& batch,
auto&& bcr,
UpsertType upsertType) {
UpsertType upsertType,
InsertStrategyStatistics& _) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
// like to just insert the new document without attempting any sort of replacement.
@ -167,13 +236,129 @@ MergeStrategy makeInsertStrategy() {
});
auto insertCommand = bcr.extractInsertRequest();
insertCommand->setDocuments(std::move(objectsToInsert));
for (const auto& insertStatus : expCtx->getMongoProcessInterface()->insert(
expCtx, ns, std::move(insertCommand), wc, epoch)) {
uassertStatusOK(insertStatus);
auto insertResult = expCtx->getMongoProcessInterface()->insert(
expCtx, ns, std::move(insertCommand), wc, epoch);
for (const write_ops::WriteError& writeError : insertResult) {
uassertStatusOK(writeError.getStatus());
}
};
}
bool shouldAttemptInsert(const InsertStrategyStatistics& insertStats) {
return insertStats.insertDocAttempts <
static_cast<size_t>(internalQueryMergeMinInsertAttempts.loadRelaxed()) ||
static_cast<double>(insertStats.insertErrors) / insertStats.insertDocAttempts <=
internalQueryMergeMaxInsertErrorRate.loadRelaxed();
}
void runBackupStrategy(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
const std::set<FieldPath>& mergeOnFields,
const auto& wc,
auto epoch,
MongoProcessInterface::BatchedObjects&& batch,
BatchedCommandRequest&& bcr,
UpsertType upsertType,
const MergeStrategy& backupStrategy,
const BatchTransform& backupTransform,
InsertStrategyStatistics& insertStats) {
if (batch.empty()) {
return;
}
if (backupTransform) {
for (auto& batchObject : batch) {
backupTransform(batchObject);
}
}
backupStrategy(expCtx,
ns,
mergeOnFields,
wc,
epoch,
std::move(batch),
std::move(bcr),
upsertType,
insertStats);
}
template <AllowDuplicateKeyErrorsFromMergeIndex allowDuplicateKeyErrorsFromMergeIndex>
MergeStrategy makeInsertStrategyWithBackup(BatchTransform backupTransform,
UpsertType backupUpsertType) {
return [backupTransform = std::move(backupTransform),
backupUpsertType = backupUpsertType,
backupStrategy = makeUpdateStrategy(),
backupCommandGenerator =
makeUpdateCommandGenerator()](const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
const std::set<FieldPath>& mergeOnFields,
const auto& wc,
auto epoch,
MongoProcessInterface::BatchedObjects&& batch,
auto&& bcr,
UpsertType upsertType,
InsertStrategyStatistics& insertStats) {
if (!shouldAttemptInsert(insertStats)) {
return runBackupStrategy(expCtx,
ns,
mergeOnFields,
wc,
epoch,
std::move(batch),
backupCommandGenerator(expCtx, ns),
backupUpsertType,
backupStrategy,
backupTransform,
insertStats);
}
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
// like to just insert the new document without attempting any sort of replacement.
std::transform(batch.begin(), batch.end(), objectsToInsert.begin(), [](const auto& obj) {
return get<UpdateModification>(obj).getUpdateReplacement();
});
auto insertCommand = bcr.extractInsertRequest();
insertCommand->setDocuments(std::move(objectsToInsert));
auto insertResult = expCtx->getMongoProcessInterface()->insert(
expCtx, ns, std::move(insertCommand), wc, epoch);
insertStats.insertDocAttempts += batch.size();
insertStats.insertErrors += insertResult.size();
if (insertResult.empty()) {
return;
}
MongoProcessInterface::BatchedObjects updateBatch;
updateBatch.reserve(insertResult.size());
for (const write_ops::WriteError& writeError : insertResult) {
if (writeError.getStatus().isOK() ||
canIgnoreInsertStatus<allowDuplicateKeyErrorsFromMergeIndex>(
writeError.getStatus(), expCtx, mergeOnFields)) {
continue;
}
// DuplicateKey error might mean that matching document exists, so we retry the
// operation with update strategy instead. Any other error should be propagated.
if (writeError.getStatus().code() != ErrorCodes::DuplicateKey) {
uassertStatusOK(writeError.getStatus());
}
updateBatch.push_back(std::move(batch[writeError.getIndex()]));
}
runBackupStrategy(expCtx,
ns,
mergeOnFields,
wc,
epoch,
std::move(updateBatch),
backupCommandGenerator(expCtx, ns),
backupUpsertType,
backupStrategy,
backupTransform,
insertStats);
};
}
/**
* Creates a batched object transformation function which wraps 'obj' into the given 'updateOp'
* operator.
@ -185,6 +370,13 @@ BatchTransform makeUpdateTransform(const std::string& updateOp) {
};
}
template <typename BaseContainer, typename ExtendedContainer>
BaseContainer extendContainer(BaseContainer baseContainer, ExtendedContainer extendedContainer) {
baseContainer.insert(std::make_move_iterator(extendedContainer.begin()),
std::make_move_iterator(extendedContainer.end()));
return baseContainer;
}
} // namespace
/**
@ -195,23 +387,18 @@ BatchTransform makeUpdateTransform(const std::string& updateOp) {
* from this map based on the requested merge modes, and then passed to the $merge stage
* constructor.
*/
const MergeStrategyDescriptorsMap& getMergeStrategyDescriptors() {
const MergeStrategyDescriptorsMap& getMergeStrategyDescriptors(
MergeProcessor::AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies) {
// Rather than defining this map as a global object, we'll wrap the static map into a function
// to prevent static initialization order fiasco which may happen since ActionType instances
// are also defined as global objects and there is no way to tell the linker which objects must
// be initialized first. By wrapping the map into a function we can guarantee that it won't be
// initialized until the first use, which is when the program already started and all global
// variables had been initialized.
static const auto mergeStrategyDescriptors =
MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert
{MergeStrategyDescriptor::kReplaceInsertMode,
{MergeStrategyDescriptor::kReplaceInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
{},
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}},
// whenMatched: replace, whenNotMatched: fail
// This map contains merge strategy descriptors that don't depend on the feature flag
static const auto kBaseMergeStrategyDescriptors =
MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: fail
{MergeStrategyDescriptor::kReplaceFailMode,
{MergeStrategyDescriptor::kReplaceFailMode,
{ActionType::update},
@ -227,14 +414,6 @@ const MergeStrategyDescriptorsMap& getMergeStrategyDescriptors() {
{},
UpsertType::kNone,
makeUpdateCommandGenerator()}},
// whenMatched: merge, whenNotMatched: insert
{MergeStrategyDescriptor::kMergeInsertMode,
{MergeStrategyDescriptor::kMergeInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
makeUpdateTransform("$set"),
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}},
// whenMatched: merge, whenNotMatched: fail
{MergeStrategyDescriptor::kMergeFailMode,
{MergeStrategyDescriptor::kMergeFailMode,
@ -251,14 +430,6 @@ const MergeStrategyDescriptorsMap& getMergeStrategyDescriptors() {
makeUpdateTransform("$set"),
UpsertType::kNone,
makeUpdateCommandGenerator()}},
// whenMatched: keepExisting, whenNotMatched: insert
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
makeUpdateTransform("$setOnInsert"),
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}},
// whenMatched: [pipeline], whenNotMatched: insert
{MergeStrategyDescriptor::kPipelineInsertMode,
{MergeStrategyDescriptor::kPipelineInsertMode,
@ -291,19 +462,89 @@ const MergeStrategyDescriptorsMap& getMergeStrategyDescriptors() {
{},
UpsertType::kNone,
makeInsertCommandGenerator()}}};
return mergeStrategyDescriptors;
static const auto kMergeStrategyDescriptorsWithoutBackup = extendContainer(
kBaseMergeStrategyDescriptors,
MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert
{MergeStrategyDescriptor::kReplaceInsertMode,
{MergeStrategyDescriptor::kReplaceInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
{},
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}},
// whenMatched: merge, whenNotMatched: insert
{MergeStrategyDescriptor::kMergeInsertMode,
{MergeStrategyDescriptor::kMergeInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
makeUpdateTransform("$set"),
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}},
// whenMatched: keepExisting, whenNotMatched: insert
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeUpdateStrategy(),
makeUpdateTransform("$setOnInsert"),
UpsertType::kGenerateNewDoc,
makeUpdateCommandGenerator()}}});
static const auto kMergeStrategyDescriptorsWithBackup = extendContainer(
kBaseMergeStrategyDescriptors,
MergeStrategyDescriptorsMap{
// whenMatched: replace, whenNotMatched: insert with backup
{MergeStrategyDescriptor::kReplaceInsertMode,
{MergeStrategyDescriptor::kReplaceInsertMode,
{ActionType::insert, ActionType::update},
makeInsertStrategyWithBackup<AllowDuplicateKeyErrorsFromMergeIndex{false}>(
{}, UpsertType::kGenerateNewDoc),
{},
UpsertType::kNone,
makeInsertCommandGenerator(),
/*.isInsertWithUpdateBackupStrategy=*/true}},
// whenMatched: merge, whenNotMatched: insert with backup
{MergeStrategyDescriptor::kMergeInsertMode,
{MergeStrategyDescriptor::kMergeInsertMode,
{ActionType::insert, ActionType::update},
makeInsertStrategyWithBackup<AllowDuplicateKeyErrorsFromMergeIndex{false}>(
makeUpdateTransform("$set"), UpsertType::kGenerateNewDoc),
{},
UpsertType::kNone,
makeInsertCommandGenerator(),
/*.isInsertWithUpdateBackupStrategy=*/true}},
// whenMatched: keepExisting, whenNotMatched: insert with backup
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{MergeStrategyDescriptor::kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
makeInsertStrategyWithBackup<AllowDuplicateKeyErrorsFromMergeIndex{true}>(
makeUpdateTransform("$setOnInsert"), UpsertType::kGenerateNewDoc),
{},
UpsertType::kNone,
makeInsertCommandGenerator(),
/*.isInsertWithUpdateBackupStrategy=*/true}},
});
if (allowInsertWithUpdateBackupStrategies) {
return kMergeStrategyDescriptorsWithBackup;
} else {
return kMergeStrategyDescriptorsWithoutBackup;
}
}
MergeProcessor::MergeProcessor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
MergeStrategyDescriptor::WhenMatched whenMatched,
MergeStrategyDescriptor::WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues)
MergeProcessor::MergeProcessor(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
MergeStrategyDescriptor::WhenMatched whenMatched,
MergeStrategyDescriptor::WhenNotMatched whenNotMatched,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues,
AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies)
: _expCtx(expCtx),
_writeConcern(expCtx->getOperationContext()->getWriteConcern()),
_descriptor(getMergeStrategyDescriptors().at({whenMatched, whenNotMatched})),
_descriptor(getMergeStrategyDescriptors(allowInsertWithUpdateBackupStrategies)
.at({whenMatched, whenNotMatched})),
_pipeline(std::move(pipeline)),
_collectionPlacementVersion(collectionPlacementVersion),
_allowMergeOnNullishValues(allowMergeOnNullishValues) {
@ -349,18 +590,21 @@ MongoProcessInterface::BatchObject MergeProcessor::makeBatchObject(
}
void MergeProcessor::flush(const NamespaceString& outputNs,
const std::set<FieldPath>& mergeOnFields,
BatchedCommandRequest bcr,
MongoProcessInterface::BatchedObjects batch) const {
MongoProcessInterface::BatchedObjects batch) {
auto targetEpoch = _collectionPlacementVersion
? boost::optional<OID>(_collectionPlacementVersion->epoch())
: boost::none;
_descriptor.strategy(_expCtx,
outputNs,
mergeOnFields,
_writeConcern,
targetEpoch,
std::move(batch),
std::move(bcr),
_descriptor.upsertType);
_descriptor.upsertType,
_insertStats);
}
BSONObj MergeProcessor::_extractMergeOnFieldsFromDoc(
@ -393,5 +637,10 @@ BSONObj MergeProcessor::_extractMergeOnFieldsFromDoc(
return result.freeze().toBson();
}
bool MergeProcessor::shouldFlush(size_t currentBatchSize) {
return _descriptor.isInsertWithUpdateBackupStrategy &&
_insertStats.insertDocAttempts < _insertStats.minInsertAttempts &&
_insertStats.insertDocAttempts + currentBatchSize >= _insertStats.minInsertAttempts;
}
} // namespace mongo

View File

@ -45,6 +45,14 @@
namespace MONGO_MOD_PUBLIC mongo {
struct MONGO_MOD_PRIVATE InsertStrategyStatistics {
size_t insertDocAttempts = 0;
size_t insertErrors = 0;
const size_t minInsertAttempts =
static_cast<size_t>(internalQueryMergeMinInsertAttempts.loadRelaxed());
const double maxInsertErrorRate = internalQueryMergeMaxInsertErrorRate.loadRelaxed();
};
// A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
// client should be authorized to perform in order to be able to execute a merge operation using
// this merge strategy. Additionally holds a 'BatchedCommandGenerator' that will initialize a
@ -60,11 +68,13 @@ struct MergeStrategyDescriptor {
// whenMatched/whenNotMatched modes.
using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&,
const NamespaceString&,
const std::set<FieldPath>&,
const WriteConcernOptions&,
boost::optional<OID>,
MongoProcessInterface::BatchedObjects&&,
BatchedCommandRequest&&,
UpsertType upsert)>;
UpsertType upsert,
InsertStrategyStatistics&)>;
// A function object that will be invoked to generate a BatchedCommandRequest.
using BatchedCommandGenerator = std::function<BatchedCommandRequest(
@ -97,17 +107,21 @@ struct MergeStrategyDescriptor {
BatchTransform transform;
UpsertType upsertType;
BatchedCommandGenerator batchedCommandGenerator;
bool isInsertWithUpdateBackupStrategy = false;
};
const std::map<const MergeStrategyDescriptor::MergeMode, const MergeStrategyDescriptor>&
getMergeStrategyDescriptors();
/**
* This class is used by the aggregation framework and streams enterprise module
* to perform the document processing needed for $merge.
*/
class MergeProcessor {
public:
/**
* The strictly-typed flag to allow or disable merge strategires that try insert first and
* fallback to update if errors happen.
*/
enum AllowInsertWithUpdateBackupStrategies : bool {};
/**
* If 'collectionPlacementVersion' is provided then processing will stop with an error if the
* collection's epoch changes during the course of execution. This is used as a mechanism to
@ -119,7 +133,8 @@ public:
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
boost::optional<ChunkVersion> collectionPlacementVersion,
bool allowMergeOnNullishValues);
bool allowMergeOnNullishValues,
AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies);
const MergeStrategyDescriptor& getMergeStrategyDescriptor() const {
return _descriptor;
@ -146,8 +161,11 @@ public:
bool mergeOnFieldPathsIncludeId) const;
void flush(const NamespaceString& outputNs,
const std::set<FieldPath>& mergeOnFieldPaths,
BatchedCommandRequest bcr,
MongoProcessInterface::BatchedObjects batch) const;
MongoProcessInterface::BatchedObjects batch);
bool shouldFlush(size_t currentBatchSize);
private:
/**
@ -209,6 +227,12 @@ private:
boost::optional<ChunkVersion> _collectionPlacementVersion;
bool _allowMergeOnNullishValues;
InsertStrategyStatistics _insertStats;
};
const std::map<const MergeStrategyDescriptor::MergeMode, const MergeStrategyDescriptor>&
getMergeStrategyDescriptors(
MergeProcessor::AllowInsertWithUpdateBackupStrategies allowInsertWithUpdateBackupStrategies);
} // namespace MONGO_MOD_PUBLIC mongo

View File

@ -237,12 +237,12 @@ public:
virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
/**
* Executes 'insertCommand' against 'ns'. Returns a vector of statuses. Will contain at least
* one error status if insert failed to not swallow any errors. If 'targetEpoch' is set, throws
* ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or the epoch
* changes during the course of the insert.
* Executes 'insertCommand' against 'ns'. Returns an empty vector on success and a vector of
* write erros on failure. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the
* targeted collection does not have the same epoch or the epoch changes during the course of
* the insert.
*/
using InsertResult = absl::InlinedVector<Status, 4>;
using InsertResult = std::vector<write_ops::WriteError>;
virtual InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,

View File

@ -207,14 +207,13 @@ MongoProcessInterface::InsertResult NonShardServerProcessInterface::insert(
write_ops_exec::performInserts(expCtx->getOperationContext(), *insertCommand);
InsertResult results;
for (const auto& writeResult : writeResults.results) {
if (writeResult.getStatus() != Status::OK()) {
results.push_back(writeResult.getStatus());
results.reserve(writeResults.results.size());
for (size_t i = 0; i < writeResults.results.size(); ++i) {
Status status = writeResults.results[i].getStatus();
if (!status.isOK()) {
results.emplace_back(static_cast<int32_t>(i), std::move(status));
}
}
if (results.empty()) {
results.push_back(Status::OK());
}
return results;
}
@ -224,21 +223,16 @@ MongoProcessInterface::InsertResult NonShardServerProcessInterface::insertTimese
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
InsertResult result;
auto [preConditions, _] = timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest(
expCtx->getOperationContext(), ns, *insertCommand, insertCommand->getCollectionUUID());
auto insertReply = timeseries::write_ops::performTimeseriesWrites(
expCtx->getOperationContext(), *insertCommand, preConditions);
InsertResult result;
if (insertReply.getWriteErrors().has_value()) {
for (const auto& writeError : *insertReply.getWriteErrors()) {
result.push_back(writeError.getStatus());
}
result.assign(insertReply.getWriteErrors()->begin(), insertReply.getWriteErrors()->end());
uassert(10903400, "Write errors must not be empty", !result.empty());
} else {
result.push_back(Status::OK());
}
return result;
}

View File

@ -103,25 +103,21 @@ MongoProcessInterface::InsertResult ReplicaSetNodeProcessInterface::insert(
auto statusWithReply = _executeCommandOnPrimaryRaw(opCtx, ns, batchInsertCommand.toBSON());
if (!statusWithReply.isOK()) {
return {statusWithReply.getStatus()};
return {write_ops::WriteError{0, statusWithReply.getStatus()}};
}
BatchedCommandResponse response;
std::string errMsg;
InsertResult result;
if (!response.parseBSON(statusWithReply.getValue(), &errMsg)) {
result.emplace_back(ErrorCodes::FailedToParse, errMsg);
result.emplace_back(0, Status{ErrorCodes::FailedToParse, errMsg});
} else if (!response.getOk()) {
result.push_back(response.getTopLevelStatus());
result.emplace_back(0, response.getTopLevelStatus());
} else if (response.isErrDetailsSet()) {
result.reserve(response.getErrDetails().size());
for (const auto& error : response.getErrDetails()) {
result.push_back(error.getStatus());
}
result.assign(response.getErrDetails().begin(), response.getErrDetails().end());
} else if (response.isWriteConcernErrorSet()) {
result.push_back(response.getWriteConcernError()->toStatus());
} else {
result.push_back(Status::OK());
result.emplace_back(0, response.getWriteConcernError()->toStatus());
}
return result;
}

View File

@ -224,16 +224,12 @@ MongoProcessInterface::InsertResult ShardServerProcessInterface::insert(
InsertResult result;
if (!response.getOk()) {
result.push_back(response.getTopLevelStatus());
result.emplace_back(0, response.getTopLevelStatus());
} else if (response.isErrDetailsSet()) {
result.reserve(response.getErrDetails().size());
for (const auto& error : response.getErrDetails()) {
result.push_back(error.getStatus());
}
result.assign(response.getErrDetails().begin(), response.getErrDetails().end());
} else if (response.isWriteConcernErrorSet()) {
result.push_back(response.getWriteConcernError()->toStatus());
} else {
result.push_back(Status::OK());
result.emplace_back(0, response.getWriteConcernError()->toStatus());
}
return result;
}

View File

@ -349,3 +349,9 @@ feature_flags:
default: false
fcv_gated: false
incremental_rollout_phase: in_development
featureFlagMergeStageInsertWithUpdateBackup:
description: "Feature flag to enable $merge stage strategy that tries to insert documents and falls back to update in case of errors"
cpp_varname: gFeatureFlagMergeStageInsertWithUpdateBackup
default: true
fcv_gated: false

View File

@ -1857,6 +1857,27 @@ server_parameters:
cpp_vartype: bool
default: false
redact: false
internalQueryMergeMinInsertAttempts:
description: >-
When $merge stage is using a insert with backup strategy, a minimal number of insert
attempts before it can switch to update permamently.
set_at: [startup, runtime]
cpp_varname: internalQueryMergeMinInsertAttempts
cpp_vartype: AtomicWord<long long>
default: 1000
redact: false
internalQueryMergeMaxInsertErrorRate:
description: >-
When $merge stage is using a insert with backup strategy, a max allowed insert error
rate. If insert error rate is higher, than the stage will use update without trying
insert first.
set_at: [startup, runtime]
cpp_varname: internalQueryMergeMaxInsertErrorRate
cpp_vartype: AtomicWord<double>
default: 0.7
redact: false
# Note for adding additional query knobs:
#
# When adding a new query knob, you should consider whether or not you need to add an 'on_update'