SERVER-113352 Implement command to persist range deletion task documents for unowned non-empty ranges (#47314)
GitOrigin-RevId: 6f7cec27927b50fe786d2c0ef8693ffdedba5d7a
This commit is contained in:
parent
fc727a014e
commit
5f8dbc8713
@ -644,6 +644,18 @@ const internalCommandsMap = {
|
||||
command:
|
||||
{_shardsvrNotifyShardingEvent: "test", eventType: "collectionResharded", details: {}},
|
||||
},
|
||||
_shardsvrRecreateRangeDeletionTasks: {
|
||||
testname: "_shardsvrRecreateRangeDeletionTasks",
|
||||
command: {_shardsvrRecreateRangeDeletionTasks: "collection", skipEmptyRanges: true},
|
||||
},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {
|
||||
testname: "_shardsvrRecreateRangeDeletionTasksParticipant",
|
||||
command: {
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: "collection",
|
||||
skipEmptyRanges: true,
|
||||
uuid: UUID()
|
||||
},
|
||||
},
|
||||
_shardsvrRenameCollection: {
|
||||
testname: "_shardsvrRenameCollection",
|
||||
command: {_shardsvrRenameCollection: "test.collection", to: "db.collection_renamed"},
|
||||
|
||||
@ -197,6 +197,8 @@ let viewsCommandTests = {
|
||||
expectedErrorCode: ErrorCodes.NamespaceNotSharded,
|
||||
},
|
||||
_shardsvrNotifyShardingEvent: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollection: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand},
|
||||
@ -585,6 +587,7 @@ let viewsCommandTests = {
|
||||
planCacheSetFilter: {command: {planCacheSetFilter: "view"}, expectFailure: true},
|
||||
prepareTransaction: {skip: isUnrelated},
|
||||
profile: {skip: isUnrelated},
|
||||
recreateRangeDeletionTasks: {skip: isUnrelated},
|
||||
refineCollectionShardKey: {skip: isUnrelated},
|
||||
refreshLogicalSessionCacheNow: {skip: isAnInternalCommand},
|
||||
reapLogicalSessionCacheNow: {skip: isAnInternalCommand},
|
||||
|
||||
@ -182,6 +182,8 @@ const wcCommandsTests = {
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: "internal command"},
|
||||
_shardsvrMoveRange: {skip: "internal command"},
|
||||
_shardsvrNotifyShardingEvent: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: "internal command"},
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
@ -2075,6 +2077,7 @@ const wcCommandsTests = {
|
||||
profile: {skip: "does not accept write concern"},
|
||||
reIndex: {skip: "does not accept write concern"},
|
||||
reapLogicalSessionCacheNow: {skip: "does not accept write concern"},
|
||||
recreateRangeDeletionTasks: {skip: "does not accept write concern"},
|
||||
refineCollectionShardKey: {
|
||||
noop: {
|
||||
// Refine to same shard key
|
||||
@ -3284,6 +3287,8 @@ const wcTimeseriesViewsCommandsTests = {
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: "internal command"},
|
||||
_shardsvrMoveRange: {skip: "internal command"},
|
||||
_shardsvrNotifyShardingEvent: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: "internal command"},
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
@ -4139,6 +4144,7 @@ const wcTimeseriesViewsCommandsTests = {
|
||||
profile: {skip: "does not accept write concern"},
|
||||
reIndex: {skip: "does not accept write concern"},
|
||||
reapLogicalSessionCacheNow: {skip: "does not accept write concern"},
|
||||
recreateRangeDeletionTasks: {skip: "does not accept write concern"},
|
||||
refineCollectionShardKey: {
|
||||
|
||||
noop: {
|
||||
|
||||
159
jstests/noPassthrough/ddl/recreate_range_deletion_tasks.js
Normal file
159
jstests/noPassthrough/ddl/recreate_range_deletion_tasks.js
Normal file
@ -0,0 +1,159 @@
|
||||
/**
|
||||
* Test that `recreateRangeDeletionTasks` correctly flags unowned ranges as orphaned on all shards
|
||||
* knowing a collection.
|
||||
*/
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
|
||||
|
||||
Random.setRandomSeed();
|
||||
|
||||
const st = new ShardingTest({
|
||||
shards: {shard0: {nodes: 1}, shard1: {nodes: 1}, shard2: {nodes: 1}},
|
||||
rs: {setParameter: {disableResumableRangeDeleter: true}},
|
||||
config: 1,
|
||||
});
|
||||
|
||||
const mongos = st.s;
|
||||
const shards = [st.shard0, st.shard1, st.shard2];
|
||||
|
||||
const dbName = "test";
|
||||
const hashedShardKeyCollName = "collWithHashedShardKey";
|
||||
const simpleShardKeyCollName = "collWithSimpleShardKey";
|
||||
const compoundHashedShardKeyCollName = "collWithCompoundHashedShardKey";
|
||||
const compoundShardKeyCollName = "collWithCompoundShardKey";
|
||||
|
||||
const hashedShardKeyNs = dbName + "." + hashedShardKeyCollName;
|
||||
const simpleShardKeyNs = dbName + "." + simpleShardKeyCollName;
|
||||
const compoundHashedShardKeyNs = dbName + "." + compoundHashedShardKeyCollName;
|
||||
const compoundShardKeyNs = dbName + "." + compoundShardKeyCollName;
|
||||
|
||||
const testDb = mongos.getDB(dbName);
|
||||
|
||||
function checkNoRangeDeletions(fullNs) {
|
||||
const [dbName, collName] = fullNs.split(".");
|
||||
const chunks = findChunksUtil.findChunksByNs(mongos.getDB("config"), fullNs);
|
||||
chunks.forEach((chunk) => {
|
||||
shards.forEach((shard) => {
|
||||
const nRangeDelDocs = shard.getDB("config").rangeDeletions.countDocuments({
|
||||
nss: fullNs,
|
||||
collectionUuid: chunk.uuid,
|
||||
});
|
||||
assert.eq(
|
||||
0,
|
||||
nRangeDelDocs,
|
||||
"Found unexpected range deletion docs on " + shard.shardName + " for collection " +
|
||||
fullNs,
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkRangeDeletionsRecreated(fullNs, shardKey) {
|
||||
const [dbName, collName] = fullNs.split(".");
|
||||
const chunks = findChunksUtil.findChunksByNs(mongos.getDB("config"), fullNs);
|
||||
chunks.forEach((chunk) => {
|
||||
shards.forEach((shard) => {
|
||||
const rangeDelDoc = shard.getDB("config").rangeDeletions.findOne({
|
||||
nss: fullNs,
|
||||
collectionUuid: chunk.uuid,
|
||||
keyPattern: shardKey,
|
||||
pending: {$ne: true} /* recreated tasks must never be pending */,
|
||||
// Match recreated tasks per orphaned range (may span multiple chunks)
|
||||
"range.min": {"$lte": chunk.min},
|
||||
"range.max": {"$gte": chunk.max},
|
||||
});
|
||||
|
||||
const collExistsOnShard = assert
|
||||
.commandWorked(shard.getDB(dbName).runCommand(
|
||||
{listCollections: 1, filter: {name: collName}}))
|
||||
.cursor.firstBatch.length == 1;
|
||||
|
||||
if (!collExistsOnShard || shard.shardName == chunk.shard) {
|
||||
// If the shard has no local incarnation of the collection or the shard is the
|
||||
// actual owner for the chunk, the range deletion task must not have been recreated.
|
||||
assert.eq(
|
||||
undefined,
|
||||
rangeDelDoc,
|
||||
"Unexpectedly found range deletion document on " + shard.shardName + " for " +
|
||||
JSON.stringify(chunk),
|
||||
);
|
||||
} else {
|
||||
assert(rangeDelDoc,
|
||||
"Range deletion not found on " + shard.shardName + " for " +
|
||||
JSON.stringify(chunk));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function shardCollectionAndRecreateRangeDeletions(fullNs, shardKey, skipEmptyRanges) {
|
||||
const [dbName, collName] = fullNs.split(".");
|
||||
assert.commandWorked(testDb.adminCommand({shardCollection: fullNs, key: shardKey}));
|
||||
|
||||
// Move to random shard the chunk with bounds [0, 100) for all shard key fields
|
||||
const randomShard = shards[Random.randInt(shards.length)].shardName;
|
||||
const min = Object.fromEntries(Object.keys(shardKey).map((field) => [field, 0]));
|
||||
const max = Object.fromEntries(Object.keys(shardKey).map((field) => [field, 100]));
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({moveRange: fullNs, min: min, max: max, toShard: randomShard}));
|
||||
|
||||
assert.commandWorked(testDb.runCommand(
|
||||
{recreateRangeDeletionTasks: collName, skipEmptyRanges: skipEmptyRanges}));
|
||||
checkRangeDeletionsRecreated(fullNs, shardKey);
|
||||
}
|
||||
|
||||
const skipEmptyRanges = false;
|
||||
// Sharding a collection with hashed-prefixed shard key initially distributes the shard key space
|
||||
// over all the shards
|
||||
shardCollectionAndRecreateRangeDeletions(hashedShardKeyNs, {h: "hashed"}, skipEmptyRanges);
|
||||
shardCollectionAndRecreateRangeDeletions(
|
||||
compoundHashedShardKeyNs, {h: "hashed", sk0: 1, sk1: 1}, skipEmptyRanges);
|
||||
|
||||
// Sharding a collection with non-hashed-prefixed shard key initially creates one chunk on the
|
||||
// primary shard (`shardCollectionAndRecreateRangeDeletions` randomly moves around some data)
|
||||
shardCollectionAndRecreateRangeDeletions(simpleShardKeyNs, {sk: 1}, skipEmptyRanges);
|
||||
shardCollectionAndRecreateRangeDeletions(compoundShardKeyNs, {sk0: 1, sk1: 1}, skipEmptyRanges);
|
||||
|
||||
function testSkipEmptyRanges() {
|
||||
// Distribute an empty collection with hashed shard key over all shards
|
||||
const initiallyEmptyCollName = "initiallyEmptyColl";
|
||||
const emptyNs = dbName + "." + initiallyEmptyCollName;
|
||||
const shardKey = {h: "hashed"};
|
||||
assert.commandWorked(testDb.adminCommand({shardCollection: emptyNs, key: shardKey}));
|
||||
|
||||
// Invoking the command must not create any range deletion on any shard because the whole
|
||||
// collection is empty
|
||||
assert.commandWorked(
|
||||
testDb.runCommand(
|
||||
{recreateRangeDeletionTasks: initiallyEmptyCollName, skipEmptyRanges: true}),
|
||||
);
|
||||
checkNoRangeDeletions(emptyNs);
|
||||
|
||||
// Insert a bunch of documents directly on shards to create orphaned docs in every non-owned
|
||||
// range
|
||||
shards.forEach((shard) => {
|
||||
const docs = Array.from({length: 100}, (_, i) => ({h: i}));
|
||||
assert.commandWorked(
|
||||
shard.getDB(dbName).runCommand({insert: initiallyEmptyCollName, documents: docs}));
|
||||
});
|
||||
|
||||
// Reinvoking the command must create range deletions for every non-owned range because they're
|
||||
// not empty
|
||||
assert.commandWorked(
|
||||
testDb.runCommand(
|
||||
{recreateRangeDeletionTasks: initiallyEmptyCollName, skipEmptyRanges: false}),
|
||||
);
|
||||
checkRangeDeletionsRecreated(emptyNs, shardKey);
|
||||
}
|
||||
|
||||
testSkipEmptyRanges();
|
||||
|
||||
// Re-enable the range deleter to double check that new tasks are valid and eventually drain during
|
||||
// teardown
|
||||
shards.forEach((shard) => {
|
||||
assert.commandWorked(
|
||||
shard.adminCommand({setParameter: 1, disableResumableRangeDeleter: false}));
|
||||
});
|
||||
|
||||
st.stop();
|
||||
@ -129,6 +129,8 @@ const allCommands = {
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: isAnInternalCommand},
|
||||
_shardsvrMoveRange: {skip: isAnInternalCommand},
|
||||
_shardsvrNotifyShardingEvent: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollection: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: isAnInternalCommand},
|
||||
@ -1202,6 +1204,18 @@ const allCommands = {
|
||||
recipientForgetMigration: {skip: isAnInternalCommand},
|
||||
recipientSyncData: {skip: isAnInternalCommand},
|
||||
recipientVoteImportedFiles: {skip: isAnInternalCommand},
|
||||
recreateRangeDeletionTasks: {
|
||||
isShardedOnly: true,
|
||||
isAdminCommand: false,
|
||||
setUp: function(conn) {
|
||||
assert.commandWorked(
|
||||
conn.getDB(dbName).adminCommand({shardCollection: fullNs, key: {a: "hashed"}}));
|
||||
},
|
||||
command: {recreateRangeDeletionTasks: collName, skipEmptyRanges: false},
|
||||
teardown: function(conn) {
|
||||
assert.commandWorked(conn.getDB(dbName).runCommand({drop: collName}));
|
||||
},
|
||||
},
|
||||
refineCollectionShardKey: {
|
||||
isShardedOnly: true,
|
||||
isAdminCommand: true,
|
||||
|
||||
@ -116,6 +116,8 @@ const allCommands = {
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: isPrimaryOnly},
|
||||
_shardsvrMoveRange: {skip: isPrimaryOnly},
|
||||
_shardsvrNotifyShardingEvent: {skip: isPrimaryOnly},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: isPrimaryOnly},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: isPrimaryOnly},
|
||||
_shardsvrRenameCollection: {skip: isPrimaryOnly},
|
||||
_shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: isAnInternalCommand},
|
||||
|
||||
@ -136,6 +136,8 @@ const allCommands = {
|
||||
_shardsvrReshardCollection: {skip: isAnInternalCommand},
|
||||
_shardsvrReshardingOperationTime: {skip: isAnInternalCommand},
|
||||
_shardsvrReshardRecipientClone: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: isAnInternalCommand},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
@ -972,6 +974,7 @@ const allCommands = {
|
||||
recipientForgetMigration: {skip: isAnInternalCommand},
|
||||
recipientSyncData: {skip: isAnInternalCommand},
|
||||
recipientVoteImportedFiles: {skip: isAnInternalCommand},
|
||||
recreateRangeDeletionTasks: {skip: requiresMongoS},
|
||||
refineCollectionShardKey: {skip: requiresMongoS},
|
||||
refreshLogicalSessionCacheNow: {
|
||||
command: {refreshLogicalSessionCacheNow: 1},
|
||||
|
||||
@ -688,6 +688,15 @@ let testCases = {
|
||||
},
|
||||
profile: {skip: "not supported in mongos"},
|
||||
reapLogicalSessionCacheNow: {skip: "is a no-op on mongos"},
|
||||
recreateRangeDeletionTasks: {
|
||||
run: {
|
||||
sendsDbVersion: true,
|
||||
explicitlyCreateCollection: true,
|
||||
command: function(dbName, collName) {
|
||||
return {recreateRangeDeletionTasks: collName, skipEmptyRanges: true};
|
||||
},
|
||||
},
|
||||
},
|
||||
refineCollectionShardKey: {skip: "not on a user database"},
|
||||
refreshLogicalSessionCacheNow: {skip: "goes through the cluster write path"},
|
||||
refreshSessions: {skip: "executes locally on mongos (not sent to any remote node)"},
|
||||
|
||||
@ -26,5 +26,7 @@ export const commandsAddedToMongodSinceLastLTS = [
|
||||
"releaseMemory",
|
||||
"_configsvrStartShardDraining",
|
||||
"_configsvrShardDrainingStatus",
|
||||
"_shardsvrRecreateRangeDeletionTasks",
|
||||
"_shardsvrRecreateRangeDeletionTasksParticipant",
|
||||
"testCommandFeatureFlaggedOnLatestFCV82",
|
||||
];
|
||||
|
||||
@ -16,6 +16,7 @@ export const commandsAddedToMongosSinceLastLTS = [
|
||||
"releaseMemory",
|
||||
"replicateSearchIndexCommand",
|
||||
"getTrafficRecordingStatus",
|
||||
"recreateRangeDeletionTasks",
|
||||
"startTrafficRecording",
|
||||
"stopTrafficRecording",
|
||||
"startShardDraining",
|
||||
|
||||
@ -1024,6 +1024,16 @@ export let MongosAPIParametersUtil = (function() {
|
||||
},
|
||||
{commandName: "profile", skip: "not supported in mongos"},
|
||||
{commandName: "reapLogicalSessionCacheNow", skip: "is a no-op on mongos"},
|
||||
{
|
||||
commandName: "recreateRangeDeletionTasks",
|
||||
run: {
|
||||
inAPIVersion1: false,
|
||||
shardCommandName: "_shardsvrRecreateRangeDeletionTasks",
|
||||
permittedInTxn: false,
|
||||
requiresShardedCollection: true,
|
||||
command: () => ({recreateRangeDeletionTasks: "collection", skipEmptyRanges: true}),
|
||||
},
|
||||
},
|
||||
{
|
||||
commandName: "refineCollectionShardKey",
|
||||
run: {
|
||||
|
||||
@ -189,6 +189,8 @@ let testCases = {
|
||||
"does not accept read or write concern (accepts writeConcern, but only explicitly and when _secondaryThrottle is true)"
|
||||
},
|
||||
_shardsvrNotifyShardingEvent: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasks: {skip: "internal command"},
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant: {skip: "internal command"},
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
@ -665,6 +667,7 @@ let testCases = {
|
||||
recipientForgetMigration: {skip: "does not accept read or write concern"},
|
||||
recipientSyncData: {skip: "does not accept read or write concern"},
|
||||
recipientVoteImportedFiles: {skip: "does not accept read or write concern"},
|
||||
recreateRangeDeletionTasks: {skip: "accepts only majority"},
|
||||
refineCollectionShardKey: {skip: "does not accept read or write concern"},
|
||||
refreshLogicalSessionCacheNow: {skip: "does not accept read or write concern"},
|
||||
refreshSessions: {skip: "does not accept read or write concern"},
|
||||
|
||||
@ -328,6 +328,7 @@ let testCases = {
|
||||
planCacheSetFilter: {skip: "does not return user data"},
|
||||
profile: {skip: "primary only"},
|
||||
reapLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
recreateRangeDeletionTasks: {skip: "primary only"},
|
||||
refineCollectionShardKey: {skip: "primary only"},
|
||||
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
refreshSessions: {skip: "does not return user data"},
|
||||
|
||||
@ -424,6 +424,7 @@ let testCases = {
|
||||
planCacheSetFilter: {skip: "does not return user data"},
|
||||
profile: {skip: "primary only"},
|
||||
reapLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
recreateRangeDeletionTasks: {skip: "primary only"},
|
||||
refineCollectionShardKey: {skip: "primary only"},
|
||||
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
refreshSessions: {skip: "does not return user data"},
|
||||
|
||||
@ -342,6 +342,7 @@ let testCases = {
|
||||
planCacheSetFilter: {skip: "does not return user data"},
|
||||
profile: {skip: "primary only"},
|
||||
reapLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
recreateRangeDeletionTasks: {skip: "primary only"},
|
||||
refineCollectionShardKey: {skip: "primary only"},
|
||||
refreshLogicalSessionCacheNow: {skip: "does not return user data"},
|
||||
refreshSessions: {skip: "does not return user data"},
|
||||
|
||||
@ -1431,6 +1431,7 @@ mongo_cc_library(
|
||||
"shardsvr_move_range_command.cpp",
|
||||
"shardsvr_notify_sharding_event_command.cpp",
|
||||
"shardsvr_participant_block_command.cpp",
|
||||
"shardsvr_recreate_range_deletion_tasks_command.cpp",
|
||||
"shardsvr_refine_collection_shard_key_command.cpp",
|
||||
"shardsvr_rename_collection_command.cpp",
|
||||
"shardsvr_rename_collection_participant_command.cpp",
|
||||
|
||||
@ -162,100 +162,6 @@ void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
|
||||
TransactionParticipant::TransactionActions::kNone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if any documents already exist in the given shard key range on the recipient shard.
|
||||
* This is used to detect spurious documents that may have been incorrectly present due to
|
||||
* historical reasons (e.g., inserts via direct connection) or unforeseen range deleter bugs.
|
||||
*
|
||||
* Returns the shard key of the first document found in the range, or boost::none if no documents
|
||||
* exist.
|
||||
*/
|
||||
boost::optional<BSONObj> checkForExistingDocumentsInRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& collUuid,
|
||||
const BSONObj& shardKeyPattern,
|
||||
const BSONObj& min,
|
||||
const BSONObj& max) {
|
||||
// Acquire collection to scan for existing documents.
|
||||
auto collection = acquireCollection(
|
||||
opCtx,
|
||||
CollectionAcquisitionRequest::fromOpCtx(opCtx, nss, AcquisitionPrerequisites::kRead),
|
||||
MODE_IS);
|
||||
|
||||
uassert(ErrorCodes::NamespaceNotFound,
|
||||
str::stream() << "Cannot find collection " << nss.toStringForErrorMsg(),
|
||||
collection.exists());
|
||||
|
||||
// Verify collection UUID matches (safety check).
|
||||
uassert(ErrorCodes::InvalidUUID,
|
||||
str::stream() << "Collection UUID mismatch during migration. Expected "
|
||||
<< collUuid.toString() << " but found "
|
||||
<< collection.getCollectionPtr()->uuid().toString(),
|
||||
collection.uuid() == collUuid);
|
||||
|
||||
// Find a shard key prefixed index to use for the scan.
|
||||
const auto shardKeyIdx = findShardKeyPrefixedIndex(
|
||||
opCtx, collection.getCollectionPtr(), shardKeyPattern, false /* requireSingleKey */);
|
||||
|
||||
uassert(ErrorCodes::IndexNotFound,
|
||||
str::stream() << "Could not find shard key index for pattern " << shardKeyPattern
|
||||
<< " on collection " << nss.toStringForErrorMsg(),
|
||||
shardKeyIdx);
|
||||
|
||||
// Use InternalPlanner to scan the shard key index within the range.
|
||||
auto exec = InternalPlanner::shardKeyIndexScan(opCtx,
|
||||
collection,
|
||||
*shardKeyIdx,
|
||||
min,
|
||||
max,
|
||||
BoundInclusion::kIncludeStartKeyOnly,
|
||||
PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
|
||||
InternalPlanner::FORWARD);
|
||||
|
||||
BSONObj doc;
|
||||
PlanExecutor::ExecState state = exec->getNext(&doc, nullptr);
|
||||
|
||||
if (state == PlanExecutor::ADVANCED) {
|
||||
// Found an index key in the range - reconstruct the shard key from index values:
|
||||
// this avoids the need to load the full document in shardKeyIndexScan() with
|
||||
// InternalPlanner::IXSCAN_FETCH. Index key format: {"": value1, "": value2, ...}, we need
|
||||
// to map these to proper field names from the shard key pattern.
|
||||
|
||||
BSONObjBuilder shardKeyBuilder;
|
||||
BSONObjIterator indexKeyIter(doc);
|
||||
BSONObjIterator shardKeyPatternIter(shardKeyPattern);
|
||||
|
||||
// Map index key values to shard key field names.
|
||||
while (indexKeyIter.more() && shardKeyPatternIter.more()) {
|
||||
BSONElement indexValue = indexKeyIter.next();
|
||||
BSONElement shardKeyField = shardKeyPatternIter.next();
|
||||
|
||||
// Append the index value with the proper field name from shard key pattern.
|
||||
shardKeyBuilder.appendAs(indexValue, shardKeyField.fieldName());
|
||||
}
|
||||
|
||||
BSONObj reconstructedShardKey = shardKeyBuilder.obj();
|
||||
|
||||
LOGV2_DEBUG(11095301,
|
||||
3,
|
||||
"Found index key in range, reconstructed shard key from index data",
|
||||
"indexKey"_attr = doc,
|
||||
"shardKeyPattern"_attr = shardKeyPattern,
|
||||
"reconstructedShardKey"_attr = reconstructedShardKey);
|
||||
|
||||
return reconstructedShardKey;
|
||||
} else if (state == PlanExecutor::IS_EOF) {
|
||||
// No documents found in the range.
|
||||
return boost::none;
|
||||
} else {
|
||||
// Error occurred during scan.
|
||||
uasserted(ErrorCodes::InternalError,
|
||||
str::stream() << "Error while scanning for existing documents in range [" << min
|
||||
<< ", " << max << ") on collection " << nss.toStringForErrorMsg()
|
||||
<< ": " << PlanExecutor::stateToStr(state));
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Callable>
|
||||
constexpr bool returnsVoid() {
|
||||
return std::is_void_v<std::invoke_result_t<Callable>>;
|
||||
@ -2222,4 +2128,99 @@ void MigrationDestinationManager::onStepDown() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if any documents already exist in the given shard key range on the recipient shard.
|
||||
* This is used to detect spurious documents that may have been incorrectly present due to
|
||||
* historical reasons (e.g., inserts via direct connection) or unforeseen range deleter bugs.
|
||||
*
|
||||
* Returns the shard key of the first document found in the range, or boost::none if no documents
|
||||
* exist.
|
||||
*/
|
||||
boost::optional<BSONObj> MigrationDestinationManager::checkForExistingDocumentsInRange(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& collUuid,
|
||||
const BSONObj& shardKeyPattern,
|
||||
const BSONObj& min,
|
||||
const BSONObj& max) {
|
||||
// Acquire collection to scan for existing documents.
|
||||
auto collection = acquireCollection(
|
||||
opCtx,
|
||||
CollectionAcquisitionRequest::fromOpCtx(opCtx, nss, AcquisitionPrerequisites::kRead),
|
||||
MODE_IS);
|
||||
|
||||
uassert(ErrorCodes::NamespaceNotFound,
|
||||
str::stream() << "Cannot find collection " << nss.toStringForErrorMsg(),
|
||||
collection.exists());
|
||||
|
||||
// Verify collection UUID matches (safety check).
|
||||
uassert(ErrorCodes::InvalidUUID,
|
||||
str::stream() << "Collection UUID mismatch during migration. Expected "
|
||||
<< collUuid.toString() << " but found "
|
||||
<< collection.getCollectionPtr()->uuid().toString(),
|
||||
collection.uuid() == collUuid);
|
||||
|
||||
// Find a shard key prefixed index to use for the scan.
|
||||
const auto shardKeyIdx = findShardKeyPrefixedIndex(
|
||||
opCtx, collection.getCollectionPtr(), shardKeyPattern, false /* requireSingleKey */);
|
||||
|
||||
uassert(ErrorCodes::IndexNotFound,
|
||||
str::stream() << "Could not find shard key index for pattern " << shardKeyPattern
|
||||
<< " on collection " << nss.toStringForErrorMsg(),
|
||||
shardKeyIdx);
|
||||
|
||||
// Use InternalPlanner to scan the shard key index within the range.
|
||||
auto exec = InternalPlanner::shardKeyIndexScan(opCtx,
|
||||
collection,
|
||||
*shardKeyIdx,
|
||||
min,
|
||||
max,
|
||||
BoundInclusion::kIncludeStartKeyOnly,
|
||||
PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
|
||||
InternalPlanner::FORWARD);
|
||||
|
||||
BSONObj doc;
|
||||
PlanExecutor::ExecState state = exec->getNext(&doc, nullptr);
|
||||
|
||||
if (state == PlanExecutor::ADVANCED) {
|
||||
// Found an index key in the range - reconstruct the shard key from index values:
|
||||
// this avoids the need to load the full document in shardKeyIndexScan() with
|
||||
// InternalPlanner::IXSCAN_FETCH. Index key format: {"": value1, "": value2, ...}, we need
|
||||
// to map these to proper field names from the shard key pattern.
|
||||
|
||||
BSONObjBuilder shardKeyBuilder;
|
||||
BSONObjIterator indexKeyIter(doc);
|
||||
BSONObjIterator shardKeyPatternIter(shardKeyPattern);
|
||||
|
||||
// Map index key values to shard key field names.
|
||||
while (indexKeyIter.more() && shardKeyPatternIter.more()) {
|
||||
BSONElement indexValue = indexKeyIter.next();
|
||||
BSONElement shardKeyField = shardKeyPatternIter.next();
|
||||
|
||||
// Append the index value with the proper field name from shard key pattern.
|
||||
shardKeyBuilder.appendAs(indexValue, shardKeyField.fieldName());
|
||||
}
|
||||
|
||||
BSONObj reconstructedShardKey = shardKeyBuilder.obj();
|
||||
|
||||
LOGV2_DEBUG(11095301,
|
||||
3,
|
||||
"Found index key in range, reconstructed shard key from index data",
|
||||
"indexKey"_attr = doc,
|
||||
"shardKeyPattern"_attr = shardKeyPattern,
|
||||
"reconstructedShardKey"_attr = reconstructedShardKey);
|
||||
|
||||
return reconstructedShardKey;
|
||||
} else if (state == PlanExecutor::IS_EOF) {
|
||||
// No documents found in the range.
|
||||
return boost::none;
|
||||
} else {
|
||||
// Error occurred during scan.
|
||||
uasserted(ErrorCodes::InternalError,
|
||||
str::stream() << "Error while scanning for existing documents in range [" << min
|
||||
<< ", " << max << ") on collection " << nss.toStringForErrorMsg()
|
||||
<< ": " << PlanExecutor::stateToStr(state));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -239,6 +239,21 @@ public:
|
||||
const NamespaceString& nss,
|
||||
const CollectionOptionsAndIndexes& collectionOptionsAndIndexes);
|
||||
|
||||
/**
|
||||
* Checks if any documents already exist in the given shard key range on the recipient shard.
|
||||
* This is used to detect orphaned documents that are present due to possible range deleter bugs
|
||||
* or unsupported manual operations on a direct connection.
|
||||
*
|
||||
* Returns the shard key of the first document found in the range, or boost::none if no
|
||||
* documents exist.
|
||||
*/
|
||||
static boost::optional<BSONObj> checkForExistingDocumentsInRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& collUuid,
|
||||
const BSONObj& shardKeyPattern,
|
||||
const BSONObj& min,
|
||||
const BSONObj& max);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Set state to Fail without Logging.
|
||||
|
||||
@ -0,0 +1,293 @@
|
||||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/catalog_raii.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/s/active_migrations_registry.h"
|
||||
#include "mongo/db/s/collection_sharding_runtime.h"
|
||||
#include "mongo/db/s/ddl_lock_manager.h"
|
||||
#include "mongo/db/s/migration_destination_manager.h"
|
||||
#include "mongo/db/s/range_deletion_util.h"
|
||||
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/db/s/sharding_state.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/vector_clock.h"
|
||||
#include "mongo/s/cluster_commands_helpers.h"
|
||||
#include "mongo/s/grid.h"
|
||||
|
||||
#include "src/mongo/s/request_types/sharded_ddl_commands_gen.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
|
||||
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
constexpr StringData kLockReason = "RecreateRangeDeletionTasks"_sd;
|
||||
|
||||
class ShardSvrRecreateRangeDeletionTasksCommand final
|
||||
: public TypedCommand<ShardSvrRecreateRangeDeletionTasksCommand> {
|
||||
public:
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return Command::AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool skipApiVersionCheck() const override {
|
||||
// Internal command (server to server).
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command. Recreating range deletion task documents for a collection "
|
||||
"on all shards, according to the routing table.";
|
||||
}
|
||||
|
||||
using Request = ShardSvrRecreateRangeDeletionTasks;
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
const auto& nss = ns();
|
||||
|
||||
// Acquire DDL lock to ensure collection stability while recreating range deletion docs
|
||||
DDLLockManager::ScopedCollectionDDLLock dbDDLLock{opCtx, nss, kLockReason, MODE_X};
|
||||
|
||||
const auto collectionUuid = [&] {
|
||||
const auto collection = acquireCollectionMaybeLockFree(
|
||||
opCtx,
|
||||
CollectionAcquisitionRequest(nss,
|
||||
PlacementConcern::kPretendUnsharded,
|
||||
repl::ReadConcernArgs::get(opCtx),
|
||||
AcquisitionPrerequisites::kRead));
|
||||
uassert(ErrorCodes::NamespaceNotFound,
|
||||
str::stream()
|
||||
<< "Collection " << nss.toStringForErrorMsg() << " does not exist",
|
||||
collection.exists());
|
||||
return collection.uuid();
|
||||
}();
|
||||
|
||||
ShardSvrRecreateRangeDeletionTasksParticipant req{
|
||||
nss, request().getSkipEmptyRanges(), collectionUuid};
|
||||
|
||||
auto shardResponses = scatterGatherUnversionedTargetAllShards(
|
||||
opCtx,
|
||||
nss.dbName(),
|
||||
applyReadWriteConcern(opCtx, this, req.toBSON()),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kIdempotent);
|
||||
|
||||
for (const auto& response : shardResponses) {
|
||||
uassertStatusOK(response.swResponse.getStatus());
|
||||
const auto& cmdResponse = response.swResponse.getValue();
|
||||
try {
|
||||
uassertStatusOK(getStatusFromCommandResult(cmdResponse.data));
|
||||
} catch (const ExceptionFor<ErrorCodes::CollectionUUIDMismatch>&) {
|
||||
// Ignore collection UUID mismatch errors: no need to create range deletion
|
||||
// documents on shards that don't know the collection or host an incarnation
|
||||
// inconsistent with respect to the primary shard.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
|
||||
ActionType::internal));
|
||||
}
|
||||
|
||||
NamespaceString ns() const override {
|
||||
return request().getNamespace();
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ShardSvrRecreateRangeDeletionTasksCommand).forShard();
|
||||
|
||||
class ShardSvrRecreateRangeDeletionTasksParticipantCommand final
|
||||
: public TypedCommand<ShardSvrRecreateRangeDeletionTasksParticipantCommand> {
|
||||
public:
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return Command::AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool skipApiVersionCheck() const override {
|
||||
// Internal command (server to server).
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command. Recreating range deletion task documents for a collection on a"
|
||||
"specific shard, according to the routing table.";
|
||||
}
|
||||
|
||||
using Request = ShardSvrRecreateRangeDeletionTasksParticipant;
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
const auto& nss = ns();
|
||||
const auto& expectedCollectionUuid = request().getUuid();
|
||||
bool shouldSkipEmptyRanges = request().getSkipEmptyRanges();
|
||||
|
||||
// Prevent migrations from running while recreating range deletion documents
|
||||
auto& activeMigrationsRegistry = ActiveMigrationsRegistry::get(opCtx);
|
||||
activeMigrationsRegistry.lock(opCtx, kLockReason);
|
||||
ScopeGuard unlockActiveMigrationRegistry(
|
||||
[&activeMigrationsRegistry] { activeMigrationsRegistry.unlock(kLockReason); });
|
||||
|
||||
const auto metadata = _getCollectionMetadata(opCtx, nss, expectedCollectionUuid);
|
||||
|
||||
// If sharded metadata are available, we can rely on the current state (at most the
|
||||
// shard may have stale knowledge about unowned ranges, but what matters is that it
|
||||
// knows they're not owned, no matter where they're placed).
|
||||
if (metadata->isSharded()) {
|
||||
const ShardId donorShardId{"RecreateRangeDeletionTasks"};
|
||||
const auto shardKey = KeyPattern(metadata->getKeyPattern());
|
||||
const auto preMigrationShardVersion = ChunkVersion::IGNORED();
|
||||
|
||||
const auto emptyChunkMap =
|
||||
RangeMap{SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()};
|
||||
BSONObj lookupKey = metadata->getMinKey();
|
||||
boost::optional<ChunkRange> range;
|
||||
while ((range = metadata->getNextOrphanRange(emptyChunkMap, lookupKey))) {
|
||||
lookupKey = range->getMax();
|
||||
|
||||
if (shouldSkipEmptyRanges) {
|
||||
boost::optional<BSONObj> firstDocInRange =
|
||||
MigrationDestinationManager::checkForExistingDocumentsInRange(
|
||||
opCtx,
|
||||
nss,
|
||||
expectedCollectionUuid,
|
||||
shardKey.toBSON(),
|
||||
range->getMin(),
|
||||
range->getMax());
|
||||
if (!firstDocInRange.has_value()) {
|
||||
// Do not persist range deletion document for empty range
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
RangeDeletionTask task(UUID::gen() /* random task uuid (no migration id) */,
|
||||
nss,
|
||||
expectedCollectionUuid,
|
||||
donorShardId,
|
||||
*range,
|
||||
CleanWhenEnum::kNow);
|
||||
const auto currentTime = VectorClock::get(opCtx)->getTime();
|
||||
task.setTimestamp(currentTime.clusterTime().asTimestamp());
|
||||
task.setKeyPattern(shardKey);
|
||||
task.setPreMigrationShardVersion(preMigrationShardVersion);
|
||||
rangedeletionutil::persistRangeDeletionTaskLocally(
|
||||
opCtx,
|
||||
task,
|
||||
ShardingCatalogClient::writeConcernLocalHavingUpstreamWaiter());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
|
||||
ActionType::internal));
|
||||
}
|
||||
|
||||
NamespaceString ns() const override {
|
||||
return request().getNamespace();
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* Retry loop to retrieve collection metadata. This method will throw if the collection does
|
||||
* not exist locally, avoiding unnecessary refreshes.
|
||||
*/
|
||||
static boost::optional<CollectionMetadata> _getCollectionMetadata(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUuid) {
|
||||
while (true) {
|
||||
const auto metadata = [&] {
|
||||
const auto collection = acquireCollection(
|
||||
opCtx,
|
||||
CollectionAcquisitionRequest(nss,
|
||||
expectedCollectionUuid,
|
||||
PlacementConcern::kPretendUnsharded,
|
||||
repl::ReadConcernArgs::get(opCtx),
|
||||
AcquisitionPrerequisites::kRead),
|
||||
MODE_IS);
|
||||
const auto scopedCsr =
|
||||
CollectionShardingRuntime::assertCollectionLockedAndAcquireShared(opCtx,
|
||||
nss);
|
||||
return scopedCsr->getCurrentMetadataIfKnown();
|
||||
}();
|
||||
|
||||
if (metadata.has_value()) {
|
||||
return metadata;
|
||||
}
|
||||
|
||||
FilteringMetadataCache::get(opCtx)
|
||||
->onCollectionPlacementVersionMismatch(opCtx, nss, boost::none)
|
||||
.ignore();
|
||||
}
|
||||
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ShardSvrRecreateRangeDeletionTasksParticipantCommand).forShard();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -28,6 +28,15 @@ idl_generator(
|
||||
],
|
||||
)
|
||||
|
||||
idl_generator(
|
||||
name = "recreate_range_deletion_tasks_gen",
|
||||
src = "recreate_range_deletion_tasks.idl",
|
||||
deps = [
|
||||
"//src/mongo/db:basic_types_gen",
|
||||
"//src/mongo/idl:generic_argument_gen",
|
||||
],
|
||||
)
|
||||
|
||||
idl_generator(
|
||||
name = "refine_collection_shard_key_gen",
|
||||
src = "refine_collection_shard_key.idl",
|
||||
@ -156,6 +165,7 @@ mongo_cc_library(
|
||||
"cluster_oplog_note_cmd.cpp",
|
||||
"cluster_query_settings_cmds.cpp",
|
||||
"cluster_query_without_shard_key_cmd.cpp",
|
||||
"cluster_recreate_range_deletion_tasks_command.cpp",
|
||||
"cluster_refine_collection_shard_key_cmd.cpp",
|
||||
"cluster_rename_collection_cmd.cpp",
|
||||
"cluster_repair_sharded_collection_chunks_history_cmd.cpp",
|
||||
@ -181,6 +191,7 @@ mongo_cc_library(
|
||||
"s_read_write_concern_defaults_server_status.cpp",
|
||||
":cluster_commands_gen",
|
||||
":cluster_fsync_unlock_cmd_gen",
|
||||
":recreate_range_deletion_tasks_gen",
|
||||
":refine_collection_shard_key_gen",
|
||||
":shard_collection_gen",
|
||||
"//src/mongo/s/commands/commit_quorum:cluster_set_index_commit_quorum_cmd.cpp",
|
||||
|
||||
@ -0,0 +1,117 @@
|
||||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/client/read_preference.h"
|
||||
#include "mongo/db/auth/action_type.h"
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/auth/resource_pattern.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/generic_argument_util.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/s/client/shard.h"
|
||||
#include "mongo/s/cluster_commands_helpers.h"
|
||||
#include "mongo/s/commands/recreate_range_deletion_tasks_gen.h"
|
||||
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/s/router_role.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
|
||||
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class RecreateRangeDeletionTasksRequestCommand final
|
||||
: public TypedCommand<RecreateRangeDeletionTasksRequestCommand> {
|
||||
public:
|
||||
using Request = RecreateRangeDeletionTasksRequest;
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
const auto& nss = ns();
|
||||
ShardSvrRecreateRangeDeletionTasks shardSvrReq(nss);
|
||||
shardSvrReq.setSkipEmptyRanges(request().getSkipEmptyRanges());
|
||||
generic_argument_util::setMajorityWriteConcern(shardSvrReq, &opCtx->getWriteConcern());
|
||||
|
||||
sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), nss.dbName());
|
||||
router.route(opCtx,
|
||||
Request::kCommandName,
|
||||
[&](OperationContext* opCtx, const CachedDatabaseInfo& dbInfo) {
|
||||
auto cmdResponse =
|
||||
executeCommandAgainstDatabasePrimaryOnlyAttachingDbVersion(
|
||||
opCtx,
|
||||
nss.dbName(),
|
||||
dbInfo,
|
||||
shardSvrReq.toBSON(),
|
||||
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
|
||||
Shard::RetryPolicy::kIdempotent);
|
||||
|
||||
const auto remoteResponse = uassertStatusOK(cmdResponse.swResponse);
|
||||
uassertStatusOK(getStatusFromCommandResult(remoteResponse.data));
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
NamespaceString ns() const override {
|
||||
return request().getNamespace();
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
|
||||
ActionType::internal));
|
||||
}
|
||||
};
|
||||
|
||||
std::string help() const override {
|
||||
return "Command to recreate range deletion tasks for all unowned ranges on all shards "
|
||||
"knowing the collection.";
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return AllowedOnSecondary::kNever;
|
||||
}
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(RecreateRangeDeletionTasksRequestCommand).forRouter();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
49
src/mongo/s/commands/recreate_range_deletion_tasks.idl
Normal file
49
src/mongo/s/commands/recreate_range_deletion_tasks.idl
Normal file
@ -0,0 +1,49 @@
|
||||
# Copyright (C) 2025-present MongoDB, Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the Server Side Public License, version 1,
|
||||
# as published by MongoDB, Inc.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# Server Side Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the Server Side Public License
|
||||
# along with this program. If not, see
|
||||
# <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
#
|
||||
# As a special exception, the copyright holders give permission to link the
|
||||
# code of portions of this program with the OpenSSL library under certain
|
||||
# conditions as described in each individual source file and distribute
|
||||
# linked combinations including the program with the OpenSSL library. You
|
||||
# must comply with the Server Side Public License in all respects for
|
||||
# all of the code used other than as permitted herein. If you modify file(s)
|
||||
# with this exception, you may extend this exception to your version of the
|
||||
# file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
# delete this exception statement from your version. If you delete this
|
||||
# exception statement from all source files in the program, then also delete
|
||||
# it in the license file.
|
||||
#
|
||||
|
||||
global:
|
||||
mod_visibility: private
|
||||
cpp_namespace: "mongo"
|
||||
|
||||
imports:
|
||||
- "mongo/db/basic_types.idl"
|
||||
|
||||
commands:
|
||||
recreateRangeDeletionTasks:
|
||||
description: "The public recreateRangeDeletionTasks command on mongos."
|
||||
command_name: recreateRangeDeletionTasks
|
||||
cpp_name: RecreateRangeDeletionTasksRequest
|
||||
namespace: concatenate_with_db
|
||||
allow_global_collection_name: true
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
strict: false
|
||||
fields:
|
||||
skipEmptyRanges:
|
||||
description: "If true, only recreate range deletion document if needed"
|
||||
type: bool
|
||||
@ -928,3 +928,38 @@ commands:
|
||||
type: namespacestring
|
||||
strict: false
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrRecreateRangeDeletionTasks:
|
||||
description:
|
||||
"Internal command sent to a db primary shard to recreate range deletion documents
|
||||
for a collection, based on the current routing table."
|
||||
command_name: _shardsvrRecreateRangeDeletionTasks
|
||||
cpp_name: ShardSvrRecreateRangeDeletionTasks
|
||||
namespace: concatenate_with_db
|
||||
allow_global_collection_name: true
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
strict: false
|
||||
fields:
|
||||
skipEmptyRanges:
|
||||
description: "If true, only recreate range deletion document if needed"
|
||||
type: bool
|
||||
|
||||
_shardsvrRecreateRangeDeletionTasksParticipant:
|
||||
description:
|
||||
"Internal command sent to shards to recreate range deletion documents
|
||||
for a collection, based on the current routing table."
|
||||
command_name: _shardsvrRecreateRangeDeletionTasksParticipant
|
||||
cpp_name: ShardSvrRecreateRangeDeletionTasksParticipant
|
||||
namespace: concatenate_with_db
|
||||
allow_global_collection_name: true
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
strict: false
|
||||
fields:
|
||||
skipEmptyRanges:
|
||||
description: "If true, only recreate range deletion document if needed"
|
||||
type: bool
|
||||
uuid:
|
||||
type: uuid
|
||||
description: "UUID of the collection"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user