SERVER-114992 Enable more tests and support WCOS metrics in UWE (#45617)

GitOrigin-RevId: ad14164d4e7cc495d3b5b67b680289fd996e3eda
This commit is contained in:
Mihai Andrei 2025-12-24 11:54:07 -05:00 committed by MongoDB Bot
parent 4295703db1
commit 836d6223d4
14 changed files with 111 additions and 66 deletions

View File

@ -1,3 +1,5 @@
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
/**
* Helper function to check a BulkWrite cursorEntry.
*/
@ -387,7 +389,9 @@ export class BulkWriteMetricChecker {
targetedInsert *= retryCount;
}
if (this.timeseries && !this.bulkWrite && updateShardField === "manyShards") {
// Note that in the case of the UWE, we do not double count retryable timeseries updates.
const uweEnabled = isUweEnabled(this.testDB);
if (this.timeseries && !this.bulkWrite && !uweEnabled && updateShardField === "manyShards") {
targetedUpdate = targetedUpdate * 2;
}

View File

@ -6,7 +6,6 @@
import {getTimeseriesCollForDDLOps} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
import {BulkWriteMetricChecker} from "jstests/libs/bulk_write_utils.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
@ -225,10 +224,7 @@ function runTest(isMongos, cluster, bulkWrite, retryCount, timeseries) {
},
);
// TODO SERVER-114992: Not running in master because of tag but passes with flag off and fails
// with flag on.
const uweEnabled = isUweEnabled(testDB);
if (isMongos && !uweEnabled) {
if (isMongos) {
// Update modifying owning shard requires a transaction or retryable write, we do not want
// actual retries here.
metricChecker.retryCount = 1;

View File

@ -6,7 +6,6 @@
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {QuerySamplingUtil} from "jstests/sharding/analyze_shard_key/libs/query_sampling_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// Make the periodic jobs for refreshing sample rates and writing sampled queries and diffs have a
// period of 1 second to speed up the test.
@ -25,8 +24,6 @@ const st = new ShardingTest({
mongosOptions: {setParameter: {queryAnalysisSamplerConfigurationRefreshSecs}},
});
const uweEnabled = isUweEnabled(st.s);
const dbName = "testDb";
const collName = "testColl";
const ns = dbName + "." + collName;
@ -154,10 +151,6 @@ const expectedSampledQueryDocs = [];
}),
);
// TODO SERVER-114992: failed to find sampled query.
if (uweEnabled) {
return;
}
// This is a WouldChangeOwningShard update. It causes the document to move from shard0 to
// shard1.
const singleUpdateOp2 = {
@ -297,10 +290,6 @@ const expectedSampledQueryDocs = [];
mongosDB.runCommand({explain: {findAndModify: collName, query: {x: 501}, update: {$set: {y: 501}}}}),
);
// TODO SERVER-114992: failed to find sampled query.
if (uweEnabled) {
return;
}
// This is a WouldChangeOwningShard update. It causes the document to move from shard1 to
// shard2.
const originalCmdObj1 = {

View File

@ -9,7 +9,6 @@ import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject} from "jstests/libs/uuid_util.js";
import {AnalyzeShardKeyUtil} from "jstests/sharding/analyze_shard_key/libs/analyze_shard_key_util.js";
import {QuerySamplingUtil} from "jstests/sharding/analyze_shard_key/libs/query_sampling_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
// This command involves running commands outside a session.
TestData.disableImplicitSessions = true;
@ -40,16 +39,6 @@ const st = new ShardingTest({
mongosOptions: {setParameter: {queryAnalysisSamplerConfigurationRefreshSecs}},
});
// TODO SERVER-114992: failed to find sampled query.
let uweEnabled = false;
st.forEachConnection((conn) => {
uweEnabled = uweEnabled || isUweEnabled(conn);
});
if (uweEnabled) {
st.stop();
quit();
}
const execCtxTypes = {
kNoClientSession: 1,
kClientSessionNotRetryableWrite: 2,

View File

@ -16,7 +16,6 @@ import {
import {assertStagesForExplainOfCommand} from "jstests/libs/query/analyze_plan.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
const st = new ShardingTest({shards: 2});
const kDbName = jsTestName();
@ -29,7 +28,6 @@ const sessionDB = session.getDatabase(kDbName);
const coll = sessionDB["coll"];
const shard0DB = st.shard0.getDB(kDbName);
const shard1DB = st.shard1.getDB(kDbName);
const uweEnabled = isUweEnabled(st.s);
/**
* Enables profiling on both shards so that we can verify the targeting behaviour.
@ -174,11 +172,8 @@ res = assert.commandWorked(coll.update({a: 22, b: {subObj: "str_0"}, c: "update"
assert.eq(res.nModified, 1, res);
// Verify that the 'update' command gets targeted to 'shard1DB'.
// TODO SERVER-114992: couldn't find matching bulkWrite.
if (!uweEnabled) {
profilerHasAtLeastOneMatchingEntryOrThrow({profileDB: shard1DB, filter: {ns: ns, "op": shardCmdName}});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: {ns: ns, "op": shardCmdName}});
}
profilerHasAtLeastOneMatchingEntryOrThrow({profileDB: shard1DB, filter: {ns: ns, "op": shardCmdName}});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: {ns: ns, "op": shardCmdName}});
// Verify that the 'count' command gets targeted to 'shard0DB' after the update.
assert.eq(coll.count(updateObj["$set"]), 1);

View File

@ -38,6 +38,7 @@
#include "mongo/logv2/log.h"
#include "mongo/s/write_ops/bulk_write_exec.h"
#include "mongo/s/write_ops/fle.h"
#include "mongo/s/write_ops/unified_write_executor/stats.h"
#include "mongo/s/write_ops/unified_write_executor/unified_write_executor.h"
#include "mongo/util/decorable.h"
@ -48,6 +49,28 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
namespace {
// Copies the targeting information collected in 'uweStats' over to 'batchStats', so that the
// existing cluster write statistics logic can be re-used. Leaves 'batchStats' untouched when
// 'uweStats' holds no targeting stats.
// TODO SERVER-109104: This can be removed once we delete BatchWriteExec.
void fillOutBatchWriteStats(unified_write_executor::Stats& uweStats,
                            BatchWriteExecStats& batchStats) {
    const auto& statsByNsIdx = uweStats.getTargetingStatsMap();
    if (statsByNsIdx.empty()) {
        return;
    }

    // A batch write is expected to produce exactly one entry, stored under key 0.
    const auto onlyEntry = statsByNsIdx.find(0);
    tassert(11499200,
            "Expected only a single entry in targeting stats map for a batch write",
            statsByNsIdx.size() == 1 && onlyEntry != statsByNsIdx.end());

    const auto& nsTargetingStats = onlyEntry->second;
    batchStats.noteNumShardsOwningChunks(nsTargetingStats.numShardsOwningChunks);
    batchStats.noteTargetedCollectionIsSharded(nsTargetingStats.isSharded);

    // Forward every shard that the unified write executor recorded as targeted.
    for (const auto& shardId : uweStats.getTargetedShards()) {
        batchStats.noteTargetedShard(shardId);
    }
}
} // namespace
namespace cluster {
void write(OperationContext* opCtx,
@ -59,6 +82,7 @@ void write(OperationContext* opCtx,
NotPrimaryErrorTracker::Disabled scopeDisabledTracker(
&NotPrimaryErrorTracker::get(opCtx->getClient()));
tassert(11499201, "Should have initialized 'stats' object", stats);
CollectionRoutingInfoTargeter targeter(opCtx, request.getNS(), targetEpoch);
if (nss) {
*nss = targeter.getNS();
@ -68,9 +92,10 @@ void write(OperationContext* opCtx,
4817400, 2, {logv2::LogComponent::kShardMigrationPerf}, "Starting batch write");
if (unified_write_executor::isEnabled(opCtx)) {
*response = unified_write_executor::write(opCtx, request, targetEpoch);
unified_write_executor::Stats uweStats;
*response = unified_write_executor::write(opCtx, request, uweStats, targetEpoch);
// SERVER-109104 This can be removed once we delete BatchWriteExec.
stats->markIgnore();
fillOutBatchWriteStats(uweStats, *stats);
} else {
// Create an RAII object that prints the collection's shard key in the case of a tassert
// or crash.
@ -102,8 +127,8 @@ bulk_write_exec::BulkWriteReplyInfo bulkWrite(
const std::vector<std::unique_ptr<NSTargeter>>& targeters,
bulk_write_exec::BulkWriteExecStats& execStats) {
if (unified_write_executor::isEnabled(opCtx)) {
execStats.markIgnore();
return unified_write_executor::bulkWrite(opCtx, request);
unified_write_executor::Stats uweStats;
return unified_write_executor::bulkWrite(opCtx, request, uweStats);
} else {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replies] = attemptExecuteFLE(opCtx, request);

View File

@ -74,6 +74,7 @@
#include "mongo/s/query/exec/router_stage_queued_data.h"
#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/unified_write_executor/stats.h"
#include "mongo/s/write_ops/unified_write_executor/unified_write_executor.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/decorable.h"
@ -254,8 +255,10 @@ public:
bulk_write_exec::BulkWriteExecStats execStats;
bulk_write_exec::BulkWriteReplyInfo replyInfo;
unified_write_executor::Stats uweStats;
if (unified_write_executor::isEnabled(opCtx)) {
replyInfo = unified_write_executor::bulkWrite(opCtx, bulkRequest);
replyInfo = unified_write_executor::bulkWrite(opCtx, bulkRequest, uweStats);
execStats.markIgnore();
} else {
// This is used only for the ScopedDebugInfo construction below.
stdx::unordered_map<NamespaceString, boost::optional<BSONObj>>
@ -306,7 +309,11 @@ public:
handleWouldChangeOwningShardError(opCtx, bulkRequest, replyInfo, targeters);
// TODO SERVER-83869 handle BulkWriteExecStats for batches of size > 1 containing
// updates that modify a documents owning shard.
execStats.updateMetrics(opCtx, targeters, updatedShardKey);
if (!execStats.getIgnore()) {
execStats.updateMetrics(opCtx, targeters, updatedShardKey);
} else {
uweStats.updateMetrics(opCtx, updatedShardKey);
}
response = populateCursorReply(opCtx, bulkRequest, request.body, std::move(replyInfo));

View File

@ -59,21 +59,27 @@ void Stats::recordTargetingStats(const std::vector<ShardEndpoint>& targetedShard
}
}
void Stats::updateMetrics(OperationContext* opCtx) {
void Stats::updateMetrics(OperationContext* opCtx, bool updatedShardKey) {
// Record the number of shards targeted by this write.
// TODO SERVER-114992 increment 'nShards' by 1 if we've targeted shards and updated the shard
// key.
CurOp::get(opCtx)->debug().nShards = _targetedShards.size();
for (const auto& [nsIdx, targetingStats] : _targetingStatsMap) {
const bool isSharded = targetingStats.isSharded;
const int nShardsOwningChunks = targetingStats.numShardsOwningChunks;
for (const auto& [writeType, shards] : targetingStats.targetedShardsByWriteType) {
const int perWriteNShards = shards.size();
if (nShardsOwningChunks == 0) {
continue;
}
for (const auto& [writeType, shards] : targetingStats.targetedShardsByWriteType) {
int perWriteNShards = shards.size();
// If we have no information on the shards targeted, ignore updatedShardKey;
// updateHostsTargetedMetrics will report this as TargetType::kManyShards.
if (perWriteNShards != 0 && updatedShardKey) {
perWriteNShards += 1;
}
// TODO SERVER-114992: add one to 'nShards' if updated shard key. This information is
// returned from WCOS handling, see batch_write_exec.cpp
NumHostsTargetedMetrics::QueryType metricsWriteType;
switch (writeType) {
case WriteType::kInsert:
@ -95,12 +101,16 @@ void Stats::updateMetrics(OperationContext* opCtx) {
}
}
void Stats::incrementOpCounters(OperationContext* opCtx, WriteCommandRef::OpRef op) {
void Stats::incrementOpCounters(OperationContext* opCtx,
WriteCommandRef::OpRef op,
bool statusOkOrNotWCOS) {
if (op.isInsertOp()) {
serviceOpCounters(ClusterRole::RouterServer).gotInsert();
} else if (op.isUpdateOp()) {
serviceOpCounters(ClusterRole::RouterServer).gotUpdate();
if (statusOkOrNotWCOS) {
serviceOpCounters(ClusterRole::RouterServer).gotUpdate();
}
auto updateRef = op.getUpdateOp();
// 'isMulti' is set to false as the metrics for multi updates were registered

View File

@ -67,12 +67,22 @@ public:
/**
* Method to update CurOp and NumHostsTargetedMetrics.
*/
void updateMetrics(OperationContext* opCtx);
void updateMetrics(OperationContext* opCtx, bool updatedShardKey);
/**
* Helper to increment query counters for 'op'.
*/
void incrementOpCounters(OperationContext* opCtx, WriteCommandRef::OpRef op);
void incrementOpCounters(OperationContext* opCtx,
WriteCommandRef::OpRef op,
bool statusOkOrNotWCOS);
/**
 * Returns a read-only reference to the targeting stats map held by this object. The reference
 * is to a member, so it must not be used after this 'Stats' object is destroyed.
 */
const stdx::unordered_map<size_t, TargetingStats>& getTargetingStatsMap() const {
    return _targetingStatsMap;
}
/**
 * Returns a read-only reference to the set of shards recorded as targeted. The reference is to
 * a member, so it must not be used after this 'Stats' object is destroyed.
 */
const stdx::unordered_set<ShardId>& getTargetedShards() const {
    return _targetedShards;
}
private:
stdx::unordered_map<size_t, TargetingStats> _targetingStatsMap;

View File

@ -69,6 +69,7 @@ bool isNonVerboseWriteCommand(OperationContext* opCtx, WriteCommandRef cmdRef) {
WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
WriteCommandRef cmdRef,
unified_write_executor::Stats& stats,
BSONObj originalCommand,
boost::optional<OID> targetEpoch) {
@ -76,7 +77,6 @@ WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
const bool isNonVerbose = isNonVerboseWriteCommand(opCtx, cmdRef);
Stats stats;
WriteOpProducer producer(cmdRef);
WriteOpAnalyzerImpl analyzer = WriteOpAnalyzerImpl(stats);
@ -94,12 +94,12 @@ WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
WriteBatchScheduler scheduler(cmdRef, *batcher, executor, processor, targetEpoch);
scheduler.run(opCtx);
stats.updateMetrics(opCtx);
return processor.generateClientResponse(opCtx);
}
BatchedCommandResponse write(OperationContext* opCtx,
const BatchedCommandRequest& request,
unified_write_executor::Stats& stats,
boost::optional<OID> targetEpoch) {
if (request.hasEncryptionInformation()) {
BatchedCommandResponse response;
@ -112,11 +112,12 @@ BatchedCommandResponse write(OperationContext* opCtx,
}
return std::get<BatchedCommandResponse>(
executeWriteCommand(opCtx, WriteCommandRef{request}, BSONObj(), targetEpoch));
executeWriteCommand(opCtx, WriteCommandRef{request}, stats, BSONObj(), targetEpoch));
}
bulk_write_exec::BulkWriteReplyInfo bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
unified_write_executor::Stats& stats,
BSONObj originalCommand) {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replyInfo] = attemptExecuteFLE(opCtx, request);
@ -128,7 +129,7 @@ bulk_write_exec::BulkWriteReplyInfo bulkWrite(OperationContext* opCtx,
}
return std::get<bulk_write_exec::BulkWriteReplyInfo>(
executeWriteCommand(opCtx, WriteCommandRef{request}, originalCommand));
executeWriteCommand(opCtx, WriteCommandRef{request}, stats, originalCommand));
}
// TODO(SERVER-115515) Clean up FAM code in UWE (here and across files in
@ -157,8 +158,10 @@ FindAndModifyCommandResponse findAndModify(
"Cannot specify runtime constants option to a mongos",
request.getLegacyRuntimeConstants() == boost::none);
request.setLegacyRuntimeConstants(Variables::generateRuntimeConstants(opCtx));
unified_write_executor::Stats stats;
return std::get<FindAndModifyCommandResponse>(
executeWriteCommand(opCtx, WriteCommandRef{request}, originalCommand));
executeWriteCommand(opCtx, WriteCommandRef{request}, stats, originalCommand));
}
bool isEnabled(OperationContext* opCtx) {

View File

@ -34,6 +34,7 @@
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/bulk_write_reply_info.h"
#include "mongo/s/write_ops/unified_write_executor/stats.h"
#include "mongo/s/write_ops/write_command_ref.h"
#include "mongo/util/modules.h"
@ -53,6 +54,7 @@ using WriteCommandResponse = std::variant<BatchedCommandResponse,
*/
WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
WriteCommandRef cmdRef,
unified_write_executor::Stats& stats,
BSONObj originalCommand = BSONObj(),
boost::optional<OID> targetEpoch = boost::none);
@ -61,6 +63,7 @@ WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
*/
BatchedCommandResponse write(OperationContext* opCtx,
const BatchedCommandRequest& request,
unified_write_executor::Stats& stats,
boost::optional<OID> targetEpoch = boost::none);
/**
@ -68,6 +71,7 @@ BatchedCommandResponse write(OperationContext* opCtx,
*/
bulk_write_exec::BulkWriteReplyInfo bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
unified_write_executor::Stats& stats,
BSONObj originalCommand = BSONObj());
/**

View File

@ -206,7 +206,8 @@ TEST_F(UnifiedWriteExecutorTest, BulkWriteBasic) {
{NamespaceInfoEntry(nss1), NamespaceInfoEntry(nss2)});
auto future = launchAsync([&]() {
auto replyInfo = bulkWrite(operationContext(), request);
Stats uweStats;
auto replyInfo = bulkWrite(operationContext(), request, uweStats);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 2);
@ -272,7 +273,8 @@ TEST_F(UnifiedWriteExecutorTest, BatchWriteBasic) {
auto future = launchAsync([&]() {
auto resp = write(operationContext(), insertRequest);
Stats uweStats;
auto resp = write(operationContext(), insertRequest, uweStats);
ASSERT(resp.getOk());
ASSERT_FALSE(resp.isErrDetailsSet());
ASSERT_EQ(resp.getN(), 2);
@ -308,7 +310,8 @@ TEST_F(UnifiedWriteExecutorTest, BatchWriteBasic) {
}());
auto updateFuture = launchAsync([&]() {
auto resp = write(operationContext(), updateRequest);
Stats uweStats;
auto resp = write(operationContext(), updateRequest, uweStats);
ASSERT(resp.getOk());
ASSERT_FALSE(resp.isErrDetailsSet());
ASSERT_EQ(resp.getN(), 1);
@ -339,7 +342,8 @@ TEST_F(UnifiedWriteExecutorTest, BatchWriteBasic) {
}());
auto deleteFuture = launchAsync([&]() {
auto resp = write(operationContext(), deleteRequest);
Stats uweStats;
auto resp = write(operationContext(), deleteRequest, uweStats);
ASSERT(resp.getOk());
ASSERT_FALSE(resp.isErrDetailsSet());
ASSERT_EQ(resp.getN(), 1);
@ -361,7 +365,8 @@ TEST_F(UnifiedWriteExecutorTest, BulkWriteImplicitCollectionCreation) {
{NamespaceInfoEntry(nss1)});
auto future = launchAsync([&]() {
auto replyInfo = bulkWrite(operationContext(), request);
Stats uweStats;
auto replyInfo = bulkWrite(operationContext(), request, uweStats);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 1);
@ -432,7 +437,8 @@ TEST_F(UnifiedWriteExecutorTest, OrderedBulkWriteErrorsAndStops) {
request.setOrdered(true);
auto future = launchAsync([&]() {
auto replyInfo = bulkWrite(operationContext(), request);
Stats uweStats;
auto replyInfo = bulkWrite(operationContext(), request, uweStats);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 1);
@ -482,7 +488,8 @@ TEST_F(UnifiedWriteExecutorTest, UnorderedBulkWriteErrorsAndStops) {
auto future = launchAsync([&]() {
auto replyInfo = bulkWrite(operationContext(), request);
Stats uweStats;
auto replyInfo = bulkWrite(operationContext(), request, uweStats);
auto reply = populateCursorReply(operationContext(), request, request.toBSON(), replyInfo);
auto replyItems = reply.getCursor().getFirstBatch();
ASSERT_EQ(replyItems.size(), 2);

View File

@ -1228,6 +1228,11 @@ WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(Operation
auto finalResults = finalizeRepliesForOps(opCtx);
for (auto& [id, item] : finalResults) {
// Check to see if we've received a WCOS error. If we have, then we should skip incrementing
// the relevant op counters as this will double count our update. Instead, we will increment
// this as part of the WCOS handling in the command layer.
const bool isNotAWCOSError = !item || item->getStatus().isOK() ||
item->getStatus() != ErrorCodes::WouldChangeOwningShard || _cmdRef.getNumOps() > 1;
if (!_isNonVerbose && (!errorsOnly || (item && !item->getStatus().isOK()))) {
tassert(11182221, "Expected a BulkWriteReplyItem", item.has_value());
@ -1238,10 +1243,9 @@ WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(Operation
item->getIdx(),
id),
static_cast<WriteOpId>(item->getIdx()) == id);
results.emplace_back(std::move(*item));
}
_stats.incrementOpCounters(opCtx, _cmdRef.getOp(id));
_stats.incrementOpCounters(opCtx, _cmdRef.getOp(id), isNotAWCOSError);
}
bulk_write_exec::SummaryFields fields(

View File

@ -139,6 +139,8 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
!(tr.useTwoPhaseWriteProtocol || tr.isNonTargetedRetryableWriteWithId));
// Note we do not translate viewful timeseries collection namespace here, it will be
// translated within the transaction when we analyze the request again.
// Note also that we do not record any targeting stats here as we will do so when we analyze
// the request a second time.
return Analysis{AnalysisType::kInternalTransaction,
std::move(tr.endpoints),
false /* isTimeseries */,