SERVER-121474 Propagate TransientTransactionError label for FLE BulkWrites (#53650)
GitOrigin-RevId: 647332c25854a2f2e53685f81afcc595968a987a
This commit is contained in:
parent
29df950c07
commit
b8135565ee
@ -32,7 +32,7 @@ executor:
|
||||
internalQueryStatsErrorsAreCommandFatal: true
|
||||
internalQueryStatsRateLimit: -1
|
||||
internalQueryStatsWriteCmdSampleRate: 1
|
||||
num_mongos: 1
|
||||
num_mongos: 3
|
||||
num_shards: 2
|
||||
hooks:
|
||||
- class: RunQueryStats
|
||||
|
||||
@ -5,7 +5,3 @@ excludes:
|
||||
- "bulk_write.single_op_core_excluded_files"
|
||||
- "bulk_write.single_op_txns_excluded_files"
|
||||
- "bulk_write.single_op_fle_excluded_files"
|
||||
overrides:
|
||||
# TODO (SERVER-121474) - Remove this override once bulkWrite attaches errorLabels for transient
|
||||
# transaction errors in FLE operations.
|
||||
- "bulk_write.sharding_temporarily_enforce_1_mongos"
|
||||
|
||||
@ -58,14 +58,6 @@
|
||||
# and any writes performed are simply for setting up the test collection.
|
||||
- src/mongo/db/modules/enterprise/jstests/fle2/query/aggregate_lookupMultiSchema.js
|
||||
|
||||
# TODO (SERVER-121474) - Remove this override once bulkWrite attaches errorLabels for transient
|
||||
# transaction errors in FLE operations.
|
||||
- name: sharding_temporarily_enforce_1_mongos
|
||||
value:
|
||||
executor:
|
||||
fixture:
|
||||
num_mongos: 1
|
||||
|
||||
- name: txn_passthrough_runner_selftest
|
||||
value:
|
||||
exclude_files:
|
||||
|
||||
36
jstests/core/query/bulk/bulk_api_error_labels.js
Normal file
36
jstests/core/query/bulk/bulk_api_error_labels.js
Normal file
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Tests that errorLabels in bulk write command responses are propagated through the bulk API
|
||||
* to BulkWriteResult and BulkWriteError.
|
||||
*
|
||||
* @tags: [
|
||||
* not_allowed_with_signed_security_token,
|
||||
* does_not_support_repeated_reads,
|
||||
* no_selinux,
|
||||
* ]
|
||||
*/
|
||||
|
||||
const coll = db[jsTestName()];
|
||||
coll.drop();
|
||||
|
||||
const labelsResult = {
|
||||
ok: 1,
|
||||
nInserted: 1,
|
||||
nUpserted: 0,
|
||||
nMatched: 0,
|
||||
nModified: 0,
|
||||
nRemoved: 0,
|
||||
upserted: [],
|
||||
writeErrors: [],
|
||||
writeConcernErrors: [],
|
||||
errorLabels: ["TransientTransactionError"],
|
||||
};
|
||||
|
||||
// BulkWriteResult exposes errorLabels from bulkResult.
|
||||
const bwr = new BulkWriteResult(labelsResult, null, null);
|
||||
assert(bwr.hasOwnProperty("errorLabels"), "BulkWriteResult should carry errorLabels: " + tojson(bwr));
|
||||
assert.eq(["TransientTransactionError"], bwr.errorLabels);
|
||||
|
||||
// WriteResult exposes errorLabels from bulkResult.
|
||||
const wr = new WriteResult(labelsResult, null, null);
|
||||
assert(wr.hasOwnProperty("errorLabels"), "WriteResult should carry errorLabels: " + tojson(wr));
|
||||
assert.eq(["TransientTransactionError"], wr.errorLabels);
|
||||
@ -122,6 +122,8 @@ MONGO_FAIL_POINT_DEFINE(fleCrudHangPreFindAndModify);
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(fleCrudPauseNonTxnGetTags);
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(fleCrudThrowTransientTxnError);
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
std::vector<write_ops::WriteError> singleStatusToWriteErrors(const Status& status) {
|
||||
@ -461,6 +463,12 @@ insertSingleDocument(OperationContext* opCtx,
|
||||
fleCrudHangPreInsert.pauseWhileSet();
|
||||
}
|
||||
|
||||
if (MONGO_unlikely(fleCrudThrowTransientTxnError.shouldFail())) {
|
||||
uasserted(
|
||||
ErrorCodes::LockTimeout,
|
||||
"insertSingleDocument failed due to fleCrudThrowTransientTxnError fail point");
|
||||
}
|
||||
|
||||
*reply = uassertStatusOK(processInsert(&queryImpl,
|
||||
edcNss2,
|
||||
*serverPayload2.get(),
|
||||
@ -639,6 +647,10 @@ write_ops::DeleteCommandReply processDelete(OperationContext* opCtx,
|
||||
fleCrudHangPreDelete.pauseWhileSet();
|
||||
}
|
||||
|
||||
if (MONGO_unlikely(fleCrudThrowTransientTxnError.shouldFail())) {
|
||||
uasserted(ErrorCodes::LockTimeout,
|
||||
"processDelete failed due to fleCrudThrowTransientTxnError fail point");
|
||||
}
|
||||
|
||||
*reply = processDelete(&queryImpl, expCtx2, deleteRequest2, efc2);
|
||||
|
||||
@ -762,6 +774,11 @@ write_ops::UpdateCommandReply processUpdate(OperationContext* opCtx,
|
||||
fleCrudHangPreUpdate.pauseWhileSet();
|
||||
}
|
||||
|
||||
if (MONGO_unlikely(fleCrudThrowTransientTxnError.shouldFail())) {
|
||||
uasserted(ErrorCodes::LockTimeout,
|
||||
"processUpdate failed due to fleCrudThrowTransientTxnError fail point");
|
||||
}
|
||||
|
||||
*reply = processUpdate(&queryImpl, expCtx2, updateRequest2, efc2);
|
||||
|
||||
if (MONGO_unlikely(fleCrudHangUpdate.shouldFail())) {
|
||||
@ -1105,6 +1122,12 @@ StatusWith<std::pair<ReplyType, OpMsgRequest>> processFindAndModifyRequest(
|
||||
fleCrudHangPreFindAndModify.pauseWhileSet();
|
||||
}
|
||||
|
||||
if (MONGO_unlikely(fleCrudThrowTransientTxnError.shouldFail())) {
|
||||
uasserted(
|
||||
ErrorCodes::LockTimeout,
|
||||
"processFindAndModify failed due to fleCrudThrowTransientTxnError fail point");
|
||||
}
|
||||
|
||||
*reply = processCallback(expCtx, &queryImpl, findAndModifyRequest2);
|
||||
|
||||
if (MONGO_unlikely(fleCrudHangFindAndModify.shouldFail())) {
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/db/commands/query_cmd/bulk_write_common.h"
|
||||
#include "mongo/db/error_labels.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
@ -221,37 +222,19 @@ std::pair<FLEBatchResult, bulk_write_exec::BulkWriteReplyInfo> attemptExecuteFLE
|
||||
const auto& ops = clientRequest.getOps();
|
||||
BulkWriteCRUDOp firstOp(ops[0]);
|
||||
auto firstOpType = firstOp.getType();
|
||||
try {
|
||||
BatchedCommandResponse response;
|
||||
FLEBatchResult fleResult;
|
||||
BatchedCommandResponse response;
|
||||
FLEBatchResult fleResult;
|
||||
|
||||
BatchedCommandRequest fleRequest = makeFLECommandRequest(opCtx, clientRequest, ops);
|
||||
fleResult = processFLEBatch(opCtx, fleRequest, &response);
|
||||
BatchedCommandRequest fleRequest = makeFLECommandRequest(opCtx, clientRequest, ops);
|
||||
fleResult = processFLEBatch(opCtx, fleRequest, &response);
|
||||
|
||||
if (fleResult == FLEBatchResult::kNotProcessed) {
|
||||
return {FLEBatchResult::kNotProcessed, bulk_write_exec::BulkWriteReplyInfo()};
|
||||
}
|
||||
|
||||
bulk_write_exec::BulkWriteReplyInfo replyInfo = processFLEResponse(
|
||||
opCtx, fleRequest, firstOpType, clientRequest.getErrorsOnly(), response);
|
||||
return {FLEBatchResult::kProcessed, std::move(replyInfo)};
|
||||
} catch (const DBException& ex) {
|
||||
LOGV2_WARNING(7749700,
|
||||
"Failed to process bulkWrite with Queryable Encryption",
|
||||
"error"_attr = redact(ex));
|
||||
// If Queryable encryption adds support for update with multi: true, we might have to update
|
||||
// the way we make replies here to handle SERVER-15292 correctly.
|
||||
bulk_write_exec::BulkWriteReplyInfo replyInfo;
|
||||
BulkWriteReplyItem reply(0, ex.toStatus());
|
||||
reply.setN(0);
|
||||
if (firstOpType == BulkWriteCRUDOp::kUpdate) {
|
||||
reply.setNModified(0);
|
||||
}
|
||||
|
||||
replyInfo.replyItems.push_back(reply);
|
||||
replyInfo.summaryFields.nErrors = 1;
|
||||
return {FLEBatchResult::kProcessed, std::move(replyInfo)};
|
||||
if (fleResult == FLEBatchResult::kNotProcessed) {
|
||||
return {FLEBatchResult::kNotProcessed, bulk_write_exec::BulkWriteReplyInfo()};
|
||||
}
|
||||
|
||||
bulk_write_exec::BulkWriteReplyInfo replyInfo =
|
||||
processFLEResponse(opCtx, fleRequest, firstOpType, clientRequest.getErrorsOnly(), response);
|
||||
return {FLEBatchResult::kProcessed, std::move(replyInfo)};
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -50,7 +50,8 @@ bulk_write_exec::BulkWriteReplyInfo processFLEResponse(OperationContext* opCtx,
|
||||
* Attempt to run the bulkWriteCommandRequest through Queryable Encryption code path.
|
||||
* Returns kNotProcessed if falling back to the regular bulk write code path is needed instead.
|
||||
*
|
||||
* This function does not throw, any errors are reported via the function return.
|
||||
* This function may throw. Errors from FLE CRUD operations are reported via the function return;
|
||||
* errors from request validation or response processing propagate as exceptions.
|
||||
*/
|
||||
std::pair<FLEBatchResult, bulk_write_exec::BulkWriteReplyInfo> attemptExecuteFLE(
|
||||
OperationContext* opCtx, const BulkWriteCommandRequest& clientRequest);
|
||||
|
||||
@ -107,6 +107,9 @@ function WriteResult(bulkResult, singleBatchType, writeConcern) {
|
||||
defineReadOnlyProperty(this, "nMatched", bulkResult.nMatched);
|
||||
defineReadOnlyProperty(this, "nModified", bulkResult.nModified);
|
||||
defineReadOnlyProperty(this, "nRemoved", bulkResult.nRemoved);
|
||||
if (bulkResult.hasOwnProperty("errorLabels")) {
|
||||
defineReadOnlyProperty(this, "errorLabels", bulkResult.errorLabels);
|
||||
}
|
||||
if (bulkResult.upserted.length > 0) {
|
||||
defineReadOnlyProperty(this, "_id", bulkResult.upserted[bulkResult.upserted.length - 1]._id);
|
||||
}
|
||||
@ -219,6 +222,9 @@ function BulkWriteResult(bulkResult, singleBatchType, writeConcern) {
|
||||
defineReadOnlyProperty(this, "nMatched", bulkResult.nMatched);
|
||||
defineReadOnlyProperty(this, "nModified", bulkResult.nModified);
|
||||
defineReadOnlyProperty(this, "nRemoved", bulkResult.nRemoved);
|
||||
if (bulkResult.hasOwnProperty("errorLabels")) {
|
||||
defineReadOnlyProperty(this, "errorLabels", bulkResult.errorLabels);
|
||||
}
|
||||
|
||||
//
|
||||
// Define access methods
|
||||
@ -867,6 +873,15 @@ let Bulk = function (collection, ordered) {
|
||||
if (result.writeConcernError) {
|
||||
bulkResult.writeConcernErrors.push(new WriteConcernError(result.writeConcernError));
|
||||
}
|
||||
|
||||
if (Array.isArray(result.errorLabels)) {
|
||||
if (!bulkResult.errorLabels) bulkResult.errorLabels = [];
|
||||
for (const label of result.errorLabels) {
|
||||
if (!bulkResult.errorLabels.includes(label)) {
|
||||
bulkResult.errorLabels.push(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
|
||||
Loading…
Reference in New Issue
Block a user