diff --git a/jstests/noPassthrough/extensions/aggregation_ifr_flag_propagation.js b/jstests/noPassthrough/extensions/aggregation_ifr_flag_propagation.js index ad8b8971e40..59af3c4157f 100644 --- a/jstests/noPassthrough/extensions/aggregation_ifr_flag_propagation.js +++ b/jstests/noPassthrough/extensions/aggregation_ifr_flag_propagation.js @@ -21,53 +21,127 @@ const options = { }; const pipeline = [{$vectorSearch: {}}]; +const testData = [ + {_id: 0, vector: [1, 2, 3, 4], text: "poppi cans"}, + {_id: 1, vector: [0, 2, 4, 6], text: "homegrown tomatoes"}, + {_id: 2, vector: [3, 6, 9, 16], text: "crispy rice puffs"}, +]; -function runIFRFlagPropagationTests(conn, shardingTest = null) { +function setupTestCollection(conn, shardingTest) { const adminDb = conn.getDB("admin"); const db = conn.getDB("test"); const coll = db[jsTestName()]; coll.drop(); - const testData = [ - {_id: 0, vector: [1, 2, 3, 4], text: "poppi cans"}, - {_id: 1, vector: [0, 2, 4, 6], text: "homegrown tomatoes"}, - {_id: 2, vector: [3, 6, 9, 16], text: "crispy rice puffs"}, - ]; assert.commandWorked(coll.insertMany(testData)); - if (shardingTest) { - shardingTest.shardColl(coll, {_id: 1}); - } - // Test 1: Verify IFR flag value propagated from router to shard. - // Router has flag=true, request from router - shards should commit to true. - assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); + const dbName = db.getName(); + assert.commandWorked(adminDb.runCommand({enableSharding: dbName})); + assert.commandWorked( + adminDb.runCommand({ + shardCollection: coll.getFullName(), + key: {_id: 1}, + }), + ); + + return {adminDb, db, coll}; +} + +function getShardAdmins(shardingTest) { const shard0Admin = shardingTest.rs0.getPrimary().getDB("admin"); - const shard1Admin = shardingTest.rs1.getPrimary().getDB("admin"); - assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false})); - assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false})); + const shard1Admin = shardingTest.rs1 ? shardingTest.rs1.getPrimary().getDB("admin") : null; + return {shard0Admin, shard1Admin}; +} + +function setFlags(adminDb, shard0Admin, shard1Admin, routerFlag, shardFlag) { + assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: routerFlag})); + if (shard0Admin) { + assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: shardFlag})); + } + if (shard1Admin) { + assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: shardFlag})); + } +} + +function runIFRFlagPropagationTests(conn, shardingTest) { + const {adminDb, db, coll} = setupTestCollection(conn, shardingTest); + const {shard0Admin, shard1Admin} = getShardAdmins(shardingTest); + + // Test 1: Router flag=true propagates to shards (shards commit to true) + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false); assertArrayEq({ actual: coll.aggregate(pipeline).toArray(), expected: testData, }); - // Test 2: Direct request to shard - uses shard's flag value. - // Shard has flag = false, so legacy implementation should be used. - assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false})); - const shard0Coll = shardingTest.rs0.getPrimary().getDB("test")[jsTestName()]; - assert.throwsWithCode(() => shard0Coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled); - // Set shard flag to true. - assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); - assert(shard0Coll.aggregate(pipeline).toArray().length > 0); + // Test 2: Direct shard request uses shard's flag value + if (shard0Admin) { + assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false})); + const shard0Coll = shardingTest.rs0.getPrimary().getDB("test")[coll.getName()]; + assert.throwsWithCode(() => shard0Coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled); + assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); + const shard0Result = shard0Coll.aggregate(pipeline).toArray(); + if (shard0Result.length === 0 && shard1Admin) { + // If shard0 has no data, check shard1 instead + assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); + const shard1Coll = shardingTest.rs1.getPrimary().getDB("test")[coll.getName()]; + const shard1Result = shard1Coll.aggregate(pipeline).toArray(); + assert(shard1Result.length > 0); + } else { + assert(shard0Result.length > 0); + } + } - // Test 3: Router has flag=false, request from router - shards should commit to false. - assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false})); - assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); - assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true})); + // Test 3: Router flag=false propagates to shards (shards commit to false) + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true); assert.throwsWithCode(() => coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled); + + // Test 4: Explain propagates router flag=true to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false); + const explainResult = coll.explain().aggregate(pipeline); + assert.commandWorked(explainResult); + + // Test 5: Explain propagates router flag=false to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true); + assert.throwsWithCode(() => coll.explain().aggregate(pipeline), ErrorCodes.SearchNotEnabled); + + // Setup $unionWith collection + const otherCollName = jsTestName() + "_other"; + const otherColl = db[otherCollName]; + otherColl.drop(); + assert.commandWorked(otherColl.insertMany(testData)); + const dbName = db.getName(); + assert.commandWorked(adminDb.runCommand({enableSharding: dbName})); + assert.commandWorked( + adminDb.runCommand({ + shardCollection: otherColl.getFullName(), + key: {_id: 1}, + }), + ); + const unionPipeline = [{$unionWith: {coll: otherCollName, pipeline: pipeline}}]; + + // Test 6: $unionWith propagates router flag=true to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false); + assertArrayEq({ + actual: coll.aggregate(unionPipeline).toArray(), + expected: testData.concat(testData), + }); + + // Test 7: $unionWith propagates router flag=false to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true); + assert.throwsWithCode(() => coll.aggregate(unionPipeline).toArray(), ErrorCodes.SearchNotEnabled); + + // Test 8: $unionWith explain propagates router flag=true to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false); + const unionExplainResult = coll.explain().aggregate(unionPipeline); + assert.commandWorked(unionExplainResult); + + // Test 9: $unionWith explain propagates router flag=false to shards + setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true); + assert.throwsWithCode(() => coll.explain().aggregate(unionPipeline), ErrorCodes.SearchNotEnabled); } try { - // Test IFR flag propagation in a sharded cluster. - const shardingTest = new ShardingTest({ + const multiShardTest = new ShardingTest({ shards: 2, rs: {nodes: 2}, mongos: 1, @@ -76,8 +150,20 @@ try { configOptions: options, rsOptions: options, }); - runIFRFlagPropagationTests(shardingTest.s, shardingTest); - shardingTest.stop(); + runIFRFlagPropagationTests(multiShardTest.s, multiShardTest); + multiShardTest.stop(); + + const singleShardTest = new ShardingTest({ + shards: 1, + rs: {nodes: 2}, + mongos: 1, + config: 1, + mongosOptions: options, + configOptions: options, + rsOptions: options, + }); + runIFRFlagPropagationTests(singleShardTest.s, singleShardTest); + singleShardTest.stop(); } finally { deleteExtensionConfigs(extensionNames); } diff --git a/src/mongo/db/commands/query_cmd/pipeline_command.cpp b/src/mongo/db/commands/query_cmd/pipeline_command.cpp index 9c372acafdb..adf8c2ec644 100644 --- a/src/mongo/db/commands/query_cmd/pipeline_command.cpp +++ b/src/mongo/db/commands/query_cmd/pipeline_command.cpp @@ -177,31 +177,10 @@ public: _aggregationRequest(std::move(aggregationRequest)), _ifrContext([&]() { const auto& requestFlagValues = _aggregationRequest.getIfrFlags(); - auto ifrContext = requestFlagValues.has_value() + return requestFlagValues.has_value() ? std::make_shared( requestFlagValues.value()) : std::make_shared(); - - // If there is no value for the feature flag passed on the request, that either - // means we have a direct user request (so we should check the node's flag value), - // or it means we have a request from a router where the value of the flag is - // false (so we should also commit to flag value false). - // TODO SERVER-116472 Remove this when the router sends all IFRContext flags, - // regardless of flag value. - const auto& vectorSearchFlagName = - feature_flags::gFeatureFlagVectorSearchExtension.getName(); - bool hasVectorSearchFlag = requestFlagValues.has_value() && - std::any_of(requestFlagValues->begin(), - requestFlagValues->end(), - [&](const BSONObj& obj) { - return obj["name"].valueStringData() == vectorSearchFlagName; - }); - const bool isComingFromRouter = - aggregation_request_helper::getFromRouter(_aggregationRequest); - if (!hasVectorSearchFlag && isComingFromRouter) { - ifrContext->disableFlag(feature_flags::gFeatureFlagVectorSearchExtension); - } - return ifrContext; }()), _extensionMetrics( static_cast(cmd)->getExtensionMetricsAllocation()), diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp index 0cffa1684f4..538c6d7738c 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.cpp +++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp @@ -109,6 +109,21 @@ void addQuerySettingsToRequest(AggregateCommandRequest& request, } } +void addIfrFlagsToRequest(AggregateCommandRequest& request, + std::shared_ptr ifrContext) { + tassert(11565104, "IFRContext cannot be null", ifrContext); + + // If the featureFlagVectorSearchExtension IFR flag is enabled, all nodes are upgraded and can + // parse IFR flags. + // TODO SERVER-117721 Remove FCV gate once multiversion testing can handle IFR flags. + if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo( + multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT + // TODO SERVER-116219 Expand IFR flag serialization beyond $vectorSearch. + request.setIfrFlags( + ifrContext->serializeFlagValues({&feature_flags::gFeatureFlagVectorSearchExtension})); + } +} + void validate(const AggregateCommandRequest& aggregate, const BSONObj& cmdObj, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h index b3aea0e5555..6a67ee057a1 100644 --- a/src/mongo/db/pipeline/aggregation_request_helper.h +++ b/src/mongo/db/pipeline/aggregation_request_helper.h @@ -104,6 +104,16 @@ MONGO_MOD_PUBLIC StatusWith parseFromBSONForTests( void addQuerySettingsToRequest(AggregateCommandRequest& request, const boost::intrusive_ptr& expCtx); +/** + * Adds IFR flags from ifrContext to request. Always serializes the current flag value when + * dispatching to shards, so they use the same value as the router/sender. + * + * For now, we only serialize featureFlagVectorSearchExtension. + * TODO SERVER-116219: Expand IFR flag serialization beyond $vectorSearch. + */ +void addIfrFlagsToRequest(AggregateCommandRequest& request, + std::shared_ptr ifrContext); + /** * Validates if 'AggregateCommandRequest' specs complies with the current Client, which is required * for API versioning checks. Throws uassert in case of any failure. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 5a58e9ff53b..a4f6e4d5b51 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -669,6 +669,7 @@ std::set getTargetedShardsForChangeStream( return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext()); } + } // namespace std::set getTargetedShards(boost::intrusive_ptr expCtx, @@ -1615,19 +1616,9 @@ BSONObj finalizePipelineAndTargetShardsForExplain( } // Generate the command object for the targeted shards with the finalized pipeline. - auto rawStages = [&pipelineToTarget]() { - auto serialization = pipelineToTarget->serialize(); - std::vector stages; - stages.reserve(serialization.size()); - - for (const auto& stageObj : serialization) { - invariant(stageObj.getType() == BSONType::object); - stages.push_back(stageObj.getDocument().toBson()); - } - - return stages; - }(); - AggregateCommandRequest aggRequest(expCtx->getNamespaceString(), rawStages); + AggregateCommandRequest aggRequest(expCtx->getNamespaceString(), + pipelineToTarget->serializeToBson()); + aggregation_request_helper::addIfrFlagsToRequest(aggRequest, expCtx->getIfrContext()); LiteParsedPipeline liteParsedPipeline(aggRequest); PipelineDataSource pipelineDataSource = getPipelineDataSource(liteParsedPipeline); @@ -1943,21 +1934,9 @@ std::unique_ptr finalizeAndMaybePreparePipelineForExecution( "Preparing pipeline for execution", "pipeline"_attr = pipelineToTarget->serializeForLogging()); - auto aggRequest = AggregateCommandRequest(expCtx->getNamespaceString(), - pipelineToTarget->serializeToBson()); - - // Propagate IFR flags from the pipeline to the agg request. This is necessary - // because we are executing an entire pipeline here, so it will not be treated as a - // command coming from a router. - // We gate this behavior on cluster FCV in order to avoid sending a field from a new - // version shard to an old version shard that doesn't understand it. - if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo( - multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT - if (auto ifrCtx = pipelineToTarget->getContext()->getIfrContext()) { - aggRequest.setIfrFlags(ifrCtx->serializeFlagValues( - {&feature_flags::gFeatureFlagVectorSearchExtension})); - } - } + AggregateCommandRequest aggRequest(expCtx->getNamespaceString(), + pipelineToTarget->serializeToBson()); + aggregation_request_helper::addIfrFlagsToRequest(aggRequest, expCtx->getIfrContext()); return targetShardsAndAddMergeCursorsWithRoutingCtx( expCtx, diff --git a/src/mongo/s/query/planner/cluster_aggregate.cpp b/src/mongo/s/query/planner/cluster_aggregate.cpp index f99aa05a597..92d989724a0 100644 --- a/src/mongo/s/query/planner/cluster_aggregate.cpp +++ b/src/mongo/s/query/planner/cluster_aggregate.cpp @@ -183,21 +183,7 @@ Document serializeForPassthrough(const boost::intrusive_ptr& } } - // If the featureFlagVectorSearchExtension IFR flag is enabled, all nodes are upgraded and can - // parse IFR flags. - // TODO SERVER-117721: Remove FCV gate once multiversion testing can handle IFR flags. - if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo( - multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT - // TODO SERVER-116219: Expand IFR flag serialization beyond $vectorSearch. - auto ifrCtx = expCtx->getIfrContext(); - tassert(11565104, "IFRContext cannot be null", ifrCtx); - // TODO SERVER-116472 Send all feature flags on the IFRContext regardless of feature flag - // value. - if (ifrCtx->getSavedFlagValue(feature_flags::gFeatureFlagVectorSearchExtension)) { - req.setIfrFlags( - ifrCtx->serializeFlagValues({&feature_flags::gFeatureFlagVectorSearchExtension})); - } - } + aggregation_request_helper::addIfrFlagsToRequest(req, expCtx->getIfrContext()); auto cmdObj = isRawDataOperation(expCtx->getOperationContext()) && req.getNamespace() != executionNs