From ecaeff59031f80b091f9ee8dee34fc040fd3932e Mon Sep 17 00:00:00 2001 From: Drew Paroski Date: Thu, 12 Feb 2026 10:57:34 -0500 Subject: [PATCH] SERVER-119175 [UWE] Fix logic for ignoring CollectionUUIDMismatch errors when appropriate (#47745) GitOrigin-RevId: 2ea298282f8f7d2f55f8ea2ad213b7d3284ed0be --- jstests/sharding/OWNERS.yml | 3 + ...swallow_unnecessary_uuid_mismatch_error.js | 94 ++++-- .../write_batch_executor.cpp | 3 +- .../write_batch_executor.h | 6 +- .../write_batch_response_processor.cpp | 315 +++++++++++------- .../write_batch_response_processor.h | 41 ++- .../write_op_analyzer.cpp | 7 +- .../write_op_analyzer.h | 3 +- .../write_op_batcher.cpp | 5 + .../unified_write_executor/write_op_batcher.h | 2 +- src/mongo/s/write_ops/write_op.cpp | 2 +- src/mongo/s/write_ops/write_op_helper.cpp | 10 +- src/mongo/s/write_ops/write_op_helper.h | 17 +- 13 files changed, 316 insertions(+), 192 deletions(-) diff --git a/jstests/sharding/OWNERS.yml b/jstests/sharding/OWNERS.yml index e559426cad2..a053856c981 100644 --- a/jstests/sharding/OWNERS.yml +++ b/jstests/sharding/OWNERS.yml @@ -163,3 +163,6 @@ filters: - "hashed_shard_key_index_optional_shard_collection.js": approvers: - 10gen/server-cluster-scalability + - "swallow_unnecessary_uuid_mismatch_error.js": + approvers: + - 10gen/query-execution-router diff --git a/jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js b/jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js index 42a7495bd94..0360fedd148 100644 --- a/jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js +++ b/jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js @@ -12,33 +12,33 @@ const st = new ShardingTest({shards: 3}); const db = st.s.getDB(jsTestName()); assert.commandWorked(st.s.adminCommand({enableSharding: db.getName(), primaryShard: st.shard0.shardName})); -const shardedColl1 = db.sharded_1; - -assert.commandWorked(shardedColl1.insert({_id: 0})); -assert.commandWorked(shardedColl1.insert({_id: 10})); -assert.commandWorked(shardedColl1.insert({_id: 50})); -assert.commandWorked(shardedColl1.insert({_id: 100})); - -const splitChunk = function (coll, splitPointKeyValue) { - assert.commandWorked(st.s.adminCommand({split: coll.getFullName(), middle: {_id: splitPointKeyValue}})); -}; - -const uuid = function (coll) { +const getUUIDForCollection = function (coll) { return assert .commandWorked(db.runCommand({listCollections: 1})) .cursor.firstBatch.find((c) => c.name === coll.getName()).info.uuid; }; -// shard0: [inf, 50), shard1: [50, inf), shard2 has no chunks -assert.commandWorked(st.s.adminCommand({shardCollection: shardedColl1.getFullName(), key: {_id: 1}})); -splitChunk(shardedColl1, 50); -assert.commandWorked( - st.s.adminCommand({moveChunk: shardedColl1.getFullName(), find: {_id: 50}, to: st.shard1.shardName}), -); +const coll1 = db.sharded_1; +const collName1 = coll1.getName(); +const collFullName1 = coll1.getFullName(); -const cmdObj = { - delete: shardedColl1.getName(), - collectionUUID: uuid(shardedColl1), +const coll2 = db.sharded_2; +const collName2 = coll2.getName(); +const collFullName2 = coll2.getFullName(); + +assert.commandWorked(coll1.insert([{_id: 0}, {_id: 10}, {_id: 50}, {_id: 100}])); + +// shard0: [inf, 50), shard1: [50, inf), shard2 has no chunks +assert.commandWorked(st.s.adminCommand({shardCollection: collFullName1, key: {_id: 1}})); +assert.commandWorked(st.s.adminCommand({split: collFullName1, middle: {_id: 50}})); +assert.commandWorked(st.s.adminCommand({moveChunk: collFullName1, find: {_id: 50}, to: st.shard1.shardName})); + +let uuid1 = getUUIDForCollection(coll1); + +// Test multi deletes by _id, where the shard key equals "{_id: 1}". +let cmdObj = { + delete: collName1, + collectionUUID: uuid1, deletes: [ { q: { @@ -57,4 +57,56 @@ let res = db.runCommand(cmdObj); assert.commandWorked(res); // Only 3 documents fulfill the criteria so verify we've deleted them. assert.eq(3, res.n); + +assert.commandWorked(coll2.insert([{_id: 0}, {_id: 10}, {_id: 50}, {_id: 100}])); +assert.commandWorked(coll2.insert([{_id: 11}, {_id: 51}])); + +// Create an index on {x: 1}. +assert.commandWorked(db.runCommand({createIndexes: collName2, indexes: [{name: "x_1", key: {x: 1}}]})); + +// Shard the collection on {x: 1} and split up the key space so that shard0 owns [inf, 50) and +// shard1 owns [50, inf). +assert.commandWorked(st.s.adminCommand({shardCollection: collFullName2, key: {x: 1}})); +assert.commandWorked(st.s.adminCommand({split: collFullName2, middle: {x: 50}})); +assert.commandWorked(st.s.adminCommand({moveChunk: collFullName2, find: {x: 50}, to: st.shard1.shardName})); + +let uuid2 = getUUIDForCollection(coll2); + +// Test multi deletes by _id, where the shard key does not include "_id". +cmdObj = { + delete: collName2, + collectionUUID: uuid2, + deletes: [ + { + q: { + $and: [ + {$expr: {$eq: [{$mod: ["$_id", 2]}, {$literal: 0}]}}, + {$expr: {$gte: ["$_id", {$literal: 0}]}}, + {$expr: {$lt: ["$_id", {$literal: 99}]}}, + ], + }, + limit: 0, + hint: {_id: 1}, + }, + ], +}; + +res = db.runCommand(cmdObj); +assert.commandWorked(res); +assert.eq(3, res.n); + +// Test non-multi deletes by _id, where the shard key does not include "_id". +cmdObj = { + delete: collName2, + collectionUUID: uuid2, + deletes: [ + {q: {_id: 11}, limit: 1, hint: {_id: 1}}, + {q: {_id: 51}, limit: 1, hint: {_id: 1}}, + ], +}; + +res = db.runCommand(cmdObj); +assert.commandWorked(res); +assert.eq(2, res.n); + st.stop(); diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp index afd062145f0..d1ed7ea3eef 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp @@ -1074,7 +1074,8 @@ WriteBatchResponse WriteBatchExecutor::_execute(OperationContext* opCtx, routingCtx.onRequestSentForNss(nss); } - auto resp = SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId); + auto resp = + SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId, batch.opsUsingSVIgnored); bool stopParsingResponses = false; while (!stopParsingResponses && !sender.done()) { diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h index 1cd5fa42e57..d23cc933646 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h @@ -186,9 +186,11 @@ struct EmptyBatchResponse {}; struct SimpleWriteBatchResponse { std::vector> shardResponses; bool isRetryableWriteWithId = false; + absl::flat_hash_set opsUsingSVIgnored; - static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId) { - return SimpleWriteBatchResponse{{}, isRetryableWriteWithId}; + static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId, + absl::flat_hash_set opsUsingSVIgnored) { + return SimpleWriteBatchResponse{{}, isRetryableWriteWithId, std::move(opsUsingSVIgnored)}; } }; diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp index 35b2eecf6b2..7b578d41c29 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp @@ -53,41 +53,38 @@ using ItemVariant = WriteBatchResponseProcessor::ItemVariant; using Unexecuted = WriteBatchResponseProcessor::Unexecuted; using SucceededWithoutItem = WriteBatchResponseProcessor::SucceededWithoutItem; using FindAndModifyReplyItem = WriteBatchResponseProcessor::FindAndModifyReplyItem; -using GroupItemsResult = WriteBatchResponseProcessor::GroupItemsResult; using ItemsByOpMap = WriteBatchResponseProcessor::ItemsByOpMap; using ShardResult = WriteBatchResponseProcessor::ShardResult; namespace { -// This function returns the Status for a given ItemVariant ('itemVar'). If 'itemVar' is Unexecuted, -// this function will return an OK status. -Status getItemStatus(const ItemVariant& itemVar) { - return visit(OverloadedVisitor( - [&](const Unexecuted&) { return Status::OK(); }, - [&](const SucceededWithoutItem&) { return Status::OK(); }, - [&](const BulkWriteReplyItem& item) { return item.getStatus(); }, - [&](const FindAndModifyReplyItem& item) { return item.swReply.getStatus(); }), - itemVar); -} - // This function returns the error code for a given ItemVariant ('itemVar'). If 'itemVar' is -// Unexecuted, this function will return ErrorCodes::OK. -ErrorCodes::Error getErrorCode(const ItemVariant& itemVar) { +// Unexecuted, this function will return boost::none. +boost::optional getErrorCode(const ItemVariant& itemVar) { + using RetT = boost::optional; return visit( OverloadedVisitor( - [&](const Unexecuted&) { return ErrorCodes::OK; }, - [&](const SucceededWithoutItem&) { return ErrorCodes::OK; }, - [&](const BulkWriteReplyItem& item) { return item.getStatus().code(); }, - [&](const FindAndModifyReplyItem& item) { return item.swReply.getStatus().code(); }), + [&](const Unexecuted&) -> RetT { return boost::none; }, + [&](const SucceededWithoutItem&) -> RetT { return ErrorCodes::OK; }, + [&](const BulkWriteReplyItem& item) -> RetT { return item.getStatus().code(); }, + [&](const FindAndModifyReplyItem& item) -> RetT { + return item.swReply.getStatus().code(); + }), itemVar); } -// Like getErrorCode(), but takes a 'std::pair' as its input. -ErrorCodes::Error getErrorCodeForShardItemPair(const std::pair& p) { - return getErrorCode(p.second); +bool isOKItem(const ItemVariant& itemVar) { + auto code = getErrorCode(itemVar); + return code && *code == ErrorCodes::OK; } bool isRetryableError(const ItemVariant& itemVar) { - return write_op_helpers::isRetryErrCode(getErrorCode(itemVar)); + auto code = getErrorCode(itemVar); + return code && write_op_helpers::isRetryErrCode(*code); +} + +bool isNonRetryableError(const ItemVariant& itemVar) { + auto code = getErrorCode(itemVar); + return code && *code != ErrorCodes::OK && !write_op_helpers::isRetryErrCode(*code); } template @@ -122,6 +119,21 @@ std::shared_ptr getCannotImplicitlyC return {}; } +std::shared_ptr getCannotImplicitlyCreateCollectionInfo( + const ItemVariant& itemVar) { + using RetT = std::shared_ptr; + return visit(OverloadedVisitor( + [&](const Unexecuted&) -> RetT { return {}; }, + [&](const SucceededWithoutItem&) -> RetT { return {}; }, + [&](const BulkWriteReplyItem& item) -> RetT { + return getCannotImplicitlyCreateCollectionInfo(item.getStatus()); + }, + [&](const FindAndModifyReplyItem& item) -> RetT { + return getCannotImplicitlyCreateCollectionInfo(item.swReply.getStatus()); + }), + itemVar); +} + std::shared_ptr getCollectionUUIDMismatchInfo( const Status& status) { if (status == ErrorCodes::CollectionUUIDMismatch) { @@ -169,6 +181,9 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( OperationContext* opCtx, RoutingContext& routingCtx, const SimpleWriteBatchResponse& response) { const bool ordered = _cmdRef.getOrdered(); const bool inTransaction = static_cast(TransactionRouter::get(opCtx)); + auto isNonRetryableErrorForPair = [](auto&& p) { + return isNonRetryableError(p.second); + }; std::vector> shardResults; shardResults.reserve(response.shardResponses.size()); @@ -179,48 +194,53 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( onShardResponse(opCtx, routingCtx, shardId, shardResponse)); } - // Organize the items from the shard responses by op, and check if any of the items is an - // unrecoverable error. - auto [itemsByOp, unrecoverable, hasRetryableError] = groupItemsByOp(opCtx, shardResults); + // Organize the items from the shard responses by op. + auto itemsByOp = groupItemsByOp(opCtx, shardResults, response.opsUsingSVIgnored); - // For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If the - // batch has any retryable errors (and 'inTransaction' and 'unrecoverable' are false), then all - // of the ops in the batch are retried regardless of what their execution status was. + // For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If + // 'inTransaction' is false -AND- the batch contains a retryable error -AND- there wasn't + // a non-retryable error, then all of the ops in the batch are retried regardless of what + // their execution status was. // // This is the key difference between "RetryableWriteWithId" simple write batches and regular - // simple write batch. - if (response.isRetryableWriteWithId && hasRetryableError && !inTransaction && !unrecoverable) { - std::set toRetry; - CollectionsToCreate collsToCreate; - - // For each 'op', queue 'op' for retry and also increment counters as appropriate. + // simple write batches. + if (response.isRetryableWriteWithId && !inTransaction) { + bool hasRetryableErr = false; + bool hasNonRetryableErr = false; for (auto& [op, items] : itemsByOp) { - for (const auto& [shardId, itemVar] : items) { - if (isRetryableError(itemVar)) { - // If 'itemVar' has a Status that is a retryable error, we pass that in when - // calling queueOpForRetry(). - queueOpForRetry(op, getItemStatus(itemVar), toRetry, collsToCreate); - } else { - // Otherwise, call queueOpForRetry() without a Status. - queueOpForRetry(op, toRetry); - } - } - - if (getWriteOpType(op) == kUpdate) { - getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1); - } else if (getWriteOpType(op) == kDelete) { - getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1); + for (auto& [_, itemVar] : items) { + hasRetryableErr |= isRetryableError(itemVar); + hasNonRetryableErr |= isNonRetryableError(itemVar); } } - ProcessorResult result; - result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end()); - result.collsToCreate = std::move(collsToCreate); + if (hasRetryableErr && !(ordered && hasNonRetryableErr)) { + std::set toRetry; + CollectionsToCreate collsToCreate; - // Print the contents of 'opsToRetry' to the log if appropriate. - logOpsToRetry(result.opsToRetry); + // For each 'op', queue 'op' for retry and also increment counters as appropriate. + for (auto& [op, items] : itemsByOp) { + for (const auto& [shardId, itemVar] : items) { + queueOpForRetry(op, toRetry); + queueCreateCollectionIfNeeded(itemVar, collsToCreate); + } - return result; + if (getWriteOpType(op) == kUpdate) { + getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1); + } else if (getWriteOpType(op) == kDelete) { + getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1); + } + } + + ProcessorResult result; + result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end()); + result.collsToCreate = std::move(collsToCreate); + + // Print the contents of 'opsToRetry' to the log if appropriate. + logOpsToRetry(result.opsToRetry); + + return result; + } } // Update the counters (excluding _nErrors), update the list of retried stmtIds, and process @@ -239,15 +259,14 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( std::set toRetry; CollectionsToCreate collsToCreate; absl::flat_hash_map> successfulShardSet; + bool continueProcessing = true; // Process the results for each op that was part of this batch. for (auto& [op, items] : itemsByOp) { tassert(11182201, "Expected op to have at least one item", !items.empty()); - const bool shouldRetryOnUnexecutedOrRetryableError = !unrecoverable && - !write_op_helpers::hasAnyNonRetryableError(items, getErrorCodeForShardItemPair); - - bool continueProcessing = true; + const bool shouldRetryOnUnexecutedOrRetryableError = + !inTransaction && std::none_of(items.begin(), items.end(), isNonRetryableErrorForPair); // Process all of the reply items that correspond to 'op'. for (const auto& [shardId, itemVar] : items) { @@ -271,13 +290,16 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( }, [&](const BulkWriteReplyItem& item) { const auto& status = item.getStatus(); + // If 'item' has an OK status, add 'shardId' to 'successfulShardSet'. + if (status.isOK()) { + successfulShardSet[getWriteOpId(op)].insert(shardId); + } + if (!write_op_helpers::isRetryErrCode(status.code()) || inTransaction) { - // If 'item' has an OK status, add 'shardId' to 'successfulShardSet'. - if (status.isOK()) { - successfulShardSet[getWriteOpId(op)].insert(shardId); - } - // Add 'item' to _results. + // If 'item' is a not a retryable error, or if the command is running in a + // transaction, record the result. recordResult(opCtx, op, std::move(item)); + // If we recorded an error and the write command is ordered or running in // a transaction, then return false to stop processing. if (_nErrors > 0 && (ordered || inTransaction)) { @@ -286,9 +308,9 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( } else if (shouldRetryOnUnexecutedOrRetryableError) { // If the command isn't running in a transaction and 'item' is a retryable // error and 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue - // 'op' to be retried and add any CannotImplicitlyCreateCollection errors - // to 'collsToCreate'. - queueOpForRetry(op, status, toRetry, collsToCreate); + // 'op' to be retried. + queueOpForRetry(op, toRetry); + queueCreateCollectionIfNeeded(status, collsToCreate); } return true; }, @@ -306,7 +328,8 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( // If the command isn't running in a transaction and 'item' is a retryable // error and 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue // 'op' to be retried. - queueOpForRetry(op, status, toRetry, collsToCreate); + queueOpForRetry(op, toRetry); + queueCreateCollectionIfNeeded(status, collsToCreate); } return true; }); @@ -466,18 +489,28 @@ void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op, toRetry.emplace(op); } -void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op, - const Status& status, - std::set& toRetry, - CollectionsToCreate& collsToCreate) const { - toRetry.emplace(op); - - if (auto info = getCannotImplicitlyCreateCollectionInfo(status)) { +void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded( + std::shared_ptr info, + CollectionsToCreate& collsToCreate) { + // If 'info' is not null, add it to 'collsToCreate'. + if (info) { auto nss = info->getNss(); collsToCreate.emplace(std::move(nss), std::move(info)); } } +void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded( + const Status& status, CollectionsToCreate& collsToCreate) { + return queueCreateCollectionIfNeeded(getCannotImplicitlyCreateCollectionInfo(status), + collsToCreate); +} + +void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded( + const ItemVariant& itemVar, CollectionsToCreate& collsToCreate) { + return queueCreateCollectionIfNeeded(getCannotImplicitlyCreateCollectionInfo(itemVar), + collsToCreate); +} + ShardResult WriteBatchResponseProcessor::onShardResponse(OperationContext* opCtx, RoutingContext& routingCtx, const ShardId& shardId, @@ -653,40 +686,68 @@ ItemVariant WriteBatchResponseProcessor::makeErrorItem(const WriteOp& op, } } -GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp( - OperationContext* opCtx, std::vector>& shardResults) const { - const bool ordered = _cmdRef.getOrdered(); +ItemsByOpMap WriteBatchResponseProcessor::groupItemsByOp( + OperationContext* opCtx, + const std::vector>& shardResults, + const absl::flat_hash_set& opsUsingSVIgnored) const { const bool inTransaction = static_cast(TransactionRouter::get(opCtx)); - - auto isUnrecoverable = [&](const ItemVariant& itemVar) { - const auto code = getErrorCode(itemVar); - return (code != ErrorCodes::OK && - (inTransaction || (ordered && !write_op_helpers::isRetryErrCode(code)))); + auto isOKItemForPair = [](auto&& p) { + return isOKItem(p.second); }; - // Organize the items from the shard responses by op, and check if any of the items is an - // unrecoverable error. - ItemsByOpMap itemsByOp; - bool unrecoverable = false; - bool hasRetryableError = false; + auto isCollUUIDMismatchWithoutActualNamespace = [&](const ItemVariant& itemVar) -> bool { + if (const auto* bwItem = get_if(&itemVar)) { + return write_op_helpers::isCollUUIDMismatchWithoutActualNamespace(bwItem->getStatus()); + } + return false; + }; + ItemsByOpMap itemMap; + absl::flat_hash_set opsToRevisit; + + // Organize the items by op and store them into 'itemMap'. for (const auto& [shardId, shardResult] : shardResults) { - for (auto& [op, itemVar] : shardResult.items) { - unrecoverable |= isUnrecoverable(itemVar); - hasRetryableError |= isRetryableError(itemVar); + for (const auto& [op, itemVar] : shardResult.items) { + const auto opId = getWriteOpId(op); + const bool opUsesSVIgnored = !inTransaction && opsUsingSVIgnored.count(opId); - auto it = itemsByOp.find(op); - if (it != itemsByOp.end()) { - it->second.emplace_back(shardId, std::move(itemVar)); - } else { - std::vector> vec; - vec.emplace_back(shardId, std::move(itemVar)); - itemsByOp.emplace(op, std::move(vec)); + // For ops that use shardVersion IGNORED and are broadcast to all shards, by design we + // ignore CollectionUUIDMismatch errors that don't have an actual namespace if the op + // was successful on other shards. + // + // We implement this special behavior by adding the op to 'opsToRevisit' and then + // revisiting the op during a second pass (after this loop finishes). + if (opUsesSVIgnored && isCollUUIDMismatchWithoutActualNamespace(itemVar)) { + opsToRevisit.emplace(op); + } + + // Store 'itemVar' into 'itemMap'. + auto [it, _] = itemMap.try_emplace(op); + it->second.emplace_back(shardId, itemVar); + } + } + + // Visit the ops in 'opsToRevisit' and ignore CollectionUUIDMismatch errors for these ops + // as appropriate. + for (auto& op : opsToRevisit) { + auto& items = itemMap[op]; + auto opId = getWriteOpId(op); + + if (opHasSuccess(opId) || std::any_of(items.begin(), items.end(), isOKItemForPair)) { + // If 'op' has a successful result recorded already or if 'items' has an OK item, then + // remove all CollectionUUIDMismatch errors without an actual namespace from 'items'. + std::erase_if(items, [&](auto&& p) { + return isCollUUIDMismatchWithoutActualNamespace(p.second); + }); + + // If 'items' is now empty, remove 'op' from 'itemMap'. + if (items.empty()) { + itemMap.erase(op); } } } - return GroupItemsResult{std::move(itemsByOp), unrecoverable, hasRetryableError}; + return itemMap; } void WriteBatchResponseProcessor::processCountersAndRetriedStmtIds( @@ -993,13 +1054,14 @@ BulkWriteReplyItem combineErrorReplies(WriteOpId opId, std::vector& actualCollection, - bool& hasContactedPrimaryShard) { +void addActualNamespaceForCollUUIDMismatchError(OperationContext* opCtx, + Status& error, + boost::optional& actualCollection, + bool& hasContactedPrimaryShard) { // Return early if 'error' is not a CollectionUUIDMismatch error or if it's not missing the // "actualCollection" field. auto info = getCollectionUUIDMismatchInfo(error); @@ -1063,6 +1125,20 @@ void WriteBatchResponseProcessor::recordTargetErrors(OperationContext* opCtx, } } +bool WriteBatchResponseProcessor::opHasSuccess(WriteOpId opId) const { + auto it = _results.find(opId); + if (it == _results.end()) { + return false; + } + + auto visitItem = OverloadedVisitor( + [&](const BulkWriteOpResults& result) { return result.hasSuccess; }, + [&](const FindAndModifyReplyItem& result) { return result.swReply.getStatus().isOK(); }); + + const auto& resultVar = it->second; + return visit(std::move(visitItem), resultVar); +} + bool WriteBatchResponseProcessor::checkBulkWriteReplyMaxSize(OperationContext* opCtx) { // Cannot exceed the reply size limit if we have no responses. if (_results.empty()) { @@ -1092,12 +1168,10 @@ bool WriteBatchResponseProcessor::checkBulkWriteReplyMaxSize(OperationContext* o std::vector>> WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) { - const bool inTransaction = static_cast(TransactionRouter::get(opCtx)); - std::vector>> aggregatedReplies; std::map> actualCollections; std::map hasContactedPrimaryShard; - bool hasCollUUIDMismatchErrorsWithNoActualCollection = false; + bool hasCollUUIDMismatchErrorsWithoutActualNamespace = false; for (const auto& nss : _cmdRef.getNssSet()) { actualCollections.emplace(nss, boost::none); @@ -1105,7 +1179,7 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) { } // For CollectionUUIDMismatch errors, this lambda checks if "actualCollection" is set, and - // it updates the 'actualCollection' and 'hasCollUUIDMismatchErrorsWithNoActualCollection' + // it updates the 'actualCollection' and 'hasCollUUIDMismatchErrorsWithoutActualNamespace' // variables appropriately. auto noteCollUUIDMismatchError = [&](const WriteOp& op, const Status& status) { auto info = getCollectionUUIDMismatchInfo(status); @@ -1118,7 +1192,7 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) { actualCollection = info->actualCollection(); } } else { - hasCollUUIDMismatchErrorsWithNoActualCollection = true; + hasCollUUIDMismatchErrorsWithoutActualNamespace = true; } }; @@ -1150,19 +1224,6 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) { continue; } - // There are errors that are safe to ignore if they were correctly applied to other - // shards and we're using ShardVersion::IGNORED. They are safe to ignore as they can be - // interpreted as no-ops if the shard response had been instead a successful result - // since they wouldn't have modified any data. As a result, we can swallow the errors - // and treat them as a successful operation. - const bool opIsMulti = _cmdRef.getOp(opId).getMulti(); - const bool canIgnoreErrors = - write_op_helpers::shouldTargetAllShardsSVIgnored(inTransaction, opIsMulti) && - write_op_helpers::isSafeToIgnoreErrorInPartiallyAppliedOp(reply->getStatus()); - if (canIgnoreErrors) { - reply = boost::none; - } - if (reply && reply->getStatus() == ErrorCodes::CollectionUUIDMismatch) { noteCollUUIDMismatchError(op, reply->getStatus()); } @@ -1199,16 +1260,16 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) { } } - // If there were any CollectionUUIDMismatch errors where "actualCollection" is not set, then - // call addActualCollectionForCollUUIDMismatchError() to populate the "actualCollection" field. - if (hasCollUUIDMismatchErrorsWithNoActualCollection) { + // If there were any CollectionUUIDMismatch errors without an actual namespace, then call + // addActualNamespaceForCollUUIDMismatchError() to populate the "actualCollection" field. + if (hasCollUUIDMismatchErrorsWithoutActualNamespace) { for (auto& [opId, item] : aggregatedReplies) { if (item && item->getStatus() == ErrorCodes::CollectionUUIDMismatch) { auto op = WriteOp{_cmdRef.getOp(opId)}; const NamespaceString& nss = op.getNss(); auto error = item->getStatus(); - addActualCollectionForCollUUIDMismatchError( + addActualNamespaceForCollUUIDMismatchError( opCtx, error, actualCollections[nss], hasContactedPrimaryShard[nss]); item->setStatus(std::move(error)); diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h index dd65dbe2f8d..62b68fff02d 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h @@ -98,12 +98,6 @@ public: using ItemsByOpMap = std::map>>; - struct GroupItemsResult { - ItemsByOpMap itemsByOp; - bool unrecoverable = false; - bool hasRetryableError = false; - }; - struct ShardResult { boost::optional batchWriteReply; boost::optional bulkWriteReply; @@ -188,6 +182,12 @@ public: return _numOkItemsProcessed; } + /** + * Returns true if one or more successful results have been recorded for 'opId', otherwise + * returns false. + */ + bool opHasSuccess(WriteOpId opId) const; + /** * Returns true if we've exceeded the max reply size, false otherwise. If we have exceeded the * max size, we record this as an error for the next write op. @@ -277,13 +277,25 @@ private: void queueOpForRetry(const WriteOp& op, std::set& toRetry) const; /** - * Helper method that adds 'op' to 'toRetry', and extracts CannotImplicityCreateCollectionInfo - * from 'status' (if it exists) and stores it into 'collsToCreate'. + * Helper method that stores the given CannotImplicityCreateCollectionInfo ('info') into + * 'collsToCreate'. If 'info' is null, this method does nothing. */ - void queueOpForRetry(const WriteOp& op, - const Status& status, - std::set& toRetry, - CollectionsToCreate& collsToCreate) const; + void queueCreateCollectionIfNeeded( + std::shared_ptr info, + CollectionsToCreate& collsToCreate); + + /** + * Helper method that tries to extract a CannotImplicityCreateCollectionInfo from 'status', + * and if successful it stores the CannotImplicityCreateCollectionInfo into 'collsToCreate'. + */ + void queueCreateCollectionIfNeeded(const Status& status, CollectionsToCreate& collsToCreate); + + /** + * Helper method that tries to extract a CannotImplicityCreateCollectionInfo from 'itemVar', + * and if successful it stores the CannotImplicityCreateCollectionInfo into 'collsToCreate'. + */ + void queueCreateCollectionIfNeeded(const ItemVariant& itemVar, + CollectionsToCreate& collsToCreate); /** * Generate an ItemVariant for 'op' with an error given by 'status', returned from 'shardId'. @@ -297,8 +309,9 @@ private: * method also returns a flag indicating if 'shardResults' contains a retryable error, and * another flag indicating if an error occurred that is "unrecoverable". */ - GroupItemsResult groupItemsByOp( - OperationContext* opCtx, std::vector>& shardResults) const; + ItemsByOpMap groupItemsByOp(OperationContext* opCtx, + const std::vector>& shardResults, + const absl::flat_hash_set& opsUsingSVIgnored) const; /** * Gets the error details and upsert details from 'result.batchWriteReply', converts these diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp index 160c771eae4..b983407eebd 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp @@ -61,6 +61,7 @@ StatusWith WriteOpAnalyzerImpl::analyze(OperationContext* opCtx, const WriteOp& op) try { auto nss = op.getNss(); bool isViewfulTimeseries = false; + bool usesSVIgnored = false; // TODO SERVER-106874 remove the namespace translation check entirely once 9.0 becomes last // LTS. By then we will only have viewless timeseries that do not require nss translation. @@ -177,6 +178,9 @@ StatusWith WriteOpAnalyzerImpl::analyze(OperationContext* opCtx, // 'type' is kMultiShard (instead of targeting all shards). tr.endpoints = targetAllShards(opCtx, cri); + // Record that we set shardVersion to IGNORED for this op. + usesSVIgnored = true; + for (auto& endpoint : tr.endpoints) { auto& shardVersion = endpoint.shardVersion; tassert(11841901, @@ -204,7 +208,8 @@ StatusWith WriteOpAnalyzerImpl::analyze(OperationContext* opCtx, recordTargetingStats(opCtx, cri, tr, op); } - return Analysis{type, std::move(tr.endpoints), isViewfulTimeseries, std::move(sampleId)}; + return Analysis{ + type, std::move(tr.endpoints), isViewfulTimeseries, std::move(sampleId), usesSVIgnored}; } catch (const DBException& ex) { auto status = ex.toStatus(); diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h index f664649cc81..e11e097c859 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h @@ -62,8 +62,9 @@ struct Analysis { // 'isViewfulTimeseries' is set to true when the write op is on the main namespace of a viewful // timeseries collection. This flag makes sure the executor sends the command with translation // to buckets namespace correctly. - bool isViewfulTimeseries; + bool isViewfulTimeseries = false; boost::optional targetedSampleId; + bool usesSVIgnored = false; }; class WriteOpAnalyzer { public: diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp index c4abed14c32..7a6cd5289fc 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp @@ -243,11 +243,16 @@ public: } const auto& targetedSampleId = analysis.targetedSampleId; + if (targetedSampleId && targetedSampleId->isFor(shardName)) { auto& request = _batch->requestByShardId[shardName]; request.sampleIds.emplace(getWriteOpId(writeOp), targetedSampleId->getId()); } } + + if (analysis.usesSVIgnored) { + _batch->opsUsingSVIgnored.emplace(getWriteOpId(writeOp)); + } } /** diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h index 274bc566ffd..2ee404f6cbe 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h @@ -63,8 +63,8 @@ struct SimpleWriteBatch { }; std::map requestByShardId; - bool isRetryableWriteWithId = false; + absl::flat_hash_set opsUsingSVIgnored; static SimpleWriteBatch makeEmpty(bool isRetryableWriteWithId) { return SimpleWriteBatch{{}, isRetryableWriteWithId}; diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index decf2756bb8..f1becc964a0 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -416,7 +416,7 @@ void WriteOp::_updateOpState(OperationContext* opCtx, // have modified any data. As a result, we can swallow the errors and treat them as a // successful operation. if (isTargetingAllShardsWithSVIgnored && - write_op_helpers::isSafeToIgnoreErrorInPartiallyAppliedOp(_error->getStatus()) && + write_op_helpers::isCollUUIDMismatchWithoutActualNamespace(_error->getStatus()) && !_successfulShardSet.empty()) { if (!hasPendingChild) { _error.reset(); diff --git a/src/mongo/s/write_ops/write_op_helper.cpp b/src/mongo/s/write_ops/write_op_helper.cpp index d9f8993902a..e3a611d83d3 100644 --- a/src/mongo/s/write_ops/write_op_helper.cpp +++ b/src/mongo/s/write_ops/write_op_helper.cpp @@ -51,15 +51,7 @@ bool isOnlyTargetDataOwningShardsForMultiWritesEnabled() { return clusterParam->getValue(boost::none).getEnabled(); } -bool shouldTargetAllShardsSVIgnored(bool inTransaction, bool isMulti) { - // Fetch the 'onlyTargetDataOwningShardsForMultiWrites' cluster param. - if (isMulti && !inTransaction) { - return !isOnlyTargetDataOwningShardsForMultiWritesEnabled(); - } - return false; -} - -bool isSafeToIgnoreErrorInPartiallyAppliedOp(const Status& status) { +bool isCollUUIDMismatchWithoutActualNamespace(const Status& status) { return status.code() == ErrorCodes::CollectionUUIDMismatch && !status.extraInfo()->actualCollection(); } diff --git a/src/mongo/s/write_ops/write_op_helper.h b/src/mongo/s/write_ops/write_op_helper.h index 119fdf41603..76822f89483 100644 --- a/src/mongo/s/write_ops/write_op_helper.h +++ b/src/mongo/s/write_ops/write_op_helper.h @@ -152,21 +152,10 @@ ItemType getFirstNonRetryableError(const std::vector& items, GetCodeFn bool isOnlyTargetDataOwningShardsForMultiWritesEnabled(); /** - * Returns whether an operation should target all shards with ShardVersion::IGNORED(). This is - * true for multi: true writes where 'onlyTargetDataOwningShardsForMultiWrites' is false and we are - * not in a transaction. + * Return true if 'status' contains a CollectionUUIDMismatch error without an actual namespace, + * otherwise return false. */ -bool shouldTargetAllShardsSVIgnored(bool inTransaction, bool isMulti); - -/** - * Used to check if a partially applied (successful on some shards but not others)operation has an - * errors that is safe to ignore. UUID mismatch errors are safe to ignore if the actualCollection is - * null in conjuntion with other successful operations. This is true because it means we wrongly - * targeted a non-owning shard with the operation and we wouldn't have applied any modifications - * anyway. Note this is only safe if we're using ShardVersion::IGNORED since we're ignoring any - * placement concern and broadcasting to all shards. - */ -bool isSafeToIgnoreErrorInPartiallyAppliedOp(const Status& status); +bool isCollUUIDMismatchWithoutActualNamespace(const Status& status); int computeBaseSizeEstimate(OperationContext* opCtx, const BulkWriteCommandRequest& client);