SERVER-126016 Implement ExecAggStageResultsAndMetadataSource (#53418)

GitOrigin-RevId: 5bfb514d73f5bde3a82c08d3b921338b8f02857d
This commit is contained in:
Joshua Siegel 2026-05-19 13:13:05 -04:00 committed by MongoDB Bot
parent 2531c48861
commit 7db3a1d00f
3 changed files with 157 additions and 0 deletions

View File

@ -28,6 +28,7 @@
*/
#pragma once
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/assert_util.h"
#include "mongo/db/extension/sdk/distributed_plan_logic.h"
@ -869,6 +870,46 @@ protected:
}
};
/**
* Base class for source stages that produce two logical streams: a document-result stream and a
* metadata-result stream. The advanced() helpers wrap BSON in the envelope expected by the host
* Exchange: { _streamType: <N>, payload: <doc> }.
*/
class ExecAggStageResultsAndMetadataSource : public ExecAggStageSource {
public:
/**
* Identifies which stream a produced document belongs to. Mirrors ::MongoExtensionStreamType
* from public/api.h.
*/
enum class StreamType : uint8_t {
kDocResult = ::MongoExtensionStreamType::kMongoExtensionStreamTypeDocResult,
kMetaResult = ::MongoExtensionStreamType::kMongoExtensionStreamTypeMetaResult,
};
ExtensionGetNextResult advanced(const BSONObj& payload, StreamType streamType) {
BSONObjBuilder envelopeBob;
envelopeBob.append("_streamType", static_cast<int>(streamType));
envelopeBob.append("payload", payload);
return ExtensionGetNextResult::advanced(ExtensionBSONObj::makeAsByteBuf(envelopeBob.obj()));
}
/**
* Certain stages (e.g. $searchScore) produce per-document metadata that require this overload.
*/
ExtensionGetNextResult advanced(const BSONObj& payload,
StreamType streamType,
const BSONObj& meta) {
BSONObjBuilder envelopeBob;
envelopeBob.append("_streamType", static_cast<int>(streamType));
envelopeBob.append("payload", payload);
return ExtensionGetNextResult::advanced(ExtensionBSONObj::makeAsByteBuf(envelopeBob.obj()),
ExtensionBSONObj::makeAsByteBuf(meta));
}
protected:
ExecAggStageResultsAndMetadataSource(std::string_view name) : ExecAggStageSource(name) {}
};
/**
* ExecAggStageTransform is an execution stage that operates on documents it receives from a
* predecessor source stage. It must be provided with a source stage before getNext() is called.

View File

@ -14,6 +14,7 @@ mongo_cc_unit_test(
"aggregation_stage_test.cpp",
"api_version_vector_to_span_test.cpp",
"catalog_context_test.cpp",
"exec_agg_stage_results_and_metadata_source_test.cpp",
"get_next_result_test.cpp",
"host_services_test.cpp",
"operation_metrics_test.cpp",

View File

@ -0,0 +1,115 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/shared/get_next_result.h"
#include "mongo/unittest/unittest.h"
#include <optional>
namespace mongo::extension::sdk {
namespace {
/**
* Minimal concrete subclass of ExecAggStageResultsAndMetadataSource used for testing.
* Implements all required pure virtual methods with no-ops.
*/
class TestMultiStreamSource : public ExecAggStageResultsAndMetadataSource {
public:
TestMultiStreamSource() : ExecAggStageResultsAndMetadataSource("testMultiStreamSource") {}
ExtensionGetNextResult getNext(const QueryExecutionContextHandle&,
::MongoExtensionExecAggStage*) override {
return ExtensionGetNextResult::eof();
}
void open() override {}
void reopen() override {}
void close() override {}
BSONObj explain(const QueryExecutionContextHandle&,
::MongoExtensionExplainVerbosity) const override {
return BSONObj();
}
};
using StreamType = ExecAggStageResultsAndMetadataSource::StreamType;
void assertEnvelopedAdvanced(const ExtensionGetNextResult& result,
int streamTypeInt,
const BSONObj& expectedPayload,
std::optional<BSONObj> expectedMetadata = std::nullopt) {
ASSERT_EQ(result.code, extension::GetNextCode::kAdvanced);
ASSERT_TRUE(result.resultDocument.has_value());
ASSERT_EQ(result.resultMetadata.has_value(), expectedMetadata.has_value());
ASSERT_BSONOBJ_EQ(result.resultDocument->getUnownedBSONObj(),
BSON("_streamType" << streamTypeInt << "payload" << expectedPayload));
if (expectedMetadata) {
ASSERT_BSONOBJ_EQ(result.resultMetadata->getUnownedBSONObj(), *expectedMetadata);
}
}
TEST(ExecAggStageResultsAndMetadataSourceTest, Advanced_SingleOverload_DocResult_WrapsEnvelope) {
TestMultiStreamSource stage;
assertEnvelopedAdvanced(
stage.advanced(BSON("x" << 42), StreamType::kDocResult), 0, BSON("x" << 42));
}
TEST(ExecAggStageResultsAndMetadataSourceTest, Advanced_SingleOverload_MetaResult_WrapsEnvelope) {
TestMultiStreamSource stage;
assertEnvelopedAdvanced(
stage.advanced(BSON("x" << 42), StreamType::kMetaResult), 1, BSON("x" << 42));
}
TEST(ExecAggStageResultsAndMetadataSourceTest, Advanced_DualOverload_WrapsEnvelopeAndSetsMetadata) {
TestMultiStreamSource stage;
assertEnvelopedAdvanced(
stage.advanced(BSON("x" << 42), StreamType::kDocResult, BSON("$searchScore" << 0.9)),
0,
BSON("x" << 42),
BSON("$searchScore" << 0.9));
}
TEST(ExecAggStageResultsAndMetadataSourceTest,
Advanced_DualOverloadWithMetadata_WrapsEnvelopeAndSetsMetadata) {
TestMultiStreamSource stage;
assertEnvelopedAdvanced(
stage.advanced(BSON("x" << 42), StreamType::kMetaResult, BSON("$searchScore" << 0.9)),
1,
BSON("x" << 42),
BSON("$searchScore" << 0.9));
}
} // namespace
} // namespace mongo::extension::sdk