From b0bcbc2c5aa72be0ebb6464b7e80c4d1c22f5ddf Mon Sep 17 00:00:00 2001 From: Silvia Surroca Date: Thu, 21 May 2026 17:41:10 +0200 Subject: [PATCH] SERVER-127053 Prepare configsvr chunk operation commands to commit authoritatively (#54088) GitOrigin-RevId: abc90243e6d0865aa7bdd882f6d4e76152bbadd9 --- ...nfigsvr_commit_chunk_operation_commands.js | 334 ++++++++++++++++++ ...gsvr_merge_all_chunks_on_shard_command.cpp | 83 ++++- .../ddl/configsvr_merge_chunks_command.cpp | 90 ++++- .../ddl/configsvr_split_chunk_command.cpp | 88 ++++- ...rding_catalog_manager_chunk_operations.cpp | 29 +- 5 files changed, 576 insertions(+), 48 deletions(-) create mode 100644 jstests/noPassthrough/ddl/configsvr_commit_chunk_operation_commands.js diff --git a/jstests/noPassthrough/ddl/configsvr_commit_chunk_operation_commands.js b/jstests/noPassthrough/ddl/configsvr_commit_chunk_operation_commands.js new file mode 100644 index 00000000000..107ce5ee8bf --- /dev/null +++ b/jstests/noPassthrough/ddl/configsvr_commit_chunk_operation_commands.js @@ -0,0 +1,334 @@ +/** + * Tests retryable-write replay protection on the authoritative path for the three configsvr chunk + * commands: `_configsvrCommitChunkSplit`, `_configsvrCommitChunksMerge`, and + * `_configsvrCommitMergeAllChunksOnShard`. + * + * Each command wraps its catalog-manager call in an AlternativeClientRegion and follows it with a + * dummy write to NamespaceString::kServerConfigurationNamespace so that an older request on the + * same session cannot replay the operation onto a newer state. + * + * For `_configsvrCommitMergeAllChunksOnShard`, the catalog manager only merges chunks whose + * `onCurrentShardSince` is older than the snapshot history window; the test does not assume a real + * merge happened. The signals under test are the dummy bookkeeping write, the `TransactionTooOld` + * response for stale txnNumbers, and idempotency for retried txnNumbers. 
+ * + * @tags: [ + * featureFlagAuthoritativeShardsDDL, + * ] + */ +import {after, before, describe, it} from "jstests/libs/mochalite.js"; +import {RetryableWritesUtil} from "jstests/libs/retryable_writes_util.js"; +import {ShardingTest} from "jstests/libs/shardingtest.js"; + +function countChunks(st, uuid) { + return st.s.getCollection("config.chunks").countDocuments({uuid: uuid}); +} + +function getDummyDoc(st, id) { + return st.configRS.getPrimary().getDB("admin").system.version.findOne({_id: id}); +} + +// Retries the command until a non-retryable response is received, then returns it. +function runUntilNonRetryable(st, cmd) { + let res; + assert.soon(() => { + res = st.configRS.getPrimary().adminCommand(cmd); + if ( + RetryableWritesUtil.isRetryableCode(res.code) || + RetryableWritesUtil.errmsgContainsRetryableCodeName(res.errmsg) || + (res.writeConcernError && RetryableWritesUtil.isRetryableCode(res.writeConcernError.code)) + ) { + return false; + } + return true; + }); + return res; +} + +describe("_configsvrCommitChunkSplit retryability", function () { + const dbName = "test"; + const collName = "foo"; + const ns = dbName + "." 
+ collName; + const dummyDocId = "commitChunkSplitStats"; + + before(function () { + this.st = new ShardingTest({shards: 1}); + assert.commandWorked(this.st.s.adminCommand({shardCollection: ns, key: {x: 1}})); + + const coll = this.st.s.getCollection("config.collections").findOne({_id: ns}); + this.collEpoch = coll.lastmodEpoch; + this.collTimestamp = coll.timestamp; + this.collUuid = coll.uuid; + this.shardName = this.st.shard0.shardName; + + this.lsid = assert.commandWorked(this.st.s.getDB("admin").runCommand({startSession: 1})).id; + }); + + after(function () { + this.st.stop(); + }); + + const run = (st, ns, epoch, timestamp, shardName, lsid, txnNumber) => + runUntilNonRetryable(st, { + _configsvrCommitChunkSplit: ns, + collEpoch: epoch, + collTimestamp: timestamp, + min: {x: MinKey}, + max: {x: MaxKey}, + splitPoints: [{x: 0}], + shard: shardName, + lsid: lsid, + txnNumber: txnNumber, + writeConcern: {w: "majority"}, + }); + + it("splits a chunk in the authoritative branch and records the dummy bookkeeping write", function () { + assert.eq(countChunks(this.st, this.collUuid), 1, "expected single initial chunk", {ns}); + + assert.commandWorked( + run(this.st, ns, this.collEpoch, this.collTimestamp, this.shardName, this.lsid, NumberLong(1)), + ); + + assert.eq(countChunks(this.st, this.collUuid), 2, "expected two chunks after split"); + + const dummy = getDummyDoc(this.st, dummyDocId); + assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId}); + assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy}); + }); + + it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () { + assert.commandFailedWithCode( + run(this.st, ns, this.collEpoch, this.collTimestamp, this.shardName, this.lsid, NumberLong(0)), + ErrorCodes.TransactionTooOld, + ); + }); + + it("is idempotent when re-invoked with the same lsid and txnNumber", function () { + const chunkCountBefore = countChunks(this.st, this.collUuid); + const 
dummyBefore = getDummyDoc(this.st, dummyDocId); + + assert.commandWorked( + run(this.st, ns, this.collEpoch, this.collTimestamp, this.shardName, this.lsid, NumberLong(1)), + ); + + const chunkCountAfter = countChunks(this.st, this.collUuid); + const dummyAfter = getDummyDoc(this.st, dummyDocId); + + assert.eq(chunkCountBefore, chunkCountAfter, "split must not be re-applied for the same txnNumber", { + chunkCountBefore, + chunkCountAfter, + }); + assert.eq( + dummyBefore.count, + dummyAfter.count, + "dummy bookkeeping count must not advance on a retried txnNumber", + {dummyBefore, dummyAfter}, + ); + }); +}); + +describe("_configsvrCommitChunksMerge retryability", function () { + const dbName = "test"; + const collName = "foo"; + const ns = dbName + "." + collName; + const dummyDocId = "commitChunksMergeStats"; + + // Range covering the two upper chunks {[0, 100), [100, MaxKey)} so a successful merge + // collapses them into a single chunk [0, MaxKey). + const mergeRange = {min: {x: 0}, max: {x: MaxKey}}; + + before(function () { + this.st = new ShardingTest({shards: 1}); + assert.commandWorked(this.st.s.adminCommand({shardCollection: ns, key: {x: 1}})); + + // Pre-split into three chunks: [MinKey, 0), [0, 100), [100, MaxKey). 
+ assert.commandWorked(this.st.s.adminCommand({split: ns, middle: {x: 0}})); + assert.commandWorked(this.st.s.adminCommand({split: ns, middle: {x: 100}})); + + const coll = this.st.s.getCollection("config.collections").findOne({_id: ns}); + this.collEpoch = coll.lastmodEpoch; + this.collTimestamp = coll.timestamp; + this.collUuid = coll.uuid; + this.shardName = this.st.shard0.shardName; + + this.lsid = assert.commandWorked(this.st.s.getDB("admin").runCommand({startSession: 1})).id; + }); + + after(function () { + this.st.stop(); + }); + + const run = (st, ns, shardName, collUuid, epoch, timestamp, lsid, txnNumber) => + runUntilNonRetryable(st, { + _configsvrCommitChunksMerge: ns, + shard: shardName, + collUUID: {uuid: collUuid}, + chunkRange: mergeRange, + epoch: epoch, + timestamp: timestamp, + lsid: lsid, + txnNumber: txnNumber, + writeConcern: {w: "majority"}, + }); + + it("merges a range in the authoritative branch and records the dummy bookkeeping write", function () { + assert.eq(countChunks(this.st, this.collUuid), 3, "expected three pre-split chunks", {ns}); + + assert.commandWorked( + run( + this.st, + ns, + this.shardName, + this.collUuid, + this.collEpoch, + this.collTimestamp, + this.lsid, + NumberLong(1), + ), + ); + + assert.eq(countChunks(this.st, this.collUuid), 2, "expected two chunks after merge"); + + const dummy = getDummyDoc(this.st, dummyDocId); + assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId}); + assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy}); + }); + + it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () { + assert.commandFailedWithCode( + run( + this.st, + ns, + this.shardName, + this.collUuid, + this.collEpoch, + this.collTimestamp, + this.lsid, + NumberLong(0), + ), + ErrorCodes.TransactionTooOld, + ); + }); + + it("is idempotent when re-invoked with the same lsid and txnNumber", function () { + const chunkCountBefore = countChunks(this.st, this.collUuid); + 
const dummyBefore = getDummyDoc(this.st, dummyDocId); + + assert.commandWorked( + run( + this.st, + ns, + this.shardName, + this.collUuid, + this.collEpoch, + this.collTimestamp, + this.lsid, + NumberLong(1), + ), + ); + + const chunkCountAfter = countChunks(this.st, this.collUuid); + const dummyAfter = getDummyDoc(this.st, dummyDocId); + + assert.eq(chunkCountBefore, chunkCountAfter, "merge must not be re-applied for the same txnNumber", { + chunkCountBefore, + chunkCountAfter, + }); + assert.eq( + dummyBefore.count, + dummyAfter.count, + "dummy bookkeeping count must not advance on a retried txnNumber", + {dummyBefore, dummyAfter}, + ); + }); +}); + +describe("_configsvrCommitMergeAllChunksOnShard retryability", function () { + const dbName = "test"; + const collName = "foo"; + const ns = dbName + "." + collName; + const dummyDocId = "commitMergeAllChunksOnShardStats"; + + const kIntMax = NumberInt(2147483647); + + before(function () { + this.st = new ShardingTest({shards: 1}); + assert.commandWorked(this.st.s.adminCommand({shardCollection: ns, key: {x: 1}})); + + // Pre-split into four chunks on shard0 so that, if the catalog manager finds any of them + // mergeable, the merge has something to do. The test does not assume a successful merge. 
+ for (const middle of [{x: 0}, {x: 100}, {x: 200}]) { + assert.commandWorked(this.st.s.adminCommand({split: ns, middle: middle})); + } + + const coll = this.st.s.getCollection("config.collections").findOne({_id: ns}); + this.collUuid = coll.uuid; + this.shardName = this.st.shard0.shardName; + + this.lsid = assert.commandWorked(this.st.s.getDB("admin").runCommand({startSession: 1})).id; + }); + + after(function () { + this.st.stop(); + }); + + const run = (st, ns, shardName, lsid, txnNumber) => + runUntilNonRetryable(st, { + _configsvrCommitMergeAllChunksOnShard: ns, + shard: shardName, + maxNumberOfChunksToMerge: kIntMax, + maxTimeProcessingChunksMS: kIntMax, + lsid: lsid, + txnNumber: txnNumber, + writeConcern: {w: "majority"}, + }); + + it("succeeds on the authoritative branch and records the dummy bookkeeping write", function () { + const chunkCountBefore = countChunks(this.st, this.collUuid); + assert.gte(chunkCountBefore, 1, "expected at least one pre-split chunk", {ns}); + + assert.commandWorked(run(this.st, ns, this.shardName, this.lsid, NumberLong(1))); + + // The catalog manager may merge zero or more chunks depending on the snapshot history + // window; either way the chunk count must not grow. 
+ const chunkCountAfter = countChunks(this.st, this.collUuid); + assert.lte(chunkCountAfter, chunkCountBefore, "chunk count must not grow after mergeAllChunksOnShard", { + chunkCountBefore, + chunkCountAfter, + }); + + const dummy = getDummyDoc(this.st, dummyDocId); + assert(dummy, "expected dummy bookkeeping doc to be present", {dummyDocId}); + assert.gte(dummy.count, 1, "expected dummy doc count >= 1", {dummy}); + }); + + it("rejects an older txnNumber on the same lsid with TransactionTooOld", function () { + assert.commandFailedWithCode( + run(this.st, ns, this.shardName, this.lsid, NumberLong(0)), + ErrorCodes.TransactionTooOld, + ); + }); + + it("is idempotent when re-invoked with the same lsid and txnNumber", function () { + const chunkCountBefore = countChunks(this.st, this.collUuid); + const dummyBefore = getDummyDoc(this.st, dummyDocId); + + assert.commandWorked(run(this.st, ns, this.shardName, this.lsid, NumberLong(1))); + + const chunkCountAfter = countChunks(this.st, this.collUuid); + const dummyAfter = getDummyDoc(this.st, dummyDocId); + + assert.eq( + chunkCountBefore, + chunkCountAfter, + "mergeAllChunksOnShard must not be re-applied for the same txnNumber", + {chunkCountBefore, chunkCountAfter}, + ); + assert.eq( + dummyBefore.count, + dummyAfter.count, + "dummy bookkeeping count must not advance on a retried txnNumber", + {dummyBefore, dummyAfter}, + ); + }); +}); diff --git a/src/mongo/db/global_catalog/ddl/configsvr_merge_all_chunks_on_shard_command.cpp b/src/mongo/db/global_catalog/ddl/configsvr_merge_all_chunks_on_shard_command.cpp index 0fd5d32441d..d826db16a27 100644 --- a/src/mongo/db/global_catalog/ddl/configsvr_merge_all_chunks_on_shard_command.cpp +++ b/src/mongo/db/global_catalog/ddl/configsvr_merge_all_chunks_on_shard_command.cpp @@ -35,17 +35,22 @@ #include "mongo/db/auth/resource_pattern.h" #include "mongo/db/commands.h" #include "mongo/db/database_name.h" +#include "mongo/db/dbdirectclient.h" #include 
"mongo/db/global_catalog/ddl/merge_chunk_request_gen.h" #include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h" +#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/sharding_environment/grid.h" #include "mongo/db/topology/cluster_role.h" +#include "mongo/db/version_context.h" #include "mongo/rpc/op_msg.h" #include "mongo/util/assert_util.h" +#include "mongo/util/cancellation.h" #include "mongo/util/str.h" #include @@ -84,6 +89,10 @@ public: return true; } + bool supportsRetryableWrite() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -94,17 +103,71 @@ public: << " can only be run on the config server", serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)); - // Set read concern level to local for reads into the config database - repl::ReadConcernArgs::get(opCtx) = - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + // Mark opCtx as interruptible to ensure that all reads and writes to the metadata + // collections under the exclusive _kChunkOpLock happen on the same term. 
+ opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - auto response = - uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitMergeAllChunksOnShard( - opCtx, - ns(), - request().getShard(), - request().getMaxNumberOfChunksToMerge(), - request().getMaxTimeProcessingChunksMS())); + const bool isAuthoritative = + sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel( + VersionContext::getDecoration(opCtx), + serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) != + AuthoritativeMetadataAccessLevelEnum::kNone; + + std::pair response; + if (!isAuthoritative) { + // Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no + // dummy write. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + response = + uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitMergeAllChunksOnShard( + opCtx, + ns(), + request().getShard(), + request().getMaxNumberOfChunksToMerge(), + request().getMaxTimeProcessingChunksMS())); + } else { + // Authoritative path. The originating chunk-op coordinator on the data-bearing + // shard attaches a session id and is responsible for installing the new chunk + // layout into the local shard catalog after this command returns. Use ACR so the + // session id stays held while the catalog updates run; the trailing dummy write + // below bumps the txnNumber on the oplog so that an older request on the same + // session cannot replay onto a newer state. 
+ { + auto newClient = opCtx->getServiceContext()->getService()->makeClient( + "CommitMergeAllChunksOnShard"); + AlternativeClientRegion acr(newClient); + auto executor = Grid::get(opCtx->getServiceContext()) + ->getExecutorPool() + ->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), opCtx->getCancellationToken(), executor); + + AuthorizationSession::get(newOpCtxPtr.get()->getClient()) + ->grantInternalAuthorization(); + newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern()); + repl::ReadConcernArgs::get(newOpCtxPtr.get()) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + response = uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get()) + ->commitMergeAllChunksOnShard( + newOpCtxPtr.get(), + ns(), + request().getShard(), + request().getMaxNumberOfChunksToMerge(), + request().getMaxTimeProcessingChunksMS())); + } + + // No write happened on this txnNumber in the parent opCtx, so make a dummy write to + // protect against older requests with old txnNumbers being replayed. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace, + BSON("_id" << "commitMergeAllChunksOnShardStats"), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } const auto& [shardAndCollVers, numMergedChunks] = response; return MergeAllChunksOnShardResponse{shardAndCollVers.shardPlacementVersion, diff --git a/src/mongo/db/global_catalog/ddl/configsvr_merge_chunks_command.cpp b/src/mongo/db/global_catalog/ddl/configsvr_merge_chunks_command.cpp index 4f67eccf266..6bd8a4b8680 100644 --- a/src/mongo/db/global_catalog/ddl/configsvr_merge_chunks_command.cpp +++ b/src/mongo/db/global_catalog/ddl/configsvr_merge_chunks_command.cpp @@ -33,17 +33,22 @@ #include "mongo/db/auth/resource_pattern.h" #include "mongo/db/commands.h" #include "mongo/db/database_name.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h" #include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h" +#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/sharding_environment/grid.h" #include "mongo/db/topology/cluster_role.h" +#include "mongo/db/version_context.h" #include "mongo/rpc/op_msg.h" #include "mongo/util/assert_util.h" +#include "mongo/util/cancellation.h" #include #include @@ -88,6 +93,10 @@ public: return true; } + bool supportsRetryableWrite() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -100,19 +109,76 @@ public: "invalid namespace specified for request", ns().isValid()); - // Set the operation context read concern level to local for reads into the config - // database. 
- repl::ReadConcernArgs::get(opCtx) = - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + // Mark opCtx as interruptible to ensure that all reads and writes to the metadata + // collections under the exclusive _kChunkOpLock happen on the same term. + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + const bool isAuthoritative = + sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel( + VersionContext::getDecoration(opCtx), + serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) != + AuthoritativeMetadataAccessLevelEnum::kNone; + + ShardingCatalogManager::ShardAndCollectionPlacementVersions shardAndCollVers; + if (!isAuthoritative) { + // Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no + // dummy write. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + shardAndCollVers = + uassertStatusOK(ShardingCatalogManager::get(opCtx)->commitChunksMerge( + opCtx, + ns(), + request().getEpoch(), + request().getTimestamp(), + request().getCollectionUUID(), + request().getChunkRange(), + request().getShard())); + } else { + // Authoritative path. The originating chunk-op coordinator on the data-bearing + // shard attaches a session id and is responsible for installing the new chunk + // layout into the local shard catalog after this command returns. Use ACR so the + // session id stays held while the catalog updates run; the trailing dummy write + // below bumps the txnNumber on the oplog so that an older request on the same + // session cannot replay onto a newer state. 
+ { + auto newClient = + opCtx->getServiceContext()->getService()->makeClient("CommitChunksMerge"); + AlternativeClientRegion acr(newClient); + auto executor = Grid::get(opCtx->getServiceContext()) + ->getExecutorPool() + ->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), opCtx->getCancellationToken(), executor); + + AuthorizationSession::get(newOpCtxPtr.get()->getClient()) + ->grantInternalAuthorization(); + newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern()); + repl::ReadConcernArgs::get(newOpCtxPtr.get()) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + shardAndCollVers = + uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get()) + ->commitChunksMerge(newOpCtxPtr.get(), + ns(), + request().getEpoch(), + request().getTimestamp(), + request().getCollectionUUID(), + request().getChunkRange(), + request().getShard())); + } + + // No write happened on this txnNumber in the parent opCtx, so make a dummy write to + // protect against older requests with old txnNumbers being replayed. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace, + BSON("_id" << "commitChunksMergeStats"), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } - const auto shardAndCollVers = uassertStatusOK( - ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx, - ns(), - request().getEpoch(), - request().getTimestamp(), - request().getCollectionUUID(), - request().getChunkRange(), - request().getShard())); return ConfigSvrMergeResponse{shardAndCollVers.shardPlacementVersion}; } diff --git a/src/mongo/db/global_catalog/ddl/configsvr_split_chunk_command.cpp b/src/mongo/db/global_catalog/ddl/configsvr_split_chunk_command.cpp index 103cf4840bf..d9082b76e23 100644 --- a/src/mongo/db/global_catalog/ddl/configsvr_split_chunk_command.cpp +++ b/src/mongo/db/global_catalog/ddl/configsvr_split_chunk_command.cpp @@ -38,7 +38,9 @@ #include "mongo/db/auth/resource_pattern.h" #include "mongo/db/commands.h" #include "mongo/db/database_name.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h" +#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h" #include "mongo/db/global_catalog/ddl/split_chunk_request_type.h" #include "mongo/db/namespace_string.h" #include "mongo/db/namespace_string_util.h" @@ -47,9 +49,12 @@ #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/sharding_environment/grid.h" #include "mongo/db/topology/cluster_role.h" +#include "mongo/db/version_context.h" #include "mongo/db/versioning_protocol/chunk_version.h" #include "mongo/util/assert_util.h" +#include "mongo/util/cancellation.h" #include @@ -106,6 +111,10 @@ public: return true; } + bool supportsRetryableWrite() const final { + return true; + } + Status checkAuthForOperation(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj&) const override { @@ -132,20 +141,75 @@ public: 
"_configsvrCommitChunkSplit can only be run on config servers", serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer)); - // Set the operation context read concern level to local for reads into the config database. - repl::ReadConcernArgs::get(opCtx) = - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - auto parsedRequest = uassertStatusOK(SplitChunkRequest::parseFromConfigCommand(cmdObj)); - auto shardAndCollVers = uassertStatusOK( - ShardingCatalogManager::get(opCtx)->commitChunkSplit(opCtx, - parsedRequest.getNamespace(), - parsedRequest.getEpoch(), - parsedRequest.getTimestamp(), - parsedRequest.getChunkRange(), - parsedRequest.getSplitPoints(), - parsedRequest.getShardName())); + // Mark opCtx as interruptible to ensure that all reads and writes to the metadata + // collections under the exclusive _kChunkOpLock happen on the same term. + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + const bool isAuthoritative = + sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel( + VersionContext::getDecoration(opCtx), + serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) != + AuthoritativeMetadataAccessLevelEnum::kNone; + + ShardingCatalogManager::ShardAndCollectionPlacementVersions shardAndCollVers; + if (!isAuthoritative) { + // Legacy non-authoritative path: keep the pre-existing behavior. No ACR and no + // dummy write. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + shardAndCollVers = uassertStatusOK( + ShardingCatalogManager::get(opCtx)->commitChunkSplit(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getTimestamp(), + parsedRequest.getChunkRange(), + parsedRequest.getSplitPoints(), + parsedRequest.getShardName())); + } else { + // Authoritative path. 
The originating chunk-op coordinator on the data-bearing shard + // attaches a session id and is responsible for installing the new chunk layout into the + // local shard catalog after this command returns. Use ACR so the session id stays held + // while the catalog updates run; the trailing dummy write below bumps the txnNumber on + // the oplog so that an older request on the same session cannot replay onto a newer + // state. + { + auto newClient = + opCtx->getServiceContext()->getService()->makeClient("CommitChunkSplit"); + AlternativeClientRegion acr(newClient); + auto executor = + Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), opCtx->getCancellationToken(), executor); + + AuthorizationSession::get(newOpCtxPtr.get()->getClient()) + ->grantInternalAuthorization(); + newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern()); + repl::ReadConcernArgs::get(newOpCtxPtr.get()) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + shardAndCollVers = + uassertStatusOK(ShardingCatalogManager::get(newOpCtxPtr.get()) + ->commitChunkSplit(newOpCtxPtr.get(), + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getTimestamp(), + parsedRequest.getChunkRange(), + parsedRequest.getSplitPoints(), + parsedRequest.getShardName())); + } + + // No write happened on this txnNumber in the parent opCtx, so make a dummy write to + // protect against older requests with old txnNumbers being replayed. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace, + BSON("_id" << "commitChunkSplitStats"), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } shardAndCollVers.collectionPlacementVersion.serialize(kCollectionVersionField, &result); shardAndCollVers.shardPlacementVersion.serialize(ChunkVersion::kChunkVersionField, &result); diff --git a/src/mongo/db/global_catalog/ddl/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/global_catalog/ddl/sharding_catalog_manager_chunk_operations.cpp index a08d4b7d995..e697c8fedef 100644 --- a/src/mongo/db/global_catalog/ddl/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/global_catalog/ddl/sharding_catalog_manager_chunk_operations.cpp @@ -839,6 +839,11 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx, return splitChunkResult; } +// Invariant: this method only performs config-server-local work (reads and writes against +// config.collections, config.chunks and config.changelog, plus client bookkeeping). It must not +// add any shard-side interaction. Authoritative chunk-op coordinators rely on this property: the +// shard-side install (oplog invalidation + CSR refresh) is driven by the coordinator on the +// originating shard, not from here. StatusWith ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, const NamespaceString& nss, @@ -847,11 +852,6 @@ ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, const ChunkRange& range, const std::vector& splitPoints, const std::string& shardName) { - - // Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections - // under the exclusive _kChunkOpLock happen on the same term. 
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk modifications and generate // strictly monotonously increasing collection placement versions Lock::ExclusiveLock lk(opCtx, _kChunkOpLock); @@ -1074,6 +1074,11 @@ void ShardingCatalogManager::_mergeChunksInTransaction( txn.run(opCtx, updateChunksFn); } +// Invariant: this method only performs config-server-local work (reads and writes against +// config.collections, config.chunks and config.changelog, plus client bookkeeping). It must not +// add any shard-side interaction. Authoritative chunk-op coordinators rely on this property: the +// shard-side install (oplog invalidation + CSR refresh) is driven by the coordinator on the +// originating shard, not from here. StatusWith ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx, const NamespaceString& nss, @@ -1082,11 +1087,6 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx, const UUID& requestCollectionUUID, const ChunkRange& chunkRange, const ShardId& shardId) { - - // Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections - // under the exclusive _kChunkOpLock happen on the same term. - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk modifications and generate // strictly monotonously increasing collection placement versions Lock::ExclusiveLock lk(opCtx, _kChunkOpLock); @@ -1228,16 +1228,17 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx, mergeVersion /*collectionPlacementVersion*/}; } +// Invariant: this method only performs config-server-local work (reads and writes against +// config.collections, config.chunks, config.tags and config.changelog, plus client bookkeeping). +// It must not add any shard-side interaction. 
Authoritative chunk-op coordinators rely on this +// property: the shard-side install (oplog invalidation + CSR refresh) is driven by the +// coordinator on the originating shard, not from here. StatusWith> ShardingCatalogManager::commitMergeAllChunksOnShard(OperationContext* opCtx, const NamespaceString& nss, const ShardId& shardId, int maxNumberOfChunksToMerge, int maxTimeProcessingChunksMS) { - // Mark opCtx as interruptible to ensure that all reads and writes to the metadata collections - // under the exclusive _kChunkOpLock happen on the same term. - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - // Retry the commit a fixed number of times before failing: the discovery of chunks to merge // happens before acquiring the `_kChunkOpLock` in order not to block for too long concurrent // chunk operations. This implies that other chunk operations for the same collection could