SERVER-121178 Implement split as a coordinator (#51941)
Co-authored-by: name <email> GitOrigin-RevId: e11798de6794de345490f8e6df0542bac52ff5de
This commit is contained in:
parent
51cc8205c2
commit
a987e53fda
@ -833,6 +833,18 @@ const internalCommandsMap = {
|
||||
phase: "complete",
|
||||
},
|
||||
},
|
||||
_shardsvrSplitChunk: {
|
||||
testname: "_shardsvrSplitChunk",
|
||||
command: {
|
||||
_shardsvrSplitChunk: "test.x",
|
||||
keyPattern: {x: 1},
|
||||
min: {x: MinKey},
|
||||
max: {x: MaxKey},
|
||||
splitKeys: [{x: 0}],
|
||||
from: "shard0000",
|
||||
epoch: ObjectId(),
|
||||
},
|
||||
},
|
||||
_shardsvrValidateShardKeyCandidate: {
|
||||
testname: "_shardsvrValidateShardKeyCandidate",
|
||||
command: {_shardsvrValidateShardKeyCandidate: "x.y", key: {a: 1}},
|
||||
|
||||
@ -7330,8 +7330,16 @@ export const authCommandsLib = {
|
||||
],
|
||||
},
|
||||
{
|
||||
testname: "splitChunk",
|
||||
command: {splitChunk: "test.x"},
|
||||
testname: "_shardsvrSplitChunk",
|
||||
command: {
|
||||
_shardsvrSplitChunk: "test.x",
|
||||
keyPattern: {a: 1},
|
||||
min: {a: MinKey},
|
||||
max: {a: MaxKey},
|
||||
splitKeys: [{a: 0}],
|
||||
from: "shard0000",
|
||||
epoch: ObjectId(),
|
||||
},
|
||||
skipSharded: true,
|
||||
testcases: [
|
||||
{
|
||||
|
||||
@ -51,7 +51,13 @@ export var ChunkHelper = (function () {
|
||||
|
||||
function splitChunkAt(db, collName, middle) {
|
||||
let cmd = {split: db[collName].getFullName(), middle: middle};
|
||||
return runCommandWithRetries(db, cmd, (res) => res.code === ErrorCodes.LockBusy);
|
||||
// TODO (SERVER-125033): Accept ConflictingOperationInProgress until multiple split chunk
|
||||
// commands can instantiate concurrent coordinators.
|
||||
return runCommandWithRetries(
|
||||
db,
|
||||
cmd,
|
||||
(res) => res.code === ErrorCodes.LockBusy || res.code === ErrorCodes.ConflictingOperationInProgress,
|
||||
);
|
||||
}
|
||||
|
||||
function splitChunkAtPoint(db, collName, splitPoint) {
|
||||
@ -60,7 +66,13 @@ export var ChunkHelper = (function () {
|
||||
|
||||
function splitChunkWithBounds(db, collName, bounds) {
|
||||
let cmd = {split: db[collName].getFullName(), bounds: bounds};
|
||||
return runCommandWithRetries(db, cmd, (res) => res.code === ErrorCodes.LockBusy);
|
||||
// TODO (SERVER-125033): Accept ConflictingOperationInProgress until multiple split chunk
|
||||
// commands can instantiate concurrent coordinators.
|
||||
return runCommandWithRetries(
|
||||
db,
|
||||
cmd,
|
||||
(res) => res.code === ErrorCodes.LockBusy || res.code === ErrorCodes.ConflictingOperationInProgress,
|
||||
);
|
||||
}
|
||||
|
||||
function moveChunk(db, collName, bounds, toShard, waitForDelete, secondaryThrottle) {
|
||||
|
||||
@ -222,6 +222,7 @@ let viewsCommandTests = {
|
||||
_shardsvrRunSearchIndexCommand: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
_shardsvrSplitChunk: {skip: isAnInternalCommand},
|
||||
_shardsvrUpgradeDowngradeViewlessTimeseries: {skip: isAnInternalCommand},
|
||||
_shardsvrTimeseriesUpgradeDowngradePrepare: {skip: isAnInternalCommand},
|
||||
_shardsvrTimeseriesUpgradeDowngradeCommit: {skip: isAnInternalCommand},
|
||||
@ -763,19 +764,7 @@ let viewsCommandTests = {
|
||||
isAdminCommand: true,
|
||||
},
|
||||
splitChunk: {
|
||||
command: {
|
||||
splitChunk: "test.view",
|
||||
from: "shard0000",
|
||||
min: {x: MinKey},
|
||||
max: {x: 0},
|
||||
keyPattern: {x: 1},
|
||||
splitKeys: [{x: -2}, {x: -1}],
|
||||
shardVersion: {t: Timestamp(1, 2), e: ObjectId(), v: Timestamp(1, 1)},
|
||||
},
|
||||
skipSharded: true,
|
||||
expectFailure: true,
|
||||
expectedErrorCode: ErrorCodes.ShardingStateNotInitialized,
|
||||
isAdminCommand: true,
|
||||
skip: isDeprecated,
|
||||
},
|
||||
splitVector: {
|
||||
command: {
|
||||
|
||||
@ -1,142 +0,0 @@
|
||||
/**
|
||||
* Tests that merge, split and move chunks via mongos works/doesn't work with different chunk
|
||||
* configurations
|
||||
*
|
||||
* @tags: [
|
||||
* requires_getmore,
|
||||
* assumes_balancer_off,
|
||||
* does_not_support_stepdowns,
|
||||
* # This test performs explicit calls to shardCollection
|
||||
* assumes_unsharded_collection,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
|
||||
|
||||
const dbName = db.getName();
|
||||
const admin = db.getSiblingDB("admin");
|
||||
const config = db.getSiblingDB("config");
|
||||
const collName = jsTestName();
|
||||
const ns = dbName + "." + collName;
|
||||
const coll = db.getCollection(collName);
|
||||
|
||||
const shardNames = db.adminCommand({listShards: 1}).shards.map((shard) => shard._id);
|
||||
|
||||
if (shardNames.length < 2) {
|
||||
print(jsTestName() + " will not run; at least 2 shards are required.");
|
||||
quit();
|
||||
}
|
||||
|
||||
print(jsTestName() + " is running on " + shardNames.length + " shards.");
|
||||
|
||||
assert.commandWorked(admin.runCommand({enableSharding: dbName}));
|
||||
assert.commandWorked(admin.runCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
// Make sure split is correctly disabled for unsharded collection
|
||||
jsTest.log("Trying to split an unsharded collection ...");
|
||||
const collNameUnsplittable = collName + "_unsplittable";
|
||||
const nsUnsplittable = dbName + "." + collNameUnsplittable;
|
||||
assert.commandWorked(db.runCommand({create: collNameUnsplittable}));
|
||||
assert.commandFailedWithCode(
|
||||
admin.runCommand({split: nsUnsplittable, middle: {_id: 0}}),
|
||||
ErrorCodes.NamespaceNotSharded,
|
||||
);
|
||||
jsTest.log("Trying to merge an unsharded collection ...");
|
||||
assert.commandFailedWithCode(
|
||||
admin.runCommand({mergeChunks: nsUnsplittable, bounds: [{_id: 90}, {_id: MaxKey}]}),
|
||||
ErrorCodes.NamespaceNotSharded,
|
||||
);
|
||||
db.getCollection(collNameUnsplittable).drop();
|
||||
|
||||
// Create ranges MIN->0,0->10,(hole),20->40,40->50,50->90,(hole),100->110,110->MAX on first
|
||||
// shard
|
||||
jsTest.log("Creating ranges...");
|
||||
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 0}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 10}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 20}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 40}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 50}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 90}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 100}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 110}}));
|
||||
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 10}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 90}, to: shardNames[1]}));
|
||||
|
||||
// Insert some data into each of the consolidated ranges
|
||||
let numDocs = 0;
|
||||
const bulk = coll.initializeUnorderedBulkOp();
|
||||
for (let i = 0; i <= 120; i++) {
|
||||
bulk.insert({_id: i});
|
||||
numDocs++;
|
||||
}
|
||||
assert.commandWorked(bulk.execute({w: "majority"}));
|
||||
|
||||
// S0: min->0, 0->10, 20->40, 40->50, 50->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
assert.eq(9, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
|
||||
jsTest.log("Trying merges that should succeed...");
|
||||
|
||||
// Make sure merge including the MinKey works
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: MinKey}, {_id: 10}]}));
|
||||
assert.eq(8, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 20->40, 40->50, 50->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
|
||||
// Make sure merging three chunks in the middle works
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 20}, {_id: 90}]}));
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 20->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
|
||||
// Make sure splitting chunks after merging works
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 55}}));
|
||||
assert.eq(7, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 20->55, 55->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
|
||||
// make sure moving the new chunk works
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 20}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 55}, to: shardNames[1]}));
|
||||
assert.eq(7, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 100->110, 110->max
|
||||
// S1: 10->20, 20->55, 55->90, 90->100
|
||||
|
||||
// Make sure merge including the MaxKey works
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 100}, {_id: MaxKey}]}));
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 100->max
|
||||
// S1: 10->20, 20->55, 55->90, 90->100
|
||||
|
||||
// Make sure merging chunks after a chunk has been moved out of a shard succeeds
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 110}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 10}, to: shardNames[0]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 10->20
|
||||
// S1: 20->55, 55->90, 90->100, 100->max
|
||||
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 90}, {_id: MaxKey}]}));
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 20}, {_id: 90}]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 10->20
|
||||
// S1: 20->90, 90->max
|
||||
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 15}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 30}}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 30}, to: shardNames[0]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 10->15, 15->20, 30->90
|
||||
// S1: 20->30, 90->max
|
||||
|
||||
// range has a hole on shard 0
|
||||
assert.commandFailed(admin.runCommand({mergeChunks: ns, bounds: [{_id: MinKey}, {_id: 90}]}));
|
||||
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
|
||||
coll.drop();
|
||||
11
jstests/core_sharding/chunk_operations/OWNERS.yml
Normal file
11
jstests/core_sharding/chunk_operations/OWNERS.yml
Normal file
@ -0,0 +1,11 @@
|
||||
version: 2.0.0
|
||||
filters:
|
||||
- "*":
|
||||
approvers:
|
||||
- 10gen/server-cluster-scalability
|
||||
- "*merge*":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-ddl
|
||||
- "*split*":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-ddl
|
||||
202
jstests/core_sharding/chunk_operations/merge_split_chunks.js
Normal file
202
jstests/core_sharding/chunk_operations/merge_split_chunks.js
Normal file
@ -0,0 +1,202 @@
|
||||
/**
|
||||
* Tests that merge, split and move chunks via mongos works/doesn't work with different chunk
|
||||
* configurations. Covers the three split modes (middle, find, bounds), merge across various
|
||||
* chunk layouts, and move of newly split chunks.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_getmore,
|
||||
* assumes_balancer_off,
|
||||
* does_not_support_stepdowns,
|
||||
* # This test performs explicit calls to shardCollection
|
||||
* assumes_unsharded_collection,
|
||||
* requires_2_or_more_shards,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {after, before, describe, it} from "jstests/libs/mochalite.js";
|
||||
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
|
||||
|
||||
describe("merge, split, and move chunks", function () {
|
||||
const dbName = db.getName();
|
||||
const admin = db.getSiblingDB("admin");
|
||||
const config = db.getSiblingDB("config");
|
||||
const collName = jsTestName();
|
||||
const ns = dbName + "." + collName;
|
||||
const coll = db.getCollection(collName);
|
||||
const shardNames = db.adminCommand({listShards: 1}).shards.map((shard) => shard._id);
|
||||
let numDocs;
|
||||
|
||||
before(function () {
|
||||
assert.commandWorked(admin.runCommand({enableSharding: dbName}));
|
||||
assert.commandWorked(admin.runCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
// Create ranges MIN->0, 0->10, (hole), 20->40, 40->50, 50->90, (hole),
|
||||
// 100->110, 110->MAX on shard 0, with chunks [10->20) and [90->100) on shard 1.
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 0}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 10}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 20}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 40}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 50}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 90}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 100}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 110}}));
|
||||
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 10}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 90}, to: shardNames[1]}));
|
||||
|
||||
numDocs = 0;
|
||||
const bulk = coll.initializeUnorderedBulkOp();
|
||||
for (let i = 0; i <= 120; i++) {
|
||||
bulk.insert({_id: i});
|
||||
numDocs++;
|
||||
}
|
||||
assert.commandWorked(bulk.execute({w: "majority"}));
|
||||
|
||||
// S0: min->0, 0->10, 20->40, 40->50, 50->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
assert.eq(9, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
});
|
||||
|
||||
after(function () {
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
describe("operations on unsharded collections", function () {
|
||||
it("rejects split on unsharded collection", function () {
|
||||
const unsplittableName = collName + "_unsplittable";
|
||||
const unsplittableNs = dbName + "." + unsplittableName;
|
||||
assert.commandWorked(db.runCommand({create: unsplittableName}));
|
||||
assert.commandFailedWithCode(
|
||||
admin.runCommand({split: unsplittableNs, middle: {_id: 0}}),
|
||||
ErrorCodes.NamespaceNotSharded,
|
||||
);
|
||||
db.getCollection(unsplittableName).drop();
|
||||
});
|
||||
|
||||
it("rejects merge on unsharded collection", function () {
|
||||
const unsplittableName = collName + "_unsplittable_merge";
|
||||
const unsplittableNs = dbName + "." + unsplittableName;
|
||||
assert.commandWorked(db.runCommand({create: unsplittableName}));
|
||||
assert.commandFailedWithCode(
|
||||
admin.runCommand({mergeChunks: unsplittableNs, bounds: [{_id: 90}, {_id: MaxKey}]}),
|
||||
ErrorCodes.NamespaceNotSharded,
|
||||
);
|
||||
db.getCollection(unsplittableName).drop();
|
||||
});
|
||||
});
|
||||
|
||||
// The tests in this block are sequential: each builds on the chunk layout left by the
|
||||
// previous test. mochalite runs tests serially within a describe, so this is safe.
|
||||
describe("chunk merge and split workflows", function () {
|
||||
it("merges chunks including MinKey", function () {
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: MinKey}, {_id: 10}]}));
|
||||
assert.eq(8, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 20->40, 40->50, 50->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
});
|
||||
|
||||
it("merges three adjacent chunks in the middle", function () {
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 20}, {_id: 90}]}));
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 20->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
});
|
||||
|
||||
it("splits a merged chunk with middle", function () {
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 55}}));
|
||||
assert.eq(7, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 20->55, 55->90, 100->110, 110->max
|
||||
// S1: 10->20, 90->100
|
||||
});
|
||||
|
||||
it("moves newly split chunks to another shard", function () {
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 20}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 55}, to: shardNames[1]}));
|
||||
assert.eq(7, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 100->110, 110->max
|
||||
// S1: 10->20, 20->55, 55->90, 90->100
|
||||
});
|
||||
|
||||
it("merges chunks including MaxKey", function () {
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 100}, {_id: MaxKey}]}));
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 100->max
|
||||
// S1: 10->20, 20->55, 55->90, 90->100
|
||||
});
|
||||
|
||||
it("merges chunks after a chunk has been moved out of a shard", function () {
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 110}, to: shardNames[1]}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 10}, to: shardNames[0]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
// S0: min->10, 10->20
|
||||
// S1: 20->55, 55->90, 90->100, 100->max
|
||||
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 90}, {_id: MaxKey}]}));
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 20}, {_id: 90}]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 10->20
|
||||
// S1: 20->90, 90->max
|
||||
});
|
||||
|
||||
it("splits after merge then moves across shards", function () {
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 15}}));
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 30}}));
|
||||
assert.commandWorked(admin.runCommand({moveChunk: ns, find: {_id: 30}, to: shardNames[0]}));
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
// S0: min->10, 10->15, 15->20, 30->90
|
||||
// S1: 20->30, 90->max
|
||||
});
|
||||
|
||||
it("rejects merge across a hole in chunk ranges", function () {
|
||||
// S0 has chunks min->10, 10->15, 15->20, 30->90 with a hole at [20, 30)
|
||||
assert.commandFailed(admin.runCommand({mergeChunks: ns, bounds: [{_id: MinKey}, {_id: 90}]}));
|
||||
assert.eq(6, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
});
|
||||
|
||||
// The following tests exercise split modes not covered by the workflow above.
|
||||
// State at this point:
|
||||
// S0: min->10, 10->15, 15->20, 30->90
|
||||
// S1: 20->30, 90->max
|
||||
|
||||
it("splits using find mode", function () {
|
||||
// Split the [30, 90) chunk on S0 via find (which uses selectMedianKey /
|
||||
// splitVector with force:true to auto-select a split point).
|
||||
const chunksBefore = findChunksUtil.findChunksByNs(config, ns).itcount();
|
||||
assert.commandWorked(admin.runCommand({split: ns, find: {_id: 50}}));
|
||||
assert.gt(findChunksUtil.findChunksByNs(config, ns).itcount(), chunksBefore);
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
});
|
||||
|
||||
it("splits using bounds mode", function () {
|
||||
// Split the [90, MaxKey) chunk on S1 via bounds (which auto-selects a split
|
||||
// point using splitVector on the target chunk).
|
||||
const targetChunk = findChunksUtil.findOneChunkByNs(config, ns, {min: {_id: 90}});
|
||||
assert.neq(null, targetChunk);
|
||||
const chunksBefore = findChunksUtil.findChunksByNs(config, ns).itcount();
|
||||
|
||||
assert.commandWorked(admin.runCommand({split: ns, bounds: [targetChunk.min, targetChunk.max]}));
|
||||
assert.gt(findChunksUtil.findChunksByNs(config, ns).itcount(), chunksBefore);
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
});
|
||||
|
||||
it("re-splits at the same point after merge", function () {
|
||||
// Split [10, 15) at _id: 12, merge the halves back, then split at 12 again.
|
||||
// Confirms the split/merge/split round-trip works.
|
||||
const chunksBefore = findChunksUtil.findChunksByNs(config, ns).itcount();
|
||||
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 12}}));
|
||||
assert.eq(chunksBefore + 1, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
|
||||
assert.commandWorked(admin.runCommand({mergeChunks: ns, bounds: [{_id: 10}, {_id: 15}]}));
|
||||
assert.eq(chunksBefore, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
|
||||
assert.commandWorked(admin.runCommand({split: ns, middle: {_id: 12}}));
|
||||
assert.eq(chunksBefore + 1, findChunksUtil.findChunksByNs(config, ns).itcount());
|
||||
assert.eq(numDocs, coll.find().itcount());
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -0,0 +1,131 @@
|
||||
/**
|
||||
* Tests the split command's parameter validation on mongos. Covers mutually exclusive parameter
|
||||
* combinations, missing required parameters, malformed bounds arrays, and partial shard keys
|
||||
* on compound-key collections. These validation paths live in cluster_split_cmd.cpp's
|
||||
* errmsgRun() and are not exercised by the other split tests (basic_split.js, etc.).
|
||||
*
|
||||
* @tags: [
|
||||
* assumes_balancer_off,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {after, before, describe, it} from "jstests/libs/mochalite.js";
|
||||
|
||||
describe("split command parameter validation", function () {
|
||||
const dbName = db.getName();
|
||||
const admin = db.getSiblingDB("admin");
|
||||
|
||||
const simpleCollName = jsTestName() + "_simple";
|
||||
const simpleNs = dbName + "." + simpleCollName;
|
||||
|
||||
const compoundCollName = jsTestName() + "_compound";
|
||||
const compoundNs = dbName + "." + compoundCollName;
|
||||
|
||||
before(function () {
|
||||
assert.commandWorked(admin.runCommand({shardCollection: simpleNs, key: {_id: 1}}));
|
||||
assert.commandWorked(admin.runCommand({shardCollection: compoundNs, key: {x: 1, y: 1}}));
|
||||
});
|
||||
|
||||
after(function () {
|
||||
db.getCollection(simpleCollName).drop();
|
||||
db.getCollection(compoundCollName).drop();
|
||||
});
|
||||
|
||||
describe("mutually exclusive parameters", function () {
|
||||
it("rejects find and bounds together", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
find: {_id: 0},
|
||||
bounds: [{_id: MinKey}, {_id: MaxKey}],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects find and middle together", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
find: {_id: 0},
|
||||
middle: {_id: 0},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects bounds and middle together", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
bounds: [{_id: MinKey}, {_id: MaxKey}],
|
||||
middle: {_id: 0},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects all three parameters together", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
find: {_id: 0},
|
||||
bounds: [{_id: MinKey}, {_id: MaxKey}],
|
||||
middle: {_id: 0},
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("missing parameters", function () {
|
||||
it("rejects split with no find, bounds, or middle", function () {
|
||||
assert.commandFailed(admin.runCommand({split: simpleNs}));
|
||||
});
|
||||
});
|
||||
|
||||
describe("incomplete bounds", function () {
|
||||
it("rejects bounds with only lower bound", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
bounds: [{_id: MinKey}],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects bounds with empty array", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: simpleNs,
|
||||
bounds: [],
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("partial shard key on compound key collection", function () {
|
||||
it("rejects middle with partial compound shard key", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: compoundNs,
|
||||
middle: {x: 0},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects find with partial compound shard key", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: compoundNs,
|
||||
find: {x: 0},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects bounds with partial compound shard key", function () {
|
||||
assert.commandFailed(
|
||||
admin.runCommand({
|
||||
split: compoundNs,
|
||||
bounds: [{x: MinKey}, {x: MaxKey}],
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -210,6 +210,7 @@ const wcCommandsTests = {
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
_shardsvrSplitChunk: {skip: "internal command"},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: "internal command"},
|
||||
_shardsvrCollMod: {skip: "internal command"},
|
||||
_shardsvrCollModParticipant: {skip: "internal command"},
|
||||
@ -2987,7 +2988,6 @@ const wcCommandsTests = {
|
||||
shutdown: {skip: "does not accept write concern"},
|
||||
sleep: {skip: "does not accept write concern"},
|
||||
split: {skip: "does not accept write concern"},
|
||||
splitChunk: {skip: "does not accept write concern"},
|
||||
splitVector: {skip: "internal command"},
|
||||
stageDebug: {skip: "does not accept write concern"},
|
||||
startSession: {skip: "does not accept write concern"},
|
||||
@ -3436,6 +3436,7 @@ const wcTimeseriesCommandsTests = {
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
_shardsvrSplitChunk: {skip: "internal command"},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: "internal command"},
|
||||
_shardsvrCollMod: {skip: "internal command"},
|
||||
_shardsvrCollModParticipant: {skip: "internal command"},
|
||||
@ -4378,7 +4379,6 @@ const wcTimeseriesCommandsTests = {
|
||||
shutdown: {skip: "does not accept write concern"},
|
||||
sleep: {skip: "does not accept write concern"},
|
||||
split: {skip: "does not accept write concern"},
|
||||
splitChunk: {skip: "does not accept write concern"},
|
||||
splitVector: {skip: "internal command"},
|
||||
stageDebug: {skip: "does not accept write concern"},
|
||||
startSession: {skip: "does not accept write concern"},
|
||||
|
||||
@ -159,6 +159,7 @@ const allCommands = {
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
_shardsvrSplitChunk: {skip: isAnInternalCommand},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: isAnInternalCommand},
|
||||
_shardsvrCollMod: {skip: isAnInternalCommand},
|
||||
_shardsvrCollModParticipant: {skip: isAnInternalCommand},
|
||||
@ -1602,7 +1603,6 @@ const allCommands = {
|
||||
isAdminCommand: true,
|
||||
isShardedOnly: true,
|
||||
},
|
||||
splitChunk: {skip: isAnInternalCommand},
|
||||
splitVector: {skip: isAnInternalCommand},
|
||||
startRecordingTraffic: {
|
||||
skip: "Renamed to startTrafficRecording",
|
||||
|
||||
@ -145,6 +145,7 @@ const allCommands = {
|
||||
_shardsvrSetAllowMigrations: {skip: isPrimaryOnly},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isPrimaryOnly},
|
||||
_shardsvrSplitChunk: {skip: isPrimaryOnly},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: isPrimaryOnly},
|
||||
_shardsvrCoordinateMultiUpdate: {skip: isAnInternalCommand},
|
||||
_shardsvrCollMod: {skip: isPrimaryOnly},
|
||||
@ -448,7 +449,6 @@ const allCommands = {
|
||||
shardingState: {skip: isNotAUserDataRead},
|
||||
shutdown: {skip: isNotAUserDataRead},
|
||||
sleep: {skip: isNotAUserDataRead},
|
||||
splitChunk: {skip: isPrimaryOnly},
|
||||
splitVector: {skip: isPrimaryOnly},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
stopRecordingTraffic: {skip: "Renamed to stopTrafficRecording"},
|
||||
|
||||
@ -154,6 +154,7 @@ const allCommands = {
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
_shardsvrSplitChunk: {skip: isAnInternalCommand},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: isAnInternalCommand},
|
||||
_shardsvrCollMod: {skip: isAnInternalCommand},
|
||||
_shardsvrCollModParticipant: {skip: isAnInternalCommand},
|
||||
@ -1188,7 +1189,6 @@ const allCommands = {
|
||||
shutdown: {skip: "requires changes to shards"},
|
||||
sleep: {skip: isAnInternalCommand},
|
||||
split: {skip: requiresMongoS},
|
||||
splitChunk: {skip: isAnInternalCommand},
|
||||
splitVector: {skip: isAnInternalCommand},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
stopRecordingTraffic: {skip: "Renamed to stopTrafficRecording"},
|
||||
|
||||
@ -2,6 +2,10 @@
|
||||
* Specifies for each command whether it is expected to send a databaseVersion, and verifies that
|
||||
* the commands match the specification.
|
||||
*
|
||||
* Each command is executed against two different scenarios: after movePrimary, and after
|
||||
* dropDatabase + recreate on a different primary shard; to verify that the command behaves correctly
|
||||
* when run with a stale dbVersion.
|
||||
*
|
||||
* Each command must have exactly one corresponding test defined. Each defined test case must
|
||||
* correspond to an existing command. The allowable fields for the test cases are as follows:
|
||||
*
|
||||
@ -912,7 +916,19 @@ const allTestCases = {
|
||||
shardCollection: {skip: "does not forward command to primary shard"},
|
||||
shardDrainingStatus: {skip: "not on a user database"},
|
||||
shutdown: {skip: "does not forward command to primary shard"},
|
||||
split: {skip: "does not forward command to primary shard"},
|
||||
split: {
|
||||
run: {
|
||||
sendsDbVersion: false,
|
||||
runsAgainstAdminDb: true,
|
||||
command: function (dbName, collName) {
|
||||
return {
|
||||
split: dbName + "." + collName,
|
||||
middle: {_id: 0},
|
||||
};
|
||||
},
|
||||
expectedFailureCode: ErrorCodes.NamespaceNotSharded,
|
||||
},
|
||||
},
|
||||
splitVector: {skip: "does not forward command to primary shard"},
|
||||
getTrafficRecordingStatus: {skip: "executes locally on targeted node"},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
@ -1213,6 +1229,23 @@ const allTestCases = {
|
||||
_shardsvrSetAllowMigrations: {skip: "TODO"},
|
||||
_shardsvrSetClusterParameter: {skip: "TODO"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "TODO"},
|
||||
_shardsvrSplitChunk: {
|
||||
run: {
|
||||
runsAgainstAdminDb: true,
|
||||
command: function (dbName, collName) {
|
||||
return {
|
||||
_shardsvrSplitChunk: dbName + "." + collName,
|
||||
keyPattern: {_id: 1},
|
||||
min: {_id: MinKey},
|
||||
max: {_id: MaxKey},
|
||||
splitKeys: [{_id: 0}],
|
||||
from: "shard0",
|
||||
epoch: ObjectId(),
|
||||
};
|
||||
},
|
||||
expectedFailureCode: ErrorCodes.StaleConfig,
|
||||
},
|
||||
},
|
||||
_shardsvrUpgradeDowngradeViewlessTimeseries: {skip: "internal command"},
|
||||
_shardsvrTimeseriesUpgradeDowngradePrepare: {skip: "internal command"},
|
||||
_shardsvrTimeseriesUpgradeDowngradeCommit: {skip: "internal command"},
|
||||
@ -1397,7 +1430,7 @@ const allTestCases = {
|
||||
shardingState: {skip: "TODO"},
|
||||
shutdown: {skip: "TODO"},
|
||||
sleep: {skip: "TODO"},
|
||||
splitChunk: {skip: "TODO"},
|
||||
splitChunk: {skip: "is deprecated", conditional: true},
|
||||
splitVector: {skip: "TODO"},
|
||||
startSession: {skip: "TODO"},
|
||||
startTrafficRecording: {skip: "TODO"},
|
||||
|
||||
@ -83,4 +83,5 @@ export const commandsAddedToMongodSinceLastLTS = [
|
||||
"_shardsvrCommitCreateCollectionMetadata",
|
||||
"_internalClearCollectionShardingMetadata",
|
||||
"_shardsvrCommitDropCollectionMetadata",
|
||||
"_shardsvrSplitChunk",
|
||||
];
|
||||
|
||||
@ -1665,8 +1665,9 @@ export let MongosAPIParametersUtil = (function () {
|
||||
commandName: "split",
|
||||
run: {
|
||||
inAPIVersion1: false,
|
||||
configServerCommandName: "_configsvrCommitChunkSplit",
|
||||
shardCommandName: "splitChunk",
|
||||
// TODO (SERVER-108802): Re-enable this test case after the api version is propagated to the config server.
|
||||
// configServerCommandName: "_configsvrCommitChunkSplit",
|
||||
shardCommandName: "_shardsvrSplitChunk",
|
||||
runsAgainstAdminDb: true,
|
||||
permittedInTxn: false,
|
||||
requiresShardedCollection: true,
|
||||
|
||||
@ -212,6 +212,7 @@ let testCases = {
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
_shardsvrSplitChunk: {skip: "internal command"},
|
||||
_shardsvrValidateShardKeyCandidate: {skip: "internal command"},
|
||||
_shardsvrCollMod: {skip: "internal command"},
|
||||
_shardsvrCollModParticipant: {skip: "internal command"},
|
||||
|
||||
@ -97,6 +97,7 @@ let testCases = {
|
||||
_shardsvrMovePrimaryEnterCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMoveRange: {skip: "primary only"},
|
||||
_shardsvrSplitChunk: {skip: "primary only"},
|
||||
_flushShardRegistry: {skip: "internal command"},
|
||||
_recvChunkAbort: {skip: "primary only"},
|
||||
_recvChunkCommit: {skip: "primary only"},
|
||||
@ -391,7 +392,6 @@ let testCases = {
|
||||
shutdown: {skip: "does not return user data"},
|
||||
sleep: {skip: "does not return user data"},
|
||||
split: {skip: "primary only"},
|
||||
splitChunk: {skip: "primary only"},
|
||||
splitVector: {skip: "primary only"},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
stopRecordingTraffic: {skip: "Renamed to stopTrafficRecording"},
|
||||
|
||||
@ -113,6 +113,7 @@ let testCases = {
|
||||
_shardsvrMovePrimaryEnterCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMoveRange: {skip: "primary only"},
|
||||
_shardsvrSplitChunk: {skip: "primary only"},
|
||||
_flushShardRegistry: {skip: "internal command"},
|
||||
_recvChunkAbort: {skip: "primary only"},
|
||||
_recvChunkCommit: {skip: "primary only"},
|
||||
@ -492,7 +493,6 @@ let testCases = {
|
||||
shutdown: {skip: "does not return user data"},
|
||||
sleep: {skip: "does not return user data"},
|
||||
split: {skip: "primary only"},
|
||||
splitChunk: {skip: "primary only"},
|
||||
splitVector: {skip: "primary only"},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
stopRecordingTraffic: {skip: "Renamed to stopTrafficRecording"},
|
||||
|
||||
@ -104,6 +104,7 @@ let testCases = {
|
||||
_shardsvrMovePrimaryEnterCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMovePrimaryExitCriticalSection: {skip: "primary only"},
|
||||
_shardsvrMoveRange: {skip: "primary only"},
|
||||
_shardsvrSplitChunk: {skip: "primary only"},
|
||||
_flushShardRegistry: {skip: "internal command"},
|
||||
_recvChunkAbort: {skip: "primary only"},
|
||||
_recvChunkCommit: {skip: "primary only"},
|
||||
@ -407,7 +408,6 @@ let testCases = {
|
||||
shutdown: {skip: "does not return user data"},
|
||||
sleep: {skip: "does not return user data"},
|
||||
split: {skip: "primary only"},
|
||||
splitChunk: {skip: "primary only"},
|
||||
splitVector: {skip: "primary only"},
|
||||
startRecordingTraffic: {skip: "Renamed to startTrafficRecording"},
|
||||
stopRecordingTraffic: {skip: "Renamed to stopTrafficRecording"},
|
||||
|
||||
@ -590,6 +590,16 @@ idl_generator(
|
||||
],
|
||||
)
|
||||
|
||||
idl_generator(
|
||||
name = "split_chunk_coordinator_document_gen",
|
||||
src = "split_chunk_coordinator_document.idl",
|
||||
deps = [
|
||||
"//src/mongo/db:basic_types_gen",
|
||||
"//src/mongo/db/global_catalog/ddl:sharded_ddl_commands_gen",
|
||||
"//src/mongo/db/global_catalog/ddl:sharding_coordinator_gen",
|
||||
],
|
||||
)
|
||||
|
||||
idl_generator(
|
||||
name = "test_chunk_operation_sharding_coordinator_document_gen",
|
||||
src = "test_chunk_operation_sharding_coordinator_document.idl",
|
||||
|
||||
@ -31,6 +31,7 @@
|
||||
|
||||
#include "mongo/db/global_catalog/ddl/merge_chunks_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_coordinator_external_state_for_test.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/test_chunk_operation_sharding_coordinator_document_gen.h"
|
||||
#include "mongo/db/repl/primary_only_service_test_fixture.h"
|
||||
#include "mongo/db/shard_role/lock_manager/locker.h"
|
||||
@ -121,6 +122,26 @@ protected:
|
||||
return doc;
|
||||
}
|
||||
|
||||
SplitChunkCoordinatorDocument makeSplitChunkCoordinatorDoc(std::vector<BSONObj> splitKeys,
|
||||
OID epoch) {
|
||||
SplitChunkCoordinatorDocument doc;
|
||||
ShardsvrSplitChunkRequest req;
|
||||
req.setKeyPattern(BSON("x" << 1));
|
||||
req.setMin(BSON("x" << 0));
|
||||
req.setMax(BSON("x" << 100));
|
||||
req.setSplitKeys(std::move(splitKeys));
|
||||
req.setFrom("shard0000");
|
||||
req.setEpoch(epoch);
|
||||
ShardingCoordinatorMetadata metadata{
|
||||
{NamespaceString::createNamespaceString_forTest("test.split"),
|
||||
CoordinatorTypeEnum::kSplitChunk}};
|
||||
ForwardableOperationMetadata forwardableOpMetadata(_opCtx);
|
||||
metadata.setForwardableOpMetadata(forwardableOpMetadata);
|
||||
doc.setShardingCoordinatorMetadata(std::move(metadata));
|
||||
doc.setShardsvrSplitChunkRequest(req);
|
||||
return doc;
|
||||
}
|
||||
|
||||
class TestChunkOperationShardingCoordinator
|
||||
: public ChunkOperationShardingCoordinator<TestChunkOperationShardingCoordinatorDocument> {
|
||||
public:
|
||||
@ -192,6 +213,22 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictS
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkCheckIfOptionsConflictSameParams) {
|
||||
auto epoch = OID::gen();
|
||||
std::vector<BSONObj> splitKeys = {BSON("x" << 50)};
|
||||
auto coordinatorDoc = makeSplitChunkCoordinatorDoc(splitKeys, epoch);
|
||||
|
||||
auto coordinator = std::make_shared<SplitChunkCoordinator>(
|
||||
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
|
||||
|
||||
// Same parameters — should not throw.
|
||||
ASSERT_DOES_NOT_THROW(coordinator->checkIfOptionsConflict(coordinatorDoc.toBSON()));
|
||||
|
||||
// Satisfy destructor invariants by resolving internal promises.
|
||||
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictDifferentBounds) {
|
||||
auto epoch = OID::gen();
|
||||
std::vector<BSONObj> bounds = {BSON("a" << 1), BSON("a" << 10)};
|
||||
@ -212,6 +249,26 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictD
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkCheckIfOptionsConflictDifferentSplitKeys) {
|
||||
auto epoch = OID::gen();
|
||||
std::vector<BSONObj> splitKeys = {BSON("x" << 50)};
|
||||
auto coordinatorDoc = makeSplitChunkCoordinatorDoc(splitKeys, epoch);
|
||||
|
||||
auto coordinator = std::make_shared<SplitChunkCoordinator>(
|
||||
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
|
||||
|
||||
// Different split keys — should throw.
|
||||
std::vector<BSONObj> differentSplitKeys = {BSON("x" << 60)};
|
||||
auto otherDoc = makeSplitChunkCoordinatorDoc(differentSplitKeys, epoch);
|
||||
|
||||
ASSERT_THROWS_CODE(coordinator->checkIfOptionsConflict(otherDoc.toBSON()),
|
||||
DBException,
|
||||
ErrorCodes::ConflictingOperationInProgress);
|
||||
|
||||
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictDifferentEpoch) {
|
||||
auto epoch = OID::gen();
|
||||
std::vector<BSONObj> bounds = {BSON("a" << 1), BSON("a" << 10)};
|
||||
@ -232,4 +289,24 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictD
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkAppendCommandInfoIncludesRequestFields) {
|
||||
auto epoch = OID::gen();
|
||||
std::vector<BSONObj> splitKeys = {BSON("x" << 50), BSON("x" << 75)};
|
||||
auto coordinatorDoc = makeSplitChunkCoordinatorDoc(splitKeys, epoch);
|
||||
|
||||
auto coordinator = std::make_shared<SplitChunkCoordinator>(
|
||||
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
|
||||
|
||||
BSONObjBuilder cmdInfoBuilder;
|
||||
coordinator->appendCommandInfo(&cmdInfoBuilder);
|
||||
auto cmdInfo = cmdInfoBuilder.obj();
|
||||
ASSERT_BSONOBJ_EQ(cmdInfo.getObjectField("min"), BSON("x" << 0));
|
||||
ASSERT_BSONOBJ_EQ(cmdInfo.getObjectField("max"), BSON("x" << 100));
|
||||
ASSERT_EQ(cmdInfo.getStringField("from"), "shard0000");
|
||||
ASSERT_EQ(cmdInfo.getField("splitKeys").Array().size(), 2u);
|
||||
|
||||
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
|
||||
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -41,6 +41,7 @@
|
||||
#include "mongo/bson/simple_bsonobj_comparator.h"
|
||||
#include "mongo/bson/util/bson_extract.h"
|
||||
#include "mongo/client/read_preference.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/shard_key_pattern.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/namespace_string_util.h"
|
||||
@ -182,18 +183,17 @@ Status splitChunkAtMultiplePoints(OperationContext* opCtx,
|
||||
return {ErrorCodes::CannotSplit, msg};
|
||||
}
|
||||
|
||||
BSONObjBuilder cmd;
|
||||
cmd.append("splitChunk",
|
||||
NamespaceStringUtil::serialize(nss, SerializationContext::stateDefault()));
|
||||
cmd.append("from", shardId.toString());
|
||||
cmd.append("keyPattern", shardKeyPattern.toBSON());
|
||||
cmd.append("epoch", epoch);
|
||||
cmd.append("timestamp", timestamp);
|
||||
ShardsvrSplitChunk req(nss);
|
||||
req.setDbName(DatabaseName::kAdmin);
|
||||
req.setKeyPattern(shardKeyPattern.toBSON());
|
||||
req.setMin(chunkRange.getMin());
|
||||
req.setMax(chunkRange.getMax());
|
||||
req.setSplitKeys({splitPointsBeginIt, splitPointsEndIt});
|
||||
req.setFrom(shardId.toString());
|
||||
req.setEpoch(epoch);
|
||||
req.setTimestamp(timestamp);
|
||||
|
||||
chunkRange.serialize(&cmd);
|
||||
cmd.append("splitKeys", splitPointsBeginIt, splitPointsEndIt);
|
||||
|
||||
BSONObj cmdObj = cmd.obj();
|
||||
BSONObj cmdObj = req.toBSON();
|
||||
|
||||
Status status{ErrorCodes::InternalError, "Uninitialized value"};
|
||||
BSONObj cmdResponse;
|
||||
|
||||
@ -500,6 +500,33 @@ structs:
|
||||
type: safeInt64
|
||||
description: "Maximum number of documents in the capped collection"
|
||||
|
||||
ShardsvrSplitChunkRequest:
|
||||
description: "_shardsvrSplitChunk command parameters"
|
||||
strict: false
|
||||
fields:
|
||||
keyPattern:
|
||||
type: object_owned
|
||||
description: "The shard key pattern for the collection."
|
||||
min:
|
||||
type: object_owned
|
||||
description: "The min bound of the chunk range to split."
|
||||
max:
|
||||
type: object_owned
|
||||
description: "The max bound of the chunk range to split."
|
||||
splitKeys:
|
||||
type: array<object>
|
||||
description: "The split points for the chunk."
|
||||
from:
|
||||
type: string
|
||||
description: "The shard name."
|
||||
epoch:
|
||||
type: objectid
|
||||
description: "The expected collection epoch."
|
||||
timestamp:
|
||||
type: timestamp
|
||||
optional: true
|
||||
description: "The expected collection timestamp."
|
||||
|
||||
commands:
|
||||
_shardsvrCreateCollection:
|
||||
command_name: _shardsvrCreateCollection
|
||||
@ -896,6 +923,18 @@ commands:
|
||||
chained_structs:
|
||||
ShardsvrConvertToCappedRequest: ShardsvrConvertToCappedRequest
|
||||
|
||||
_shardsvrSplitChunk:
|
||||
command_name: _shardsvrSplitChunk
|
||||
command_alias: splitChunk
|
||||
cpp_name: ShardsvrSplitChunk
|
||||
description: "The internal split command for a shard."
|
||||
strict: false
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
chained_structs:
|
||||
ShardsvrSplitChunkRequest: ShardsvrSplitChunkRequest
|
||||
|
||||
# TODO (SERVER-116499): Remove this command once 9.0 becomes last LTS.
|
||||
_shardsvrUpgradeDowngradeViewlessTimeseries:
|
||||
command_name: _shardsvrUpgradeDowngradeViewlessTimeseries
|
||||
|
||||
@ -68,6 +68,7 @@ enums:
|
||||
# TODO (SERVER-116499): Remove this coordinator type once 9.0 becomes last LTS.
|
||||
kTimeseriesUpgradeDowngrade: "upgradeDowngradeViewlessTimeseries"
|
||||
kMergeChunks: "mergeChunks"
|
||||
kSplitChunk: "splitChunk"
|
||||
kTestCoordinator: "testCoordinator"
|
||||
|
||||
# TODO (SERVER-98118): Remove this enum once v9.0 become last-lts.
|
||||
|
||||
@ -56,6 +56,7 @@
|
||||
#include "mongo/db/global_catalog/ddl/rename_collection_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/set_allow_migrations_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/timeseries_upgrade_downgrade_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/untrack_unsplittable_collection_coordinator.h"
|
||||
#include "mongo/db/pipeline/aggregate_command_gen.h"
|
||||
@ -133,6 +134,7 @@ constexpr std::pair<CoordinatorTypeEnum,
|
||||
{CoordinatorTypeEnum::kTimeseriesUpgradeDowngrade,
|
||||
typedInstance<TimeseriesUpgradeDowngradeCoordinator>},
|
||||
{CoordinatorTypeEnum::kMergeChunks, typedInstance<MergeChunksCoordinator>},
|
||||
{CoordinatorTypeEnum::kSplitChunk, typedInstance<SplitChunkCoordinator>},
|
||||
{CoordinatorTypeEnum::kTestCoordinator, noInstance},
|
||||
};
|
||||
|
||||
|
||||
@ -27,44 +27,28 @@
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonelement.h"
|
||||
#include "mongo/bson/bsonmisc.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/bson/bsontypes.h"
|
||||
#include "mongo/bson/oid.h"
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/bson/util/bson_extract.h"
|
||||
#include "mongo/db/auth/action_type.h"
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/auth/resource_pattern.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/database_name.h"
|
||||
#include "mongo/db/database_name_util.h"
|
||||
#include "mongo/db/commands/feature_compatibility_version.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator.h"
|
||||
#include "mongo/db/global_catalog/type_chunk.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/namespace_string_util.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/s/active_migrations_registry.h"
|
||||
#include "mongo/db/s/chunk_operation_precondition_checks.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/operation_sharding_state.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/db/tenant_id.h"
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/sharding_environment/sharding_runtime_d_params_gen.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/versioning_protocol/chunk_version.h"
|
||||
#include "mongo/db/version_context.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/move/utility_core.hpp>
|
||||
#include <boost/none.hpp>
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
@ -74,19 +58,81 @@
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class SplitChunkCommand : public ErrmsgCommandDeprecated {
|
||||
/**
|
||||
* Attempts to execute the split through the sharding coordinator service, retrying while a
|
||||
* conflicting coordinator is already running for the same namespace. Returns true if the split
|
||||
* completed via the coordinator; returns false if the caller should fall back to the legacy
|
||||
* config-server path (because the authoritative metadata feature flag is disabled). Throws
|
||||
* ConflictingOperationInProgress if the configured retry budget is exhausted.
|
||||
*/
|
||||
bool tryRunSplitChunkCoordinator(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ShardsvrSplitChunk& req) {
|
||||
// If a conflicting split coordinator is already running for this namespace,
|
||||
// wait for it to complete and retry.
|
||||
// TODO (SERVER-125033): Remove the retry-loop once this task gets done.
|
||||
const int maxConflictRetries = shardsvrSplitChunkMaxConflictRetries.load();
|
||||
Status lastConflictStatus = Status::OK();
|
||||
for (int retries = 0; retries < maxConflictRetries; ++retries) {
|
||||
boost::optional<FixedFCVRegion> optFixedFcvRegion{boost::in_place_init, opCtx};
|
||||
|
||||
if (!feature_flags::gShardAuthoritativeCollMetadata.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
optFixedFcvRegion.get()->acquireFCVSnapshot())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto coordinatorDoc = SplitChunkCoordinatorDocument();
|
||||
coordinatorDoc.setShardsvrSplitChunkRequest(req.getShardsvrSplitChunkRequest());
|
||||
coordinatorDoc.setShardingCoordinatorMetadata({{nss, CoordinatorTypeEnum::kSplitChunk}});
|
||||
|
||||
// Defer option conflict checking to the explicit checkIfOptionsConflict
|
||||
// call below, allowing the retry loop to handle ConflictingOperationInProgress.
|
||||
auto service = ShardingCoordinatorService::getService(opCtx);
|
||||
auto coordinator = checked_pointer_cast<SplitChunkCoordinator>(service->getOrCreateInstance(
|
||||
opCtx, coordinatorDoc.toBSON(), *optFixedFcvRegion, false /*checkOptions*/));
|
||||
|
||||
try {
|
||||
coordinator->checkIfOptionsConflict(coordinatorDoc.toBSON());
|
||||
} catch (const ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
|
||||
LOGV2_DEBUG(12117801,
|
||||
1,
|
||||
"Split chunk coordinator already running, waiting for completion",
|
||||
"namespace"_attr = nss,
|
||||
"error"_attr = ex);
|
||||
lastConflictStatus = ex.toStatus();
|
||||
optFixedFcvRegion.reset();
|
||||
coordinator->getCompletionFuture().getNoThrow(opCtx).ignore();
|
||||
continue;
|
||||
}
|
||||
|
||||
optFixedFcvRegion.reset();
|
||||
coordinator->getCompletionFuture().get(opCtx);
|
||||
return true;
|
||||
}
|
||||
|
||||
uasserted(ErrorCodes::ConflictingOperationInProgress,
|
||||
str::stream() << "Failed to execute split chunk for namespace "
|
||||
<< nss.toStringForErrorMsg() << " after " << maxConflictRetries
|
||||
<< " retries due to conflicting operations. Last conflict: "
|
||||
<< lastConflictStatus.reason());
|
||||
}
|
||||
|
||||
class ShardsvrSplitChunkCommand final : public TypedCommand<ShardsvrSplitChunkCommand> {
|
||||
public:
|
||||
SplitChunkCommand() : ErrmsgCommandDeprecated("splitChunk", "_shardsvrSplitChunk") {}
|
||||
using Request = ShardsvrSplitChunk;
|
||||
|
||||
ShardsvrSplitChunkCommand() : TypedCommand(Request::kCommandName, Request::kCommandAlias) {}
|
||||
|
||||
std::string help() const override {
|
||||
return "internal command usage only\n"
|
||||
"example:\n"
|
||||
" { splitChunk:\"db.foo\" , keyPattern: {a:1} , min : {a:100} , max: {a:200} { "
|
||||
"splitKeys : [ {a:150} , ... ]}";
|
||||
" { _shardsvrSplitChunk: \"db.foo\", keyPattern: {a:1}, min: {a:100},"
|
||||
" max: {a:200}, splitKeys: [{a:150}] }";
|
||||
}
|
||||
|
||||
bool supportsWriteConcern(const BSONObj& cmd) const override {
|
||||
return false;
|
||||
bool skipApiVersionCheck() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
@ -97,107 +143,70 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
Status checkAuthForOperation(OperationContext* opCtx,
|
||||
const DatabaseName& dbName,
|
||||
const BSONObj&) const override {
|
||||
if (!AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(
|
||||
ResourcePattern::forClusterResource(dbName.tenantId()),
|
||||
ActionType::internal)) {
|
||||
return Status(ErrorCodes::Unauthorized, "Unauthorized");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
NamespaceString parseNs(const DatabaseName& dbName, const BSONObj& cmdObj) const override {
|
||||
return NamespaceStringUtil::deserialize(dbName.tenantId(),
|
||||
CommandHelpers::parseNsFullyQualified(cmdObj),
|
||||
SerializationContext::stateDefault());
|
||||
}
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
|
||||
bool errmsgRun(OperationContext* opCtx,
|
||||
const DatabaseName& dbName,
|
||||
const BSONObj& cmdObj,
|
||||
std::string& errmsg,
|
||||
BSONObjBuilder& result) override {
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
const auto& nss = ns();
|
||||
const auto& req = request();
|
||||
|
||||
const NamespaceString nss(parseNs(dbName, cmdObj));
|
||||
|
||||
// Check whether parameters passed to splitChunk are sound
|
||||
BSONObj keyPatternObj;
|
||||
{
|
||||
BSONElement keyPatternElem;
|
||||
auto keyPatternStatus =
|
||||
bsonExtractTypedField(cmdObj, "keyPattern", BSONType::object, &keyPatternElem);
|
||||
|
||||
if (!keyPatternStatus.isOK()) {
|
||||
errmsg = "need to specify the key pattern the collection is sharded over";
|
||||
return false;
|
||||
}
|
||||
keyPatternObj = keyPatternElem.Obj();
|
||||
}
|
||||
|
||||
auto chunkRange = ChunkRange::fromBSON(cmdObj);
|
||||
|
||||
std::string shardName;
|
||||
auto parseShardNameStatus = bsonExtractStringField(cmdObj, "from", &shardName);
|
||||
uassertStatusOK(parseShardNameStatus);
|
||||
|
||||
LOGV2(22104, "Received splitChunk request", "request"_attr = redact(cmdObj));
|
||||
|
||||
std::vector<BSONObj> splitKeys;
|
||||
{
|
||||
BSONElement splitKeysElem;
|
||||
auto splitKeysElemStatus =
|
||||
bsonExtractTypedField(cmdObj, "splitKeys", BSONType::array, &splitKeysElem);
|
||||
|
||||
if (!splitKeysElemStatus.isOK()) {
|
||||
errmsg = "need to provide the split points to chunk over";
|
||||
return false;
|
||||
if (tryRunSplitChunkCoordinator(opCtx, nss, req)) {
|
||||
return;
|
||||
}
|
||||
|
||||
BSONObjIterator it(splitKeysElem.Obj());
|
||||
while (it.more()) {
|
||||
splitKeys.push_back(it.next().Obj().getOwned());
|
||||
// Legacy path: precondition checks + splitChunk().
|
||||
auto chunkRange = ChunkRange(req.getMin(), req.getMax());
|
||||
uassertStatusOK(ChunkRange::validate(chunkRange));
|
||||
|
||||
{
|
||||
uassertStatusOK(
|
||||
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, nss, boost::none));
|
||||
const auto metadata =
|
||||
checkCollectionIdentity(opCtx, nss, req.getEpoch(), req.getTimestamp());
|
||||
checkShardKeyPattern(opCtx, nss, metadata, chunkRange);
|
||||
checkChunkMatchesRange(opCtx, nss, metadata, chunkRange);
|
||||
}
|
||||
|
||||
auto scopedChunk =
|
||||
uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(
|
||||
opCtx, nss, chunkRange));
|
||||
|
||||
uassertStatusOK(splitChunk(
|
||||
opCtx,
|
||||
nss,
|
||||
req.getKeyPattern(),
|
||||
chunkRange,
|
||||
std::vector<BSONObj>(req.getSplitKeys().begin(), req.getSplitKeys().end()),
|
||||
std::string{req.getFrom()},
|
||||
req.getEpoch(),
|
||||
req.getTimestamp(),
|
||||
scopedChunk));
|
||||
}
|
||||
|
||||
OID expectedCollectionEpoch;
|
||||
uassertStatusOK(bsonExtractOIDField(cmdObj, "epoch", &expectedCollectionEpoch));
|
||||
|
||||
boost::optional<Timestamp> expectedCollectionTimestamp;
|
||||
if (cmdObj["timestamp"]) {
|
||||
expectedCollectionTimestamp.emplace();
|
||||
uassertStatusOK(bsonExtractTimestampField(
|
||||
cmdObj, "timestamp", expectedCollectionTimestamp.get_ptr()));
|
||||
private:
|
||||
NamespaceString ns() const override {
|
||||
return request().getCommandParameter();
|
||||
}
|
||||
|
||||
// Check that the preconditions for split chunk are met and throw StaleShardVersion
|
||||
// otherwise.
|
||||
{
|
||||
uassertStatusOK(
|
||||
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, nss, boost::none));
|
||||
const auto metadata = checkCollectionIdentity(
|
||||
opCtx, nss, expectedCollectionEpoch, expectedCollectionTimestamp);
|
||||
checkShardKeyPattern(opCtx, nss, metadata, chunkRange);
|
||||
checkChunkMatchesRange(opCtx, nss, metadata, chunkRange);
|
||||
bool supportsWriteConcern() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
uassertStatusOK(splitChunk(opCtx,
|
||||
nss,
|
||||
keyPatternObj,
|
||||
chunkRange,
|
||||
std::move(splitKeys),
|
||||
shardName,
|
||||
expectedCollectionEpoch,
|
||||
expectedCollectionTimestamp));
|
||||
|
||||
return true;
|
||||
}
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(
|
||||
ResourcePattern::forClusterResource(request().getDbName().tenantId()),
|
||||
ActionType::internal));
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(SplitChunkCommand).forShard();
|
||||
MONGO_REGISTER_COMMAND(ShardsvrSplitChunkCommand).forShard();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
|
||||
@ -157,10 +157,8 @@ Status splitChunk(OperationContext* opCtx,
|
||||
std::vector<BSONObj>&& splitPoints,
|
||||
const std::string& shardName,
|
||||
const OID& expectedCollectionEpoch,
|
||||
const boost::optional<Timestamp>& expectedCollectionTimestamp) {
|
||||
auto scopedSplitOrMergeChunk(uassertStatusOK(
|
||||
ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(opCtx, nss, chunkRange)));
|
||||
|
||||
const boost::optional<Timestamp>& expectedCollectionTimestamp,
|
||||
const ScopedSplitMergeChunk&) {
|
||||
// If the shard key is hashed, then we must make sure that the split points are of supported
|
||||
// data types.
|
||||
const auto hashedField = ShardKeyPattern::extractHashedField(keyPatternObj);
|
||||
|
||||
@ -47,12 +47,17 @@ class BSONObj;
|
||||
class ChunkRange;
|
||||
class NamespaceString;
|
||||
class OperationContext;
|
||||
class ScopedSplitMergeChunk;
|
||||
|
||||
/**
|
||||
* Attempts to split a chunk with the specified parameters. If the split fails, then the StatusWith
|
||||
* object returned will contain a Status with an ErrorCode regarding the cause of failure. If the
|
||||
* split succeeds, then the StatusWith object returned will contain Status::Ok().
|
||||
* Will update the shard's filtering metadata.
|
||||
*
|
||||
* The caller must hold the ActiveMigrationsRegistry split/merge lock for this chunk range and pass
|
||||
* it as scopedSplitMergeChunk to prove exclusive ownership. The lock must remain live for the
|
||||
* duration of the call.
|
||||
*/
|
||||
Status splitChunk(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
@ -61,6 +66,7 @@ Status splitChunk(OperationContext* opCtx,
|
||||
std::vector<BSONObj>&& splitPoints,
|
||||
const std::string& shardName,
|
||||
const OID& expectedCollectionEpoch,
|
||||
const boost::optional<Timestamp>& expectedCollectionTimestamp);
|
||||
const boost::optional<Timestamp>& expectedCollectionTimestamp,
|
||||
const ScopedSplitMergeChunk& scopedSplitMergeChunk);
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
159
src/mongo/db/global_catalog/ddl/split_chunk_coordinator.cpp
Normal file
159
src/mongo/db/global_catalog/ddl/split_chunk_coordinator.cpp
Normal file
@ -0,0 +1,159 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator.h"
|
||||
|
||||
#include "mongo/bson/simple_bsonobj_comparator.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk.h"
|
||||
#include "mongo/db/s/chunk_operation_precondition_checks.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/future_util.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
#include <boost/none.hpp>
|
||||
#include <boost/optional.hpp>
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
namespace mongo {
|
||||
|
||||
SplitChunkCoordinator::SplitChunkCoordinator(ShardingCoordinatorService* service,
|
||||
const BSONObj& initialStateDoc)
|
||||
: ChunkOperationShardingCoordinator(service, "SplitChunkCoordinator", initialStateDoc),
|
||||
_request(_doc.getShardsvrSplitChunkRequest()) {}
|
||||
|
||||
void SplitChunkCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
|
||||
const auto otherDoc = SplitChunkCoordinatorDocument::parse(
|
||||
doc, IDLParserContext("SplitChunkCoordinatorDocument"));
|
||||
|
||||
const auto& selfReq = _request.toBSON();
|
||||
const auto& otherReq = otherDoc.getShardsvrSplitChunkRequest().toBSON();
|
||||
|
||||
uassert(ErrorCodes::ConflictingOperationInProgress,
|
||||
str::stream() << "Another split chunk operation for namespace "
|
||||
<< nss().toStringForErrorMsg()
|
||||
<< " is being executed with different parameters: " << redact(selfReq)
|
||||
<< " vs " << redact(otherReq),
|
||||
SimpleBSONObjComparator::kInstance.evaluate(selfReq == otherReq));
|
||||
}
|
||||
|
||||
void SplitChunkCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const {
|
||||
cmdInfoBuilder->appendElements(_request.toBSON());
|
||||
}
|
||||
|
||||
bool SplitChunkCoordinator::isInCriticalSection(Phase phase) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
ExecutorFuture<void> SplitChunkCoordinator::_acquireLocksAsync(
|
||||
OperationContext* opCtx,
|
||||
std::shared_ptr<executor::ScopedTaskExecutor> executor,
|
||||
const CancellationToken& token) {
|
||||
return AsyncTry([this, anchor = shared_from_this()] {
|
||||
auto opCtxHolder = makeOperationContext(/*deprioritizable=*/true);
|
||||
auto* opCtx = opCtxHolder.get();
|
||||
|
||||
auto chunkRange = ChunkRange(_request.getMin(), _request.getMax());
|
||||
_scopedSplitMergeChunk.emplace(
|
||||
uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(
|
||||
opCtx, nss(), chunkRange)));
|
||||
})
|
||||
.until([this, anchor = shared_from_this()](Status status) {
|
||||
if (!status.isOK()) {
|
||||
LOGV2_WARNING(12117803,
|
||||
"ActiveMigrationsRegistry lock acquisition attempt failed",
|
||||
logv2::DynamicAttributes{getCoordinatorLogAttrs(),
|
||||
"error"_attr = redact(status)});
|
||||
}
|
||||
return !_recoveredFromDisk || status.isOK();
|
||||
})
|
||||
.withBackoffBetweenIterations(kExponentialBackoff)
|
||||
.on(**executor, token);
|
||||
}
|
||||
|
||||
void SplitChunkCoordinator::_releaseLocks(OperationContext* opCtx) {
|
||||
_scopedSplitMergeChunk.reset();
|
||||
}
|
||||
|
||||
ExecutorFuture<void> SplitChunkCoordinator::_runImpl(
|
||||
std::shared_ptr<executor::ScopedTaskExecutor> executor,
|
||||
const CancellationToken& token) noexcept {
|
||||
return ExecutorFuture<void>(**executor).then([this, anchor = shared_from_this()] {
|
||||
auto opCtxHolder = makeOperationContext(/*deprioritizable=*/true);
|
||||
auto* opCtx = opCtxHolder.get();
|
||||
|
||||
const auto& keyPatternObj = _request.getKeyPattern();
|
||||
auto chunkRange = ChunkRange(_request.getMin(), _request.getMax());
|
||||
uassertStatusOK(ChunkRange::validate(chunkRange));
|
||||
const auto& splitKeys = _request.getSplitKeys();
|
||||
const std::string shardName{_request.getFrom()};
|
||||
const auto& expectedCollectionEpoch = _request.getEpoch();
|
||||
const auto& expectedCollectionTimestamp = _request.getTimestamp();
|
||||
|
||||
LOGV2(12117800,
|
||||
"Running split chunk operation",
|
||||
logAttrs(nss()),
|
||||
"range"_attr = chunkRange.toString());
|
||||
|
||||
uassert(ErrorCodes::InvalidOptions,
|
||||
"need to provide the split points to chunk over",
|
||||
!splitKeys.empty());
|
||||
|
||||
// Verify placement version, collection identity, shard key pattern and chunk range
|
||||
// are still valid.
|
||||
{
|
||||
uassertStatusOK(
|
||||
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, nss(), boost::none));
|
||||
const auto metadata = checkCollectionIdentity(
|
||||
opCtx, nss(), expectedCollectionEpoch, expectedCollectionTimestamp);
|
||||
checkShardKeyPattern(opCtx, nss(), metadata, chunkRange);
|
||||
checkChunkMatchesRange(opCtx, nss(), metadata, chunkRange);
|
||||
}
|
||||
|
||||
uassertStatusOK(splitChunk(opCtx,
|
||||
nss(),
|
||||
keyPatternObj,
|
||||
chunkRange,
|
||||
std::vector<BSONObj>(splitKeys.begin(), splitKeys.end()),
|
||||
shardName,
|
||||
expectedCollectionEpoch,
|
||||
expectedCollectionTimestamp,
|
||||
*_scopedSplitMergeChunk));
|
||||
|
||||
LOGV2(12117802,
|
||||
"Completed split chunk operation",
|
||||
logAttrs(nss()),
|
||||
"range"_attr = chunkRange.toString());
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
64
src/mongo/db/global_catalog/ddl/split_chunk_coordinator.h
Normal file
64
src/mongo/db/global_catalog/ddl/split_chunk_coordinator.h
Normal file
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/global_catalog/ddl/chunk_operation_sharding_coordinator.h"
|
||||
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator_document_gen.h"
|
||||
#include "mongo/db/s/active_migrations_registry.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class SplitChunkCoordinator final
|
||||
: public ChunkOperationShardingCoordinator<SplitChunkCoordinatorDocument> {
|
||||
public:
|
||||
SplitChunkCoordinator(ShardingCoordinatorService* service, const BSONObj& initialStateDoc);
|
||||
|
||||
void checkIfOptionsConflict(const BSONObj& doc) const final;
|
||||
|
||||
void appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const override;
|
||||
|
||||
protected:
|
||||
bool isInCriticalSection(Phase phase) const override;
|
||||
|
||||
private:
|
||||
ExecutorFuture<void> _acquireLocksAsync(OperationContext* opCtx,
|
||||
std::shared_ptr<executor::ScopedTaskExecutor> executor,
|
||||
const CancellationToken& token) override;
|
||||
|
||||
void _releaseLocks(OperationContext* opCtx) override;
|
||||
|
||||
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
|
||||
const CancellationToken& token) noexcept override;
|
||||
|
||||
const ShardsvrSplitChunkRequest _request;
|
||||
boost::optional<ScopedSplitMergeChunk> _scopedSplitMergeChunk;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
@ -0,0 +1,58 @@
|
||||
# Copyright (C) 2026-present MongoDB, Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the Server Side Public License, version 1,
|
||||
# as published by MongoDB, Inc.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# Server Side Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the Server Side Public License
|
||||
# along with this program. If not, see
|
||||
# <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
#
|
||||
# As a special exception, the copyright holders give permission to link the
|
||||
# code of portions of this program with the OpenSSL library under certain
|
||||
# conditions as described in each individual source file and distribute
|
||||
# linked combinations including the program with the OpenSSL library. You
|
||||
# must comply with the Server Side Public License in all respects for
|
||||
# all of the code used other than as permitted herein. If you modify file(s)
|
||||
# with this exception, you may extend this exception to your version of the
|
||||
# file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
# delete this exception statement from your version. If you delete this
|
||||
# exception statement from all source files in the program, then also delete
|
||||
# it in the license file.
|
||||
#
|
||||
|
||||
# This file defines the format of the state document for the split chunk coordinator.
|
||||
|
||||
global:
|
||||
mod_visibility: private
|
||||
cpp_namespace: "mongo"
|
||||
|
||||
imports:
|
||||
- "mongo/db/basic_types.idl"
|
||||
- "mongo/db/global_catalog/ddl/sharding_coordinator.idl"
|
||||
- "mongo/db/global_catalog/ddl/sharded_ddl_commands.idl"
|
||||
|
||||
enums:
|
||||
SplitChunkCoordinatorPhase:
|
||||
description: "Phases for the split chunk coordinator."
|
||||
type: string
|
||||
values:
|
||||
kUnset: "unset"
|
||||
|
||||
structs:
|
||||
SplitChunkCoordinatorDocument:
|
||||
description: "State document for the split chunk coordinator."
|
||||
generate_comparison_operators: false
|
||||
strict: false
|
||||
chained_structs:
|
||||
ShardingCoordinatorMetadata: ShardingCoordinatorMetadata
|
||||
ShardsvrSplitChunkRequest: ShardsvrSplitChunkRequest
|
||||
fields:
|
||||
phase:
|
||||
type: SplitChunkCoordinatorPhase
|
||||
default: kUnset
|
||||
@ -660,6 +660,8 @@ mongo_cc_library(
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_untrack_unsplittable_collection_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_upgrade_downgrade_viewless_timeseries_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_validate_shard_key_candidate.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:split_chunk_coordinator.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:split_chunk_coordinator_document_gen",
|
||||
"//src/mongo/db/global_catalog/ddl:timeseries_upgrade_downgrade_coordinator.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:timeseries_upgrade_downgrade_coordinator_document_gen",
|
||||
"//src/mongo/db/global_catalog/ddl:untrack_unsplittable_collection_coordinator.cpp",
|
||||
|
||||
@ -227,3 +227,16 @@ server_parameters:
|
||||
gt: 0
|
||||
default: 10
|
||||
redact: false
|
||||
|
||||
# TODO (SERVER-125033): Remove this parameter once this task gets done.
|
||||
shardsvrSplitChunkMaxConflictRetries:
|
||||
description: >-
|
||||
Maximum number of times _shardsvrSplitChunk will retry when a conflicting split
|
||||
chunk coordinator is already running for the same namespace.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: AtomicWord<int>
|
||||
cpp_varname: shardsvrSplitChunkMaxConflictRetries
|
||||
validator:
|
||||
gt: 0
|
||||
default: 10
|
||||
redact: false
|
||||
|
||||
Loading…
Reference in New Issue
Block a user