SERVER-119378 Tweak the public api and comments of PipelineDependencyGraph (#54320)

GitOrigin-RevId: 95cb6b48df9d78a8b3cfb1853d359e21ce383916
This commit is contained in:
Henri Nikku 2026-05-26 11:25:48 +01:00 committed by MongoDB Bot
parent 5d8deadeb6
commit 1adc42dcf2
5 changed files with 273 additions and 245 deletions

View File

@ -234,7 +234,7 @@ BENCHMARK_DEFINE_F(PipelineOptimizationBMFixture, BM_RebuildDependencyGraphFromM
DependencyGraph graph(pipeline->getSources());
for (auto keepRunning : state) {
graph.recompute(middleIt);
graph.recompute_forTest(middleIt);
}
}
BENCHMARK_REGISTER_F(PipelineOptimizationBMFixture, BM_RebuildDependencyGraphFromMiddle)

View File

@ -570,7 +570,7 @@ public:
grow(endIt);
}
boost::intrusive_ptr<mongo::DocumentSource> getDeclaringStage(DocumentSource* ds,
boost::intrusive_ptr<mongo::DocumentSource> getDeclaringStage(const DocumentSource* ds,
PathRef path) const {
auto stageId = getPreviousStageId(ds);
if (!stageId) {
@ -589,7 +589,7 @@ public:
return nullptr;
}
DeclaringStageResult getDeclaringStageIncludingSubpipelines(DocumentSource* ds,
DeclaringStageResult getDeclaringStageIncludingSubpipelines(const DocumentSource* ds,
PathRef path) const {
auto stageId = getPreviousStageId(ds);
if (!stageId) {
@ -610,8 +610,8 @@ public:
if (auto* subGraph = _stages[declaringStageId].subpipelineGraph) {
auto suffixPath = skipPathComponents(path, prefix.size() + 1);
if (!suffixPath.empty()) {
auto result =
subGraph->getDeclaringStageIncludingSubpipelines(nullptr, suffixPath);
auto result = subGraph->getDeclaringStageIncludingSubpipelines_forTest(
nullptr, suffixPath);
result.srcStages.insert(result.srcStages.begin(),
_stages[declaringStageId].documentSource);
result.fromSubpipeline = true;
@ -623,7 +623,7 @@ public:
return {{getDeclaringStage(ds, path)}};
}
bool canPathBeArray(DocumentSource* ds, PathRef path) const {
bool canPathBeArray(const DocumentSource* ds, PathRef path) const {
auto stageId = getPreviousStageId(ds);
if (!stageId) {
// Empty pipeline - all paths come from the base collection.
@ -687,7 +687,7 @@ public:
MONGO_UNREACHABLE_TASSERT(12266805);
}
boost::optional<Value> getConstant(DocumentSource* ds, PathRef path) const {
boost::optional<Value> getConstant(const DocumentSource* ds, PathRef path) const {
auto stageId = getPreviousStageId(ds);
if (!stageId) {
return boost::none;
@ -731,7 +731,7 @@ public:
MONGO_UNREACHABLE_TASSERT(11939201);
}
const DependencyGraph* getSubpipelineGraph(DocumentSource* ds) const {
const DependencyGraph* getSubpipelineGraph(const DocumentSource* ds) const {
auto stageId = getStageId(ds);
return _stages[stageId].subpipelineGraph;
}
@ -1367,7 +1367,7 @@ private:
/**
* Gets the stage node that represents the given DocumentSource in the graph.
*/
StageId getStageId(DocumentSource* ds) const {
StageId getStageId(const DocumentSource* ds) const {
if (!ds) {
return _stages.getLastId();
}
@ -1382,7 +1382,7 @@ private:
* Gets the stage node that represents the stage before the given DocumentSource in the graph. A
* nullptr denotes the position after the last stage.
*/
StageId getPreviousStageId(DocumentSource* ds) const {
StageId getPreviousStageId(const DocumentSource* ds) const {
auto stageId = getStageId(ds);
if (ds) {
if (stageId == StageId{0}) {
@ -1945,29 +1945,30 @@ DependencyGraph::~DependencyGraph() = default;
DependencyGraph::DependencyGraph(DependencyGraph&&) noexcept = default;
DependencyGraph& DependencyGraph::operator=(DependencyGraph&&) noexcept = default;
boost::intrusive_ptr<mongo::DocumentSource> DependencyGraph::getDeclaringStage(DocumentSource* ds,
PathRef path) const {
boost::intrusive_ptr<mongo::DocumentSource> DependencyGraph::getDeclaringStage_forTest(
const DocumentSource* ds, PathRef path) const {
return _impl->getDeclaringStage(ds, path);
}
DeclaringStageResult DependencyGraph::getDeclaringStageIncludingSubpipelines(DocumentSource* ds,
PathRef path) const {
DeclaringStageResult DependencyGraph::getDeclaringStageIncludingSubpipelines_forTest(
const DocumentSource* ds, PathRef path) const {
return _impl->getDeclaringStageIncludingSubpipelines(ds, path);
}
bool DependencyGraph::canPathBeArray(DocumentSource* ds, PathRef path) const {
bool DependencyGraph::canPathBeArray(const DocumentSource* ds, PathRef path) const {
return _impl->canPathBeArray(ds, path);
}
boost::optional<Value> DependencyGraph::getConstant(DocumentSource* ds, PathRef path) const {
boost::optional<Value> DependencyGraph::getConstant(const DocumentSource* ds, PathRef path) const {
return _impl->getConstant(ds, path);
}
const DependencyGraph* DependencyGraph::getSubpipelineGraph(DocumentSource* ds) const {
const DependencyGraph* DependencyGraph::getSubpipelineGraph(const DocumentSource* ds) const {
return _impl->getSubpipelineGraph(ds);
}
void DependencyGraph::recompute(boost::optional<DocumentSourceContainer::const_iterator> stageIt) {
void DependencyGraph::recompute_forTest(
boost::optional<DocumentSourceContainer::const_iterator> stageIt) {
_impl->recompute(stageIt);
}

View File

@ -44,10 +44,21 @@
namespace mongo::pipeline::dependency_graph {
/**
* A dot-separated field path. Every component is interpreted as a field name and never as an
* array index.
*/
using PathRef = StringData;
/**
* Callback used to query whether a path from the input of the pipeline (i.e. the base collection)
* may resolve to an array.
*/
using CanPathBeArray = std::function<bool(StringData)>;
/**
* Always returns true (any path may be an array).
*/
bool defaultCanPathBeArray(StringData path);
/**
@ -97,39 +108,38 @@ public:
DependencyGraph& operator=(DependencyGraph&&) noexcept;
/**
* Return the stage which last modified the path visible from the given DocumentSource. If no
* DocumentSource is given, returns the stage which last modified the path in the whole
* pipeline. The stage must have either declared, modified or removed the path. If nullptr, the
* path is unmodified and assumed to originate from the pipeline input.
* Returns the stage which last declared, modified or removed the path as seen at the input of
* 'stage'. If 'stage' is nullptr, returns the stage which last touched the path at the end of
* the pipeline. Returns nullptr when the path passes through unchanged from the pipeline input
* (the base collection or sub-pipeline input). Only used for testing.
*
* For example, the following stages all modify the path 'a'.
* For example, the following stages all modify the path 'a':
* - {$set: {a: 1}}
* - {$set: {a.b: 1}}
* - {$project: {a: 0}}
* - {$group: {_id: ...}}
*/
boost::intrusive_ptr<mongo::DocumentSource> getDeclaringStage(DocumentSource* stage,
PathRef path) const;
boost::intrusive_ptr<mongo::DocumentSource> getDeclaringStage_forTest(
const DocumentSource* stage, PathRef path) const;
/**
* Return the stage which last modified the path visible from the given DocumentSource along
* with all the intermediate stages that contain subpipelines. The stage must have either
* declared, modified or removed the path. If the stage is nullptr, the path is originating from
* the pipeline input.
* Like getDeclaringStage_forTest, but additionally records the chain of intermediate
* sub-pipeline-containing stages that the path crosses through.
*
* When the path crosses into a sub-pipeline (e.g. "docs.x" through a $lookup), the result
* will have 'fromSubpipeline' set to true and 'srcStages' vector populated with pointers to the
* sequence of intermediate stages with subpipelines and the final declaring stage or nullptr
* (if it comes from the collection).
* When the path crosses into a sub-pipeline (e.g. "docs.x" through a $lookup), the result has
* 'fromSubpipeline' set to true and 'srcStages' populated with the chain of intermediate
* sub-pipeline-containing stages followed by the final declaring stage (or nullptr if the path
* comes from the sub-pipeline's input).
*/
DeclaringStageResult getDeclaringStageIncludingSubpipelines(DocumentSource* stage,
PathRef path) const;
DeclaringStageResult getDeclaringStageIncludingSubpipelines_forTest(const DocumentSource* stage,
PathRef path) const;
/**
* Returns false if the path visible from the given DocumentSource can be assumed to not contain
* arrays. If nullptr, the path is assumed to originate from the pipeline input.
* Returns false if the path as seen at the input of 'stage' can be proven to not be an array.
* Returns true otherwise. If 'stage' is nullptr, the path is evaluated as it appears at the end
* of the pipeline.
*/
bool canPathBeArray(DocumentSource* stage, PathRef path) const;
bool canPathBeArray(const DocumentSource* stage, PathRef path) const;
/**
* Returns the constant value of 'path' visible to 'stage' (i.e., as it appears in the input
@ -141,19 +151,20 @@ public:
* not statically known, which includes the case where resolving the path would have to
* traverse an array element.
*/
boost::optional<Value> getConstant(DocumentSource* stage, PathRef path) const;
boost::optional<Value> getConstant(const DocumentSource* stage, PathRef path) const;
/**
* Returns the dependency graph for the sub-pipeline of the given stage (e.g. $lookup,
* $unionWith), or nullptr if the stage has no sub-pipeline.
*/
const DependencyGraph* getSubpipelineGraph(DocumentSource* stage) const;
const DependencyGraph* getSubpipelineGraph(const DocumentSource* stage) const;
/**
* Invalidate and recompute the subgraph starting from the earliest nodes which correspond to
* the stage pointed to by 'stageIt'.
* Invalidate and recompute the graph from the stage pointed to by 'stageIt' onwards. If
* 'stageIt' is not given, recomputes the entire graph from the beginning of the container. Only
* used for testing.
*/
void recompute(boost::optional<DocumentSourceContainer::const_iterator> stageIt = {});
void recompute_forTest(boost::optional<DocumentSourceContainer::const_iterator> stageIt = {});
/**
* Resizes the graph so that it covers the stages in the range [container.begin(), newEndIt).
@ -211,7 +222,14 @@ public:
*/
std::vector<DeadField> getDeadFields() const;
/**
* Renders the graph as a string for debug and golden-test output.
*/
std::string toDebugString() const;
/**
* Renders the graph as BSON for debug and golden-test output.
*/
BSONObj toBSON() const;
private:
@ -220,20 +238,23 @@ private:
};
/**
* Constructs the DependencyGraph and allows it to be invalidated and recomputed.
* Owns and lazily constructs the DependencyGraph for a pipeline and allows it to be invalidated and
* recomputed as the pipeline is rewritten.
*/
class DependencyGraphContext {
public:
DependencyGraphContext(ExpressionContext& expCtx, DocumentSourceContainer& container);
/**
* Get a dependency graph which is valid up to the given element.
* Returns a dependency graph that covers the stages from the beginning of the container up to
* and including 'maxStageIt'. If 'maxStageIt' is not given, covers the whole container.
*/
const DependencyGraph& getGraph(
boost::optional<DocumentSourceContainer::const_iterator> maxStageIt = {}) const;
/**
* Report that the stages starting at 'startIt' may have changed.
* Report that the stages starting at 'startIt' may have changed. The graph will be recomputed
* for those stages on the next call to getGraph().
*/
void invalidateFrom(DocumentSourceContainer::const_iterator startIt);

View File

@ -38,7 +38,7 @@ inline void recomputeAndAssert(DependencyGraph& graph, const Pipeline& pipeline,
const auto& sources = pipeline.getSources();
for (auto it = sources.begin(); it != sources.end(); ++it) {
func();
graph.recompute(it);
graph.recompute_forTest(it);
}
func();
}