SERVER-119175 [UWE] Fix logic for ignoring CollectionUUIDMismatch errors when appropriate (#47745)
GitOrigin-RevId: 2ea298282f8f7d2f55f8ea2ad213b7d3284ed0be
This commit is contained in:
parent
9b39ddac21
commit
ecaeff5903
@ -163,3 +163,6 @@ filters:
|
||||
- "hashed_shard_key_index_optional_shard_collection.js":
|
||||
approvers:
|
||||
- 10gen/server-cluster-scalability
|
||||
- "swallow_unnecessary_uuid_mismatch_error.js":
|
||||
approvers:
|
||||
- 10gen/query-execution-router
|
||||
|
||||
@ -12,33 +12,33 @@ const st = new ShardingTest({shards: 3});
|
||||
const db = st.s.getDB(jsTestName());
|
||||
assert.commandWorked(st.s.adminCommand({enableSharding: db.getName(), primaryShard: st.shard0.shardName}));
|
||||
|
||||
const shardedColl1 = db.sharded_1;
|
||||
|
||||
assert.commandWorked(shardedColl1.insert({_id: 0}));
|
||||
assert.commandWorked(shardedColl1.insert({_id: 10}));
|
||||
assert.commandWorked(shardedColl1.insert({_id: 50}));
|
||||
assert.commandWorked(shardedColl1.insert({_id: 100}));
|
||||
|
||||
const splitChunk = function (coll, splitPointKeyValue) {
|
||||
assert.commandWorked(st.s.adminCommand({split: coll.getFullName(), middle: {_id: splitPointKeyValue}}));
|
||||
};
|
||||
|
||||
const uuid = function (coll) {
|
||||
const getUUIDForCollection = function (coll) {
|
||||
return assert
|
||||
.commandWorked(db.runCommand({listCollections: 1}))
|
||||
.cursor.firstBatch.find((c) => c.name === coll.getName()).info.uuid;
|
||||
};
|
||||
|
||||
// shard0: [inf, 50), shard1: [50, inf), shard2 has no chunks
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: shardedColl1.getFullName(), key: {_id: 1}}));
|
||||
splitChunk(shardedColl1, 50);
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: shardedColl1.getFullName(), find: {_id: 50}, to: st.shard1.shardName}),
|
||||
);
|
||||
const coll1 = db.sharded_1;
|
||||
const collName1 = coll1.getName();
|
||||
const collFullName1 = coll1.getFullName();
|
||||
|
||||
const cmdObj = {
|
||||
delete: shardedColl1.getName(),
|
||||
collectionUUID: uuid(shardedColl1),
|
||||
const coll2 = db.sharded_2;
|
||||
const collName2 = coll2.getName();
|
||||
const collFullName2 = coll2.getFullName();
|
||||
|
||||
assert.commandWorked(coll1.insert([{_id: 0}, {_id: 10}, {_id: 50}, {_id: 100}]));
|
||||
|
||||
// shard0: [-inf, 50), shard1: [50, inf), shard2 has no chunks
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: collFullName1, key: {_id: 1}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: collFullName1, middle: {_id: 50}}));
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: collFullName1, find: {_id: 50}, to: st.shard1.shardName}));
|
||||
|
||||
let uuid1 = getUUIDForCollection(coll1);
|
||||
|
||||
// Test multi deletes by _id, where the shard key equals "{_id: 1}".
|
||||
let cmdObj = {
|
||||
delete: collName1,
|
||||
collectionUUID: uuid1,
|
||||
deletes: [
|
||||
{
|
||||
q: {
|
||||
@ -57,4 +57,56 @@ let res = db.runCommand(cmdObj);
|
||||
assert.commandWorked(res);
|
||||
// Only 3 documents fulfill the criteria so verify we've deleted them.
|
||||
assert.eq(3, res.n);
|
||||
|
||||
assert.commandWorked(coll2.insert([{_id: 0}, {_id: 10}, {_id: 50}, {_id: 100}]));
|
||||
assert.commandWorked(coll2.insert([{_id: 11}, {_id: 51}]));
|
||||
|
||||
// Create an index on {x: 1}.
|
||||
assert.commandWorked(db.runCommand({createIndexes: collName2, indexes: [{name: "x_1", key: {x: 1}}]}));
|
||||
|
||||
// Shard the collection on {x: 1} and split up the key space so that shard0 owns [-inf, 50) and
|
||||
// shard1 owns [50, inf).
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: collFullName2, key: {x: 1}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: collFullName2, middle: {x: 50}}));
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: collFullName2, find: {x: 50}, to: st.shard1.shardName}));
|
||||
|
||||
let uuid2 = getUUIDForCollection(coll2);
|
||||
|
||||
// Test multi deletes by _id, where the shard key does not include "_id".
|
||||
cmdObj = {
|
||||
delete: collName2,
|
||||
collectionUUID: uuid2,
|
||||
deletes: [
|
||||
{
|
||||
q: {
|
||||
$and: [
|
||||
{$expr: {$eq: [{$mod: ["$_id", 2]}, {$literal: 0}]}},
|
||||
{$expr: {$gte: ["$_id", {$literal: 0}]}},
|
||||
{$expr: {$lt: ["$_id", {$literal: 99}]}},
|
||||
],
|
||||
},
|
||||
limit: 0,
|
||||
hint: {_id: 1},
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
res = db.runCommand(cmdObj);
|
||||
assert.commandWorked(res);
|
||||
assert.eq(3, res.n);
|
||||
|
||||
// Test non-multi deletes by _id, where the shard key does not include "_id".
|
||||
cmdObj = {
|
||||
delete: collName2,
|
||||
collectionUUID: uuid2,
|
||||
deletes: [
|
||||
{q: {_id: 11}, limit: 1, hint: {_id: 1}},
|
||||
{q: {_id: 51}, limit: 1, hint: {_id: 1}},
|
||||
],
|
||||
};
|
||||
|
||||
res = db.runCommand(cmdObj);
|
||||
assert.commandWorked(res);
|
||||
assert.eq(2, res.n);
|
||||
|
||||
st.stop();
|
||||
|
||||
@ -1074,7 +1074,8 @@ WriteBatchResponse WriteBatchExecutor::_execute(OperationContext* opCtx,
|
||||
routingCtx.onRequestSentForNss(nss);
|
||||
}
|
||||
|
||||
auto resp = SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId);
|
||||
auto resp =
|
||||
SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId, batch.opsUsingSVIgnored);
|
||||
bool stopParsingResponses = false;
|
||||
|
||||
while (!stopParsingResponses && !sender.done()) {
|
||||
|
||||
@ -186,9 +186,11 @@ struct EmptyBatchResponse {};
|
||||
struct SimpleWriteBatchResponse {
|
||||
std::vector<std::pair<ShardId, ShardResponse>> shardResponses;
|
||||
bool isRetryableWriteWithId = false;
|
||||
absl::flat_hash_set<WriteOpId> opsUsingSVIgnored;
|
||||
|
||||
static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId) {
|
||||
return SimpleWriteBatchResponse{{}, isRetryableWriteWithId};
|
||||
static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId,
|
||||
absl::flat_hash_set<WriteOpId> opsUsingSVIgnored) {
|
||||
return SimpleWriteBatchResponse{{}, isRetryableWriteWithId, std::move(opsUsingSVIgnored)};
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -53,41 +53,38 @@ using ItemVariant = WriteBatchResponseProcessor::ItemVariant;
|
||||
using Unexecuted = WriteBatchResponseProcessor::Unexecuted;
|
||||
using SucceededWithoutItem = WriteBatchResponseProcessor::SucceededWithoutItem;
|
||||
using FindAndModifyReplyItem = WriteBatchResponseProcessor::FindAndModifyReplyItem;
|
||||
using GroupItemsResult = WriteBatchResponseProcessor::GroupItemsResult;
|
||||
using ItemsByOpMap = WriteBatchResponseProcessor::ItemsByOpMap;
|
||||
using ShardResult = WriteBatchResponseProcessor::ShardResult;
|
||||
|
||||
namespace {
|
||||
// This function returns the Status for a given ItemVariant ('itemVar'). If 'itemVar' is Unexecuted,
|
||||
// this function will return an OK status.
|
||||
Status getItemStatus(const ItemVariant& itemVar) {
|
||||
return visit(OverloadedVisitor(
|
||||
[&](const Unexecuted&) { return Status::OK(); },
|
||||
[&](const SucceededWithoutItem&) { return Status::OK(); },
|
||||
[&](const BulkWriteReplyItem& item) { return item.getStatus(); },
|
||||
[&](const FindAndModifyReplyItem& item) { return item.swReply.getStatus(); }),
|
||||
itemVar);
|
||||
}
|
||||
|
||||
// This function returns the error code for a given ItemVariant ('itemVar'). If 'itemVar' is
|
||||
// Unexecuted, this function will return ErrorCodes::OK.
|
||||
ErrorCodes::Error getErrorCode(const ItemVariant& itemVar) {
|
||||
// Unexecuted, this function will return boost::none.
|
||||
boost::optional<ErrorCodes::Error> getErrorCode(const ItemVariant& itemVar) {
|
||||
using RetT = boost::optional<ErrorCodes::Error>;
|
||||
return visit(
|
||||
OverloadedVisitor(
|
||||
[&](const Unexecuted&) { return ErrorCodes::OK; },
|
||||
[&](const SucceededWithoutItem&) { return ErrorCodes::OK; },
|
||||
[&](const BulkWriteReplyItem& item) { return item.getStatus().code(); },
|
||||
[&](const FindAndModifyReplyItem& item) { return item.swReply.getStatus().code(); }),
|
||||
[&](const Unexecuted&) -> RetT { return boost::none; },
|
||||
[&](const SucceededWithoutItem&) -> RetT { return ErrorCodes::OK; },
|
||||
[&](const BulkWriteReplyItem& item) -> RetT { return item.getStatus().code(); },
|
||||
[&](const FindAndModifyReplyItem& item) -> RetT {
|
||||
return item.swReply.getStatus().code();
|
||||
}),
|
||||
itemVar);
|
||||
}
|
||||
|
||||
// Like getErrorCode(), but takes a 'std::pair<ShardId,ItemVariant>' as its input.
|
||||
ErrorCodes::Error getErrorCodeForShardItemPair(const std::pair<ShardId, ItemVariant>& p) {
|
||||
return getErrorCode(p.second);
|
||||
bool isOKItem(const ItemVariant& itemVar) {
|
||||
auto code = getErrorCode(itemVar);
|
||||
return code && *code == ErrorCodes::OK;
|
||||
}
|
||||
|
||||
bool isRetryableError(const ItemVariant& itemVar) {
|
||||
return write_op_helpers::isRetryErrCode(getErrorCode(itemVar));
|
||||
auto code = getErrorCode(itemVar);
|
||||
return code && write_op_helpers::isRetryErrCode(*code);
|
||||
}
|
||||
|
||||
bool isNonRetryableError(const ItemVariant& itemVar) {
|
||||
auto code = getErrorCode(itemVar);
|
||||
return code && *code != ErrorCodes::OK && !write_op_helpers::isRetryErrCode(*code);
|
||||
}
|
||||
|
||||
template <typename ResultT>
|
||||
@ -122,6 +119,21 @@ std::shared_ptr<const CannotImplicitlyCreateCollectionInfo> getCannotImplicitlyC
|
||||
return {};
|
||||
}
|
||||
|
||||
std::shared_ptr<const CannotImplicitlyCreateCollectionInfo> getCannotImplicitlyCreateCollectionInfo(
|
||||
const ItemVariant& itemVar) {
|
||||
using RetT = std::shared_ptr<const CannotImplicitlyCreateCollectionInfo>;
|
||||
return visit(OverloadedVisitor(
|
||||
[&](const Unexecuted&) -> RetT { return {}; },
|
||||
[&](const SucceededWithoutItem&) -> RetT { return {}; },
|
||||
[&](const BulkWriteReplyItem& item) -> RetT {
|
||||
return getCannotImplicitlyCreateCollectionInfo(item.getStatus());
|
||||
},
|
||||
[&](const FindAndModifyReplyItem& item) -> RetT {
|
||||
return getCannotImplicitlyCreateCollectionInfo(item.swReply.getStatus());
|
||||
}),
|
||||
itemVar);
|
||||
}
|
||||
|
||||
std::shared_ptr<const CollectionUUIDMismatchInfo> getCollectionUUIDMismatchInfo(
|
||||
const Status& status) {
|
||||
if (status == ErrorCodes::CollectionUUIDMismatch) {
|
||||
@ -169,6 +181,9 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
OperationContext* opCtx, RoutingContext& routingCtx, const SimpleWriteBatchResponse& response) {
|
||||
const bool ordered = _cmdRef.getOrdered();
|
||||
const bool inTransaction = static_cast<bool>(TransactionRouter::get(opCtx));
|
||||
auto isNonRetryableErrorForPair = [](auto&& p) {
|
||||
return isNonRetryableError(p.second);
|
||||
};
|
||||
|
||||
std::vector<std::pair<ShardId, ShardResult>> shardResults;
|
||||
shardResults.reserve(response.shardResponses.size());
|
||||
@ -179,48 +194,53 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
onShardResponse(opCtx, routingCtx, shardId, shardResponse));
|
||||
}
|
||||
|
||||
// Organize the items from the shard responses by op, and check if any of the items is an
|
||||
// unrecoverable error.
|
||||
auto [itemsByOp, unrecoverable, hasRetryableError] = groupItemsByOp(opCtx, shardResults);
|
||||
// Organize the items from the shard responses by op.
|
||||
auto itemsByOp = groupItemsByOp(opCtx, shardResults, response.opsUsingSVIgnored);
|
||||
|
||||
// For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If the
|
||||
// batch has any retryable errors (and 'inTransaction' and 'unrecoverable' are false), then all
|
||||
// of the ops in the batch are retried regardless of what their execution status was.
|
||||
// For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If
|
||||
// 'inTransaction' is false -AND- the batch contains a retryable error -AND- there wasn't
|
||||
// a non-retryable error, then all of the ops in the batch are retried regardless of what
|
||||
// their execution status was.
|
||||
//
|
||||
// This is the key difference between "RetryableWriteWithId" simple write batches and regular
|
||||
// simple write batch.
|
||||
if (response.isRetryableWriteWithId && hasRetryableError && !inTransaction && !unrecoverable) {
|
||||
std::set<WriteOp> toRetry;
|
||||
CollectionsToCreate collsToCreate;
|
||||
|
||||
// For each 'op', queue 'op' for retry and also increment counters as appropriate.
|
||||
// simple write batches.
|
||||
if (response.isRetryableWriteWithId && !inTransaction) {
|
||||
bool hasRetryableErr = false;
|
||||
bool hasNonRetryableErr = false;
|
||||
for (auto& [op, items] : itemsByOp) {
|
||||
for (const auto& [shardId, itemVar] : items) {
|
||||
if (isRetryableError(itemVar)) {
|
||||
// If 'itemVar' has a Status that is a retryable error, we pass that in when
|
||||
// calling queueOpForRetry().
|
||||
queueOpForRetry(op, getItemStatus(itemVar), toRetry, collsToCreate);
|
||||
} else {
|
||||
// Otherwise, call queueOpForRetry() without a Status.
|
||||
queueOpForRetry(op, toRetry);
|
||||
}
|
||||
}
|
||||
|
||||
if (getWriteOpType(op) == kUpdate) {
|
||||
getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1);
|
||||
} else if (getWriteOpType(op) == kDelete) {
|
||||
getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1);
|
||||
for (auto& [_, itemVar] : items) {
|
||||
hasRetryableErr |= isRetryableError(itemVar);
|
||||
hasNonRetryableErr |= isNonRetryableError(itemVar);
|
||||
}
|
||||
}
|
||||
|
||||
ProcessorResult result;
|
||||
result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end());
|
||||
result.collsToCreate = std::move(collsToCreate);
|
||||
if (hasRetryableErr && !(ordered && hasNonRetryableErr)) {
|
||||
std::set<WriteOp> toRetry;
|
||||
CollectionsToCreate collsToCreate;
|
||||
|
||||
// Print the contents of 'opsToRetry' to the log if appropriate.
|
||||
logOpsToRetry(result.opsToRetry);
|
||||
// For each 'op', queue 'op' for retry and also increment counters as appropriate.
|
||||
for (auto& [op, items] : itemsByOp) {
|
||||
for (const auto& [shardId, itemVar] : items) {
|
||||
queueOpForRetry(op, toRetry);
|
||||
queueCreateCollectionIfNeeded(itemVar, collsToCreate);
|
||||
}
|
||||
|
||||
return result;
|
||||
if (getWriteOpType(op) == kUpdate) {
|
||||
getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1);
|
||||
} else if (getWriteOpType(op) == kDelete) {
|
||||
getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1);
|
||||
}
|
||||
}
|
||||
|
||||
ProcessorResult result;
|
||||
result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end());
|
||||
result.collsToCreate = std::move(collsToCreate);
|
||||
|
||||
// Print the contents of 'opsToRetry' to the log if appropriate.
|
||||
logOpsToRetry(result.opsToRetry);
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// Update the counters (excluding _nErrors), update the list of retried stmtIds, and process
|
||||
@ -239,15 +259,14 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
std::set<WriteOp> toRetry;
|
||||
CollectionsToCreate collsToCreate;
|
||||
absl::flat_hash_map<WriteOpId, std::set<ShardId>> successfulShardSet;
|
||||
bool continueProcessing = true;
|
||||
|
||||
// Process the results for each op that was part of this batch.
|
||||
for (auto& [op, items] : itemsByOp) {
|
||||
tassert(11182201, "Expected op to have at least one item", !items.empty());
|
||||
|
||||
const bool shouldRetryOnUnexecutedOrRetryableError = !unrecoverable &&
|
||||
!write_op_helpers::hasAnyNonRetryableError(items, getErrorCodeForShardItemPair);
|
||||
|
||||
bool continueProcessing = true;
|
||||
const bool shouldRetryOnUnexecutedOrRetryableError =
|
||||
!inTransaction && std::none_of(items.begin(), items.end(), isNonRetryableErrorForPair);
|
||||
|
||||
// Process all of the reply items that correspond to 'op'.
|
||||
for (const auto& [shardId, itemVar] : items) {
|
||||
@ -271,13 +290,16 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
},
|
||||
[&](const BulkWriteReplyItem& item) {
|
||||
const auto& status = item.getStatus();
|
||||
// If 'item' has an OK status, add 'shardId' to 'successfulShardSet'.
|
||||
if (status.isOK()) {
|
||||
successfulShardSet[getWriteOpId(op)].insert(shardId);
|
||||
}
|
||||
|
||||
if (!write_op_helpers::isRetryErrCode(status.code()) || inTransaction) {
|
||||
// If 'item' has an OK status, add 'shardId' to 'successfulShardSet'.
|
||||
if (status.isOK()) {
|
||||
successfulShardSet[getWriteOpId(op)].insert(shardId);
|
||||
}
|
||||
// Add 'item' to _results.
|
||||
// If 'item' is not a retryable error, or if the command is running in a
|
||||
// transaction, record the result.
|
||||
recordResult(opCtx, op, std::move(item));
|
||||
|
||||
// If we recorded an error and the write command is ordered or running in
|
||||
// a transaction, then return false to stop processing.
|
||||
if (_nErrors > 0 && (ordered || inTransaction)) {
|
||||
@ -286,9 +308,9 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
} else if (shouldRetryOnUnexecutedOrRetryableError) {
|
||||
// If the command isn't running in a transaction and 'item' is a retryable
|
||||
// error and 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue
|
||||
// 'op' to be retried and add any CannotImplicitlyCreateCollection errors
|
||||
// to 'collsToCreate'.
|
||||
queueOpForRetry(op, status, toRetry, collsToCreate);
|
||||
// 'op' to be retried.
|
||||
queueOpForRetry(op, toRetry);
|
||||
queueCreateCollectionIfNeeded(status, collsToCreate);
|
||||
}
|
||||
return true;
|
||||
},
|
||||
@ -306,7 +328,8 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
|
||||
// If the command isn't running in a transaction and 'item' is a retryable
|
||||
// error and 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue
|
||||
// 'op' to be retried.
|
||||
queueOpForRetry(op, status, toRetry, collsToCreate);
|
||||
queueOpForRetry(op, toRetry);
|
||||
queueCreateCollectionIfNeeded(status, collsToCreate);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
@ -466,18 +489,28 @@ void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op,
|
||||
toRetry.emplace(op);
|
||||
}
|
||||
|
||||
void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op,
|
||||
const Status& status,
|
||||
std::set<WriteOp>& toRetry,
|
||||
CollectionsToCreate& collsToCreate) const {
|
||||
toRetry.emplace(op);
|
||||
|
||||
if (auto info = getCannotImplicitlyCreateCollectionInfo(status)) {
|
||||
void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded(
|
||||
std::shared_ptr<const CannotImplicitlyCreateCollectionInfo> info,
|
||||
CollectionsToCreate& collsToCreate) {
|
||||
// If 'info' is not null, add it to 'collsToCreate'.
|
||||
if (info) {
|
||||
auto nss = info->getNss();
|
||||
collsToCreate.emplace(std::move(nss), std::move(info));
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded(
|
||||
const Status& status, CollectionsToCreate& collsToCreate) {
|
||||
return queueCreateCollectionIfNeeded(getCannotImplicitlyCreateCollectionInfo(status),
|
||||
collsToCreate);
|
||||
}
|
||||
|
||||
void WriteBatchResponseProcessor::queueCreateCollectionIfNeeded(
|
||||
const ItemVariant& itemVar, CollectionsToCreate& collsToCreate) {
|
||||
return queueCreateCollectionIfNeeded(getCannotImplicitlyCreateCollectionInfo(itemVar),
|
||||
collsToCreate);
|
||||
}
|
||||
|
||||
ShardResult WriteBatchResponseProcessor::onShardResponse(OperationContext* opCtx,
|
||||
RoutingContext& routingCtx,
|
||||
const ShardId& shardId,
|
||||
@ -653,40 +686,68 @@ ItemVariant WriteBatchResponseProcessor::makeErrorItem(const WriteOp& op,
|
||||
}
|
||||
}
|
||||
|
||||
GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp(
|
||||
OperationContext* opCtx, std::vector<std::pair<ShardId, ShardResult>>& shardResults) const {
|
||||
const bool ordered = _cmdRef.getOrdered();
|
||||
ItemsByOpMap WriteBatchResponseProcessor::groupItemsByOp(
|
||||
OperationContext* opCtx,
|
||||
const std::vector<std::pair<ShardId, ShardResult>>& shardResults,
|
||||
const absl::flat_hash_set<WriteOpId>& opsUsingSVIgnored) const {
|
||||
const bool inTransaction = static_cast<bool>(TransactionRouter::get(opCtx));
|
||||
|
||||
auto isUnrecoverable = [&](const ItemVariant& itemVar) {
|
||||
const auto code = getErrorCode(itemVar);
|
||||
return (code != ErrorCodes::OK &&
|
||||
(inTransaction || (ordered && !write_op_helpers::isRetryErrCode(code))));
|
||||
auto isOKItemForPair = [](auto&& p) {
|
||||
return isOKItem(p.second);
|
||||
};
|
||||
|
||||
// Organize the items from the shard responses by op, and check if any of the items is an
|
||||
// unrecoverable error.
|
||||
ItemsByOpMap itemsByOp;
|
||||
bool unrecoverable = false;
|
||||
bool hasRetryableError = false;
|
||||
auto isCollUUIDMismatchWithoutActualNamespace = [&](const ItemVariant& itemVar) -> bool {
|
||||
if (const auto* bwItem = get_if<BulkWriteReplyItem>(&itemVar)) {
|
||||
return write_op_helpers::isCollUUIDMismatchWithoutActualNamespace(bwItem->getStatus());
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
ItemsByOpMap itemMap;
|
||||
absl::flat_hash_set<WriteOp> opsToRevisit;
|
||||
|
||||
// Organize the items by op and store them into 'itemMap'.
|
||||
for (const auto& [shardId, shardResult] : shardResults) {
|
||||
for (auto& [op, itemVar] : shardResult.items) {
|
||||
unrecoverable |= isUnrecoverable(itemVar);
|
||||
hasRetryableError |= isRetryableError(itemVar);
|
||||
for (const auto& [op, itemVar] : shardResult.items) {
|
||||
const auto opId = getWriteOpId(op);
|
||||
const bool opUsesSVIgnored = !inTransaction && opsUsingSVIgnored.count(opId);
|
||||
|
||||
auto it = itemsByOp.find(op);
|
||||
if (it != itemsByOp.end()) {
|
||||
it->second.emplace_back(shardId, std::move(itemVar));
|
||||
} else {
|
||||
std::vector<std::pair<ShardId, ItemVariant>> vec;
|
||||
vec.emplace_back(shardId, std::move(itemVar));
|
||||
itemsByOp.emplace(op, std::move(vec));
|
||||
// For ops that use shardVersion IGNORED and are broadcast to all shards, by design we
|
||||
// ignore CollectionUUIDMismatch errors that don't have an actual namespace if the op
|
||||
// was successful on other shards.
|
||||
//
|
||||
// We implement this special behavior by adding the op to 'opsToRevisit' and then
|
||||
// revisiting the op during a second pass (after this loop finishes).
|
||||
if (opUsesSVIgnored && isCollUUIDMismatchWithoutActualNamespace(itemVar)) {
|
||||
opsToRevisit.emplace(op);
|
||||
}
|
||||
|
||||
// Store 'itemVar' into 'itemMap'.
|
||||
auto [it, _] = itemMap.try_emplace(op);
|
||||
it->second.emplace_back(shardId, itemVar);
|
||||
}
|
||||
}
|
||||
|
||||
// Visit the ops in 'opsToRevisit' and ignore CollectionUUIDMismatch errors for these ops
|
||||
// as appropriate.
|
||||
for (auto& op : opsToRevisit) {
|
||||
auto& items = itemMap[op];
|
||||
auto opId = getWriteOpId(op);
|
||||
|
||||
if (opHasSuccess(opId) || std::any_of(items.begin(), items.end(), isOKItemForPair)) {
|
||||
// If 'op' has a successful result recorded already or if 'items' has an OK item, then
|
||||
// remove all CollectionUUIDMismatch errors without an actual namespace from 'items'.
|
||||
std::erase_if(items, [&](auto&& p) {
|
||||
return isCollUUIDMismatchWithoutActualNamespace(p.second);
|
||||
});
|
||||
|
||||
// If 'items' is now empty, remove 'op' from 'itemMap'.
|
||||
if (items.empty()) {
|
||||
itemMap.erase(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return GroupItemsResult{std::move(itemsByOp), unrecoverable, hasRetryableError};
|
||||
return itemMap;
|
||||
}
|
||||
|
||||
void WriteBatchResponseProcessor::processCountersAndRetriedStmtIds(
|
||||
@ -993,13 +1054,14 @@ BulkWriteReplyItem combineErrorReplies(WriteOpId opId, std::vector<BulkWriteRepl
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to populate the "actualCollection" field of a CollectionUUIDMismatch error if it's not
|
||||
* populated already, contacting the primary shard if necessary.
|
||||
* If 'error' is a CollectionUUIDMismatch error without an actual namespace, this function attempts
|
||||
* to populate the "actualCollection" field on 'error', contacting the primary shard if necessary.
|
||||
* Otherwise, this function does nothing.
|
||||
*/
|
||||
void addActualCollectionForCollUUIDMismatchError(OperationContext* opCtx,
|
||||
Status& error,
|
||||
boost::optional<std::string>& actualCollection,
|
||||
bool& hasContactedPrimaryShard) {
|
||||
void addActualNamespaceForCollUUIDMismatchError(OperationContext* opCtx,
|
||||
Status& error,
|
||||
boost::optional<std::string>& actualCollection,
|
||||
bool& hasContactedPrimaryShard) {
|
||||
// Return early if 'error' is not a CollectionUUIDMismatch error or if it's not missing the
|
||||
// "actualCollection" field.
|
||||
auto info = getCollectionUUIDMismatchInfo(error);
|
||||
@ -1063,6 +1125,20 @@ void WriteBatchResponseProcessor::recordTargetErrors(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
|
||||
bool WriteBatchResponseProcessor::opHasSuccess(WriteOpId opId) const {
|
||||
auto it = _results.find(opId);
|
||||
if (it == _results.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto visitItem = OverloadedVisitor(
|
||||
[&](const BulkWriteOpResults& result) { return result.hasSuccess; },
|
||||
[&](const FindAndModifyReplyItem& result) { return result.swReply.getStatus().isOK(); });
|
||||
|
||||
const auto& resultVar = it->second;
|
||||
return visit(std::move(visitItem), resultVar);
|
||||
}
|
||||
|
||||
bool WriteBatchResponseProcessor::checkBulkWriteReplyMaxSize(OperationContext* opCtx) {
|
||||
// Cannot exceed the reply size limit if we have no responses.
|
||||
if (_results.empty()) {
|
||||
@ -1092,12 +1168,10 @@ bool WriteBatchResponseProcessor::checkBulkWriteReplyMaxSize(OperationContext* o
|
||||
|
||||
std::vector<std::pair<WriteOpId, boost::optional<BulkWriteReplyItem>>>
|
||||
WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) {
|
||||
const bool inTransaction = static_cast<bool>(TransactionRouter::get(opCtx));
|
||||
|
||||
std::vector<std::pair<WriteOpId, boost::optional<BulkWriteReplyItem>>> aggregatedReplies;
|
||||
std::map<NamespaceString, boost::optional<std::string>> actualCollections;
|
||||
std::map<NamespaceString, bool> hasContactedPrimaryShard;
|
||||
bool hasCollUUIDMismatchErrorsWithNoActualCollection = false;
|
||||
bool hasCollUUIDMismatchErrorsWithoutActualNamespace = false;
|
||||
|
||||
for (const auto& nss : _cmdRef.getNssSet()) {
|
||||
actualCollections.emplace(nss, boost::none);
|
||||
@ -1105,7 +1179,7 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) {
|
||||
}
|
||||
|
||||
// For CollectionUUIDMismatch errors, this lambda checks if "actualCollection" is set, and
|
||||
// it updates the 'actualCollection' and 'hasCollUUIDMismatchErrorsWithNoActualCollection'
|
||||
// it updates the 'actualCollection' and 'hasCollUUIDMismatchErrorsWithoutActualNamespace'
|
||||
// variables appropriately.
|
||||
auto noteCollUUIDMismatchError = [&](const WriteOp& op, const Status& status) {
|
||||
auto info = getCollectionUUIDMismatchInfo(status);
|
||||
@ -1118,7 +1192,7 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) {
|
||||
actualCollection = info->actualCollection();
|
||||
}
|
||||
} else {
|
||||
hasCollUUIDMismatchErrorsWithNoActualCollection = true;
|
||||
hasCollUUIDMismatchErrorsWithoutActualNamespace = true;
|
||||
}
|
||||
};
|
||||
|
||||
@ -1150,19 +1224,6 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// There are errors that are safe to ignore if they were correctly applied to other
|
||||
// shards and we're using ShardVersion::IGNORED. They are safe to ignore as they can be
|
||||
// interpreted as no-ops if the shard response had been instead a successful result
|
||||
// since they wouldn't have modified any data. As a result, we can swallow the errors
|
||||
// and treat them as a successful operation.
|
||||
const bool opIsMulti = _cmdRef.getOp(opId).getMulti();
|
||||
const bool canIgnoreErrors =
|
||||
write_op_helpers::shouldTargetAllShardsSVIgnored(inTransaction, opIsMulti) &&
|
||||
write_op_helpers::isSafeToIgnoreErrorInPartiallyAppliedOp(reply->getStatus());
|
||||
if (canIgnoreErrors) {
|
||||
reply = boost::none;
|
||||
}
|
||||
|
||||
if (reply && reply->getStatus() == ErrorCodes::CollectionUUIDMismatch) {
|
||||
noteCollUUIDMismatchError(op, reply->getStatus());
|
||||
}
|
||||
@ -1199,16 +1260,16 @@ WriteBatchResponseProcessor::finalizeRepliesForOps(OperationContext* opCtx) {
|
||||
}
|
||||
}
|
||||
|
||||
// If there were any CollectionUUIDMismatch errors where "actualCollection" is not set, then
|
||||
// call addActualCollectionForCollUUIDMismatchError() to populate the "actualCollection" field.
|
||||
if (hasCollUUIDMismatchErrorsWithNoActualCollection) {
|
||||
// If there were any CollectionUUIDMismatch errors without an actual namespace, then call
|
||||
// addActualNamespaceForCollUUIDMismatchError() to populate the "actualCollection" field.
|
||||
if (hasCollUUIDMismatchErrorsWithoutActualNamespace) {
|
||||
for (auto& [opId, item] : aggregatedReplies) {
|
||||
if (item && item->getStatus() == ErrorCodes::CollectionUUIDMismatch) {
|
||||
auto op = WriteOp{_cmdRef.getOp(opId)};
|
||||
const NamespaceString& nss = op.getNss();
|
||||
|
||||
auto error = item->getStatus();
|
||||
addActualCollectionForCollUUIDMismatchError(
|
||||
addActualNamespaceForCollUUIDMismatchError(
|
||||
opCtx, error, actualCollections[nss], hasContactedPrimaryShard[nss]);
|
||||
|
||||
item->setStatus(std::move(error));
|
||||
|
||||
@ -98,12 +98,6 @@ public:
|
||||
|
||||
using ItemsByOpMap = std::map<WriteOp, std::vector<std::pair<ShardId, ItemVariant>>>;
|
||||
|
||||
struct GroupItemsResult {
|
||||
ItemsByOpMap itemsByOp;
|
||||
bool unrecoverable = false;
|
||||
bool hasRetryableError = false;
|
||||
};
|
||||
|
||||
struct ShardResult {
|
||||
boost::optional<BatchWriteCommandReply> batchWriteReply;
|
||||
boost::optional<BulkWriteCommandReply> bulkWriteReply;
|
||||
@ -188,6 +182,12 @@ public:
|
||||
return _numOkItemsProcessed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if one or more successful results have been recorded for 'opId', otherwise
|
||||
* returns false.
|
||||
*/
|
||||
bool opHasSuccess(WriteOpId opId) const;
|
||||
|
||||
/**
|
||||
* Returns true if we've exceeded the max reply size, false otherwise. If we have exceeded the
|
||||
* max size, we record this as an error for the next write op.
|
||||
@ -277,13 +277,25 @@ private:
|
||||
void queueOpForRetry(const WriteOp& op, std::set<WriteOp>& toRetry) const;
|
||||
|
||||
/**
|
||||
* Helper method that adds 'op' to 'toRetry', and extracts CannotImplicityCreateCollectionInfo
|
||||
* from 'status' (if it exists) and stores it into 'collsToCreate'.
|
||||
* Helper method that stores the given CannotImplicitlyCreateCollectionInfo ('info') into
|
||||
* 'collsToCreate'. If 'info' is null, this method does nothing.
|
||||
*/
|
||||
void queueOpForRetry(const WriteOp& op,
|
||||
const Status& status,
|
||||
std::set<WriteOp>& toRetry,
|
||||
CollectionsToCreate& collsToCreate) const;
|
||||
void queueCreateCollectionIfNeeded(
|
||||
std::shared_ptr<const CannotImplicitlyCreateCollectionInfo> info,
|
||||
CollectionsToCreate& collsToCreate);
|
||||
|
||||
/**
|
||||
* Helper method that tries to extract a CannotImplicitlyCreateCollectionInfo from 'status',
|
||||
* and if successful it stores the CannotImplicitlyCreateCollectionInfo into 'collsToCreate'.
|
||||
*/
|
||||
void queueCreateCollectionIfNeeded(const Status& status, CollectionsToCreate& collsToCreate);
|
||||
|
||||
/**
|
||||
* Helper method that tries to extract a CannotImplicitlyCreateCollectionInfo from 'itemVar',
|
||||
* and if successful it stores the CannotImplicitlyCreateCollectionInfo into 'collsToCreate'.
|
||||
*/
|
||||
void queueCreateCollectionIfNeeded(const ItemVariant& itemVar,
|
||||
CollectionsToCreate& collsToCreate);
|
||||
|
||||
/**
|
||||
* Generate an ItemVariant for 'op' with an error given by 'status', returned from 'shardId'.
|
||||
@ -297,8 +309,9 @@ private:
|
||||
* method also returns a flag indicating if 'shardResults' contains a retryable error, and
|
||||
* another flag indicating if an error occurred that is "unrecoverable".
|
||||
*/
|
||||
GroupItemsResult groupItemsByOp(
|
||||
OperationContext* opCtx, std::vector<std::pair<ShardId, ShardResult>>& shardResults) const;
|
||||
ItemsByOpMap groupItemsByOp(OperationContext* opCtx,
|
||||
const std::vector<std::pair<ShardId, ShardResult>>& shardResults,
|
||||
const absl::flat_hash_set<WriteOpId>& opsUsingSVIgnored) const;
|
||||
|
||||
/**
|
||||
* Gets the error details and upsert details from 'result.batchWriteReply', converts these
|
||||
|
||||
@ -61,6 +61,7 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
|
||||
const WriteOp& op) try {
|
||||
auto nss = op.getNss();
|
||||
bool isViewfulTimeseries = false;
|
||||
bool usesSVIgnored = false;
|
||||
|
||||
// TODO SERVER-106874 remove the namespace translation check entirely once 9.0 becomes last
|
||||
// LTS. By then we will only have viewless timeseries that do not require nss translation.
|
||||
@ -177,6 +178,9 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
|
||||
// 'type' is kMultiShard (instead of targeting all shards).
|
||||
tr.endpoints = targetAllShards(opCtx, cri);
|
||||
|
||||
// Record that we set shardVersion to IGNORED for this op.
|
||||
usesSVIgnored = true;
|
||||
|
||||
for (auto& endpoint : tr.endpoints) {
|
||||
auto& shardVersion = endpoint.shardVersion;
|
||||
tassert(11841901,
|
||||
@ -204,7 +208,8 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
|
||||
recordTargetingStats(opCtx, cri, tr, op);
|
||||
}
|
||||
|
||||
return Analysis{type, std::move(tr.endpoints), isViewfulTimeseries, std::move(sampleId)};
|
||||
return Analysis{
|
||||
type, std::move(tr.endpoints), isViewfulTimeseries, std::move(sampleId), usesSVIgnored};
|
||||
} catch (const DBException& ex) {
|
||||
auto status = ex.toStatus();
|
||||
|
||||
|
||||
@ -62,8 +62,9 @@ struct Analysis {
|
||||
// 'isViewfulTimeseries' is set to true when the write op is on the main namespace of a viewful
|
||||
// timeseries collection. This flag makes sure the executor sends the command with translation
|
||||
// to buckets namespace correctly.
|
||||
bool isViewfulTimeseries;
|
||||
bool isViewfulTimeseries = false;
|
||||
boost::optional<analyze_shard_key::TargetedSampleId> targetedSampleId;
|
||||
bool usesSVIgnored = false;
|
||||
};
|
||||
class WriteOpAnalyzer {
|
||||
public:
|
||||
|
||||
@ -243,11 +243,16 @@ public:
|
||||
}
|
||||
|
||||
const auto& targetedSampleId = analysis.targetedSampleId;
|
||||
|
||||
if (targetedSampleId && targetedSampleId->isFor(shardName)) {
|
||||
auto& request = _batch->requestByShardId[shardName];
|
||||
request.sampleIds.emplace(getWriteOpId(writeOp), targetedSampleId->getId());
|
||||
}
|
||||
}
|
||||
|
||||
if (analysis.usesSVIgnored) {
|
||||
_batch->opsUsingSVIgnored.emplace(getWriteOpId(writeOp));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -63,8 +63,8 @@ struct SimpleWriteBatch {
|
||||
};
|
||||
|
||||
std::map<ShardId, ShardRequest> requestByShardId;
|
||||
|
||||
bool isRetryableWriteWithId = false;
|
||||
absl::flat_hash_set<WriteOpId> opsUsingSVIgnored;
|
||||
|
||||
static SimpleWriteBatch makeEmpty(bool isRetryableWriteWithId) {
|
||||
return SimpleWriteBatch{{}, isRetryableWriteWithId};
|
||||
|
||||
@ -416,7 +416,7 @@ void WriteOp::_updateOpState(OperationContext* opCtx,
|
||||
// have modified any data. As a result, we can swallow the errors and treat them as a
|
||||
// successful operation.
|
||||
if (isTargetingAllShardsWithSVIgnored &&
|
||||
write_op_helpers::isSafeToIgnoreErrorInPartiallyAppliedOp(_error->getStatus()) &&
|
||||
write_op_helpers::isCollUUIDMismatchWithoutActualNamespace(_error->getStatus()) &&
|
||||
!_successfulShardSet.empty()) {
|
||||
if (!hasPendingChild) {
|
||||
_error.reset();
|
||||
|
||||
@ -51,15 +51,7 @@ bool isOnlyTargetDataOwningShardsForMultiWritesEnabled() {
|
||||
return clusterParam->getValue(boost::none).getEnabled();
|
||||
}
|
||||
|
||||
bool shouldTargetAllShardsSVIgnored(bool inTransaction, bool isMulti) {
|
||||
// Fetch the 'onlyTargetDataOwningShardsForMultiWrites' cluster param.
|
||||
if (isMulti && !inTransaction) {
|
||||
return !isOnlyTargetDataOwningShardsForMultiWritesEnabled();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isSafeToIgnoreErrorInPartiallyAppliedOp(const Status& status) {
|
||||
bool isCollUUIDMismatchWithoutActualNamespace(const Status& status) {
|
||||
return status.code() == ErrorCodes::CollectionUUIDMismatch &&
|
||||
!status.extraInfo<CollectionUUIDMismatchInfo>()->actualCollection();
|
||||
}
|
||||
|
||||
@ -152,21 +152,10 @@ ItemType getFirstNonRetryableError(const std::vector<ItemType>& items, GetCodeFn
|
||||
bool isOnlyTargetDataOwningShardsForMultiWritesEnabled();
|
||||
|
||||
/**
|
||||
* Returns whether an operation should target all shards with ShardVersion::IGNORED(). This is
|
||||
* true for multi: true writes where 'onlyTargetDataOwningShardsForMultiWrites' is false and we are
|
||||
* not in a transaction.
|
||||
* Return true if 'status' contains a CollectionUUIDMismatch error without an actual namespace,
|
||||
* otherwise return false.
|
||||
*/
|
||||
bool shouldTargetAllShardsSVIgnored(bool inTransaction, bool isMulti);
|
||||
|
||||
/**
|
||||
* Used to check if a partially applied (successful on some shards but not others) operation has an
|
||||
* error that is safe to ignore. UUID mismatch errors are safe to ignore if the actualCollection is
|
||||
* null in conjunction with other successful operations. This is true because it means we wrongly
|
||||
* targeted a non-owning shard with the operation and we wouldn't have applied any modifications
|
||||
* anyway. Note this is only safe if we're using ShardVersion::IGNORED since we're ignoring any
|
||||
* placement concern and broadcasting to all shards.
|
||||
*/
|
||||
bool isSafeToIgnoreErrorInPartiallyAppliedOp(const Status& status);
|
||||
bool isCollUUIDMismatchWithoutActualNamespace(const Status& status);
|
||||
|
||||
int computeBaseSizeEstimate(OperationContext* opCtx, const BulkWriteCommandRequest& client);
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user