SERVER-97456 releaseMemory command for cluster (#32784)

GitOrigin-RevId: 88441b4eb3aa98b980ccfce6e627832285980423
This commit is contained in:
fotiniAlvanaki 2025-03-12 14:43:03 +00:00 committed by MongoDB Bot
parent b5dccd29d9
commit 8d6b3ed049
40 changed files with 947 additions and 65 deletions

2
.github/CODEOWNERS vendored
View File

@ -1155,6 +1155,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/jstests/sharding/query/**/stats.js @10gen/query-integration-observability @svc-auto-approve-bot
/jstests/sharding/query/**/unowned_doc_filtering.js @10gen/query-execution-router-query-exec @svc-auto-approve-bot
/jstests/sharding/query/**/update_delete_many_metrics.js @10gen/query-integration-observability @svc-auto-approve-bot
/jstests/sharding/query/**/release_memory* @10gen/query-execution @svc-auto-approve-bot
# The following patterns are parsed from ./jstests/sharding/query/agg/OWNERS.yml
/jstests/sharding/query/agg/**/* @10gen/query @svc-auto-approve-bot
@ -2373,6 +2374,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/src/mongo/s/commands/query_cmd/**/cluster_index_filter_cmd.cpp @10gen/query-execution-query-settings @svc-auto-approve-bot
/src/mongo/s/commands/query_cmd/**/cluster_plan_cache_clear_cmd.cpp @10gen/query-optimization-plan-cache @svc-auto-approve-bot
/src/mongo/s/commands/query_cmd/**/cluster_profile_cmd.cpp @10gen/query-integration-observability @svc-auto-approve-bot
/src/mongo/s/commands/query_cmd/**/cluster_release_memory_cmd.cpp @10gen/query-execution @10gen/query-execution-client-cursor @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/s/migration_blocking_operation/OWNERS.yml
/src/mongo/s/migration_blocking_operation/**/* @10gen/server-cluster-scalability @svc-auto-approve-bot

View File

@ -281,7 +281,7 @@ continuation-chaining member functions in [future.h][future], starting above the
At some point, we may have no more continuations to add to a future chain, and will want to either
synchronously extract the value or error held in the last future of the chain, or add a callback to
asynchronously consume this value. The `.get()` and `.getAsync()` members of future-like types
provide these facilities for terminating a future chain by extracting or asynchronouslyunsly
provide these facilities for terminating a future chain by extracting or asynchronously
consuming the result of the chain. The `.getAsync()` function works much like `.onCompletion()`,
taking a `Status` or `StatusWith<T>` and running regardless of whether or not the previous link in
the chain resolved with error or success, and running asynchronously when the previous results are

View File

@ -1,6 +1,5 @@
/*
* Run release memory auth test on a standalone mongod.
* TODO SERVER-97456 - add sharded test
* @tags: [
* requires_fcv_81
* ]

View File

@ -0,0 +1,14 @@
/*
* Run release memory auth test on a sharded cluster.
*
* @tags: [
* requires_sharding
* ]
*/
import {runTest} from "jstests/auth/release_memory_base.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const st =
new ShardingTest({shards: 1, mongos: 1, config: 1, other: {keyFile: 'jstests/libs/key1'}});
runTest(st.s0);
st.stop();

View File

@ -1,7 +1,6 @@
/**
* Test basic case for releaseMemory command
* @tags: [
* # TODO SERVER-97456 - this test should work with mongos when command is supported
* assumes_against_mongod_not_mongos,
* assumes_read_preference_unchanged,
* assumes_superuser_permissions,

View File

@ -1,8 +1,6 @@
/**
* Test how releaseMemory command interacts with snapshot read concern and collection drops
* @tags: [
* # TODO SERVER-97456 - this test should work with mongos when command is supported
* assumes_against_mongod_not_mongos,
* assumes_read_preference_unchanged,
* assumes_superuser_permissions,
* requires_fcv_81,

View File

@ -1243,8 +1243,6 @@ const allCommands = {
skip: isDeprecated,
},
releaseMemory: {
// TODO SERVER-97456 - remove this
skip: "Currently not supported in sharding",
setUp: function(conn) {
const db = conn.getDB(dbName);
for (let i = 0; i < 10; i++) {

View File

@ -693,6 +693,7 @@ let testCases = {
refineCollectionShardKey: {skip: "not on a user database"},
refreshLogicalSessionCacheNow: {skip: "goes through the cluster write path"},
refreshSessions: {skip: "executes locally on mongos (not sent to any remote node)"},
releaseMemory: {skip: "requires a previously established cursor"},
removeShard: {skip: "not on a user database"},
removeShardFromZone: {skip: "not on a user database"},
renameCollection: {

View File

@ -11,4 +11,6 @@ export const commandsRemovedFromMongodSinceLastLTS = [
// These commands were added in mongod since the last LTS version, so will not appear in the
// listCommands output of a last LTS version mongod. We will allow these commands to have a
// test defined without always existing on the mongod being used.
export const commandsAddedToMongodSinceLastLTS = [];
export const commandsAddedToMongodSinceLastLTS = [
"releaseMemory",
];

View File

@ -10,4 +10,6 @@ export const commandsRemovedFromMongosSinceLastLTS = [
// These commands were added in mongos since the last LTS version, so will not appear in the
// listCommands output of a last LTS version mongos. We will allow these commands to have a test
// defined without always existing on the mongos being used.
export const commandsAddedToMongosSinceLastLTS = [];
export const commandsAddedToMongosSinceLastLTS = [
"releaseMemory",
];

View File

@ -48,3 +48,6 @@ filters:
- "update_delete_many_metrics.js":
approvers:
- 10gen/query-integration-observability
- "release_memory*":
approvers:
- 10gen/query-execution

View File

@ -0,0 +1,238 @@
/**
* Make sure that running releaseMemory command does not affect the normal query execution.
*
* Uses getMore to pin an open cursor.
* @tags: [
* requires_getmore,
* requires_fcv_81,
* uses_parallel_shell,
* ]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
// This test manually simulates a session, which is not compatible with implicit sessions.
TestData.disableImplicitSessions = true;
const kFailPointName = "waitAfterPinningCursorBeforeGetMoreBatch";
const kFailpointOptions = {
shouldCheckForInterrupt: true
};
const st = new ShardingTest({shards: 2});
const kDBName = "test";
const mongosDB = st.s.getDB(kDBName);
const shard0DB = st.shard0.getDB(kDBName);
const shard1DB = st.shard1.getDB(kDBName);
st.s.adminCommand({enablesharding: kDBName, primaryShard: st.shard0.name});
let coll = mongosDB.jstest_release_memory;
let docs = [];
for (let i = 0; i < 10; i++) {
docs.push({_id: i});
}
assert.commandWorked(coll.insertMany(docs));
st.shardColl(coll, {_id: 1}, {_id: 5}, {_id: 6}, kDBName, false);
function assertGetMore(targetDB, collName, cursorId, useSession, sessionId, results, docs) {
const testDB = targetDB ? targetDB : db;
let getMoreCmd = {getMore: cursorId, collection: collName, batchSize: 4};
if (useSession) {
getMoreCmd.lsid = sessionId;
}
while (cursorId != 0) {
jsTest.log(`Running getMore command ${tojson(getMoreCmd)}`);
const getMoreRes = testDB.runCommand(getMoreCmd);
assert.commandWorked(getMoreRes);
const cursor = getMoreRes.cursor;
cursorId = cursor.id;
results.push(...cursor.nextBatch);
}
if (useSession) {
assert.commandWorked(testDB.adminCommand({endSessions: [sessionId]}));
}
assert.sameMembers(results, docs);
}
// Tests that the various cursors involved in a sharded query can release memory.
function testReleaseMemory({func, useSession, cursorsNum, pinCursor, unknownCursor}) {
let cursorIdsArr = [];
let cursorIdx = 0;
let sessionIdsArr = [];
let getMoreJoiner = null;
let results = [];
for (let i = 0; i < cursorsNum; ++i) {
// Run a find against mongos. This should open cursors on both of the shards.
let findCmd = {find: coll.getName(), batchSize: 2};
if (useSession) {
// Manually start a session so it can be continued from inside a parallel shell.
const sessionId = assert.commandWorked(mongosDB.adminCommand({startSession: 1})).id;
findCmd.lsid = sessionId;
sessionIdsArr.push(sessionId)
}
jsTest.log(`Running find command ${i}`);
const findRes = mongosDB.runCommand(findCmd);
assert.commandWorked(findRes);
let cursor = findRes.cursor;
assert.neq(cursor.id, NumberLong(0));
results.push(cursor.firstBatch);
cursorIdsArr.push(cursor.id)
}
let shard0DBFailpoint;
let shard1DBFailpoint;
if (pinCursor) {
assert.gte(cursorsNum, cursorIdx + 1);
shard0DBFailpoint = configureFailPoint(shard0DB, kFailPointName, kFailpointOptions);
shard1DBFailpoint = configureFailPoint(shard1DB, kFailPointName, kFailpointOptions);
getMoreJoiner = startParallelShell(funWithArgs(assertGetMore,
null,
coll.getName(),
cursorIdsArr[cursorIdx],
useSession,
sessionIdsArr[cursorIdx],
results[cursorIdx],
docs),
st.s.port);
// Wait until we know the mongod cursors are pinned.
shard0DBFailpoint.wait();
shard1DBFailpoint.wait();
++cursorIdx;
}
if (unknownCursor) {
assert.gte(cursorsNum, cursorIdx + 1);
// Kill the cursor to make it go away.
jsTest.log(`killing cursor ${cursorIdsArr[cursorIdx]}`);
let cmdRes = assert.commandWorked(
mongosDB.runCommand({killCursors: coll.getName(), cursors: [cursorIdsArr[cursorIdx]]}));
assert.eq(cmdRes.cursorsKilled, [cursorIdsArr[cursorIdx]]);
assert.eq(cmdRes.cursorsAlive, []);
assert.eq(cmdRes.cursorsNotFound, []);
assert.eq(cmdRes.cursorsUnknown, []);
++cursorIdx
}
// Use the function provided by the caller to call the releaseMemory command.
func(cursorIdsArr);
if (pinCursor) {
shard0DBFailpoint.off();
shard1DBFailpoint.off();
// The getMore should finish now
getMoreJoiner();
getMoreJoiner = null;
}
for (; cursorIdx < cursorIdsArr.length; ++cursorIdx) {
assertGetMore(mongosDB,
coll.getName(),
cursorIdsArr[cursorIdx],
useSession,
sessionIdsArr[cursorIdx],
results[cursorIdx],
docs);
}
}
for (let useSession of [false, true]) {
// Test single cursor.
testReleaseMemory({
func: function(mongosCursorIdArr) {
jsTest.log(`Running releaseMemory command for single cursor ${
tojson({releaseMemory: mongosCursorIdArr})}`);
let cmdRes =
assert.commandWorked(mongosDB.runCommand({releaseMemory: mongosCursorIdArr}));
assert.eq(cmdRes.cursorsReleased, mongosCursorIdArr);
assert.eq(cmdRes.cursorsCurrentlyPinned, []);
assert.eq(cmdRes.cursorsNotFound, []);
},
useSession: useSession,
cursorsNum: 1,
pinCursor: false,
unknownCursor: false
});
// Test multiple cursors.
testReleaseMemory({
func: function(mongosCursorIdsArr) {
jsTest.log(`Running releaseMemory command for multiple cursors ${
tojson({releaseMemory: mongosCursorIdsArr})}`);
let cmdRes =
assert.commandWorked(mongosDB.runCommand({releaseMemory: mongosCursorIdsArr}));
assert.eq(cmdRes.cursorsReleased, mongosCursorIdsArr);
assert.eq(cmdRes.cursorsCurrentlyPinned, []);
assert.eq(cmdRes.cursorsNotFound, []);
},
useSession: useSession,
cursorsNum: 2,
pinCursor: false,
unknownCursor: false
});
// Test not found cursor
testReleaseMemory({
func: function(mongosCursorIdArr) {
jsTest.log(`Running releaseMemory command for single unknown cursor ${
tojson({releaseMemory: mongosCursorIdArr})}`);
let cmdRes =
assert.commandWorked(mongosDB.runCommand({releaseMemory: mongosCursorIdArr}));
assert.eq(cmdRes.cursorsReleased, []);
assert.eq(cmdRes.cursorsCurrentlyPinned, []);
assert.eq(cmdRes.cursorsNotFound, mongosCursorIdArr);
},
useSession: useSession,
cursorsNum: 1,
pinCursor: false,
unknownCursor: true
});
// Test pinned cursor
testReleaseMemory({
func: function(mongosCursorIdArr) {
jsTest.log(`Running releaseMemory command for single cursor pinned ${
tojson({releaseMemory: mongosCursorIdArr})}`);
let cmdRes =
assert.commandWorked(mongosDB.runCommand({releaseMemory: mongosCursorIdArr}));
assert.eq(cmdRes.cursorsReleased, []);
assert.eq(cmdRes.cursorsCurrentlyPinned, mongosCursorIdArr);
assert.eq(cmdRes.cursorsNotFound, []);
},
useSession: useSession,
cursorsNum: 1,
pinCursor: true,
unknownCursor: false
});
// Test cursor combinations
testReleaseMemory({
func: function(mongosCursorIdArr) {
jsTest.log(`Running releaseMemory command for all types of cursors ${
tojson({releaseMemory: mongosCursorIdArr})}`);
let cmdRes =
assert.commandWorked(mongosDB.runCommand({releaseMemory: mongosCursorIdArr}));
assert.eq(cmdRes.cursorsReleased, [mongosCursorIdArr[2]]);
assert.eq(cmdRes.cursorsCurrentlyPinned, [mongosCursorIdArr[0]]);
assert.eq(cmdRes.cursorsNotFound, [mongosCursorIdArr[1]]);
},
useSession: useSession,
cursorsNum: 3,
pinCursor: true,
unknownCursor: true
});
}
st.stop();

View File

@ -0,0 +1,83 @@
/**
* Make sure that a releaseMemory failure does not affect the normal query execution.
*
* Uses getMore to pin an open cursor.
* @tags: [
* requires_getmore,
* requires_fcv_81,
* uses_parallel_shell,
* ]
*/
import {assertArrayEq} from "jstests/aggregation/extras/utils.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const kFailPointName = "failReleaseMemoryAfterCursorCheckout";
const kFailpointOptions = {
"errorCode": ErrorCodes.SocketException,
};
const st = new ShardingTest({shards: 2});
const kDBName = "test";
const mongosDB = st.s.getDB(kDBName);
const shard0DB = st.shard0.getDB(kDBName);
const shard1DB = st.shard1.getDB(kDBName);
st.s.adminCommand({enablesharding: kDBName, primaryShard: st.shard0.name});
let coll = mongosDB.jstest_release_memory;
let docs = [];
for (let i = 0; i < 10; i++) {
docs.push({_id: i});
}
assert.commandWorked(coll.insertMany(docs));
st.shardColl(coll, {_id: 1}, {_id: 5}, {_id: 6}, kDBName, false);
function runTest(cursorId, docIdx) {
// Activate the failpoint and set the exception that it will throw.
let failpoint = configureFailPoint(mongosDB, kFailPointName, kFailpointOptions);
// Test releaseMemory
jsTest.log(`Running releaseMemory command ${tojson({releaseMemory: [cursorId]})}`);
assert.commandFailedWithCode(
mongosDB.runCommand({releaseMemory: [cursorId]}),
ErrorCodes.SocketException,
);
// Test getMore
let batchSize = 4;
let getMoreCmd = {getMore: cursorId, collection: coll.getName(), batchSize: batchSize};
while (cursorId != 0) {
jsTest.log(`Running getMore command ${tojson(getMoreCmd)}`);
let getMoreRes = mongosDB.runCommand(getMoreCmd);
assert.commandWorked(getMoreRes);
let cursor = getMoreRes.cursor;
assertArrayEq({actual: cursor.nextBatch, expected: docs.slice(docIdx, docIdx + batchSize)});
docIdx += batchSize;
cursorId = cursor.id;
}
failpoint.off();
}
// Test find command
jsTest.log(`Running find command`);
let findRes = mongosDB.runCommand({find: coll.getName(), sort: {_id: 1}, batchSize: 2});
assert.commandWorked(findRes);
let cursor = findRes.cursor;
assertArrayEq({actual: cursor.firstBatch, expected: docs.slice(0, 2)});
assert.neq(cursor.id, NumberLong(0));
runTest(cursor.id, 2);
// Test aggregate command
jsTest.log(`Running aggregate command`);
let aggregateRes = coll.aggregate([{$sort: {_id: 1}}], {cursor: {batchSize: 2}});
assertArrayEq({actual: aggregateRes._batch, expected: docs.slice(0, 2)});
assert.neq(aggregateRes._cursorid, NumberLong(0));
runTest(aggregateRes._cursorid, 2)
st.stop();

View File

@ -167,6 +167,7 @@ let testCases = {
clusterDelete: {skip: "already tested by 'delete' tests on mongos"},
clusterFind: {skip: "already tested by 'find' tests on mongos"},
clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterReleaseMemory: {skip: "already tested by 'releaseMemory' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {skip: "primary only"},
@ -331,6 +332,7 @@ let testCases = {
refineCollectionShardKey: {skip: "primary only"},
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
refreshSessions: {skip: "does not return user data"},
releaseMemory: {skip: "does not return user data"},
removeShard: {skip: "primary only"},
removeShardFromZone: {skip: "primary only"},
renameCollection: {skip: "primary only"},

View File

@ -176,6 +176,7 @@ let testCases = {
clusterFind: {skip: "already tested by 'find' tests on mongos"},
clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterReleaseMemory: {skip: "already tested by 'releaseMemory' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
commitReshardCollection: {skip: "primary only"},
commitTransaction: {skip: "primary only"},
@ -394,6 +395,7 @@ let testCases = {
refineCollectionShardKey: {skip: "primary only"},
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
refreshSessions: {skip: "does not return user data"},
releaseMemory: {skip: "does not return user data"},
removeShard: {skip: "primary only"},
removeShardFromZone: {skip: "primary only"},
renameCollection: {skip: "primary only"},

View File

@ -178,6 +178,7 @@ let testCases = {
clusterFind: {skip: "already tested by 'find' tests on mongos"},
clusterGetMore: {skip: "already tested by 'getMore' tests on mongos"},
clusterInsert: {skip: "already tested by 'insert' tests on mongos"},
clusterReleaseMemory: {skip: "already tested by 'releaseMemore' tests on mongos"},
clusterUpdate: {skip: "already tested by 'update' tests on mongos"},
collMod: {skip: "primary only"},
collStats: {skip: "does not return user data"},
@ -345,6 +346,7 @@ let testCases = {
refineCollectionShardKey: {skip: "primary only"},
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
refreshSessions: {skip: "does not return user data"},
releaseMemory: {skip: "does not return user data"},
removeShard: {skip: "primary only"},
removeShardFromZone: {skip: "primary only"},
renameCollection: {skip: "primary only"},

View File

@ -1090,7 +1090,7 @@ public:
/**
* Checks if the client associated with the given OperationContext is authorized to run this
* command.
* Command imlpementations MUST provide a method here, even if no authz checks are required.
* Command implementations MUST provide a method here, even if no authz checks are required.
* Such commands should return Status::OK(), with a comment stating "No auth required".
*/
virtual Status checkAuthForOperation(OperationContext* opCtx,

View File

@ -228,7 +228,7 @@ public:
/**
* Same as waitForEvent without an OperationContext, but if the OperationContext gets
* interrupted, will return the kill code, or, if the the deadline passes, will return
* interrupted, will return the kill code, or, if the deadline passes, will return
* Status::OK with cv_status::timeout.
*/
virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,

View File

@ -201,6 +201,7 @@ mongo_cc_library(
"//src/mongo/s/commands/query_cmd:cluster_pipeline_cmd_s.cpp",
"//src/mongo/s/commands/query_cmd:cluster_plan_cache_clear_cmd.cpp",
"//src/mongo/s/commands/query_cmd:cluster_profile_cmd.cpp",
"//src/mongo/s/commands/query_cmd:cluster_release_memory_cmd.cpp",
"//src/mongo/s/commands/query_cmd:cluster_write_cmd_s.cpp",
],
hdrs = [

View File

@ -25,3 +25,7 @@ filters:
- "cluster_profile_cmd.cpp":
approvers:
- 10gen/query-integration-observability
- "cluster_release_memory_cmd.cpp":
approvers:
- 10gen/query-execution
- 10gen/query-execution-client-cursor

View File

@ -56,13 +56,11 @@ struct ClusterKillCursorsCmd {
const NamespaceString& nss,
CursorId cursorId) {
auto const authzSession = AuthorizationSession::get(opCtx->getClient());
auto authChecker = [&authzSession,
&nss](const boost::optional<UserName>& userName) -> Status {
AuthzCheckFn authChecker = [&authzSession, &nss](AuthzCheckFnInputType userName) -> Status {
return auth::checkAuthForKillCursors(authzSession, nss, userName);
};
return Grid::get(opCtx)->getCursorManager()->checkAuthForKillCursors(
opCtx, cursorId, authChecker);
return Grid::get(opCtx)->getCursorManager()->checkAuthCursor(opCtx, cursorId, authChecker);
}
static Status doKillCursor(OperationContext* opCtx,

View File

@ -0,0 +1,210 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <boost/optional/optional.hpp>
#include "mongo/base/status.h"
#include "mongo/db/auth/authorization_checks.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/user_name.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/client_cursor/cursor_id.h"
#include "mongo/db/query/client_cursor/release_memory_gen.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/exec/cluster_cursor_manager.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(failReleaseMemoryAfterCursorCheckout);
class ClusterReleaseMemoryCmd final : public TypedCommand<ClusterReleaseMemoryCmd> {
public:
using Request = ReleaseMemoryCommandRequest;
const std::set<std::string>& apiVersions() const final {
return kNoApiVersions;
}
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
bool adminOnly() const final {
return false;
}
bool allowedInTransactions() const final {
return true;
}
bool allowedWithSecurityToken() const final {
return true;
}
bool enableDiagnosticPrintingOnFailure() const final {
return true;
}
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
ReleaseMemoryCommandReply typedRun(OperationContext* opCtx) {
ReleaseMemoryCommandRequest request = this->request();
auto cursorManager = Grid::get(opCtx)->getCursorManager();
auto cursorIds = request.getCommandParameter();
std::vector<CursorId> cursorsReleased;
std::vector<CursorId> cursorsNotFound;
std::vector<CursorId> cursorsCurrentlyPinned;
for (CursorId id : cursorIds) {
auto pinnedCursor = cursorManager->checkOutCursorNoAuthCheck(id, opCtx);
if (pinnedCursor.isOK()) {
ScopeGuard returnCursorGuard([&pinnedCursor] {
pinnedCursor.getValue().returnCursor(
ClusterCursorManager::CursorState::NotExhausted);
});
{
// If the 'failGetMoreAfterCursorCheckout' failpoint is enabled, throw an
// exception with the given 'errorCode' value, or ErrorCodes::InternalError
// if 'errorCode' is omitted.
auto nss = ns();
failReleaseMemoryAfterCursorCheckout.executeIf(
[](const BSONObj& data) {
auto errorCode =
(data["errorCode"] ? data["errorCode"].safeNumberLong()
: ErrorCodes::InternalError);
uasserted(
errorCode,
"Hit the 'failReleaseMemoryAfterCursorCheckout' failpoint");
},
[&opCtx, nss](const BSONObj& data) {
auto dataForFailCommand = data.addField(
BSON("failCommands" << BSON_ARRAY("releaseMemory"))
.firstElement());
auto* command = CommandHelpers::findCommand(opCtx, "releaseMemory");
return CommandHelpers::shouldActivateFailCommandFailPoint(
dataForFailCommand, nss, command, opCtx->getClient());
});
}
auto resp = pinnedCursor.getValue()->releaseMemory();
// Check the status and decide where the result should go
if (resp.isOK()) {
cursorsReleased.push_back(id);
} else {
LOGV2_ERROR(9745601,
"ReleaseMemory returned non-OK status",
"cursorId"_attr = id,
"status"_attr = resp.toString());
}
// Upon successful completion, transfer ownership of the cursor back to the
// cursor manager.
pinnedCursor.getValue().returnCursor(
ClusterCursorManager::CursorState::NotExhausted);
returnCursorGuard.dismiss();
} else if (pinnedCursor.getStatus().code() == ErrorCodes::CursorInUse) {
cursorsCurrentlyPinned.push_back(id);
} else if (pinnedCursor.getStatus().code() == ErrorCodes::CursorNotFound) {
cursorsNotFound.push_back(id);
} else {
LOGV2_ERROR(9745602,
"ReleaseMemory returned unexpected status",
"cursorId"_attr = id,
"status"_attr = pinnedCursor.getStatus().toString());
}
}
return ReleaseMemoryCommandReply{std::move(cursorsReleased),
std::move(cursorsNotFound),
std::move(cursorsCurrentlyPinned)};
}
private:
bool supportsWriteConcern() const override {
return false;
}
NamespaceString ns() const override {
return NamespaceString::makeCommandNamespace(db());
}
const DatabaseName& db() const override {
return request().getDbName();
}
void doCheckAuthorization(OperationContext* opCtx) const final {
auto cursorIds = this->request().getCommandParameter();
// Check each cursor to verify that it has privileges to release memory on it.
for (CursorId id : cursorIds) {
auto const authzSession = AuthorizationSession::get(opCtx->getClient());
ReleaseMemoryAuthzCheckFn authChecker =
[&authzSession](ReleaseMemoryAuthzCheckFnInputType ns) -> Status {
return auth::checkAuthForReleaseMemory(authzSession, ns);
};
auto status =
Grid::get(opCtx)->getCursorManager()->checkAuthCursor(opCtx, id, authChecker);
// audit::logReleaseMemoryAuthzCheck(opCtx->getClient(), nss, id, status.code());
if (!status.isOK()) {
if (status.code() == ErrorCodes::CursorNotFound) {
// Not found isn't an authorization issue.
// run() will raise it as a return value.
continue;
}
uassertStatusOK(status); // throws
}
}
}
};
};
MONGO_REGISTER_COMMAND(ClusterReleaseMemoryCmd).forRouter();
} // namespace
} // namespace mongo

View File

@ -44,6 +44,7 @@
#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/query/client_cursor/cursor_response.h"
#include "mongo/db/query/client_cursor/kill_cursors_gen.h"
#include "mongo/db/query/client_cursor/release_memory_gen.h"
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/session/logical_session_id_gen.h"
@ -387,10 +388,13 @@ AsyncResultsMerger::RemoteCursorPtr AsyncResultsMerger::_buildRemote(WithLock lk
}
void AsyncResultsMerger::_cancelCallbackForRemote(WithLock lk, const RemoteCursorPtr& remote) {
if (!remote->cbHandle.isValid()) {
return;
if (remote->cbHandle.isValid()) {
_executor->cancel(remote->cbHandle);
}
if (remote->releaseMemoryCbHandle.isValid()) {
_executor->cancel(remote->releaseMemoryCbHandle);
}
_executor->cancel(remote->cbHandle);
}
void AsyncResultsMerger::_removeRemoteFromPromisedMinSortKeys(WithLock lk,
@ -652,6 +656,51 @@ Status AsyncResultsMerger::scheduleGetMores() {
return _scheduleGetMores(lk);
}
stdx::shared_future<void> AsyncResultsMerger::releaseMemory(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_releaseMemoryCompleteInfo) {
// There is already a 'releasMemoryCompleteInfo' future. Do not create another one.
return _releaseMemoryCompleteInfo->getFuture();
}
// Create a 'releasMemoryCompleteInfo' future, which will be signaled when all remotes have
// responded to the releaseMemory command.
_releaseMemoryCompleteInfo.emplace();
_status = _scheduleReleaseMemory(lk, opCtx);
if (!_status.isOK()) {
LOGV2_ERROR(
9745606, "Scheduling releaseMemory encountered an issue", "status"_attr = _status);
_releaseMemoryCompleteInfo->signalFutures();
}
if (!_haveOutstandingReleaseMemoryRequests(lk)) {
// Signal the future right now, as there's nothing to wait for.
_releaseMemoryCompleteInfo->signalFutures();
}
return _releaseMemoryCompleteInfo->getFuture();
}
Status AsyncResultsMerger::releaseMemoryResult(OperationContext* opCtx) {
if (!_status.isOK()) {
return _status;
}
for (const auto& remote : _remotes) {
// If we are done, all remotes should have sent a response.
tassert(9745603, "remote should have releaseMemoryResponse", remote->releaseMemoryResponse);
auto response = remote->releaseMemoryResponse.get();
Status responseStatus = getStatusFromCommandResult(response);
if (!responseStatus.isOK()) {
return responseStatus;
}
}
return Status::OK();
}
Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) {
// Before scheduling more work, check whether the cursor has been invalidated.
_assertNotInvalidated(lk);
@ -974,6 +1023,20 @@ void AsyncResultsMerger::_handleBatchResponse(WithLock lk,
_signalCurrentEventIfReady(lk); // Wake up anyone waiting on '_currentEvent'.
}
void AsyncResultsMerger::_handleReleaseMemoryResponse(WithLock lk,
StatusWith<BSONObj>& parsedResponse,
const RemoteCursorPtr& remote) {
// Got a response from remote, so indicate we are no longer waiting for one.
remote->releaseMemoryCbHandle = executor::TaskExecutor::CallbackHandle();
remote->releaseMemoryResponse = std::move(parsedResponse.getValue());
if (!_haveOutstandingReleaseMemoryRequests(lk)) {
tassert(9745604, "_releaseMemoryCompleteInfo should exist", _releaseMemoryCompleteInfo);
_releaseMemoryCompleteInfo->signalFutures();
}
}
void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) {
invariant(_lifecycleState == kKillStarted);
@ -1082,6 +1145,12 @@ bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) {
});
}
bool AsyncResultsMerger::_haveOutstandingReleaseMemoryRequests(WithLock) {
return std::any_of(_remotes.begin(), _remotes.end(), [](const auto& remote) {
return remote->releaseMemoryCbHandle.isValid();
});
}
void AsyncResultsMerger::_scheduleKillCursors(WithLock lk, OperationContext* opCtx) {
invariant(_killCompleteInfo);
@ -1137,6 +1206,61 @@ void AsyncResultsMerger::_scheduleKillCursorForRemote(WithLock lk,
.ignore();
}
Status AsyncResultsMerger::_scheduleReleaseMemory(WithLock, OperationContext* opCtx) {
std::vector<AsyncRequestsSender::Request> asyncRequests;
asyncRequests.reserve(_remotes.size());
for (auto& remote : _remotes) {
if (!remote->status.isOK()) {
return remote->status;
}
const std::vector<mongo::CursorId> params{remote->cursorId};
ReleaseMemoryCommandRequest releaseMemoryCmd(params);
releaseMemoryCmd.setDbName(remote->cursorNss.dbName());
BSONObjBuilder bob;
releaseMemoryCmd.serialize(&bob);
executor::RemoteCommandRequest executorRequest{
remote->getTargetHost(), remote->cursorNss.dbName(), bob.obj(), _opCtx};
LOGV2_DEBUG(99745600,
2,
"scheduling releaseMemory command",
"remoteHost"_attr = remote->getTargetHost(),
"shardId"_attr = remote->shardId.toString());
// Make a copy of the remote's cursorId here while holding the mutex. The copy is passed
// into the lambda so the cursorId can be accessed without holding the mutex.
const auto cursorId = remote->cursorId;
auto callbackStatus = _executor->scheduleRemoteCommand(
executorRequest,
[self = shared_from_this(), cursorId, remote /* intrusive_ptr copy! */](
auto const& cbData) {
// Parse response outside of the mutex.
auto parsedResponse = [&](const auto& cbData) -> StatusWith<BSONObj> {
if (!cbData.response.isOK()) {
return cbData.response.status;
}
// Not much to do here really.
return std::move(cbData.response.data);
}(cbData);
// Handle the response and update the remote's status under the mutex.
stdx::lock_guard<stdx::mutex> lk(self->_mutex);
self->_handleReleaseMemoryResponse(lk, parsedResponse, remote);
});
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
remote->releaseMemoryCbHandle = callbackStatus.getValue();
}
return Status::OK();
}
bool AsyncResultsMerger::_shouldKillRemote(WithLock, const RemoteCursorData& remote) {
static const std::set<ErrorCodes::Error> kCursorAlreadyDeadCodes = {
ErrorCodes::QueryPlanKilled, ErrorCodes::CursorKilled, ErrorCodes::CursorNotFound};

View File

@ -231,6 +231,12 @@ public:
*/
Status scheduleGetMores();
stdx::shared_future<void> releaseMemory(OperationContext* opCtx);
// It merges the releaseMemory results from all the remote requests. At the moment it returns
// only a status but it can be extended to return more in the future.
Status releaseMemoryResult(OperationContext* opCtx);
/**
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
@ -410,9 +416,13 @@ private:
// The buffer of results that have been retrieved but not yet returned to the caller.
std::queue<BSONObj> docBuffer;
// Keep outside the docBuffer so not to mess with buffered results from normal execution.
boost::optional<BSONObj> releaseMemoryResponse;
// Is valid if there is currently a pending request to this remote.
executor::TaskExecutor::CallbackHandle cbHandle;
// Different handle so that it does not override the handle for getMore.
executor::TaskExecutor::CallbackHandle releaseMemoryCbHandle;
// Set to an error status if there is an error retrieving a response from this remote or if
// the command result contained an error.
@ -515,6 +525,10 @@ private:
StatusWith<CursorResponse>& response,
const RemoteCursorPtr& remote);
void _handleReleaseMemoryResponse(WithLock lk,
StatusWith<BSONObj>& response,
const RemoteCursorPtr& remote);
/**
* Schedule a killCursors request for the remote if the remote still has a cursor open.
* This is a fire-and-forget attempt to close the remote cursor. We are not blocking until
@ -562,6 +576,12 @@ private:
* Returns true if this async cursor is waiting to receive another batch from a remote.
*/
bool _haveOutstandingBatchRequests(WithLock);
/**
* Returns true if this async cursor is waiting to receive a response on a releaseMemory command
* from a remote.
*/
bool _haveOutstandingReleaseMemoryRequests(WithLock);
/**
* Called internally when attempting to get a new event for the caller to wait on. Throws if
@ -597,6 +617,11 @@ private:
*/
bool _shouldKillRemote(WithLock, const RemoteCursorData& remote);
/**
* Schedules a releaseMemory command to be run on all remote hosts that have stored cursors.
*/
Status _scheduleReleaseMemory(WithLock, OperationContext* opCtx);
/**
* Updates the given remote's metadata (e.g. the cursor id) based on information in
* 'response'.
@ -733,11 +758,12 @@ private:
// Handles the promise/future mechanism used to cleanly shut down the ARM. This avoids race
// conditions in cases where the underlying TaskExecutor is simultaneously being torn down.
struct KillCompletePromiseFuture {
KillCompletePromiseFuture() : _future(_promise.get_future()) {}
struct CompletePromiseFuture {
CompletePromiseFuture() : _future(_promise.get_future()) {}
// Multiple calls to kill() can be made and each must return a future that will be
// notified when the ARM has been cleaned up.
// Multiple calls to the method that creates the promise (i.e., kill()/releaseMemory()) can
// be made and each must return a future that will be notified when the ARM has been cleaned
// up.
stdx::shared_future<void> getFuture() {
return _future;
}
@ -752,7 +778,9 @@ private:
stdx::promise<void> _promise;
stdx::shared_future<void> _future;
};
boost::optional<KillCompletePromiseFuture> _killCompleteInfo;
boost::optional<CompletePromiseFuture> _killCompleteInfo;
boost::optional<CompletePromiseFuture> _releaseMemoryCompleteInfo;
};
} // namespace mongo

View File

@ -154,6 +154,7 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationCo
return _arm->nextReady();
}
StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opCtx) {
CurOp::get(opCtx)->ensureRecordRemoteOpWait();
@ -163,6 +164,11 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opC
: blockUntilNext(opCtx));
}
Status BlockingResultsMerger::releaseMemory(OperationContext* opCtx) {
_arm->releaseMemory(opCtx).wait();
return _arm->releaseMemoryResult(opCtx);
}
StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {

View File

@ -77,6 +77,8 @@ public:
*/
StatusWith<ClusterQueryResult> next(OperationContext*);
Status releaseMemory(OperationContext* opCtx);
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm->setAwaitDataTimeout(awaitDataTimeout);
}

View File

@ -73,6 +73,8 @@ public:
*/
virtual StatusWith<ClusterQueryResult> next() = 0;
virtual Status releaseMemory() = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has
* already returned boost::none, then the cursor is exhausted and is safe to destroy.

View File

@ -153,6 +153,20 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
return next;
}
Status ClusterClientCursorImpl::releaseMemory() {
tassert(9745605, "releaseMemory should have a valid OperationContext", _opCtx);
auto interruptStatus = _opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
}
auto res = _root->releaseMemory();
if (res.isOK()) {
++_numReturnedSoFar;
}
return res;
}
void ClusterClientCursorImpl::kill(OperationContext* opCtx) {
tassert(7448200,
"Cannot kill a cluster client cursor that has already been killed",

View File

@ -85,6 +85,8 @@ public:
StatusWith<ClusterQueryResult> next() final;
Status releaseMemory() final;
void kill(OperationContext* opCtx) final;
void reattachToOperationContext(OperationContext* opCtx) final;

View File

@ -125,6 +125,10 @@ void ClusterClientCursorMock::kill(OperationContext* opCtx) {
}
}
Status ClusterClientCursorMock::releaseMemory() {
return Status::OK();
}
bool ClusterClientCursorMock::isTailable() const {
return false;
}

View File

@ -72,6 +72,8 @@ public:
void kill(OperationContext* opCtx) final;
Status releaseMemory() final;
void reattachToOperationContext(OperationContext* opCtx) final {
_opCtx = opCtx;
}

View File

@ -78,6 +78,13 @@ Status cursorInUseStatus(CursorId cursorId) {
} // namespace
/* Explicit instantiation of the templates to have available for linker. */
template Status ClusterCursorManager::checkAuthCursor<AuthzCheckFnInputType>(
OperationContext* opCtx, CursorId cursorId, AuthzCheckFn func);
template Status ClusterCursorManager::checkAuthCursor<ReleaseMemoryAuthzCheckFnInputType>(
OperationContext* opCtx, CursorId cursorId, ReleaseMemoryAuthzCheckFn func);
ClusterCursorManager::PinnedCursor::PinnedCursor(ClusterCursorManager* manager,
ClusterClientCursorGuard&& cursorGuard,
const NamespaceString& nss,
@ -226,7 +233,6 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
return cursorNotFoundStatus(cursorId);
}
// Check if the user is coauthorized to access this cursor.
auto authCheckStatus = authChecker(entry->getAuthenticatedUser());
if (!authCheckStatus.isOK()) {
return authCheckStatus.withContext(str::stream()
@ -247,7 +253,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
auto cursorGuard = entry->releaseCursor(opCtx);
// We use pinning of a cursor as a proxy for active, user-initiated use of a cursor. Therefore,
// We use pinning of a cursor as a proxy for active, user-initiated use of a cursor. Therefore,
// we pass down to the logical session cache and vivify the record (updating last use).
if (cursorGuard->getLsid()) {
auto vivifyCursorStatus =
@ -264,9 +270,34 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
return PinnedCursor(this, std::move(cursorGuard), entry->getNamespace(), cursorId);
}
StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCursorNoAuthCheck(
CursorId cursorId, OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown) {
return Status(ErrorCodes::ShutdownInProgress,
"Cannot check out cursor as we are in the process of shutting down");
}
CursorEntry* entry = _getEntry(lk, cursorId);
if (!entry) {
return cursorNotFoundStatus(cursorId);
}
if (entry->getOperationUsingCursor()) {
return cursorInUseStatus(cursorId);
}
auto cursorGuard = entry->releaseCursor(opCtx);
cursorGuard->reattachToOperationContext(opCtx);
return PinnedCursor(this, std::move(cursorGuard), entry->getNamespace(), cursorId);
}
void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cursor,
CursorId cursorId,
CursorState cursorState) {
CursorState cursorState,
bool isReleaseMemory) {
invariant(cursor);
// Read the clock out of the lock.
const auto now = _clockSource->now();
@ -275,7 +306,9 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
OperationContext* opCtx = cursor->getCurrentOperationContext();
invariant(opCtx);
cursor->detachFromOperationContext();
cursor->setLastUseDate(now);
if (!isReleaseMemory) {
cursor->setLastUseDate(now);
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
@ -285,7 +318,9 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
// killPending will be true if killCursor() was called while the cursor was in use.
const bool killPending = entry->isKillPending();
entry->setLastActive(now);
if (!isReleaseMemory) {
entry->setLastActive(now);
}
entry->returnCursor(std::move(cursor));
if (cursorState == CursorState::NotExhausted && !killPending) {
@ -298,9 +333,10 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
detachAndKillCursor(std::move(lk), opCtx, cursorId);
}
Status ClusterCursorManager::checkAuthForKillCursors(OperationContext* opCtx,
CursorId cursorId,
AuthzCheckFn authChecker) {
template <typename T>
Status ClusterCursorManager::checkAuthCursor(OperationContext* opCtx,
CursorId cursorId,
std::function<Status(T)> authChecker) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto entry = _getEntry(lk, cursorId);
@ -308,9 +344,7 @@ Status ClusterCursorManager::checkAuthForKillCursors(OperationContext* opCtx,
return cursorNotFoundStatus(cursorId);
}
// Note that getAuthenticatedUser() is thread-safe, so it's okay to call even if there's
// an operation using the cursor.
return authChecker(entry->getAuthenticatedUser());
return AuthzCheckPolicy<T>::authzCheck(entry, authChecker);
}
void ClusterCursorManager::killOperationUsingCursor(WithLock, CursorEntry* entry) {

View File

@ -75,6 +75,20 @@ class OperationContext;
template <typename T>
class StatusWith;
// 'CursorsAuthzCheckFnInputType' represents the argument of a the 'CursorsAuthzCheckFn' function.
// The function may be passed into a ClusterCursorManager method which checks whether the
// current client is authorized to perform the operation in question by checking the user that
// issues the command against the authorised users.
using AuthzCheckFnInputType = const boost::optional<UserName>&;
using AuthzCheckFn = std::function<Status(AuthzCheckFnInputType)>;
// 'ReleaseMemoryAuthzCheckFnInputType' represents the argument of a the 'ReleaseMemoryAuthzCheckFn'
// function. The function may be passed into a ClusterCursorManager method which checks whether
// the current client is authorized to perform the operation in question by checking whether the
// user has privileges over a namespace.
using ReleaseMemoryAuthzCheckFnInputType = const NamespaceString&;
using ReleaseMemoryAuthzCheckFn = std::function<Status(ReleaseMemoryAuthzCheckFnInputType)>;
/**
* ClusterCursorManager is a container for ClusterClientCursor objects. It manages the lifetime of
* its registered cursors and tracks basic information about them.
@ -135,11 +149,6 @@ public:
size_t pinned;
};
// Represents a function that may be passed into a ClusterCursorManager method which checks
// whether the current client is authorized to perform the operation in question. The function
// will be passed the list of users authorized to use the cursor.
using AuthzCheckFn = std::function<Status(const boost::optional<UserName>&)>;
/**
* PinnedCursor is a moveable, non-copyable class representing ownership of a cursor that has
* been leased from a ClusterCursorManager.
@ -430,6 +439,9 @@ public:
CursorLifetime cursorLifetime,
const boost::optional<UserName>& authenticatedUser);
template <typename T>
struct AuthzCheckPolicy;
/**
* Moves the given cursor to the 'pinned' state, and transfers ownership of the cursor to the
* PinnedCursor object returned. Cursors that are pinned must later be returned with
@ -441,14 +453,14 @@ public:
*
* Checking out a cursor will attach it to the given operation context.
*
* 'authChecker' is function that will be called with the list of users authorized to use this
* cursor. This function should check whether the current client is also authorized to use this
* cursor, and if not, return an error status, which will cause checkOutCursor to fail.
* 'authChecker' is function that will be called to check whether the current client is
* authorized to use this cursor, and if not, return an error status, which will cause
* checkOutCursor to fail.
*
* If 'checkSessionAuth' is 'kCheckSession' or left unspecified, this function also checks if
* the current session in the specified 'opCtx' has privilege to access the cursor specified by
* 'id.' In this case, this function returns a 'mongo::Status' with information regarding the
* nature of the inaccessability when the cursor is not accessible. If 'kNoCheckSession' is
* nature of the inaccessibility when the cursor is not accessible. If 'kNoCheckSession' is
* passed for 'checkSessionAuth,' this function does not check if the current session is
* authorized to access the cursor with the given id.
*
@ -462,14 +474,30 @@ public:
AuthzCheckFn authChecker,
AuthCheck checkSessionAuth = kCheckSession);
/**
* Moves the given cursor to the 'pinned' state, and transfers ownership of the cursor to the
* PinnedCursor object returned. Cursors that are pinned must later be returned with
* PinnedCursor::returnCursor().
*
* Only one client may pin a given cursor at a time. If the given cursor is already pinned,
* returns an error Status with code CursorInUse. If the given cursor is not registered or has
* a pending kill, returns an error Status with code CursorNotFound.
*
* Checking out a cursor will attach it to the given operation context.
*
* It does not check if the current client is authorized to use this cursor, assuming that this
* check has already been done.
*/
StatusWith<PinnedCursor> checkOutCursorNoAuthCheck(CursorId cursorId, OperationContext* opCtx);
/**
* This method will find the given cursor, and if it exists, call 'authChecker', passing the
* list of users authorized to use the cursor. Will propagate the return value of authChecker.
*/
Status checkAuthForKillCursors(OperationContext* opCtx,
CursorId cursorId,
AuthzCheckFn authChecker);
template <typename T>
Status checkAuthCursor(OperationContext* opCtx,
CursorId cursorId,
std::function<Status(T)> authChecker);
/**
* Informs the manager that the given cursor should be killed. The cursor need not necessarily
@ -544,6 +572,8 @@ private:
*
* If 'cursorState' is 'Exhausted', the cursor will be destroyed.
*
* If 'isReleaseMemory' is true the last use date and last active will not be updated.
*
* Thread-safe.
*
* Intentionally private. Clients should use public methods on PinnedCursor to check a cursor
@ -551,7 +581,8 @@ private:
*/
void checkInCursor(std::unique_ptr<ClusterClientCursor> cursor,
CursorId cursorId,
CursorState cursorState);
CursorState cursorState,
bool isReleaseMemory = false);
/**
* Will detach a cursor, release the lock and then call kill() on it.
@ -610,4 +641,26 @@ private:
size_t _cursorsTimedOut = 0;
};
/* For the killCursors command the list of users authorised to access the cursor is used to
* check authorisation*/
template <>
struct ClusterCursorManager::AuthzCheckPolicy<AuthzCheckFnInputType> {
static Status authzCheck(CursorEntry* entry, const AuthzCheckFn& authChecker) {
// Note that getAuthenticatedUser() is thread-safe, so it's okay to call even if there's
// an operation using the cursor.
return authChecker(entry->getAuthenticatedUser());
}
};
/* For the releaseMemory command the list of namespaces authorised to access the cursor is used
* to check authorisation*/
template <>
struct ClusterCursorManager::AuthzCheckPolicy<ReleaseMemoryAuthzCheckFnInputType> {
static Status authzCheck(CursorEntry* entry, const ReleaseMemoryAuthzCheckFn& authChecker) {
// Note that getNamespace() is thread-safe, so it's okay to call even if there's
// an operation using the cursor.
return authChecker(entry->getNamespace());
}
};
} // namespace mongo

View File

@ -74,14 +74,6 @@ protected:
_manager.shutdown(_opCtx.get());
}
static Status successAuthChecker(const boost::optional<UserName>&) {
return Status::OK();
};
static Status failAuthChecker(const boost::optional<UserName>&) {
return {ErrorCodes::Unauthorized, "Unauthorized"};
};
/**
* Returns an unowned pointer to the manager owned by this test fixture.
*/
@ -142,6 +134,10 @@ protected:
ASSERT_OK(getManager()->killCursor(killCursorOpCtx.get(), cursorId));
}
static AuthzCheckFn successAuthChecker;
static AuthzCheckFn failAuthChecker;
static ReleaseMemoryAuthzCheckFn successReleaseMemoryAuthChecker;
static ReleaseMemoryAuthzCheckFn failReleaseMemoryAuthChecker;
private:
// List of flags representing whether our allocated cursors have been killed yet. The value of
@ -155,6 +151,22 @@ private:
ServiceContext::UniqueOperationContext _opCtx;
};
AuthzCheckFn ClusterCursorManagerTest::successAuthChecker = [](AuthzCheckFnInputType) -> Status {
return Status::OK();
};
AuthzCheckFn ClusterCursorManagerTest::failAuthChecker = [](AuthzCheckFnInputType) -> Status {
return {ErrorCodes::Unauthorized, "Unauthorized"};
};
ReleaseMemoryAuthzCheckFn ClusterCursorManagerTest::successReleaseMemoryAuthChecker =
[](ReleaseMemoryAuthzCheckFnInputType) -> Status {
return Status::OK();
};
ReleaseMemoryAuthzCheckFn ClusterCursorManagerTest::failReleaseMemoryAuthChecker =
[](ReleaseMemoryAuthzCheckFnInputType) -> Status {
return {ErrorCodes::Unauthorized, "Unauthorized"};
};
// Test that registering a cursor and checking it out returns a pin to the same cursor.
TEST_F(ClusterCursorManagerTest, RegisterCursor) {
auto cursor = allocateMockCursor();
@ -1341,14 +1353,31 @@ TEST_F(ClusterCursorManagerTest, CheckAuthForKillCursors) {
ClusterCursorManager::CursorLifetime::Mortal,
boost::none));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()->checkAuthForKillCursors(
getOperationContext(), cursorId + 1, successAuthChecker));
ASSERT_EQ(
ErrorCodes::Unauthorized,
getManager()->checkAuthForKillCursors(getOperationContext(), cursorId, failAuthChecker));
ASSERT_OK(
getManager()->checkAuthForKillCursors(getOperationContext(), cursorId, successAuthChecker));
ErrorCodes::CursorNotFound,
getManager()->checkAuthCursor(getOperationContext(), cursorId + 1, successAuthChecker));
ASSERT_EQ(ErrorCodes::Unauthorized,
getManager()->checkAuthCursor(getOperationContext(), cursorId, failAuthChecker));
ASSERT_OK(getManager()->checkAuthCursor(getOperationContext(), cursorId, successAuthChecker));
}
TEST_F(ClusterCursorManagerTest, CheckAuthForReleaseMemory) {
auto cursorId =
assertGet(getManager()->registerCursor(getOperationContext(),
allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal,
boost::none));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()->checkAuthCursor(
getOperationContext(), cursorId + 1, successReleaseMemoryAuthChecker));
ASSERT_EQ(ErrorCodes::Unauthorized,
getManager()->checkAuthCursor(
getOperationContext(), cursorId, failReleaseMemoryAuthChecker));
ASSERT_OK(getManager()->checkAuthCursor(
getOperationContext(), cursorId, successReleaseMemoryAuthChecker));
}
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) {

View File

@ -125,7 +125,7 @@ public:
void checkForFailedRequests();
/**
* Take all cursors currently tracked by the CursorEstablsher.
* Take all cursors currently tracked by the CursorEstablisher.
*/
std::vector<RemoteCursor> takeCursors() {
return std::exchange(_remoteCursors, {});

View File

@ -72,6 +72,13 @@ public:
*/
virtual StatusWith<ClusterQueryResult> next() = 0;
virtual Status releaseMemory() {
tassert(9745608,
"router stage should have a child or provide alternative implementation",
_child);
return _child->releaseMemory();
}
/**
* Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for
* responses from remote hosts.

View File

@ -60,6 +60,11 @@ public:
return _resultsMerger.next(getOpCtx());
}
Status releaseMemory() final {
auto res = _resultsMerger.releaseMemory(getOpCtx());
return res;
}
void kill(OperationContext* opCtx) final {
_resultsMerger.kill(opCtx);
}

View File

@ -58,6 +58,12 @@ public:
StatusWith<ClusterQueryResult> next() final;
Status releaseMemory() final {
_mergePipeline->forceSpill();
// When I have the bytes I will return them from here. Now, I have nothing to return.
return Status::OK();
}
void kill(OperationContext* opCtx) final;
bool remotesExhausted() const final;

View File

@ -52,6 +52,12 @@ public:
StatusWith<ClusterQueryResult> next() final;
Status releaseMemory() final {
// It has no children. It cannot do anything to release memory.
return Status::OK();
}
void kill(OperationContext* opCtx) final;
bool remotesExhausted() const final;

View File

@ -925,7 +925,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
auto cursorManager = Grid::get(opCtx)->getCursorManager();
auto authzSession = AuthorizationSession::get(opCtx->getClient());
auto authChecker = [&authzSession](const boost::optional<UserName>& userName) -> Status {
AuthzCheckFn authChecker = [&authzSession](AuthzCheckFnInputType userName) -> Status {
return authzSession->isCoauthorizedWith(userName)
? Status::OK()
: Status(ErrorCodes::Unauthorized, "User not authorized to access cursor");