SERVER-113622: Restrict cases for retrying transactions with startTransaction=true (#45989)
GitOrigin-RevId: c736dab2da47cc912660005fe7a6484ee0db7d0b
This commit is contained in:
parent
4bcc77299a
commit
739462e80d
@ -53,11 +53,11 @@ assert.commandWorked(
|
||||
|
||||
// Set the failCommand failpoint to make the next 'find' command fail once due to a failover.
|
||||
// Start a transaction & execute a find command.
|
||||
// It should succeed since the command will be retried.
|
||||
jsTest.log("Testing that mongos retries read commands with startTransaction=true on replication set failover.");
|
||||
// It should fail once due to the 'failCommand' failpoint and should not be retried.
|
||||
jsTest.log("Testing that mongos does not retry read commands with startTransaction=true on replication set failover.");
|
||||
assert.commandWorked(setCommandToFailOnce(primaryConnection, "find", kNs));
|
||||
|
||||
assert.commandWorked(
|
||||
assert.commandFailedWithCode(
|
||||
mongosDB.runCommand({
|
||||
find: kCollName,
|
||||
filter: kDoc0,
|
||||
@ -66,6 +66,7 @@ assert.commandWorked(
|
||||
stmtId: NumberInt(0),
|
||||
autocommit: false,
|
||||
}),
|
||||
ErrorCodes.doMongosRewrite(st.s0, ErrorCodes.HostUnreachable),
|
||||
);
|
||||
|
||||
// Set the failCommand failpoint to make the next 'update' command fail once due to a failover.
|
||||
|
||||
@ -167,8 +167,12 @@ std::vector<AsyncRequestsSender::Response> sendAuthenticatedVersionedCommandTarg
|
||||
originalOpts->exec, opCtx, shardId, readPref);
|
||||
auto shard =
|
||||
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
|
||||
Shard::RetryStrategy::RequestStartTransactionState startTxnState =
|
||||
originalOpts->cmd.getStartTransaction().value_or(false)
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
auto retryStrategy = std::make_shared<Shard::OwnerRetryStrategy>(
|
||||
shard, Shard::RetryPolicy::kIdempotentOrCursorInvalidated);
|
||||
shard, Shard::RetryPolicy::kIdempotentOrCursorInvalidated, startTxnState);
|
||||
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<CommandType>>(
|
||||
originalOpts->exec, cancelSource.token(), std::move(request), retryStrategy);
|
||||
futures.push_back(
|
||||
@ -209,8 +213,13 @@ std::vector<AsyncRequestsSender::Response> sendAuthenticatedCommandToShards(
|
||||
originalOpts->exec, opCtx, shardIds[i], readPref);
|
||||
auto shard = uassertStatusOK(
|
||||
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardIds[i]));
|
||||
Shard::RetryStrategy::RequestStartTransactionState startTxnState =
|
||||
originalOpts->cmd.getStartTransaction().value_or(false)
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
auto retryStrategy = std::make_shared<Shard::OwnerRetryStrategy>(
|
||||
shard, Shard::RetryPolicy::kIdempotentOrCursorInvalidated);
|
||||
shard, Shard::RetryPolicy::kIdempotentOrCursorInvalidated, startTxnState);
|
||||
|
||||
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<CommandType>>(
|
||||
originalOpts->exec, cancelSource.token(), originalOpts->cmd, retryStrategy);
|
||||
if (shardVersions) {
|
||||
|
||||
@ -109,10 +109,15 @@ void writeToLocalShard(OperationContext* opCtx,
|
||||
invariant(shState->enabled());
|
||||
auto shardId = shState->shardId();
|
||||
auto shardState = ShardSharedStateCache::get(opCtx).getShardState(shardId);
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
cmdObj.getField("startTransaction").booleanSafe()
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
|
||||
Shard::RetryStrategy retryStrategy{ConnectionString::ConnectionType::kLocal,
|
||||
*shardState,
|
||||
Shard::RetryPolicy::kStrictlyNotIdempotent};
|
||||
Shard::RetryPolicy::kStrictlyNotIdempotent,
|
||||
isStartTransaction};
|
||||
|
||||
uassertStatusOK(runWithRetryStrategy(
|
||||
opCtx, retryStrategy, [&](const TargetingMetadata&) -> RetryStrategy::Result<BSONObj> {
|
||||
|
||||
@ -53,15 +53,40 @@
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
auto makeRetryCriteriaForShard(const Shard& shard, Shard::RetryPolicy retryPolicy)
|
||||
-> AdaptiveRetryStrategy::RetryCriteria {
|
||||
return [retryPolicy, shard = &shard](Status s, std::span<const std::string> errorLabels) {
|
||||
return shard->isRetriableError(s, errorLabels, retryPolicy);
|
||||
};
|
||||
Shard::RetryPolicy getPolicyForStartTransaction(
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction) {
|
||||
// If startTransaction is true, it is not safe to retry unless the response includes a
|
||||
// retryable error label, so we demote the policy (unless it is already kNoRetry). If the error
|
||||
// was due to an unclean shutdown, the transaction may not be reaped on participant shards,
|
||||
// which can cause the retry to fail due to readConcern conflicts with the stale metadata on the
|
||||
// participants.
|
||||
if (retryPolicy != Shard::RetryPolicy::kNoRetry &&
|
||||
isStartTransaction ==
|
||||
Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction) {
|
||||
return Shard::RetryPolicy::kStrictlyNotIdempotent;
|
||||
} else {
|
||||
return retryPolicy;
|
||||
}
|
||||
}
|
||||
|
||||
auto makeRetryCriteriaForConnectionType(ConnectionString::ConnectionType connectionType,
|
||||
Shard::RetryPolicy retryPolicy)
|
||||
auto makeRetryCriteriaForShard(
|
||||
const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
-> AdaptiveRetryStrategy::RetryCriteria {
|
||||
Shard::RetryPolicy validatedRetryPolicy =
|
||||
getPolicyForStartTransaction(retryPolicy, isStartTransaction);
|
||||
return
|
||||
[validatedRetryPolicy, shard = &shard](Status s, std::span<const std::string> errorLabels) {
|
||||
return shard->isRetriableError(s, errorLabels, validatedRetryPolicy);
|
||||
};
|
||||
}
|
||||
|
||||
auto makeRetryCriteriaForConnectionType(
|
||||
ConnectionString::ConnectionType connectionType,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
-> AdaptiveRetryStrategy::RetryCriteria {
|
||||
tassert(10944501,
|
||||
str::stream() << "Unexpected ConnectionString type "
|
||||
@ -69,14 +94,17 @@ auto makeRetryCriteriaForConnectionType(ConnectionString::ConnectionType connect
|
||||
connectionType == ConnectionString::ConnectionType::kLocal ||
|
||||
connectionType == ConnectionString::ConnectionType::kReplicaSet);
|
||||
|
||||
Shard::RetryPolicy validatedRetryPolicy =
|
||||
getPolicyForStartTransaction(retryPolicy, isStartTransaction);
|
||||
|
||||
switch (connectionType) {
|
||||
case ConnectionString::ConnectionType::kLocal:
|
||||
return [retryPolicy](Status s, std::span<const std::string> errorLabels) {
|
||||
return Shard::localIsRetriableError(s, errorLabels, retryPolicy);
|
||||
return [validatedRetryPolicy](Status s, std::span<const std::string> errorLabels) {
|
||||
return Shard::localIsRetriableError(s, errorLabels, validatedRetryPolicy);
|
||||
};
|
||||
case ConnectionString::ConnectionType::kReplicaSet:
|
||||
return [retryPolicy](Status s, std::span<const std::string> errorLabels) {
|
||||
return Shard::remoteIsRetriableError(s, errorLabels, retryPolicy);
|
||||
return [validatedRetryPolicy](Status s, std::span<const std::string> errorLabels) {
|
||||
return Shard::remoteIsRetriableError(s, errorLabels, validatedRetryPolicy);
|
||||
};
|
||||
default:
|
||||
MONGO_UNREACHABLE;
|
||||
@ -134,32 +162,33 @@ StatusWith<Shard::CommandResponse> runCommandWithRetryStrategy(Interruptible* in
|
||||
|
||||
} // namespace
|
||||
|
||||
Shard::RetryStrategy::RetryStrategy(const Shard& shard, Shard::RetryPolicy retryPolicy)
|
||||
: RetryStrategy{makeRetryCriteriaForShard(shard, retryPolicy),
|
||||
Shard::RetryStrategy::RetryStrategy(
|
||||
const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: RetryStrategy{makeRetryCriteriaForShard(shard, retryPolicy, isStartTransaction),
|
||||
DefaultRetryStrategy::getRetryParametersFromServerParameters(),
|
||||
*shard._sharedState} {}
|
||||
|
||||
Shard::RetryStrategy::RetryStrategy(const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts)
|
||||
: RetryStrategy{makeRetryCriteriaForShard(shard, retryPolicy),
|
||||
Shard::RetryStrategy::RetryStrategy(
|
||||
const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: RetryStrategy{makeRetryCriteriaForShard(shard, retryPolicy, isStartTransaction),
|
||||
backoffFromMaxRetryAttempts(maxRetryAttempts),
|
||||
*shard._sharedState} {}
|
||||
|
||||
Shard::RetryStrategy::RetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
ShardSharedStateCache::State& state,
|
||||
Shard::RetryPolicy retryPolicy)
|
||||
: RetryStrategy{makeRetryCriteriaForConnectionType(connectionType, retryPolicy),
|
||||
DefaultRetryStrategy::getRetryParametersFromServerParameters(),
|
||||
state} {}
|
||||
Shard::RetryStrategy::RetryStrategy(
|
||||
ConnectionString::ConnectionType connectionType,
|
||||
ShardSharedStateCache::State& state,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: RetryStrategy{
|
||||
makeRetryCriteriaForConnectionType(connectionType, retryPolicy, isStartTransaction),
|
||||
DefaultRetryStrategy::getRetryParametersFromServerParameters(),
|
||||
state} {}
|
||||
|
||||
Shard::RetryStrategy::RetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
ShardSharedStateCache::State& state,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts)
|
||||
: RetryStrategy{makeRetryCriteriaForConnectionType(connectionType, retryPolicy),
|
||||
backoffFromMaxRetryAttempts(maxRetryAttempts),
|
||||
state} {}
|
||||
|
||||
Shard::RetryStrategy::RetryStrategy(AdaptiveRetryStrategy::RetryCriteria retryCriteria,
|
||||
AdaptiveRetryStrategy::RetryParameters parameters,
|
||||
@ -446,8 +475,10 @@ StatusWith<Shard::CommandResponse> Shard::_runCommandImpl(OperationContext* opCt
|
||||
Milliseconds maxTimeMSOverride,
|
||||
RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempt) {
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(cmdObj);
|
||||
auto retryStrategy = RetryStrategyWithFailureRetryHook{
|
||||
RetryStrategy{*this, retryPolicy, maxRetryAttempt}, [&](Status status) {
|
||||
RetryStrategy{*this, retryPolicy, maxRetryAttempt, isStartTransaction}, [&](Status status) {
|
||||
LOGV2(22720,
|
||||
"Command failed with a retryable error and will be retried",
|
||||
"command"_attr = redact(cmdObj),
|
||||
@ -468,7 +499,9 @@ StatusWith<Shard::QueryResponse> Shard::runExhaustiveCursorCommand(
|
||||
const DatabaseName& dbName,
|
||||
const BSONObj& cmdObj,
|
||||
Milliseconds maxTimeMSOverride) {
|
||||
RetryStrategy retryStrategy{*this, RetryPolicy::kIdempotent};
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(cmdObj);
|
||||
RetryStrategy retryStrategy{*this, RetryPolicy::kIdempotent, isStartTransaction};
|
||||
|
||||
return runWithRetryStrategy(
|
||||
opCtx, retryStrategy, [&](const TargetingMetadata& targetingMetadata) {
|
||||
@ -488,8 +521,9 @@ StatusWith<Shard::QueryResponse> Shard::exhaustiveFindOnConfig(
|
||||
const boost::optional<BSONObj>& hint) {
|
||||
// Do not allow exhaustive finds to be run against regular shards.
|
||||
invariant(isConfig());
|
||||
|
||||
RetryStrategy retryStrategy{*this, RetryPolicy::kIdempotent};
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(query);
|
||||
RetryStrategy retryStrategy{*this, RetryPolicy::kIdempotent, isStartTransaction};
|
||||
return runWithRetryStrategy(
|
||||
opCtx, retryStrategy, [&](const TargetingMetadata& targetingMetadata) {
|
||||
return _exhaustiveFindOnConfig(opCtx,
|
||||
@ -511,7 +545,10 @@ Status Shard::runAggregation(
|
||||
std::function<bool(const std::vector<BSONObj>& batch,
|
||||
const boost::optional<BSONObj>& postBatchResumeToken)> onBatch,
|
||||
std::function<void(const Status&)> onRetry) {
|
||||
RetryStrategyWithFailureRetryHook retryStrategy{RetryStrategy{*this, retryPolicy}, onRetry};
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(aggRequest.getGenericArguments());
|
||||
RetryStrategyWithFailureRetryHook retryStrategy{
|
||||
RetryStrategy{*this, retryPolicy, isStartTransaction}, onRetry};
|
||||
|
||||
auto status =
|
||||
runWithRetryStrategy(opCtx, retryStrategy, [&](const TargetingMetadata& targetingMetadata) {
|
||||
@ -526,8 +563,10 @@ BatchedCommandResponse Shard::_submitBatchWriteCommand(OperationContext* opCtx,
|
||||
const DatabaseName& dbName,
|
||||
Milliseconds maxTimeMS,
|
||||
RetryPolicy retryPolicy) {
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(serialisedBatchRequest);
|
||||
auto retryStrategy = RetryStrategyWithFailureRetryHook{
|
||||
RetryStrategy{*this, retryPolicy}, [&](Status status) {
|
||||
RetryStrategy{*this, retryPolicy, isStartTransaction}, [&](Status status) {
|
||||
LOGV2_DEBUG(22721,
|
||||
2,
|
||||
"Batch write command failed with retryable error and will be retried",
|
||||
@ -591,11 +630,12 @@ StatusWith<std::vector<BSONObj>> Shard::runAggregationWithResult(
|
||||
std::make_move_iterator(batch.end()));
|
||||
return true;
|
||||
};
|
||||
|
||||
RetryStrategyWithFailureRetryHook retryStrategy{RetryStrategy{*this, retryPolicy},
|
||||
[&](Status s) {
|
||||
aggResult.clear();
|
||||
}};
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
Shard::RetryStrategy::extractRequestTransactionState(aggRequest.getGenericArguments());
|
||||
RetryStrategyWithFailureRetryHook retryStrategy{
|
||||
RetryStrategy{*this, retryPolicy, isStartTransaction}, [&](Status s) {
|
||||
aggResult.clear();
|
||||
}};
|
||||
|
||||
auto status =
|
||||
runWithRetryStrategy(opCtx, retryStrategy, [&](const TargetingMetadata& targetingMetadata) {
|
||||
|
||||
@ -50,6 +50,7 @@
|
||||
#include "mongo/db/sharding_environment/shard_id.h"
|
||||
#include "mongo/db/sharding_environment/shard_shared_state_cache.h"
|
||||
#include "mongo/executor/remote_command_response.h"
|
||||
#include "mongo/idl/command_generic_argument.h"
|
||||
#include "mongo/s/write_ops/batched_command_request.h"
|
||||
#include "mongo/s/write_ops/batched_command_response.h"
|
||||
#include "mongo/util/duration.h"
|
||||
@ -154,12 +155,31 @@ public:
|
||||
*/
|
||||
class RetryStrategy final : public mongo::RetryStrategy {
|
||||
public:
|
||||
/**
|
||||
* Enum indicating whether the request associated with this RetryStrategy is starting a
|
||||
* transaction or not. Transaction-initiating requests can only be retried under certain
|
||||
* circumstances, regardless of the provided RetryPolicy.
|
||||
*/
|
||||
enum class RequestStartTransactionState {
|
||||
/**
|
||||
* This request is either not initating a transaction or is not part of a transaction
|
||||
* altogether.
|
||||
*/
|
||||
kNotStartingTransaction,
|
||||
/**
|
||||
* This request is initiating a new transaction (i.e. includes startTransaction: true).
|
||||
*/
|
||||
kStartingTransaction
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructs a RetryStrategy object given a shard object and a retryPolicy.
|
||||
* Note that, if this constructor is used, the returned instance of this class cannot
|
||||
* outlive the shard passed in as it internally holds a pointer to the shard.
|
||||
*/
|
||||
RetryStrategy(const Shard& shard, Shard::RetryPolicy retryPolicy);
|
||||
RetryStrategy(const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction);
|
||||
|
||||
/**
|
||||
* Same as the previous constructor but will use a custom 'maxRetryAttempts' parameter
|
||||
@ -167,25 +187,19 @@ public:
|
||||
*/
|
||||
RetryStrategy(const Shard& shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts);
|
||||
std::int32_t maxRetryAttempts,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction);
|
||||
|
||||
/**
|
||||
* Constructs a RetryStrategy object given the connectionType and the shardState of a shard.
|
||||
* This is useful if we don't want or can't provide a shard object, mainly for performance
|
||||
* reasons.
|
||||
*/
|
||||
RetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
ShardSharedStateCache::State& shardState,
|
||||
Shard::RetryPolicy retryPolicy);
|
||||
|
||||
/**
|
||||
* Same as the previous constructor but will use a custom 'maxRetryAttempts' parameter
|
||||
* instead of the default one.
|
||||
*/
|
||||
RetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
ShardSharedStateCache::State& shardState,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts);
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction);
|
||||
|
||||
|
||||
bool recordFailureAndEvaluateShouldRetry(Status s,
|
||||
const boost::optional<HostAndPort>& target,
|
||||
@ -202,6 +216,20 @@ public:
|
||||
return _underlyingStrategy.getTargetingMetadata();
|
||||
}
|
||||
|
||||
static Shard::RetryStrategy::RequestStartTransactionState extractRequestTransactionState(
|
||||
BSONObj request) {
|
||||
return request.getField("startTransaction").booleanSafe()
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
}
|
||||
|
||||
static Shard::RetryStrategy::RequestStartTransactionState extractRequestTransactionState(
|
||||
const GenericArguments& args) {
|
||||
return args.getStartTransaction().value_or(false)
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
}
|
||||
|
||||
private:
|
||||
void _recordOperationAttempted();
|
||||
void _recordOperationNotOverloaded();
|
||||
@ -226,28 +254,26 @@ public:
|
||||
*/
|
||||
class OwnerRetryStrategy final : public mongo::RetryStrategy {
|
||||
public:
|
||||
OwnerRetryStrategy(std::shared_ptr<Shard> shard, Shard::RetryPolicy retryPolicy)
|
||||
: _underlyingStrategy{*shard, retryPolicy},
|
||||
OwnerRetryStrategy(std::shared_ptr<Shard> shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: _underlyingStrategy{*shard, retryPolicy, isStartTransaction},
|
||||
_stateOwner{shard->_sharedState},
|
||||
_shardOwner{std::move(shard)} {}
|
||||
|
||||
OwnerRetryStrategy(std::shared_ptr<Shard> shard,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts)
|
||||
: _underlyingStrategy{*shard, retryPolicy, maxRetryAttempts},
|
||||
std::int32_t maxRetryAttempts,
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: _underlyingStrategy{*shard, retryPolicy, maxRetryAttempts, isStartTransaction},
|
||||
_stateOwner{shard->_sharedState},
|
||||
_shardOwner{std::move(shard)} {}
|
||||
|
||||
OwnerRetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
std::shared_ptr<ShardSharedStateCache::State> shardState,
|
||||
Shard::RetryPolicy retryPolicy)
|
||||
: _underlyingStrategy{connectionType, *shardState, retryPolicy},
|
||||
_stateOwner{std::move(shardState)} {}
|
||||
OwnerRetryStrategy(ConnectionString::ConnectionType connectionType,
|
||||
std::shared_ptr<ShardSharedStateCache::State> shardState,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
std::int32_t maxRetryAttempts)
|
||||
: _underlyingStrategy{connectionType, *shardState, retryPolicy, maxRetryAttempts},
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction)
|
||||
: _underlyingStrategy{connectionType, *shardState, retryPolicy, isStartTransaction},
|
||||
_stateOwner{std::move(shardState)} {}
|
||||
|
||||
bool recordFailureAndEvaluateShouldRetry(
|
||||
|
||||
@ -189,7 +189,10 @@ TEST_F(ShardRemoteTest, GridSetRetryBudgetCapacityServerParameter) {
|
||||
|
||||
auto shard = uassertStatusOK(shardRegistry()->getShard(operationContext(), firstShard));
|
||||
auto& retryBudget = shard->getRetryBudget_forTest();
|
||||
auto retryStrategy = Shard::RetryStrategy{*shard, Shard::RetryPolicy::kIdempotent};
|
||||
auto retryStrategy = Shard::RetryStrategy{
|
||||
*shard,
|
||||
Shard::RetryPolicy::kIdempotent,
|
||||
Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction};
|
||||
|
||||
auto initialBalance = retryBudget.getBalance_forTest();
|
||||
|
||||
@ -207,7 +210,10 @@ TEST_F(ShardRemoteTest, GridSetRetryBudgetReturnRateServerParameter) {
|
||||
|
||||
auto shard = uassertStatusOK(shardRegistry()->getShard(operationContext(), firstShard));
|
||||
auto& retryBudget = shard->getRetryBudget_forTest();
|
||||
auto retryStrategy = Shard::RetryStrategy{*shard, Shard::RetryPolicy::kIdempotent};
|
||||
auto retryStrategy = Shard::RetryStrategy{
|
||||
*shard,
|
||||
Shard::RetryPolicy::kIdempotent,
|
||||
Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction};
|
||||
|
||||
auto initialBalance = retryBudget.getBalance_forTest();
|
||||
auto error = Status(ErrorCodes::PrimarySteppedDown, "Interrupted at shutdown");
|
||||
@ -238,7 +244,10 @@ TEST_F(ShardRemoteTest, ShardRetryStrategy) {
|
||||
auto& [retryBudget, stats] = *shardState;
|
||||
|
||||
auto shard = uassertStatusOK(shardRegistry()->getShard(operationContext(), firstShard));
|
||||
auto retryStrategy = Shard::RetryStrategy{*shard, Shard::RetryPolicy::kIdempotent};
|
||||
auto retryStrategy = Shard::RetryStrategy{
|
||||
*shard,
|
||||
Shard::RetryPolicy::kIdempotent,
|
||||
Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction};
|
||||
|
||||
auto initialBalance = retryBudget.getBalance_forTest();
|
||||
auto error = Status(ErrorCodes::PrimarySteppedDown, "Interrupted at shutdown");
|
||||
|
||||
@ -303,7 +303,11 @@ auto AsyncRequestsSender::RemoteData::scheduleRequest() -> SemiFuture<RemoteComm
|
||||
.thenRunOn(*_ars->_subBaton)
|
||||
.then([this](const auto& shard) -> SemiFuture<HostAndPort> {
|
||||
if (!_retryStrategy) {
|
||||
_retryStrategy.emplace(shard, _ars->_retryPolicy);
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
_cmdObj.getField("startTransaction").booleanSafe()
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
_retryStrategy.emplace(shard, _ars->_retryPolicy, isStartTransaction);
|
||||
}
|
||||
|
||||
if (!_designatedHostAndPort.empty()) {
|
||||
|
||||
@ -145,9 +145,16 @@ Shard::OwnerRetryStrategy buildRetryStrategy(OperationContext* opCtx, const Shar
|
||||
return ConnectionString::ConnectionType::kReplicaSet;
|
||||
});
|
||||
|
||||
BSONObj cmdObj = CurOp::get(opCtx)->originatingCommand();
|
||||
Shard::RetryStrategy::RequestStartTransactionState isStartTransaction =
|
||||
cmdObj.getField("startTransaction").booleanSafe()
|
||||
? Shard::RetryStrategy::RequestStartTransactionState::kStartingTransaction
|
||||
: Shard::RetryStrategy::RequestStartTransactionState::kNotStartingTransaction;
|
||||
|
||||
return Shard::OwnerRetryStrategy(connType,
|
||||
ShardSharedStateCache::get(opCtx).getShardState(shardId),
|
||||
Shard::RetryPolicy::kStrictlyNotIdempotent);
|
||||
Shard::RetryPolicy::kStrictlyNotIdempotent,
|
||||
isStartTransaction);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Loading…
Reference in New Issue
Block a user