SERVER-119146 Serialize IFR flags in finalizePipelineAndTargetShardsForExplain (#47711)
GitOrigin-RevId: 813ed8b4bcd9d51b166c98b080685499938f9ad5
This commit is contained in:
parent
dffa5f381a
commit
784adb1029
@ -21,53 +21,127 @@ const options = {
|
||||
};
|
||||
|
||||
const pipeline = [{$vectorSearch: {}}];
|
||||
const testData = [
|
||||
{_id: 0, vector: [1, 2, 3, 4], text: "poppi cans"},
|
||||
{_id: 1, vector: [0, 2, 4, 6], text: "homegrown tomatoes"},
|
||||
{_id: 2, vector: [3, 6, 9, 16], text: "crispy rice puffs"},
|
||||
];
|
||||
|
||||
function runIFRFlagPropagationTests(conn, shardingTest = null) {
|
||||
function setupTestCollection(conn, shardingTest) {
|
||||
const adminDb = conn.getDB("admin");
|
||||
const db = conn.getDB("test");
|
||||
const coll = db[jsTestName()];
|
||||
coll.drop();
|
||||
const testData = [
|
||||
{_id: 0, vector: [1, 2, 3, 4], text: "poppi cans"},
|
||||
{_id: 1, vector: [0, 2, 4, 6], text: "homegrown tomatoes"},
|
||||
{_id: 2, vector: [3, 6, 9, 16], text: "crispy rice puffs"},
|
||||
];
|
||||
assert.commandWorked(coll.insertMany(testData));
|
||||
if (shardingTest) {
|
||||
shardingTest.shardColl(coll, {_id: 1});
|
||||
}
|
||||
|
||||
// Test 1: Verify IFR flag value propagated from router to shard.
|
||||
// Router has flag=true, request from router - shards should commit to true.
|
||||
assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
const dbName = db.getName();
|
||||
assert.commandWorked(adminDb.runCommand({enableSharding: dbName}));
|
||||
assert.commandWorked(
|
||||
adminDb.runCommand({
|
||||
shardCollection: coll.getFullName(),
|
||||
key: {_id: 1},
|
||||
}),
|
||||
);
|
||||
|
||||
return {adminDb, db, coll};
|
||||
}
|
||||
|
||||
function getShardAdmins(shardingTest) {
|
||||
const shard0Admin = shardingTest.rs0.getPrimary().getDB("admin");
|
||||
const shard1Admin = shardingTest.rs1.getPrimary().getDB("admin");
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false}));
|
||||
assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false}));
|
||||
const shard1Admin = shardingTest.rs1 ? shardingTest.rs1.getPrimary().getDB("admin") : null;
|
||||
return {shard0Admin, shard1Admin};
|
||||
}
|
||||
|
||||
function setFlags(adminDb, shard0Admin, shard1Admin, routerFlag, shardFlag) {
|
||||
assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: routerFlag}));
|
||||
if (shard0Admin) {
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: shardFlag}));
|
||||
}
|
||||
if (shard1Admin) {
|
||||
assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: shardFlag}));
|
||||
}
|
||||
}
|
||||
|
||||
function runIFRFlagPropagationTests(conn, shardingTest) {
|
||||
const {adminDb, db, coll} = setupTestCollection(conn, shardingTest);
|
||||
const {shard0Admin, shard1Admin} = getShardAdmins(shardingTest);
|
||||
|
||||
// Test 1: Router flag=true propagates to shards (shards commit to true)
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false);
|
||||
assertArrayEq({
|
||||
actual: coll.aggregate(pipeline).toArray(),
|
||||
expected: testData,
|
||||
});
|
||||
|
||||
// Test 2: Direct request to shard - uses shard's flag value.
|
||||
// Shard has flag = false, so legacy implementation should be used.
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false}));
|
||||
const shard0Coll = shardingTest.rs0.getPrimary().getDB("test")[jsTestName()];
|
||||
assert.throwsWithCode(() => shard0Coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled);
|
||||
// Set shard flag to true.
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
assert(shard0Coll.aggregate(pipeline).toArray().length > 0);
|
||||
// Test 2: Direct shard request uses shard's flag value
|
||||
if (shard0Admin) {
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false}));
|
||||
const shard0Coll = shardingTest.rs0.getPrimary().getDB("test")[coll.getName()];
|
||||
assert.throwsWithCode(() => shard0Coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled);
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
const shard0Result = shard0Coll.aggregate(pipeline).toArray();
|
||||
if (shard0Result.length === 0 && shard1Admin) {
|
||||
// If shard0 has no data, check shard1 instead
|
||||
assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
const shard1Coll = shardingTest.rs1.getPrimary().getDB("test")[coll.getName()];
|
||||
const shard1Result = shard1Coll.aggregate(pipeline).toArray();
|
||||
assert(shard1Result.length > 0);
|
||||
} else {
|
||||
assert(shard0Result.length > 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Test 3: Router has flag=false, request from router - shards should commit to false.
|
||||
assert.commandWorked(adminDb.runCommand({setParameter: 1, featureFlagVectorSearchExtension: false}));
|
||||
assert.commandWorked(shard0Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
assert.commandWorked(shard1Admin.runCommand({setParameter: 1, featureFlagVectorSearchExtension: true}));
|
||||
// Test 3: Router flag=false propagates to shards (shards commit to false)
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true);
|
||||
assert.throwsWithCode(() => coll.aggregate(pipeline).toArray(), ErrorCodes.SearchNotEnabled);
|
||||
|
||||
// Test 4: Explain propagates router flag=true to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false);
|
||||
const explainResult = coll.explain().aggregate(pipeline);
|
||||
assert.commandWorked(explainResult);
|
||||
|
||||
// Test 5: Explain propagates router flag=false to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true);
|
||||
assert.throwsWithCode(() => coll.explain().aggregate(pipeline), ErrorCodes.SearchNotEnabled);
|
||||
|
||||
// Setup $unionWith collection
|
||||
const otherCollName = jsTestName() + "_other";
|
||||
const otherColl = db[otherCollName];
|
||||
otherColl.drop();
|
||||
assert.commandWorked(otherColl.insertMany(testData));
|
||||
const dbName = db.getName();
|
||||
assert.commandWorked(adminDb.runCommand({enableSharding: dbName}));
|
||||
assert.commandWorked(
|
||||
adminDb.runCommand({
|
||||
shardCollection: otherColl.getFullName(),
|
||||
key: {_id: 1},
|
||||
}),
|
||||
);
|
||||
const unionPipeline = [{$unionWith: {coll: otherCollName, pipeline: pipeline}}];
|
||||
|
||||
// Test 6: $unionWith propagates router flag=true to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false);
|
||||
assertArrayEq({
|
||||
actual: coll.aggregate(unionPipeline).toArray(),
|
||||
expected: testData.concat(testData),
|
||||
});
|
||||
|
||||
// Test 7: $unionWith propagates router flag=false to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true);
|
||||
assert.throwsWithCode(() => coll.aggregate(unionPipeline).toArray(), ErrorCodes.SearchNotEnabled);
|
||||
|
||||
// Test 8: $unionWith explain propagates router flag=true to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ true, /* shardFlag */ false);
|
||||
const unionExplainResult = coll.explain().aggregate(unionPipeline);
|
||||
assert.commandWorked(unionExplainResult);
|
||||
|
||||
// Test 9: $unionWith explain propagates router flag=false to shards
|
||||
setFlags(adminDb, shard0Admin, shard1Admin, /* routerFlag */ false, /* shardFlag */ true);
|
||||
assert.throwsWithCode(() => coll.explain().aggregate(unionPipeline), ErrorCodes.SearchNotEnabled);
|
||||
}
|
||||
|
||||
try {
|
||||
// Test IFR flag propagation in a sharded cluster.
|
||||
const shardingTest = new ShardingTest({
|
||||
const multiShardTest = new ShardingTest({
|
||||
shards: 2,
|
||||
rs: {nodes: 2},
|
||||
mongos: 1,
|
||||
@ -76,8 +150,20 @@ try {
|
||||
configOptions: options,
|
||||
rsOptions: options,
|
||||
});
|
||||
runIFRFlagPropagationTests(shardingTest.s, shardingTest);
|
||||
shardingTest.stop();
|
||||
runIFRFlagPropagationTests(multiShardTest.s, multiShardTest);
|
||||
multiShardTest.stop();
|
||||
|
||||
const singleShardTest = new ShardingTest({
|
||||
shards: 1,
|
||||
rs: {nodes: 2},
|
||||
mongos: 1,
|
||||
config: 1,
|
||||
mongosOptions: options,
|
||||
configOptions: options,
|
||||
rsOptions: options,
|
||||
});
|
||||
runIFRFlagPropagationTests(singleShardTest.s, singleShardTest);
|
||||
singleShardTest.stop();
|
||||
} finally {
|
||||
deleteExtensionConfigs(extensionNames);
|
||||
}
|
||||
|
||||
@ -177,31 +177,10 @@ public:
|
||||
_aggregationRequest(std::move(aggregationRequest)),
|
||||
_ifrContext([&]() {
|
||||
const auto& requestFlagValues = _aggregationRequest.getIfrFlags();
|
||||
auto ifrContext = requestFlagValues.has_value()
|
||||
return requestFlagValues.has_value()
|
||||
? std::make_shared<IncrementalFeatureRolloutContext>(
|
||||
requestFlagValues.value())
|
||||
: std::make_shared<IncrementalFeatureRolloutContext>();
|
||||
|
||||
// If there is no value for the feature flag passed on the request, that either
|
||||
// means we have a direct user request (so we should check the node's flag value),
|
||||
// or it means we have a request from a router where the value of the flag is
|
||||
// false (so we should also commit to flag value false).
|
||||
// TODO SERVER-116472 Remove this when the router sends all IFRContext flags,
|
||||
// regardless of flag value.
|
||||
const auto& vectorSearchFlagName =
|
||||
feature_flags::gFeatureFlagVectorSearchExtension.getName();
|
||||
bool hasVectorSearchFlag = requestFlagValues.has_value() &&
|
||||
std::any_of(requestFlagValues->begin(),
|
||||
requestFlagValues->end(),
|
||||
[&](const BSONObj& obj) {
|
||||
return obj["name"].valueStringData() == vectorSearchFlagName;
|
||||
});
|
||||
const bool isComingFromRouter =
|
||||
aggregation_request_helper::getFromRouter(_aggregationRequest);
|
||||
if (!hasVectorSearchFlag && isComingFromRouter) {
|
||||
ifrContext->disableFlag(feature_flags::gFeatureFlagVectorSearchExtension);
|
||||
}
|
||||
return ifrContext;
|
||||
}()),
|
||||
_extensionMetrics(
|
||||
static_cast<const PipelineCommand*>(cmd)->getExtensionMetricsAllocation()),
|
||||
|
||||
@ -109,6 +109,21 @@ void addQuerySettingsToRequest(AggregateCommandRequest& request,
|
||||
}
|
||||
}
|
||||
|
||||
void addIfrFlagsToRequest(AggregateCommandRequest& request,
|
||||
std::shared_ptr<IncrementalFeatureRolloutContext> ifrContext) {
|
||||
tassert(11565104, "IFRContext cannot be null", ifrContext);
|
||||
|
||||
// If the featureFlagVectorSearchExtension IFR flag is enabled, all nodes are upgraded and can
|
||||
// parse IFR flags.
|
||||
// TODO SERVER-117721 Remove FCV gate once multiversion testing can handle IFR flags.
|
||||
if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo(
|
||||
multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT
|
||||
// TODO SERVER-116219 Expand IFR flag serialization beyond $vectorSearch.
|
||||
request.setIfrFlags(
|
||||
ifrContext->serializeFlagValues({&feature_flags::gFeatureFlagVectorSearchExtension}));
|
||||
}
|
||||
}
|
||||
|
||||
void validate(const AggregateCommandRequest& aggregate,
|
||||
const BSONObj& cmdObj,
|
||||
const NamespaceString& nss,
|
||||
|
||||
@ -104,6 +104,16 @@ MONGO_MOD_PUBLIC StatusWith<AggregateCommandRequest> parseFromBSONForTests(
|
||||
void addQuerySettingsToRequest(AggregateCommandRequest& request,
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx);
|
||||
|
||||
/**
|
||||
* Adds IFR flags from ifrContext to request. Always serializes the current flag value when
|
||||
* dispatching to shards, so they use the same value as the router/sender.
|
||||
*
|
||||
* For now, we only serialize featureFlagVectorSearchExtension.
|
||||
* TODO SERVER-116219: Expand IFR flag serialization beyond $vectorSearch.
|
||||
*/
|
||||
void addIfrFlagsToRequest(AggregateCommandRequest& request,
|
||||
std::shared_ptr<IncrementalFeatureRolloutContext> ifrContext);
|
||||
|
||||
/**
|
||||
* Validates if 'AggregateCommandRequest' specs complies with the current Client, which is required
|
||||
* for API versioning checks. Throws uassert in case of any failure.
|
||||
|
||||
@ -669,6 +669,7 @@ std::set<ShardId> getTargetedShardsForChangeStream(
|
||||
|
||||
return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx,
|
||||
@ -1615,19 +1616,9 @@ BSONObj finalizePipelineAndTargetShardsForExplain(
|
||||
}
|
||||
|
||||
// Generate the command object for the targeted shards with the finalized pipeline.
|
||||
auto rawStages = [&pipelineToTarget]() {
|
||||
auto serialization = pipelineToTarget->serialize();
|
||||
std::vector<BSONObj> stages;
|
||||
stages.reserve(serialization.size());
|
||||
|
||||
for (const auto& stageObj : serialization) {
|
||||
invariant(stageObj.getType() == BSONType::object);
|
||||
stages.push_back(stageObj.getDocument().toBson());
|
||||
}
|
||||
|
||||
return stages;
|
||||
}();
|
||||
AggregateCommandRequest aggRequest(expCtx->getNamespaceString(), rawStages);
|
||||
AggregateCommandRequest aggRequest(expCtx->getNamespaceString(),
|
||||
pipelineToTarget->serializeToBson());
|
||||
aggregation_request_helper::addIfrFlagsToRequest(aggRequest, expCtx->getIfrContext());
|
||||
|
||||
LiteParsedPipeline liteParsedPipeline(aggRequest);
|
||||
PipelineDataSource pipelineDataSource = getPipelineDataSource(liteParsedPipeline);
|
||||
@ -1943,21 +1934,9 @@ std::unique_ptr<Pipeline> finalizeAndMaybePreparePipelineForExecution(
|
||||
"Preparing pipeline for execution",
|
||||
"pipeline"_attr = pipelineToTarget->serializeForLogging());
|
||||
|
||||
auto aggRequest = AggregateCommandRequest(expCtx->getNamespaceString(),
|
||||
pipelineToTarget->serializeToBson());
|
||||
|
||||
// Propagate IFR flags from the pipeline to the agg request. This is necessary
|
||||
// because we are executing an entire pipeline here, so it will not be treated as a
|
||||
// command coming from a router.
|
||||
// We gate this behavior on cluster FCV in order to avoid sending a field from a new
|
||||
// version shard to an old version shard that doesn't understand it.
|
||||
if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo(
|
||||
multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT
|
||||
if (auto ifrCtx = pipelineToTarget->getContext()->getIfrContext()) {
|
||||
aggRequest.setIfrFlags(ifrCtx->serializeFlagValues(
|
||||
{&feature_flags::gFeatureFlagVectorSearchExtension}));
|
||||
}
|
||||
}
|
||||
AggregateCommandRequest aggRequest(expCtx->getNamespaceString(),
|
||||
pipelineToTarget->serializeToBson());
|
||||
aggregation_request_helper::addIfrFlagsToRequest(aggRequest, expCtx->getIfrContext());
|
||||
|
||||
return targetShardsAndAddMergeCursorsWithRoutingCtx(
|
||||
expCtx,
|
||||
|
||||
@ -183,21 +183,7 @@ Document serializeForPassthrough(const boost::intrusive_ptr<ExpressionContext>&
|
||||
}
|
||||
}
|
||||
|
||||
// If the featureFlagVectorSearchExtension IFR flag is enabled, all nodes are upgraded and can
|
||||
// parse IFR flags.
|
||||
// TODO SERVER-117721: Remove FCV gate once multiversion testing can handle IFR flags.
|
||||
if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isGreaterThanOrEqualTo(
|
||||
multiversion::FeatureCompatibilityVersion::kVersion_8_3)) { // NOLINT
|
||||
// TODO SERVER-116219: Expand IFR flag serialization beyond $vectorSearch.
|
||||
auto ifrCtx = expCtx->getIfrContext();
|
||||
tassert(11565104, "IFRContext cannot be null", ifrCtx);
|
||||
// TODO SERVER-116472 Send all feature flags on the IFRContext regardless of feature flag
|
||||
// value.
|
||||
if (ifrCtx->getSavedFlagValue(feature_flags::gFeatureFlagVectorSearchExtension)) {
|
||||
req.setIfrFlags(
|
||||
ifrCtx->serializeFlagValues({&feature_flags::gFeatureFlagVectorSearchExtension}));
|
||||
}
|
||||
}
|
||||
aggregation_request_helper::addIfrFlagsToRequest(req, expCtx->getIfrContext());
|
||||
|
||||
auto cmdObj =
|
||||
isRawDataOperation(expCtx->getOperationContext()) && req.getNamespace() != executionNs
|
||||
|
||||
Loading…
Reference in New Issue
Block a user