SERVER-125594 Implement desugaring infrastructure for hybrid search lite parsed stages (#52718)
GitOrigin-RevId: a27a3cfe283cf5a16800053c4039cf65b5d03f51
This commit is contained in:
parent
3b3507cb34
commit
cd4d6770ed
@ -438,10 +438,14 @@ mongo_cc_library(
|
||||
"group_processor_base.cpp",
|
||||
"hybrid_search_pipeline_builder.cpp",
|
||||
"lite_parsed_graph_lookup.cpp",
|
||||
"lite_parsed_hybrid_search_desugarer.cpp",
|
||||
"lite_parsed_hybrid_search_desugarer_utils.cpp",
|
||||
"lite_parsed_lookup.cpp",
|
||||
"lite_parsed_pipeline_metadata_validation.cpp",
|
||||
"lite_parsed_rank_fusion.cpp",
|
||||
"lite_parsed_rank_fusion_desugarer_utils.cpp",
|
||||
"lite_parsed_score_fusion.cpp",
|
||||
"lite_parsed_score_fusion_desugarer_utils.cpp",
|
||||
"lite_parsed_union_with.cpp",
|
||||
"match_processor.cpp",
|
||||
"merge_processor.cpp",
|
||||
@ -1441,7 +1445,9 @@ mongo_cc_unit_test(
|
||||
"document_source_union_with_test.cpp",
|
||||
"document_source_unwind_test.cpp",
|
||||
"lite_parsed_lookup_test.cpp",
|
||||
"lite_parsed_rank_fusion_desugarer_test.cpp",
|
||||
"lite_parsed_rank_fusion_test.cpp",
|
||||
"lite_parsed_score_fusion_desugarer_test.cpp",
|
||||
"lite_parsed_score_fusion_test.cpp",
|
||||
"serverless_aggregation_context_fixture.cpp",
|
||||
"//src/mongo/db/pipeline:document_source_coll_stats_test.cpp",
|
||||
|
||||
@ -538,6 +538,15 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this stage produces scoreDetails metadata.
|
||||
* TODO SERVER-121091 This can be removed once hybrid search desugars into the internal hybrid
|
||||
* search stage.
|
||||
*/
|
||||
virtual bool isScoreDetailsStage() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this stage is a selection stage. A selection stage does not modify or
|
||||
* transform documents.
|
||||
|
||||
243
src/mongo/db/pipeline/lite_parsed_hybrid_search_desugarer.cpp
Normal file
243
src/mongo/db/pipeline/lite_parsed_hybrid_search_desugarer.cpp
Normal file
@ -0,0 +1,243 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/pipeline/lite_parsed_hybrid_search_desugarer.h"
|
||||
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/pipeline/document_source_hybrid_scoring_util.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_hybrid_search_desugarer_utils.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_utils.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_score_fusion_desugarer_utils.h"
|
||||
#include "mongo/db/query/util/rank_fusion_util.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
#include "mongo/util/string_map.h"
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer {
|
||||
|
||||
StageSpecs desugarRankFusion(const LiteParsedRankFusion& stage,
|
||||
const NamespaceString& nss,
|
||||
StringData userCollName) {
|
||||
const auto& spec = stage.getSpec();
|
||||
const auto& subPipelines = *stage.getSubPipelines();
|
||||
const bool includeScoreDetails = spec.getScoreDetails();
|
||||
|
||||
// Pipeline names in spec order (so we can index by position).
|
||||
std::vector<std::string> pipelineNames;
|
||||
pipelineNames.reserve(subPipelines.size());
|
||||
for (const auto& elem : spec.getInput().getPipelines()) {
|
||||
pipelineNames.emplace_back(elem.fieldName());
|
||||
}
|
||||
|
||||
tassert(12559411,
|
||||
"$rankFusion: subPipelines and pipeline-name list size mismatch",
|
||||
pipelineNames.size() == subPipelines.size());
|
||||
|
||||
// Resolve and validate weights against the input pipeline names; otherwise empty (all default
|
||||
// to 1).
|
||||
StringMap<double> weights;
|
||||
if (const auto& combinationSpec = spec.getCombination()) {
|
||||
weights = common_utils::validateWeights(
|
||||
combinationSpec->getWeights(), pipelineNames, "$rankFusion"_sd);
|
||||
}
|
||||
|
||||
StageSpecs out;
|
||||
|
||||
for (size_t i = 0; i < subPipelines.size(); ++i) {
|
||||
const auto& pipelineName = pipelineNames[i];
|
||||
double weight = hybrid_scoring_util::getPipelineWeight(weights, pipelineName);
|
||||
|
||||
StageSpecs perPipeline = rank_fusion_utils::buildRankFusionInputPipelinePreamble(
|
||||
nss, *subPipelines[i], pipelineName, weight, includeScoreDetails);
|
||||
|
||||
if (i == 0) {
|
||||
for (auto& s : perPipeline) {
|
||||
out.push_back(std::move(s));
|
||||
}
|
||||
} else {
|
||||
out.push_back(
|
||||
common_utils::buildUnionWithLPDS(nss, userCollName, std::move(perPipeline)));
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Tail: group + replaceRoot + rank-NA mutation + score/sort/project (+ scoreDetails) ----
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
common_utils::buildGroupBson(pipelineNames,
|
||||
includeScoreDetails,
|
||||
rank_fusion_utils::kInternalFieldsName,
|
||||
rank_fusion_utils::kDocsName,
|
||||
rank_fusion_utils::kDetailsScalarSuffix)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
common_utils::buildReplaceRootMergeBson(pipelineNames,
|
||||
includeScoreDetails,
|
||||
rank_fusion_utils::kInternalFieldsName,
|
||||
rank_fusion_utils::kDocsName,
|
||||
rank_fusion_utils::kDetailsScalarSuffix)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, rank_fusion_utils::buildRankAddFieldsBson(pipelineNames)));
|
||||
|
||||
// TODO SERVER-85426: Remove this branch once all feature flags have been removed.
|
||||
if (isRankFusionFullEnabled()) {
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, rank_fusion_utils::buildSetMetadataScoreBson(pipelineNames)));
|
||||
if (includeScoreDetails) {
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
rank_fusion_utils::buildCalculatedFinalScoreDetailsBson(pipelineNames, weights)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, rank_fusion_utils::buildSetMetadataScoreDetailsBson()));
|
||||
}
|
||||
out.push_back(common_utils::parseOwnedStage(nss, common_utils::buildSortByScoreMetaBson()));
|
||||
} else {
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, rank_fusion_utils::buildAddFieldsScoreBson(pipelineNames)));
|
||||
out.push_back(
|
||||
common_utils::parseOwnedStage(nss, rank_fusion_utils::buildSortByScoreScalarBson()));
|
||||
}
|
||||
out.push_back(common_utils::parseOwnedStage(nss,
|
||||
common_utils::buildProjectRemoveInternalFieldsBson(
|
||||
rank_fusion_utils::kInternalFieldsName)));
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
size_t rankFusionStageExpander(LiteParsedPipeline* pipeline,
|
||||
size_t index,
|
||||
LiteParsedDocumentSource& stage) {
|
||||
auto* rankFusionStage = dynamic_cast<LiteParsedRankFusion*>(&stage);
|
||||
tassert(12559412,
|
||||
"rankFusionStageExpander invoked with non-$rankFusion stage",
|
||||
rankFusionStage != nullptr);
|
||||
const NamespaceString& nss = pipeline->getOriginalParseNss();
|
||||
StringData userCollName = nss.coll();
|
||||
auto desugared = desugarRankFusion(*rankFusionStage, nss, userCollName);
|
||||
return pipeline->replaceStageWith(index, std::move(desugared));
|
||||
}
|
||||
|
||||
StageSpecs desugarScoreFusion(const LiteParsedScoreFusion& stage,
|
||||
const NamespaceString& nss,
|
||||
StringData userCollName) {
|
||||
const auto& spec = stage.getSpec();
|
||||
const auto& subPipelines = *stage.getSubPipelines();
|
||||
const bool includeScoreDetails = spec.getScoreDetails();
|
||||
|
||||
// Pipeline names in spec order.
|
||||
std::vector<std::string> pipelineNames;
|
||||
pipelineNames.reserve(subPipelines.size());
|
||||
for (const auto& elem : spec.getInput().getPipelines()) {
|
||||
pipelineNames.emplace_back(elem.fieldName());
|
||||
}
|
||||
|
||||
tassert(12559413,
|
||||
"$scoreFusion: subPipelines and pipeline-name list size mismatch",
|
||||
pipelineNames.size() == subPipelines.size());
|
||||
|
||||
// Validate normalization / combination shape.
|
||||
score_fusion_utils::ScoreFusionScoringOptions scoringOptions(spec);
|
||||
|
||||
StringMap<double> weights;
|
||||
if (const auto& combinationSpec = spec.getCombination()) {
|
||||
if (const auto& weightsObj = combinationSpec->getWeights()) {
|
||||
weights = common_utils::validateWeights(*weightsObj, pipelineNames, "$scoreFusion"_sd);
|
||||
}
|
||||
}
|
||||
|
||||
StageSpecs out;
|
||||
|
||||
for (size_t i = 0; i < subPipelines.size(); ++i) {
|
||||
const auto& pipelineName = pipelineNames[i];
|
||||
double weight = hybrid_scoring_util::getPipelineWeight(weights, pipelineName);
|
||||
|
||||
StageSpecs perPipeline = score_fusion_utils::buildScoreFusionInputPipelinePreamble(
|
||||
nss,
|
||||
*subPipelines[i],
|
||||
pipelineName,
|
||||
weight,
|
||||
scoringOptions.getNormalizationMethod(),
|
||||
includeScoreDetails);
|
||||
|
||||
if (i == 0) {
|
||||
for (auto& s : perPipeline) {
|
||||
out.push_back(std::move(s));
|
||||
}
|
||||
} else {
|
||||
out.push_back(
|
||||
common_utils::buildUnionWithLPDS(nss, userCollName, std::move(perPipeline)));
|
||||
}
|
||||
}
|
||||
|
||||
// Tail: $group + $replaceRoot + $setMetadata score (+ optional scoreDetails) + $sort +
|
||||
// $project.
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
common_utils::buildGroupBson(pipelineNames,
|
||||
includeScoreDetails,
|
||||
score_fusion_utils::kInternalFieldsName,
|
||||
score_fusion_utils::kDocsName,
|
||||
score_fusion_utils::kDetailsScalarSuffix)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
common_utils::buildReplaceRootMergeBson(pipelineNames,
|
||||
includeScoreDetails,
|
||||
score_fusion_utils::kInternalFieldsName,
|
||||
score_fusion_utils::kDocsName,
|
||||
score_fusion_utils::kDetailsScalarSuffix)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, score_fusion_utils::buildSetFinalCombinedScoreBson(pipelineNames, scoringOptions)));
|
||||
|
||||
if (includeScoreDetails) {
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, score_fusion_utils::buildCalculatedFinalScoreDetailsBson(pipelineNames, weights)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, score_fusion_utils::buildSetMetadataScoreDetailsBson(scoringOptions)));
|
||||
}
|
||||
|
||||
out.push_back(common_utils::parseOwnedStage(nss, common_utils::buildSortByScoreMetaBson()));
|
||||
out.push_back(common_utils::parseOwnedStage(nss,
|
||||
common_utils::buildProjectRemoveInternalFieldsBson(
|
||||
score_fusion_utils::kInternalFieldsName)));
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
size_t scoreFusionStageExpander(LiteParsedPipeline* pipeline,
|
||||
size_t index,
|
||||
LiteParsedDocumentSource& stage) {
|
||||
auto* scoreFusionStage = dynamic_cast<LiteParsedScoreFusion*>(&stage);
|
||||
tassert(12559414,
|
||||
"scoreFusionStageExpander invoked with non-$scoreFusion stage",
|
||||
scoreFusionStage != nullptr);
|
||||
const NamespaceString& nss = pipeline->getOriginalParseNss();
|
||||
StringData userCollName = nss.coll();
|
||||
auto desugared = desugarScoreFusion(*scoreFusionStage, nss, userCollName);
|
||||
return pipeline->replaceStageWith(index, std::move(desugared));
|
||||
}
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer
|
||||
94
src/mongo/db/pipeline/lite_parsed_hybrid_search_desugarer.h
Normal file
94
src/mongo/db/pipeline/lite_parsed_hybrid_search_desugarer.h
Normal file
@ -0,0 +1,94 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_document_source.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_rank_fusion.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_score_fusion.h"
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
#include <cstddef>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer {
|
||||
|
||||
/**
|
||||
* Desugars a $rankFusion lite-parsed stage into the equivalent list of LiteParsedDocumentSource
|
||||
* stages.
|
||||
*
|
||||
* `nss` is the namespace used to parse the synthesized stages. In production callers, this is
|
||||
* `pipeline->getOriginalParseNss()` from the outer LiteParsedPipeline.
|
||||
*
|
||||
* `userCollName` is the collection name to use for synthesized $unionWith stages. In production
|
||||
* callers, this is `pipeline->getOriginalParseNss().coll()` (the namespace the outer aggregation
|
||||
* runs against).
|
||||
*/
|
||||
StageSpecs desugarRankFusion(const LiteParsedRankFusion& stage,
|
||||
const NamespaceString& nss,
|
||||
StringData userCollName);
|
||||
|
||||
/**
|
||||
* StageExpander implementation as required by LiteParsedDesugarer::StageExpander. Downcasts
|
||||
* `stage` to LiteParsedRankFusion, derives `userCollName` from `pipeline->getOriginalParseNss()`,
|
||||
* runs `desugarRankFusion`, and replaces the stage at `index` with the result. Returns the index
|
||||
* after the inserted block.
|
||||
*/
|
||||
size_t rankFusionStageExpander(LiteParsedPipeline* pipeline,
|
||||
size_t index,
|
||||
LiteParsedDocumentSource& stage);
|
||||
|
||||
/**
|
||||
* Desugars a $scoreFusion lite-parsed stage into the equivalent list of LiteParsedDocumentSource
|
||||
* stages.
|
||||
*
|
||||
* `nss` is the namespace used to parse the synthesized stages. In production callers, this is
|
||||
* `pipeline->getOriginalParseNss()` from the outer LiteParsedPipeline.
|
||||
*
|
||||
* `userCollName` is the collection name to use for synthesized $unionWith stages. In production
|
||||
* callers, this is `pipeline->getOriginalParseNss().coll()` (the namespace the outer aggregation
|
||||
* runs against).
|
||||
*/
|
||||
StageSpecs desugarScoreFusion(const LiteParsedScoreFusion& stage,
|
||||
const NamespaceString& nss,
|
||||
StringData userCollName);
|
||||
|
||||
/**
|
||||
* StageExpander implementation as required by LiteParsedDesugarer::StageExpander. Downcasts
|
||||
* `stage` to LiteParsedScoreFusion, derives `userCollName` from `pipeline->getOriginalParseNss()`,
|
||||
* runs `desugarScoreFusion`, and replaces the stage at `index` with the result. Returns the index
|
||||
* after the inserted block.
|
||||
*/
|
||||
size_t scoreFusionStageExpander(LiteParsedPipeline* pipeline,
|
||||
size_t index,
|
||||
LiteParsedDocumentSource& stage);
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer
|
||||
@ -0,0 +1,315 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/pipeline/lite_parsed_hybrid_search_desugarer_utils.h"
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/namespace_string_util.h"
|
||||
#include "mongo/db/pipeline/document_source_hybrid_scoring_util.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_union_with.h"
|
||||
#include "mongo/db/pipeline/owned_lite_parsed_pipeline.h"
|
||||
#include "mongo/db/query/util/string_util.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
#include <utility>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <fmt/ranges.h>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::common_utils {
|
||||
|
||||
namespace {
|
||||
|
||||
// Builds and throws the "invalid weight name" error with typo suggestions.
|
||||
[[noreturn]] void failWeightsValidationWithPipelineSuggestions(
|
||||
const std::vector<std::string>& pipelineNames,
|
||||
const StringSet& matchedPipelines,
|
||||
const std::vector<std::string>& invalidWeights,
|
||||
StringData stageName) {
|
||||
std::vector<std::string> unmatchedPipelines;
|
||||
for (const auto& name : pipelineNames) {
|
||||
if (!matchedPipelines.contains(name)) {
|
||||
unmatchedPipelines.push_back(name);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::pair<std::string, std::vector<std::string>>> suggestions =
|
||||
query_string_util::computeTypoSuggestions(unmatchedPipelines, invalidWeights);
|
||||
|
||||
auto convertSingleSuggestionToString = [&](std::size_t i) -> std::string {
|
||||
std::string s = fmt::format("(provided: '{}' -> ", suggestions[i].first);
|
||||
if (suggestions[i].second.size() == 1) {
|
||||
s += fmt::format("suggested: '{}')", suggestions[i].second.front());
|
||||
} else {
|
||||
s += fmt::format("suggestions: [{}])", fmt::join(suggestions[i].second, ", "));
|
||||
}
|
||||
if (i < suggestions.size() - 1) {
|
||||
s += ", ";
|
||||
}
|
||||
return s;
|
||||
};
|
||||
|
||||
std::string errorMsg = fmt::format(
|
||||
"${} stage contained ({}) weight(s) in "
|
||||
"'combination.weights' that did not reference valid pipeline names. "
|
||||
"Suggestions for valid pipeline names: ",
|
||||
stageName,
|
||||
std::to_string(invalidWeights.size()));
|
||||
for (std::size_t i = 0; i < suggestions.size(); ++i) {
|
||||
errorMsg += convertSingleSuggestionToString(i);
|
||||
}
|
||||
|
||||
uasserted(12559400, errorMsg);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
StringMap<double> validateWeights(const BSONObj& inputWeights,
|
||||
const std::vector<std::string>& pipelineNames,
|
||||
StringData stageName) {
|
||||
StringSet pipelineNameSet(pipelineNames.begin(), pipelineNames.end());
|
||||
|
||||
StringMap<double> weights;
|
||||
std::vector<std::string> invalidWeights;
|
||||
StringSet matchedPipelines;
|
||||
|
||||
for (const auto& weightEntry : inputWeights) {
|
||||
const auto* fieldName = weightEntry.fieldName();
|
||||
|
||||
if (!pipelineNameSet.contains(fieldName)) {
|
||||
invalidWeights.emplace_back(fieldName);
|
||||
continue;
|
||||
}
|
||||
|
||||
uassert(12559402,
|
||||
str::stream() << "A pipeline named '" << fieldName
|
||||
<< "' is specified more than once in the " << stageName
|
||||
<< "'combinations.weight' object.",
|
||||
!weights.contains(fieldName));
|
||||
|
||||
uassert(12559404,
|
||||
str::stream() << stageName
|
||||
<< "'s pipeline weight must be numeric, but given non-numeric value "
|
||||
"for pipeline '"
|
||||
<< fieldName << "'.",
|
||||
weightEntry.isNumber());
|
||||
|
||||
const double weight = weightEntry.Number();
|
||||
uassert(12559401,
|
||||
str::stream() << stageName << "'s pipeline weight must be non-negative, but given "
|
||||
<< weight << " for pipeline '" << fieldName << "'.",
|
||||
weight >= 0);
|
||||
|
||||
weights[fieldName] = weight;
|
||||
matchedPipelines.insert(fieldName);
|
||||
}
|
||||
|
||||
if (static_cast<int>(pipelineNames.size()) < inputWeights.nFields()) {
|
||||
uasserted(
|
||||
12559403,
|
||||
fmt::format(
|
||||
"{} input has more weights ({}) than pipelines ({}). "
|
||||
"If 'combination.weights' is specified, there must be a less or equal number of "
|
||||
"weights as pipelines, each of which is unique and existing. "
|
||||
"Possible extraneous specified weights = [{}]",
|
||||
stageName,
|
||||
inputWeights.nFields(),
|
||||
static_cast<int>(pipelineNames.size()),
|
||||
fmt::join(invalidWeights, ", ")));
|
||||
} else if (!invalidWeights.empty()) {
|
||||
failWeightsValidationWithPipelineSuggestions(
|
||||
pipelineNames, matchedPipelines, invalidWeights, stageName);
|
||||
}
|
||||
|
||||
return weights;
|
||||
}
|
||||
|
||||
BSONObj buildReplaceRootBson(StringData docsName) {
|
||||
return BSON("$replaceWith" << BSON(docsName << "$$ROOT"));
|
||||
}
|
||||
|
||||
BSONObj buildSortByScoreMetaBson() {
|
||||
return BSON("$sort" << BSON("score" << BSON("$meta" << "score") << "_id" << 1));
|
||||
}
|
||||
|
||||
BSONObj buildProjectRemoveInternalFieldsBson(StringData internalFieldsName) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder projectBob(bob.subobjStart("$project"_sd));
|
||||
projectBob.append(internalFieldsName, 0);
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildGroupBson(const std::vector<std::string>& pipelineNames,
|
||||
bool includeScoreDetails,
|
||||
StringData internalFieldsName,
|
||||
StringData docsName,
|
||||
StringData detailsScalarSuffix) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder gBob(bob.subobjStart("$group"_sd));
|
||||
gBob.append("_id", fmt::format("${}._id", docsName));
|
||||
gBob.append(docsName, BSON("$first" << fmt::format("${}", docsName)));
|
||||
|
||||
auto accumulateScalar = [&](StringData field, const std::string& internalPath) {
|
||||
gBob.append(fmt::format("{}{}", kHsFlatFieldPrefix, field),
|
||||
BSON("$max" << BSON("$ifNull"
|
||||
<< BSON_ARRAY(fmt::format("${}", internalPath) << 0))));
|
||||
};
|
||||
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
const std::string scoreField =
|
||||
hybrid_scoring_util::getScoreFieldFromPipelineName(pipelineName);
|
||||
accumulateScalar(scoreField,
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
internalFieldsName, scoreField));
|
||||
if (includeScoreDetails) {
|
||||
const std::string detailsScalarField =
|
||||
fmt::format("{}{}", pipelineName, detailsScalarSuffix);
|
||||
accumulateScalar(detailsScalarField,
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
internalFieldsName, detailsScalarField));
|
||||
|
||||
const std::string scoreDetailsField = fmt::format("{}_scoreDetails", pipelineName);
|
||||
const std::string internalScoreDetailsPath =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(internalFieldsName,
|
||||
scoreDetailsField);
|
||||
gBob.append(fmt::format("{}{}", kHsFlatFieldPrefix, scoreDetailsField),
|
||||
BSON("$mergeObjects" << fmt::format("${}", internalScoreDetailsPath)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildReplaceRootMergeBson(const std::vector<std::string>& pipelineNames,
|
||||
bool includeScoreDetails,
|
||||
StringData internalFieldsName,
|
||||
StringData docsName,
|
||||
StringData detailsScalarSuffix) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder rrBob(bob.subobjStart("$replaceRoot"_sd));
|
||||
BSONObjBuilder newRootBob(rrBob.subobjStart("newRoot"_sd));
|
||||
BSONArrayBuilder mergeArr(newRootBob.subarrayStart("$mergeObjects"_sd));
|
||||
mergeArr.append(fmt::format("${}", docsName));
|
||||
BSONObjBuilder wrapperBob;
|
||||
{
|
||||
BSONObjBuilder internalFieldsBob(wrapperBob.subobjStart(internalFieldsName));
|
||||
auto appendFlat = [&](StringData field) {
|
||||
internalFieldsBob.append(field, fmt::format("${}{}", kHsFlatFieldPrefix, field));
|
||||
};
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
appendFlat(hybrid_scoring_util::getScoreFieldFromPipelineName(pipelineName));
|
||||
if (includeScoreDetails) {
|
||||
appendFlat(fmt::format("{}{}", pipelineName, detailsScalarSuffix));
|
||||
appendFlat(fmt::format("{}_scoreDetails", pipelineName));
|
||||
}
|
||||
}
|
||||
}
|
||||
mergeArr.append(wrapperBob.obj());
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
std::unique_ptr<LiteParsedDocumentSource> parseOwnedStage(const NamespaceString& nss,
|
||||
BSONObj stageBson) {
|
||||
auto lpds = LiteParsedDocumentSource::parse(nss, stageBson);
|
||||
lpds->makeOwned();
|
||||
return lpds;
|
||||
}
|
||||
|
||||
void mutateRightmostSortToOutputSortKey(const NamespaceString& nss, StageSpecs& stages) {
|
||||
// No-op if no $sort is present; e.g. $search and $vectorSearch provide sorted output
|
||||
// via sort-key metadata without an explicit $sort stage.
|
||||
for (auto it = stages.rbegin(); it != stages.rend(); ++it) {
|
||||
if ((*it)->getParseTimeName() == "$sort") {
|
||||
BSONElement origSpec = (*it)->getOriginalBson();
|
||||
tassert(12559415,
|
||||
"Expected $sort spec value to be an object",
|
||||
origSpec.type() == BSONType::object);
|
||||
|
||||
BSONObjBuilder mergedSortSpec;
|
||||
mergedSortSpec.appendElements(origSpec.embeddedObject());
|
||||
if (!origSpec.embeddedObject().hasField("$_internalOutputSortKeyMetadata"_sd)) {
|
||||
mergedSortSpec.append("$_internalOutputSortKeyMetadata", true);
|
||||
}
|
||||
|
||||
BSONObj newStageBson = BSON("$sort" << mergedSortSpec.obj());
|
||||
*it = parseOwnedStage(nss, std::move(newStageBson));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<LiteParsedDocumentSource> buildUnionWithLPDS(const NamespaceString& nss,
|
||||
StringData userCollName,
|
||||
StageSpecs perPipelineStages) {
|
||||
// Serialize the already-parsed (and sort-mutated) stages back to BSON. Each .wrap() call
|
||||
// produces a self-owning copy, so rawPipeline does not alias any stage's internal buffer.
|
||||
std::vector<BSONObj> rawPipeline;
|
||||
rawPipeline.reserve(perPipelineStages.size());
|
||||
for (const auto& s : perPipelineStages) {
|
||||
rawPipeline.push_back(s->getOriginalBson().wrap());
|
||||
}
|
||||
|
||||
NamespaceString foreignNss = NamespaceStringUtil::deserialize(nss.dbName(), userCollName);
|
||||
|
||||
// OwnedLiteParsedPipeline owns its backing BSON, so no manual setOwnedBson is needed for the
|
||||
// inner pipeline.
|
||||
OwnedLiteParsedPipeline innerLpp(foreignNss, rawPipeline);
|
||||
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder uwBob(bob.subobjStart("$unionWith"_sd));
|
||||
uwBob.append("coll", userCollName);
|
||||
BSONArrayBuilder pipeArr(uwBob.subarrayStart("pipeline"_sd));
|
||||
for (const auto& obj : rawPipeline) {
|
||||
pipeArr.append(obj);
|
||||
}
|
||||
}
|
||||
BSONObj unionWithObj = bob.obj();
|
||||
|
||||
auto lpuw = std::make_unique<LiteParsedUnionWith>(
|
||||
unionWithObj.firstElement(),
|
||||
std::move(foreignNss),
|
||||
boost::optional<OwnedLiteParsedPipeline>(std::move(innerLpp)),
|
||||
std::move(rawPipeline),
|
||||
/*hasForeignDB=*/false, // views are flattened before this point
|
||||
/*isHybridSearch=*/true);
|
||||
|
||||
// makeOwned() wraps _originalBson (a BSONElement view into unionWithObj) into a shared buffer,
|
||||
// keeping the underlying BSON alive after unionWithObj goes out of scope.
|
||||
lpuw->makeOwned();
|
||||
return lpuw;
|
||||
}
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::common_utils
|
||||
@ -0,0 +1,105 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_document_source.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
|
||||
#include "mongo/util/string_map.h"
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::common_utils {
|
||||
|
||||
// Prefix for the flat scalar group keys (e.g. "__hs_<p>_score") used in the desugared $group's
|
||||
// per-pipeline accumulators and in the subsequent $replaceRoot's wrapper object.
|
||||
inline constexpr StringData kHsFlatFieldPrefix = "__hs_"_sd;
|
||||
|
||||
// Parses a synthesized BSONObj stage into an owned LPDS at namespace `nss`. Not for use with
|
||||
// stages that hold nested LiteParsedPipelines -- use buildUnionWithLPDS for $unionWith.
|
||||
std::unique_ptr<LiteParsedDocumentSource> parseOwnedStage(const NamespaceString& nss,
|
||||
BSONObj stageBson);
|
||||
|
||||
// Walks `stages` right-to-left and rewrites the rightmost $sort to emit sort-key metadata.
|
||||
// No-op if no $sort is present (e.g. $search and $vectorSearch emit scored output without one).
|
||||
void mutateRightmostSortToOutputSortKey(const NamespaceString& nss, StageSpecs& stages);
|
||||
|
||||
// Builds a $unionWith LPDS whose subpipeline is `perPipelineStages`, targeting `userCollName`
|
||||
// on the same DB as `nss`.
|
||||
std::unique_ptr<LiteParsedDocumentSource> buildUnionWithLPDS(const NamespaceString& nss,
|
||||
StringData userCollName,
|
||||
StageSpecs perPipelineStages);
|
||||
|
||||
// Validates the user-provided combination.weights BSON against the input pipeline names and
|
||||
// returns the resulting weights map.
|
||||
StringMap<double> validateWeights(const BSONObj& inputWeights,
|
||||
const std::vector<std::string>& pipelineNames,
|
||||
StringData stageName);
|
||||
|
||||
// {$replaceWith: {<docsName>: "$$ROOT"}}
|
||||
BSONObj buildReplaceRootBson(StringData docsName);
|
||||
|
||||
// {$sort: {score: {$meta: "score"}, _id: 1}}
|
||||
BSONObj buildSortByScoreMetaBson();
|
||||
|
||||
// {$project: {<internalFieldsName>: 0}}
|
||||
BSONObj buildProjectRemoveInternalFieldsBson(StringData internalFieldsName);
|
||||
|
||||
// {$group: {_id: "$<docsName>._id",
|
||||
// <docsName>: {$first: "$<docsName>"},
|
||||
// __hs_<p>_score: {$max: {$ifNull: ["$<internalFieldsName>.<p>_score", 0]}},
|
||||
// [if scoreDetails:] __hs_<p><detailsScalarSuffix>: {$max: ...},
|
||||
// __hs_<p>_scoreDetails: {$mergeObjects: ...},
|
||||
// ...}}
|
||||
//
|
||||
// `detailsScalarSuffix` is the per-pipeline scoreDetails scalar field suffix:
|
||||
// "_rank" for $rankFusion, "_rawScore" for $scoreFusion.
|
||||
BSONObj buildGroupBson(const std::vector<std::string>& pipelineNames,
|
||||
bool includeScoreDetails,
|
||||
StringData internalFieldsName,
|
||||
StringData docsName,
|
||||
StringData detailsScalarSuffix);
|
||||
|
||||
// {$replaceRoot: {newRoot: {$mergeObjects: ["$<docsName>",
|
||||
// {<internalFieldsName>: {<p>_score: "$__hs_<p>_score",
|
||||
// ...}}]}}}
|
||||
//
|
||||
// `detailsScalarSuffix`: "_rank" for $rankFusion, "_rawScore" for $scoreFusion.
|
||||
BSONObj buildReplaceRootMergeBson(const std::vector<std::string>& pipelineNames,
|
||||
bool includeScoreDetails,
|
||||
StringData internalFieldsName,
|
||||
StringData docsName,
|
||||
StringData detailsScalarSuffix);
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::common_utils
|
||||
@ -97,6 +97,20 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a LiteParsedPipeline from already-parsed StageSpecs without re-parsing. Useful
|
||||
* when a desugarer assembles a subpipeline from existing LiteParsedDocumentSources.
|
||||
*/
|
||||
LiteParsedPipeline(NamespaceString nss,
|
||||
StageSpecs stages,
|
||||
bool isRunningAgainstView_ForHybridSearch = false)
|
||||
: _stageSpecs(std::move(stages)),
|
||||
_originalParseNss(std::move(nss)),
|
||||
_isRunningAgainstView_ForHybridSearch(isRunningAgainstView_ForHybridSearch) {
|
||||
// _hasChangeStream and _involvedNamespaces are intentionally left at their in-class
|
||||
// defaults; they are Deferred and will be computed on demand from _stageSpecs.
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor. Calls clone on each LiteParsedDocumentSource in the pipeline and copies
|
||||
* member variables.
|
||||
@ -281,6 +295,16 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if any stage in the pipeline is a scoreDetails stage (produces scoreDetails
|
||||
* metadata).
|
||||
*/
|
||||
bool isScoreDetailsPipeline() const {
|
||||
return std::any_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) {
|
||||
return spec->isScoreDetailsStage();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if all stages in the pipeline are selection stages (they do not modify or
|
||||
* transform documents, only retrieve, limit, or order them).
|
||||
|
||||
@ -138,6 +138,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
// $rankFusion produces scoreDetails metadata when the user requests it via the spec.
|
||||
bool isScoreDetailsStage() const final {
|
||||
return _parsedSpec.getScoreDetails();
|
||||
}
|
||||
|
||||
// $rankFusion does not modify documents, only combines and reorders them.
|
||||
bool isSelectionStage() const final {
|
||||
return true;
|
||||
@ -166,6 +171,10 @@ public:
|
||||
_parsedSpec, _pipelines, getOriginalBson().wrap().getOwned());
|
||||
}
|
||||
|
||||
const RankFusionSpec& getSpec() const {
|
||||
return _parsedSpec;
|
||||
}
|
||||
|
||||
private:
|
||||
RankFusionSpec _parsedSpec;
|
||||
};
|
||||
|
||||
1708
src/mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_test.cpp
Normal file
1708
src/mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_test.cpp
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,227 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_utils.h"
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/pipeline/document_source_hybrid_scoring_util.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_hybrid_search_desugarer_utils.h"
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::rank_fusion_utils {
|
||||
|
||||
BSONObj buildSetWindowFieldsBson(const std::string& rankFieldName) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder swfBob(bob.subobjStart("$_internalSetWindowFields"_sd));
|
||||
swfBob.append("sortBy", BSON("order" << 1));
|
||||
BSONObjBuilder outputBob(swfBob.subobjStart("output"_sd));
|
||||
outputBob.append(rankFieldName, BSON("$rank" << BSONObj()));
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildScoreAddFieldsBson(StringData inputPipelineName, int rankConstant, double weight) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
const std::string scoreField = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_score", inputPipelineName));
|
||||
BSONObjBuilder scoreBob(addFieldsBob.subobjStart(scoreField));
|
||||
BSONArrayBuilder multArr(scoreBob.subarrayStart("$multiply"_sd));
|
||||
const std::string rankPath =
|
||||
fmt::format("${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_rank", inputPipelineName)));
|
||||
multArr.append(BSON(
|
||||
"$divide" << BSON_ARRAY(1 << BSON("$add" << BSON_ARRAY(rankPath << rankConstant)))));
|
||||
multArr.append(weight);
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildAddInputPipelineScoreDetailsBson(StringData inputPipelineName,
|
||||
bool inputGeneratesScore,
|
||||
bool inputGeneratesScoreDetails) {
|
||||
const std::string scoreDetailsField = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_scoreDetails", inputPipelineName));
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
if (inputGeneratesScoreDetails) {
|
||||
addFieldsBob.append(scoreDetailsField, BSON("$meta" << "scoreDetails"));
|
||||
} else if (inputGeneratesScore) {
|
||||
addFieldsBob.append(
|
||||
scoreDetailsField,
|
||||
BSON("value" << BSON("$meta" << "score") << "details" << BSONArrayBuilder().arr()));
|
||||
} else {
|
||||
addFieldsBob.append(scoreDetailsField, BSON("details" << BSONArrayBuilder().arr()));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildRankAddFieldsBson(const std::vector<std::string>& pipelineNames) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
const std::string rankField = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_rank", pipelineName));
|
||||
const std::string rankPath = fmt::format("${}", rankField);
|
||||
addFieldsBob.append(rankField,
|
||||
BSON("$cond" << BSON_ARRAY(BSON("$eq" << BSON_ARRAY(rankPath << 0))
|
||||
<< "NA" << rankPath)));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildSetMetadataScoreBson(const std::vector<std::string>& pipelineNames) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder smBob(bob.subobjStart("$setMetadata"_sd));
|
||||
BSONObjBuilder scoreBob(smBob.subobjStart("score"_sd));
|
||||
BSONArrayBuilder addArr(scoreBob.subarrayStart("$add"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
addArr.append(
|
||||
fmt::format("${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_score", pipelineName))));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildAddFieldsScoreBson(const std::vector<std::string>& pipelineNames) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder afBob(bob.subobjStart("$addFields"_sd));
|
||||
BSONObjBuilder scoreBob(afBob.subobjStart("score"_sd));
|
||||
BSONArrayBuilder addArr(scoreBob.subarrayStart("$add"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
addArr.append(
|
||||
fmt::format("${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_score", pipelineName))));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildCalculatedFinalScoreDetailsBson(const std::vector<std::string>& pipelineNames,
|
||||
const StringMap<double>& weights) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
BSONObjBuilder internalFieldsBob(addFieldsBob.subobjStart(kInternalFieldsName));
|
||||
BSONArrayBuilder calcArr(internalFieldsBob.subarrayStart("calculatedScoreDetails"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
// Path of the pipeline's scoreDetails subobject: <INTERNAL_FIELDS>.<p>
|
||||
const std::string internalFieldsPipelineName =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(kInternalFieldsName,
|
||||
pipelineName);
|
||||
const std::string rankPath = fmt::format("${}_rank", internalFieldsPipelineName);
|
||||
double weight = hybrid_scoring_util::getPipelineWeight(weights, pipelineName);
|
||||
|
||||
BSONObjBuilder mergeSub;
|
||||
mergeSub.append("inputPipelineName"_sd, pipelineName);
|
||||
mergeSub.append("rank"_sd, rankPath);
|
||||
mergeSub.append("weight",
|
||||
BSON("$cond" << BSON_ARRAY(BSON("$eq" << BSON_ARRAY(rankPath << "NA"))
|
||||
<< "$$REMOVE" << weight)));
|
||||
|
||||
BSONArrayBuilder mergeArr;
|
||||
mergeArr.append(mergeSub.obj());
|
||||
mergeArr.append(fmt::format("${}.{}_scoreDetails", kInternalFieldsName, pipelineName));
|
||||
calcArr.append(BSON("$mergeObjects" << mergeArr.arr()));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildSetMetadataScoreDetailsBson() {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder smBob(bob.subobjStart("$setMetadata"_sd));
|
||||
BSONObjBuilder sdBob(smBob.subobjStart("scoreDetails"_sd));
|
||||
sdBob.append("value", BSON("$meta" << "score"));
|
||||
sdBob.append("description", kScoreDetailsDescription);
|
||||
sdBob.append("details",
|
||||
fmt::format("${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, "calculatedScoreDetails")));
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildSortByScoreScalarBson() {
|
||||
return BSON("$sort" << BSON("score" << -1 << "_id" << 1));
|
||||
}
|
||||
|
||||
StageSpecs buildRankFusionInputPipelinePreamble(const NamespaceString& nss,
|
||||
const LiteParsedPipeline& subPipeline,
|
||||
const std::string& pipelineName,
|
||||
double weight,
|
||||
bool includeScoreDetails) {
|
||||
StageSpecs out;
|
||||
out.reserve(subPipeline.getStages().size() + 4);
|
||||
for (const auto& stage : subPipeline.getStages()) {
|
||||
out.push_back(stage->clone());
|
||||
}
|
||||
|
||||
// Rewrite rightmost $sort in-place to ensure sort-key metadata is output; no-op if no $sort
|
||||
// is present (e.g. $search and $vectorSearch emit scored output without an explicit $sort).
|
||||
common_utils::mutateRightmostSortToOutputSortKey(nss, out);
|
||||
|
||||
const bool inputGeneratesScore = subPipeline.isScoredPipeline();
|
||||
const bool inputGeneratesScoreDetails = subPipeline.isScoreDetailsPipeline();
|
||||
|
||||
out.push_back(
|
||||
common_utils::parseOwnedStage(nss, common_utils::buildReplaceRootBson(kDocsName)));
|
||||
|
||||
const std::string rankFieldName = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_rank", pipelineName));
|
||||
out.push_back(common_utils::parseOwnedStage(nss, buildSetWindowFieldsBson(rankFieldName)));
|
||||
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, buildScoreAddFieldsBson(pipelineName, kRankConstant, weight)));
|
||||
|
||||
if (includeScoreDetails) {
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss,
|
||||
buildAddInputPipelineScoreDetailsBson(
|
||||
pipelineName, inputGeneratesScore, inputGeneratesScoreDetails)));
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::rank_fusion_utils
|
||||
104
src/mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_utils.h
Normal file
104
src/mongo/db/pipeline/lite_parsed_rank_fusion_desugarer_utils.h
Normal file
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
|
||||
#include "mongo/db/pipeline/rank_fusion_pipeline_builder.h"
|
||||
#include "mongo/util/string_map.h"
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::rank_fusion_utils {
|
||||
|
||||
inline constexpr int kRankConstant = RankFusionPipelineBuilder::kRankConstant;
|
||||
inline constexpr StringData kInternalFieldsName =
|
||||
RankFusionPipelineBuilder::kRankFusionInternalFieldsName;
|
||||
inline constexpr StringData kDocsName = RankFusionPipelineBuilder::kRankFusionDocsFieldName;
|
||||
inline constexpr StringData kScoreDetailsDescription =
|
||||
RankFusionPipelineBuilder::kRankFusionScoreDetailsDescription;
|
||||
|
||||
// Per-pipeline scoreDetails scalar field suffix used in the desugared $group output (and the
|
||||
// matching $replaceRoot wrapper). For $rankFusion the per-pipeline scalar is "<p>_rank".
|
||||
inline constexpr StringData kDetailsScalarSuffix = "_rank"_sd;
|
||||
|
||||
// {$_internalSetWindowFields: {sortBy: {order: 1},
|
||||
// output: {<INTERNAL_FIELDS>.<p>_rank: {$rank: {}}}}}
|
||||
BSONObj buildSetWindowFieldsBson(const std::string& rankFieldName);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_score: {$multiply: [{$divide: [1, {$add: [<rank>, K]}]},
|
||||
// <weight>]}}}
|
||||
BSONObj buildScoreAddFieldsBson(StringData inputPipelineName, int rankConstant, double weight);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_scoreDetails: <three branches>}}
|
||||
BSONObj buildAddInputPipelineScoreDetailsBson(StringData inputPipelineName,
|
||||
bool inputGeneratesScore,
|
||||
bool inputGeneratesScoreDetails);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_rank: {$cond: [{$eq:[<path>,0]},"NA",<path>]}, ...}}
|
||||
BSONObj buildRankAddFieldsBson(const std::vector<std::string>& pipelineNames);
|
||||
|
||||
// {$setMetadata: {score: {$add: ["$<INTERNAL_FIELDS>.<p>_score", ...]}}} (Full branch)
|
||||
BSONObj buildSetMetadataScoreBson(const std::vector<std::string>& pipelineNames);
|
||||
|
||||
// {$addFields: {score: {$add: ["$<INTERNAL_FIELDS>.<p>_score", ...]}}} (Basic branch)
|
||||
BSONObj buildAddFieldsScoreBson(const std::vector<std::string>& pipelineNames);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>: {calculatedScoreDetails: [
|
||||
// {$mergeObjects: [{inputPipelineName: <p>, rank: "$<p>_rank",
|
||||
// weight: {$cond: [{$eq:["$<p>_rank","NA"]},"$$REMOVE",<weight>]}},
|
||||
// "$<INTERNAL_FIELDS>.<p>_scoreDetails"]},
|
||||
// ...]}}}
|
||||
BSONObj buildCalculatedFinalScoreDetailsBson(const std::vector<std::string>& pipelineNames,
|
||||
const StringMap<double>& weights);
|
||||
|
||||
// {$setMetadata: {scoreDetails: {value: {$meta: "score"}, description: "...",
|
||||
// details: "$<INTERNAL_FIELDS>.calculatedScoreDetails"}}}
|
||||
BSONObj buildSetMetadataScoreDetailsBson();
|
||||
|
||||
// {$sort: {score: -1, _id: 1}} (Basic branch)
|
||||
BSONObj buildSortByScoreScalarBson();
|
||||
|
||||
// Builds the per-input-pipeline preamble for $rankFusion: a clone of the input subpipeline's
|
||||
// LPDSs (with the rightmost $sort mutated to output sort key metadata) followed by:
|
||||
// $replaceWith ({<INTERNAL_DOCS>: "$$ROOT"})
|
||||
// $_internalSetWindowFields ({sortBy: {order: 1}, output: {<rank field>: {$rank: {}}}})
|
||||
// $addFields (per-pipeline weighted score)
|
||||
// [optional] $addFields (per-pipeline scoreDetails)
|
||||
StageSpecs buildRankFusionInputPipelinePreamble(const NamespaceString& nss,
|
||||
const LiteParsedPipeline& subPipeline,
|
||||
const std::string& pipelineName,
|
||||
double weight,
|
||||
bool includeScoreDetails);
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::rank_fusion_utils
|
||||
@ -138,6 +138,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
// $scoreFusion produces scoreDetails metadata when the user requests it via the spec.
|
||||
bool isScoreDetailsStage() const final {
|
||||
return _parsedSpec.getScoreDetails();
|
||||
}
|
||||
|
||||
// $scoreFusion does not modify documents, only combines and reorders them.
|
||||
bool isSelectionStage() const final {
|
||||
return true;
|
||||
@ -166,6 +171,10 @@ public:
|
||||
_parsedSpec, _pipelines, getOriginalBson().wrap().getOwned());
|
||||
}
|
||||
|
||||
const ScoreFusionSpec& getSpec() const {
|
||||
return _parsedSpec;
|
||||
}
|
||||
|
||||
private:
|
||||
ScoreFusionSpec _parsedSpec;
|
||||
};
|
||||
|
||||
2076
src/mongo/db/pipeline/lite_parsed_score_fusion_desugarer_test.cpp
Normal file
2076
src/mongo/db/pipeline/lite_parsed_score_fusion_desugarer_test.cpp
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,284 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/pipeline/lite_parsed_score_fusion_desugarer_utils.h"
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/pipeline/document_source_hybrid_scoring_util.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_hybrid_search_desugarer_utils.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#include <utility>
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::score_fusion_utils {
|
||||
|
||||
ScoreFusionScoringOptions::ScoreFusionScoringOptions(const ScoreFusionSpec& spec) {
|
||||
_normalizationMethod = spec.getInput().getNormalization();
|
||||
const auto& combination = spec.getCombination();
|
||||
ScoreFusionCombinationMethodEnum combinationMethod = ScoreFusionCombinationMethodEnum::kAvg;
|
||||
boost::optional<IDLAnyType> combinationExpression = boost::none;
|
||||
if (combination.has_value() && combination->getMethod().has_value()) {
|
||||
combinationMethod = combination->getMethod().get();
|
||||
uassert(12559406,
|
||||
"combination.expression should only be specified when combination.method "
|
||||
"has the value \"expression\"",
|
||||
(combinationMethod != ScoreFusionCombinationMethodEnum::kExpression &&
|
||||
!combination->getExpression().has_value()) ||
|
||||
(combinationMethod == ScoreFusionCombinationMethodEnum::kExpression &&
|
||||
combination->getExpression().has_value()));
|
||||
combinationExpression = combination->getExpression();
|
||||
uassert(12559407,
|
||||
"combination.expression and combination.weights cannot both be specified",
|
||||
!(combination->getWeights().has_value() && combinationExpression.has_value()));
|
||||
}
|
||||
_combinationMethod = std::move(combinationMethod);
|
||||
_combinationExpression = std::move(combinationExpression);
|
||||
}
|
||||
|
||||
StringData ScoreFusionScoringOptions::getNormalizationString() const {
|
||||
switch (_normalizationMethod) {
|
||||
case ScoreFusionNormalizationEnum::kSigmoid:
|
||||
return "sigmoid"_sd;
|
||||
case ScoreFusionNormalizationEnum::kMinMaxScaler:
|
||||
return "minMaxScaler"_sd;
|
||||
case ScoreFusionNormalizationEnum::kNone:
|
||||
return "none"_sd;
|
||||
}
|
||||
MONGO_UNREACHABLE_TASSERT(12559408);
|
||||
}
|
||||
|
||||
StringData ScoreFusionScoringOptions::getCombinationMethodString() const {
|
||||
switch (_combinationMethod) {
|
||||
case ScoreFusionCombinationMethodEnum::kExpression:
|
||||
return "custom expression"_sd;
|
||||
case ScoreFusionCombinationMethodEnum::kAvg:
|
||||
return "average"_sd;
|
||||
}
|
||||
MONGO_UNREACHABLE_TASSERT(12559409);
|
||||
}
|
||||
|
||||
BSONObj buildScoreAddFieldsBson(StringData inputPipelineName,
|
||||
ScoreFusionNormalizationEnum normalization,
|
||||
double weight) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
const std::string scoreField = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_score", inputPipelineName));
|
||||
BSONObjBuilder scoreBob(addFieldsBob.subobjStart(scoreField));
|
||||
BSONArrayBuilder multArr(scoreBob.subarrayStart("$multiply"_sd));
|
||||
BSONObj scorePath = BSON("$meta" << "score");
|
||||
switch (normalization) {
|
||||
case ScoreFusionNormalizationEnum::kSigmoid:
|
||||
multArr.append(BSON("$sigmoid" << scorePath));
|
||||
break;
|
||||
case ScoreFusionNormalizationEnum::kMinMaxScaler:
|
||||
case ScoreFusionNormalizationEnum::kNone:
|
||||
multArr.append(scorePath);
|
||||
break;
|
||||
}
|
||||
multArr.append(weight);
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildRawScoreAddFieldsBson(StringData inputPipelineName) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
const std::string rawScoreField = hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_rawScore", inputPipelineName));
|
||||
addFieldsBob.append(rawScoreField, BSON("$meta" << "score"));
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildAddInputPipelineScoreDetailsBson(StringData inputPipelineName,
|
||||
bool inputGeneratesScoreDetails) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
const std::string scoreDetailsField =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, fmt::format("{}_scoreDetails", inputPipelineName));
|
||||
if (inputGeneratesScoreDetails) {
|
||||
addFieldsBob.append(scoreDetailsField,
|
||||
BSON("details" << BSON("$meta" << "scoreDetails")));
|
||||
} else {
|
||||
addFieldsBob.append(scoreDetailsField, BSON("details" << BSONArrayBuilder().arr()));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildMinMaxScalerSetWindowFieldsBson(StringData inputPipelineName) {
|
||||
const std::string internalFieldsScore =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName,
|
||||
hybrid_scoring_util::getScoreFieldFromPipelineName(inputPipelineName));
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder swfBob(bob.subobjStart("$_internalSetWindowFields"_sd));
|
||||
swfBob.append("sortBy", BSON(internalFieldsScore << -1));
|
||||
BSONObjBuilder outputBob(swfBob.subobjStart("output"_sd));
|
||||
outputBob.append(internalFieldsScore,
|
||||
BSON("$minMaxScaler" << BSON("input" << ("$" + internalFieldsScore))));
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildSetFinalCombinedScoreBson(const std::vector<std::string>& pipelineNames,
|
||||
const ScoreFusionScoringOptions& scoringOptions) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder smBob(bob.subobjStart("$setMetadata"_sd));
|
||||
BSONObjBuilder scoreBob(smBob.subobjStart("score"_sd));
|
||||
switch (scoringOptions.getCombinationMethod()) {
|
||||
case ScoreFusionCombinationMethodEnum::kExpression: {
|
||||
BSONObjBuilder letBob(scoreBob.subobjStart("$let"_sd));
|
||||
BSONObjBuilder varsBob(letBob.subobjStart("vars"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
const std::string scoreField =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName,
|
||||
hybrid_scoring_util::getScoreFieldFromPipelineName(pipelineName));
|
||||
varsBob.append(pipelineName, fmt::format("${}", scoreField));
|
||||
}
|
||||
varsBob.done();
|
||||
scoringOptions.getCombinationExpression()->serializeToBSON("in", &letBob);
|
||||
letBob.done();
|
||||
break;
|
||||
}
|
||||
case ScoreFusionCombinationMethodEnum::kAvg: {
|
||||
BSONArrayBuilder avgArr(scoreBob.subarrayStart("$avg"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
avgArr.append(fmt::format(
|
||||
"${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName,
|
||||
hybrid_scoring_util::getScoreFieldFromPipelineName(pipelineName))));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildCalculatedFinalScoreDetailsBson(const std::vector<std::string>& pipelineNames,
|
||||
const StringMap<double>& weights) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder addFieldsBob(bob.subobjStart("$addFields"_sd));
|
||||
BSONObjBuilder internalFieldsBob(addFieldsBob.subobjStart(kInternalFieldsName));
|
||||
BSONArrayBuilder calcArr(internalFieldsBob.subarrayStart("calculatedScoreDetails"_sd));
|
||||
for (const auto& pipelineName : pipelineNames) {
|
||||
const std::string internalFieldsPipelineName =
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(kInternalFieldsName,
|
||||
pipelineName);
|
||||
double weight = hybrid_scoring_util::getPipelineWeight(weights, pipelineName);
|
||||
|
||||
BSONObjBuilder mergeSub;
|
||||
mergeSub.append("inputPipelineName"_sd, pipelineName);
|
||||
mergeSub.append("inputPipelineRawScore"_sd,
|
||||
fmt::format("${}_rawScore", internalFieldsPipelineName));
|
||||
mergeSub.append("weight"_sd, weight);
|
||||
mergeSub.append("value"_sd, fmt::format("${}_score", internalFieldsPipelineName));
|
||||
|
||||
BSONArrayBuilder mergeArr;
|
||||
mergeArr.append(mergeSub.obj());
|
||||
mergeArr.append(fmt::format("${}.{}_scoreDetails", kInternalFieldsName, pipelineName));
|
||||
calcArr.append(BSON("$mergeObjects" << mergeArr.arr()));
|
||||
}
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
BSONObj buildSetMetadataScoreDetailsBson(const ScoreFusionScoringOptions& scoringOptions) {
|
||||
BSONObjBuilder bob;
|
||||
{
|
||||
BSONObjBuilder smBob(bob.subobjStart("$setMetadata"_sd));
|
||||
BSONObjBuilder sdBob(smBob.subobjStart("scoreDetails"_sd));
|
||||
sdBob.append("value", BSON("$meta" << "score"));
|
||||
sdBob.append("description", kScoreDetailsDescription);
|
||||
sdBob.append("normalization", scoringOptions.getNormalizationString());
|
||||
BSONObjBuilder combinationBob(sdBob.subobjStart("combination"_sd));
|
||||
combinationBob.append("method", scoringOptions.getCombinationMethodString());
|
||||
if (scoringOptions.getCombinationMethod() ==
|
||||
ScoreFusionCombinationMethodEnum::kExpression) {
|
||||
combinationBob.append("expression",
|
||||
hybrid_scoring_util::score_details::stringifyExpression(
|
||||
scoringOptions.getCombinationExpression()));
|
||||
}
|
||||
combinationBob.done();
|
||||
sdBob.append("details",
|
||||
fmt::format("${}",
|
||||
hybrid_scoring_util::applyInternalFieldPrefixToFieldName(
|
||||
kInternalFieldsName, "calculatedScoreDetails")));
|
||||
}
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
StageSpecs buildScoreFusionInputPipelinePreamble(const NamespaceString& nss,
|
||||
const LiteParsedPipeline& subPipeline,
|
||||
const std::string& pipelineName,
|
||||
double weight,
|
||||
ScoreFusionNormalizationEnum normalization,
|
||||
bool includeScoreDetails) {
|
||||
StageSpecs out;
|
||||
out.reserve(subPipeline.getStages().size() + 5);
|
||||
for (const auto& stage : subPipeline.getStages()) {
|
||||
out.push_back(stage->clone());
|
||||
}
|
||||
|
||||
const bool inputGeneratesScoreDetails = subPipeline.isScoreDetailsPipeline();
|
||||
|
||||
out.push_back(
|
||||
common_utils::parseOwnedStage(nss, common_utils::buildReplaceRootBson(kDocsName)));
|
||||
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, buildScoreAddFieldsBson(pipelineName, normalization, weight)));
|
||||
|
||||
if (includeScoreDetails) {
|
||||
out.push_back(common_utils::parseOwnedStage(nss, buildRawScoreAddFieldsBson(pipelineName)));
|
||||
out.push_back(common_utils::parseOwnedStage(
|
||||
nss, buildAddInputPipelineScoreDetailsBson(pipelineName, inputGeneratesScoreDetails)));
|
||||
}
|
||||
|
||||
if (normalization == ScoreFusionNormalizationEnum::kMinMaxScaler) {
|
||||
out.push_back(
|
||||
common_utils::parseOwnedStage(nss, buildMinMaxScalerSetWindowFieldsBson(pipelineName)));
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::score_fusion_utils
|
||||
145
src/mongo/db/pipeline/lite_parsed_score_fusion_desugarer_utils.h
Normal file
145
src/mongo/db/pipeline/lite_parsed_score_fusion_desugarer_utils.h
Normal file
@ -0,0 +1,145 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/pipeline/document_source_score_fusion_gen.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
|
||||
#include "mongo/db/pipeline/score_fusion_pipeline_builder.h"
|
||||
#include "mongo/util/string_map.h"
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace mongo::lite_parsed_hybrid_search_desugarer::score_fusion_utils {
|
||||
|
||||
inline constexpr StringData kInternalFieldsName =
|
||||
ScoreFusionPipelineBuilder::kScoreFusionInternalFieldsName;
|
||||
inline constexpr StringData kDocsName = ScoreFusionPipelineBuilder::kScoreFusionDocsFieldName;
|
||||
inline constexpr StringData kScoreDetailsDescription =
|
||||
ScoreFusionPipelineBuilder::kScoreFusionScoreDetailsDescription;
|
||||
|
||||
// Per-pipeline scoreDetails scalar field suffix used in the desugared $group output (and the
|
||||
// matching $replaceRoot wrapper). For $scoreFusion the per-pipeline scalar is "<p>_rawScore".
|
||||
inline constexpr StringData kDetailsScalarSuffix = "_rawScore"_sd;
|
||||
|
||||
// Validation/translation of normalization + combination spec.
|
||||
class ScoreFusionScoringOptions {
|
||||
public:
|
||||
explicit ScoreFusionScoringOptions(const ScoreFusionSpec& spec);
|
||||
|
||||
ScoreFusionNormalizationEnum getNormalizationMethod() const {
|
||||
return _normalizationMethod;
|
||||
}
|
||||
|
||||
StringData getNormalizationString() const;
|
||||
|
||||
ScoreFusionCombinationMethodEnum getCombinationMethod() const {
|
||||
return _combinationMethod;
|
||||
}
|
||||
|
||||
StringData getCombinationMethodString() const;
|
||||
|
||||
const boost::optional<IDLAnyType>& getCombinationExpression() const {
|
||||
return _combinationExpression;
|
||||
}
|
||||
|
||||
private:
|
||||
ScoreFusionNormalizationEnum _normalizationMethod;
|
||||
ScoreFusionCombinationMethodEnum _combinationMethod;
|
||||
boost::optional<IDLAnyType> _combinationExpression;
|
||||
};
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_score: {$multiply: [<scoreOrNorm>, <weight>]}}}
|
||||
// where <scoreOrNorm> is determined by the input.normalization:
|
||||
// - none / minMaxScaler: {$meta: "score"}
|
||||
// - sigmoid: {$sigmoid: {$meta: "score"}}
|
||||
BSONObj buildScoreAddFieldsBson(StringData inputPipelineName,
|
||||
ScoreFusionNormalizationEnum normalization,
|
||||
double weight);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_rawScore: {$meta: "score"}}}
|
||||
BSONObj buildRawScoreAddFieldsBson(StringData inputPipelineName);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>.<p>_scoreDetails: ...}} -- two branches:
|
||||
// - inputGeneratesScoreDetails: { details: {$meta: "scoreDetails"} }
|
||||
// - else : { details: [] }
|
||||
// Mirrors `addInputPipelineScoreDetails` in score_fusion_pipeline_builder.cpp.
|
||||
BSONObj buildAddInputPipelineScoreDetailsBson(StringData inputPipelineName,
|
||||
bool inputGeneratesScoreDetails);
|
||||
|
||||
// {$_internalSetWindowFields: {sortBy: {<INTERNAL_FIELDS>.<p>_score: -1},
|
||||
// output: {<INTERNAL_FIELDS>.<p>_score:
|
||||
// {$minMaxScaler: {input:
|
||||
// "$<INTERNAL_FIELDS>.<p>_score"}}}}}
|
||||
BSONObj buildMinMaxScalerSetWindowFieldsBson(StringData inputPipelineName);
|
||||
|
||||
// {$setMetadata: {score: {$avg: ["$<INTERNAL_FIELDS>.<p>_score", ...]}}} (avg branch)
|
||||
// or
|
||||
// {$setMetadata: {score: {$let: {vars: {<p>: "$<INTERNAL_FIELDS>.<p>_score", ...},
|
||||
// in: <user combination.expression>}}}} (expression branch).
|
||||
//
|
||||
// In the expression branch, the user's combination.expression is forwarded as raw BSON. Full
|
||||
// parse will validate the expression via ExpressionLet::parse.
|
||||
BSONObj buildSetFinalCombinedScoreBson(const std::vector<std::string>& pipelineNames,
|
||||
const ScoreFusionScoringOptions& scoringOptions);
|
||||
|
||||
// {$addFields: {<INTERNAL_FIELDS>: {calculatedScoreDetails: [
|
||||
// {$mergeObjects: [{inputPipelineName: "<p>",
|
||||
// inputPipelineRawScore: "$<INTERNAL_FIELDS>.<p>_rawScore",
|
||||
// weight: <w>,
|
||||
// value: "$<INTERNAL_FIELDS>.<p>_score"},
|
||||
// "$<INTERNAL_FIELDS>.<p>_scoreDetails"]},
|
||||
// ...]}}}
|
||||
BSONObj buildCalculatedFinalScoreDetailsBson(const std::vector<std::string>& pipelineNames,
|
||||
const StringMap<double>& weights);
|
||||
|
||||
// {$setMetadata: {scoreDetails: {value: {$meta: "score"}, description: ...,
|
||||
// normalization: ..., combination: {...}, details: ...}}}
|
||||
BSONObj buildSetMetadataScoreDetailsBson(const ScoreFusionScoringOptions& scoringOptions);
|
||||
|
||||
// Builds the per-input-pipeline preamble for $scoreFusion: a clone of the input subpipeline's
|
||||
// LPDSs, followed by:
|
||||
// $replaceWith ({<INTERNAL_DOCS>: "$$ROOT"})
|
||||
// $addFields (per-pipeline weighted, optionally normalized, score)
|
||||
// [optional] $addFields rawScore + $addFields scoreDetails
|
||||
// [optional] $_internalSetWindowFields (minMaxScaler normalization)
|
||||
StageSpecs buildScoreFusionInputPipelinePreamble(const NamespaceString& nss,
|
||||
const LiteParsedPipeline& subPipeline,
|
||||
const std::string& pipelineName,
|
||||
double weight,
|
||||
ScoreFusionNormalizationEnum normalization,
|
||||
bool includeScoreDetails);
|
||||
|
||||
} // namespace mongo::lite_parsed_hybrid_search_desugarer::score_fusion_utils
|
||||
@ -68,6 +68,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
// $search produces scoreDetails metadata when the user requests it via the mongotQuery.
|
||||
bool isScoreDetailsStage() const final {
|
||||
return hasScoreDetails();
|
||||
}
|
||||
|
||||
// $search is not a selection stage when returnStoredSource is true since it might have an
|
||||
// implicit projection applied.
|
||||
bool isSelectionStage() const final {
|
||||
|
||||
@ -35,6 +35,7 @@
|
||||
namespace mongo {
|
||||
|
||||
static constexpr StringData kReturnStoredSourceFieldName = "returnStoredSource"_sd;
|
||||
static constexpr StringData kScoreDetailsFieldName = "scoreDetails"_sd;
|
||||
|
||||
/**
|
||||
* A 'LiteParsed' representation of a search stage. This is the parent class for the
|
||||
@ -97,6 +98,20 @@ protected:
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns true if the stage spec has scoreDetails: true inside the mongotQuery.
|
||||
bool hasScoreDetails() const {
|
||||
if (this->_originalBson.type() == BSONType::object) {
|
||||
auto specObj = this->_originalBson.Obj();
|
||||
// SearchLiteParsed's BSON shape mirrors DocumentSourceSearch::hasScoreDetails():
|
||||
// top-level $search spec is the mongotQuery, so 'scoreDetails' lives directly on it.
|
||||
if (specObj.hasField(kScoreDetailsFieldName)) {
|
||||
auto sd = specObj[kScoreDetailsFieldName];
|
||||
return sd.isBoolean() && sd.boolean();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
const NamespaceString _nss;
|
||||
};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user