SERVER-118423 Implement DocsNeededBounds reporting for extensions (#51217)
GitOrigin-RevId: 647587eb87410414f7fa4d0c21ef8a3ac349b223
This commit is contained in:
parent
dfa15cc750
commit
0cc5ead7c1
129
jstests/noPassthrough/extensions/extension_docs_needed_bounds.js
Normal file
129
jstests/noPassthrough/extensions/extension_docs_needed_bounds.js
Normal file
@ -0,0 +1,129 @@
|
||||
/**
|
||||
* Verifies DocsNeededBounds propagation end-to-end for extension stages by observing the batchSize mongod
|
||||
* sends mongot on the initial $search request.
|
||||
*
|
||||
* @tags: [featureFlagExtensionsAPI]
|
||||
*/
|
||||
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
||||
import {checkSbeRestrictedOrFullyEnabled} from "jstests/libs/query/sbe_util.js";
|
||||
import {getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
import {
|
||||
checkPlatformCompatibleWithExtensions,
|
||||
withExtensionsAndMongot,
|
||||
} from "jstests/noPassthrough/libs/extension_helpers.js";
|
||||
import {mongotCommandForQuery, mongotResponseForBatch} from "jstests/with_mongot/mongotmock/lib/mongotmock.js";
|
||||
|
||||
checkPlatformCompatibleWithExtensions();
|
||||
|
||||
// Mirrors kDefaultMongotBatchSize in src/mongo/db/query/search/mongot_cursor.h.
|
||||
const kDefaultMongotBatchSize = 101;
|
||||
|
||||
withExtensionsAndMongot(
|
||||
{
|
||||
"libfoo_mongo_extension.so": {},
|
||||
"liblimit_mongo_extension.so": {},
|
||||
},
|
||||
(conn, mongotMock) => {
|
||||
const dbName = "test";
|
||||
const db = conn.getDB(dbName);
|
||||
|
||||
if (checkSbeRestrictedOrFullyEnabled(db) && FeatureFlagUtil.isPresentAndEnabled(db.getMongo(), "SearchInSbe")) {
|
||||
jsTest.log.info("Skipping: $search in SBE uses a different cursor-establishment path.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Pin oversubscriptionFactor to 1.0 so the expected batchSize is deterministic and directly
|
||||
// reflects the bounds we compute.
|
||||
assert.commandWorked(
|
||||
db.adminCommand({setClusterParameter: {internalSearchOptions: {oversubscriptionFactor: 1}}}),
|
||||
);
|
||||
|
||||
const coll = db[jsTestName()];
|
||||
coll.drop();
|
||||
for (let i = 0; i < 20; i++) {
|
||||
assert.commandWorked(coll.insert({_id: i, x: i}));
|
||||
}
|
||||
const collUUID = getUUIDFromListCollections(db, coll.getName());
|
||||
const mockConn = mongotMock.getConnection();
|
||||
const searchQuery = {query: "x", path: "y"};
|
||||
|
||||
// Prime mongot to expect a $search with cursorOptions.batchSize === expectedBatchSize. If
|
||||
// the server sends a different batchSize, mongotmock fails the expectedCommand match and
|
||||
// the aggregate errors out.
|
||||
let nextCursorId = 1;
|
||||
function primeMongot(expectedBatchSize) {
|
||||
mockConn.adminCommand({
|
||||
setMockResponses: 1,
|
||||
cursorId: NumberLong(nextCursorId++),
|
||||
history: [
|
||||
{
|
||||
expectedCommand: mongotCommandForQuery({
|
||||
query: searchQuery,
|
||||
collName: coll.getName(),
|
||||
db: dbName,
|
||||
collectionUUID: collUUID,
|
||||
cursorOptions: {batchSize: NumberLong(expectedBatchSize)},
|
||||
}),
|
||||
response: mongotResponseForBatch(
|
||||
[
|
||||
{_id: 0, $searchScore: 0.9},
|
||||
{_id: 1, $searchScore: 0.8},
|
||||
],
|
||||
NumberLong(0),
|
||||
coll.getFullName(),
|
||||
1,
|
||||
),
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
// Declared bounds: $extensionLimit reports {effect: "limit", value: 50} via its
|
||||
// LogicalAggStage get_docs_needed_bounds callback. Bounds become (50, 50); with
|
||||
// oversubscriptionFactor=1, batchSize = 50
|
||||
{
|
||||
const expectedBatchSize = 50;
|
||||
primeMongot(expectedBatchSize);
|
||||
assert.commandWorked(
|
||||
db.runCommand({
|
||||
aggregate: coll.getName(),
|
||||
pipeline: [{$search: searchQuery}, {$extensionLimit: 50}],
|
||||
cursor: {},
|
||||
}),
|
||||
);
|
||||
mongotMock.assertEmpty();
|
||||
}
|
||||
|
||||
// Default (unknown) bounds: $testFoo does not override getDocsNeededBounds, so the callback
|
||||
// returns an empty BSONObj which causes the host to treat $testFoo as a stage with unknown
|
||||
// bounds. Reverse walk: $limit(5) -> (5, 5); $testFoo resets both bounds to Unknown.
|
||||
// batchSize falls back to kDefaultMongotBatchSize.
|
||||
{
|
||||
primeMongot(kDefaultMongotBatchSize);
|
||||
assert.commandWorked(
|
||||
db.runCommand({
|
||||
aggregate: coll.getName(),
|
||||
pipeline: [{$search: searchQuery}, {$testFoo: {}}, {$limit: 5}],
|
||||
cursor: {},
|
||||
}),
|
||||
);
|
||||
mongotMock.assertEmpty();
|
||||
}
|
||||
|
||||
// Reverse walk: $limit(50) -> (50, 50); $extensionLimit applies min(25, 50) = 25 to both
|
||||
// bounds; $skip(10) adds 10 to each -> (35, 35). batchSize = 35.
|
||||
{
|
||||
const expectedBatchSize = 35;
|
||||
primeMongot(expectedBatchSize);
|
||||
assert.commandWorked(
|
||||
db.runCommand({
|
||||
aggregate: coll.getName(),
|
||||
pipeline: [{$search: searchQuery}, {$skip: 10}, {$extensionLimit: 25}, {$limit: 50}],
|
||||
cursor: {},
|
||||
}),
|
||||
);
|
||||
mongotMock.assertEmpty();
|
||||
}
|
||||
},
|
||||
["standalone"],
|
||||
);
|
||||
@ -481,6 +481,10 @@ public:
|
||||
return _properties;
|
||||
}
|
||||
|
||||
boost::optional<MongoExtensionDocsNeededBoundsInfo> getDocsNeededBounds() const {
|
||||
return _logicalStage->getDocsNeededBounds();
|
||||
}
|
||||
|
||||
DepsTracker::State getDependencies(DepsTracker* deps) const override;
|
||||
|
||||
boost::optional<DistributedPlanLogic> distributedPlanLogic(
|
||||
|
||||
@ -50,6 +50,7 @@
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/pipeline/aggregation_context_fixture.h"
|
||||
#include "mongo/db/pipeline/document_source_documents.h"
|
||||
#include "mongo/db/pipeline/document_source_limit.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_document_source.h"
|
||||
#include "mongo/db/pipeline/pipeline_factory.h"
|
||||
#include "mongo/db/pipeline/search/document_source_internal_search_id_lookup.h"
|
||||
@ -2687,6 +2688,350 @@ TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsRet
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
// DocsNeededBounds callback tests for effects.
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsBlockingAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "blocking")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::NeedAll>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::NeedAll>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsBlockingOverridesLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "blocking")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::NeedAll>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::NeedAll>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsNoEffectWithLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "noEffect")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 10);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 10);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsNoEffectAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "noEffect")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsPossibleDecreaseWithLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "possibleDecrease")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 10);
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsPossibleDecreaseAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "possibleDecrease")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsPossibleIncreaseWithLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "possibleIncrease")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 10);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsPossibleIncreaseAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "possibleIncrease")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsExplicitUnknownWithLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "unknown")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsExplicitUnknownAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "unknown")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
// DocsNeededBounds callback tests for concrete limit/skip bounds.
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "limit" << "value" << 10)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 10);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 10);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsLimitWithSmallerDownstreamLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "limit" << "value" << 10)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 5);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 5);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 5);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsLimitWithLargerDownstreamLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "limit" << "value" << 5)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
// Pipeline: [ext_limit(5), $limit(10)]. Reverse walk: apply $limit(10) → min=10,max=10,
|
||||
// then apply ext_limit(5) → min=5,max=5 (smaller limit wins).
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 5);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 5);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsSkipWithLimit) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "skip" << "value" << 5)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
// Pipeline: [ext_skip(5), $limit(10)]. Reverse walk: apply $limit(10) → min=10,max=10,
|
||||
// then apply ext_skip(5) → min=15,max=15.
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 15);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 15);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsSkipAlone) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "skip" << "value" << 5)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
// Skip on Unknown bounds stays Unknown.
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
// Test that a null callback return (empty BSONObj) defaults to Unknown, even with downstream limit.
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest,
|
||||
ExtensionStageDocsNeededBoundsNullCallbackDefaultsToUnknown) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(BSONObj(), BSONObj()));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto limitStage = DocumentSourceLimit::create(getExpCtx(), 10);
|
||||
auto pipeline = Pipeline::create({extensionStage, limitStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMinBounds()));
|
||||
ASSERT_TRUE(std::holds_alternative<docs_needed_bounds::Unknown>(bounds.getMaxBounds()));
|
||||
}
|
||||
|
||||
// Two consecutive extension stages with concrete DocsNeededBounds compose correctly.
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsConsecutiveExtStages) {
|
||||
auto skipAstNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "skip" << "value" << 3)));
|
||||
AggStageAstNodeHandle skipHandle{skipAstNode};
|
||||
auto skipExtensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(skipHandle));
|
||||
|
||||
auto limitAstNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "limit" << "value" << 7)));
|
||||
AggStageAstNodeHandle limitHandle{limitAstNode};
|
||||
auto limitExtensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(limitHandle));
|
||||
|
||||
// Pipeline: [ext_skip(3), ext_limit(7)]. Reverse walk: ext_limit(7) -> (7, 7),
|
||||
// then ext_skip(3) -> (10, 10).
|
||||
auto pipeline = Pipeline::create({skipExtensionStage, limitExtensionStage}, getExpCtx());
|
||||
auto bounds = extractDocsNeededBounds(*pipeline);
|
||||
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMinBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMinBounds()), 10);
|
||||
ASSERT_TRUE(std::holds_alternative<long long>(bounds.getMaxBounds()));
|
||||
ASSERT_EQUALS(std::get<long long>(bounds.getMaxBounds()), 10);
|
||||
}
|
||||
|
||||
// The IDL validator rejects a limit/skip effect that is missing the 'value' field.
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsLimitMissingValue) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(BSONObj(),
|
||||
BSON("effect" << "limit")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
ASSERT_THROWS_CODE(extractDocsNeededBounds(*pipeline), DBException, 11842302);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsSkipMissingValue) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(BSONObj(),
|
||||
BSON("effect" << "skip")));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
ASSERT_THROWS_CODE(extractDocsNeededBounds(*pipeline), DBException, 11842302);
|
||||
}
|
||||
|
||||
// The IDL validator rejects an effect that erroneously specifies a 'value' field.
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, ExtensionStageDocsNeededBoundsUnknownWithValue) {
|
||||
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
|
||||
std::make_unique<sdk::shared_test_stages::CustomBoundsAstNode>(
|
||||
BSONObj(), BSON("effect" << "unknown" << "value" << 5)));
|
||||
AggStageAstNodeHandle handle{astNode};
|
||||
auto extensionStage =
|
||||
host::DocumentSourceExtensionOptimizable::create(getExpCtx(), std::move(handle));
|
||||
|
||||
auto pipeline = Pipeline::create({extensionStage}, getExpCtx());
|
||||
ASSERT_THROWS_CODE(extractDocsNeededBounds(*pipeline), DBException, 11842303);
|
||||
}
|
||||
|
||||
// Tests for registerStageRules / _extensionRuleRegistry.
|
||||
|
||||
class StageRulesTest : public DocumentSourceExtensionOptimizableTest {
|
||||
|
||||
@ -215,6 +215,16 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
static ::MongoExtensionStatus* _hostGetDocsNeededBounds(
|
||||
const ::MongoExtensionLogicalAggStage* logicalStage,
|
||||
::MongoExtensionByteBuf** output) noexcept {
|
||||
return wrapCXXAndConvertExceptionToStatus([]() {
|
||||
tasserted(
|
||||
11842301,
|
||||
"_hostGetDocsNeededBounds should not be called on a host-allocated logical stage.");
|
||||
});
|
||||
}
|
||||
|
||||
static constexpr ::MongoExtensionLogicalAggStageVTable VTABLE = {
|
||||
.destroy = &_hostDestroy,
|
||||
.get_name = &_hostGetName,
|
||||
@ -232,9 +242,9 @@ private:
|
||||
.apply_pipeline_suffix_dependencies = &_hostApplyPipelineSuffixDependencies,
|
||||
.get_sort_pattern = &_hostGetSortPattern,
|
||||
.skip_stream = &_hostSkipStream,
|
||||
.get_docs_needed_bounds = &_hostGetDocsNeededBounds,
|
||||
};
|
||||
|
||||
std::unique_ptr<host::LogicalAggStage> _logicalAggStage;
|
||||
};
|
||||
|
||||
}; // namespace mongo::extension::host_connector
|
||||
|
||||
@ -13,6 +13,10 @@ mongo_cc_library(
|
||||
srcs = [
|
||||
":extension_agg_stage_static_properties_gen",
|
||||
":extension_error_types_gen",
|
||||
"//src/mongo/db/extension/shared:extension_agg_stage_static_properties_validator.cpp",
|
||||
],
|
||||
hdrs = [
|
||||
"//src/mongo/db/extension/shared:extension_agg_stage_static_properties_validator.h",
|
||||
],
|
||||
deps = [
|
||||
":api",
|
||||
|
||||
@ -883,6 +883,16 @@ typedef struct MongoExtensionLogicalAggStageVTable {
|
||||
MongoExtensionStatus* (*skip_stream)(MongoExtensionLogicalAggStage* logicalStage,
|
||||
MongoExtensionStreamType streamType);
|
||||
|
||||
/**
|
||||
* Returns the DocsNeededBounds effect for this stage. The output buffer contains a BSON-
|
||||
* serialized MongoExtensionDocsNeededBoundsInfo struct. If the output buffer is left as
|
||||
* nullptr, the host treats this stage as having Unknown bounds (both min and max are
|
||||
* reset to Unknown).
|
||||
*
|
||||
* Ownership of the output buffer is transferred to the caller.
|
||||
*/
|
||||
MongoExtensionStatus* (*get_docs_needed_bounds)(
|
||||
const MongoExtensionLogicalAggStage* logicalStage, MongoExtensionByteBuf** output);
|
||||
} MongoExtensionLogicalAggStageVTable;
|
||||
|
||||
/**
|
||||
|
||||
@ -31,6 +31,7 @@ global:
|
||||
cpp_namespace: "mongo::extension"
|
||||
cpp_includes:
|
||||
- "mongo/db/extension/public/api.h"
|
||||
- "mongo/db/extension/shared/extension_agg_stage_static_properties_validator.h"
|
||||
|
||||
imports:
|
||||
- "mongo/db/basic_types.idl"
|
||||
@ -79,6 +80,19 @@ enums:
|
||||
kPlanCacheRead: "planCacheRead"
|
||||
kCollStats: "collStats"
|
||||
kIndexStats: "indexStats"
|
||||
MongoExtensionDocsNeededBoundsEffect:
|
||||
description: >-
|
||||
How this stage affects the DocsNeededBounds of the pipeline. Used by the pipeline
|
||||
visitor to infer how many documents upstream stages need to produce.
|
||||
type: string
|
||||
values:
|
||||
kUnknown: "unknown"
|
||||
kBlocking: "blocking"
|
||||
kPossibleDecrease: "possibleDecrease"
|
||||
kPossibleIncrease: "possibleIncrease"
|
||||
kNoEffect: "noEffect"
|
||||
kLimit: "limit"
|
||||
kSkip: "skip"
|
||||
|
||||
structs:
|
||||
MongoExtensionPrivilegeActionEntry:
|
||||
@ -156,3 +170,20 @@ structs:
|
||||
Whether this stage is a selection stage. A selection stage does not modify or transform documents.
|
||||
type: bool
|
||||
default: false
|
||||
MongoExtensionDocsNeededBoundsInfo:
|
||||
description: >-
|
||||
Return type for the LogicalAggStage get_docs_needed_bounds callback.
|
||||
Describes how a stage affects pipeline DocsNeededBounds.
|
||||
strict: true
|
||||
cpp_validator_func: "validateDocsNeededBoundsInfo"
|
||||
fields:
|
||||
effect:
|
||||
description: How the stage affects DocsNeededBounds.
|
||||
type: MongoExtensionDocsNeededBoundsEffect
|
||||
value:
|
||||
description: >-
|
||||
Concrete numeric value for the effect. Required when effect is "limit" or
|
||||
"skip". Must be a positive integer.
|
||||
type: safeInt64
|
||||
optional: true
|
||||
validator: {gt: 0}
|
||||
|
||||
@ -126,6 +126,15 @@ public:
|
||||
return _limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns DocsNeededBounds info for this stage as a BSON-serialized
|
||||
* MongoExtensionDocsNeededBoundsInfo. Override to declare how this stage affects pipeline
|
||||
* bounds. Return empty BSONObj (the default) to use Unknown bounds.
|
||||
*/
|
||||
virtual BSONObj getDocsNeededBounds() const {
|
||||
return BSONObj();
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates the precondition of the rule identified by name. Extensions override this.
|
||||
*/
|
||||
@ -367,6 +376,20 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
static ::MongoExtensionStatus* _extGetDocsNeededBounds(
|
||||
const ::MongoExtensionLogicalAggStage* extLogicalStage,
|
||||
::MongoExtensionByteBuf** output) noexcept {
|
||||
return wrapCXXAndConvertExceptionToStatus([&]() {
|
||||
*output = nullptr;
|
||||
const auto& impl =
|
||||
static_cast<const ExtensionLogicalAggStageAdapter*>(extLogicalStage)->getImpl();
|
||||
auto bounds = impl.getDocsNeededBounds();
|
||||
if (!bounds.isEmpty()) {
|
||||
*output = new ByteBuf(bounds);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static constexpr ::MongoExtensionLogicalAggStageVTable VTABLE = {
|
||||
.destroy = &_extDestroy,
|
||||
.get_name = &_extGetName,
|
||||
@ -383,7 +406,8 @@ private:
|
||||
.get_filter = &_extGetFilter,
|
||||
.apply_pipeline_suffix_dependencies = &_extApplyPipelineSuffixDependencies,
|
||||
.get_sort_pattern = &_extGetSortPattern,
|
||||
.skip_stream = &_extSkipStream};
|
||||
.skip_stream = &_extSkipStream,
|
||||
.get_docs_needed_bounds = &_extGetDocsNeededBounds};
|
||||
std::unique_ptr<LogicalAggStage> _stage;
|
||||
};
|
||||
|
||||
|
||||
@ -560,6 +560,56 @@ protected:
|
||||
BSONObj _properties;
|
||||
};
|
||||
|
||||
/**
|
||||
* A LogicalAggStage that overrides getDocsNeededBounds() with a configurable BSON return value.
|
||||
* Used to test the DocsNeededBounds visitor.
|
||||
*/
|
||||
class CustomBoundsLogicalAggStage : public TransformLogicalAggStage {
|
||||
public:
|
||||
CustomBoundsLogicalAggStage(BSONObj boundsInfo)
|
||||
: TransformLogicalAggStage(), _boundsInfo(boundsInfo.getOwned()) {}
|
||||
|
||||
BSONObj getDocsNeededBounds() const override {
|
||||
return _boundsInfo;
|
||||
}
|
||||
|
||||
std::unique_ptr<sdk::LogicalAggStage> clone() const override {
|
||||
return std::make_unique<CustomBoundsLogicalAggStage>(_boundsInfo);
|
||||
}
|
||||
|
||||
private:
|
||||
BSONObj _boundsInfo;
|
||||
};
|
||||
|
||||
/**
|
||||
* An AstNode that creates a CustomBoundsLogicalAggStage with configurable bounds.
|
||||
* Also accepts custom static properties for non-bounds-related testing.
|
||||
*/
|
||||
class CustomBoundsAstNode : public sdk::AggStageAstNode {
|
||||
public:
|
||||
CustomBoundsAstNode(BSONObj properties, BSONObj boundsInfo)
|
||||
: sdk::AggStageAstNode("$customBounds"),
|
||||
_properties(properties.getOwned()),
|
||||
_boundsInfo(boundsInfo.getOwned()) {}
|
||||
|
||||
BSONObj getProperties() const override {
|
||||
return _properties;
|
||||
}
|
||||
|
||||
std::unique_ptr<sdk::LogicalAggStage> promote(
|
||||
const ::MongoExtensionCatalogContext& catalogContext) const override {
|
||||
return std::make_unique<CustomBoundsLogicalAggStage>(_boundsInfo);
|
||||
}
|
||||
|
||||
std::unique_ptr<sdk::AggStageAstNode> clone() const override {
|
||||
return std::make_unique<CustomBoundsAstNode>(_properties, _boundsInfo);
|
||||
}
|
||||
|
||||
private:
|
||||
BSONObj _properties;
|
||||
BSONObj _boundsInfo;
|
||||
};
|
||||
|
||||
static constexpr std::string_view kSearchLikeSourceStageName = "$searchLikeSource";
|
||||
|
||||
class SearchLikeSourceAggStageAstNode : public sdk::TestAstNode<TransformLogicalAggStage> {
|
||||
|
||||
@ -5,6 +5,7 @@ package(default_visibility = ["//visibility:public"])
|
||||
exports_files(
|
||||
glob([
|
||||
"*.h",
|
||||
"*.cpp",
|
||||
]),
|
||||
)
|
||||
|
||||
|
||||
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/extension/shared/extension_agg_stage_static_properties_validator.h"
|
||||
|
||||
#include "mongo/db/extension/public/extension_agg_stage_static_properties_gen.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
namespace mongo::extension {
|
||||
|
||||
void validateDocsNeededBoundsInfo(const MongoExtensionDocsNeededBoundsInfo* info) {
|
||||
const bool requiresValue =
|
||||
info->getEffect() == MongoExtensionDocsNeededBoundsEffectEnum::kLimit ||
|
||||
info->getEffect() == MongoExtensionDocsNeededBoundsEffectEnum::kSkip;
|
||||
uassert(11842302,
|
||||
"'value' must be specified when 'effect' is 'limit' or 'skip'",
|
||||
!requiresValue || info->getValue().has_value());
|
||||
uassert(11842303,
|
||||
"'value' must only be specified when 'effect' is 'limit' or 'skip'",
|
||||
requiresValue || !info->getValue().has_value());
|
||||
}
|
||||
|
||||
} // namespace mongo::extension
|
||||
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
namespace mongo::extension {
|
||||
class MongoExtensionDocsNeededBoundsInfo;
|
||||
|
||||
/**
|
||||
* Validates that 'value' is provided when 'effect' is "limit" or "skip", and is absent otherwise.
|
||||
*/
|
||||
void validateDocsNeededBoundsInfo(const MongoExtensionDocsNeededBoundsInfo* info);
|
||||
|
||||
} // namespace mongo::extension
|
||||
@ -182,4 +182,20 @@ BSONObj LogicalAggStageAPI::getSortPattern() const {
|
||||
ExtensionByteBufHandle ownedBuf{buf};
|
||||
return bsonObjFromByteView(ownedBuf->getByteView()).getOwned();
|
||||
}
|
||||
|
||||
boost::optional<MongoExtensionDocsNeededBoundsInfo> LogicalAggStageAPI::getDocsNeededBounds()
|
||||
const {
|
||||
::MongoExtensionByteBuf* buf{nullptr};
|
||||
invokeCAndConvertStatusToException(
|
||||
[&]() { return _vtable().get_docs_needed_bounds(get(), &buf); });
|
||||
|
||||
if (!buf) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
ExtensionByteBufHandle ownedBuf{buf};
|
||||
auto bson = bsonObjFromByteView(ownedBuf->getByteView()).getOwned();
|
||||
return MongoExtensionDocsNeededBoundsInfo::parse(bson);
|
||||
}
|
||||
|
||||
} // namespace mongo::extension
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/extension/public/api.h"
|
||||
#include "mongo/db/extension/public/extension_agg_stage_static_properties_gen.h"
|
||||
#include "mongo/db/extension/shared/handle/aggregation_stage/executable_agg_stage.h"
|
||||
#include "mongo/db/extension/shared/handle/handle.h"
|
||||
#include "mongo/db/query/explain_options.h"
|
||||
@ -130,6 +131,12 @@ public:
|
||||
*/
|
||||
void skipStream(::MongoExtensionStreamType streamType);
|
||||
|
||||
/**
|
||||
* Returns the DocsNeededBounds info for this stage. Returns boost::none if the extension
|
||||
* does not provide bounds info.
|
||||
*/
|
||||
boost::optional<MongoExtensionDocsNeededBoundsInfo> getDocsNeededBounds() const;
|
||||
|
||||
static void assertVTableConstraints(const VTable_t& vtable) {
|
||||
tassert(11420603, "LogicalAggStage 'get_name' is null", vtable.get_name != nullptr);
|
||||
tassert(11173703, "LogicalAggStage 'serialize' is null", vtable.serialize != nullptr);
|
||||
@ -159,6 +166,9 @@ public:
|
||||
"LogicalAggStage 'get_sort_pattern' is null",
|
||||
vtable.get_sort_pattern != nullptr);
|
||||
tassert(12601400, "LogicalAggStage 'skip_stream' is null", vtable.skip_stream != nullptr);
|
||||
tassert(11842300,
|
||||
"LogicalAggStage 'get_docs_needed_bounds' is null",
|
||||
vtable.get_docs_needed_bounds != nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -84,6 +84,12 @@ public:
|
||||
return _stageBson;
|
||||
}
|
||||
|
||||
mongo::BSONObj getDocsNeededBounds() const override {
|
||||
auto limit = _stageBson.firstElement().parseIntegerElementToNonNegativeLong().getValue();
|
||||
return BSON("effect" << "limit"
|
||||
<< "value" << limit);
|
||||
}
|
||||
|
||||
std::unique_ptr<sdk::LogicalAggStage> clone() const override {
|
||||
return std::make_unique<LimitLogicalStage>(_name, _stageBson);
|
||||
}
|
||||
|
||||
@ -28,6 +28,10 @@
|
||||
*/
|
||||
#include "mongo/db/pipeline/visitors/document_source_visitor_docs_needed_bounds.h"
|
||||
|
||||
#include "mongo/db/extension/host/document_source_extension_for_query_shape.h"
|
||||
#include "mongo/db/extension/host/document_source_extension_optimizable.h"
|
||||
#include "mongo/db/extension/public/extension_agg_stage_static_properties_gen.h"
|
||||
#include "mongo/db/extension/shared/handle/aggregation_stage/logical.h"
|
||||
#include "mongo/db/pipeline/document_source_group.h"
|
||||
#include "mongo/db/pipeline/document_source_limit.h"
|
||||
#include "mongo/db/pipeline/document_source_lookup.h"
|
||||
@ -365,10 +369,49 @@ void visit(DocsNeededBoundsContext* ctx, const DocumentSourceSequentialDocumentC
|
||||
// the first in the pipeline, where it populates the result stream.
|
||||
}
|
||||
|
||||
namespace {
|
||||
void applyDocsNeededBoundsEffect(DocsNeededBoundsContext* ctx,
|
||||
extension::MongoExtensionDocsNeededBoundsEffectEnum effect,
|
||||
boost::optional<std::int64_t> value) {
|
||||
// The IDL validator validateDocsNeededBoundsInfo guarantees that 'value' is set iff 'effect' is
|
||||
// kLimit or kSkip, so the dereferences below are safe.
|
||||
using Effect = extension::MongoExtensionDocsNeededBoundsEffectEnum;
|
||||
switch (effect) {
|
||||
case Effect::kUnknown:
|
||||
ctx->applyUnknownStage();
|
||||
return;
|
||||
case Effect::kBlocking:
|
||||
ctx->applyBlockingStage();
|
||||
return;
|
||||
case Effect::kPossibleDecrease:
|
||||
ctx->applyPossibleDecreaseStage();
|
||||
return;
|
||||
case Effect::kPossibleIncrease:
|
||||
ctx->applyPossibleIncreaseStage();
|
||||
return;
|
||||
case Effect::kNoEffect:
|
||||
return;
|
||||
case Effect::kLimit:
|
||||
ctx->applyLimit(*value);
|
||||
return;
|
||||
case Effect::kSkip:
|
||||
ctx->applySkip(*value);
|
||||
return;
|
||||
}
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
void visitExtensionStage(DocsNeededBoundsContext* ctx,
|
||||
const extension::host::DocumentSourceExtensionOptimizable& source) {
|
||||
// TODO SERVER-118423: Allow extension stages to report their own bounds.
|
||||
ctx->applyUnknownStage();
|
||||
auto boundsInfo = source.getDocsNeededBounds();
|
||||
if (!boundsInfo) {
|
||||
// Default to unknown bounds.
|
||||
ctx->applyUnknownStage();
|
||||
return;
|
||||
}
|
||||
|
||||
applyDocsNeededBoundsEffect(ctx, boundsInfo->getEffect(), boundsInfo->getValue());
|
||||
}
|
||||
|
||||
void visitExtensionStage(DocsNeededBoundsContext* ctx,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user