SERVER-121846 Do not quit when one plan has mergeCursors (#50441)

GitOrigin-RevId: 1e0bb4b59ccc08fef067c820ab0c3cec67eae8be
This commit is contained in:
fotiniAlvanaki 2026-03-26 14:40:42 +00:00 committed by MongoDB Bot
parent fa8abfc720
commit 3c30c7c8c7
2 changed files with 208 additions and 174 deletions

View File

@ -3,21 +3,23 @@
*
* @tags: [
* requires_fcv_82,
* # We modify the value of a query knob. setParameter is not persistent.
* # We use explain and we modify the value of a query knob using setParameter.
* does_not_support_stepdowns,
* does_not_support_transactions,
* # This test runs commands that are not allowed with security token: setParameter.
* not_allowed_with_signed_security_token,
* requires_getmore,
* uses_getmore_outside_of_transaction,
* # This test sets a server parameter via setParameterOnAllNonConfigNodes. To keep the
* # host list consistent, no add/remove shard operations should occur during the test
* # and the read preference should not change.
* assumes_stable_shard_list,
* assumes_read_preference_unchanged,
* does_not_support_transactions,
* # releaseMemory needs special permission
* assumes_superuser_permissions,
* # This test relies on aggregations returning specific batch-sized responses.
* assumes_no_implicit_cursor_exhaustion,
* # This test sets a server parameter via setParameterOnAllNonConfigNodes. To keep the host list
* # consistent, no add/remove shard operations should occur during the test.
* assumes_stable_shard_list,
* # This test holds open cursors across releaseMemory and getMore calls. Random sharding
* # metadata clears can invalidate those cursors mid-flight with StaleConfig errors.
* incompatible_with_random_sharding_metadata_clears,
* ]
*/
@ -61,10 +63,11 @@ setServerParameter(classicIncreasedSpillingKnob, false);
// For _internalInhibitOptimization tests, prevent DSCursor from reading all the data in a single
// batch.
const dsCursorKnobs = ["internalDocumentSourceCursorInitialBatchSize", "internalDocumentSourceCursorBatchSizeBytes"];
const dsCursorKnobNewValues = [1, 1024];
const dsCursorKnobValues = [];
for (const knob of dsCursorKnobs) {
dsCursorKnobValues.push(getServerParameter(knob));
setServerParameter(knob, 1);
for (let i = 0; i < dsCursorKnobs.length; i++) {
dsCursorKnobValues.push(getServerParameter(dsCursorKnobs[i]));
setServerParameter(dsCursorKnobs[i], dsCursorKnobNewValues[i]);
}
const collFew = db[jsTestName() + "_few"];
@ -126,188 +129,217 @@ const pipelines = [
groupPipeline.concat({$_internalInhibitOptimization: {}}), // Prevents the pipeline from being eliminated.
];
for (const pipeline of pipelines) {
for (const coll of [collFew, collMany]) {
const collections = [collFew, collMany];
// Get explain and the expected results to use later for verification.
// We want to get the explain before running the tests to make sure that
// the explain is not affected by the releaseMemory command, which might
// cause additional spilling.
const pipelineData = [];
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
const dataForPipeline = [];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const explain = coll.explain().aggregate(pipeline);
// Get all the results to use as a reference. Set 'allowDiskUse' to false to disable
// increased spilling in debug builds.
// Get all the results to use as a reference. Set 'allowDiskUse' to false to disable spilling.
const expectedResults = coll.aggregate(pipeline, {"allowDiskUse": false}).toArray();
const expectedResultsCount = expectedResults.length;
dataForPipeline[collIndex] = {
explain,
expectedResults,
};
}
pipelineData[pipelineIndex] = dataForPipeline;
}
// To minimise the setServerParameter calls that overload the config server,
// we run every test case for all pipelines and collections together, then
// move on to the next test case.
// No disk space available for spilling.
jsTest.log.info(`Running \`releaseMemory with no disk space available\` test case`);
setAvailableDiskSpaceMode(db.getSiblingDB("admin"), "alwaysOn");
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const {explain, expectedResults} = pipelineData[pipelineIndex][collIndex];
const willNotSpill =
isTimeSeriesCollection(db, coll.getName()) && expectedResults.length <= 128 && getEngine(explain) === "sbe";
if (willNotSpill) {
continue;
}
jsTest.log.info(`Testing collection ${coll.getName()} on pipeline: ${tojson(pipeline)}`);
const cursor = coll.aggregate(pipeline, {"allowDiskUse": true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
{
jsTest.log.info(`Running no spill in first batch`);
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assertReleaseMemoryFailedWithCode(releaseMemoryRes, cursorId, ErrorCodes.OutOfDiskSpace);
setServerParameter(sbeMemorySizeKnob, 100 * 1024 * 1024);
setServerParameter(classicMemorySizeKnob, 100 * 1024 * 1024);
jsTest.log.info("Running getMore");
assert.throwsWithCode(() => cursor.toArray(), ErrorCodes.CursorNotFound);
}
}
setAvailableDiskSpaceMode(db.getSiblingDB("admin"), "off");
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {"allowDiskUse": true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.eq(newSpillCount, initialSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
// In a timeseries collection, in sbe, all records are consumed in the first batch (up
// to kBlockOutSize = 128) and there is nothing to spill.
const willNotSpill =
isTimeSeriesCollection(db, coll.getName()) &&
expectedResults.length <= 128 &&
getEngine(explain) === "sbe";
if (willNotSpill) {
assert.eq(initialSpillCount, getSpillCounter());
} else {
assert.lt(initialSpillCount, getSpillCounter());
}
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
setServerParameter(sbeMemorySizeKnob, sbeMemorySizeInitialValue);
setServerParameter(classicMemorySizeKnob, classicMemorySizeInitialValue);
}
// Run query with increased spilling to spill while creating the first batch.
{
jsTest.log.info(`Running spill in first batch`);
setServerParameter(sbeMemorySizeKnob, 1);
setServerParameter(classicMemorySizeKnob, 1);
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {allowDiskUse: true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.lt(initialSpillCount, newSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
assert.eq(initialSpillCount, newSpillCount);
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
setServerParameter(sbeMemorySizeKnob, sbeMemorySizeInitialValue);
setServerParameter(classicMemorySizeKnob, classicMemorySizeInitialValue);
}
// Return all results in the first batch.
{
jsTest.log.info(`Return all results in the first batch`);
setServerParameter(sbeMemorySizeKnob, 100 * 1024 * 1024);
setServerParameter(classicMemorySizeKnob, 100 * 1024 * 1024);
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {
"allowDiskUse": true,
cursor: {batchSize: expectedResultsCount},
});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.eq(newSpillCount, initialSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
assert.eq(initialSpillCount, getSpillCounter());
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
setServerParameter(sbeMemorySizeKnob, sbeMemorySizeInitialValue);
setServerParameter(classicMemorySizeKnob, classicMemorySizeInitialValue);
}
// No disk space available for spilling.
{
const willNotSpill =
isTimeSeriesCollection(db, coll.getName()) &&
expectedResults.length <= 128 &&
getEngine(explain) === "sbe";
if (!willNotSpill) {
jsTest.log.info(`Running releaseMemory with no disk space available`);
runReleaseMemoryTestWithRetries(() => {
const cursor = coll.aggregate(pipeline, {"allowDiskUse": true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
// Release memory (i.e., spill)
setAvailableDiskSpaceMode(db.getSiblingDB("admin"), "alwaysOn");
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assertReleaseMemoryFailedWithCode(releaseMemoryRes, cursorId, ErrorCodes.OutOfDiskSpace);
setAvailableDiskSpaceMode(db.getSiblingDB("admin"), "off");
jsTest.log.info("Running getMore");
assert.throwsWithCode(() => cursor.toArray(), ErrorCodes.CursorNotFound);
});
}
}
// Disallow spilling in group.
jsTest.log.info(`Running \`releaseMemory with no allowDiskUse\` test case`);
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const {explain, expectedResults} = pipelineData[pipelineIndex][collIndex];
if (hasMergeCursors(explain)) {
// When `allowDiskUse` is false and a pipeline with `$mergeCursors` is used, operations
// might execute in `mongos`. So, the group operation will be performed on `mongos`, and
// `forceSpill` will be disregarded.
quit();
continue;
}
jsTest.log.info(`Testing collection ${coll.getName()} on pipeline: ${tojson(pipeline)}`);
const cursor = coll.aggregate(pipeline, {"allowDiskUse": false, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
// Disallow spilling in group.
{
jsTest.log.info(`Running releaseMemory with no allowDiskUse`);
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assertReleaseMemoryFailedWithCode(releaseMemoryRes, cursorId, [
ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
ErrorCodes.ReleaseMemoryShardError,
]);
runReleaseMemoryTestWithRetries(() => {
const cursor = coll.aggregate(pipeline, {"allowDiskUse": false, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assertReleaseMemoryFailedWithCode(releaseMemoryRes, cursorId, [
ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
ErrorCodes.ReleaseMemoryShardError,
]);
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
}
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
}
}
jsTest.log.info(`Setting memory to 100MB and running \`no spill in first batch\` test case`);
setServerParameter(sbeMemorySizeKnob, 100 * 1024 * 1024);
setServerParameter(classicMemorySizeKnob, 100 * 1024 * 1024);
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const {explain, expectedResults} = pipelineData[pipelineIndex][collIndex];
jsTest.log.info(`Testing collection ${coll.getName()} on pipeline: ${tojson(pipeline)}`);
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {"allowDiskUse": true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.eq(newSpillCount, initialSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
// In a timeseries collection, in sbe, all records are consumed in the first batch (up
// to kBlockOutSize = 128) and there is nothing to spill.
const willNotSpill =
isTimeSeriesCollection(db, coll.getName()) &&
expectedResults.length <= 128 &&
getEngine(explain) === "sbe";
if (willNotSpill) {
assert.eq(initialSpillCount, getSpillCounter());
} else {
assert.lt(initialSpillCount, getSpillCounter());
}
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
}
}
jsTest.log.info(`Leaving available memory to 100MB and running \`return all results in the first batch\` test case`);
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const {explain, expectedResults} = pipelineData[pipelineIndex][collIndex];
jsTest.log.info(`Testing collection ${coll.getName()} on pipeline: ${tojson(pipeline)}`);
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {
"allowDiskUse": true,
cursor: {batchSize: expectedResults.length},
});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.eq(newSpillCount, initialSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
assert.eq(initialSpillCount, getSpillCounter());
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
}
}
// Run query with increased spilling to spill while creating the first batch.
jsTest.log.info(`Setting memory to 1KB and running \`spill in first batch\` test case`);
setServerParameter(sbeMemorySizeKnob, 1 * 1024);
setServerParameter(classicMemorySizeKnob, 1 * 1024);
for (let pipelineIndex = 0; pipelineIndex < pipelines.length; pipelineIndex++) {
const pipeline = pipelines[pipelineIndex];
for (let collIndex = 0; collIndex < collections.length; collIndex++) {
const coll = collections[collIndex];
const {explain, expectedResults} = pipelineData[pipelineIndex][collIndex];
jsTest.log.info(`Testing collection ${coll.getName()} on pipeline: ${tojson(pipeline)}`);
runReleaseMemoryTestWithRetries(() => {
let initialSpillCount = getSpillCounter();
const cursor = coll.aggregate(pipeline, {allowDiskUse: true, cursor: {batchSize: 1}});
const cursorId = cursor.getId();
const newSpillCount = getSpillCounter();
assert.lt(initialSpillCount, newSpillCount);
initialSpillCount = newSpillCount;
// Release memory (i.e., spill)
const releaseMemoryCmd = {releaseMemory: [cursorId]};
jsTest.log.info("Running releaseMemory: ", releaseMemoryCmd);
const releaseMemoryRes = db.runCommand(releaseMemoryCmd);
assert.commandWorked(releaseMemoryRes);
assert.eq(releaseMemoryRes.cursorsReleased, [cursorId], releaseMemoryRes);
assert.eq(initialSpillCount, newSpillCount);
jsTest.log.info("Running getMore");
const results = cursor.toArray();
assertArrayEq({actual: results, expected: expectedResults});
});
}
}
jsTest.log.info(`Restoring server parameters to initial values`);
setServerParameter(sbeMemorySizeKnob, sbeMemorySizeInitialValue);
setServerParameter(classicMemorySizeKnob, classicMemorySizeInitialValue);
setServerParameter(sbeIncreasedSpillingKnob, sbeIncreasedSpillingInitialValue);
setServerParameter(classicIncreasedSpillingKnob, classicIncreasedSpillingInitialValue);
for (let i = 0; i < dsCursorKnobs.length; i++) {

View File

@ -74,7 +74,9 @@ function takeAction(conn, operation) {
let topology;
try {
topology = DiscoverTopology.findConnectedNodes(conn);
topology = DiscoverTopology.findConnectedNodes(conn, {
connectFn: (host) => newMongoWithRetry(host, undefined, {gRPC: false}),
});
} catch (e) {
let errorWithCode = "Code: " + e.code + ", Message: " + e;
jsTest.log.info("Error during topology discovery: " + errorWithCode);