SERVER-122769 Remove recordIdsReplicated from mongos listCollections Output (#50993)
GitOrigin-RevId: e21415eb3ce87a248220315431e37459527a0263
This commit is contained in:
parent
f6ce0b0b0a
commit
073374e1bf
@ -81,6 +81,9 @@ selector:
|
||||
exclude_with_any_tags:
|
||||
- assumes_against_mongod_not_mongos
|
||||
- assumes_standalone_mongod
|
||||
# Tests tagged auth_incompatible use internal commands (e.g. $_internalListCollections) that
|
||||
# require the __system role, which is not available to the test user in this suite.
|
||||
- auth_incompatible
|
||||
##
|
||||
# The next tag corresponds to the special error thrown by the set_read_preference_secondary.js
|
||||
# override when it refuses to replace the readPreference of a particular command. Above each tag
|
||||
|
||||
@ -19,8 +19,6 @@ assert.commandWorked(db.createCollection(targetName, {validationLevel: "moderate
|
||||
const targetOptionsResponse = assert.commandWorked(db.runCommand({listCollections: 1, filter: {"name": targetName}}));
|
||||
const targetOptionsResults = new DBCommandCursor(db, targetOptionsResponse).toArray();
|
||||
assert.eq(targetOptionsResults.length, 1, targetOptionsResults);
|
||||
// Make test compatible regardless of whether 'recordIdsReplicated' is true.
|
||||
delete targetOptionsResults[0].options.recordIdsReplicated;
|
||||
assert.eq({validationLevel: "moderate"}, targetOptionsResults[0].options, targetOptionsResults[0]);
|
||||
|
||||
// Run $out pipeline.
|
||||
@ -34,6 +32,4 @@ const targetOptionsResponseNew = assert.commandWorked(
|
||||
);
|
||||
const targetOptionsResultsNew = new DBCommandCursor(db, targetOptionsResponseNew).toArray();
|
||||
assert.eq(targetOptionsResultsNew.length, 1, targetOptionsResultsNew);
|
||||
// Make test compatible regardless of whether 'recordIdsReplicated' is true.
|
||||
delete targetOptionsResultsNew[0].options.recordIdsReplicated;
|
||||
assert.eq({validationLevel: "moderate"}, targetOptionsResultsNew[0].options, targetOptionsResultsNew[0]);
|
||||
|
||||
@ -46,6 +46,16 @@ function removePrimaryField(listOfCollections) {
|
||||
return listResult;
|
||||
}
|
||||
|
||||
function removeRecordIdsReplicatedField(listOfCollections) {
|
||||
return listOfCollections.map((entry) => {
|
||||
const {
|
||||
info: {recordIdsReplicated: _, ...infoWithoutRecordIdsReplicated},
|
||||
...entryWithoutInfo
|
||||
} = entry;
|
||||
return {info: infoWithoutRecordIdsReplicated, ...entryWithoutInfo};
|
||||
});
|
||||
}
|
||||
|
||||
// TODO SERVER-120014: Remove once 9.0 becomes last LTS and all timeseries collections are viewless.
|
||||
function getBucketCollections(listOfCollections) {
|
||||
return listOfCollections.filter((collEntry) => collEntry["ns"].includes(".system.buckets."));
|
||||
@ -112,6 +122,12 @@ function compareInternalListCollectionsStageAgainstListCollections(dbTest, expec
|
||||
internalStageResponseAgainstDbTest = removeUuidField(internalStageResponseAgainstDbTest);
|
||||
}
|
||||
internalStageResponseAgainstDbTest = removePrimaryField(internalStageResponseAgainstDbTest);
|
||||
if (FixtureHelpers.isMongos(dbTest)) {
|
||||
// The router scrubs 'recordIdsReplicated' from listCollections responses because it is an
|
||||
// internal field that can be inconsistent across shards. $_internalListCollections returns
|
||||
// it unconditionally, so remove it before comparing.
|
||||
internalStageResponseAgainstDbTest = removeRecordIdsReplicatedField(internalStageResponseAgainstDbTest);
|
||||
}
|
||||
|
||||
const internalListCollectionsBuckets = getBucketCollections(internalStageResponseAgainstDbTest);
|
||||
internalStageResponseAgainstDbTest = normalizeTimeseriesCollectionFormat(internalStageResponseAgainstDbTest);
|
||||
@ -136,12 +152,22 @@ function compareInternalListCollectionsStageAgainstListCollections(dbTest, expec
|
||||
stageResponseAgainstAdminDb = removeUuidField(stageResponseAgainstAdminDb);
|
||||
}
|
||||
stageResponseAgainstAdminDb = removePrimaryField(stageResponseAgainstAdminDb);
|
||||
if (FixtureHelpers.isMongos(dbTest)) {
|
||||
// The router scrubs 'recordIdsReplicated' from listCollections responses because it is an
|
||||
// internal field that can be inconsistent across shards. $_internalListCollections returns
|
||||
// it unconditionally, so remove it before comparing.
|
||||
stageResponseAgainstAdminDb = removeRecordIdsReplicatedField(stageResponseAgainstAdminDb);
|
||||
}
|
||||
stageResponseAgainstAdminDb = normalizeTimeseriesCollectionFormat(stageResponseAgainstAdminDb);
|
||||
|
||||
listCollectionsResponse.forEach((entry) => {
|
||||
assert.contains(
|
||||
entry,
|
||||
stageResponseAgainstAdminDb,
|
||||
// Use bsonUnorderedFieldsCompare rather than assert.contains so that two semantically
|
||||
// equivalent documents with different field orderings are treated as equal.
|
||||
const found = stageResponseAgainstAdminDb.some(
|
||||
(stageEntry) => bsonUnorderedFieldsCompare(entry, stageEntry) === 0,
|
||||
);
|
||||
assert(
|
||||
found,
|
||||
"The listCollections entry " +
|
||||
tojson(entry) +
|
||||
" hasn't been found on the $_internalListCollections output " +
|
||||
|
||||
@ -59,6 +59,13 @@ function verify(listCollectionEntry, stageEntry, specs) {
|
||||
delete listCollectionEntry.info.configDebugDump;
|
||||
delete stageEntry.info.configDebugDump;
|
||||
}
|
||||
|
||||
// The router scrubs 'recordIdsReplicated' from listCollections responses because it is an
|
||||
// internal field that can be inconsistent across shards. $listClusterCatalog returns it
|
||||
// unconditionally, so remove it from the stage entry before comparing.
|
||||
if (FixtureHelpers.isMongos(db)) {
|
||||
delete stageEntry.info.recordIdsReplicated;
|
||||
}
|
||||
}
|
||||
|
||||
function errMsgWithListCollectionEntry(reason) {
|
||||
|
||||
@ -10,8 +10,6 @@
|
||||
*/
|
||||
|
||||
import {ClusteredCollectionUtil} from "jstests/libs/clustered_collections/clustered_collection_util.js";
|
||||
import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
|
||||
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
||||
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
||||
|
||||
const mydb = db.getSiblingDB("list_collections_filter");
|
||||
@ -24,20 +22,6 @@ const defaultCollectionOptionsFilter = ClusteredCollectionUtil.areAllCollections
|
||||
? {"options.clusteredIndex.unique": true}
|
||||
: {options: {}};
|
||||
|
||||
if (FeatureFlagUtil.isPresentAndEnabled(db, "RecordIdsReplicated")) {
|
||||
// Replicated recordIds are enabled, but may not be the default for user collections. Create a
|
||||
// test collection to check if 'replicatedRecordIds' is set by default.
|
||||
const checkReplRidCollName = "checkReplRidColl";
|
||||
assert.commandWorked(mydb.createCollection(checkReplRidCollName));
|
||||
const collMetadata = mydb
|
||||
.getCollectionInfos()
|
||||
.find((collectionMetadata) => collectionMetadata.name === checkReplRidCollName);
|
||||
if (collMetadata.options.recordIdsReplicated) {
|
||||
defaultCollectionOptionsFilter.options.recordIdsReplicated = true;
|
||||
}
|
||||
assertDropCollection(mydb, checkReplRidCollName);
|
||||
}
|
||||
|
||||
// Make some collections.
|
||||
assert.commandWorked(mydb.createCollection("lists"));
|
||||
assert.commandWorked(mydb.createCollection("ordered_sets"));
|
||||
|
||||
@ -38,12 +38,6 @@ let commandObj = {
|
||||
collectionOptions: {uuid: optionsArray[0].info.uuid},
|
||||
};
|
||||
|
||||
const recordIdsReplicatedByDefault = optionsArray[0].options.recordIdsReplicated;
|
||||
if (recordIdsReplicatedByDefault) {
|
||||
// RecordIds are replicated by default - make the collectionOptions reflect that.
|
||||
commandObj.collectionOptions.recordIdsReplicated = optionsArray[0].options.recordIdsReplicated;
|
||||
}
|
||||
|
||||
// Destination has an extra index.
|
||||
assert.commandFailedWithCode(adminDB.runCommand(commandObj), ErrorCodes.CommandFailed);
|
||||
|
||||
@ -75,8 +69,5 @@ commandObj.collectionOptions = {
|
||||
size: 256,
|
||||
max: 2,
|
||||
};
|
||||
if (recordIdsReplicatedByDefault) {
|
||||
commandObj.collectionOptions.recordIdsReplicated = optionsArray[0].options.recordIdsReplicated;
|
||||
}
|
||||
|
||||
assert.commandWorked(adminDB.runCommand(commandObj));
|
||||
|
||||
@ -13,9 +13,15 @@
|
||||
* # TODO (SERVER-118693): Check if this tag can be removed.
|
||||
* # Unsetting 'recordIdsReplicated' option with the collMod command can create a test-only race condition during initial sync.
|
||||
* incompatible_with_initial_sync,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role.
|
||||
* auth_incompatible,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern.
|
||||
* assumes_read_concern_unchanged,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {hasRecordIdsReplicated} from "jstests/libs/collection_write_path/replicated_record_ids_utils.js";
|
||||
|
||||
const collName = "replRecIdCollForCollMod";
|
||||
const coll = db.getCollection(collName);
|
||||
coll.drop();
|
||||
@ -35,7 +41,7 @@ assert(
|
||||
);
|
||||
jsTestLog("Collection options after creation: " + tojson(collInfo));
|
||||
assert(
|
||||
collInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
hasRecordIdsReplicated(db, collName),
|
||||
"collection options does not contain recordIdsReplicated flag after collection creation",
|
||||
);
|
||||
|
||||
@ -58,7 +64,7 @@ assert(
|
||||
);
|
||||
jsTestLog("Collection options after collMod: " + tojson(collInfo));
|
||||
assert(
|
||||
!collInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
!hasRecordIdsReplicated(db, collName),
|
||||
"collMod failed to remove recordIdsReplicated flag from collection options",
|
||||
);
|
||||
|
||||
|
||||
@ -6,9 +6,15 @@
|
||||
* assumes_no_implicit_collection_creation_after_drop,
|
||||
* expects_explicit_underscore_id_index,
|
||||
* featureFlagRecordIdsReplicated,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role.
|
||||
* auth_incompatible,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern.
|
||||
* assumes_read_concern_unchanged,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {hasRecordIdsReplicated} from "jstests/libs/collection_write_path/replicated_record_ids_utils.js";
|
||||
|
||||
const collName = "replRecIdCollForCreate";
|
||||
const coll = db.getCollection(collName);
|
||||
coll.drop();
|
||||
@ -17,10 +23,9 @@ coll.drop();
|
||||
// implicitly.
|
||||
const clusteredCollName = collName + "_clustered";
|
||||
assert.commandWorked(db.createCollection(clusteredCollName, {clusteredIndex: {key: {_id: 1}, unique: true}}));
|
||||
const clusteredCollInfo = db[clusteredCollName].exists();
|
||||
assert(
|
||||
!clusteredCollInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
"clustered collection created with recordIdsReplicated collection option: " + tojson(clusteredCollInfo),
|
||||
!hasRecordIdsReplicated(db, clusteredCollName),
|
||||
"clustered collection created with recordIdsReplicated collection option",
|
||||
);
|
||||
|
||||
// Create a collection with the param set.
|
||||
@ -39,6 +44,6 @@ assert(
|
||||
);
|
||||
jsTestLog("Collection options after creation: " + tojson(collInfo));
|
||||
assert(
|
||||
collInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
hasRecordIdsReplicated(db, collName),
|
||||
"collection options does not contain recordIdsReplicated flag after collection creation",
|
||||
);
|
||||
|
||||
@ -2,6 +2,23 @@
|
||||
// documents, otherwise.
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
||||
/**
|
||||
* Returns true if the collection named 'collName' in 'db' has the 'recordIdsReplicated' field
|
||||
* set in its catalog entry, false otherwise.
|
||||
*
|
||||
* Uses $_internalListCollections (info.recordIdsReplicated) rather than listCollections because
|
||||
* listCollections scrubs 'recordIdsReplicated' from the 'info' sub-document on mongos routers.
|
||||
*
|
||||
* NOTE: $_internalListCollections requires the __system role and only supports local read concern.
|
||||
* Tests that call this function must be tagged 'auth_incompatible' and
|
||||
* 'assumes_read_concern_unchanged'.
|
||||
*/
|
||||
export function hasRecordIdsReplicated(db, collName) {
|
||||
const ns = db.getName() + "." + collName;
|
||||
const results = db.aggregate([{$_internalListCollections: {}}, {$match: {ns: ns}}]).toArray();
|
||||
return results.length > 0 && !!results[0].info && !!results[0].info.recordIdsReplicated;
|
||||
}
|
||||
|
||||
export function getShowRecordIdsCursor(node, dbName, replicatedCollName) {
|
||||
return node
|
||||
.getDB(dbName)
|
||||
|
||||
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Tests that 'recordIdsReplicated' is scrubbed from the 'info' sub-document of each collection
|
||||
* entry returned by listCollections when run via the router (mongos). The field is internal and
|
||||
* can be inconsistent across shards, so it must not be exposed to clients.
|
||||
*
|
||||
* @tags: [
|
||||
* featureFlagRecordIdsReplicated,
|
||||
* expects_explicit_underscore_id_index,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
const st = new ShardingTest({mongos: 1, shards: 1});
|
||||
const dbName = "test";
|
||||
const collName = jsTestName();
|
||||
|
||||
const mongos = st.s0;
|
||||
const shard0 = st.shard0;
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));
|
||||
|
||||
const testDB = mongos.getDB(dbName);
|
||||
assert.commandWorked(testDB.createCollection(collName));
|
||||
|
||||
// Verify the field is present when querying the shard directly.
|
||||
const shardCollInfos = shard0.getDB(dbName).getCollectionInfos({name: collName});
|
||||
assert.eq(1, shardCollInfos.length, "expected to find collection on shard: " + tojson(shardCollInfos));
|
||||
const shardCollInfo = shardCollInfos[0];
|
||||
assert(
|
||||
shardCollInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
"expected 'recordIdsReplicated' to be present in shard listCollections response: " + tojson(shardCollInfo),
|
||||
);
|
||||
|
||||
// Verify the field is scrubbed when querying via the router (mongos).
|
||||
const routerCollInfos = testDB.getCollectionInfos({name: collName});
|
||||
assert.eq(1, routerCollInfos.length, "expected to find collection via router: " + tojson(routerCollInfos));
|
||||
const routerCollInfo = routerCollInfos[0];
|
||||
assert(
|
||||
!routerCollInfo.info.hasOwnProperty("recordIdsReplicated"),
|
||||
"expected 'recordIdsReplicated' to be absent in router listCollections response: " + tojson(routerCollInfo),
|
||||
);
|
||||
|
||||
st.stop();
|
||||
@ -48,7 +48,7 @@ function validateCollection(
|
||||
dbName,
|
||||
collName,
|
||||
shardKey,
|
||||
{expectedCollOpts, expectedIndexes, expectNoShardingMetadata, expectedCollInfos} = {},
|
||||
{expectedCollOpts, expectedIndexes, expectNoShardingMetadata} = {},
|
||||
) {
|
||||
const db = conn.getDB(dbName);
|
||||
const coll = db.getCollection(collName);
|
||||
@ -63,15 +63,6 @@ function validateCollection(
|
||||
}
|
||||
assert.eq(coll.countDocuments({}), maxCount);
|
||||
|
||||
if (expectedCollInfos) {
|
||||
jsTest.log("*** Checking expectedCollInfos " + tojson({listCollectionsDoc, expectedCollInfos}));
|
||||
for (let fieldName in expectedCollInfos) {
|
||||
const actual = getDottedField(listCollectionsDoc.info, fieldName);
|
||||
const expected = expectedCollInfos[fieldName];
|
||||
assert.eq(bsonUnorderedFieldsCompare(actual, expected), 0, {fieldName, actual, expected});
|
||||
}
|
||||
}
|
||||
|
||||
if (expectedIndexes) {
|
||||
const actualIndexes = coll.getIndexes();
|
||||
jsTest.log("*** Checking expectedIndexes " + tojson({actualIndexes, expectedIndexes}));
|
||||
@ -471,25 +462,6 @@ const testCases = [
|
||||
});
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "recordIdsReplicated",
|
||||
// TODO (SERVER-68173): Enable featureFlagRecordIdsReplicated.
|
||||
shouldSkip: (conn) => !FeatureFlagUtil.isEnabled(conn, "RecordIdsReplicated"),
|
||||
createCollection: (conn, dbName, collName) => {
|
||||
assert.commandWorked(conn.getDB(dbName).runCommand({create: collName}));
|
||||
return dbName + "." + collName;
|
||||
},
|
||||
insertDocuments: (conn, dbName, collName) => {
|
||||
insertDocuments(conn, dbName, collName);
|
||||
},
|
||||
validateCollection: (conn, dbName, collName, shardKey) => {
|
||||
validateCollection(conn, dbName, collName, shardKey, {
|
||||
expectedCollInfos: {
|
||||
"recordIdsReplicated": true,
|
||||
},
|
||||
});
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "expireAfterSeconds",
|
||||
createCollection: (conn, dbName, collName) => {
|
||||
|
||||
@ -98,6 +98,11 @@ function verify(expectedResult, result) {
|
||||
result.shards.sort();
|
||||
}
|
||||
|
||||
// The router scrubs 'recordIdsReplicated' from listCollections responses because it is an
|
||||
// internal field that can be inconsistent across shards. $listClusterCatalog returns it
|
||||
// unconditionally (via $_internalListCollections), so remove it before comparing.
|
||||
delete result.info.recordIdsReplicated;
|
||||
|
||||
// Ensure the resuls fields matches the expected once.
|
||||
assert.eq(expectedResult.options, result.options, "options field mismatch:" + tojson(result));
|
||||
assert.eq(expectedResult.info, result.info, "info field mismatch:" + tojson(result));
|
||||
|
||||
@ -5,10 +5,15 @@
|
||||
* featureFlagRecordIdsReplicated,
|
||||
* # Replicated record IDs are incompatible with clustered collections.
|
||||
* expects_explicit_underscore_id_index,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which requires the __system role.
|
||||
* auth_incompatible,
|
||||
* # hasRecordIdsReplicated uses $_internalListCollections which only supports local read concern.
|
||||
* assumes_read_concern_unchanged,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {
|
||||
hasRecordIdsReplicated,
|
||||
getRidForDoc,
|
||||
mapFieldToMatchingDocRid,
|
||||
} from "jstests/libs/collection_write_path/replicated_record_ids_utils.js";
|
||||
@ -30,8 +35,7 @@ const coll = testDB[collName];
|
||||
assert.commandWorked(testDB.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));
|
||||
assert.commandWorked(testDB.createCollection(collName));
|
||||
|
||||
let collInfo = coll.exists();
|
||||
assert(collInfo.info.recordIdsReplicated, tojson(collInfo));
|
||||
assert(hasRecordIdsReplicated(testDB, collName), "collection should have recordIdsReplicated set");
|
||||
|
||||
// Remove some of the initial documents on the collection with replicated record
|
||||
// IDs to create gaps in the record IDs.
|
||||
@ -106,11 +110,15 @@ assert.eq(
|
||||
|
||||
// Ensure that 'recordIdsReplicated` collection option is still present and set to true.
|
||||
// Check collection options on shard.
|
||||
collInfo = shard1.getCollection(collNS).exists();
|
||||
assert(collInfo.info.recordIdsReplicated, tojson(collInfo));
|
||||
assert(
|
||||
hasRecordIdsReplicated(shard1.getDB(dbName), collName),
|
||||
"shard should still have recordIdsReplicated set after moveCollection",
|
||||
);
|
||||
|
||||
// Check collection options through mongos.
|
||||
collInfo = mongos.getCollection(collNS).exists();
|
||||
assert(collInfo.info.recordIdsReplicated, tojson(collInfo));
|
||||
assert(
|
||||
hasRecordIdsReplicated(mongos.getDB(dbName), collName),
|
||||
"mongos should still see recordIdsReplicated set after moveCollection",
|
||||
);
|
||||
|
||||
st.stop();
|
||||
|
||||
@ -85,6 +85,27 @@ namespace {
|
||||
|
||||
constexpr auto systemBucketsDot = "system.buckets."_sd;
|
||||
|
||||
/**
|
||||
* Removes 'recordIdsReplicated' from the 'info' sub-document of a single listCollections result
|
||||
* entry. The field is internal and can be inconsistent across shards, so it must not be exposed
|
||||
* via the router.
|
||||
*/
|
||||
static BSONObj scrubRecordIdsReplicatedFromCollectionEntry(BSONObj entry) {
|
||||
auto infoElem = entry["info"];
|
||||
if (infoElem.eoo() || infoElem.type() != BSONType::object) {
|
||||
return entry;
|
||||
}
|
||||
BSONObjBuilder entryBuilder;
|
||||
for (auto&& field : entry) {
|
||||
if (field.fieldNameStringData() == "info"_sd) {
|
||||
entryBuilder.append("info"_sd, infoElem.Obj().removeField("recordIdsReplicated"_sd));
|
||||
} else {
|
||||
entryBuilder.append(field);
|
||||
}
|
||||
}
|
||||
return entryBuilder.obj();
|
||||
}
|
||||
|
||||
BSONObj rewriteCommandForListingOwnCollections(OperationContext* opCtx,
|
||||
const DatabaseName& dbName,
|
||||
const BSONObj& cmdObj) {
|
||||
@ -278,7 +299,8 @@ public:
|
||||
nss,
|
||||
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
|
||||
Grid::get(opCtx)->getCursorManager(),
|
||||
privileges));
|
||||
privileges,
|
||||
scrubRecordIdsReplicatedFromCollectionEntry));
|
||||
|
||||
CommandHelpers::filterCommandReplyForPassthrough(transformedResponse, &output);
|
||||
uassertStatusOK(getStatusFromCommandResult(output.asTempObj()));
|
||||
|
||||
@ -75,6 +75,7 @@ mongo_cc_library(
|
||||
"router_stage_queued_data.cpp",
|
||||
"router_stage_remove_metadata_fields.cpp",
|
||||
"router_stage_skip.cpp",
|
||||
"router_stage_transform.cpp",
|
||||
],
|
||||
deps = [
|
||||
":async_results_merger",
|
||||
@ -155,6 +156,7 @@ mongo_cc_unit_test(
|
||||
"router_stage_limit_test.cpp",
|
||||
"router_stage_remove_metadata_fields_test.cpp",
|
||||
"router_stage_skip_test.cpp",
|
||||
"router_stage_transform_test.cpp",
|
||||
"shard_tag_test.cpp",
|
||||
"store_possible_cursor_test.cpp",
|
||||
],
|
||||
|
||||
50
src/mongo/s/query/exec/router_stage_transform.cpp
Normal file
50
src/mongo/s/query/exec/router_stage_transform.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/s/query/exec/router_stage_transform.h"
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
RouterStageTransform::RouterStageTransform(OperationContext* opCtx,
|
||||
std::unique_ptr<RouterExecStage> child,
|
||||
TransformFn transform)
|
||||
: RouterExecStage(opCtx, std::move(child)), _transform(std::move(transform)) {}
|
||||
|
||||
StatusWith<ClusterQueryResult> RouterStageTransform::next() {
|
||||
auto childResult = getChildStage()->next();
|
||||
if (!childResult.isOK() || childResult.getValue().isEOF()) {
|
||||
return childResult;
|
||||
}
|
||||
return ClusterQueryResult(_transform(*childResult.getValue().getResult()),
|
||||
childResult.getValue().getShardId());
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
62
src/mongo/s/query/exec/router_stage_transform.h
Normal file
62
src/mongo/s/query/exec/router_stage_transform.h
Normal file
@ -0,0 +1,62 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/status_with.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/s/query/exec/cluster_query_result.h"
|
||||
#include "mongo/s/query/exec/router_exec_stage.h"
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* A router execution stage that applies a caller-supplied per-document transformation as results
|
||||
* flow through the cursor pipeline. The transform is applied to every document returned by
|
||||
* next(), including documents from getMore batches, without buffering the full result set.
|
||||
*/
|
||||
class RouterStageTransform final : public RouterExecStage {
|
||||
public:
|
||||
using TransformFn = std::function<BSONObj(BSONObj)>;
|
||||
|
||||
RouterStageTransform(OperationContext* opCtx,
|
||||
std::unique_ptr<RouterExecStage> child,
|
||||
TransformFn transform);
|
||||
|
||||
StatusWith<ClusterQueryResult> next() final;
|
||||
|
||||
private:
|
||||
TransformFn _transform;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
235
src/mongo/s/query/exec/router_stage_transform_test.cpp
Normal file
235
src/mongo/s/query/exec/router_stage_transform_test.cpp
Normal file
@ -0,0 +1,235 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/s/query/exec/router_stage_transform.h"
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/s/query/exec/router_stage_mock.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/duration.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
// Router stages do not use their OperationContext in these tests.
|
||||
OperationContext* opCtx = nullptr;
|
||||
|
||||
// A simple transform that adds a field "transformed: true" to every document.
|
||||
BSONObj addTransformedField(BSONObj doc) {
|
||||
return doc.addField(BSON("transformed" << true).firstElement());
|
||||
}
|
||||
|
||||
// A transform that removes the "secret" field from every document.
|
||||
BSONObj removeSecretField(BSONObj doc) {
|
||||
return doc.removeField("secret");
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, AppliesTransformToEachDocument) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(BSON("a" << 1));
|
||||
mockStage->queueResult(BSON("a" << 2 << "b" << 3));
|
||||
mockStage->queueResult(BSON("x" << "hello"));
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), addTransformedField);
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true));
|
||||
|
||||
auto second = stage->next();
|
||||
ASSERT_OK(second.getStatus());
|
||||
ASSERT(second.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*second.getValue().getResult(),
|
||||
BSON("a" << 2 << "b" << 3 << "transformed" << true));
|
||||
|
||||
auto third = stage->next();
|
||||
ASSERT_OK(third.getStatus());
|
||||
ASSERT(third.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*third.getValue().getResult(), BSON("x" << "hello" << "transformed" << true));
|
||||
|
||||
auto eof = stage->next();
|
||||
ASSERT_OK(eof.getStatus());
|
||||
ASSERT(eof.getValue().isEOF());
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, TransformCanRemoveFields) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(BSON("a" << 1 << "secret" << 42));
|
||||
mockStage->queueResult(BSON("b" << 2));
|
||||
mockStage->queueResult(BSON("secret" << 99 << "c" << 3));
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), removeSecretField);
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1));
|
||||
|
||||
auto second = stage->next();
|
||||
ASSERT_OK(second.getStatus());
|
||||
ASSERT(second.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*second.getValue().getResult(), BSON("b" << 2));
|
||||
|
||||
auto third = stage->next();
|
||||
ASSERT_OK(third.getStatus());
|
||||
ASSERT(third.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*third.getValue().getResult(), BSON("c" << 3));
|
||||
|
||||
auto eof = stage->next();
|
||||
ASSERT_OK(eof.getStatus());
|
||||
ASSERT(eof.getValue().isEOF());
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, PropagatesError) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(BSON("a" << 1));
|
||||
mockStage->queueError(Status(ErrorCodes::BadValue, "something went wrong"));
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), addTransformedField);
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true));
|
||||
|
||||
auto second = stage->next();
|
||||
ASSERT_NOT_OK(second.getStatus());
|
||||
ASSERT_EQ(second.getStatus(), ErrorCodes::BadValue);
|
||||
ASSERT_EQ(second.getStatus().reason(), "something went wrong");
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, PropagatesEOF) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(BSON("a" << 1));
|
||||
mockStage->queueEOF();
|
||||
// A result queued after EOF should not be reached.
|
||||
mockStage->queueResult(BSON("a" << 2));
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), addTransformedField);
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true));
|
||||
|
||||
auto eof = stage->next();
|
||||
ASSERT_OK(eof.getStatus());
|
||||
ASSERT(eof.getValue().isEOF());
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, RemotesExhausted) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(BSON("a" << 1));
|
||||
mockStage->markRemotesExhausted();
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), addTransformedField);
|
||||
ASSERT_TRUE(stage->remotesExhausted());
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(), BSON("a" << 1 << "transformed" << true));
|
||||
ASSERT_TRUE(stage->remotesExhausted());
|
||||
|
||||
auto eof = stage->next();
|
||||
ASSERT_OK(eof.getStatus());
|
||||
ASSERT(eof.getValue().isEOF());
|
||||
ASSERT_TRUE(stage->remotesExhausted());
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, ForwardsAwaitDataTimeout) {
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
auto* mockStagePtr = mockStage.get();
|
||||
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), addTransformedField);
|
||||
ASSERT_OK(stage->setAwaitDataTimeout(Milliseconds(500)));
|
||||
|
||||
auto timeout = mockStagePtr->getAwaitDataTimeout();
|
||||
ASSERT_OK(timeout.getStatus());
|
||||
ASSERT_EQ(500, durationCount<Milliseconds>(timeout.getValue()));
|
||||
}
|
||||
|
||||
TEST(RouterStageTransformTest, TransformIsAppliedToNestedSubDocumentField) {
|
||||
// Simulates the recordIdsReplicated scrub use case: remove a field from a nested sub-document.
|
||||
auto removeNestedField = [](BSONObj doc) -> BSONObj {
|
||||
auto infoElem = doc["info"];
|
||||
if (infoElem.eoo() || infoElem.type() != BSONType::object) {
|
||||
return doc;
|
||||
}
|
||||
BSONObjBuilder builder;
|
||||
for (auto&& field : doc) {
|
||||
if (field.fieldNameStringData() == "info"_sd) {
|
||||
builder.append("info"_sd, infoElem.Obj().removeField("internal"_sd));
|
||||
} else {
|
||||
builder.append(field);
|
||||
}
|
||||
}
|
||||
return builder.obj();
|
||||
};
|
||||
|
||||
auto mockStage = std::make_unique<RouterStageMock>(opCtx);
|
||||
mockStage->queueResult(
|
||||
BSON("name" << "coll1" << "info" << BSON("uuid" << "abc" << "internal" << true)));
|
||||
mockStage->queueResult(BSON("name" << "coll2" << "info" << BSON("uuid" << "def")));
|
||||
|
||||
auto stage =
|
||||
std::make_unique<RouterStageTransform>(opCtx, std::move(mockStage), removeNestedField);
|
||||
|
||||
auto first = stage->next();
|
||||
ASSERT_OK(first.getStatus());
|
||||
ASSERT(first.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*first.getValue().getResult(),
|
||||
BSON("name" << "coll1" << "info" << BSON("uuid" << "abc")));
|
||||
|
||||
auto second = stage->next();
|
||||
ASSERT_OK(second.getStatus());
|
||||
ASSERT(second.getValue().getResult());
|
||||
ASSERT_BSONOBJ_EQ(*second.getValue().getResult(),
|
||||
BSON("name" << "coll2" << "info" << BSON("uuid" << "def")));
|
||||
|
||||
auto eof = stage->next();
|
||||
ASSERT_OK(eof.getStatus());
|
||||
ASSERT(eof.getValue().isEOF());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -51,10 +51,13 @@
|
||||
#include "mongo/s/query/exec/cluster_client_cursor_params.h"
|
||||
#include "mongo/s/query/exec/cluster_cursor_manager.h"
|
||||
#include "mongo/s/query/exec/collect_query_stats_mongos.h"
|
||||
#include "mongo/s/query/exec/router_stage_merge.h"
|
||||
#include "mongo/s/query/exec/router_stage_transform.h"
|
||||
#include "mongo/s/transaction_router.h"
|
||||
#include "mongo/util/decorable.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@ -210,4 +213,122 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
|
||||
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
|
||||
}
|
||||
|
||||
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
|
||||
const ShardId& shardId,
|
||||
const HostAndPort& server,
|
||||
const BSONObj& cmdResult,
|
||||
const NamespaceString& requestedNss,
|
||||
std::shared_ptr<executor::TaskExecutor> executor,
|
||||
ClusterCursorManager* cursorManager,
|
||||
PrivilegeVector privileges,
|
||||
std::function<BSONObj(BSONObj)> documentTransform,
|
||||
TailableModeEnum tailableMode,
|
||||
boost::optional<BSONObj> routerSort) {
|
||||
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
|
||||
return cmdResult;
|
||||
}
|
||||
|
||||
auto incomingCursorResponse = CursorResponse::parseFromBSON(cmdResult);
|
||||
if (!incomingCursorResponse.isOK()) {
|
||||
return incomingCursorResponse.getStatus();
|
||||
}
|
||||
|
||||
auto& response = incomingCursorResponse.getValue();
|
||||
if (const auto& cursorMetrics = response.getCursorMetrics()) {
|
||||
CurOp::get(opCtx)->debug().getAdditiveMetrics().aggregateCursorMetrics(*cursorMetrics);
|
||||
}
|
||||
|
||||
auto&& opDebug = CurOp::get(opCtx)->debug();
|
||||
opDebug.getAdditiveMetrics().nBatches = 1;
|
||||
opDebug.nShards = std::max(opDebug.nShards, 1);
|
||||
CurOp::get(opCtx)->setEndOfOpMetrics(response.getBatch().size());
|
||||
|
||||
// Apply the transform to the first batch, which comes directly from the shard response.
|
||||
std::vector<BSONObj> transformedFirstBatch;
|
||||
transformedFirstBatch.reserve(response.getBatch().size());
|
||||
for (const auto& doc : response.getBatch()) {
|
||||
transformedFirstBatch.push_back(documentTransform(doc));
|
||||
}
|
||||
|
||||
if (response.getCursorId() == CursorId(0)) {
|
||||
// Cursor exhausted in the first batch, no proxy cursor is needed.
|
||||
opDebug.cursorExhausted = true;
|
||||
collectQueryStatsMongos(opCtx, std::move(opDebug.getQueryStatsInfo().key));
|
||||
CursorResponse exhaustedResponse(requestedNss,
|
||||
CursorId(0),
|
||||
std::move(transformedFirstBatch),
|
||||
response.getAtClusterTime(),
|
||||
response.getPostBatchResumeToken());
|
||||
return exhaustedResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
|
||||
}
|
||||
|
||||
// Build the cursor params pointing at the shard's open cursor.
|
||||
ClusterClientCursorParams params(response.getNSS(),
|
||||
APIParameters::get(opCtx),
|
||||
boost::none /* ReadPreferenceSetting */,
|
||||
repl::ReadConcernArgs::get(opCtx),
|
||||
[&] {
|
||||
if (!opCtx->getLogicalSessionId())
|
||||
return OperationSessionInfoFromClient();
|
||||
OperationSessionInfoFromClient osi{
|
||||
*opCtx->getLogicalSessionId(), opCtx->getTxnNumber()};
|
||||
if (TransactionRouter::get(opCtx)) {
|
||||
osi.setAutocommit(false);
|
||||
}
|
||||
return osi;
|
||||
}());
|
||||
params.remotes.emplace_back();
|
||||
auto& remoteCursor = params.remotes.back();
|
||||
remoteCursor.setShardId(shardId.toString());
|
||||
remoteCursor.setHostAndPort(server);
|
||||
remoteCursor.setCursorResponse(CursorResponse(response.getNSS(),
|
||||
response.getCursorId(),
|
||||
{} /* first batch served above */,
|
||||
response.getAtClusterTime(),
|
||||
response.getPostBatchResumeToken()));
|
||||
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
|
||||
params.tailableMode = tailableMode;
|
||||
params.originatingPrivileges = std::move(privileges);
|
||||
if (routerSort) {
|
||||
params.sortToApplyOnRouter = *routerSort;
|
||||
}
|
||||
params.requestQueryStatsFromRemotes = response.getCursorMetrics().has_value();
|
||||
|
||||
// Build the execution plan: RouterStageTransform wraps RouterStageMerge so the
|
||||
// documentTransform is applied to every document in every batch, including getMore batches,
|
||||
// without buffering the full result set in mongos memory.
|
||||
//
|
||||
// extractARMParams() moves 'remotes' out of params. The remaining fields in params are still
|
||||
// needed by ClusterClientCursorImpl for auth, session, and metrics bookkeeping.
|
||||
auto armParams = params.extractARMParams();
|
||||
auto mergeStage = std::make_unique<RouterStageMerge>(opCtx, executor, std::move(armParams));
|
||||
auto transformStage = std::make_unique<RouterStageTransform>(
|
||||
opCtx, std::move(mergeStage), std::move(documentTransform));
|
||||
|
||||
auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(transformStage), std::move(params));
|
||||
collectQueryStatsMongos(opCtx, ccc);
|
||||
ccc->detachFromOperationContext();
|
||||
|
||||
auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
|
||||
auto clusterCursorId =
|
||||
cursorManager->registerCursor(opCtx,
|
||||
ccc.releaseCursor(),
|
||||
requestedNss,
|
||||
ClusterCursorManager::CursorType::SingleTarget,
|
||||
ClusterCursorManager::CursorLifetime::Mortal,
|
||||
authUser);
|
||||
if (!clusterCursorId.isOK()) {
|
||||
return clusterCursorId.getStatus();
|
||||
}
|
||||
|
||||
opDebug.cursorid = clusterCursorId.getValue();
|
||||
|
||||
CursorResponse outgoingCursorResponse(requestedNss,
|
||||
clusterCursorId.getValue(),
|
||||
std::move(transformedFirstBatch),
|
||||
response.getAtClusterTime(),
|
||||
response.getPostBatchResumeToken());
|
||||
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -42,6 +42,7 @@
|
||||
#include "mongo/util/modules.h"
|
||||
#include "mongo/util/net/hostandport.h"
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
#include <boost/none.hpp>
|
||||
@ -127,4 +128,24 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
|
||||
PrivilegeVector privileges,
|
||||
TailableModeEnum tailableMode);
|
||||
|
||||
/**
|
||||
* Overload of storePossibleCursor(), but applies 'documentTransform' to every result document, both
|
||||
* in the initial first batch and in all subsequent getMore batches. The transform is injected into
|
||||
* the cursor's execution plan via RouterStageTransform.
|
||||
*
|
||||
* Use this when a command-level response transformation must be applied uniformly across all
|
||||
* cursor batches (e.g., stripping internal fields before sending response to clients).
|
||||
*/
|
||||
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
|
||||
const ShardId& shardId,
|
||||
const HostAndPort& server,
|
||||
const BSONObj& cmdResult,
|
||||
const NamespaceString& requestedNss,
|
||||
std::shared_ptr<executor::TaskExecutor> executor,
|
||||
ClusterCursorManager* cursorManager,
|
||||
PrivilegeVector privileges,
|
||||
std::function<BSONObj(BSONObj)> documentTransform,
|
||||
TailableModeEnum tailableMode = TailableModeEnum::kNormal,
|
||||
boost::optional<BSONObj> routerSort = boost::none);
|
||||
|
||||
} // namespace MONGO_MOD_PUBLIC mongo
|
||||
|
||||
Loading…
Reference in New Issue
Block a user