SERVER-120993 Use OSI in resharding coordinator commands (#51085)

GitOrigin-RevId: 99f04a794cd6b51616205d853ecd924c583084db
This commit is contained in:
Brett Nawrocki 2026-04-07 10:22:34 -04:00 committed by MongoDB Bot
parent 087c297bc7
commit 65e1bb31f1
13 changed files with 196 additions and 52 deletions

View File

@ -66,6 +66,10 @@ public:
return Command::AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
std::string help() const override {
return "Internal command aimed to remove stale entries from the local collection catalog.";
}

View File

@ -1765,12 +1765,12 @@ void ReshardingCoordinator::_generateCommitNotificationForChangeStreams(
// In case the recipient is running a legacy binary, swallow the error.
try {
generic_argument_util::setMajorityWriteConcern(request, &resharding::kMajorityWriteConcern);
const auto opts =
std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrNotifyShardingEventRequest>>(
**executor, _ctHolder->getStepdownToken(), request);
opts->cmd.setDbName(DatabaseName::kAdmin);
resharding::sendCommandToShards(opCtx, opts, {notifierShard});
resharding::sendReshardingCommand(opCtx,
_getNewSession(opCtx),
std::move(request),
_ctHolder->getStepdownToken(),
executor,
{notifierShard});
} catch (const ExceptionFor<ErrorCodes::UnsupportedShardingEventNotification>& e) {
LOGV2_WARNING(7403100,
"Unable to generate op entry on reshardCollection commit",
@ -1796,12 +1796,12 @@ void ReshardingCoordinator::_generatePlacementChangeNotificationForChangeStreams
// In case the recipient is running a legacy binary, swallow the error.
try {
generic_argument_util::setMajorityWriteConcern(request, &resharding::kMajorityWriteConcern);
const auto opts =
std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrNotifyShardingEventRequest>>(
**executor, _ctHolder->getStepdownToken(), request);
opts->cmd.setDbName(DatabaseName::kAdmin);
resharding::sendCommandToShards(opCtx, opts, {notifierShard});
resharding::sendReshardingCommand(opCtx,
_getNewSession(opCtx),
std::move(request),
_ctHolder->getStepdownToken(),
executor,
{notifierShard});
} catch (const ExceptionFor<ErrorCodes::UnsupportedShardingEventNotification>& e) {
tasserted(
10674000,
@ -1842,12 +1842,12 @@ ExecutorFuture<void> ReshardingCoordinator::_awaitAllParticipantShardsDone(
auto cmd = ShardsvrDropCollectionIfUUIDNotMatchingWithWriteConcernRequest(
nss, notMatchingThisUUID);
generic_argument_util::setMajorityWriteConcern(cmd,
&resharding::kMajorityWriteConcern);
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<
ShardsvrDropCollectionIfUUIDNotMatchingWithWriteConcernRequest>>(
**executor, _ctHolder->getStepdownToken(), cmd);
resharding::sendCommandToShards(opCtx.get(), opts, allShardIds);
resharding::sendReshardingCommand(opCtx.get(),
_getNewSession(opCtx.get()),
cmd,
_ctHolder->getStepdownToken(),
executor,
allShardIds);
}
reshardingPauseCoordinatorBeforeRemovingStateDoc.pauseWhileSetAndNotCanceled(
@ -1918,8 +1918,11 @@ void ReshardingCoordinator::_initializeAllDonors(
if (resharding::gFeatureFlagReshardingInitNoRefresh.isEnabled(
VersionContext::getDecoration(opCtx.get()),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
resharding::tellAllDonorsToInitialize(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
resharding::tellAllDonorsToInitialize(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor);
} else {
_reshardingCoordinatorExternalState->establishAllDonorsAsParticipants(
opCtx.get(),
@ -1937,8 +1940,11 @@ void ReshardingCoordinator::_initializeAllRecipients(
if (resharding::gFeatureFlagReshardingInitNoRefresh.isEnabled(
VersionContext::getDecoration(opCtx.get()),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
resharding::tellAllRecipientsToInitialize(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
resharding::tellAllRecipientsToInitialize(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor);
} else {
_reshardingCoordinatorExternalState->establishAllRecipientsAsParticipants(
opCtx.get(),
@ -1956,8 +1962,11 @@ void ReshardingCoordinator::_tellAllRecipientsToClone(
reshardingPauseBeforeTellingRecipientsToClone.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
resharding::tellAllRecipientsToClone(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
resharding::tellAllRecipientsToClone(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor);
}
void ReshardingCoordinator::_tellAllRecipientsToRefresh(
@ -2001,15 +2010,21 @@ void ReshardingCoordinator::_tellAllDonorsToStartChangeStreamsMonitor(
}
auto opCtx = _makeOperationContext();
resharding::tellAllDonorsToStartChangeStreamsMonitor(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
resharding::tellAllDonorsToStartChangeStreamsMonitor(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor);
}
void ReshardingCoordinator::_tellAllRecipientsCriticalSectionStarted(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
auto opCtx = _makeOperationContext();
resharding::tellAllRecipientsCriticalSectionStarted(
opCtx.get(), _coordinatorDoc, _ctHolder->getAbortToken(), executor);
resharding::tellAllRecipientsCriticalSectionStarted(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getAbortToken(),
executor);
}
void ReshardingCoordinator::_tellAllParticipantsToCommit(
@ -2018,15 +2033,22 @@ void ReshardingCoordinator::_tellAllParticipantsToCommit(
reshardingPauseBeforeTellingParticipantsToCommit.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
resharding::tellAllParticipantsToCommit(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
resharding::tellAllParticipantsToCommit(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor);
}
void ReshardingCoordinator::_tellAllParticipantsToAbort(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, bool isUserAborted) {
auto opCtx = _makeOperationContext();
resharding::tellAllParticipantsToAbort(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor, isUserAborted);
resharding::tellAllParticipantsToAbort(opCtx.get(),
_getNewSession(opCtx.get()),
_coordinatorDoc,
_ctHolder->getStepdownToken(),
executor,
isUserAborted);
}
void ReshardingCoordinator::_launchDonorPostCloningDeltaCollector(

View File

@ -29,9 +29,9 @@
#include "mongo/db/s/resharding/resharding_coordinator_command_util.h"
#include "mongo/db/generic_argument_util.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/shardsvr_resharding_commands_gen.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
@ -46,47 +46,35 @@ std::vector<ShardId> getAllParticipantShardIds(const ReshardingCoordinatorDocume
shardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
return {shardIds.begin(), shardIds.end()};
}
template <typename Cmd>
void sendReshardingCommand(OperationContext* opCtx,
Cmd cmd,
CancellationToken token,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const std::vector<ShardId>& shardIds,
bool setWriteConcern = true) {
cmd.setDbName(DatabaseName::kAdmin);
if (setWriteConcern) {
generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern);
}
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<Cmd>>(**executor, token, cmd);
resharding::sendCommandToShards(opCtx, opts, shardIds);
}
} // namespace
namespace resharding {
void tellAllParticipantsToCommit(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
ShardsvrCommitReshardCollection cmd(doc.getSourceNss());
cmd.setReshardingUUID(doc.getReshardingUUID());
sendReshardingCommand(opCtx, cmd, stepdownToken, executor, getAllParticipantShardIds(doc));
sendReshardingCommand(opCtx, osi, cmd, stepdownToken, executor, getAllParticipantShardIds(doc));
}
void tellAllParticipantsToAbort(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
bool isUserAborted) {
ShardsvrAbortReshardCollection abortCmd(doc.getReshardingUUID(), isUserAborted);
sendReshardingCommand(opCtx, abortCmd, stepdownToken, executor, getAllParticipantShardIds(doc));
sendReshardingCommand(
opCtx, osi, abortCmd, stepdownToken, executor, getAllParticipantShardIds(doc));
}
void tellAllDonorsToInitialize(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@ -96,6 +84,7 @@ void tellAllDonorsToInitialize(OperationContext* opCtx,
resharding::extractShardIdsFromParticipantEntries(doc.getRecipientShards()));
sendReshardingCommand(opCtx,
osi,
cmd,
stepdownToken,
executor,
@ -103,6 +92,7 @@ void tellAllDonorsToInitialize(OperationContext* opCtx,
}
void tellAllRecipientsToInitialize(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@ -123,6 +113,7 @@ void tellAllRecipientsToInitialize(OperationContext* opCtx,
sendReshardingCommand(
opCtx,
osi,
cmd,
stepdownToken,
executor,
@ -131,6 +122,7 @@ void tellAllRecipientsToInitialize(OperationContext* opCtx,
void tellAllDonorsToStartChangeStreamsMonitor(
OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@ -141,6 +133,7 @@ void tellAllDonorsToStartChangeStreamsMonitor(
// The donors ensure the change streams monitor start time is majority committed in their
// state document before returning, so no write concern is needed.
sendReshardingCommand(opCtx,
osi,
cmd,
stepdownToken,
executor,
@ -149,6 +142,7 @@ void tellAllDonorsToStartChangeStreamsMonitor(
}
void tellAllRecipientsToClone(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@ -162,6 +156,7 @@ void tellAllRecipientsToClone(OperationContext* opCtx,
cmd.setApproxCopySize(recipientFields.getReshardingApproxCopySizeStruct());
sendReshardingCommand(opCtx,
osi,
cmd,
stepdownToken,
executor,
@ -174,6 +169,7 @@ void tellAllRecipientsToClone(OperationContext* opCtx,
cmd.setApproxCopySize(approxCopySize);
sendReshardingCommand(opCtx,
osi,
cmd,
stepdownToken,
executor,
@ -183,6 +179,7 @@ void tellAllRecipientsToClone(OperationContext* opCtx,
void tellAllRecipientsCriticalSectionStarted(
OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken abortToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@ -190,6 +187,7 @@ void tellAllRecipientsCriticalSectionStarted(
sendReshardingCommand(
opCtx,
osi,
cmd,
abortToken,
executor,

View File

@ -28,8 +28,10 @@
*/
#pragma once
#include "mongo/db/generic_argument_util.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/resharding/resharding_coordinator.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/cancellation.h"
@ -40,40 +42,70 @@
namespace mongo {
namespace resharding {
template <typename Cmd>
void sendReshardingCommand(OperationContext* opCtx,
const OperationSessionInfo& osi,
Cmd cmd,
CancellationToken token,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const std::vector<ShardId>& shardIds,
bool setWriteConcern = true) {
if (cmd.getDbName().isEmpty()) {
cmd.setDbName(DatabaseName::kAdmin);
}
if (resharding::gFeatureFlagReshardingInitNoRefresh.isEnabled(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
generic_argument_util::setOperationSessionInfo(cmd, osi);
}
if (setWriteConcern) {
generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern);
}
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<Cmd>>(**executor, token, cmd);
resharding::sendCommandToShards(opCtx, opts, shardIds);
}
void tellAllParticipantsToCommit(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllParticipantsToAbort(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
bool isUserAborted);
void tellAllDonorsToInitialize(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllRecipientsToInitialize(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllDonorsToStartChangeStreamsMonitor(
OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllRecipientsToClone(OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllRecipientsCriticalSectionStarted(
OperationContext* opCtx,
const OperationSessionInfo& osi,
const ReshardingCoordinatorDocument& doc,
CancellationToken abortToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);

View File

@ -70,6 +70,7 @@
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/logv2/log.h"
#include "mongo/s/resharding/resharding_coordinator_service_conflicting_op_in_progress_info.h"
#include "mongo/s/resharding/resharding_feature_flag_gen.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/unittest/unittest.h"
@ -77,6 +78,7 @@
#include "mongo/util/clock_source.h"
#include "mongo/util/duration.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/string_map.h"
#include "mongo/util/time_support.h"
#include <functional>
@ -326,6 +328,62 @@ private:
std::shared_ptr<ExternalStateForTest> _externalState;
};
/**
* Wraps a delegate AsyncRPCRunner and asserts that every command sent by the
* ReshardingCoordinator carries both `lsid` and `txnNumber` (i.e. OperationSessionInfo),
* which are required for replay protection.
*/
class OsiCheckingAsyncRPCRunner : public async_rpc::detail::AsyncRPCRunner {
public:
explicit OsiCheckingAsyncRPCRunner(std::unique_ptr<async_rpc::detail::AsyncRPCRunner> inner)
: _inner(std::move(inner)) {}
ExecutorFuture<async_rpc::detail::AsyncRPCInternalResponse> _sendCommand(
std::shared_ptr<executor::TaskExecutor> exec,
CancellationToken token,
OperationContext* opCtx,
async_rpc::Targeter* targeter,
const TargetingMetadata& targetingMetadata,
const DatabaseName& dbName,
BSONObj cmdBSON,
BatonHandle baton,
boost::optional<UUID> clientOperationKey) final {
auto cmdName = cmdBSON.firstElementFieldNameStringData();
if (!kOsiExemptCommands.count(cmdName) &&
resharding::gFeatureFlagReshardingInitNoRefresh.isEnabled(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
ASSERT(cmdBSON.hasField("lsid"))
<< "ReshardingCoordinator RPC '" << cmdName << "' missing lsid (OSI)";
ASSERT(cmdBSON.hasField("txnNumber"))
<< "ReshardingCoordinator RPC '" << cmdName << "' missing txnNumber (OSI)";
}
return _inner->_sendCommand(std::move(exec),
std::move(token),
opCtx,
targeter,
targetingMetadata,
dbName,
std::move(cmdBSON),
std::move(baton),
clientOperationKey);
}
private:
// Commands sent by the coordinator that are exempt from carrying OSI.
// _flushReshardingStateChange is idempotent, so OSI-based deduplication is unnecessary.
// One instance is also sent post-commit on a best-effort basis, after the coordinator
// document and its associated session have already been removed, making it impossible
// to include OSI. This command is expected to be removed once reshardingFields are no
// longer written to config.collections, when shards authoritatively manage their own
// filtering metadata.
static inline const StringSet kOsiExemptCommands{
"_flushReshardingStateChange",
};
std::unique_ptr<async_rpc::detail::AsyncRPCRunner> _inner;
};
class ReshardingCoordinatorServiceTestBase : service_context_test::WithSetupTransportLayer,
public ConfigServerTestFixture {
public:
@ -392,7 +450,8 @@ public:
repl::createOplog(opCtx);
auto asyncRPCMock = std::make_unique<async_rpc::NoopMockAsyncRPCRunner>();
auto asyncRPCMock = std::make_unique<OsiCheckingAsyncRPCRunner>(
std::make_unique<async_rpc::NoopMockAsyncRPCRunner>());
async_rpc::detail::AsyncRPCRunner::set(getServiceContext(), std::move(asyncRPCMock));
_opObserverRegistry =

View File

@ -31,6 +31,7 @@
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
#include "mongo/db/s/resharding/resharding_coordinator_dao.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/db/shard_role/shard_catalog/flush_routing_table_cache_updates_gen.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/executor/async_rpc.h"

View File

@ -192,6 +192,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrCommitReshardCollectionCommand).forShard();

View File

@ -152,6 +152,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrReshardDonorInitializeCommand).forShard();

View File

@ -185,6 +185,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrReshardRecipientInitializeCommand).forShard();

View File

@ -187,6 +187,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrAbortReshardCollectionCommand).forShard();

View File

@ -150,6 +150,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrReshardRecipientCloneCommand).forShard();

View File

@ -147,6 +147,10 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
};
MONGO_REGISTER_COMMAND(ShardsvrReshardRecipientCriticalSectionStartedCommand).forShard();

View File

@ -61,6 +61,10 @@ public:
return AllowedOnSecondary::kNever;
}
bool supportsRetryableWrite() const final {
return true;
}
bool adminOnly() const override {
return true;
}