SERVER-122063 Attach metrics to insert response on shards for query stats (#53174)

GitOrigin-RevId: fbc375c7864326b5e4ae8ec5ef0045569d49c238
This commit is contained in:
Jinfeng Ni 2026-05-22 14:07:13 -07:00 committed by MongoDB Bot
parent 53c3f3c5fb
commit a509e05dc1
14 changed files with 734 additions and 49 deletions

View File

@ -20,8 +20,8 @@ const kQueryStatsServerParams = {
internalQueryStatsWriteCmdSampleRate: 1,
};
function testSingleDocInsert(testDB, coll, collName) {
coll.drop();
function testSingleDocInsert(testDB, coll, collName, shardConn = null) {
assert.commandWorked(coll.deleteMany({}));
const cmd = {
insert: collName,
@ -51,10 +51,25 @@ function testSingleDocInsert(testDB, coll, collName) {
expectedDocsReturnedMin: 0,
expectedDocsReturnedSumOfSq: 0,
});
if (shardConn) {
// Verify the shard also recorded the insert in its own query stats store.
const shardEntries = getQueryStatsInsertCmd(shardConn, {collName: collName});
assert.eq(shardEntries.length, 1, "Expected shard to have a query stats entry", {shardEntries});
assertAggregatedMetricsSingleExec(shardEntries[0], {
keysExamined: 0,
docsExamined: 0,
hasSortStage: false,
usedDisk: false,
fromMultiPlanner: false,
fromPlanCache: false,
writes: {nMatched: 0, nUpserted: 0, nModified: 0, nDeleted: 0, nInserted: 1, nUpdateOps: 0},
});
}
}
function testMultiDocInsert(testDB, coll, collName) {
coll.drop();
function testMultiDocInsert(testDB, coll, collName, shardConn = null) {
assert.commandWorked(coll.deleteMany({}));
const cmd = {
insert: collName,
@ -84,6 +99,21 @@ function testMultiDocInsert(testDB, coll, collName) {
expectedDocsReturnedMin: 0,
expectedDocsReturnedSumOfSq: 0,
});
if (shardConn) {
// Verify the shard also recorded the insert in its own query stats store.
const shardEntries = getQueryStatsInsertCmd(shardConn, {collName: collName});
assert.eq(shardEntries.length, 1, "Expected shard to have a query stats entry", {shardEntries});
assertAggregatedMetricsSingleExec(shardEntries[0], {
keysExamined: 0,
docsExamined: 0,
hasSortStage: false,
usedDisk: false,
fromMultiPlanner: false,
fromPlanCache: false,
writes: {nMatched: 0, nUpserted: 0, nModified: 0, nDeleted: 0, nInserted: 3, nUpdateOps: 0},
});
}
}
describe("query stats insert command metrics (replica set)", function () {
@ -289,9 +319,7 @@ describe("query stats insert command metrics (replica set)", function () {
});
});
// TODO SERVER-122076: Enable once query stats collection for inserts is wired through the
// sharded write path. Currently this branch only implements collection for standalone/replica set.
describe.skip("query stats insert command metrics (sharded)", function () {
describe("query stats insert command metrics (sharded)", function () {
const collName = jsTestName() + "_sharded";
let st;
let testDB;
@ -301,10 +329,20 @@ describe.skip("query stats insert command metrics (sharded)", function () {
st = new ShardingTest({
shards: 2,
mongosOptions: {setParameter: kQueryStatsServerParams},
rsOptions: {setParameter: kQueryStatsServerParams},
});
testDB = st.s.getDB("test");
coll = testDB[collName];
st.shardColl(coll, {_id: 1}, {_id: 0});
// Pin shard1 as primary so routing is deterministic: negative _ids stay on shard1
// (primary), non-negative _ids move to shard0 (non-primary).
assert.commandWorked(
testDB.adminCommand({enableSharding: testDB.getName(), primaryShard: st.shard1.shardName}),
);
assert.commandWorked(st.s.adminCommand({shardcollection: coll.getFullName(), key: {_id: 1}}));
assert.commandWorked(st.s.adminCommand({split: coll.getFullName(), middle: {_id: 0}}));
assert.commandWorked(
st.s.adminCommand({movechunk: coll.getFullName(), find: {_id: 0}, to: st.shard0.shardName}),
);
});
after(function () {
@ -313,13 +351,15 @@ describe.skip("query stats insert command metrics (sharded)", function () {
beforeEach(function () {
resetQueryStatsStore(st.s, "1MB");
resetQueryStatsStore(st.shard0, "1MB");
resetQueryStatsStore(st.shard1, "1MB");
});
it("should record single-doc insert metrics", function () {
testSingleDocInsert(testDB, coll, collName);
it("should record single-doc insert metrics for a sharded collection", function () {
testSingleDocInsert(testDB, coll, collName, st.shard0);
});
it("should record multi-doc insert metrics", function () {
testMultiDocInsert(testDB, coll, collName);
it("should record multi-doc insert metrics for a sharded collection", function () {
testMultiDocInsert(testDB, coll, collName, st.shard0);
});
});

View File

@ -0,0 +1,396 @@
/**
* Tests that query stats are collected for insert commands routed through mongos and that execution
* metrics from shards are aggregated into the router-side query stats entry.
*
* @tags: [featureFlagQueryStatsInsert]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {after, before, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {
assertAggregatedMetricsSingleExec,
assertExpectedResults,
getLatestQueryStatsEntry,
getQueryStatsInsertCmd,
getQueryStatsUpdateCmd,
resetQueryStatsStore,
} from "jstests/libs/query/query_stats_utils.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
describe("query stats insert command metrics (mongos)", function () {
// Cluster handles shared by every nested suite below; they are assigned once in before().
let st;
let mongos;
let testDB;

before(function () {
    // The same server parameter object is passed as both mongosOptions and rsOptions.
    const setParameter = {internalQueryStatsWriteCmdSampleRate: 1};
    st = new ShardingTest({
        shards: 2,
        mongosOptions: {setParameter},
        rsOptions: {setParameter},
    });
    mongos = st.s;
    testDB = mongos.getDB("test");

    // Make shard1 the primary shard of the test database.
    const enableShardingCmd = {
        enableSharding: testDB.getName(),
        primaryShard: st.shard1.shardName,
    };
    assert.commandWorked(testDB.adminCommand(enableShardingCmd));
});

after(function () {
    // 'st' is still undefined if ShardingTest construction threw, hence the optional call.
    st?.stop();
});

beforeEach(function () {
    // Reset the router-side query stats store before each test.
    resetQueryStatsStore(mongos, "1MB");
});
// Checks that the router-side entry reflects the documents written on every shard an insert
// fans out to. With the chunk layout built in before(), negative _id values live on shard1
// (primary) and non-negative _id values live on shard0 (non-primary).
describe("multi-shard aggregation", function () {
    const collName = jsTestName() + "_multi_shard";
    let coll;

    // Aggregated metrics expected for a single insert command that wrote 'nInserted' documents.
    const expectedInsertMetrics = (nInserted) => ({
        keysExamined: 0,
        docsExamined: 0,
        hasSortStage: false,
        usedDisk: false,
        fromMultiPlanner: false,
        fromPlanCache: false,
        writes: {
            nMatched: 0,
            nUpserted: 0,
            nModified: 0,
            nDeleted: 0,
            nInserted,
            nUpdateOps: 0,
        },
    });

    // Runs 'cmd' through mongos, then asserts that the latest mongos query stats entry for the
    // collection is an insert executed once that wrote 'nDocs' documents.
    function runInsertAndCheckEntry(cmd, nDocs) {
        const result = assert.commandWorked(testDB.runCommand(cmd));
        assert.eq(result.n, nDocs);

        const entry = getLatestQueryStatsEntry(mongos, {collName: coll.getName()});
        assert.eq(entry.key.queryShape.command, "insert");
        assertAggregatedMetricsSingleExec(entry, expectedInsertMetrics(nDocs));
        assertExpectedResults({
            results: entry,
            expectedQueryStatsKey: entry.key,
            expectedExecCount: 1,
            expectedDocsReturnedSum: 0,
            expectedDocsReturnedMax: 0,
            expectedDocsReturnedMin: 0,
            expectedDocsReturnedSumOfSq: 0,
        });
    }

    before(function () {
        coll = testDB[collName];
        const ns = coll.getFullName();
        // shard1 is the primary (set in the outer before). Shard on {_id: 1}, split at {_id: 0},
        // then move the chunk containing {_id: 0} to shard0.
        assert.commandWorked(st.s.adminCommand({shardcollection: ns, key: {_id: 1}}));
        assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
        assert.commandWorked(st.s.adminCommand({movechunk: ns, find: {_id: 0}, to: st.shard0.shardName}));
    });

    beforeEach(function () {
        assert.commandWorked(coll.deleteMany({}));
    });

    it("should aggregate nInserted from both shards when docs span multiple shards", function () {
        // Three documents target shard0 (non-negative _ids), two target shard1 (negative _ids).
        const documents = [
            {_id: 1, v: "a"},
            {_id: 2, v: "b"},
            {_id: 3, v: "c"},
            {_id: -1, v: "d"},
            {_id: -2, v: "e"},
        ];
        runInsertAndCheckEntry({insert: collName, documents}, 5);
    });

    it("should record nInserted correctly for a single-shard insert", function () {
        // Every _id is non-negative, so the whole batch targets shard0 (non-primary).
        const documents = [
            {_id: 10, v: "x"},
            {_id: 11, v: "y"},
        ];
        runInsertAndCheckEntry({insert: collName, documents}, 2);
    });

    it("should accumulate execCount and nInserted across multiple insert commands", function () {
        // Two commands with the same shape are expected to share one query stats entry.
        const firstCmd = {insert: collName, documents: [{_id: 20, v: 1}]};
        const secondCmd = {insert: collName, documents: [{_id: 21, v: 2}]};
        assert.commandWorked(testDB.runCommand(firstCmd));
        assert.commandWorked(testDB.runCommand(secondCmd));

        const entries = getQueryStatsInsertCmd(mongos, {collName: collName});
        assert.eq(entries.length, 1, `Expected 1 query stats entry: ${tojson(entries)}`);

        const {metrics} = entries[0];
        assert.eq(metrics.execCount, 2);
        assert.eq(metrics.writes.nInserted.sum, NumberLong(2), "nInserted.sum should be 2 (one doc per command)");
    });
});
// StaleConfig retry: when a shard returns StaleConfig, mongos retries the write internally.
// Query stats should record the insert only once (for the successful execution).
describe("StaleConfig retried insert", function () {
    const collName = jsTestName() + "_stale_config";
    let coll;
    let shard1Primary;

    before(function () {
        coll = testDB[collName];
        // Sharded on {_id: 1}; no split or moveChunk is issued for this collection.
        assert.commandWorked(testDB.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
        shard1Primary = st.rs1.getPrimary();
    });

    it("should record query stats once despite StaleConfig retry", function () {
        resetQueryStatsStore(st.shard1, "1MB");

        // Wait for any pending range deletions on shard1 before activating the failpoint,
        // since alwaysThrowStaleConfigInfo fires for all namespaces.
        assert.soon(
            () => shard1Primary.getDB("config").rangeDeletions.find().itcount() === 0,
            "Timed out waiting for range deletions on shard1 to complete",
        );

        const fp = configureFailPoint(shard1Primary, "alwaysThrowStaleConfigInfo", {}, {times: 1});
        try {
            const result = assert.commandWorked(
                testDB.runCommand({
                    insert: collName,
                    documents: [{_id: 100, v: "stale_retry"}],
                }),
            );
            assert.eq(result.n, 1);
            assert(fp.waitWithTimeout(1000), "alwaysThrowStaleConfigInfo failpoint was never triggered");

            // Exactly one router-side entry must exist even though the shard was contacted twice.
            const mongosEntries = getQueryStatsInsertCmd(mongos, {collName: collName});
            assert.eq(mongosEntries.length, 1, "Expected 1 mongos query stats entry: " + tojson(mongosEntries));

            const entry = getLatestQueryStatsEntry(mongos, {collName: coll.getName()});
            assert.eq(entry.key.queryShape.command, "insert");
            assertAggregatedMetricsSingleExec(entry, {
                keysExamined: 0,
                docsExamined: 0,
                hasSortStage: false,
                usedDisk: false,
                fromMultiPlanner: false,
                fromPlanCache: false,
                writes: {
                    nMatched: 0,
                    nUpserted: 0,
                    nModified: 0,
                    nDeleted: 0,
                    nInserted: 1,
                    nUpdateOps: 0,
                },
            });
            assertExpectedResults({
                results: entry,
                expectedQueryStatsKey: entry.key,
                expectedExecCount: 1,
                expectedDocsReturnedSum: 0,
                expectedDocsReturnedMax: 0,
                expectedDocsReturnedMin: 0,
                expectedDocsReturnedSumOfSq: 0,
            });
        } finally {
            // Always disarm the failpoint: if an assertion above throws before the failpoint has
            // fired, it would otherwise stay armed on shard1 and disturb the suites that follow.
            fp.off();
        }
    });
});
// parseAndRegisterRequest is expected to ignore command shapes it does not handle:
// - bulkWrite / findAndModify: not a BatchedCommandRequest, so isBatchWriteCommand() is false.
// - delete: a BatchedCommandRequest, but BatchType_Delete is filtered out.
// None of these should leave insert or update entries in the mongos query stats store.
describe("skip unsupported command types", function () {
    const collName = jsTestName() + "_skip";
    let coll;

    // Asserts that mongos holds neither an insert nor an update query stats entry for the
    // collection. 'cmdName' only appears in the failure messages.
    function assertNoInsertOrUpdateEntries(cmdName) {
        const insertEntries = getQueryStatsInsertCmd(mongos, {collName: collName});
        assert.eq(insertEntries.length, 0, `Expected no insert query stats entry for ${cmdName} command`);

        const updateEntries = getQueryStatsUpdateCmd(mongos, {collName: collName});
        assert.eq(updateEntries.length, 0, `Expected no update query stats entry for ${cmdName} command`);
    }

    before(function () {
        coll = testDB[collName];
        const ns = coll.getFullName();
        // shard1 is the primary (set in the outer before). After the split and move,
        // non-negative _ids go to shard0 (non-primary).
        assert.commandWorked(st.s.adminCommand({shardcollection: ns, key: {_id: 1}}));
        assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
        assert.commandWorked(st.s.adminCommand({movechunk: ns, find: {_id: 0}, to: st.shard0.shardName}));
    });

    it("should not record query stats for delete command", function () {
        // Seed a document first, then reset the store so the seeding insert is not counted.
        assert.commandWorked(coll.insert({_id: 1, v: 1}));
        resetQueryStatsStore(mongos, "1MB");

        const deleteCmd = {
            delete: collName,
            deletes: [{q: {_id: 1}, limit: 1}],
        };
        assert.commandWorked(testDB.runCommand(deleteCmd));

        assertNoInsertOrUpdateEntries("delete");
    });

    it("should not record query stats for bulkWrite command", function () {
        resetQueryStatsStore(mongos, "1MB");

        const bulkWriteCmd = {
            bulkWrite: 1,
            ops: [{insert: 0, document: {_id: 50, v: "bulk"}}],
            nsInfo: [{ns: coll.getFullName()}],
        };
        assert.commandWorked(testDB.adminCommand(bulkWriteCmd));

        assertNoInsertOrUpdateEntries("bulkWrite");
    });

    it("should not record query stats for findAndModify command", function () {
        // Seed a document first, then reset the store so the seeding insert is not counted.
        assert.commandWorked(coll.insert({_id: 60, v: 1}));
        resetQueryStatsStore(mongos, "1MB");

        const findAndModifyCmd = {
            findAndModify: collName,
            query: {_id: 60},
            update: {$set: {v: 999}},
        };
        assert.commandWorked(testDB.runCommand(findAndModifyCmd));

        assertNoInsertOrUpdateEntries("findAndModify");
    });
});
// Timeseries inserts are handled by performTimeseriesWrites on the shard. This suite checks that
// the shard-side execution metrics (nInserted) still reach the mongos query stats entry.
describe("timeseries insert metrics", function () {
    const timeField = "time";
    const metaField = "meta";
    const collName = jsTestName() + "_ts";
    let coll;

    before(function () {
        coll = testDB[collName];
        // Collection is intentionally left unsharded so it lives entirely on shard1 (primary).
        const timeseries = {timeField: timeField, metaField: metaField};
        assert.commandWorked(testDB.createCollection(collName, {timeseries}));
    });

    beforeEach(function () {
        resetQueryStatsStore(mongos, "1MB");
    });

    it("should record nInserted correctly for a timeseries insert routed through mongos", function () {
        // Three measurements, one hour apart, spread over two meta values.
        const documents = [
            ["2021-05-18T00:00:00.000Z", 1, "a"],
            ["2021-05-18T01:00:00.000Z", 2, "a"],
            ["2021-05-18T02:00:00.000Z", 3, "b"],
        ].map(([timestamp, v, meta]) => ({[timeField]: ISODate(timestamp), v, [metaField]: meta}));

        const result = assert.commandWorked(testDB.runCommand({insert: collName, documents}));
        assert.eq(result.n, 3);

        const entry = getLatestQueryStatsEntry(mongos, {collName: collName});
        assert.eq(entry.key.queryShape.command, "insert");

        const expectedWrites = {
            nMatched: 0,
            nUpserted: 0,
            nModified: 0,
            nDeleted: 0,
            nInserted: 3,
            nUpdateOps: 0,
        };
        assertAggregatedMetricsSingleExec(entry, {
            keysExamined: 0,
            docsExamined: 0,
            hasSortStage: false,
            usedDisk: false,
            fromMultiPlanner: false,
            fromPlanCache: false,
            writes: expectedWrites,
        });
    });
});
});

View File

@ -307,35 +307,45 @@ public:
auto [preConditions, isTimeseriesLogicalRequest] =
timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest(
opCtx, ns(), request(), request().getCollectionUUID());
// An insert command always has exactly one operation; its QueryStatsMetrics index is 0.
auto maybeAttachQueryStatsMetrics = [&](write_ops::InsertCommandReply& reply) {
if (request().getIncludeQueryStatsMetrics()) {
std::vector<write_ops::QueryStatsMetrics> metrics;
metrics.emplace_back(0, CurOp::get(opCtx)->debug().getCursorMetrics());
reply.setQueryStatsMetrics(std::move(metrics));
}
};
write_ops::InsertCommandReply insertReply;
if (isTimeseriesLogicalRequest) {
// Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's
// constructor.
try {
return timeseries::write_ops::performTimeseriesWrites(
insertReply = timeseries::write_ops::performTimeseriesWrites(
opCtx, request(), preConditions);
} catch (DBException& ex) {
ex.addContext(str::stream()
<< "time-series insert failed: " << ns().toStringForErrorMsg());
throw;
}
} else {
if (hangInsertBeforeWrite.shouldFail([&](const BSONObj& data) {
const auto fpNss = NamespaceStringUtil::parseFailPointData(data, "ns"_sd);
return fpNss == request().getNamespace();
})) {
hangInsertBeforeWrite.pauseWhileSet();
}
auto reply = write_ops_exec::performInserts(opCtx, request(), preConditions);
populateReply(opCtx,
!request().getWriteCommandRequestBase().getOrdered(),
request().getDocuments().size(),
std::move(reply),
&insertReply);
}
if (hangInsertBeforeWrite.shouldFail([&](const BSONObj& data) {
const auto fpNss = NamespaceStringUtil::parseFailPointData(data, "ns"_sd);
return fpNss == request().getNamespace();
})) {
hangInsertBeforeWrite.pauseWhileSet();
}
auto reply = write_ops_exec::performInserts(opCtx, request(), preConditions);
write_ops::InsertCommandReply insertReply;
populateReply(opCtx,
!request().getWriteCommandRequestBase().getOrdered(),
request().getDocuments().size(),
std::move(reply),
&insertReply);
maybeAttachQueryStatsMetrics(insertReply);
return insertReply;
} catch (const DBException& ex) {
NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code());

View File

@ -540,6 +540,10 @@ bool verifySizeEstimate(const write_ops::UpdateOpEntry& update) {
bool verifySizeEstimate(const InsertCommandRequest& insertReq,
const OpMsgRequest* unparsedRequest) {
int size = getInsertHeaderSizeEstimate(insertReq);
if (insertReq.getIncludeQueryStatsMetrics()) {
size += InsertCommandRequest::kIncludeQueryStatsMetricsFieldName.size() + kBoolSize +
kPerElementOverhead;
}
for (auto&& docToInsert : insertReq.getDocuments()) {
size += docToInsert.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
}
@ -610,6 +614,7 @@ int getInsertHeaderSizeEstimate(const InsertCommandRequest& insertReq) {
size += InsertCommandRequest::kCommandName.size() + kPerElementOverhead +
insertReq.getNamespace().size() + 1 /* ns string null terminator */;
return size;
}

View File

@ -133,6 +133,18 @@ structs:
is_command_reply: true
chained_structs:
WriteCommandReplyBase: writeCommandReplyBase
fields:
queryStatsMetrics:
description: >-
Metrics for the insert command if requested by the router. Uses
array<QueryStatsMetrics> (the same type as UpdateCommandReply) rather than
optional<CursorMetrics> so that batch_write_exec.cpp can aggregate shard metrics
for both insert and update via the same uniform loop that dispatches on
QueryStatsMetrics::originalOpIndex. An insert command always produces at most one
element (originalOpIndex == 0).
type: array<QueryStatsMetrics>
optional: true
stability: internal
Upserted:
description: "Contains documents that have been upserted."
@ -462,6 +474,10 @@ commands:
type: array<object>
supports_doc_sequence: true
stability: stable
includeQueryStatsMetrics:
description: "Whether to include query stats metrics in the response."
type: optionalBool
stability: internal
# IMPORTANT: If any changes are made to the fields here, please update the corresponding update
# size estimation functions in 'write_ops.cpp'.

View File

@ -1431,6 +1431,12 @@ WriteResult performInserts(
const bool bypassEmptyTsReplacement = (source == OperationSource::kFromMigrate) ||
static_cast<bool>(wholeOp.getBypassEmptyTsReplacement());
// If the router requested insert metrics, mark the CurOp so that end-of-op metrics are
// captured and returned in the response.
if (wholeOp.getIncludeQueryStatsMetrics()) {
curOp.debug().getQueryStatsInfo().metricsRequested = true;
}
// Register query stats once before the batch loop.
// We read from 'preConditions' rather than calling acquireCollection(MODE_IS) to avoid
// lock acquisition on the hot path.

View File

@ -194,6 +194,10 @@ TEST_F(WriteOpsExecTest, TestInsertRequestSizeEstimationLogic) {
wcb.setOriginalCollation(fromjson("{locale: 'fr'}"));
insert.setWriteCommandRequestBase(wcb);
ASSERT(write_ops::verifySizeEstimate(insert));
// includeQueryStatsMetrics
insert.setIncludeQueryStatsMetrics(true);
ASSERT(write_ops::verifySizeEstimate(insert));
}
TEST_F(WriteOpsExecTest, TestUpdateRequestSizeEstimationLogic) {
@ -485,7 +489,6 @@ protected:
OpObserverMock* _opObserverMock;
};
TEST_F(WriteOpsExecOplogTest, VerifySingleInsertOplogDoesntBatch) {
NamespaceString ns =
NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "insertColl");

View File

@ -78,6 +78,7 @@ mongo_cc_library(
"write_cmd_query_stats_registrar.cpp",
],
deps = [
"//src/mongo/db/query/query_stats:insert_key",
"//src/mongo/db/query/query_stats:update_key",
],
)

View File

@ -608,6 +608,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
return BatchedCommandRequest([&] {
write_ops::InsertCommandRequest insertOp(targeter.getNS());
insertOp.setDocuments(std::move(*insertDocs));
registrar.setIncludeQueryStatsMetricsIfRequestedForInsert(_opCtx, insertOp);
return insertOp;
}());
case BatchedCommandRequest::BatchType_Update: {

View File

@ -3227,5 +3227,42 @@ TEST_F(BatchWriteOpTest,
ASSERT_FALSE(clientResponse.areQueryStatsMetricsSet());
}
// Verifies that BatchWriteOp::buildBatchRequest sets 'includeQueryStatsMetrics' on the outgoing
// insert request only when a query stats entry exists at op index 0 on the CurOp.
TEST_F(BatchWriteOpTest, BuildBatchRequestSetsIncludeQueryStatsMetricsForInsert) {
    const NamespaceString nss = NamespaceString::createNamespaceString_forTest("foo.bar");
    ShardEndpoint endpoint(
        ShardId("shard"), ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none);
    auto targeter = initTargeterFullRange(nss, endpoint);

    // Client request: a single-document insert.
    write_ops::InsertCommandRequest clientInsert(nss);
    clientInsert.setDocuments({BSON("x" << 1)});
    BatchedCommandRequest request(std::move(clientInsert));

    // Targets the batch (expecting exactly one targeted shard batch) and returns the request that
    // would be built for that shard.
    auto buildShardRequest = [&] {
        BatchWriteOp batchOp(_opCtx, request);
        std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targetedBatches;
        ASSERT_OK(batchOp.targetBatch(targeter, false, &targetedBatches));
        ASSERT_EQUALS(targetedBatches.size(), 1u);
        const auto& onlyBatch = *targetedBatches.begin()->second;
        return batchOp.buildBatchRequest(onlyBatch, targeter, boost::none);
    };

    // No query stats entry registered: includeQueryStatsMetrics is not set.
    {
        auto shardRequest = buildShardRequest();
        ASSERT_FALSE(shardRequest.getInsertRequest().getIncludeQueryStatsMetrics());
    }

    // Query stats entry registered at opIndex 0: includeQueryStatsMetrics is set to true.
    {
        OpDebug::QueryStatsInfo registeredInfo;
        registeredInfo.keyHash = 42;
        CurOp::get(_opCtx)->debug().setQueryStatsInfoAtOpIndex(0, std::move(registeredInfo));

        auto shardRequest = buildShardRequest();
        ASSERT_TRUE(shardRequest.getInsertRequest().getIncludeQueryStatsMetrics());
    }
}
} // namespace
} // namespace mongo

View File

@ -379,6 +379,12 @@ BatchedCommandRequest WriteBatchExecutor::buildBatchWriteRequest(
write_ops::InsertCommandRequest insertRequest(targetedNss);
insertRequest.setDocuments(std::move(insertDocs));
// Request shard-side metrics if the router registered an insert query stats key.
// All documents in a single insert command share one query stats entry (opIndex 0).
query_stats::WriteCmdQueryStatsRegistrar registrar;
registrar.setIncludeQueryStatsMetricsIfRequestedForInsert(opCtx, insertRequest);
return std::move(insertRequest);
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
// Copy the UpdateOpEntry from the original command, and then update the "sampleId"

View File

@ -31,12 +31,16 @@
#include "mongo/db/curop.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_shape/insert_cmd_shape.h"
#include "mongo/db/query/query_shape/shape_helpers.h"
#include "mongo/db/query/query_shape/update_cmd_shape.h"
#include "mongo/db/query/query_stats/insert_key.h"
#include "mongo/db/query/query_stats/query_stats.h"
#include "mongo/db/query/query_stats/update_key.h"
#include "mongo/db/query/write_ops/parsed_update.h"
#include "mongo/db/query/write_ops/update_request.h"
#include "mongo/db/query/write_ops/write_ops_gen.h"
namespace mongo::query_stats {
@ -161,6 +165,55 @@ void parseAndRegisterUpdateOp(OperationContext* opCtx,
});
}
/**
 * Router-side query stats registration for one insert command.
 *
 * Does nothing when the insert query stats feature flag is disabled, when the command was
 * forwarded from a router, or when the request carries encryption information. Otherwise the
 * query shape hash is recorded on the CurOp and, unless 'skipRegistration' is set, the request is
 * registered under kInsertOpIndex, mirroring the per-batch-op tracking used by the update path.
 */
void parseAndRegisterInsertOp(OperationContext* opCtx,
                              const write_ops::InsertCommandRequest& insertRequest,
                              bool skipRegistration) {
    const bool insertQueryStatsEnabled =
        feature_flags::gFeatureFlagQueryStatsInsert.isEnabledUseLastLTSFCVWhenUninitialized(
            VersionContext::getDecoration(opCtx),
            serverGlobalParams.featureCompatibility.acquireFCVSnapshot());
    if (!insertQueryStatsEnabled || opCtx->isCommandForwardedFromRouter() ||
        insertRequest.getEncryptionInformation()) {
        return;
    }

    const auto& nss = insertRequest.getNamespace();

    // The shape is built lazily, the first time it is needed below.
    query_shape::DeferredQueryShape deferredShape{[&]() {
        return shape_helpers::tryMakeShape<query_shape::InsertCmdShape>(insertRequest);
    }};

    // The QueryShapeHash (QSH) is stored on the CurOp but nothing downstream consumes it until
    // inserts are supported in PQS, so the returned value is deliberately dropped via
    // std::ignore.
    auto computeShapeHash = [&]() -> boost::optional<query_shape::QueryShapeHash> {
        return shape_helpers::computeQueryShapeHash(
            opCtx, deferredShape, nss, true /*skipInternalClientCheck*/);
    };
    std::ignore = CurOp::get(opCtx)->debug().ensureQueryShapeHash(opCtx, computeShapeHash);

    if (!skipRegistration) {
        auto makeInsertKey = [&]() {
            uassertStatusOKWithContext(deferredShape->getStatus(),
                                       "Failed to compute insert query shape");
            return std::make_unique<query_stats::InsertKey>(
                opCtx, insertRequest, std::move(deferredShape->getValue()));
        };
        query_stats::registerWriteRequest(opCtx, nss, kInsertOpIndex, makeInsertKey);
    }
}
/**
* Helper function to check if a command is an aggregation pipeline. This is useful because a
* pipeline having a $merge stage may call cluster::write() to update documents and unexpectedly
@ -191,11 +244,15 @@ void WriteCmdQueryStatsRegistrar::parseAndRegisterRequest(OperationContext* opCt
return;
}
// Skip as we only support batch update commands now.
// TODO: Remove or update this filter after we support other write commands.
if (!cmdRef.isBatchWriteCommand() ||
cmdRef.getBatchedCommandRequest().getBatchType() !=
BatchedCommandRequest::BatchType_Update) {
if (!cmdRef.isBatchWriteCommand()) {
return;
}
const auto batchType = cmdRef.getBatchedCommandRequest().getBatchType();
// Only insert and update commands are currently supported.
if (batchType != BatchedCommandRequest::BatchType_Insert &&
batchType != BatchedCommandRequest::BatchType_Update) {
return;
}
@ -210,25 +267,40 @@ void WriteCmdQueryStatsRegistrar::parseAndRegisterRequest(OperationContext* opCt
// Initializes the map to indicate that we have already processed the command.
opDebug.ensureQueryStatsInfoForBatchWrites();
size_t nOps = cmdRef.getNumOps();
for (size_t opIndex = 0; opIndex < nOps; opIndex++) {
const auto& updateOp = cmdRef.getOp(opIndex).getUpdateOp();
if (batchType == BatchedCommandRequest::BatchType_Insert) {
const auto& insertRequest = cmdRef.getBatchedCommandRequest().getInsertRequest();
if (opCtx->isCommandForwardedFromRouter()) {
// Create QueryStatsInfo if the 'updateOp' is requested for the metrics.
if (auto requestedOpIndex = updateOp.getIncludeQueryStatsMetricsForOpIndex();
requestedOpIndex) {
opDebug.setQueryStatsInfoAtOpIndex(*requestedOpIndex, {});
// For the embedded-router case, create QueryStatsInfo at the requested opIndex so
// that aggregateQueryStatsMetrics can store the shard metrics.
if (insertRequest.getIncludeQueryStatsMetrics()) {
opDebug.setQueryStatsInfoAtOpIndex(kInsertOpIndex, {});
}
} else {
parseAndRegisterUpdateOp(opCtx, updateOp.getNss(), opIndex, updateOp, skipRegistration);
parseAndRegisterInsertOp(opCtx, insertRequest, skipRegistration);
}
}
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
size_t nOps = cmdRef.getNumOps();
// If we are collecting query stats for any of the ops, record the batch size now.
opDebug.forEachQueryStatsInfoForBatchWrites(
[nOps](size_t opIndex, OpDebug::QueryStatsInfo& info) {
info.additiveMetrics.nUpdateOps = nOps;
});
for (size_t opIndex = 0; opIndex < nOps; opIndex++) {
const auto& updateOp = cmdRef.getOp(opIndex).getUpdateOp();
if (opCtx->isCommandForwardedFromRouter()) {
// Create QueryStatsInfo if the 'updateOp' is requested for the metrics.
if (auto requestedOpIndex = updateOp.getIncludeQueryStatsMetricsForOpIndex();
requestedOpIndex) {
opDebug.setQueryStatsInfoAtOpIndex(*requestedOpIndex, {});
}
} else {
parseAndRegisterUpdateOp(
opCtx, updateOp.getNss(), opIndex, updateOp, skipRegistration);
}
}
// If we are collecting query stats for any of the ops, record the batch size now.
opDebug.forEachQueryStatsInfoForBatchWrites(
[nOps](size_t opIndex, OpDebug::QueryStatsInfo& info) {
info.additiveMetrics.nUpdateOps = nOps;
});
}
}
void WriteCmdQueryStatsRegistrar::setIncludeQueryStatsMetricsIfRequested(
@ -256,4 +328,26 @@ void WriteCmdQueryStatsRegistrar::setIncludeQueryStatsMetricsIfRequested(
}
}
// Decides whether the outgoing 'insertRequest' should ask the shard for execution metrics.
// The request is left untouched for aggregation pipelines and for commands forwarded from a
// router. Otherwise the flag is set to true when remote metrics should be requested for
// kInsertOpIndex and the per-registrar request budget is not exhausted; in every other case it
// is cleared.
void WriteCmdQueryStatsRegistrar::setIncludeQueryStatsMetricsIfRequestedForInsert(
    OperationContext* opCtx, write_ops::InsertCommandRequest& insertRequest) {
    CurOp* curOp = CurOp::get(opCtx);
    if (isAggregationPipeline(curOp) || opCtx->isCommandForwardedFromRouter()) {
        return;
    }

    const bool requestShardMetrics =
        query_stats::shouldRequestRemoteMetrics(curOp->debug(), kInsertOpIndex) &&
        _numOpsWithMetricsRequested < kMaxBatchOpsMetricsRequested;

    if (!requestShardMetrics) {
        // Explicitly unset it to stop propagating the field and ignore it in case the field is
        // set from user.
        insertRequest.setIncludeQueryStatsMetrics(OptionalBool{});
        return;
    }

    insertRequest.setIncludeQueryStatsMetrics(true);
    ++_numOpsWithMetricsRequested;
}
} // namespace mongo::query_stats

View File

@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/curop.h"
#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/s/write_ops/write_command_ref.h"
#include "mongo/util/modules.h"
@ -37,6 +38,9 @@
namespace mongo::query_stats {
// An insert command always has exactly one operation; this is its fixed QueryStatsMetrics index.
inline constexpr size_t kInsertOpIndex = 0;
class WriteCmdQueryStatsRegistrar {
public:
/**
@ -67,6 +71,13 @@ public:
int opIndex,
write_ops::UpdateOpEntry& updateOpEntry);
/**
* Sets includeQueryStatsMetrics in 'insertRequest' to request shard-side execution metrics
* when the router has registered the insert for query stats collection.
*/
void setIncludeQueryStatsMetricsIfRequestedForInsert(
OperationContext* opCtx, write_ops::InsertCommandRequest& insertRequest);
private:
size_t _numOpsWithMetricsRequested = 0;
};

View File

@ -31,6 +31,7 @@
#include "mongo/s/write_ops/write_cmd_query_stats_registrar.h"
#include "mongo/bson/json.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/query_stats/mock_key.h"
@ -451,6 +452,64 @@ TEST_F(WriteCmdQueryStatsRegistrarTest, SetIncludeQueryStatsMetricsIfRequestedTe
}
}
// Exercises setIncludeQueryStatsMetricsIfRequestedForInsert in three situations: no query stats
// entry registered, an entry registered at op index 0, and a command forwarded from a router.
TEST_F(WriteCmdQueryStatsRegistrarTest, SetIncludeQueryStatsMetricsIfRequestedForInsertTest) {
    const NamespaceString nss = NamespaceString::createNamespaceString_forTest("test", "coll");

    // Builds a one-document insert request against 'nss'.
    auto buildInsertRequest = [&]() {
        write_ops::InsertCommandRequest req(nss);
        req.setDocuments({BSON("_id" << 1)});
        return req;
    };

    // Installs a QueryStatsInfo backed by a MockKey at op index 0 on the CurOp of 'ctx'.
    auto installMockKey = [&](OperationContext* ctx) {
        OpDebug::QueryStatsInfo info;
        info.key = std::make_unique<query_stats::MockKey>(ctx);
        info.keyHash = 42;
        CurOp::get(ctx)->debug().setQueryStatsInfoAtOpIndex(0 /* kInsertOpIndex */,
                                                            std::move(info));
    };

    // Invokes 'body' with a fresh opCtx created on a dedicated client. The
    // AlternativeClientRegion outlives the call, so the client swap is active while 'body' runs.
    auto runWithFreshOpCtx = [&](std::string clientName, auto body) {
        auto client = getServiceContext()->getService()->makeClient(clientName);
        AlternativeClientRegion acr(client);
        auto opCtxHolder = cc().makeOperationContext();
        body(opCtxHolder.get());
    };

    // No query stats entry registered: the flag is explicitly cleared, even if the client pre-set
    // it.
    runWithFreshOpCtx("no-key-client", [&](OperationContext* localOpCtx) {
        WriteCmdQueryStatsRegistrar registrar;
        auto req = buildInsertRequest();
        req.setIncludeQueryStatsMetrics(true);

        registrar.setIncludeQueryStatsMetricsIfRequestedForInsert(localOpCtx, req);

        ASSERT_FALSE(req.getIncludeQueryStatsMetrics());
    });

    // Query stats entry registered: the flag is set to true.
    runWithFreshOpCtx("key-registered-client", [&](OperationContext* localOpCtx) {
        installMockKey(localOpCtx);
        WriteCmdQueryStatsRegistrar registrar;
        auto req = buildInsertRequest();
        ASSERT_FALSE(req.getIncludeQueryStatsMetrics());

        registrar.setIncludeQueryStatsMetricsIfRequestedForInsert(localOpCtx, req);

        ASSERT_TRUE(req.getIncludeQueryStatsMetrics());
    });

    // Command forwarded from router: function returns early without modifying the request.
    runWithFreshOpCtx("forwarded-from-router-client", [&](OperationContext* localOpCtx) {
        localOpCtx->setCommandForwardedFromRouter();
        installMockKey(localOpCtx);
        WriteCmdQueryStatsRegistrar registrar;
        auto req = buildInsertRequest();
        req.setIncludeQueryStatsMetrics(true);

        registrar.setIncludeQueryStatsMetricsIfRequestedForInsert(localOpCtx, req);

        // Pre-set value is left untouched since the function returns early.
        ASSERT_TRUE(req.getIncludeQueryStatsMetrics());
    });
}
TEST_F(WriteCmdQueryStatsRegistrarTest, PassthroughMetricsOpIndexForCoordinateMultiUpdateTest) {
// Setting this to simulate the scenario that a primary shard receives a dispatched update
// command (_shardsvrCoordinateMultiUpdate) from the router and it has to act like a router.