SERVER-104596 Track memory in Or stage (#38393)

GitOrigin-RevId: 5da6175b83aa31c5bd70330769fe70e8abf8a431
This commit is contained in:
Christopher M. Wolff 2025-07-15 17:11:09 -07:00 committed by MongoDB Bot
parent a8229105e5
commit 5ac113adb1
17 changed files with 341 additions and 30 deletions

View File

@ -15,6 +15,8 @@
* ]
*/
import {runMemoryStatsTest} from "jstests/libs/query/memory_tracking_utils.js";
const stageName = "$bucketAuto";
const collName = jsTestName();
const coll = db[collName];
db[collName].drop();
@ -51,7 +53,7 @@ assert.commandWorked(coll.insertMany(docs));
comment: "memory stats bucketAuto test",
allowDiskUse: false,
},
stageName: "bucketAuto",
stageName,
expectedNumGetMores: 5,
});
}
@ -83,7 +85,7 @@ assert.commandWorked(coll.insertMany(docs));
comment: "memory stats bucketAuto with limit test",
allowDiskUse: false
},
stageName: "bucketAuto",
stageName,
expectedNumGetMores: 1,
skipInUseMemBytesCheck: true, // $limit will force execution to stop early, so
// inUseMemBytes may not appear in CurOp.
@ -125,7 +127,7 @@ assert.commandWorked(coll.insertMany(docs));
comment: "memory stats bucketAuto with spilling test",
allowDiskUse: true
},
stageName: "bucketAuto",
stageName,
expectedNumGetMores: 5
});

View File

@ -49,6 +49,6 @@ runMemoryStatsTest({
allowDiskUse: false,
cursor: {batchSize: 1}
},
stageName: "graphLookup",
stageName: "$graphLookup",
expectedNumGetMores: 8
});

View File

@ -15,6 +15,7 @@
* ]
*/
import {runMemoryStatsTest} from "jstests/libs/query/memory_tracking_utils.js";
const collName = jsTestName();
const coll = db[collName];
db[collName].drop();
@ -45,7 +46,7 @@ runMemoryStatsTest({
cursor: {batchSize: 1},
allowDiskUse: false
},
stageName: "group",
stageName: "$group",
expectedNumGetMores: 2
});

View File

@ -0,0 +1,60 @@
/**
* Tests that, when the memory tracking feature flag is enabled, memory tracking statistics are
* reported to the slow query log, system.profile, and explain("executionStats") for queries that
* use OrStage for record deduplication.
*
* @tags: [
* requires_profiling,
* requires_getmore,
* # The test queries the system.profile collection so it is not compatible with initial sync
* # since an initial sync may insert unexpected operations into the profile collection.
* queries_system_profile_collection,
* # The test runs the profile and getLog commands, which are not supported in Serverless.
* command_not_supported_in_serverless,
* requires_fcv_82,
* ]
*/
import {runMemoryStatsTest} from "jstests/libs/query/memory_tracking_utils.js";
import {checkSbeFullyEnabled} from "jstests/libs/query/sbe_util.js";
if (checkSbeFullyEnabled(db)) {
    // This test targets the classic "or" stage specifically, so skip it whenever the stage might
    // be executed in SBE instead.
    jsTestLog("Skipping test for classic 'or' stage when SBE is fully enabled.");
    quit();
}

const collName = jsTestName();
const coll = db[collName];
coll.drop();

// Build 100 documents with _id in [0, 99], 'a' in [0, 9] and 'b' alternating between 0 and 1, and
// load them with a single batched insert rather than one round trip per document.
const docs = [];
for (let i = 0; i < 10; ++i) {
    for (let j = 0; j < 10; ++j) {
        docs.push({_id: i * 10 + j, a: i, b: j % 2});
    }
}
assert.commandWorked(coll.insertMany(docs));

assert.commandWorked(coll.createIndex({a: 1}));
assert.commandWorked(coll.createIndex({b: 1}));

// Create a query that can be solved by two index scans that need to be ORed together.
// {a: 5} matches 10 documents and {b: 1} matches 50; 5 documents satisfy both branches, so the
// query returns 55 distinct documents.
const pipeline = [
    {$match: {$or: [{a: 5}, {b: 1}]}},
];

runMemoryStatsTest({
    db,
    collName,
    commandObj: {
        aggregate: collName,
        pipeline,
        comment: "memory stats Or stage test",
        allowDiskUse: false,
        // 55 results at 15 per batch: the initial batch plus getMores of 15, 15 and 10.
        cursor: {batchSize: 15}
    },
    stageName: "OR",
    expectedNumGetMores: 3,
    // This stage does not release memory on EOF.
    checkInUseMemBytesResets: false,
});

View File

@ -71,7 +71,7 @@ assert.commandWorked(coll.insertMany(docs));
cursor: {batchSize: 1},
allowDiskUse: false
},
stageName: "_internalSetWindowFields",
stageName: "$_internalSetWindowFields",
expectedNumGetMores: 10,
checkInUseMemBytesResets: false
});

View File

@ -17,6 +17,7 @@
*/
import {runMemoryStatsTest} from "jstests/libs/query/memory_tracking_utils.js";
// We are testing SBE sort here, so the stage appears in explain output without the dollar sign.
const collName = jsTestName();
const coll = db[collName];
db[collName].drop();
@ -41,6 +42,7 @@ for (let i = 1; i <= nDocs; i++) {
}
assert.commandWorked(bulk.execute());
const stageName = "sort";
const pipeline = [{$sort: {_id: 1, b: -1}}];
const pipelineWithLimit = [{$sort: {_id: 1, b: -1}}, {$limit: nDocs / 10}];
{
@ -54,7 +56,7 @@ const pipelineWithLimit = [{$sort: {_id: 1, b: -1}}, {$limit: nDocs / 10}];
comment: "memory stats sort test",
allowDiskUse: false
},
stageName: "sort",
stageName,
expectedNumGetMores: 5,
});
}
@ -70,7 +72,7 @@ const pipelineWithLimit = [{$sort: {_id: 1, b: -1}}, {$limit: nDocs / 10}];
comment: "memory stats sort limit test",
allowDiskUse: false
},
stageName: "sort",
stageName,
expectedNumGetMores: 5,
});
}
@ -89,7 +91,7 @@ const pipelineWithLimit = [{$sort: {_id: 1, b: -1}}, {$limit: nDocs / 10}];
comment: "memory stats sort spilling test",
allowDiskUse: true
},
stageName: "sort",
stageName,
expectedNumGetMores: 5,
// Since we spill to disk when adding to the sorter, we don't expect to see inUseMemBytes
// populated as it should be 0 on each operation.

View File

@ -249,10 +249,9 @@ function verifyExplainMetrics({db, collName, pipeline, stageName, featureFlagEna
// If a query uses sbe, the explain version will be 2.
const isSbeExplain = explainRes.explainVersion === "2";
const stageKey = isSbeExplain ? stageName : '$' + stageName;
function getStagesFromExplain(explainRes, stageKey) {
let stages = getAggPlanStages(explainRes, stageKey);
function getStagesFromExplain(explainRes, stageName) {
let stages = getAggPlanStages(explainRes, stageName);
// Even if SBE is enabled, there are some stages that are not supported in SBE and will
// still run on classic. We should also check for the classic pipeline stage name.
if (isSbeExplain && stages.length == 0) {
@ -261,29 +260,29 @@ function verifyExplainMetrics({db, collName, pipeline, stageName, featureFlagEna
assert.eq(stages.length,
numStages,
" Found " + stages.length + " but expected to find " + numStages + " " +
stageKey + " stages " +
stageName + " stages " +
"in explain: " + tojson(explainRes));
return stages;
}
function assertNoMemoryMetricsInStages(explainRes, stageKey) {
let stages = getStagesFromExplain(explainRes, stageKey);
function assertNoMemoryMetricsInStages(explainRes, stageName) {
let stages = getStagesFromExplain(explainRes, stageName);
for (let stage of stages) {
assert(!stage.hasOwnProperty("maxUsedMemBytes"),
`Unexpected maxUsedMemBytes in ${stageKey} stage: ` + tojson(explainRes));
`Unexpected maxUsedMemBytes in ${stageName} stage: ` + tojson(explainRes));
}
}
function assertHasMemoryMetricsInStages(explainRes, stageKey) {
let stages = getStagesFromExplain(explainRes, stageKey);
function assertHasMemoryMetricsInStages(explainRes, stageName) {
let stages = getStagesFromExplain(explainRes, stageName);
for (let stage of stages) {
assert(stage.hasOwnProperty("maxUsedMemBytes"),
`Expected maxUsedMemBytes in ${stageKey} stage: ` + tojson(explainRes));
`Expected maxUsedMemBytes in ${stageName} stage: ` + tojson(explainRes));
// TODO SERVER-106000 Remove explicit check for $_internalSetWindowFields.
if (stageKey != "$_internalSetWindowFields") {
if (stageName != "$_internalSetWindowFields") {
assert.gt(stage.maxUsedMemBytes,
0,
`Expected maxUsedMemBytes to be positive in ${stageKey} stage: ` +
`Expected maxUsedMemBytes to be positive in ${stageName} stage: ` +
tojson(explainRes));
}
}
@ -297,7 +296,7 @@ function verifyExplainMetrics({db, collName, pipeline, stageName, featureFlagEna
// Memory usage metrics do not appear in the stage's statistics. Verify that the stage
// exists in the explain output.
assertNoMemoryMetricsInStages(explainRes, stageKey);
assertNoMemoryMetricsInStages(explainRes, stageName);
return;
}
@ -314,7 +313,7 @@ function verifyExplainMetrics({db, collName, pipeline, stageName, featureFlagEna
}
// Memory usage metrics appear within the stage's statistics.
assertHasMemoryMetricsInStages(explainRes, stageKey);
assertHasMemoryMetricsInStages(explainRes, stageName);
jsTestLog(
"Test that memory usage metrics do not appear in the explain output when the verbosity is lower than executionStats.");
@ -322,9 +321,9 @@ function verifyExplainMetrics({db, collName, pipeline, stageName, featureFlagEna
assert(!explainQueryPlannerRes.hasOwnProperty("maxUsedMemBytes"),
"Unexpected maxUsedMemBytes in explain: " + tojson(explainQueryPlannerRes));
// SBE stage metrics aren't outputted in queryPlanner explain, so checking
// the stageKey may result in no stages.
// the stageName may result in no stages.
if (!isSbeExplain) {
assertNoMemoryMetricsInStages(explainQueryPlannerRes, stageKey);
assertNoMemoryMetricsInStages(explainQueryPlannerRes, stageName);
}
}

View File

@ -54,7 +54,7 @@ runShardedMemoryStatsTest({
allowDiskUse: false,
},
pipelineComment: "sharded memory stats bucketAuto test",
stageName: "bucketAuto",
stageName: "$bucketAuto",
expectedNumGetMores: 5,
numShards: 2,
skipExplain: true // $bucketAuto will execute on the merging part of the pipeline and will not

View File

@ -57,7 +57,7 @@ runShardedMemoryStatsTest({
allowDiskUse: false,
},
pipelineComment: "sharded memory stats graphLookup test",
stageName: "graphLookup",
stageName: "$graphLookup",
expectedNumGetMores: 3,
numShards: 2,
skipExplain: true, // graphLookup will execute on the merging part of the pipeline and will not

View File

@ -42,7 +42,7 @@ runShardedMemoryStatsTest({
cursor: {batchSize: 1},
allowDiskUse: false
},
stageName: "group",
stageName: "$group",
expectedNumGetMores: 2,
numShards: 2
});

View File

@ -52,7 +52,7 @@ runShardedMemoryStatsTest({
allowDiskUse: false,
},
pipelineComment: "sharded memory stats setWindowFields test",
stageName: "_internalSetWindowFields",
stageName: "$_internalSetWindowFields",
expectedNumGetMores: 2,
// We don't expect any explain stages on the shards because $setWindowFields is always run on
// the merging node - so we skip explain.

View File

@ -191,6 +191,7 @@ mongo_cc_unit_test(
"exclusion_projection_executor_test.cpp",
"find_projection_executor_test.cpp",
"inclusion_projection_executor_test.cpp",
"or_test.cpp",
"orphan_chunk_skipper_test.cpp",
"projection_executor_builder_test.cpp",
"projection_executor_redaction_test.cpp",

View File

@ -31,6 +31,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/exec/filter.h"
#include "mongo/db/memory_tracking/operation_memory_usage_tracker.h"
#include <iterator>
#include <memory>
@ -55,7 +56,9 @@ OrStage::OrStage(ExpressionContext* expCtx,
_filter(filter),
_currentChild(0),
_dedup(dedup),
_recordIdDeduplicator(expCtx) {}
_recordIdDeduplicator(expCtx),
_memoryTracker(OperationMemoryUsageTracker::createSimpleMemoryUsageTrackerForStage(*expCtx)) {
}
void OrStage::addChild(std::unique_ptr<PlanStage> child) {
_children.emplace_back(std::move(child));
@ -84,6 +87,8 @@ PlanStage::StageState OrStage::doWork(WorkingSetID* out) {
// If we're deduping (and there's something to dedup by)
if (_dedup && member->hasRecordId()) {
uint64_t dedupBytesPrev = _recordIdDeduplicator.getApproximateSize();
++_specificStats.dupsTested;
// ...and we've seen the RecordId before
@ -92,6 +97,10 @@ PlanStage::StageState OrStage::doWork(WorkingSetID* out) {
++_specificStats.dupsDropped;
_ws->free(id);
return PlanStage::NEED_TIME;
} else {
uint64_t dedupBytes = _recordIdDeduplicator.getApproximateSize();
_memoryTracker.add(dedupBytes - dedupBytesPrev);
_specificStats.maxUsedMemBytes = _memoryTracker.maxMemoryBytes();
}
}

View File

@ -67,6 +67,10 @@ public:
const SpecificStats* getSpecificStats() const final;
/**
 * Test-only accessor: returns a read-only reference to the memory tracker owned by this stage
 * (the '_memoryTracker' member), so tests can inspect the tracked memory usage.
 */
const SimpleMemoryUsageTracker& getMemoryTracker_forTest() {
return _memoryTracker;
}
static const char* kStageType;
private:
@ -87,6 +91,9 @@ private:
// Stats
OrStats _specificStats;
// Track memory used by this stage for deduplicating.
SimpleMemoryUsageTracker _memoryTracker;
};
} // namespace mongo

View File

@ -0,0 +1,225 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/exec/or.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/assert.h"
#include "mongo/unittest/framework.h"
#include <algorithm>
#include <memory>
#include <vector>
namespace mongo {
namespace {
// Test-only namespace handed to the ExpressionContext that the OrTest fixture builds.
static const NamespaceString kNss = NamespaceString::createNamespaceString_forTest("db.dummy");
class OrTest : public ServiceContextMongoDTest {
public:
OrTest()
: _opCtx{makeOperationContext()},
_expCtx{ExpressionContextBuilder{}.opCtx(_opCtx.get()).ns(kNss).build()},
_ws{} {}
/**
* Helper to create a QueuedDataStage with the given documents.
*/
std::unique_ptr<QueuedDataStage> createQueuedDataStage(const std::vector<BSONObj>& documents) {
auto queuedStage = std::make_unique<QueuedDataStage>(_expCtx.get(), &_ws);
// Add documents to the queued stage
for (const auto& doc : documents) {
auto rid = doc["_id"]._numberInt();
WorkingSetID wsid = _ws.allocate();
WorkingSetMember* member = _ws.get(wsid);
member->doc = {SnapshotId(), Document{doc}};
member->recordId = RecordId{rid};
member->transitionToRecordIdAndObj();
queuedStage->pushBack(wsid);
}
return queuedStage;
}
/**
* Execute the OrStage and collect all results.
*/
std::vector<BSONObj> executeOrStage(OrStage& orStage, bool expectMemoryUsage) {
std::vector<BSONObj> results;
WorkingSetID wsid = WorkingSet::INVALID_ID;
uint64_t peakMemoryBytes = 0;
PlanStage::StageState state = PlanStage::NEED_TIME;
while (state != PlanStage::IS_EOF) {
state = orStage.work(&wsid);
if (state == PlanStage::ADVANCED) {
const auto& tracker = orStage.getMemoryTracker_forTest();
uint64_t currentMemoryBytes = tracker.currentMemoryBytes();
if (expectMemoryUsage) {
peakMemoryBytes = std::max(currentMemoryBytes, peakMemoryBytes);
// If we are deduping and we have processed a record, there should be non-zero
// memory usage.
ASSERT_GT(currentMemoryBytes, 0);
ASSERT_GTE(tracker.maxMemoryBytes(), peakMemoryBytes);
} else {
ASSERT_EQ(0, currentMemoryBytes);
}
WorkingSetMember* member = _ws.get(wsid);
ASSERT_TRUE(member->hasObj());
results.push_back(member->doc.value().toBson());
}
}
return results;
}
protected:
ServiceContext::UniqueOperationContext _opCtx;
boost::intrusive_ptr<ExpressionContext> _expCtx;
WorkingSet _ws;
};
// An OrStage that was never given a child has nothing to produce: it is at EOF from the start and
// work() reports IS_EOF.
TEST_F(OrTest, EmptyOrStageReturnsEOF) {
    const bool shouldDedup = false;
    const MatchExpression* noFilter = nullptr;
    OrStage orStage(_expCtx.get(), &_ws, shouldDedup, noFilter);

    // No children were added, so the stage is exhausted before any work is done.
    ASSERT_TRUE(orStage.isEOF());

    WorkingSetID outId = WorkingSet::INVALID_ID;
    ASSERT_EQUALS(orStage.work(&outId), PlanStage::IS_EOF);
}
// A non-deduping OrStage with a single child passes the child's documents through unchanged and
// in order, and tracks no memory.
TEST_F(OrTest, OrStageWithOneChildReturnsData) {
    const bool shouldDedup = false;
    const MatchExpression* noFilter = nullptr;
    OrStage orStage{_expCtx.get(), &_ws, shouldDedup, noFilter};

    // Documents served by the only child.
    std::vector<BSONObj> inputDocs{
        BSON("_id" << 1 << "x" << 10),
        BSON("_id" << 2 << "x" << 20),
        BSON("_id" << 3 << "x" << 30),
    };
    orStage.addChild(createQueuedDataStage(inputDocs));

    // Run the stage to completion; without dedup no memory usage is expected.
    const bool expectMemoryUsage = false;
    std::vector<BSONObj> output = executeOrStage(orStage, expectMemoryUsage);

    ASSERT_EQUALS(output.size(), inputDocs.size());
    for (size_t idx = 0; idx != output.size(); ++idx) {
        ASSERT_BSONOBJ_EQ(output[idx], inputDocs[idx]);
    }
}
// A non-deduping OrStage with two children returns the first child's documents followed by the
// second child's, and tracks no memory.
TEST_F(OrTest, OrStageWithMultipleChildrenReturnsUnion) {
    const bool shouldDedup = false;
    const MatchExpression* noFilter = nullptr;
    OrStage orStage(_expCtx.get(), &_ws, shouldDedup, noFilter);

    std::vector<BSONObj> firstChildDocs{
        BSON("_id" << 1 << "x" << 10),
        BSON("_id" << 2 << "x" << 20),
    };
    std::vector<BSONObj> secondChildDocs{
        BSON("_id" << 3 << "x" << 30),
        BSON("_id" << 4 << "x" << 40),
    };

    // Build both children before attaching either, first child first.
    auto firstChild = createQueuedDataStage(firstChildDocs);
    auto secondChild = createQueuedDataStage(secondChildDocs);
    orStage.addChild(std::move(firstChild));
    orStage.addChild(std::move(secondChild));

    const bool expectMemoryUsage = false;
    std::vector<BSONObj> output = executeOrStage(orStage, expectMemoryUsage);

    // Every document from both children is expected, in child order.
    std::vector<BSONObj> wanted{
        BSON("_id" << 1 << "x" << 10),
        BSON("_id" << 2 << "x" << 20),
        BSON("_id" << 3 << "x" << 30),
        BSON("_id" << 4 << "x" << 40),
    };
    ASSERT_EQ(output.size(), wanted.size());
    for (size_t idx = 0; idx != output.size(); ++idx) {
        ASSERT_BSONOBJ_EQ(output[idx], wanted[idx]);
    }
}
// A deduping OrStage with two overlapping children returns each record once, and its memory
// tracker reports usage while it does so.
TEST_F(OrTest, OrStageWithMultipleChildrenDedupes) {
    const bool shouldDedup = true;
    const MatchExpression* noFilter = nullptr;
    OrStage orStage(_expCtx.get(), &_ws, shouldDedup, noFilter);

    // The children overlap on _id 2 and _id 3.
    std::vector<BSONObj> firstChildDocs{
        BSON("_id" << 1 << "x" << 10),
        BSON("_id" << 2 << "x" << 20),
        BSON("_id" << 3 << "x" << 30),
    };
    std::vector<BSONObj> secondChildDocs{
        BSON("_id" << 2 << "x" << 20),
        BSON("_id" << 3 << "x" << 30),
        BSON("_id" << 4 << "x" << 40),
    };

    // Build both children before attaching either, first child first.
    auto firstChild = createQueuedDataStage(firstChildDocs);
    auto secondChild = createQueuedDataStage(secondChildDocs);
    orStage.addChild(std::move(firstChild));
    orStage.addChild(std::move(secondChild));

    const bool expectMemoryUsage = true;
    std::vector<BSONObj> output = executeOrStage(orStage, expectMemoryUsage);

    // The union of both children with the repeated records dropped.
    std::vector<BSONObj> wanted{
        BSON("_id" << 1 << "x" << 10),
        BSON("_id" << 2 << "x" << 20),
        BSON("_id" << 3 << "x" << 30),
        BSON("_id" << 4 << "x" << 40),
    };
    ASSERT_EQ(output.size(), wanted.size());
    for (size_t idx = 0; idx != output.size(); ++idx) {
        ASSERT_BSONOBJ_EQ(output[idx], wanted[idx]);
    }
}
} // namespace
} // namespace mongo

View File

@ -770,6 +770,8 @@ struct OrStats : public SpecificStats {
size_t dupsTested = 0u;
size_t dupsDropped = 0u;
uint64_t maxUsedMemBytes = 0;
};
struct ProjectionStats : public SpecificStats {

View File

@ -494,6 +494,9 @@ void statsToBSON(const stage_builder::PlanStageToQsnMap& planStageQsnMap,
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
bob->appendNumber("dupsTested", static_cast<long long>(spec->dupsTested));
bob->appendNumber("dupsDropped", static_cast<long long>(spec->dupsDropped));
if (feature_flags::gFeatureFlagQueryMemoryTracking.isEnabled()) {
bob->appendNumber("maxUsedMemBytes", static_cast<long long>(spec->maxUsedMemBytes));
}
}
} else if (STAGE_LIMIT == stats.stageType) {
LimitStats* spec = static_cast<LimitStats*>(stats.specific.get());