SERVER-127053 Prepare configsvr chunk operation commands to commit authoritatively (#54088)

GitOrigin-RevId: abc90243e6d0865aa7bdd882f6d4e76152bbadd9
This commit is contained in:
Silvia Surroca 2026-05-21 17:41:10 +02:00 committed by MongoDB Bot
parent 88e77b9125
commit b0bcbc2c5a
5 changed files with 576 additions and 48 deletions

View File

@ -0,0 +1,334 @@
/**
* Tests retryable-write replay protection on the authoritative path for the three configsvr chunk
* commands: `_configsvrCommitChunkSplit`, `_configsvrCommitChunksMerge`, and
* `_configsvrCommitMergeAllChunksOnShard`.
*
* Each command wraps its catalog-manager call in an AlternativeClientRegion and follows it with a
* dummy write to NamespaceString::kServerConfigurationNamespace so that an older request on the
* same session cannot replay the operation onto a newer state.
*
* For `_configsvrCommitMergeAllChunksOnShard`, the catalog manager only merges chunks whose
* `onCurrentShardSince` is older than the snapshot history window; the test does not assume a real
* merge happened. The signals under test are the dummy bookkeeping write, the `TransactionTooOld`
* response for stale txnNumbers, and idempotency for retried txnNumbers.
*
* @tags: [
* featureFlagAuthoritativeShardsDDL,
* ]
*/
import {after, before, describe, it} from "jstests/libs/mochalite.js";
import {RetryableWritesUtil} from "jstests/libs/retryable_writes_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
/**
 * Counts the documents in `config.chunks` (queried through `st.s`) whose `uuid` field equals the
 * given collection UUID.
 */
function countChunks(st, uuid) {
    const chunksColl = st.s.getCollection("config.chunks");
    const byCollectionUuid = {uuid};
    return chunksColl.countDocuments(byCollectionUuid);
}
/**
 * Fetches the document with the given `_id` from `admin.system.version` on the current config
 * replica set primary. Returns whatever `findOne` yields for that filter.
 */
function getDummyDoc(st, id) {
    const configPrimary = st.configRS.getPrimary();
    const adminDb = configPrimary.getDB("admin");
    return adminDb.system.version.findOne({_id: id});
}
/**
 * Sends `cmd` as an admin command to the config replica set primary, repeating for as long as the
 * response is classified as retryable, and returns the first response that is not.
 *
 * A response counts as retryable when its top-level code is retryable, when its errmsg names a
 * retryable code, or when it carries a writeConcernError with a retryable code. The primary is
 * looked up again on every attempt.
 */
function runUntilNonRetryable(st, cmd) {
    let lastResponse;
    assert.soon(() => {
        lastResponse = st.configRS.getPrimary().adminCommand(cmd);
        const wcError = lastResponse.writeConcernError;
        const shouldRetry =
            RetryableWritesUtil.isRetryableCode(lastResponse.code) ||
            RetryableWritesUtil.errmsgContainsRetryableCodeName(lastResponse.errmsg) ||
            (wcError && RetryableWritesUtil.isRetryableCode(wcError.code));
        return !shouldRetry;
    });
    return lastResponse;
}
describe("_configsvrCommitChunkSplit retryability", function () {
    const dbName = "test";
    const collName = "foo";
    const ns = `${dbName}.${collName}`;
    const dummyDocId = "commitChunkSplitStats";

    // Asks the config server primary to split [MinKey, MaxKey) at {x: 0}, using the collection
    // identity and session stored on the mocha context, and returns the first non-retryable
    // response.
    const commitSplit = (fixture, txnNumber) => {
        const splitCmd = {
            _configsvrCommitChunkSplit: ns,
            collEpoch: fixture.collEpoch,
            collTimestamp: fixture.collTimestamp,
            min: {x: MinKey},
            max: {x: MaxKey},
            splitPoints: [{x: 0}],
            shard: fixture.shardName,
            lsid: fixture.lsid,
            txnNumber: txnNumber,
            writeConcern: {w: "majority"},
        };
        return runUntilNonRetryable(fixture.st, splitCmd);
    };

    before(function () {
        const st = new ShardingTest({shards: 1});
        this.st = st;
        assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));

        // Capture the identity of the freshly sharded collection for the commit requests.
        const collEntry = st.s.getCollection("config.collections").findOne({_id: ns});
        this.collEpoch = collEntry.lastmodEpoch;
        this.collTimestamp = collEntry.timestamp;
        this.collUuid = collEntry.uuid;
        this.shardName = st.shard0.shardName;

        // One logical session is shared by every test case in this suite.
        const sessionRes = st.s.getDB("admin").runCommand({startSession: 1});
        this.lsid = assert.commandWorked(sessionRes).id;
    });

    after(function () {
        this.st.stop();
    });

    it("splits a chunk in the authoritative branch and records the dummy bookkeeping write", function () {
        assert.eq(countChunks(this.st, this.collUuid), 1, "expected single initial chunk", {ns});

        const splitRes = commitSplit(this, NumberLong(1));
        assert.commandWorked(splitRes);

        assert.eq(countChunks(this.st, this.collUuid), 2, "expected two chunks after split");
        const dummy = getDummyDoc(this.st, dummyDocId);
        assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId});
        assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy});
    });

    it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () {
        const staleRes = commitSplit(this, NumberLong(0));
        assert.commandFailedWithCode(staleRes, ErrorCodes.TransactionTooOld);
    });

    it("is idempotent when re-invoked with the same lsid and txnNumber", function () {
        // Snapshot the observable state, replay txnNumber 1, then snapshot again.
        const chunkCountBefore = countChunks(this.st, this.collUuid);
        const dummyBefore = getDummyDoc(this.st, dummyDocId);

        const retryRes = commitSplit(this, NumberLong(1));
        assert.commandWorked(retryRes);

        const chunkCountAfter = countChunks(this.st, this.collUuid);
        const dummyAfter = getDummyDoc(this.st, dummyDocId);

        assert.eq(chunkCountBefore, chunkCountAfter, "split must not be re-applied for the same txnNumber", {
            chunkCountBefore,
            chunkCountAfter,
        });
        assert.eq(
            dummyBefore.count,
            dummyAfter.count,
            "dummy bookkeeping count must not advance on a retried txnNumber",
            {dummyBefore, dummyAfter},
        );
    });
});
describe("_configsvrCommitChunksMerge retryability", function () {
    const dbName = "test";
    const collName = "foo";
    const ns = `${dbName}.${collName}`;
    const dummyDocId = "commitChunksMergeStats";
    // The merge targets the two upper chunks, [0, 100) and [100, MaxKey); when it succeeds they
    // are replaced by the single chunk [0, MaxKey).
    const mergeRange = {min: {x: 0}, max: {x: MaxKey}};

    // Asks the config server primary to merge `mergeRange`, using the collection identity and
    // session stored on the mocha context, and returns the first non-retryable response.
    const commitMerge = (fixture, txnNumber) => {
        const mergeCmd = {
            _configsvrCommitChunksMerge: ns,
            shard: fixture.shardName,
            collUUID: {uuid: fixture.collUuid},
            chunkRange: mergeRange,
            epoch: fixture.collEpoch,
            timestamp: fixture.collTimestamp,
            lsid: fixture.lsid,
            txnNumber: txnNumber,
            writeConcern: {w: "majority"},
        };
        return runUntilNonRetryable(fixture.st, mergeCmd);
    };

    before(function () {
        const st = new ShardingTest({shards: 1});
        this.st = st;
        assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));

        // Two splits leave three chunks: [MinKey, 0), [0, 100), [100, MaxKey).
        for (const middle of [{x: 0}, {x: 100}]) {
            assert.commandWorked(st.s.adminCommand({split: ns, middle: middle}));
        }

        // Capture the collection identity after the pre-splits.
        const collEntry = st.s.getCollection("config.collections").findOne({_id: ns});
        this.collEpoch = collEntry.lastmodEpoch;
        this.collTimestamp = collEntry.timestamp;
        this.collUuid = collEntry.uuid;
        this.shardName = st.shard0.shardName;

        // One logical session is shared by every test case in this suite.
        const sessionRes = st.s.getDB("admin").runCommand({startSession: 1});
        this.lsid = assert.commandWorked(sessionRes).id;
    });

    after(function () {
        this.st.stop();
    });

    it("merges a range in the authoritative branch and records the dummy bookkeeping write", function () {
        assert.eq(countChunks(this.st, this.collUuid), 3, "expected three pre-split chunks", {ns});

        const mergeRes = commitMerge(this, NumberLong(1));
        assert.commandWorked(mergeRes);

        assert.eq(countChunks(this.st, this.collUuid), 2, "expected two chunks after merge");
        const dummy = getDummyDoc(this.st, dummyDocId);
        assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId});
        assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy});
    });

    it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () {
        const staleRes = commitMerge(this, NumberLong(0));
        assert.commandFailedWithCode(staleRes, ErrorCodes.TransactionTooOld);
    });

    it("is idempotent when re-invoked with the same lsid and txnNumber", function () {
        // Snapshot the observable state, replay txnNumber 1, then snapshot again.
        const chunkCountBefore = countChunks(this.st, this.collUuid);
        const dummyBefore = getDummyDoc(this.st, dummyDocId);

        const retryRes = commitMerge(this, NumberLong(1));
        assert.commandWorked(retryRes);

        const chunkCountAfter = countChunks(this.st, this.collUuid);
        const dummyAfter = getDummyDoc(this.st, dummyDocId);

        assert.eq(chunkCountBefore, chunkCountAfter, "merge must not be re-applied for the same txnNumber", {
            chunkCountBefore,
            chunkCountAfter,
        });
        assert.eq(
            dummyBefore.count,
            dummyAfter.count,
            "dummy bookkeeping count must not advance on a retried txnNumber",
            {dummyBefore, dummyAfter},
        );
    });
});
describe("_configsvrCommitMergeAllChunksOnShard retryability", function () {
    const dbName = "test";
    const collName = "foo";
    const ns = `${dbName}.${collName}`;
    const dummyDocId = "commitMergeAllChunksOnShardStats";
    const kIntMax = NumberInt(2147483647);

    // Asks the config server primary to merge all chunks of `ns` on the fixture's shard, with both
    // limits set to kIntMax, and returns the first non-retryable response.
    const commitMergeAll = (fixture, txnNumber) => {
        const mergeAllCmd = {
            _configsvrCommitMergeAllChunksOnShard: ns,
            shard: fixture.shardName,
            maxNumberOfChunksToMerge: kIntMax,
            maxTimeProcessingChunksMS: kIntMax,
            lsid: fixture.lsid,
            txnNumber: txnNumber,
            writeConcern: {w: "majority"},
        };
        return runUntilNonRetryable(fixture.st, mergeAllCmd);
    };

    before(function () {
        const st = new ShardingTest({shards: 1});
        this.st = st;
        assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));

        // Three splits leave four chunks on shard0, giving the catalog manager something to merge
        // should it consider any of them mergeable. No test case relies on a merge taking place.
        const splitPoints = [{x: 0}, {x: 100}, {x: 200}];
        splitPoints.forEach((middle) => {
            assert.commandWorked(st.s.adminCommand({split: ns, middle: middle}));
        });

        const collEntry = st.s.getCollection("config.collections").findOne({_id: ns});
        this.collUuid = collEntry.uuid;
        this.shardName = st.shard0.shardName;

        // One logical session is shared by every test case in this suite.
        const sessionRes = st.s.getDB("admin").runCommand({startSession: 1});
        this.lsid = assert.commandWorked(sessionRes).id;
    });

    after(function () {
        this.st.stop();
    });

    it("succeeds on the authoritative branch and records the dummy bookkeeping write", function () {
        const chunkCountBefore = countChunks(this.st, this.collUuid);
        assert.gte(chunkCountBefore, 1, "expected at least one pre-split chunk", {ns});

        const mergeAllRes = commitMergeAll(this, NumberLong(1));
        assert.commandWorked(mergeAllRes);

        // Depending on the snapshot history window the catalog manager may merge nothing at all,
        // so only require that the number of chunks did not increase.
        const chunkCountAfter = countChunks(this.st, this.collUuid);
        assert.lte(chunkCountAfter, chunkCountBefore, "chunk count must not grow after mergeAllChunksOnShard", {
            chunkCountBefore,
            chunkCountAfter,
        });

        const dummy = getDummyDoc(this.st, dummyDocId);
        assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId});
        assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy});
    });

    it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () {
        const staleRes = commitMergeAll(this, NumberLong(0));
        assert.commandFailedWithCode(staleRes, ErrorCodes.TransactionTooOld);
    });

    it("is idempotent when re-invoked with the same lsid and txnNumber", function () {
        // Snapshot the observable state, replay txnNumber 1, then snapshot again.
        const chunkCountBefore = countChunks(this.st, this.collUuid);
        const dummyBefore = getDummyDoc(this.st, dummyDocId);

        const retryRes = commitMergeAll(this, NumberLong(1));
        assert.commandWorked(retryRes);

        const chunkCountAfter = countChunks(this.st, this.collUuid);
        const dummyAfter = getDummyDoc(this.st, dummyDocId);

        assert.eq(
            chunkCountBefore,
            chunkCountAfter,
            "mergeAllChunksOnShard must not be re-applied for the same txnNumber",
            {chunkCountBefore, chunkCountAfter},
        );
        assert.eq(
            dummyBefore.count,
            dummyAfter.count,
            "dummy bookkeeping count must not advance on a retried txnNumber",
            {dummyBefore, dummyAfter},
        );
    });
});

View File

@ -35,17 +35,22 @@
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/commands.h"
#include "mongo/db/database_name.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h"
#include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/topology/cluster_role.h"
#include "mongo/db/version_context.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/str.h"
#include <memory>
@ -84,6 +89,10 @@ public:
return true;
}
// Reports that this command supports retryable writes; the answer is unconditionally true.
bool supportsRetryableWrite() const final {
return true;
}
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@ -94,17 +103,71 @@ public:
<< " can only be run on the config server",
serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer));
// Set read concern level to local for reads into the config database
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata
// collections under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
auto response =
uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitMergeAllChunksOnShard(
opCtx,
ns(),
request().getShard(),
request().getMaxNumberOfChunksToMerge(),
request().getMaxTimeProcessingChunksMS()));
const bool isAuthoritative =
sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) !=
AuthoritativeMetadataAccessLevelEnum::kNone;
std::pair<ShardingCatalogManager::ShardAndCollectionPlacementVersions, int> response;
if (!isAuthoritative) {
// Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no
// dummy write.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
response =
uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitMergeAllChunksOnShard(
opCtx,
ns(),
request().getShard(),
request().getMaxNumberOfChunksToMerge(),
request().getMaxTimeProcessingChunksMS()));
} else {
// Authoritative path. The originating chunk-op coordinator on the data-bearing
// shard attaches a session id and is responsible for installing the new chunk
// layout into the local shard catalog after this command returns. Use ACR so the
// session id stays held while the catalog updates run; the trailing dummy write
// below bumps the txnNumber on the oplog so that an older request on the same
// session cannot replay onto a newer state.
{
auto newClient = opCtx->getServiceContext()->getService()->makeClient(
"CommitMergeAllChunksOnShard");
AlternativeClientRegion acr(newClient);
auto executor = Grid::get(opCtx->getServiceContext())
->getExecutorPool()
->getFixedExecutor();
auto newOpCtxPtr = CancelableOperationContext(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
AuthorizationSession::get(newOpCtxPtr.get()->getClient())
->grantInternalAuthorization();
newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern());
repl::ReadConcernArgs::get(newOpCtxPtr.get()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
response = uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get())
->commitMergeAllChunksOnShard(
newOpCtxPtr.get(),
ns(),
request().getShard(),
request().getMaxNumberOfChunksToMerge(),
request().getMaxTimeProcessingChunksMS()));
}
// No write happened on this txnNumber in the parent opCtx, so make a dummy write to
// protect against older requests with old txnNumbers being replayed.
DBDirectClient client(opCtx);
client.update(NamespaceString::kServerConfigurationNamespace,
BSON("_id" << "commitMergeAllChunksOnShardStats"),
BSON("$inc" << BSON("count" << 1)),
true /* upsert */,
false /* multi */);
}
const auto& [shardAndCollVers, numMergedChunks] = response;
return MergeAllChunksOnShardResponse{shardAndCollVers.shardPlacementVersion,

View File

@ -33,17 +33,22 @@
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/commands.h"
#include "mongo/db/database_name.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h"
#include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/topology/cluster_role.h"
#include "mongo/db/version_context.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cancellation.h"
#include <memory>
#include <string>
@ -88,6 +93,10 @@ public:
return true;
}
// Reports that this command supports retryable writes; the answer is unconditionally true.
bool supportsRetryableWrite() const final {
return true;
}
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@ -100,19 +109,76 @@ public:
"invalid namespace specified for request",
ns().isValid());
// Set the operation context read concern level to local for reads into the config
// database.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata
// collections under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
const bool isAuthoritative =
sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) !=
AuthoritativeMetadataAccessLevelEnum::kNone;
ShardingCatalogManager::ShardAndCollectionPlacementVersions shardAndCollVers;
if (!isAuthoritative) {
// Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no
// dummy write.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
shardAndCollVers =
uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitChunksMerge(
opCtx,
ns(),
request().getEpoch(),
request().getTimestamp(),
request().getCollectionUUID(),
request().getChunkRange(),
request().getShard()));
} else {
// Authoritative path. The originating chunk-op coordinator on the data-bearing
// shard attaches a session id and is responsible for installing the new chunk
// layout into the local shard catalog after this command returns. Use ACR so the
// session id stays held while the catalog updates run; the trailing dummy write
// below bumps the txnNumber on the oplog so that an older request on the same
// session cannot replay onto a newer state.
{
auto newClient =
opCtx->getServiceContext()->getService()->makeClient("CommitChunksMerge");
AlternativeClientRegion acr(newClient);
auto executor = Grid::get(opCtx->getServiceContext())
->getExecutorPool()
->getFixedExecutor();
auto newOpCtxPtr = CancelableOperationContext(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
AuthorizationSession::get(newOpCtxPtr.get()->getClient())
->grantInternalAuthorization();
newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern());
repl::ReadConcernArgs::get(newOpCtxPtr.get()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
shardAndCollVers =
uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get())
->commitChunksMerge(newOpCtxPtr.get(),
ns(),
request().getEpoch(),
request().getTimestamp(),
request().getCollectionUUID(),
request().getChunkRange(),
request().getShard()));
}
// No write happened on this txnNumber in the parent opCtx, so make a dummy write to
// protect against older requests with old txnNumbers being replayed.
DBDirectClient client(opCtx);
client.update(NamespaceString::kServerConfigurationNamespace,
BSON("_id" << "commitChunksMergeStats"),
BSON("$inc" << BSON("count" << 1)),
true /* upsert */,
false /* multi */);
}
const auto shardAndCollVers = uassertStatusOK(
ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx,
ns(),
request().getEpoch(),
request().getTimestamp(),
request().getCollectionUUID(),
request().getChunkRange(),
request().getShard()));
return ConfigSvrMergeResponse{shardAndCollVers.shardPlacementVersion};
}

View File

@ -38,7 +38,9 @@
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/commands.h"
#include "mongo/db/database_name.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
#include "mongo/db/global_catalog/ddl/split_chunk_request_type.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/namespace_string_util.h"
@ -47,9 +49,12 @@
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/topology/cluster_role.h"
#include "mongo/db/version_context.h"
#include "mongo/db/versioning_protocol/chunk_version.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cancellation.h"
#include <string>
@ -106,6 +111,10 @@ public:
return true;
}
// Reports that this command supports retryable writes; the answer is unconditionally true.
bool supportsRetryableWrite() const final {
return true;
}
Status checkAuthForOperation(OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj&) const override {
@ -132,20 +141,75 @@ public:
"_configsvrCommitChunkSplit can only be run on config servers",
serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer));
// Set the operation context read concern level to local for reads into the config database.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
auto parsedRequest = uassertStatusOK(SplitChunkRequest::parseFromConfigCommand(cmdObj));
auto shardAndCollVers = uassertStatusOK(
ShardingCatalogManager::get(opCtx)->commitChunkSplit(opCtx,
parsedRequest.getNamespace(),
parsedRequest.getEpoch(),
parsedRequest.getTimestamp(),
parsedRequest.getChunkRange(),
parsedRequest.getSplitPoints(),
parsedRequest.getShardName()));
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata
// collections under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
const bool isAuthoritative =
sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) !=
AuthoritativeMetadataAccessLevelEnum::kNone;
ShardingCatalogManager::ShardAndCollectionPlacementVersions shardAndCollVers;
if (!isAuthoritative) {
// Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no
// dummy write.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
shardAndCollVers = uassertStatusOK(
ShardingCatalogManager::get(opCtx)->commitChunkSplit(opCtx,
parsedRequest.getNamespace(),
parsedRequest.getEpoch(),
parsedRequest.getTimestamp(),
parsedRequest.getChunkRange(),
parsedRequest.getSplitPoints(),
parsedRequest.getShardName()));
} else {
// Authoritative path. The originating chunk-op coordinator on the data-bearing shard
// attaches a session id and is responsible for installing the new chunk layout into the
// local shard catalog after this command returns. Use ACR so the session id stays held
// while the catalog updates run; the trailing dummy write below bumps the txnNumber on
// the oplog so that an older request on the same session cannot replay onto a newer
// state.
{
auto newClient =
opCtx->getServiceContext()->getService()->makeClient("CommitChunkSplit");
AlternativeClientRegion acr(newClient);
auto executor =
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
auto newOpCtxPtr = CancelableOperationContext(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
AuthorizationSession::get(newOpCtxPtr.get()->getClient())
->grantInternalAuthorization();
newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern());
repl::ReadConcernArgs::get(newOpCtxPtr.get()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
shardAndCollVers =
uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get())
->commitChunkSplit(newOpCtxPtr.get(),
parsedRequest.getNamespace(),
parsedRequest.getEpoch(),
parsedRequest.getTimestamp(),
parsedRequest.getChunkRange(),
parsedRequest.getSplitPoints(),
parsedRequest.getShardName()));
}
// No write happened on this txnNumber in the parent opCtx, so make a dummy write to
// protect against older requests with old txnNumbers being replayed.
DBDirectClient client(opCtx);
client.update(NamespaceString::kServerConfigurationNamespace,
BSON("_id" << "commitChunkSplitStats"),
BSON("$inc" << BSON("count" << 1)),
true /* upsert */,
false /* multi */);
}
shardAndCollVers.collectionPlacementVersion.serialize(kCollectionVersionField, &result);
shardAndCollVers.shardPlacementVersion.serialize(ChunkVersion::kChunkVersionField, &result);

View File

@ -839,6 +839,11 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx,
return splitChunkResult;
}
// Invariant: this method only performs config-server-local work (reads and writes against
// config.collections, config.chunks and config.changelog, plus client bookkeeping). It must not
// add any shard-side interaction. Authoritative chunk-op coordinators rely on this property: the
// shard-side install (oplog invalidation + CSR refresh) is driven by the coordinator on the
// originating shard, not from here.
StatusWith<ShardingCatalogManager::ShardAndCollectionPlacementVersions>
ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
const NamespaceString& nss,
@ -847,11 +852,6 @@ ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
const ChunkRange& range,
const std::vector<BSONObj>& splitPoints,
const std::string& shardName) {
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections
// under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk modifications and generate
// strictly monotonically increasing collection placement versions
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
@ -1074,6 +1074,11 @@ void ShardingCatalogManager::_mergeChunksInTransaction(
txn.run(opCtx, updateChunksFn);
}
// Invariant: this method only performs config-server-local work (reads and writes against
// config.collections, config.chunks and config.changelog, plus client bookkeeping). It must not
// add any shard-side interaction. Authoritative chunk-op coordinators rely on this property: the
// shard-side install (oplog invalidation + CSR refresh) is driven by the coordinator on the
// originating shard, not from here.
StatusWith<ShardingCatalogManager::ShardAndCollectionPlacementVersions>
ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx,
const NamespaceString& nss,
@ -1082,11 +1087,6 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx,
const UUID& requestCollectionUUID,
const ChunkRange& chunkRange,
const ShardId& shardId) {
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections
// under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk modifications and generate
// strictly monotonically increasing collection placement versions
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
@ -1228,16 +1228,17 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx,
mergeVersion /*collectionPlacementVersion*/};
}
// Invariant: this method only performs config-server-local work (reads and writes against
// config.collections, config.chunks, config.tags and config.changelog, plus client bookkeeping).
// It must not add any shard-side interaction. Authoritative chunk-op coordinators rely on this
// property: the shard-side install (oplog invalidation + CSR refresh) is driven by the
// coordinator on the originating shard, not from here.
StatusWith<std::pair<ShardingCatalogManager::ShardAndCollectionPlacementVersions, int>>
ShardingCatalogManager::commitMergeAllChunksOnShard(OperationContext* opCtx,
const NamespaceString& nss,
const ShardId& shardId,
int maxNumberOfChunksToMerge,
int maxTimeProcessingChunksMS) {
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections
// under the exclusive _kChunkOpLock happen on the same term.
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
// Retry the commit a fixed number of times before failing: the discovery of chunks to merge
// happens before acquiring the `_kChunkOpLock` in order not to block for too long concurrent
// chunk operations. This implies that other chunk operations for the same collection could