SERVER-104122 add support to unified write executor to handle shard key updates in the command layer (#45428)

Co-authored-by: Mihai Andrei <mihai.andrei@mongodb.com>
GitOrigin-RevId: 00a3884c6301876ef31ce70066459bc9833e7d36
This commit is contained in:
Mickey J Winters 2025-12-22 12:05:10 -05:00 committed by MongoDB Bot
parent 37c7455a82
commit 9a3a5e3cdb
38 changed files with 340 additions and 432 deletions

View File

@ -13,11 +13,12 @@ selector:
- jstests/sharding/analyze_shard_key/sample_rates_sharded.js
- jstests/sharding/analyze_shard_key/sample_write_queries_unsharded.js
- jstests/sharding/analyze_shard_key/sample_write_queries_sharded.js
- jstests/sharding/analyze_shard_key/shard_key_updates.js
# - jstests/sharding/applyOps_multiple_namespaces.js
- jstests/sharding/batched_writes_with_id_without_shard_key_basic.js
- jstests/sharding/batched_writes_with_id_without_shard_key_stale_config.js
- jstests/sharding/chunk_migration_with_bulk_write.js
- jstests/sharding/compound_hashed_shard_key_targeting.js
- jstests/sharding/query/compound_hashed_shard_key_targeting.js
# - jstests/sharding/database_versioning_all_commands.js
- jstests/sharding/deleteOne_with_id_without_shard_key_basic.js
- jstests/sharding/deleteOne_with_id_without_shard_key_stale_config.js
@ -30,6 +31,7 @@ selector:
- jstests/sharding/migration_blocking_operation/implicit_create_from_upsert_with_paused_migrations.js
- jstests/sharding/migration_blocking_operation/mongos_calls_shardsvr_coordinate_multi_update_command.js
- jstests/sharding/migration_blocking_operation/pause_during_multi_updates_cluster_parameter_requires_feature_flag.js
- jstests/sharding/move_chunk_update_shard_key_in_retryable_write.js
- jstests/sharding/multi_collection_transaction_placement_conflict_workaround.js
- jstests/sharding/num_hosts_targeted_metrics.js
- jstests/sharding/query/bulk_write_basic.js
@ -38,9 +40,16 @@ selector:
- jstests/sharding/query/find_and_modify/*.js
- jstests/sharding/query/let_rand.js
- jstests/sharding/query/out_merge/**/*.js
# - jstests/sharding/query/update/update_shard_key_bulk_write.js
- jstests/sharding/query/update/update_compound_shard_key.js
- jstests/sharding/query/update/update_shard_key_bulk_write.js
# - jstests/sharding/query/update_delete_many_metrics.js
- jstests/sharding/query/update/update_shard_key_conflicting_writes.js
- jstests/sharding/query/update/update_shard_key_doc_moves_shards.js
- jstests/sharding/query/update/update_shard_key_pipeline_update.js
- jstests/sharding/query/update/update_shard_key_with_dollar_fields.js
- jstests/sharding/query/update/update_shard_key_without_shard_key.js
- jstests/sharding/read_write_concern_defaults_application.js
- jstests/sharding/resharding_update_shard_key_in_retryable_write.js
- jstests/sharding/retryable_update_one_by_id_chunk_migration.js
- jstests/sharding/retryable_upsert_single_write_shard.js
- jstests/sharding/retryable_write_error_labels.js
@ -50,7 +59,13 @@ selector:
- jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js
- jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js
- jstests/sharding/timeseries/**/*.js
- jstests/sharding/timeseries/timeseries_buckets_find_and_modify.js
- jstests/sharding/timeseries/timeseries_find_and_modify_update.js
- jstests/sharding/timeseries/timeseries_shardkey_update.js
- jstests/sharding/timeseries/timeseries_update_one.js
- jstests/sharding/timeseries/timeseries_upsert.js
- jstests/sharding/txn*.js
- jstests/sharding/unshard_collection_update_shard_key_in_retryable_write.js
- jstests/sharding/updateOne_with_id_without_shard_key_basic.js
- jstests/sharding/updateOne_with_id_without_shard_key_stale_config.js
- jstests/sharding/updateOne_without_shard_key/deleteOne_without_shard_key_basic.js
@ -59,6 +74,8 @@ selector:
- jstests/sharding/updateOne_without_shard_key/find_and_modify_without_shard_key_sort.js
- jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js
- jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_sort.js
- jstests/sharding/updateOne_without_shard_key/would_change_owning_shard_test.js
- jstests/sharding/updateOne_without_shard_key/write_without_shard_key_metrics.js
- jstests/sharding/updateOne_without_shard_key/write_without_shard_key_single_shard_data_placement_change.js
- jstests/sharding/updateOne_without_shard_key/bulk_write_without_shard_key_errors_only.js

View File

@ -14,7 +14,6 @@ import {withTxnAndAutoRetry} from "jstests/concurrency/fsm_workload_helpers/auto
import {ConcurrentOperation} from "jstests/concurrency/fsm_workload_helpers/cluster_scalability/move_chunk_errors.js";
import {$config as $baseConfig} from "jstests/concurrency/fsm_workloads/random_moveChunk/random_moveChunk_base.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
export const $config = extendWorkload($baseConfig, function ($config, $super) {
$config.threadCount = 5;
@ -259,6 +258,7 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
let performFindAndModify = () => {
try {
// TODO SERVER-114994 findAndModify support in UWE.
const modifiedDoc = collection.findAndModify({
query: {_id: idToUpdate, skey: currentShardKey},
update: this.generateRandomUpdateStyle(idToUpdate, newShardKey, counterForId),
@ -535,12 +535,6 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
* document is given to each one.
*/
$config.setup = function setup(db, collName, cluster) {
// TODO SERVER-104122: Handle WCOS error in UWE.
const uweEnabled = isUweEnabled(db);
if (uweEnabled) {
quit();
}
const ns = db[collName].getFullName();
for (let tid = 0; tid < this.threadCount; ++tid) {

View File

@ -15,7 +15,6 @@
import {extendWorkload} from "jstests/concurrency/fsm_libs/extend_workload.js";
import {$config as $baseConfig} from "jstests/concurrency/fsm_workloads/sharded_partitioned/crud_base_partitioned.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
export const $config = extendWorkload($baseConfig, function ($config, $super) {
$config.threadCount = 10;
@ -479,6 +478,7 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
containsMatchedDocs,
);
// TODO SERVER-114994 findAndModify support in UWE.
const cmdObj = {
findAndModify: collName,
query: query,
@ -697,12 +697,6 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
};
$config.setup = function setup(db, collName, cluster) {
// TODO SERVER-104122: Handle WCOS error in UWE.
const uweEnabled = isUweEnabled(db);
if (uweEnabled) {
quit();
}
// There isn't a way to determine what the thread ids are in setup phase so just assume
// that they are [0, 1, ..., this.threadCount-1].
for (let tid = 0; tid < this.threadCount; ++tid) {

View File

@ -225,7 +225,8 @@ function runTest(isMongos, cluster, bulkWrite, retryCount, timeseries) {
},
);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
// TODO SERVER-114992: Not running in master because of tag but passes with flag off and fails
// with flag on.
const uweEnabled = isUweEnabled(testDB);
if (isMongos && !uweEnabled) {
// Update modifying owning shard requires a transaction or retryable write, we do not want

View File

@ -9,7 +9,6 @@ import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {flushRoutersAndRefreshShardMetadata} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// Verifies the transaction server status response has the fields that we expect.
function verifyServerStatusFields(res) {
@ -229,8 +228,6 @@ const st = new ShardingTest({
},
});
const uweEnabled = isUweEnabled(st.s);
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
const session = st.s.startSession();
@ -719,47 +716,45 @@ jsTest.log("Active transaction.");
const retrySession = st.s.startSession({retryWrites: true});
const retrySessionDB = retrySession.getDatabase(dbName);
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
jsTest.log("Change shard key with retryable write - findAndModify.");
(() => {
// Insert document to be updated.
assert.commandWorked(retrySessionDB[collName].insert({skey: -10}));
jsTest.log("Change shard key with retryable write - findAndModify.");
(() => {
// Insert document to be updated.
assert.commandWorked(retrySessionDB[collName].insert({skey: -10}));
// Retryable write findAndModify that would change the shard key. Uses a txn internally. Throws
// on error.
retrySessionDB[collName].findAndModify({query: {skey: -10}, update: {$set: {skey: 10}}});
// TODO SERVER-114994 findAndModify support in UWE.
// Retryable write findAndModify that would change the shard key. Uses a txn internally. Throws
// on error.
retrySessionDB[collName].findAndModify({query: {skey: -10}, update: {$set: {skey: 10}}});
expectedStats.totalStarted += 1;
expectedStats.totalCommitted += 1;
expectedStats.commitTypes.twoPhaseCommit.initiated += 1;
expectedStats.commitTypes.twoPhaseCommit.successful += 1;
expectedStats.totalContactedParticipants += 2;
expectedStats.totalParticipantsAtCommit += 2;
expectedStats.totalRequestsTargeted += 4;
expectedStats.totalStarted += 1;
expectedStats.totalCommitted += 1;
expectedStats.commitTypes.twoPhaseCommit.initiated += 1;
expectedStats.commitTypes.twoPhaseCommit.successful += 1;
expectedStats.totalContactedParticipants += 2;
expectedStats.totalParticipantsAtCommit += 2;
expectedStats.totalRequestsTargeted += 4;
verifyServerStatusValues(st, expectedStats);
})();
verifyServerStatusValues(st, expectedStats);
})();
jsTest.log("Change shard key with retryable write - batch write command.");
(() => {
// Insert document to be updated.
assert.commandWorked(retrySessionDB[collName].insert({skey: -15}));
jsTest.log("Change shard key with retryable write - batch write command.");
(() => {
// Insert document to be updated.
assert.commandWorked(retrySessionDB[collName].insert({skey: -15}));
// Retryable write update that would change the shard key. Uses a txn internally.
assert.commandWorked(retrySessionDB[collName].update({skey: -15}, {$set: {skey: 15}}));
// Retryable write update that would change the shard key. Uses a txn internally.
assert.commandWorked(retrySessionDB[collName].update({skey: -15}, {$set: {skey: 15}}));
expectedStats.totalStarted += 1;
expectedStats.totalCommitted += 1;
expectedStats.commitTypes.twoPhaseCommit.initiated += 1;
expectedStats.commitTypes.twoPhaseCommit.successful += 1;
expectedStats.totalContactedParticipants += 2;
expectedStats.totalParticipantsAtCommit += 2;
expectedStats.totalRequestsTargeted += 4;
expectedStats.totalStarted += 1;
expectedStats.totalCommitted += 1;
expectedStats.commitTypes.twoPhaseCommit.initiated += 1;
expectedStats.commitTypes.twoPhaseCommit.successful += 1;
expectedStats.totalContactedParticipants += 2;
expectedStats.totalParticipantsAtCommit += 2;
expectedStats.totalRequestsTargeted += 4;
verifyServerStatusValues(st, expectedStats);
})();
}
verifyServerStatusValues(st, expectedStats);
})();
session.endSession();
st.stop();

View File

@ -154,7 +154,7 @@ const expectedSampledQueryDocs = [];
}),
);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
// TODO SERVER-114992: failed to find sampled query.
if (uweEnabled) {
return;
}
@ -297,7 +297,7 @@ const expectedSampledQueryDocs = [];
mongosDB.runCommand({explain: {findAndModify: collName, query: {x: 501}, update: {$set: {y: 501}}}}),
);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
// TODO SERVER-114992: failed to find sampled query.
if (uweEnabled) {
return;
}

View File

@ -40,7 +40,7 @@ const st = new ShardingTest({
mongosOptions: {setParameter: {queryAnalysisSamplerConfigurationRefreshSecs}},
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
// TODO SERVER-114992: failed to find sampled query.
let uweEnabled = false;
st.forEachConnection((conn) => {
uweEnabled = uweEnabled || isUweEnabled(conn);

View File

@ -25,7 +25,6 @@ import {
flushRoutersAndRefreshShardMetadata,
isUpdateDocumentShardKeyUsingTransactionApiEnabled,
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// For startParallelOps to write its state
let staticMongod = MongoRunner.runMongod({});
@ -37,14 +36,6 @@ let st = new ShardingTest({
rsOptions: {setParameter: {maxTransactionLockRequestTimeoutMillis: ReplSetTest.kDefaultTimeoutMS}},
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
MongoRunner.stopMongod(staticMongod);
quit();
}
const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;
@ -107,6 +98,7 @@ const updateCmdObjBase = {
ordered: false,
};
// TODO SERVER-114994 findAndModifySupport for UWE.
const findAndModifyUpdateCmdObjBase = {
findAndModify: collName,
query: {x: shardKeyValueOnShard0},

View File

@ -174,7 +174,7 @@ res = assert.commandWorked(coll.update({a: 22, b: {subObj: "str_0"}, c: "update"
assert.eq(res.nModified, 1, res);
// Verify that the 'update' command gets targeted to 'shard1DB'.
// TODO SERVER-104122: Handle WCOS error in UWE.
// TODO SERVER-114992: couldn't find matching bulkWrite.
if (!uweEnabled) {
profilerHasAtLeastOneMatchingEntryOrThrow({profileDB: shard1DB, filter: {ns: ns, "op": shardCmdName}});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: {ns: ns, "op": shardCmdName}});

View File

@ -9,17 +9,9 @@
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {isUpdateDocumentShardKeyUsingTransactionApiEnabled} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({mongos: 1, shards: 3});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
const updateDocumentShardKeyUsingTransactionApiEnabled = isUpdateDocumentShardKeyUsingTransactionApiEnabled(st.s);
const kDbName = "update_compound_sk";

View File

@ -11,7 +11,6 @@ import {cursorEntryValidator, cursorSizeValidator, summaryFieldsValidator} from
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {shardCollectionMoveChunks} from "jstests/sharding/libs/update_shard_key_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({
mongos: 1,
@ -19,13 +18,6 @@ const st = new ShardingTest({
rsOptions: {setParameter: {maxTransactionLockRequestTimeoutMillis: ReplSetTest.kDefaultTimeoutMS}},
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
const kDbName = "db";
const mongos = st.s0;
const shard0 = st.shard0.shardName;

View File

@ -15,7 +15,6 @@ import {
enableCoordinateCommitReturnImmediatelyAfterPersistingDecision,
waitForFailpoint,
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
let st = new ShardingTest({mongos: 1, shards: 2});
let kDbName = "db";
@ -23,13 +22,6 @@ let mongos = st.s0;
let ns = kDbName + ".foo";
let db = mongos.getDB(kDbName);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
enableCoordinateCommitReturnImmediatelyAfterPersistingDecision(st);
assert.commandWorked(mongos.adminCommand({enableSharding: kDbName, primaryShard: st.shard0.shardName}));

View File

@ -31,7 +31,6 @@ import {
runUpdateCmdSuccess,
shardCollectionMoveChunks,
} from "jstests/sharding/libs/update_shard_key_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({
mongos: 1,
@ -39,13 +38,6 @@ const st = new ShardingTest({
rsOptions: {setParameter: {maxTransactionLockRequestTimeoutMillis: ReplSetTest.kDefaultTimeoutMS}},
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
const kDbName = "db";
const mongos = st.s0;
const shard0 = st.shard0.shardName;

View File

@ -18,7 +18,6 @@ import {
assertCanUpdatePartialShardKey,
assertCanUpdatePrimitiveShardKey,
} from "jstests/sharding/libs/update_shard_key_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({
mongos: 1,
@ -50,6 +49,7 @@ const changeShardKeyOptions = [
changeShardKeyOptions.forEach(function (updateConfig) {
let runInTxn, isFindAndModify, upsert;
[runInTxn, isFindAndModify, upsert] = [updateConfig[0], updateConfig[1], updateConfig[2]];
// TODO SERVER-114994 findAndModify support in UWE.
jsTestLog(
"Testing changing the shard key using pipeline style update and " +
@ -169,13 +169,6 @@ changeShardKeyOptions.forEach(function (updateConfig) {
}
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
// Test pipeline updates where the document being updated will move shards.
changeShardKeyOptions.forEach(function (updateConfig) {

View File

@ -6,23 +6,12 @@
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({
mongos: 1,
shards: {rs0: {nodes: 1}, rs1: {nodes: 1}},
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
let uweEnabled = false;
st.forEachConnection((conn) => {
uweEnabled = uweEnabled || isUweEnabled(conn);
});
if (uweEnabled) {
st.stop();
quit();
}
const coll = st.s.getDB("test").getCollection(jsTestName());
const badDoc = {
@ -66,6 +55,7 @@ assert.commandWorked(
const retrySession = st.s.startSession({retryWrites: true});
const sessionColl = retrySession.getDatabase(coll.getDB().getName()).getCollection(coll.getName());
// TODO SERVER-114994 findAndModify support for UWE.
assert.docEq(badDoc, sessionColl.findAndModify({query: {_id: 1}, update: {$set: {shard: 2}}}));
assert.docEq({shard: 2}, sessionColl.findOne({_id: 1}, {_id: 0, shard: 1}));

View File

@ -13,19 +13,11 @@ import {ShardingTest} from "jstests/libs/shardingtest.js";
import {execCtxTypes} from "jstests/noPassthrough/rs_endpoint/lib/util.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
import {makeCommitTransactionCmdObj} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({shards: 4});
const mongos = st.s0;
const db = mongos.getDB(jsTestName());
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(db);
if (uweEnabled) {
st.stop();
quit();
}
const coll = db.coll;
coll.drop();
@ -143,6 +135,7 @@ runAllTestsForConfig({replacementUpdate: true});
runAllTestsForConfig({upsert: true, replacementUpdate: true});
// Test the "findAndModify" command.
// TODO SERVER-114994: Add findAndModify support to UWE.
runAllTestsForConfig({findAndModify: true});
runAllTestsForConfig({findAndModify: true, upsert: true});
runAllTestsForConfig({findAndModify: true, replacementUpdate: true});

View File

@ -6,7 +6,6 @@
* @tags: [requires_fcv_60]
*/
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
function runTest(reshardInPlace) {
const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace});
@ -41,13 +40,6 @@ function runTest(reshardInPlace) {
const mongosConn = mongosTestColl.getMongo();
const mongosTestDB = mongosConn.getDB(dbName);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(mongosTestDB);
if (uweEnabled) {
reshardingTest.teardown();
quit();
}
// Test commands that the shard key of a document in the test collection from change its shard
// key. Note we don't test the remove:true case because the document can't move shards if it is
// being deleted.
@ -59,6 +51,7 @@ function runTest(reshardInPlace) {
txnNumber: NumberLong(1),
};
// TODO SERVER-114994 findAndModify support for UWE.
const findAndModifyUpdateCmdObj = {
findAndModify: collName,
query: {oldShardKey: -2, newShardKey: -2},

View File

@ -25,7 +25,6 @@ import {
testDB,
} from "jstests/core/timeseries/libs/timeseries_writes_util.js";
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
const docs = [doc1_a_nofields, doc2_a_f101, doc3_a_f102, doc4_b_f103, doc5_b_f104, doc6_c_f105, doc7_c_f106];
@ -33,13 +32,13 @@ const docs = [doc1_a_nofields, doc2_a_f101, doc3_a_f102, doc4_b_f103, doc5_b_f10
Random.setRandomSeed();
setUpShardedCluster();
const uweEnabled = isUweEnabled(testDB);
const testBucketDelete = function (queryField) {
const coll = prepareShardedCollection({collName: "testBucketDelete", initialDocList: docs});
const orgBucketDocs = getTimeseriesCollForRawOps(coll.getDB(), coll).find().rawData().toArray();
const bucketDocIdx = Random.randInt(orgBucketDocs.length);
// TODO SERVER-114994 findAndModify support in UWE.
const res = assert.commandWorked(
getTimeseriesCollForRawOps(coll.getDB(), coll).runCommand({
findAndModify: getTimeseriesCollForRawOps(coll.getDB(), coll).getName(),
@ -115,10 +114,7 @@ const testBucketMetaUpdateToOwningShardChange = function (queryField) {
assert(!newBucketDocs.find((e) => e === orgBucketDocs[bucketDocIdx]), tojson(newBucketDocs));
};
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
testBucketMetaUpdateToOwningShardChange("_id");
testBucketMetaUpdateToOwningShardChange("meta");
}
testBucketMetaUpdateToOwningShardChange("_id");
testBucketMetaUpdateToOwningShardChange("meta");
tearDownShardedCluster();

View File

@ -23,17 +23,15 @@ import {
makeBucketFilter,
metaFieldName,
setUpShardedCluster,
testDB,
tearDownShardedCluster,
testDB,
testFindOneAndUpdateOnShardedCollection,
timeFieldName,
} from "jstests/core/timeseries/libs/timeseries_writes_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const docs = [doc1_a_nofields, doc2_a_f101, doc3_a_f102, doc4_b_f103, doc5_b_f104, doc6_c_f105, doc7_c_f106];
setUpShardedCluster();
const uweEnabled = isUweEnabled(testDB);
(function testSortOptionFailsOnShardedCollection() {
testFindOneAndUpdateOnShardedCollection({
@ -68,34 +66,31 @@ const uweEnabled = isUweEnabled(testDB);
// Verifies that the collation is properly propagated to the bucket-level filter when the
// query-level collation overrides the collection default collation. This is a two phase update due
// to the user-specified collation. This should run in a transaction.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTwoPhaseUpdateCanHonorCollationOnShardedCollection() {
const returnDoc = Object.assign({}, doc3_a_f102, {[metaFieldName]: "C"});
const copyDocs = docs.map((doc) => Object.assign({}, doc));
const resultDocList = copyDocs.filter((doc) => doc._id !== 3);
resultDocList.push(returnDoc);
(function testTwoPhaseUpdateCanHonorCollationOnShardedCollection() {
const returnDoc = Object.assign({}, doc3_a_f102, {[metaFieldName]: "C"});
const copyDocs = docs.map((doc) => Object.assign({}, doc));
const resultDocList = copyDocs.filter((doc) => doc._id !== 3);
resultDocList.push(returnDoc);
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {
filter: {[metaFieldName]: "a", f: 102},
// This excercises the shard key update in the two phase update.
update: {$set: {[metaFieldName]: "C"}},
returnNew: true,
// caseInsensitive collation
collation: {locale: "en", strength: 2},
},
res: {
resultDocList: resultDocList,
returnDoc: returnDoc,
writeType: "twoPhaseProtocol",
dataBearingShard: "primary",
},
});
})();
}
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {
filter: {[metaFieldName]: "a", f: 102},
// This excercises the shard key update in the two phase update.
update: {$set: {[metaFieldName]: "C"}},
returnNew: true,
// caseInsensitive collation
collation: {locale: "en", strength: 2},
},
res: {
resultDocList: resultDocList,
returnDoc: returnDoc,
writeType: "twoPhaseProtocol",
dataBearingShard: "primary",
},
});
})();
// Query on the meta field and 'f' field leads to a targeted update but no measurement is updated.
(function testTargetedUpdateByNonMatchingFilter() {
@ -208,22 +203,20 @@ const replacementDoc = {
// Query on the 'f' field leads to a two phase update. Replacement-style update. The meta value
// makes the measurement belong to a different shard and the request runs in a transaction. This
// should succeed.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTwoPhaseShardKeyUpdateByFieldFilter() {
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {f: 106}, update: replacementDoc, returnNew: true},
// Don't validate the resultDocList because we don't know which doc will be replaced.
res: {
returnDoc: replacementDoc,
writeType: "twoPhaseProtocol",
dataBearingShard: "other",
},
});
})();
}
(function testTwoPhaseShardKeyUpdateByFieldFilter() {
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {f: 106}, update: replacementDoc, returnNew: true},
// Don't validate the resultDocList because we don't know which doc will be replaced.
res: {
returnDoc: replacementDoc,
writeType: "twoPhaseProtocol",
dataBearingShard: "other",
},
});
})();
// Query on the meta field and 'f' field leads to a targeted update when the meta field is not
// included in the shard key. Replacement-style update. The new meta value makes the measurement
@ -239,47 +232,42 @@ if (!uweEnabled) {
// Query on the meta field and 'f' field leads to a targeted update when the meta field is included
// in the shard key. Replacement-style update. The new meta value makes the measurement belong to a
// different shard. This should run in a transaction.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTargetedShardKeyUpdateByMetaAndFieldFilter() {
const copyDocs = docs.map((doc) => Object.assign({}, doc));
const resultDocList = copyDocs.filter((doc) => doc._id !== 4);
resultDocList.push(replacementDoc);
(function testTargetedShardKeyUpdateByMetaAndFieldFilter() {
const copyDocs = docs.map((doc) => Object.assign({}, doc));
const resultDocList = copyDocs.filter((doc) => doc._id !== 4);
resultDocList.push(replacementDoc);
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {[metaFieldName]: "B", f: 103}, update: replacementDoc, returnNew: true},
res: {
resultDocList: resultDocList,
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
// We can't verify explain output because explain can't run in a transaction.
},
});
})();
}
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {[metaFieldName]: "B", f: 103}, update: replacementDoc, returnNew: true},
res: {
resultDocList: resultDocList,
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
// We can't verify explain output because explain can't run in a transaction.
},
});
})();
// Meta filter matches all docs with tag: "B" but only update one. The replacement doc has tag: "A"
// and so, the measurement will be moved to a different shard. This should run in a transaction and
// succeed.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTargetedShardKeyUpdateByMetaFilter() {
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {[metaFieldName]: "B"}, update: replacementDoc, returnNew: true},
// Don't validate the resultDocList because we don't know which doc will be replaced.
res: {
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
},
});
})();
}
(function testTargetedShardKeyUpdateByMetaFilter() {
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {filter: {[metaFieldName]: "B"}, update: replacementDoc, returnNew: true},
// Don't validate the resultDocList because we don't know which doc will be replaced.
res: {
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
},
});
})();
// The update is targeted but there's actually no match. So, the update becomes an upsert.
(function testTargetedPipelineUpsertByMetaAndFieldFilter() {
@ -320,35 +308,32 @@ if (!uweEnabled) {
// The update is targeted but there's actually no match. The update becomes an upsert but the
// replacement document has a different shard key value.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTargetedReplacementUpsertByMetaAndFieldFilter() {
const replacementDoc = Object.assign(
{},
{_id: -100, [metaFieldName]: "A", [timeFieldName]: generateTimeValue(10), f: 2345},
);
const resultDocList = docs.map((doc) => Object.assign({}, doc));
resultDocList.push(replacementDoc);
(function testTargetedReplacementUpsertByMetaAndFieldFilter() {
const replacementDoc = Object.assign(
{},
{_id: -100, [metaFieldName]: "A", [timeFieldName]: generateTimeValue(10), f: 2345},
);
const resultDocList = docs.map((doc) => Object.assign({}, doc));
resultDocList.push(replacementDoc);
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {
filter: {[metaFieldName]: "B", f: 2345},
update: replacementDoc,
upsert: true,
returnNew: true,
},
res: {
resultDocList: resultDocList,
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
nUpserted: 1,
},
});
})();
}
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
startTxn: true,
cmd: {
filter: {[metaFieldName]: "B", f: 2345},
update: replacementDoc,
upsert: true,
returnNew: true,
},
res: {
resultDocList: resultDocList,
returnDoc: replacementDoc,
writeType: "targeted",
dataBearingShard: "other",
nUpserted: 1,
},
});
})();
(function testTwoPhaseReplacementUpsertByFieldFilter() {
const replacementDoc = Object.assign(
@ -358,6 +343,7 @@ if (!uweEnabled) {
const resultDocList = docs.map((doc) => Object.assign({}, doc));
resultDocList.push(replacementDoc);
// TODO SERVER-114994 findAndModify support in UWE.
testFindOneAndUpdateOnShardedCollection({
initialDocList: docs,
cmd: {

View File

@ -29,19 +29,11 @@ import {
timeFieldName,
} from "jstests/core/timeseries/libs/timeseries_writes_util.js";
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const docs = [doc1_a_nofields, doc2_a_f101, doc3_a_f102, doc4_b_f103, doc5_b_f104, doc6_c_f105, doc7_c_f106];
setUpShardedCluster();
// TODO SERVER-104122: Handle WCOS error in UWE.
const uweEnabled = isUweEnabled(testDB);
if (uweEnabled) {
tearDownShardedCluster();
quit();
}
(function testUpdateMultiModifyingShardKey() {
// This will create a sharded collection with 2 chunks: (MinKey, meta: "A"] and [meta: "B",
// MaxKey).
@ -108,6 +100,7 @@ if (uweEnabled) {
// MaxKey).
const coll = prepareShardedCollection({collName: getCallerName(1), initialDocList: docs, includeMeta: true});
// TODO SERVER-114994 findAndModify support in UWE.
// This findAndModify command tries to update doc5_b_f104 into {_id: 5, meta: "A", f: 104}. The
// owning shard would be the shard that owns (MinKey, meta: "A"].
const findOneAndUpdateCmd = {

View File

@ -20,13 +20,10 @@ import {
st,
timeFieldName,
} from "jstests/core/timeseries/libs/timeseries_writes_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
setUpShardedCluster();
const testDB = getTestDB();
const uweEnabled = isUweEnabled(testDB);
const runTest = function ({
initialDocList,
query,
@ -175,20 +172,17 @@ const runTest = function ({
});
})();
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testTargetSingleShardretryableWriteByReplacementChangeShard() {
runTest({
initialDocList: [doc2_a_f101, doc4_b_f103],
query: {[metaFieldName]: "B"},
update: {[metaFieldName]: "A", [timeFieldName]: generateTimeValue(4), f: 110},
replacement: true,
nModified: 1,
resultDocList: [doc2_a_f101, {_id: 4, [metaFieldName]: "A", [timeFieldName]: generateTimeValue(4), f: 110}],
retryableWrite: true,
});
})();
}
(function testTargetSingleShardretryableWriteByReplacementChangeShard() {
runTest({
initialDocList: [doc2_a_f101, doc4_b_f103],
query: {[metaFieldName]: "B"},
update: {[metaFieldName]: "A", [timeFieldName]: generateTimeValue(4), f: 110},
replacement: true,
nModified: 1,
resultDocList: [doc2_a_f101, {_id: 4, [metaFieldName]: "A", [timeFieldName]: generateTimeValue(4), f: 110}],
retryableWrite: true,
});
})();
(function testTwoPhaseUpdate() {
runTest({

View File

@ -14,12 +14,9 @@ import {
testDB,
timeFieldName,
} from "jstests/core/timeseries/libs/timeseries_writes_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
setUpShardedCluster();
const uweEnabled = isUweEnabled(testDB);
const collName = "sharded_timeseries_upsert";
const dateTime = new ISODate();
@ -287,22 +284,19 @@ function runTest({collConfig, updateOp, upsertedDoc, errorCode, updateShardKey =
});
})();
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
(function testSingleUpdateReplacementDocWouldChangeOwningShard() {
runTest({
collConfig: metaShardKey,
updateOp: {
q: {[metaFieldName]: -1},
u: {[metaFieldName]: 10, [timeFieldName]: dateTime, f: 15},
multi: false,
upsert: true,
},
upsertedDoc: {[metaFieldName]: 10, [timeFieldName]: dateTime, f: 15},
updateShardKey: true,
});
})();
}
(function testSingleUpdateReplacementDocWouldChangeOwningShard() {
runTest({
collConfig: metaShardKey,
updateOp: {
q: {[metaFieldName]: -1},
u: {[metaFieldName]: 10, [timeFieldName]: dateTime, f: 15},
multi: false,
upsert: true,
},
upsertedDoc: {[metaFieldName]: 10, [timeFieldName]: dateTime, f: 15},
updateShardKey: true,
});
})();
(function testSingleUpdateReplacementDocWithNoShardKey() {
runTest({

View File

@ -10,7 +10,6 @@
* ]
*/
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
function runTest(reshardInPlace) {
const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 1, reshardInPlace});
@ -40,16 +39,6 @@ function runTest(reshardInPlace) {
const mongosConn = mongosTestColl.getMongo();
const mongosTestDB = mongosConn.getDB(dbName);
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
let uweEnabled = false;
reshardingTest._st.forEachConnection((conn) => {
uweEnabled = uweEnabled || isUweEnabled(conn);
});
if (uweEnabled) {
reshardingTest.teardown();
quit();
}
// Test commands that the shard key of a document in the test collection from change its shard
// key. Note we don't test the remove:true case because the document can't move shards if it is
// being deleted.
@ -61,6 +50,7 @@ function runTest(reshardInPlace) {
txnNumber: NumberLong(1),
};
// TODO SERVER-114994 findAndModify support in UWE.
const findAndModifyUpdateCmdObj = {
findAndModify: collName,
query: {oldShardKey: -2},

View File

@ -9,7 +9,6 @@
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {WriteWithoutShardKeyTestUtil} from "jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// Make sure we're testing with no implicit session.
TestData.disableImplicitSessions = true;
@ -17,8 +16,6 @@ TestData.disableImplicitSessions = true;
// 2 shards single node, 1 mongos, 1 config server 3-node
const st = new ShardingTest({});
const uweEnabled = isUweEnabled(st.s);
const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;
@ -380,8 +377,6 @@ const testCases = [
expectedResponse: {n: 1, nModified: 1},
dbName: dbName,
collName: collName,
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
skip: uweEnabled,
},
{
logMessage: "Running a single update where no document matches on the query and {upsert: true}",

View File

@ -13,7 +13,6 @@
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {WriteWithoutShardKeyTestUtil} from "jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// Make sure we're testing with no implicit session.
TestData.disableImplicitSessions = true;
@ -28,13 +27,6 @@ const shardKey1 = -2;
const shardKey2 = 2;
const docsToInsert = [{_id: 0, x: shardKey1, y: 1}];
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (uweEnabled) {
st.stop();
quit();
}
// Sets up a 2 shard cluster using 'x' as a shard key where Shard 0 owns x <
// splitPoint and Shard 1 splitPoint >= 0.
WriteWithoutShardKeyTestUtil.setupShardedCollection(
@ -62,6 +54,7 @@ let testCases = [
opType: WriteWithoutShardKeyTestUtil.OperationType.updateOne,
},
{
// TODO SERVER-114994 findAndModify support for UWE.
logMessage: "Running WouldChangeOwningShard findAndModify without shard key",
docsToInsert: docsToInsert,
cmdObj: {

View File

@ -10,7 +10,6 @@
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {WriteWithoutShardKeyTestUtil} from "jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// 2 shards single node, 1 mongos, 1 config server 3-node.
const st = new ShardingTest({});
@ -56,6 +55,7 @@ function runCommandAndCheckError(testCase) {
const res = st.getDB(dbName).runCommand(testCase.cmdObj);
assert.commandFailedWithCode(res, testCase.errorCode);
// TODO SERVER-114994 findAndModify support in UWE.
// FindAndModify is not a batch command, thus will not have a writeErrors field.
if (!testCase.cmdObj.findAndModify) {
res.writeErrors.forEach((writeError) => {
@ -304,27 +304,23 @@ const WCOStestCases = [
},
];
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
const uweEnabled = isUweEnabled(st.s);
if (!uweEnabled) {
WCOStestCases.forEach((testCase) => {
jsTest.log(testCase.logMessage);
runCommandAndVerify(testCase);
});
WCOStestCases.forEach((testCase) => {
jsTest.log(testCase.logMessage);
runCommandAndVerify(testCase);
});
mongosServerStatus = st.s.getDB(dbName).adminCommand({serverStatus: 1});
mongosServerStatus = st.s.getDB(dbName).adminCommand({serverStatus: 1});
// Verify all counter metrics were updated correctly after the wcos write commands.
assert.eq(2, mongosServerStatus.metrics.query.updateOneTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.deleteOneTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.updateOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.deleteOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyUnshardedCount);
assert.eq(7, mongosServerStatus.metrics.query.updateOneNonTargetedShardedCount);
assert.eq(4, mongosServerStatus.metrics.query.deleteOneNonTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.findAndModifyNonTargetedShardedCount);
}
// Verify all counter metrics were updated correctly after the wcos write commands.
assert.eq(2, mongosServerStatus.metrics.query.updateOneTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.deleteOneTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.updateOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.deleteOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyUnshardedCount);
assert.eq(7, mongosServerStatus.metrics.query.updateOneNonTargetedShardedCount);
assert.eq(4, mongosServerStatus.metrics.query.deleteOneNonTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.findAndModifyNonTargetedShardedCount);
// Insert Docs for error testing.
const insertDocs = [
@ -429,20 +425,17 @@ errorTestCases.forEach((testCase) => {
runCommandAndCheckError(testCase);
});
// TODO SERVER-104122: Enable when 'WouldChangeOwningShard' writes are supported.
if (!uweEnabled) {
mongosServerStatus = st.s.getDB(dbName).adminCommand({serverStatus: 1});
mongosServerStatus = st.s.getDB(dbName).adminCommand({serverStatus: 1});
// Verify all counter metrics were not updated after the error write commands.
assert.eq(2, mongosServerStatus.metrics.query.updateOneTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.deleteOneTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.updateOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.deleteOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyUnshardedCount);
assert.eq(12, mongosServerStatus.metrics.query.updateOneNonTargetedShardedCount);
assert.eq(6, mongosServerStatus.metrics.query.deleteOneNonTargetedShardedCount);
assert.eq(4, mongosServerStatus.metrics.query.findAndModifyNonTargetedShardedCount);
}
// Verify all counter metrics were not updated after the error write commands.
assert.eq(2, mongosServerStatus.metrics.query.updateOneTargetedShardedCount);
assert.eq(3, mongosServerStatus.metrics.query.deleteOneTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyTargetedShardedCount);
assert.eq(1, mongosServerStatus.metrics.query.updateOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.deleteOneUnshardedCount);
assert.eq(1, mongosServerStatus.metrics.query.findAndModifyUnshardedCount);
assert.eq(12, mongosServerStatus.metrics.query.updateOneNonTargetedShardedCount);
assert.eq(6, mongosServerStatus.metrics.query.deleteOneNonTargetedShardedCount);
assert.eq(4, mongosServerStatus.metrics.query.findAndModifyNonTargetedShardedCount);
st.stop();

View File

@ -103,14 +103,18 @@ bulk_write_exec::BulkWriteReplyInfo bulkWrite(
const BulkWriteCommandRequest& request,
const std::vector<std::unique_ptr<NSTargeter>>& targeters,
bulk_write_exec::BulkWriteExecStats& execStats) {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replies] = attemptExecuteFLE(opCtx, request);
if (result == FLEBatchResult::kProcessed) {
return replies;
} // else fallthrough.
if (unified_write_executor::isEnabled(opCtx)) {
execStats.markIgnore();
return unified_write_executor::bulkWrite(opCtx, request);
} else {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replies] = attemptExecuteFLE(opCtx, request);
if (result == FLEBatchResult::kProcessed) {
return replies;
} // else fallthrough.
}
return bulk_write_exec::execute(opCtx, targeters, request, execStats);
}
return bulk_write_exec::execute(opCtx, targeters, request, execStats);
}
} // namespace cluster

View File

@ -252,8 +252,10 @@ public:
bulkRequest.setLet(expCtx->variables.toBSON(expCtx->variablesParseState, *let));
}
bulk_write_exec::BulkWriteExecStats execStats;
bulk_write_exec::BulkWriteReplyInfo replyInfo;
if (unified_write_executor::isEnabled(opCtx)) {
response = unified_write_executor::bulkWrite(opCtx, bulkRequest);
replyInfo = unified_write_executor::bulkWrite(opCtx, bulkRequest);
} else {
// This is used only for the ScopedDebugInfo construction below.
stdx::unordered_map<NamespaceString, boost::optional<BSONObj>>
@ -288,8 +290,7 @@ public:
// a transaction, where per-operation WC settings are not supported);
// - Once done, The original WC is re-established to allow populateCursorReply
// evaluating whether a reply needs to be returned to the external client.
bulk_write_exec::BulkWriteExecStats execStats;
auto bulkWriteReply = [&] {
replyInfo = [&] {
WriteConcernOptions originalWC = opCtx->getWriteConcern();
ScopeGuard resetWriteConcernGuard(
[opCtx, &originalWC] { opCtx->setWriteConcern(originalWC); });
@ -300,16 +301,15 @@ public:
}
return cluster::bulkWrite(opCtx, bulkRequest, targeters, execStats);
}();
bool updatedShardKey = handleWouldChangeOwningShardError(
opCtx, bulkRequest, bulkWriteReply, targeters);
// TODO SERVER-83869 handle BulkWriteExecStats for batches of size > 1 containing
// updates that modify a documents owning shard.
execStats.updateMetrics(opCtx, targeters, updatedShardKey);
response = populateCursorReply(
opCtx, bulkRequest, request.body, std::move(bulkWriteReply));
}
bool updatedShardKey =
handleWouldChangeOwningShardError(opCtx, bulkRequest, replyInfo, targeters);
// TODO SERVER-83869 handle BulkWriteExecStats for batches of size > 1 containing
// updates that modify a documents owning shard.
execStats.updateMetrics(opCtx, targeters, updatedShardKey);
response = populateCursorReply(opCtx, bulkRequest, request.body, std::move(replyInfo));
result.appendElements(response.toBSON());
return true;
}
@ -394,7 +394,7 @@ public:
// targeted namespace for the first op, as a write that change's a document's owning
// shard must be the only write in the incoming request.
auto firstWriteNSIndex = BulkWriteCRUDOp(request.getOps()[0]).getNsInfoIdx();
auto nss = targeters[firstWriteNSIndex]->getNS();
auto nss = request.getNsInfo()[firstWriteNSIndex].getNs();
bool updatedShardKey = false;
boost::optional<BSONObj> upsertedId;

View File

@ -242,6 +242,9 @@ void BulkWriteExecStats::noteTwoPhaseWriteProtocol(const BulkWriteCommandRequest
void BulkWriteExecStats::updateMetrics(OperationContext* opCtx,
const std::vector<std::unique_ptr<NSTargeter>>& targeters,
bool updatedShardKey) {
if (_ignore) {
return;
}
// Record the number of shards targeted by this bulkWrite.
CurOp::get(opCtx)->debug().nShards = _targetedShards.size();

View File

@ -75,6 +75,17 @@ public:
const std::vector<std::unique_ptr<NSTargeter>>& targeters,
bool updatedShardKey);
/**
* Set of methods to determine whether this 'BulkWriteExecStats' object should be ignored or
* not (that is, whether it should not be used to update targeting or query counter stats).
*/
void markIgnore() {
_ignore = true;
}
bool getIgnore() const {
return _ignore;
}
private:
// Indexed by the namespace index.
stdx::unordered_map<size_t, int> _numShardsOwningChunks;
@ -83,6 +94,7 @@ private:
size_t,
stdx::unordered_map<BatchedCommandRequest::BatchType, stdx::unordered_set<ShardId>>>
_targetedShardsPerNsAndBatchType;
bool _ignore = false;
};
/**

View File

@ -61,7 +61,7 @@ void Stats::recordTargetingStats(const std::vector<ShardEndpoint>& targetedShard
void Stats::updateMetrics(OperationContext* opCtx) {
// Record the number of shards targeted by this write.
// TODO SERVER-104122 increment 'nShards' by 1 if we've targeted shards and updated the shard
// TODO SERVER-114992 increment 'nShards' by 1 if we've targeted shards and updated the shard
// key.
CurOp::get(opCtx)->debug().nShards = _targetedShards.size();
@ -72,7 +72,7 @@ void Stats::updateMetrics(OperationContext* opCtx) {
for (const auto& [writeType, shards] : targetingStats.targetedShardsByWriteType) {
const int perWriteNShards = shards.size();
// TODO SERVER-104122: add one to 'nShards' if updated shard key. This information is
// TODO SERVER-114992: add one to 'nShards' if updated shard key. This information is
// returned from WCOS handling, see batch_write_exec.cpp
NumHostsTargetedMetrics::QueryType metricsWriteType;
switch (writeType) {

View File

@ -43,6 +43,11 @@
namespace mongo {
namespace unified_write_executor {
// Decorator used to cache the value of 'gFeatureFlagUnifiedWriteExecutor' so that it remains
// constant across a query.
const OperationContext::Decoration<boost::optional<bool>> useUweForOpCtx =
OperationContext::declareDecoration<boost::optional<bool>>();
namespace {
bool isNonVerboseWriteCommand(OperationContext* opCtx, WriteCommandRef cmdRef) {
// When determining if a write command is non-verbose, we follow slightly different rules
@ -111,19 +116,19 @@ BatchedCommandResponse write(OperationContext* opCtx,
executeWriteCommand(opCtx, WriteCommandRef{request}, BSONObj(), targetEpoch));
}
BulkWriteCommandReply bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand) {
bulk_write_exec::BulkWriteReplyInfo bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand) {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replyInfo] = attemptExecuteFLE(opCtx, request);
if (result == FLEBatchResult::kProcessed) {
return populateCursorReply(opCtx, request, originalCommand, std::move(replyInfo));
return std::move(replyInfo);
}
// When FLE logic determines there is no need of processing, we fall through to the normal
// case.
}
return std::get<BulkWriteCommandReply>(
return std::get<bulk_write_exec::BulkWriteReplyInfo>(
executeWriteCommand(opCtx, WriteCommandRef{request}, originalCommand));
}
@ -158,7 +163,11 @@ FindAndModifyCommandResponse findAndModify(
}
bool isEnabled(OperationContext* opCtx) {
return feature_flags::gFeatureFlagUnifiedWriteExecutor.checkEnabled();
// Cache the value on an opCtx decorator so that the value remains consistent accross a query.
if (!useUweForOpCtx(opCtx).has_value()) {
useUweForOpCtx(opCtx) = feature_flags::gFeatureFlagUnifiedWriteExecutor.checkEnabled();
}
return *useUweForOpCtx(opCtx);
}
} // namespace unified_write_executor

View File

@ -33,6 +33,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/bulk_write_reply_info.h"
#include "mongo/s/write_ops/write_command_ref.h"
#include "mongo/util/modules.h"
@ -43,8 +44,9 @@ struct FindAndModifyCommandResponse {
StatusWith<write_ops::FindAndModifyCommandReply> swReply;
boost::optional<WriteConcernErrorDetail> wce;
};
using WriteCommandResponse =
std::variant<BatchedCommandResponse, BulkWriteCommandReply, FindAndModifyCommandResponse>;
using WriteCommandResponse = std::variant<BatchedCommandResponse,
bulk_write_exec::BulkWriteReplyInfo,
FindAndModifyCommandResponse>;
/**
* This function will execute the specified write command and return a response.
@ -64,9 +66,9 @@ BatchedCommandResponse write(OperationContext* opCtx,
/**
* Helper function for executing bulk commands.
*/
BulkWriteCommandReply bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand = BSONObj());
bulk_write_exec::BulkWriteReplyInfo bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand = BSONObj());
/**
* Helper function for executing findAndModify commands.

View File

@ -30,6 +30,7 @@
#include "mongo/s/write_ops/unified_write_executor/unified_write_executor.h"
#include "mongo/db/sharding_environment/sharding_mongos_test_fixture.h"
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/s/write_ops/unified_write_executor/write_op.h"
#include "mongo/unittest/unittest.h"
@ -205,7 +206,8 @@ TEST_F(UnifiedWriteExecutorTest, BulkWriteBasic) {
{NamespaceInfoEntry(nss1), NamespaceInfoEntry(nss2)});
auto future = launchAsync([&]() {
auto reply = bulkWrite(operationContext(), request);
auto replyInfo = bulkWrite(operationContext(), request);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 2);
ASSERT_BSONOBJ_EQ(replyItems[0].toBSON(), BSON("ok" << 1.0 << "idx" << 0 << "n" << 1));
@ -359,7 +361,8 @@ TEST_F(UnifiedWriteExecutorTest, BulkWriteImplicitCollectionCreation) {
{NamespaceInfoEntry(nss1)});
auto future = launchAsync([&]() {
auto reply = bulkWrite(operationContext(), request);
auto replyInfo = bulkWrite(operationContext(), request);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 1);
ASSERT_BSONOBJ_EQ(replyItems[0].toBSON(), BSON("ok" << 1.0 << "idx" << 0 << "n" << 1));
@ -429,7 +432,8 @@ TEST_F(UnifiedWriteExecutorTest, OrderedBulkWriteErrorsAndStops) {
request.setOrdered(true);
auto future = launchAsync([&]() {
auto reply = bulkWrite(operationContext(), request);
auto replyInfo = bulkWrite(operationContext(), request);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 1);
ASSERT_BSONOBJ_EQ(replyItems[0].toBSON(),
@ -478,7 +482,8 @@ TEST_F(UnifiedWriteExecutorTest, UnorderedBulkWriteErrorsAndStops) {
auto future = launchAsync([&]() {
auto reply = bulkWrite(operationContext(), request);
auto replyInfo = bulkWrite(operationContext(), request);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 2);
ASSERT_BSONOBJ_EQ(replyItems[0].toBSON(),

View File

@ -820,7 +820,6 @@ ShardResponse ShardResponse::make(StatusWith<executor::RemoteCommandResponse> sw
// If there was a local error, return a ShardResponse that reports this local error.
if (!swResponse.isOK()) {
// TODO SERVER-104122 Support for 'WouldChangeOwningShard' writes.
const Status& status = swResponse.getStatus();
LOGV2_DEBUG(10896501,

View File

@ -343,7 +343,6 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
OperationContext* opCtx,
RoutingContext& routingCtx,
const NoRetryWriteBatchResponse& response) {
// TODO SERVER-104122 Support for 'WouldChangeOwningShard' writes.
const auto& op = response.getOp();
// Process write concern error (if any).
@ -356,7 +355,6 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
if (response.isError()) {
const auto& status = response.getStatus();
// TODO SERVER-104122 Support for 'WouldChangeOwningShard' writes.
LOGV2_DEBUG(10896500,
4,
"Cluster write op executing in internal transaction failed with error",
@ -1221,8 +1219,8 @@ WriteCommandResponse WriteBatchResponseProcessor::generateClientResponse(Operati
}});
}
BulkWriteCommandReply WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(
OperationContext* opCtx) {
bulk_write_exec::BulkWriteReplyInfo
WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(OperationContext* opCtx) {
const bool errorsOnly = _cmdRef.getErrorsOnly().value_or(false);
std::vector<BulkWriteReplyItem> results;
@ -1256,9 +1254,15 @@ BulkWriteCommandReply WriteBatchResponseProcessor::generateClientResponseForBulk
info.wcErrors = BulkWriteWriteConcernError{totalWcError->toStatus().code(),
totalWcError->toStatus().reason()};
}
return info;
}
return populateCursorReply(
opCtx, _cmdRef.getBulkWriteCommandRequest(), _originalCommand, std::move(info));
BulkWriteCommandReply WriteBatchResponseProcessor::generateClientResponseForBulkWriteForTest(
OperationContext* opCtx) {
return populateCursorReply(opCtx,
_cmdRef.getBulkWriteCommandRequest(),
_originalCommand,
generateClientResponseForBulkWriteCommand(opCtx));
}
BatchedCommandResponse WriteBatchResponseProcessor::generateClientResponseForBatchedCommand(

View File

@ -150,10 +150,16 @@ public:
BatchedCommandResponse generateClientResponseForBatchedCommand(OperationContext* opCtx);
BulkWriteCommandReply generateClientResponseForBulkWriteCommand(OperationContext* opCtx);
bulk_write_exec::BulkWriteReplyInfo generateClientResponseForBulkWriteCommand(
OperationContext* opCtx);
FindAndModifyCommandResponse generateClientResponseForFindAndModifyCommand();
/**
* Seralizes bulkWriteCommand for use in unit testing.
*/
BulkWriteCommandReply generateClientResponseForBulkWriteForTest(OperationContext* opCtx);
/**
* This method is called by the scheduler to record target errors that occurred during batch
* creation.

View File

@ -124,7 +124,7 @@ TEST_F(WriteBatchResponseProcessorTest, OKReplies) {
{{shard1Name, ShardResponse::make(rcr1, {WriteOp(request, 0)}, inTransaction)},
{shard2Name, ShardResponse::make(rcr2, {WriteOp(request, 1)}, inTransaction)}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 2);
ASSERT_EQ(clientReply.getNMatched(), 6);
ASSERT_EQ(clientReply.getNModified(), 6);
@ -196,7 +196,7 @@ TEST_F(WriteBatchResponseProcessorTest, AllStatisticsCopied) {
opCtx,
routingCtx,
SimpleWriteBatchResponse{{{shard1Name, ShardResponse::make(rcr1, {WriteOp(request, 0)})}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 1);
ASSERT_EQ(clientReply.getNMatched(), 1);
ASSERT_EQ(clientReply.getNModified(), 1);
@ -268,7 +268,7 @@ TEST_F(WriteBatchResponseProcessorTest, MixedErrorsAndOk) {
{shard2Name, ShardResponse::make(rcr2, {op2, op3})},
{shard3Name, ShardResponse::make(rcr3, {op4})}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 2);
// Should still be able to keep processing even if we encountered an inner error.
ASSERT_EQ(clientReply.getNInserted(), 2);
@ -325,7 +325,7 @@ TEST_F(WriteBatchResponseProcessorTest, CreateCollection) {
// Confirm so far we've only processed one error. Copy the processor since generating a
// response consumes the results array.
auto tempProcessor = processor;
auto clientReply = tempProcessor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = tempProcessor.generateClientResponseForBulkWriteForTest(opCtx);
// Should have 0 errors since we can retry CannotImplicitlyCreateCollection
ASSERT_EQ(clientReply.getNErrors(), 0);
// Should still be able to keep processing even if we encountered an inner error.
@ -353,7 +353,7 @@ TEST_F(WriteBatchResponseProcessorTest, CreateCollection) {
ASSERT_EQ(processor.getNumErrorsRecorded(), 0);
ASSERT(result.opsToRetry.empty());
ASSERT(result.collsToCreate.empty());
clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
// Assert we have both ops completed now.
ASSERT_EQ(clientReply.getNErrors(), 0);
@ -402,7 +402,7 @@ TEST_F(WriteBatchResponseProcessorTest, SingleReplyItemForBatchOfThree) {
ASSERT(result.collsToCreate.contains(nss1));
// Assert the generated response is as expected.
auto response = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto response = processor.generateClientResponseForBulkWriteForTest(opCtx);
// Should have 0 errors since we can retry CannotImplicitlyCreateCollection.
ASSERT_EQ(response.getNErrors(), 0);
ASSERT_EQ(response.getNInserted(), 0);
@ -546,7 +546,7 @@ TEST_F(WriteBatchResponseProcessorTest, IdxsCorrectlyRewrittenInReplyItems) {
ASSERT_EQ(result.collsToCreate.size(), 1);
ASSERT(result.collsToCreate.contains(nss1));
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 3);
ASSERT_EQ(clientReply.getNInserted(), 2);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -620,7 +620,7 @@ TEST_F(WriteBatchResponseProcessorTest, RetryStalenessErrors) {
ASSERT(result.collsToCreate.empty());
// Assert errors was not incremented since we can retry Staleness errors.
auto response = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto response = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(response.getNErrors(), 0);
ASSERT_EQ(response.getNInserted(), 0);
}
@ -815,7 +815,7 @@ TEST_F(WriteBatchResponseProcessorTest, RetryShardsCannotRefreshDueToLocksHeldEr
ASSERT(result.collsToCreate.empty());
// Assert errors was not incremented since we can retry ShardsCannotRefresh errors.
auto response = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto response = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(response.getNErrors(), 0);
ASSERT_EQ(response.getNInserted(), 1);
@ -864,7 +864,7 @@ TEST_F(WriteBatchResponseProcessorTest, ProcessesSingleWriteConcernError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNInserted(), 2);
auto batch = clientReply.getCursor().getFirstBatch();
@ -925,7 +925,7 @@ TEST_F(WriteBatchResponseProcessorTest, ProcessesMultipleWriteConcernErrors) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNInserted(), 2);
auto batch = clientReply.getCursor().getFirstBatch();
@ -981,7 +981,7 @@ TEST_F(WriteBatchResponseProcessorTest, ProcessesExceededMemoryLimitError) {
{shard2Name, ShardResponse::make(rcr1, {WriteOp(request, 1)})}}});
ASSERT_FALSE(processor.checkBulkWriteReplyMaxSize(opCtx));
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
// Should have an OK response.
@ -1007,7 +1007,7 @@ TEST_F(WriteBatchResponseProcessorTest, ProcessesExceededMemoryLimitError) {
ASSERT_TRUE(processor.checkBulkWriteReplyMaxSize(opCtx));
ASSERT_EQ(processor.getNumErrorsRecorded(), 1);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
// Should have an OK response even if the last error is terminal.
@ -1202,7 +1202,7 @@ TEST_F(WriteBatchResponseProcessorTest, NonVerboseMode) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNInserted(), 2);
auto batch = clientReply.getCursor().getFirstBatch();
@ -1237,7 +1237,7 @@ TEST_F(WriteBatchResponseProcessorTest, NonVerboseModeWithErrors) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNInserted(), 0);
auto batch = clientReply.getCursor().getFirstBatch();
@ -1289,7 +1289,7 @@ TEST_F(WriteBatchResponseProcessorTest, NonVerboseModeWithMixedErrorsAndOk) {
SimpleWriteBatchResponse{{{shard1Name, ShardResponse::make(rcr1, {op1})},
{shard2Name, ShardResponse::make(rcr2, {op2, op3})}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNInserted(), 2);
auto batch = clientReply.getCursor().getFirstBatch();
@ -1363,7 +1363,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWritesOKReplies) {
{shard2Name, ShardResponse::make(rcr2, {op1, op2, op3})},
{shard3Name, ShardResponse::make(rcr3, {op1, op2, op3})}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNModified(), 10);
ASSERT_EQ(clientReply.getNInserted(), 0);
@ -1432,7 +1432,7 @@ TEST_F(WriteBatchResponseProcessorTest, MixedMultiAndNonMultiWritesOKReplies) {
{shard2Name, ShardResponse::make(rcr2, {op1, op2, op3})},
{shard3Name, ShardResponse::make(rcr3, {op1, op3})}}});
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNModified(), 11);
ASSERT_EQ(clientReply.getNInserted(), 1);
@ -1502,7 +1502,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWriteMixedOKAndRetryableErrorThenOK
ASSERT_EQ(nextResult.opsToRetry.size(), 0);
ASSERT_EQ(nextResult.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNModified(), 2);
@ -1577,7 +1577,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWriteMixedOKAndRetryableErrorThenNo
ASSERT_EQ(nextResult.opsToRetry.size(), 0);
ASSERT_EQ(nextResult.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNModified(), 1);
@ -1629,7 +1629,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWriteMixedOKAndNonRetryableError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNModified(), 1);
@ -1686,7 +1686,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWriteNonRetryableErrors) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -1768,7 +1768,7 @@ TEST_F(WriteBatchResponseProcessorTest, MultiWriteRetryableNonRetryableAndOK) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNModified(), 6);
@ -1926,7 +1926,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest, OKReplies) {
ASSERT_EQ(processor.getNumErrorsRecorded(), 0);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 2);
ASSERT_EQ(clientReply.getNMatched(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2045,7 +2045,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest, NonTransientTransactionErrorInARSHalt
ASSERT_EQ(processor.getNumErrorsRecorded(), 1);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 1);
ASSERT_EQ(clientReply.getNMatched(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2155,7 +2155,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest,
ASSERT_EQ(processor.getNumErrorsRecorded(), 1);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 1);
ASSERT_EQ(clientReply.getNMatched(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2219,7 +2219,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest, NonTransientTransactionErrorInReplyIt
ASSERT_EQ(processor.getNumErrorsRecorded(), 1);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNInserted(), 0);
ASSERT_EQ(clientReply.getNMatched(), 0);
@ -2272,7 +2272,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest, RetryableErrorInReplyItemHaltsProcess
ASSERT_EQ(processor.getNumErrorsRecorded(), 1);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNInserted(), 0);
ASSERT_EQ(clientReply.getNMatched(), 0);
@ -2341,7 +2341,7 @@ TEST_F(WriteBatchResponseProcessorTxnTest, ProcessorSetsRetriedStmtIdsInClientRe
ASSERT_EQ(processor.getNumErrorsRecorded(), 0);
// Confirm the generated bulk reply and batched command response are both correct.
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNInserted(), 2);
ASSERT_EQ(clientReply.getNMatched(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2406,7 +2406,7 @@ TEST_F(WriteBatchResponseProcessorTest, SimpleWriteErrorsOnlyModeNoError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
ASSERT_EQ(clientReply.getNInserted(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2454,7 +2454,7 @@ TEST_F(WriteBatchResponseProcessorTest, SimpleWriteErrorsOnlyModeWithError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
ASSERT_EQ(clientReply.getNInserted(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2510,7 +2510,7 @@ TEST_F(WriteBatchResponseProcessorTest, SimpleWriteErrorsOnlyModeUnordered) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 2);
ASSERT_EQ(clientReply.getNInserted(), 0);
ASSERT_EQ(clientReply.getNModified(), 0);
@ -2551,7 +2551,7 @@ TEST_F(WriteBatchResponseProcessorTest, TwoPhaseWriteErrorsOnlyModeNoError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 0);
auto batch = clientReply.getCursor().getFirstBatch();
// Assert that no error response was returned
@ -2587,7 +2587,7 @@ TEST_F(WriteBatchResponseProcessorTest, TwoPhaseWriteErrorsOnlyModeWithError) {
ASSERT_EQ(result.opsToRetry.size(), 0);
ASSERT_EQ(result.collsToCreate.size(), 0);
auto clientReply = processor.generateClientResponseForBulkWriteCommand(opCtx);
auto clientReply = processor.generateClientResponseForBulkWriteForTest(opCtx);
ASSERT_EQ(clientReply.getNErrors(), 1);
auto batch = clientReply.getCursor().getFirstBatch();
// Assert that one error response was returned