SERVER-121180 Implement mergeAllChunksOnShard as coordinator (#52347)

Co-authored-by: name <email>
GitOrigin-RevId: 925d5b9a278d75217d76b0852ed1b91bd4248a3e
This commit is contained in:
Silvia Surroca 2026-05-19 15:03:13 +02:00 committed by MongoDB Bot
parent 813d9f2c98
commit 0bbbd8012a
13 changed files with 536 additions and 26 deletions

View File

@ -285,7 +285,8 @@ export let MongosAPIParametersUtil = (function () {
commandName: "mergeAllChunksOnShard",
run: {
inAPIVersion1: false,
configServerCommandName: "_configsvrCommitMergeAllChunksOnShard",
// TODO (SERVER-108802): Re-enable this test case after the api version is propagated to the config server.
// configServerCommandName: "_configsvrCommitMergeAllChunksOnShard",
shardCommandName: "_shardsvrMergeAllChunksOnShard",
runsAgainstAdminDb: true,
permittedInTxn: false,

View File

@ -572,6 +572,16 @@ idl_generator(
],
)
idl_generator(
name = "merge_all_chunks_coordinator_document_gen",
src = "merge_all_chunks_coordinator_document.idl",
deps = [
"//src/mongo/db:basic_types_gen",
"//src/mongo/db/global_catalog/ddl:merge_chunk_request_gen",
"//src/mongo/db/global_catalog/ddl:sharding_coordinator_gen",
],
)
idl_generator(
name = "split_chunk_coordinator_document_gen",
src = "split_chunk_coordinator_document.idl",

View File

@ -29,6 +29,7 @@
#include "mongo/db/global_catalog/ddl/chunk_operation_sharding_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_all_chunks_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_chunks_coordinator.h"
#include "mongo/db/global_catalog/ddl/sharding_coordinator_external_state_for_test.h"
#include "mongo/db/global_catalog/ddl/split_chunk_coordinator.h"
@ -122,6 +123,25 @@ protected:
return doc;
}
MergeAllChunksCoordinatorDocument makeMergeAllChunksCoordinatorDoc(
const ShardId& shard,
int maxNumberOfChunksToMerge = std::numeric_limits<int>::max(),
int maxTimeProcessingChunksMS = std::numeric_limits<int>::max()) {
MergeAllChunksCoordinatorDocument doc;
ShardsvrMergeAllChunksOnShardRequest req;
req.setShard(shard);
req.setMaxNumberOfChunksToMerge(maxNumberOfChunksToMerge);
req.setMaxTimeProcessingChunksMS(maxTimeProcessingChunksMS);
ShardingCoordinatorMetadata metadata{
{NamespaceString::createNamespaceString_forTest("test.coll"),
CoordinatorTypeEnum::kMergeAllChunks}};
ForwardableOperationMetadata forwardableOpMetadata(_opCtx);
metadata.setForwardableOpMetadata(forwardableOpMetadata);
doc.setShardingCoordinatorMetadata(std::move(metadata));
doc.setShardsvrMergeAllChunksOnShardRequest(req);
return doc;
}
SplitChunkCoordinatorDocument makeSplitChunkCoordinatorDoc(std::vector<BSONObj> splitKeys,
OID epoch) {
SplitChunkCoordinatorDocument doc;
@ -213,6 +233,21 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictS
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest, MergeAllChunksCheckIfOptionsConflictSameParams) {
const ShardId shard{"shard0"};
auto coordinatorDoc = makeMergeAllChunksCoordinatorDoc(shard);
auto coordinator = std::make_shared<MergeAllChunksCoordinator>(
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
// Same parameters — should not throw.
ASSERT_DOES_NOT_THROW(coordinator->checkIfOptionsConflict(coordinatorDoc.toBSON()));
// Satisfy destructor invariants by resolving internal promises.
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkCheckIfOptionsConflictSameParams) {
auto epoch = OID::gen();
std::vector<BSONObj> splitKeys = {BSON("x" << 50)};
@ -249,6 +284,25 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictD
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest, MergeAllChunksCheckIfOptionsConflictDifferentShard) {
const ShardId shard{"shard0"};
auto coordinatorDoc = makeMergeAllChunksCoordinatorDoc(shard);
auto coordinator = std::make_shared<MergeAllChunksCoordinator>(
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
// Different shard — should throw.
const ShardId differentShard{"shard1"};
auto otherDoc = makeMergeAllChunksCoordinatorDoc(differentShard);
ASSERT_THROWS_CODE(coordinator->checkIfOptionsConflict(otherDoc.toBSON()),
DBException,
ErrorCodes::ConflictingOperationInProgress);
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkCheckIfOptionsConflictDifferentSplitKeys) {
auto epoch = OID::gen();
std::vector<BSONObj> splitKeys = {BSON("x" << 50)};
@ -289,6 +343,25 @@ TEST_F(ChunkOperationShardingCoordinatorTest, MergeChunksCheckIfOptionsConflictD
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest,
MergeAllChunksCheckIfOptionsConflictDifferentMaxNumberOfChunks) {
const ShardId shard{"shard0"};
auto coordinatorDoc = makeMergeAllChunksCoordinatorDoc(shard, /*maxNumberOfChunksToMerge=*/100);
auto coordinator = std::make_shared<MergeAllChunksCoordinator>(
static_cast<ShardingCoordinatorService*>(_service), coordinatorDoc.toBSON());
// Different maxNumberOfChunksToMerge — should throw.
auto otherDoc = makeMergeAllChunksCoordinatorDoc(shard, /*maxNumberOfChunksToMerge=*/50);
ASSERT_THROWS_CODE(coordinator->checkIfOptionsConflict(otherDoc.toBSON()),
DBException,
ErrorCodes::ConflictingOperationInProgress);
static_cast<repl::PrimaryOnlyService::Instance*>(coordinator.get())
->interrupt({ErrorCodes::Interrupted, "Test cleanup"});
}
TEST_F(ChunkOperationShardingCoordinatorTest, SplitChunkAppendCommandInfoIncludesRequestFields) {
auto epoch = OID::gen();
std::vector<BSONObj> splitKeys = {BSON("x" << 50), BSON("x" << 75)};

View File

@ -0,0 +1,175 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/global_catalog/ddl/merge_all_chunks_coordinator.h"
#include "mongo/bson/bsontypes.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/generic_argument_util.h"
#include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h"
#include "mongo/db/global_catalog/type_chunk_range.h"
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/topology/shard_registry.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/logv2/log.h"
#include "mongo/util/future_util.h"
#include "mongo/util/str.h"
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <boost/optional/optional.hpp>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
namespace {
static inline IDLParserContext kIdlParserCtx{"MergeAllChunksOnShardResponse"};
} // namespace
MergeAllChunksCoordinator::MergeAllChunksCoordinator(ShardingCoordinatorService* service,
const BSONObj& initialStateDoc)
: ChunkOperationShardingCoordinator(service, "MergeAllChunksCoordinator", initialStateDoc),
_request(_doc.getShardsvrMergeAllChunksOnShardRequest()) {}
void MergeAllChunksCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
const auto otherDoc = MergeAllChunksCoordinatorDocument::parse(
doc, IDLParserContext("MergeAllChunksCoordinatorDocument"));
const auto& selfReq = _request.toBSON();
const auto& otherReq = otherDoc.getShardsvrMergeAllChunksOnShardRequest().toBSON();
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Another merge all chunks operation for namespace "
<< nss().toStringForErrorMsg()
<< " is being executed with different parameters: " << redact(selfReq)
<< " vs " << redact(otherReq),
SimpleBSONObjComparator::kInstance.evaluate(selfReq == otherReq));
}
void MergeAllChunksCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const {
cmdInfoBuilder->appendElements(_request.toBSON());
}
bool MergeAllChunksCoordinator::isInCriticalSection(Phase phase) const {
return false;
}
ExecutorFuture<void> MergeAllChunksCoordinator::_acquireLocksAsync(
OperationContext* opCtx,
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) {
return AsyncTry([this, anchor = shared_from_this()] {
auto opCtxHolder = makeOperationContext(/*deprioritizable=*/true);
auto* newOpCtx = opCtxHolder.get();
// Span the whole shard key space so the registration acts as a
// namespace-wide guard (see class comment for details).
const auto chunkRange = ChunkRange(kMinBSONKey, kMaxBSONKey);
_scopedSplitMergeChunk.emplace(uassertStatusOK(
ActiveMigrationsRegistry::get(newOpCtx).registerSplitOrMergeChunk(
newOpCtx, nss(), chunkRange)));
})
.until([this, anchor = shared_from_this()](Status status) {
if (!status.isOK()) {
LOGV2_WARNING(12117914,
"ActiveMigrationsRegistry lock acquisition attempt failed",
logv2::DynamicAttributes{getCoordinatorLogAttrs(),
"error"_attr = redact(status)});
}
return !_recoveredFromDisk || status.isOK();
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
void MergeAllChunksCoordinator::_releaseLocks(OperationContext* opCtx) {
_scopedSplitMergeChunk.reset();
}
ExecutorFuture<void> MergeAllChunksCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor).then([this, anchor = shared_from_this()] {
auto opCtxHolder = makeOperationContext(/*deprioritizable=*/true);
auto* opCtx = opCtxHolder.get();
LOGV2(12118000,
"Running merge all chunks on shard operation",
logAttrs(nss()),
"shard"_attr = _request.getShard());
// Because this is a non-authoritative update, we must mark the CSR metadata as
// kNonAuthoritative so that the following refresh will fetch the metadata from the config
// server. Leaving it kAuthoritative would short-circuit the refresh against the durable
// shard catalog and keep the CSR pinned to the pre-mergeAllChunks version. This must be
// done before starting the operation to ensure the CSR is left as kNonAuthoritative in case
// of an unexpected failure.
// TODO (SERVER-125786) The clearFilteringMetadata_nonAuthoritative should go away
// once mergeAllChunks becomes authoritative.
{
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, nss());
scopedCsr->clearFilteringMetadata_nonAuthoritative(opCtx);
}
ConfigSvrCommitMergeAllChunksOnShard configRequest(nss());
configRequest.setDbName(DatabaseName::kAdmin);
configRequest.setShard(_request.getShard());
configRequest.setMaxNumberOfChunksToMerge(_request.getMaxNumberOfChunksToMerge());
configRequest.setMaxTimeProcessingChunksMS(_request.getMaxTimeProcessingChunksMS());
configRequest.setWriteConcern(defaultMajorityWriteConcernDoNotUse());
auto cmdResponse =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
DatabaseName::kAdmin,
configRequest.toBSON(),
Shard::RetryPolicy::kIdempotent));
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(cmdResponse));
_response = MergeAllChunksOnShardResponse::parse(cmdResponse.response, kIdlParserCtx);
// Update the shard catalog filtering metadata to reflect the new shard
// version produced by the config server merge.
uassertStatusOK(FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
opCtx, nss(), _response->getShardVersion()));
LOGV2(12118001,
"Completed merge all chunks on shard operation",
logAttrs(nss()),
"shard"_attr = _request.getShard(),
"numMergedChunks"_attr = _response->getNumMergedChunks());
});
}
} // namespace mongo

View File

@ -0,0 +1,84 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/global_catalog/ddl/chunk_operation_sharding_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_all_chunks_coordinator_document_gen.h"
#include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h"
#include "mongo/db/s/active_migrations_registry.h"
namespace mongo {
/**
* Coordinator that drives a mergeAllChunks operation on a single shard for a given collection.
*
* For mutual exclusion against concurrent chunk operations (moveChunk / split / merge) on the
* same namespace, the coordinator registers a ScopedSplitMergeChunk with the
* ActiveMigrationsRegistry on the entire shard key space (Min, Max). Conflicts are detected at
* the namespace level - the ChunkRange itself is stored only as informational state - so the
* registration effectively acts as a per-collection guard for the lifetime of the coordinator.
*/
class MergeAllChunksCoordinator final
: public ChunkOperationShardingCoordinator<MergeAllChunksCoordinatorDocument> {
public:
MergeAllChunksCoordinator(ShardingCoordinatorService* service, const BSONObj& initialStateDoc);
void checkIfOptionsConflict(const BSONObj& doc) const final;
void appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const override;
/**
* Waits for the merge to complete and returns the response built from the config server reply.
*/
MergeAllChunksOnShardResponse getResponse(OperationContext* opCtx) {
getCompletionFuture().get(opCtx);
tassert(12117910, "Expected _response to be set", _response);
return *_response;
}
protected:
bool isInCriticalSection(Phase phase) const override;
private:
ExecutorFuture<void> _acquireLocksAsync(OperationContext* opCtx,
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) override;
void _releaseLocks(OperationContext* opCtx) override;
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
const ShardsvrMergeAllChunksOnShardRequest _request;
boost::optional<MergeAllChunksOnShardResponse> _response;
boost::optional<ScopedSplitMergeChunk> _scopedSplitMergeChunk;
};
} // namespace mongo

View File

@ -0,0 +1,59 @@
# Copyright (C) 2026-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
# as published by MongoDB, Inc.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Server Side Public License for more details.
#
# You should have received a copy of the Server Side Public License
# along with this program. If not, see
# <http://www.mongodb.com/licensing/server-side-public-license>.
#
# As a special exception, the copyright holders give permission to link the
# code of portions of this program with the OpenSSL library under certain
# conditions as described in each individual source file and distribute
# linked combinations including the program with the OpenSSL library. You
# must comply with the Server Side Public License in all respects for
# all of the code used other than as permitted herein. If you modify file(s)
# with this exception, you may extend this exception to your version of the
# file(s), but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
#
# This file defines the format of the state document for the merge all chunks coordinator.
global:
mod_visibility: private
cpp_namespace: "mongo"
imports:
- "mongo/db/basic_types.idl"
- "mongo/db/global_catalog/ddl/sharding_coordinator.idl"
- "mongo/db/global_catalog/ddl/merge_chunk_request.idl"
enums:
MergeAllChunksCoordinatorPhase:
description: "Phases for the merge all chunks coordinator."
type: string
values:
kUnset: "unset"
kMergeAllChunks: "mergeAllChunks"
structs:
MergeAllChunksCoordinatorDocument:
description: "State document for the merge all chunks coordinator."
generate_comparison_operators: false
strict: false
chained_structs:
ShardingCoordinatorMetadata: ShardingCoordinatorMetadata
ShardsvrMergeAllChunksOnShardRequest: ShardsvrMergeAllChunksOnShardRequest
fields:
phase:
type: MergeAllChunksCoordinatorPhase
default: kUnset

View File

@ -65,6 +65,25 @@ structs:
type: timestamp
optional: true
ShardsvrMergeAllChunksOnShardRequest:
description: "mergeAllChunksOnShard command parameters"
strict: false
fields:
shard:
description: "The id of the shard."
type: shard_id
validator:
callback: "ShardId::validate"
maxNumberOfChunksToMerge:
description: "The maximum number of chunks to merge."
type: int
default: 2147483647 # INT_MAX
maxTimeProcessingChunksMS:
description: >-
"The maximum time in milliseconds spent looking for mergeable chunks."
type: int
default: 2147483647 # INT_MAX
ConfigSvrMergeResponse:
description: "Response of the _configsvrCommitChunksMerge command"
strict: false
@ -174,21 +193,8 @@ commands:
type: namespacestring
api_version: ""
strict: false
fields:
shard:
description: "The id of the shard."
type: shard_id
validator:
callback: "ShardId::validate"
maxNumberOfChunksToMerge:
description: "The maximum number of chunks to merge."
type: int
default: 2147483647 # INT_MAX
maxTimeProcessingChunksMS:
description: >-
"The maximum time in milliseconds spent looking for mergeable chunks."
type: int
default: 2147483647 # INT_MAX
chained_structs:
ShardsvrMergeAllChunksOnShardRequest: ShardsvrMergeAllChunksOnShardRequest
_configsvrCommitMergeAllChunksOnShard:
command_name: _configsvrCommitMergeAllChunksOnShard

View File

@ -68,6 +68,7 @@ enums:
# TODO (SERVER-116499): Remove this coordinator type once 9.0 becomes last LTS.
kTimeseriesUpgradeDowngrade: "upgradeDowngradeViewlessTimeseries"
kMergeChunks: "mergeChunks"
kMergeAllChunks: "mergeAllChunks"
kSplitChunk: "splitChunk"
kTestCoordinator: "testCoordinator"

View File

@ -49,6 +49,7 @@
#include "mongo/db/global_catalog/ddl/drop_database_coordinator.h"
#include "mongo/db/global_catalog/ddl/drop_indexes_coordinator.h"
#include "mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_all_chunks_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_chunks_coordinator.h"
#include "mongo/db/global_catalog/ddl/migration_blocking_operation_coordinator.h"
#include "mongo/db/global_catalog/ddl/move_primary_coordinator.h"
@ -135,6 +136,7 @@ constexpr std::pair<CoordinatorTypeEnum,
{CoordinatorTypeEnum::kTimeseriesUpgradeDowngrade,
typedInstance<TimeseriesUpgradeDowngradeCoordinator>},
{CoordinatorTypeEnum::kMergeChunks, typedInstance<MergeChunksCoordinator>},
{CoordinatorTypeEnum::kMergeAllChunks, typedInstance<MergeAllChunksCoordinator>},
{CoordinatorTypeEnum::kSplitChunk, typedInstance<SplitChunkCoordinator>},
{CoordinatorTypeEnum::kTestCoordinator, noInstance},
};

View File

@ -38,9 +38,12 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/database_name.h"
#include "mongo/db/generic_argument_util.h"
#include "mongo/db/global_catalog/ddl/merge_all_chunks_coordinator.h"
#include "mongo/db/global_catalog/ddl/merge_chunk_request_gen.h"
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
#include "mongo/db/global_catalog/sharding_catalog_client.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@ -49,12 +52,16 @@
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
#include "mongo/db/sharding_environment/client/shard.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/sharding_environment/sharding_runtime_d_params_gen.h"
#include "mongo/db/topology/shard_registry.h"
#include "mongo/db/topology/sharding_state.h"
#include "mongo/db/version_context.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
#include <memory>
#include <string>
@ -65,6 +72,72 @@
namespace mongo {
namespace {
/**
* Attempts to execute the merge-all-chunks-on-shard operation through the sharding coordinator
* service, retrying while a conflicting coordinator is already running for the same namespace.
* Populates `response` and returns true if the merge completed via the coordinator; returns
* false if the caller should fall back to the legacy config-server path (because the
* authoritative metadata feature flag is disabled). Throws ConflictingOperationInProgress if
* the configured retry budget is exhausted.
*/
bool tryRunMergeAllChunksCoordinator(OperationContext* opCtx,
const NamespaceString& nss,
const ShardSvrMergeAllChunksOnShard& req,
MergeAllChunksOnShardResponse* response) {
// If a conflicting merge all chunks coordinator is already running for this namespace,
// wait for it to complete and retry.
// TODO (SERVER-125033): Remove the retry-loop once this task gets done.
const int maxConflictRetries = shardsvrMergeAllChunksMaxConflictRetries.load();
Status lastConflictStatus = Status::OK();
for (int retries = 0; retries < maxConflictRetries; ++retries) {
boost::optional<FixedFCVRegion> optFixedFcvRegion{boost::in_place_init, opCtx};
if (sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
VersionContext::getDecoration(opCtx),
optFixedFcvRegion.get()->acquireFCVSnapshot()) ==
AuthoritativeMetadataAccessLevelEnum::kNone) {
return false;
}
auto coordinatorDoc = MergeAllChunksCoordinatorDocument();
coordinatorDoc.setShardsvrMergeAllChunksOnShardRequest(
req.getShardsvrMergeAllChunksOnShardRequest());
coordinatorDoc.setShardingCoordinatorMetadata(
{{nss, CoordinatorTypeEnum::kMergeAllChunks}});
// Defer option conflict checking to the explicit checkIfOptionsConflict
// call below, allowing the retry loop to handle ConflictingOperationInProgress.
auto service = ShardingCoordinatorService::getService(opCtx);
auto coordinator =
checked_pointer_cast<MergeAllChunksCoordinator>(service->getOrCreateInstance(
opCtx, coordinatorDoc.toBSON(), *optFixedFcvRegion, false /*checkOptions*/));
try {
coordinator->checkIfOptionsConflict(coordinatorDoc.toBSON());
} catch (const ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
LOGV2_DEBUG(12118002,
1,
"Merge all chunks coordinator already running, waiting for completion",
"namespace"_attr = nss,
"error"_attr = ex);
lastConflictStatus = ex.toStatus();
optFixedFcvRegion.reset();
coordinator->getCompletionFuture().getNoThrow(opCtx).ignore();
continue;
}
optFixedFcvRegion.reset();
*response = coordinator->getResponse(opCtx);
return true;
}
uasserted(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Failed to execute merge all chunks for namespace "
<< nss.toStringForErrorMsg() << " after " << maxConflictRetries
<< " retries due to conflicting operations. Last conflict: "
<< lastConflictStatus.reason());
}
class ShardSvrMergeAllChunksOnShardCommand final
: public TypedCommand<ShardSvrMergeAllChunksOnShardCommand> {
@ -102,10 +175,18 @@ public:
"invalid namespace specified for request",
ns().isValid());
const auto& nss = ns();
const auto& req = request();
MergeAllChunksOnShardResponse response;
if (tryRunMergeAllChunksCoordinator(opCtx, nss, req, &response)) {
return response;
}
// Because this is a non-authoritative update, we must mark the CSR metadata as
// kNonAuthoritative so that the following refresh will fetch the metadata from the
// config server. Leaving it kAuthoritative would short-circuit the refresh against the
// durable shard catalog and keep the CSR pinned to the pre-split version.
// durable shard catalog and keep the CSR pinned to the pre-mergeAllChunks version.
// This must be done before starting the operation to ensure the CSR is left as
// kNonAuthoritative in case of an unexpected failure.
// TODO (SERVER-125786) The clearFilteringMetadata_nonAuthoritative should go away once
@ -115,13 +196,14 @@ public:
scopedCsr->clearFilteringMetadata_nonAuthoritative(opCtx);
}
ConfigSvrCommitMergeAllChunksOnShard configSvrCommitMergeAllChunksOnShard(ns());
// Legacy path: forward directly to the config server.
ConfigSvrCommitMergeAllChunksOnShard configSvrCommitMergeAllChunksOnShard(nss);
configSvrCommitMergeAllChunksOnShard.setDbName(DatabaseName::kAdmin);
configSvrCommitMergeAllChunksOnShard.setShard(request().getShard());
configSvrCommitMergeAllChunksOnShard.setShard(req.getShard());
configSvrCommitMergeAllChunksOnShard.setMaxNumberOfChunksToMerge(
request().getMaxNumberOfChunksToMerge());
req.getMaxNumberOfChunksToMerge());
configSvrCommitMergeAllChunksOnShard.setMaxTimeProcessingChunksMS(
request().getMaxTimeProcessingChunksMS());
req.getMaxTimeProcessingChunksMS());
configSvrCommitMergeAllChunksOnShard.setWriteConcern(
defaultMajorityWriteConcernDoNotUse());
@ -135,16 +217,16 @@ public:
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(swCommandResponse));
auto response = MergeAllChunksOnShardResponse::parse(
swCommandResponse.getValue().response, IDL_PARSER_CONTEXT);
auto res = MergeAllChunksOnShardResponse::parse(swCommandResponse.getValue().response,
IDL_PARSER_CONTEXT);
// Update the shard catalog filtering metadata to reflect the new shard
// version produced by the config server merge.
uassertStatusOK(
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
opCtx, ns(), response.getShardVersion()));
opCtx, ns(), res.getShardVersion()));
return response;
return res;
}
private:

View File

@ -600,6 +600,8 @@ mongo_cc_library(
"//src/mongo/db/global_catalog/ddl:drop_indexes_coordinator_document_gen",
"//src/mongo/db/global_catalog/ddl:initialize_placement_history_coordinator.cpp",
"//src/mongo/db/global_catalog/ddl:initialize_placement_history_coordinator_document_gen",
"//src/mongo/db/global_catalog/ddl:merge_all_chunks_coordinator.cpp",
"//src/mongo/db/global_catalog/ddl:merge_all_chunks_coordinator_document_gen",
"//src/mongo/db/global_catalog/ddl:merge_chunks_coordinator.cpp",
"//src/mongo/db/global_catalog/ddl:merge_chunks_coordinator_document_gen",
"//src/mongo/db/global_catalog/ddl:migration_blocking_operation_coordinator.cpp",

View File

@ -311,7 +311,9 @@ public:
: CommandInfo(shardId, nss, boost::none) {}
BSONObj serialise() const override {
ShardSvrMergeAllChunksOnShard req(getNameSpace(), getTarget());
ShardSvrMergeAllChunksOnShard req(getNameSpace());
req.setDbName(DatabaseName::kAdmin);
req.setShard(getTarget());
req.setMaxNumberOfChunksToMerge(AutoMergerPolicy::MAX_NUMBER_OF_CHUNKS_TO_MERGE);
req.setMaxTimeProcessingChunksMS(autoMergerMaxTimeProcessingChunksMS.load());
return req.toBSON();

View File

@ -228,6 +228,19 @@ server_parameters:
default: 10
redact: false
# TODO (SERVER-125033): Remove this parameter once this task gets done.
shardsvrMergeAllChunksMaxConflictRetries:
description: >-
Maximum number of times _shardsvrMergeAllChunksOnShard will retry when a conflicting
merge all chunks coordinator is already running for the same namespace.
set_at: [startup, runtime]
cpp_vartype: AtomicWord<int>
cpp_varname: shardsvrMergeAllChunksMaxConflictRetries
validator:
gt: 0
default: 10
redact: false
# TODO (SERVER-125033): Remove this parameter once this task gets done.
shardsvrSplitChunkMaxConflictRetries:
description: >-