diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml index c40f744dffc..e10157f0b49 100644 --- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml +++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml @@ -81,6 +81,9 @@ selector: exclude_with_any_tags: - assumes_against_mongod_not_mongos - assumes_standalone_mongod + # Tests tagged auth_incompatible use internal commands (e.g. $_internalListCollections) that + # require the __system role, which is not available to the test user in this suite. + - auth_incompatible ## # The next tag corresponds to the special error thrown by the set_read_preference_secondary.js # override when it refuses to replace the readPreference of a particular command. Above each tag diff --git a/jstests/aggregation/sources/out/out_preserve_coll_options.js b/jstests/aggregation/sources/out/out_preserve_coll_options.js index d03d7fdb7f3..b765136117a 100644 --- a/jstests/aggregation/sources/out/out_preserve_coll_options.js +++ b/jstests/aggregation/sources/out/out_preserve_coll_options.js @@ -19,8 +19,6 @@ assert.commandWorked(db.createCollection(targetName, {validationLevel: "moderate const targetOptionsResponse = assert.commandWorked(db.runCommand({listCollections: 1, filter: {"name": targetName}})); const targetOptionsResults = new DBCommandCursor(db, targetOptionsResponse).toArray(); assert.eq(targetOptionsResults.length, 1, targetOptionsResults); -// Make test compatible regardless of whether 'recordIdsReplicated' is true. -delete targetOptionsResults[0].options.recordIdsReplicated; assert.eq({validationLevel: "moderate"}, targetOptionsResults[0].options, targetOptionsResults[0]); // Run $out pipeline. @@ -34,6 +32,4 @@ const targetOptionsResponseNew = assert.commandWorked( ); const targetOptionsResultsNew = new DBCommandCursor(db, targetOptionsResponseNew).toArray(); assert.eq(targetOptionsResultsNew.length, 1, targetOptionsResultsNew); -// Make test compatible regardless of whether 'recordIdsReplicated' is true. -delete targetOptionsResultsNew[0].options.recordIdsReplicated; assert.eq({validationLevel: "moderate"}, targetOptionsResultsNew[0].options, targetOptionsResultsNew[0]); diff --git a/jstests/core/catalog/agg_internal_list_collections_stage.js b/jstests/core/catalog/agg_internal_list_collections_stage.js index 9a0cae8e2f2..61053cfb3d1 100644 --- a/jstests/core/catalog/agg_internal_list_collections_stage.js +++ b/jstests/core/catalog/agg_internal_list_collections_stage.js @@ -46,6 +46,16 @@ function removePrimaryField(listOfCollections) { return listResult; } +function removeRecordIdsReplicatedField(listOfCollections) { + return listOfCollections.map((entry) => { + const { + info: {recordIdsReplicated: _, ...infoWithoutRecordIdsReplicated}, + ...entryWithoutInfo + } = entry; + return {info: infoWithoutRecordIdsReplicated, ...entryWithoutInfo}; + }); +} + // TODO SERVER-120014: Remove once 9.0 becomes last LTS and all timeseries collections are viewless. function getBucketCollections(listOfCollections) { return listOfCollections.filter((collEntry) => collEntry["ns"].includes(".system.buckets.")); @@ -112,6 +122,12 @@ function compareInternalListCollectionsStageAgainstListCollections(dbTest, expec internalStageResponseAgainstDbTest = removeUuidField(internalStageResponseAgainstDbTest); } internalStageResponseAgainstDbTest = removePrimaryField(internalStageResponseAgainstDbTest); + if (FixtureHelpers.isMongos(dbTest)) { + // The router scrubs 'recordIdsReplicated' from listCollections responses because it is an + // internal field that can be inconsistent across shards. $_internalListCollections returns + // it unconditionally, so remove it before comparing. + internalStageResponseAgainstDbTest = removeRecordIdsReplicatedField(internalStageResponseAgainstDbTest); + } const internalListCollectionsBuckets = getBucketCollections(internalStageResponseAgainstDbTest); internalStageResponseAgainstDbTest = normalizeTimeseriesCollectionFormat(internalStageResponseAgainstDbTest); @@ -136,12 +152,22 @@ function compareInternalListCollectionsStageAgainstListCollections(dbTest, expec stageResponseAgainstAdminDb = removeUuidField(stageResponseAgainstAdminDb); } stageResponseAgainstAdminDb = removePrimaryField(stageResponseAgainstAdminDb); + if (FixtureHelpers.isMongos(dbTest)) { + // The router scrubs 'recordIdsReplicated' from listCollections responses because it is an + // internal field that can be inconsistent across shards. $_internalListCollections returns + // it unconditionally, so remove it before comparing. + stageResponseAgainstAdminDb = removeRecordIdsReplicatedField(stageResponseAgainstAdminDb); + } stageResponseAgainstAdminDb = normalizeTimeseriesCollectionFormat(stageResponseAgainstAdminDb); listCollectionsResponse.forEach((entry) => { - assert.contains( - entry, - stageResponseAgainstAdminDb, + // Use bsonUnorderedFieldsCompare rather than assert.contains so that two semantically + // equivalent documents with different field orderings are treated as equal. + const found = stageResponseAgainstAdminDb.some( + (stageEntry) => bsonUnorderedFieldsCompare(entry, stageEntry) === 0, + ); + assert( + found, "The listCollections entry " + tojson(entry) + " hasn't been found on the $_internalListCollections output " + diff --git a/jstests/core/catalog/agg_list_cluster_catalog.js b/jstests/core/catalog/agg_list_cluster_catalog.js index ea336770228..012901042ae 100644 --- a/jstests/core/catalog/agg_list_cluster_catalog.js +++ b/jstests/core/catalog/agg_list_cluster_catalog.js @@ -59,6 +59,13 @@ function verify(listCollectionEntry, stageEntry, specs) { delete listCollectionEntry.info.configDebugDump; delete stageEntry.info.configDebugDump; } + + // The router scrubs 'recordIdsReplicated' from listCollections responses because it is an + // internal field that can be inconsistent across shards. $listClusterCatalog returns it + // unconditionally, so remove it from the stage entry before comparing. + if (FixtureHelpers.isMongos(db)) { + delete stageEntry.info.recordIdsReplicated; + } } function errMsgWithListCollectionEntry(reason) { diff --git a/jstests/core/catalog/list_collections_filter.js b/jstests/core/catalog/list_collections_filter.js index ea5df8ec8a2..c7658609e71 100644 --- a/jstests/core/catalog/list_collections_filter.js +++ b/jstests/core/catalog/list_collections_filter.js @@ -10,8 +10,6 @@ */ import {ClusteredCollectionUtil} from "jstests/libs/clustered_collections/clustered_collection_util.js"; -import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js"; -import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js"; import {FixtureHelpers} from "jstests/libs/fixture_helpers.js"; const mydb = db.getSiblingDB("list_collections_filter"); @@ -24,20 +22,6 @@ const defaultCollectionOptionsFilter = ClusteredCollectionUtil.areAllCollections ? {"options.clusteredIndex.unique": true} : {options: {}}; -if (FeatureFlagUtil.isPresentAndEnabled(db, "RecordIdsReplicated")) { - // Replicated recordIds are enabled, but may not be the default for user collections. Create a - // test collection to check if 'replicatedRecordIds' is set by default. - const checkReplRidCollName = "checkReplRidColl"; - assert.commandWorked(mydb.createCollection(checkReplRidCollName)); - const collMetadata = mydb - .getCollectionInfos() - .find((collectionMetadata) => collectionMetadata.name === checkReplRidCollName); - if (collMetadata.options.recordIdsReplicated) { - defaultCollectionOptionsFilter.options.recordIdsReplicated = true; - } - assertDropCollection(mydb, checkReplRidCollName); -} - // Make some collections. assert.commandWorked(mydb.createCollection("lists")); assert.commandWorked(mydb.createCollection("ordered_sets")); diff --git a/jstests/core/query/internal_rename_if_options_and_indexes_match.js b/jstests/core/query/internal_rename_if_options_and_indexes_match.js index 7b8afda1f55..f7799a7785e 100644 --- a/jstests/core/query/internal_rename_if_options_and_indexes_match.js +++ b/jstests/core/query/internal_rename_if_options_and_indexes_match.js @@ -38,12 +38,6 @@ let commandObj = { collectionOptions: {uuid: optionsArray[0].info.uuid}, }; -const recordIdsReplicatedByDefault = optionsArray[0].options.recordIdsReplicated; -if (recordIdsReplicatedByDefault) { - // RecordIds are replicated by default - make the collectionOptions reflect that. - commandObj.collectionOptions.recordIdsReplicated = optionsArray[0].options.recordIdsReplicated; -} - // Destination has an extra index. assert.commandFailedWithCode(adminDB.runCommand(commandObj), ErrorCodes.CommandFailed); @@ -75,8 +69,5 @@ commandObj.collectionOptions = { size: 256, max: 2, }; -if (recordIdsReplicatedByDefault) { - commandObj.collectionOptions.recordIdsReplicated = optionsArray[0].options.recordIdsReplicated; -} assert.commandWorked(adminDB.runCommand(commandObj)); diff --git a/jstests/core/replicate_record_ids/collmod_removes_replicate_record_ids.js b/jstests/core/replicate_record_ids/collmod_removes_replicate_record_ids.js index 2e5456c915e..cdef152d83e 100644 --- a/jstests/core/replicate_record_ids/collmod_removes_replicate_record_ids.js +++ b/jstests/core/replicate_record_ids/collmod_removes_replicate_record_ids.js @@ -13,9 +13,15 @@ * # TODO (SERVER-118693): Check if this tag can be removed. * # Unsetting 'recordIdsReplicated' option with the collMod command can create a test-only race condition during initial sync. * incompatible_with_initial_sync, + * # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role. + * auth_incompatible, + * # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern. + * assumes_read_concern_unchanged, * ] */ +import {hasRecordIdsReplicated} from "jstests/libs/collection_write_path/replicated_record_ids_utils.js"; + const collName = "replRecIdCollForCollMod"; const coll = db.getCollection(collName); coll.drop(); @@ -35,7 +41,7 @@ assert( ); jsTestLog("Collection options after creation: " + tojson(collInfo)); assert( - collInfo.info.hasOwnProperty("recordIdsReplicated"), + hasRecordIdsReplicated(db, collName), "collection options does not contain recordIdsReplicated flag after collection creation", ); @@ -58,7 +64,7 @@ assert( ); jsTestLog("Collection options after collMod: " + tojson(collInfo)); assert( - !collInfo.info.hasOwnProperty("recordIdsReplicated"), + !hasRecordIdsReplicated(db, collName), "collMod failed to remove recordIdsReplicated flag from collection options", ); diff --git a/jstests/core/replicate_record_ids/replicate_record_ids_collection_creation.js b/jstests/core/replicate_record_ids/replicate_record_ids_collection_creation.js index ec2c4b07d04..fddefe1e90d 100644 --- a/jstests/core/replicate_record_ids/replicate_record_ids_collection_creation.js +++ b/jstests/core/replicate_record_ids/replicate_record_ids_collection_creation.js @@ -6,9 +6,15 @@ * assumes_no_implicit_collection_creation_after_drop, * expects_explicit_underscore_id_index, * featureFlagRecordIdsReplicated, + * # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role. + * auth_incompatible, + * # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern. + * assumes_read_concern_unchanged, * ] */ +import {hasRecordIdsReplicated} from "jstests/libs/collection_write_path/replicated_record_ids_utils.js"; + const collName = "replRecIdCollForCreate"; const coll = db.getCollection(collName); coll.drop(); @@ -17,10 +23,9 @@ coll.drop(); // implicitly. const clusteredCollName = collName + "_clustered"; assert.commandWorked(db.createCollection(clusteredCollName, {clusteredIndex: {key: {_id: 1}, unique: true}})); -const clusteredCollInfo = db[clusteredCollName].exists(); assert( - !clusteredCollInfo.info.hasOwnProperty("recordIdsReplicated"), - "clustered collection created with recordIdsReplicated collection option: " + tojson(clusteredCollInfo), + !hasRecordIdsReplicated(db, clusteredCollName), + "clustered collection created with recordIdsReplicated collection option", ); // Create a collection with the param set. @@ -39,6 +44,6 @@ assert( ); jsTestLog("Collection options after creation: " + tojson(collInfo)); assert( - collInfo.info.hasOwnProperty("recordIdsReplicated"), + hasRecordIdsReplicated(db, collName), "collection options does not contain recordIdsReplicated flag after collection creation", ); diff --git a/jstests/libs/collection_write_path/replicated_record_ids_utils.js b/jstests/libs/collection_write_path/replicated_record_ids_utils.js index 3f10d9b6a68..a62cea97eaf 100644 --- a/jstests/libs/collection_write_path/replicated_record_ids_utils.js +++ b/jstests/libs/collection_write_path/replicated_record_ids_utils.js @@ -2,6 +2,23 @@ // documents, otherwise. import {ReplSetTest} from "jstests/libs/replsettest.js"; +/** + * Returns true if the collection named 'collName' in 'db' has the 'recordIdsReplicated' field + * set in its catalog entry, false otherwise. + * + * Uses $_internalListCollections (info.recordIdsReplicated) rather than listCollections because + * listCollections scrubs 'recordIdsReplicated' from the 'info' sub-document on mongos routers. + * + * NOTE: $_internalListCollections requires the __system role and only supports local read concern. + * Tests that call this function must be tagged 'auth_incompatible' and + * 'assumes_read_concern_unchanged'. + */ +export function hasRecordIdsReplicated(db, collName) { + const ns = db.getName() + "." + collName; + const results = db.aggregate([{$_internalListCollections: {}}, {$match: {ns: ns}}]).toArray(); + return results.length > 0 && !!results[0].info && !!results[0].info.recordIdsReplicated; +} + export function getShowRecordIdsCursor(node, dbName, replicatedCollName) { return node .getDB(dbName) diff --git a/jstests/noPassthrough/catalog/list_collections_scrub_record_ids_replicated.js b/jstests/noPassthrough/catalog/list_collections_scrub_record_ids_replicated.js new file mode 100644 index 00000000000..5985e4051fc --- /dev/null +++ b/jstests/noPassthrough/catalog/list_collections_scrub_record_ids_replicated.js @@ -0,0 +1,44 @@ +/** + * Tests that 'recordIdsReplicated' is scrubbed from the 'info' sub-document of each collection + * entry returned by listCollections when run via the router (mongos). The field is internal and + * can be inconsistent across shards, so it must not be exposed to clients. + * + * @tags: [ + * featureFlagRecordIdsReplicated, + * expects_explicit_underscore_id_index, + * ] + */ + +import {ShardingTest} from "jstests/libs/shardingtest.js"; + +const st = new ShardingTest({mongos: 1, shards: 1}); +const dbName = "test"; +const collName = jsTestName(); + +const mongos = st.s0; +const shard0 = st.shard0; + +assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName})); + +const testDB = mongos.getDB(dbName); +assert.commandWorked(testDB.createCollection(collName)); + +// Verify the field is present when querying the shard directly. +const shardCollInfos = shard0.getDB(dbName).getCollectionInfos({name: collName}); +assert.eq(1, shardCollInfos.length, "expected to find collection on shard: " + tojson(shardCollInfos)); +const shardCollInfo = shardCollInfos[0]; +assert( + shardCollInfo.info.hasOwnProperty("recordIdsReplicated"), + "expected 'recordIdsReplicated' to be present in shard listCollections response: " + tojson(shardCollInfo), +); + +// Verify the field is scrubbed when querying via the router (mongos). +const routerCollInfos = testDB.getCollectionInfos({name: collName}); +assert.eq(1, routerCollInfos.length, "expected to find collection via router: " + tojson(routerCollInfos)); +const routerCollInfo = routerCollInfos[0]; +assert( + !routerCollInfo.info.hasOwnProperty("recordIdsReplicated"), + "expected 'recordIdsReplicated' to be absent in router listCollections response: " + tojson(routerCollInfo), +); + +st.stop(); diff --git a/jstests/noPassthrough/cluster_scalability_misc/move_collection_create_options.js b/jstests/noPassthrough/cluster_scalability_misc/move_collection_create_options.js index d5bdfda83fe..5d703864bf5 100644 --- a/jstests/noPassthrough/cluster_scalability_misc/move_collection_create_options.js +++ b/jstests/noPassthrough/cluster_scalability_misc/move_collection_create_options.js @@ -48,7 +48,7 @@ function validateCollection( dbName, collName, shardKey, - {expectedCollOpts, expectedIndexes, expectNoShardingMetadata, expectedCollInfos} = {}, + {expectedCollOpts, expectedIndexes, expectNoShardingMetadata} = {}, ) { const db = conn.getDB(dbName); const coll = db.getCollection(collName); @@ -63,15 +63,6 @@ function validateCollection( } assert.eq(coll.countDocuments({}), maxCount); - if (expectedCollInfos) { - jsTest.log("*** Checking expectedCollInfos " + tojson({listCollectionsDoc, expectedCollInfos})); - for (let fieldName in expectedCollInfos) { - const actual = getDottedField(listCollectionsDoc.info, fieldName); - const expected = expectedCollInfos[fieldName]; - assert.eq(bsonUnorderedFieldsCompare(actual, expected), 0, {fieldName, actual, expected}); - } - } - if (expectedIndexes) { const actualIndexes = coll.getIndexes(); jsTest.log("*** Checking expectedIndexes " + tojson({actualIndexes, expectedIndexes})); @@ -471,25 +462,6 @@ const testCases = [ }); }, }, - { - name: "recordIdsReplicated", - // TODO (SERVER-68173): Enable featureFlagRecordIdsReplicated. - shouldSkip: (conn) => !FeatureFlagUtil.isEnabled(conn, "RecordIdsReplicated"), - createCollection: (conn, dbName, collName) => { - assert.commandWorked(conn.getDB(dbName).runCommand({create: collName})); - return dbName + "." + collName; - }, - insertDocuments: (conn, dbName, collName) => { - insertDocuments(conn, dbName, collName); - }, - validateCollection: (conn, dbName, collName, shardKey) => { - validateCollection(conn, dbName, collName, shardKey, { - expectedCollInfos: { - "recordIdsReplicated": true, - }, - }); - }, - }, { name: "expireAfterSeconds", createCollection: (conn, dbName, collName) => { diff --git a/jstests/sharding/list_cluster_catalog.js b/jstests/sharding/list_cluster_catalog.js index 1afeaf5e18b..aa7281f8ce0 100644 --- a/jstests/sharding/list_cluster_catalog.js +++ b/jstests/sharding/list_cluster_catalog.js @@ -98,6 +98,11 @@ function verify(expectedResult, result) { result.shards.sort(); } + // The router scrubs 'recordIdsReplicated' from listCollections responses because it is an + // internal field that can be inconsistent across shards. $listClusterCatalog returns it + // unconditionally (via $_internalListCollections), so remove it before comparing. + delete result.info.recordIdsReplicated; + // Ensure the resuls fields matches the expected once. assert.eq(expectedResult.options, result.options, "options field mismatch:" + tojson(result)); assert.eq(expectedResult.info, result.info, "info field mismatch:" + tojson(result)); diff --git a/jstests/sharding/replicate_record_ids_collection_migration.js b/jstests/sharding/replicate_record_ids_collection_migration.js index edf353c4199..8771330c0a4 100644 --- a/jstests/sharding/replicate_record_ids_collection_migration.js +++ b/jstests/sharding/replicate_record_ids_collection_migration.js @@ -5,10 +5,15 @@ * featureFlagRecordIdsReplicated, * # Replicated record IDs are incompatible with clustered collections. * expects_explicit_underscore_id_index, + * # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role. + * auth_incompatible, + * # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern. + * assumes_read_concern_unchanged, * ] */ import { + hasRecordIdsReplicated, getRidForDoc, mapFieldToMatchingDocRid, } from "jstests/libs/collection_write_path/replicated_record_ids_utils.js"; @@ -30,8 +35,7 @@ const coll = testDB[collName]; assert.commandWorked(testDB.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName})); assert.commandWorked(testDB.createCollection(collName)); -let collInfo = coll.exists(); -assert(collInfo.info.recordIdsReplicated, tojson(collInfo)); +assert(hasRecordIdsReplicated(testDB, collName), "collection should have recordIdsReplicated set"); // Remove some of the initial documents on the collection with replicated record // IDs to create gaps in the record IDs. @@ -106,11 +110,15 @@ assert.eq( // Ensure that 'recordIdsReplicated` collection option is still present and set to true. // Check collection options on shard. -collInfo = shard1.getCollection(collNS).exists(); -assert(collInfo.info.recordIdsReplicated, tojson(collInfo)); +assert( + hasRecordIdsReplicated(shard1.getDB(dbName), collName), + "shard should still have recordIdsReplicated set after moveCollection", +); // Check collection options through mongos. -collInfo = mongos.getCollection(collNS).exists(); -assert(collInfo.info.recordIdsReplicated, tojson(collInfo)); +assert( + hasRecordIdsReplicated(mongos.getDB(dbName), collName), + "mongos should still see recordIdsReplicated set after moveCollection", +); st.stop(); diff --git a/src/mongo/db/global_catalog/ddl/cluster_list_collections_cmd.cpp b/src/mongo/db/global_catalog/ddl/cluster_list_collections_cmd.cpp index 82744e1c3ea..9aea34a5d55 100644 --- a/src/mongo/db/global_catalog/ddl/cluster_list_collections_cmd.cpp +++ b/src/mongo/db/global_catalog/ddl/cluster_list_collections_cmd.cpp @@ -85,6 +85,27 @@ namespace { constexpr auto systemBucketsDot = "system.buckets."_sd; +/** + * Removes 'recordIdsReplicated' from the 'info' sub-document of a single listCollections result + * entry. The field is internal and can be inconsistent across shards, so it must not be exposed + * via the router. + */ +static BSONObj scrubRecordIdsReplicatedFromCollectionEntry(BSONObj entry) { + auto infoElem = entry["info"]; + if (infoElem.eoo() || infoElem.type() != BSONType::object) { + return entry; + } + BSONObjBuilder entryBuilder; + for (auto&& field : entry) { + if (field.fieldNameStringData() == "info"_sd) { + entryBuilder.append("info"_sd, infoElem.Obj().removeField("recordIdsReplicated"_sd)); + } else { + entryBuilder.append(field); + } + } + return entryBuilder.obj(); +} + BSONObj rewriteCommandForListingOwnCollections(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& cmdObj) { @@ -278,7 +299,8 @@ public: nss, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), - privileges)); + privileges, + scrubRecordIdsReplicatedFromCollectionEntry)); CommandHelpers::filterCommandReplyForPassthrough(transformedResponse, &output); uassertStatusOK(getStatusFromCommandResult(output.asTempObj())); diff --git a/src/mongo/s/query/exec/BUILD.bazel b/src/mongo/s/query/exec/BUILD.bazel index f5006308145..682e8f35198 100644 --- a/src/mongo/s/query/exec/BUILD.bazel +++ b/src/mongo/s/query/exec/BUILD.bazel @@ -75,6 +75,7 @@ mongo_cc_library( "router_stage_queued_data.cpp", "router_stage_remove_metadata_fields.cpp", "router_stage_skip.cpp", + "router_stage_transform.cpp", ], deps = [ ":async_results_merger", @@ -155,6 +156,7 @@ mongo_cc_unit_test( "router_stage_limit_test.cpp", "router_stage_remove_metadata_fields_test.cpp", "router_stage_skip_test.cpp", + "router_stage_transform_test.cpp", "shard_tag_test.cpp", "store_possible_cursor_test.cpp", ], diff --git a/src/mongo/s/query/exec/router_stage_transform.cpp b/src/mongo/s/query/exec/router_stage_transform.cpp new file mode 100644 index 00000000000..7aac34fc04a --- /dev/null +++ b/src/mongo/s/query/exec/router_stage_transform.cpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/query/exec/router_stage_transform.h" + +#include + +namespace mongo { + +RouterStageTransform::RouterStageTransform(OperationContext* opCtx, + std::unique_ptr child, + TransformFn transform) + : RouterExecStage(opCtx, std::move(child)), _transform(std::move(transform)) {} + +StatusWith RouterStageTransform::next() { + auto childResult = getChildStage()->next(); + if (!childResult.isOK() || childResult.getValue().isEOF()) { + return childResult; + } + return ClusterQueryResult(_transform(*childResult.getValue().getResult()), + childResult.getValue().getShardId()); +} + +} // namespace mongo diff --git a/src/mongo/s/query/exec/router_stage_transform.h b/src/mongo/s/query/exec/router_stage_transform.h new file mode 100644 index 00000000000..2e1263c3334 --- /dev/null +++ b/src/mongo/s/query/exec/router_stage_transform.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/s/query/exec/cluster_query_result.h" +#include "mongo/s/query/exec/router_exec_stage.h" +#include "mongo/util/modules.h" + +#include +#include + +namespace mongo { + +/** + * A router execution stage that applies a caller-supplied per-document transformation as results + * flow through the cursor pipeline. The transform is applied to every document returned by + * next(), including documents from getMore batches, without buffering the full result set. + */ +class RouterStageTransform final : public RouterExecStage { +public: + using TransformFn = std::function; + + RouterStageTransform(OperationContext* opCtx, + std::unique_ptr child, + TransformFn transform); + + StatusWith next() final; + +private: + TransformFn _transform; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/exec/router_stage_transform_test.cpp b/src/mongo/s/query/exec/router_stage_transform_test.cpp new file mode 100644 index 00000000000..a16fbb894cb --- /dev/null +++ b/src/mongo/s/query/exec/router_stage_transform_test.cpp @@ -0,0 +1,235 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/query/exec/router_stage_transform.h" + +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/s/query/exec/router_stage_mock.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/duration.h" + +#include + +namespace mongo { +namespace { + +// Router stages do not use their OperationContext in these tests. +OperationContext* opCtx = nullptr; + +// A simple transform that adds a field "transformed: true" to every document. +BSONObj addTransformedField(BSONObj doc) { + return doc.addField(BSON("transformed" << true).firstElement()); +} + +// A transform that removes the "secret" field from every document. +BSONObj removeSecretField(BSONObj doc) { + return doc.removeField("secret"); +} + +TEST(RouterStageTransformTest, AppliesTransformToEachDocument) { + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2 << "b" << 3)); + mockStage->queueResult(BSON("x" << "hello")); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), addTransformedField); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true)); + + auto second = stage->next(); + ASSERT_OK(second.getStatus()); + ASSERT(second.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*second.getValue().getResult(), + BSON("a" << 2 << "b" << 3 << "transformed" << true)); + + auto third = stage->next(); + ASSERT_OK(third.getStatus()); + ASSERT(third.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*third.getValue().getResult(), BSON("x" << "hello" << "transformed" << true)); + + auto eof = stage->next(); + ASSERT_OK(eof.getStatus()); + ASSERT(eof.getValue().isEOF()); +} + +TEST(RouterStageTransformTest, TransformCanRemoveFields) { + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1 << "secret" << 42)); + mockStage->queueResult(BSON("b" << 2)); + mockStage->queueResult(BSON("secret" << 99 << "c" << 3)); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), removeSecretField); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1)); + + auto second = stage->next(); + ASSERT_OK(second.getStatus()); + ASSERT(second.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*second.getValue().getResult(), BSON("b" << 2)); + + auto third = stage->next(); + ASSERT_OK(third.getStatus()); + ASSERT(third.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*third.getValue().getResult(), BSON("c" << 3)); + + auto eof = stage->next(); + ASSERT_OK(eof.getStatus()); + ASSERT(eof.getValue().isEOF()); +} + +TEST(RouterStageTransformTest, PropagatesError) { + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueError(Status(ErrorCodes::BadValue, "something went wrong")); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), addTransformedField); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true)); + + auto second = stage->next(); + ASSERT_NOT_OK(second.getStatus()); + ASSERT_EQ(second.getStatus(), ErrorCodes::BadValue); + ASSERT_EQ(second.getStatus().reason(), "something went wrong"); +} + +TEST(RouterStageTransformTest, PropagatesEOF) { + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueEOF(); + // A result queued after EOF should not be reached. + mockStage->queueResult(BSON("a" << 2)); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), addTransformedField); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true)); + + auto eof = stage->next(); + ASSERT_OK(eof.getStatus()); + ASSERT(eof.getValue().isEOF()); +} + +TEST(RouterStageTransformTest, RemotesExhausted) { + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1)); + mockStage->markRemotesExhausted(); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), addTransformedField); + ASSERT_TRUE(stage->remotesExhausted()); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true)); + ASSERT_TRUE(stage->remotesExhausted()); + + auto eof = stage->next(); + ASSERT_OK(eof.getStatus()); + ASSERT(eof.getValue().isEOF()); + ASSERT_TRUE(stage->remotesExhausted()); +} + +TEST(RouterStageTransformTest, ForwardsAwaitDataTimeout) { + auto mockStage = std::make_unique(opCtx); + auto* mockStagePtr = mockStage.get(); + ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), addTransformedField); + ASSERT_OK(stage->setAwaitDataTimeout(Milliseconds(500))); + + auto timeout = mockStagePtr->getAwaitDataTimeout(); + ASSERT_OK(timeout.getStatus()); + ASSERT_EQ(500, durationCount(timeout.getValue())); +} + +TEST(RouterStageTransformTest, TransformIsAppliedToNestedSubDocumentField) { + // Simulates the recordIdsReplicated scrub use case: remove a field from a nested sub-document. + auto removeNestedField = [](BSONObj doc) -> BSONObj { + auto infoElem = doc["info"]; + if (infoElem.eoo() || infoElem.type() != BSONType::object) { + return doc; + } + BSONObjBuilder builder; + for (auto&& field : doc) { + if (field.fieldNameStringData() == "info"_sd) { + builder.append("info"_sd, infoElem.Obj().removeField("internal"_sd)); + } else { + builder.append(field); + } + } + return builder.obj(); + }; + + auto mockStage = std::make_unique(opCtx); + mockStage->queueResult( + BSON("name" << "coll1" << "info" << BSON("uuid" << "abc" << "internal" << true))); + mockStage->queueResult(BSON("name" << "coll2" << "info" << BSON("uuid" << "def"))); + + auto stage = + std::make_unique(opCtx, std::move(mockStage), removeNestedField); + + auto first = stage->next(); + ASSERT_OK(first.getStatus()); + ASSERT(first.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), + BSON("name" << "coll1" << "info" << BSON("uuid" << "abc"))); + + auto second = stage->next(); + ASSERT_OK(second.getStatus()); + ASSERT(second.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*second.getValue().getResult(), + BSON("name" << "coll2" << "info" << BSON("uuid" << "def"))); + + auto eof = stage->next(); + ASSERT_OK(eof.getStatus()); + ASSERT(eof.getValue().isEOF()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/query/exec/store_possible_cursor.cpp b/src/mongo/s/query/exec/store_possible_cursor.cpp index 81022f2f14d..e56b4a238c1 100644 --- a/src/mongo/s/query/exec/store_possible_cursor.cpp +++ b/src/mongo/s/query/exec/store_possible_cursor.cpp @@ -51,10 +51,13 @@ #include "mongo/s/query/exec/cluster_client_cursor_params.h" #include "mongo/s/query/exec/cluster_cursor_manager.h" #include "mongo/s/query/exec/collect_query_stats_mongos.h" +#include "mongo/s/query/exec/router_stage_merge.h" +#include "mongo/s/query/exec/router_stage_transform.h" #include "mongo/s/transaction_router.h" #include "mongo/util/decorable.h" #include +#include #include #include @@ -210,4 +213,122 @@ StatusWith storePossibleCursor(OperationContext* opCtx, return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); } +StatusWith storePossibleCursor(OperationContext* opCtx, + const ShardId& shardId, + const HostAndPort& server, + const BSONObj& cmdResult, + const NamespaceString& requestedNss, + std::shared_ptr executor, + ClusterCursorManager* cursorManager, + PrivilegeVector privileges, + std::function documentTransform, + TailableModeEnum tailableMode, + boost::optional routerSort) { + if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { + return cmdResult; + } + + auto incomingCursorResponse = CursorResponse::parseFromBSON(cmdResult); + if (!incomingCursorResponse.isOK()) { + return incomingCursorResponse.getStatus(); + } + + auto& response = incomingCursorResponse.getValue(); + if (const auto& cursorMetrics = response.getCursorMetrics()) { + CurOp::get(opCtx)->debug().getAdditiveMetrics().aggregateCursorMetrics(*cursorMetrics); + } + + auto&& opDebug = CurOp::get(opCtx)->debug(); + opDebug.getAdditiveMetrics().nBatches = 1; + opDebug.nShards = std::max(opDebug.nShards, 1); + CurOp::get(opCtx)->setEndOfOpMetrics(response.getBatch().size()); + + // Apply the transform to the first batch, which comes directly from the shard response. + std::vector transformedFirstBatch; + transformedFirstBatch.reserve(response.getBatch().size()); + for (const auto& doc : response.getBatch()) { + transformedFirstBatch.push_back(documentTransform(doc)); + } + + if (response.getCursorId() == CursorId(0)) { + // Cursor exhausted in the first batch, no proxy cursor is needed. + opDebug.cursorExhausted = true; + collectQueryStatsMongos(opCtx, std::move(opDebug.getQueryStatsInfo().key)); + CursorResponse exhaustedResponse(requestedNss, + CursorId(0), + std::move(transformedFirstBatch), + response.getAtClusterTime(), + response.getPostBatchResumeToken()); + return exhaustedResponse.toBSON(CursorResponse::ResponseType::InitialResponse); + } + + // Build the cursor params pointing at the shard's open cursor. + ClusterClientCursorParams params(response.getNSS(), + APIParameters::get(opCtx), + boost::none /* ReadPreferenceSetting */, + repl::ReadConcernArgs::get(opCtx), + [&] { + if (!opCtx->getLogicalSessionId()) + return OperationSessionInfoFromClient(); + OperationSessionInfoFromClient osi{ + *opCtx->getLogicalSessionId(), opCtx->getTxnNumber()}; + if (TransactionRouter::get(opCtx)) { + osi.setAutocommit(false); + } + return osi; + }()); + params.remotes.emplace_back(); + auto& remoteCursor = params.remotes.back(); + remoteCursor.setShardId(shardId.toString()); + remoteCursor.setHostAndPort(server); + remoteCursor.setCursorResponse(CursorResponse(response.getNSS(), + response.getCursorId(), + {} /* first batch served above */, + response.getAtClusterTime(), + response.getPostBatchResumeToken())); + params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); + params.tailableMode = tailableMode; + params.originatingPrivileges = std::move(privileges); + if (routerSort) { + params.sortToApplyOnRouter = *routerSort; + } + params.requestQueryStatsFromRemotes = response.getCursorMetrics().has_value(); + + // Build the execution plan: RouterStageTransform wraps RouterStageMerge so the + // documentTransform is applied to every document in every batch, including getMore batches, + // without buffering the full result set in mongos memory. + // + // extractARMParams() moves 'remotes' out of params. The remaining fields in params are still + // needed by ClusterClientCursorImpl for auth, session, and metrics bookkeeping. + auto armParams = params.extractARMParams(); + auto mergeStage = std::make_unique(opCtx, executor, std::move(armParams)); + auto transformStage = std::make_unique( + opCtx, std::move(mergeStage), std::move(documentTransform)); + + auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(transformStage), std::move(params)); + collectQueryStatsMongos(opCtx, ccc); + ccc->detachFromOperationContext(); + + auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(); + auto clusterCursorId = + cursorManager->registerCursor(opCtx, + ccc.releaseCursor(), + requestedNss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal, + authUser); + if (!clusterCursorId.isOK()) { + return clusterCursorId.getStatus(); + } + + opDebug.cursorid = clusterCursorId.getValue(); + + CursorResponse outgoingCursorResponse(requestedNss, + clusterCursorId.getValue(), + std::move(transformedFirstBatch), + response.getAtClusterTime(), + response.getPostBatchResumeToken()); + return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); +} + } // namespace mongo diff --git a/src/mongo/s/query/exec/store_possible_cursor.h b/src/mongo/s/query/exec/store_possible_cursor.h index a6da4f9a193..586a8b89e61 100644 --- a/src/mongo/s/query/exec/store_possible_cursor.h +++ b/src/mongo/s/query/exec/store_possible_cursor.h @@ -42,6 +42,7 @@ #include "mongo/util/modules.h" #include "mongo/util/net/hostandport.h" +#include #include #include @@ -127,4 +128,24 @@ StatusWith storePossibleCursor(OperationContext* opCtx, PrivilegeVector privileges, TailableModeEnum tailableMode); +/** + * Overload of storePossibleCursor(), but applies 'documentTransform' to every result document, both + * in the initial first batch and in all subsequent getMore batches. The transform is injected into + * the cursor's execution plan via RouterStageTransform. + * + * Use this when a command-level response transformation must be applied uniformly across all + * cursor batches (e.g., stripping internal fields before sending response to clients). + */ +StatusWith storePossibleCursor(OperationContext* opCtx, + const ShardId& shardId, + const HostAndPort& server, + const BSONObj& cmdResult, + const NamespaceString& requestedNss, + std::shared_ptr executor, + ClusterCursorManager* cursorManager, + PrivilegeVector privileges, + std::function documentTransform, + TailableModeEnum tailableMode = TailableModeEnum::kNormal, + boost::optional routerSort = boost::none); + } // namespace MONGO_MOD_PUBLIC mongo