Revert "SERVER-126252 Create resharding participant cancel state with ChangeStreamsMonitor-aware token (#53468)" (#54172)
GitOrigin-RevId: 20e7fbf07b66445102c00c26f850917f6a46f3d5
This commit is contained in:
parent
0e298d597d
commit
0722eba8f4
@ -993,7 +993,6 @@ mongo_cc_unit_test(
|
||||
"//src/mongo/db/s/resharding:resharding_donor_service_test.cpp",
|
||||
"//src/mongo/db/s/resharding:resharding_future_util_test.cpp",
|
||||
"//src/mongo/db/s/resharding:resharding_metrics_test.cpp",
|
||||
"//src/mongo/db/s/resharding:resharding_participant_cancel_state_test.cpp",
|
||||
],
|
||||
tags = [
|
||||
"mongo_unittest_fourth_group",
|
||||
|
||||
@ -63,7 +63,7 @@ namespace primary_only_service_helpers {
|
||||
* - abort() cancels _abortOrStepdownSource only
|
||||
* - stepdown cancels both _stepdownSource and _abortOrStepdownSource
|
||||
*/
|
||||
class MONGO_MOD_OPEN CancelState {
|
||||
class CancelState {
|
||||
public:
|
||||
CancelState(boost::optional<CancellationToken> stepdownToken = boost::none);
|
||||
void attachStepdownToken(const CancellationToken& stepdownToken);
|
||||
|
||||
@ -1522,7 +1522,7 @@ void ReshardingDonorService::DonorStateMachine::_initCancelState(
|
||||
reshardingPauseDonorBeforeInitCancelState.pauseWhileSet();
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(_mutex);
|
||||
_cancelState = std::make_unique<ReshardingParticipantCancelState>(stepdownToken);
|
||||
_cancelState = std::make_unique<primary_only_service_helpers::CancelState>(stepdownToken);
|
||||
}
|
||||
|
||||
if (_donorCtx.getState() == DonorStateEnum::kDone && _donorCtx.getAbortReason()) {
|
||||
|
||||
@ -41,11 +41,11 @@
|
||||
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
|
||||
#include "mongo/db/repl/primary_only_service.h"
|
||||
#include "mongo/db/s/forwardable_operation_metadata.h"
|
||||
#include "mongo/db/s/primary_only_service_helpers/cancel_state.h"
|
||||
#include "mongo/db/s/resharding/donor_document_gen.h"
|
||||
#include "mongo/db/s/resharding/resharding_change_streams_monitor.h"
|
||||
#include "mongo/db/s/resharding/resharding_future_util.h"
|
||||
#include "mongo/db/s/resharding/resharding_metrics.h"
|
||||
#include "mongo/db/s/resharding/resharding_participant_cancel_state.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/sharding_environment/shard_id.h"
|
||||
#include "mongo/executor/scoped_task_executor.h"
|
||||
@ -402,7 +402,7 @@ private:
|
||||
|
||||
// Manages abort state and provides cancellation tokens for async operations. Initialized in
|
||||
// _initCancelState().
|
||||
std::unique_ptr<ReshardingParticipantCancelState> _cancelState;
|
||||
std::unique_ptr<primary_only_service_helpers::CancelState> _cancelState;
|
||||
|
||||
// The identifier associated to the recoverable critical section.
|
||||
const BSONObj _critSecReason;
|
||||
|
||||
@ -1,108 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/s/primary_only_service_helpers/cancel_state.h"
|
||||
#include "mongo/util/cancellation.h"
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Wraps CancelState to add change streams monitor (CSM) cancellation support. The CSM runs in
|
||||
* parallel with the main resharding state machine and can be cancelled independently.
|
||||
*
|
||||
* Cancellation Hierarchy:
|
||||
* stepdownToken (replica set primary stepped down)
|
||||
* └─> abortOrStepdownToken (user/coordinator requested abort OR stepdown)
|
||||
* └─> changeStreamsMonitorToken (monitor failure OR abort OR stepdown)
|
||||
*/
|
||||
class ReshardingParticipantCancelState {
|
||||
public:
|
||||
ReshardingParticipantCancelState(boost::optional<CancellationToken> stepdownToken = boost::none)
|
||||
: _cancelState(stepdownToken) {}
|
||||
|
||||
void attachStepdownToken(const CancellationToken& stepdownToken) {
|
||||
_cancelState.attachStepdownToken(stepdownToken);
|
||||
}
|
||||
|
||||
CancellationToken getStepdownToken() const {
|
||||
return _cancelState.getStepdownToken();
|
||||
}
|
||||
|
||||
CancellationToken getAbortOrStepdownToken() const {
|
||||
return _cancelState.getAbortOrStepdownToken();
|
||||
}
|
||||
|
||||
bool isSteppingDown() const {
|
||||
return _cancelState.isSteppingDown();
|
||||
}
|
||||
|
||||
bool isAbortedOrSteppingDown() const {
|
||||
return _cancelState.isAbortedOrSteppingDown();
|
||||
}
|
||||
|
||||
void abort() {
|
||||
_cancelState.abort();
|
||||
}
|
||||
|
||||
void createChangeStreamsMonitorAbortSource() {
|
||||
if (_changeStreamsMonitorAbortSource) {
|
||||
return;
|
||||
}
|
||||
_changeStreamsMonitorAbortSource =
|
||||
CancellationSource(_cancelState.getAbortOrStepdownToken());
|
||||
}
|
||||
|
||||
void cancelChangeStreamsMonitor() {
|
||||
tassert(10885201,
|
||||
"No change streams monitor abort source was created",
|
||||
_changeStreamsMonitorAbortSource);
|
||||
_changeStreamsMonitorAbortSource->cancel();
|
||||
}
|
||||
|
||||
CancellationToken getEffectiveCancellationToken() const {
|
||||
if (_changeStreamsMonitorAbortSource) {
|
||||
return _changeStreamsMonitorAbortSource->token();
|
||||
}
|
||||
return _cancelState.getAbortOrStepdownToken();
|
||||
}
|
||||
|
||||
bool isEffectiveTokenCancelled() const {
|
||||
return getEffectiveCancellationToken().isCanceled();
|
||||
}
|
||||
|
||||
private:
|
||||
primary_only_service_helpers::CancelState _cancelState;
|
||||
boost::optional<CancellationSource> _changeStreamsMonitorAbortSource;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
@ -1,110 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/s/resharding/resharding_participant_cancel_state.h"
|
||||
|
||||
#include "mongo/unittest/death_test.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, DefaultConstruction) {
|
||||
ReshardingParticipantCancelState state;
|
||||
ASSERT_FALSE(state.isSteppingDown());
|
||||
ASSERT_FALSE(state.isAbortedOrSteppingDown());
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
}
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, CreateChangeStreamsMonitorAbortSourceIsIdempotent) {
|
||||
ReshardingParticipantCancelState state;
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
|
||||
// No crash means idempotency is satisfied.
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
}
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, AbortCascadesToCSMToken) {
|
||||
ReshardingParticipantCancelState state;
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
|
||||
state.abort();
|
||||
ASSERT_TRUE(state.isAbortedOrSteppingDown());
|
||||
ASSERT_TRUE(state.isEffectiveTokenCancelled());
|
||||
}
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, StepdownTokenPassedToConstructorCascadesToCSMToken) {
|
||||
CancellationSource stepdownSource;
|
||||
ReshardingParticipantCancelState state(stepdownSource.token());
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
|
||||
stepdownSource.cancel();
|
||||
ASSERT_TRUE(state.isSteppingDown());
|
||||
ASSERT_TRUE(state.isAbortedOrSteppingDown());
|
||||
ASSERT_TRUE(state.isEffectiveTokenCancelled());
|
||||
}
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, StepdownCascadesToCSMToken) {
|
||||
CancellationSource stepdownSource;
|
||||
ReshardingParticipantCancelState state;
|
||||
state.attachStepdownToken(stepdownSource.token());
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
|
||||
stepdownSource.cancel();
|
||||
ASSERT_TRUE(state.isSteppingDown());
|
||||
ASSERT_TRUE(state.isAbortedOrSteppingDown());
|
||||
ASSERT_TRUE(state.isEffectiveTokenCancelled());
|
||||
}
|
||||
|
||||
TEST(ReshardingParticipantCancelStateTest, CancelChangeStreamsMonitorCancelsCSMToken) {
|
||||
ReshardingParticipantCancelState state;
|
||||
state.createChangeStreamsMonitorAbortSource();
|
||||
ASSERT_FALSE(state.isEffectiveTokenCancelled());
|
||||
|
||||
state.cancelChangeStreamsMonitor();
|
||||
ASSERT_TRUE(state.isEffectiveTokenCancelled());
|
||||
|
||||
// The abortOrStepdown token should not be cancelled.
|
||||
ASSERT_FALSE(state.isAbortedOrSteppingDown());
|
||||
ASSERT_FALSE(state.isSteppingDown());
|
||||
}
|
||||
|
||||
DEATH_TEST(ReshardingParticipantCancelStateDeathTest,
|
||||
CancelChangeStreamsMonitorWithoutSourceTasserts,
|
||||
"No change streams monitor abort source was created") {
|
||||
ReshardingParticipantCancelState state;
|
||||
state.cancelChangeStreamsMonitor();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -193,7 +193,7 @@ bool inPotentialAbortScenario(const RecipientStateEnum& state) {
|
||||
}
|
||||
|
||||
auto makeCancelState(const RecipientShardContext& recipientCtx) {
|
||||
ReshardingParticipantCancelState cancelState;
|
||||
primary_only_service_helpers::CancelState cancelState;
|
||||
if (recipientCtx.getState() == RecipientStateEnum::kDone && recipientCtx.getAbortReason()) {
|
||||
cancelState.abort();
|
||||
}
|
||||
|
||||
@ -41,13 +41,13 @@
|
||||
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
|
||||
#include "mongo/db/repl/primary_only_service.h"
|
||||
#include "mongo/db/s/forwardable_operation_metadata.h"
|
||||
#include "mongo/db/s/primary_only_service_helpers/cancel_state.h"
|
||||
#include "mongo/db/s/resharding/recipient_document_gen.h"
|
||||
#include "mongo/db/s/resharding/resharding_change_streams_monitor.h"
|
||||
#include "mongo/db/s/resharding/resharding_data_replication.h"
|
||||
#include "mongo/db/s/resharding/resharding_future_util.h"
|
||||
#include "mongo/db/s/resharding/resharding_metrics.h"
|
||||
#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
|
||||
#include "mongo/db/s/resharding/resharding_participant_cancel_state.h"
|
||||
#include "mongo/db/s/resharding/resharding_recipient_promises.h"
|
||||
#include "mongo/db/s/resharding/resharding_util.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
@ -503,7 +503,7 @@ private:
|
||||
mutable std::mutex _mutex;
|
||||
|
||||
// Manages abort state and provides cancellation tokens for async operations.
|
||||
ReshardingParticipantCancelState _cancelState;
|
||||
primary_only_service_helpers::CancelState _cancelState;
|
||||
|
||||
std::unique_ptr<ReshardingDataReplicationInterface> _dataReplication;
|
||||
std::shared_ptr<ReshardingChangeStreamsMonitor> _changeStreamsMonitor;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user