SERVER-120068 Support streamType stage constraint for extensions (#49012)

GitOrigin-RevId: cfdc1e7c477dddc6873f667e1dd88b589af27934
This commit is contained in:
Daniel Segel 2026-03-04 12:46:09 -08:00 committed by MongoDB Bot
parent 03edbc1501
commit af77af452f
4 changed files with 37 additions and 11 deletions

View File

@ -272,15 +272,16 @@ Value DocumentSourceExtensionOptimizable::serialize(const SerializationOptions&
StageConstraints DocumentSourceExtensionOptimizable::constraints(
PipelineSplitState pipeState) const {
auto constraints = StageConstraints(StreamType::kStreaming,
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
LookupRequirement::kNotAllowed,
UnionRequirement::kAllowed,
ChangeStreamRequirement::kDenylist);
auto constraints =
StageConstraints(static_properties_util::toStreamType(_properties.getStreamType()),
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
LookupRequirement::kNotAllowed,
UnionRequirement::kAllowed,
ChangeStreamRequirement::kDenylist);
constraints.canRunOnTimeseries = false;
// Apply potential overrides from static properties.

View File

@ -1993,6 +1993,7 @@ TEST_F(DocumentSourceExtensionOptimizableTest, StageWithDefaultStaticProperties)
ASSERT_TRUE(staticProperties.getRequiresInputDocSource());
ASSERT_EQ(staticProperties.getPosition(), MongoExtensionPositionRequirementEnum::kNone);
ASSERT_EQ(staticProperties.getHostType(), MongoExtensionHostTypeRequirementEnum::kNone);
ASSERT_EQ(staticProperties.getStreamType(), MongoExtensionStreamTypeEnum::kStreaming);
ASSERT_TRUE(staticProperties.getPreservesUpstreamMetadata());
ASSERT_FALSE(staticProperties.getRequiredMetadataFields().has_value());
ASSERT_FALSE(staticProperties.getProvidedMetadataFields().has_value());
@ -2006,6 +2007,7 @@ TEST_F(DocumentSourceExtensionOptimizableTest, StageWithDefaultStaticProperties)
auto constraints = optimizable->constraints(PipelineSplitState::kUnsplit);
ASSERT_EQ(constraints.streamType, StageConstraints::StreamType::kStreaming);
ASSERT_EQ(constraints.requiredPosition, StageConstraints::PositionRequirement::kNone);
ASSERT_EQ(constraints.hostRequirement, StageConstraints::HostTypeRequirement::kNone);
ASSERT_TRUE(constraints.requiresInputDocSource);
@ -2130,8 +2132,8 @@ TEST_F(DocumentSourceExtensionOptimizableTest,
}
TEST_F(DocumentSourceExtensionOptimizableTest, StageWithNonDefaultSubPipelineStaticProperties) {
auto properties = BSON("allowedInUnionWith" << false << "allowedInLookup" << false
<< "allowedInFacet" << false);
auto properties = BSON("streamType" << "blocking" << "allowedInUnionWith" << false
<< "allowedInLookup" << false << "allowedInFacet" << false);
auto astNode = new sdk::ExtensionAggStageAstNodeAdapter(
std::make_unique<sdk::shared_test_stages::CustomPropertiesAstNode>(properties));
auto astHandle = AggStageAstNodeHandle(astNode);
@ -2145,6 +2147,7 @@ TEST_F(DocumentSourceExtensionOptimizableTest, StageWithNonDefaultSubPipelineSta
ASSERT_FALSE(staticProperties.getAllowedInFacet());
auto constraints = optimizable->constraints(PipelineSplitState::kUnsplit);
ASSERT_EQ(constraints.streamType, StageConstraints::StreamType::kBlocking);
ASSERT_EQ(constraints.unionRequirement, StageConstraints::UnionRequirement::kNotAllowed);
ASSERT_EQ(constraints.lookupRequirement, StageConstraints::LookupRequirement::kNotAllowed);
ASSERT_EQ(constraints.facetRequirement, StageConstraints::FacetRequirement::kNotAllowed);

View File

@ -59,6 +59,16 @@ inline ActionType toActionType(const MongoExtensionPrivilegeActionEnum& action)
}
}
inline StageConstraints::StreamType toStreamType(MongoExtensionStreamTypeEnum streamType) {
switch (streamType) {
case MongoExtensionStreamTypeEnum::kStreaming:
return StageConstraints::StreamType::kStreaming;
case MongoExtensionStreamTypeEnum::kBlocking:
return StageConstraints::StreamType::kBlocking;
}
MONGO_UNREACHABLE_TASSERT(12006800);
}
inline boost::optional<StageConstraints::PositionRequirement> toPositionRequirement(
MongoExtensionPositionRequirementEnum pos) {
switch (pos) {

View File

@ -55,6 +55,13 @@ enums:
kAnyShard: "anyShard"
kRouter: "router"
kAllShardHosts: "allShardHosts"
MongoExtensionStreamType:
description: >-
Whether this stage is streaming or blocking.
type: string
values:
kStreaming: "streaming"
kBlocking: "blocking"
MongoExtensionPrivilegeResourcePattern:
description: What resource the privilege applies to.
type: string
@ -98,6 +105,11 @@ structs:
resources it may require, etc.)
strict: false
fields:
streamType:
description: >-
Whether this stage is streaming or blocking.
type: MongoExtensionStreamType
default: kStreaming
position:
description: What position the stage must occupy in the pipeline.
type: MongoExtensionPositionRequirement