SERVER-57519 Make ARS use causally consistent ShardRegistry::getShard() function
(cherry picked from commit 527dfe85a1)
This commit is contained in:
parent
a1189eadd8
commit
e6d0a3ee2b
@ -193,15 +193,6 @@ std::vector<AsyncRequestsSender::Response> sendAuthenticatedCommandToShards(
|
||||
const BSONObj& command,
|
||||
const std::vector<ShardId>& shardIds,
|
||||
const std::shared_ptr<executor::TaskExecutor>& executor) {
|
||||
// TODO SERVER-57519: remove the following scope
|
||||
{
|
||||
// Ensure ShardRegistry is initialized before using the AsyncRequestsSender that relies on
|
||||
// unsafe functions (SERVER-57280)
|
||||
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
|
||||
if (!shardRegistry->isUp()) {
|
||||
shardRegistry->reload(opCtx);
|
||||
}
|
||||
}
|
||||
|
||||
// The AsyncRequestsSender ignores impersonation metadata so we need to manually attach them to
|
||||
// the command
|
||||
|
||||
@ -170,9 +170,10 @@ AsyncRequestsSender::RemoteData::RemoteData(AsyncRequestsSender* ars,
|
||||
BSONObj cmdObj)
|
||||
: _ars(ars), _shardId(std::move(shardId)), _cmdObj(std::move(cmdObj)) {}
|
||||
|
||||
std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
|
||||
// TODO: Pass down an OperationContext* to use here.
|
||||
return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(_shardId);
|
||||
SemiFuture<std::shared_ptr<Shard>> AsyncRequestsSender::RemoteData::getShard() noexcept {
|
||||
return Grid::get(getGlobalServiceContext())
|
||||
->shardRegistry()
|
||||
->getShard(*_ars->_subBaton, _shardId);
|
||||
}
|
||||
|
||||
void AsyncRequestsSender::RemoteData::executeRequest() {
|
||||
@ -192,7 +193,12 @@ void AsyncRequestsSender::RemoteData::executeRequest() {
|
||||
|
||||
auto AsyncRequestsSender::RemoteData::scheduleRequest()
|
||||
-> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
|
||||
return resolveShardIdToHostAndPorts(_ars->_readPreference)
|
||||
return getShard()
|
||||
.thenRunOn(*_ars->_subBaton)
|
||||
.then([this](auto&& shard) {
|
||||
return shard->getTargeter()->findHosts(_ars->_readPreference,
|
||||
CancellationToken::uncancelable());
|
||||
})
|
||||
.thenRunOn(*_ars->_subBaton)
|
||||
.then([this](auto&& hostAndPorts) {
|
||||
_shardHostAndPort.emplace(hostAndPorts.front());
|
||||
@ -202,17 +208,6 @@ auto AsyncRequestsSender::RemoteData::scheduleRequest()
|
||||
.semi();
|
||||
}
|
||||
|
||||
SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts(
|
||||
const ReadPreferenceSetting& readPref) {
|
||||
const auto shard = getShard();
|
||||
if (!shard) {
|
||||
return Status(ErrorCodes::ShardNotFound,
|
||||
str::stream() << "Could not find shard " << _shardId);
|
||||
}
|
||||
|
||||
return shard->getTargeter()->findHosts(readPref, CancellationToken::uncancelable());
|
||||
}
|
||||
|
||||
auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
|
||||
-> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
|
||||
hangBeforeSchedulingRemoteCommand.executeIf(
|
||||
@ -278,43 +273,47 @@ auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackA
|
||||
}
|
||||
|
||||
// There was an error with either the response or the command.
|
||||
auto shard = getShard();
|
||||
if (!shard) {
|
||||
uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId);
|
||||
} else {
|
||||
std::vector<HostAndPort> failedTargets;
|
||||
return getShard()
|
||||
.thenRunOn(*_ars->_subBaton)
|
||||
.then([this, status = std::move(status), rcr = std::move(rcr)](
|
||||
std::shared_ptr<mongo::Shard>&& shard) {
|
||||
std::vector<HostAndPort> failedTargets;
|
||||
|
||||
if (rcr.response.target) {
|
||||
failedTargets = {*rcr.response.target};
|
||||
} else {
|
||||
failedTargets = rcr.request.target;
|
||||
}
|
||||
if (rcr.response.target) {
|
||||
failedTargets = {*rcr.response.target};
|
||||
} else {
|
||||
failedTargets = rcr.request.target;
|
||||
}
|
||||
|
||||
shard->updateReplSetMonitor(failedTargets.front(), status);
|
||||
bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
|
||||
if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
|
||||
_retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
|
||||
shard->updateReplSetMonitor(failedTargets.front(), status);
|
||||
bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
|
||||
if (!_ars->_stopRetrying &&
|
||||
shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
|
||||
_retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
|
||||
|
||||
LOGV2_DEBUG(4615637,
|
||||
1,
|
||||
"Command to remote {shardId} for hosts {hosts} failed with retryable error "
|
||||
"{error} and will be retried",
|
||||
"Command to remote shard failed with retryable error and will be retried",
|
||||
"shardId"_attr = _shardId,
|
||||
"hosts"_attr = failedTargets,
|
||||
"error"_attr = redact(status));
|
||||
++_retryCount;
|
||||
_shardHostAndPort.reset();
|
||||
// retry through recursion
|
||||
return scheduleRequest();
|
||||
}
|
||||
}
|
||||
LOGV2_DEBUG(
|
||||
4615637,
|
||||
1,
|
||||
"Command to remote {shardId} for hosts {hosts} failed with retryable error "
|
||||
"{error} and will be retried",
|
||||
"Command to remote shard failed with retryable error and will be retried",
|
||||
"shardId"_attr = _shardId,
|
||||
"hosts"_attr = failedTargets,
|
||||
"error"_attr = redact(status));
|
||||
++_retryCount;
|
||||
_shardHostAndPort.reset();
|
||||
// retry through recursion
|
||||
return scheduleRequest();
|
||||
}
|
||||
|
||||
// Statuses in the response.status field that aren't retried get converted to top-level errors
|
||||
uassertStatusOK(rcr.response.status);
|
||||
// Statuses in the response.status field that aren't retried get converted to top-level
|
||||
// errors
|
||||
uassertStatusOK(rcr.response.status);
|
||||
|
||||
// We're not okay (on the remote), but still not going to retry
|
||||
return std::move(rcr);
|
||||
// We're not okay (on the remote), but still not going to retry
|
||||
return Future<RemoteCommandOnAnyCallbackArgs>::makeReady(std::move(rcr)).semi();
|
||||
})
|
||||
.semi();
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -179,9 +179,15 @@ private:
|
||||
RemoteData(AsyncRequestsSender* ars, ShardId shardId, BSONObj cmdObj);
|
||||
|
||||
/**
|
||||
* Returns the Shard object associated with this remote.
|
||||
* Returns a SemiFuture containing a shard object associated with this remote.
|
||||
*
|
||||
* This will return a SemiFuture with a ShardNotFound error status in case the shard is not
|
||||
* found.
|
||||
*
|
||||
* Additionally this call can trigger a refresh of the ShardRegistry so it could possibly
|
||||
* return other network error statuses related to the refresh.
|
||||
*/
|
||||
std::shared_ptr<Shard> getShard();
|
||||
SemiFuture<std::shared_ptr<Shard>> getShard() noexcept;
|
||||
|
||||
/**
|
||||
* Returns true if we've already queued a response from the remote.
|
||||
|
||||
@ -207,9 +207,9 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
|
||||
|
||||
AsyncTry([this] {
|
||||
LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
|
||||
return _reloadInternal();
|
||||
return _reloadAsyncNoRetry();
|
||||
})
|
||||
.until([](auto sw) {
|
||||
.until([](auto&& sw) {
|
||||
if (!sw.isOK()) {
|
||||
LOGV2(22727,
|
||||
"Error running periodic reload of shard registry",
|
||||
@ -221,7 +221,7 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
|
||||
})
|
||||
.withDelayBetweenIterations(kRefreshPeriod) // This call is optional.
|
||||
.on(_executor, CancellationToken::uncancelable())
|
||||
.getAsync([](auto sw) {
|
||||
.getAsync([](auto&& sw) {
|
||||
LOGV2_DEBUG(22725,
|
||||
1,
|
||||
"Exiting periodic shard registry reloader",
|
||||
@ -284,6 +284,49 @@ StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opC
|
||||
return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
|
||||
}
|
||||
|
||||
SemiFuture<std::shared_ptr<Shard>> ShardRegistry::getShard(ExecutorPtr executor,
|
||||
const ShardId& shardId) noexcept {
|
||||
|
||||
// Fetch the shard registry data associated to the latest known topology time
|
||||
return _getDataAsync()
|
||||
.thenRunOn(executor)
|
||||
.then([this, executor, shardId](auto&& cachedData) {
|
||||
// First check if this is a non config shard lookup
|
||||
if (auto shard = cachedData->findShard(shardId)) {
|
||||
return SemiFuture<std::shared_ptr<Shard>>::makeReady(std::move(shard));
|
||||
}
|
||||
|
||||
// then check if this is a config shard (this call is blocking in any case)
|
||||
{
|
||||
stdx::lock_guard<Latch> lk(_mutex);
|
||||
if (auto shard = _configShardData.findShard(shardId)) {
|
||||
return SemiFuture<std::shared_ptr<Shard>>::makeReady(std::move(shard));
|
||||
}
|
||||
}
|
||||
|
||||
// If the shard was not found, force reload the shard registry data and try again.
|
||||
//
|
||||
// This is to cover the following scenario:
|
||||
// 1. The primary of the replica set fetches the list of shards and stores it on disk
|
||||
// 2. The primary crashes before the latest VectorClock topology time is majority written to
|
||||
// disk
|
||||
// 3. A new primary with a stale ShardRegistry is elected and reads the set of shards
|
||||
// from disk and calls ShardRegistry::getShard
|
||||
|
||||
return _reloadAsync()
|
||||
.thenRunOn(executor)
|
||||
.then([this, executor, shardId](auto&& cachedData) -> std::shared_ptr<Shard> {
|
||||
auto shard = cachedData->findShard(shardId);
|
||||
uassert(ErrorCodes::ShardNotFound,
|
||||
str::stream() << "Shard " << shardId << " not found",
|
||||
shard);
|
||||
return shard;
|
||||
})
|
||||
.semi();
|
||||
})
|
||||
.semi();
|
||||
}
|
||||
|
||||
std::vector<ShardId> ShardRegistry::getAllShardIds(OperationContext* opCtx) {
|
||||
auto shardIds = _getData(opCtx)->getAllShardIds();
|
||||
if (shardIds.empty()) {
|
||||
@ -399,23 +442,26 @@ void ShardRegistry::toBSON(BSONObjBuilder* result) const {
|
||||
}
|
||||
|
||||
void ShardRegistry::reload(OperationContext* opCtx) {
|
||||
_reloadAsync().get(opCtx);
|
||||
}
|
||||
|
||||
SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadAsync() {
|
||||
if (MONGO_unlikely(TestingProctor::instance().isEnabled())) {
|
||||
// Some unit tests don't support running the reload's AsyncTry on the fixed executor.
|
||||
_reloadInternal().get(opCtx);
|
||||
return _reloadAsyncNoRetry();
|
||||
} else {
|
||||
AsyncTry([=]() mutable { return _reloadInternal(); })
|
||||
return AsyncTry([=]() mutable { return _reloadAsyncNoRetry(); })
|
||||
.until([](auto sw) mutable {
|
||||
return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet;
|
||||
})
|
||||
.withBackoffBetweenIterations(kExponentialBackoff)
|
||||
.on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
|
||||
.on(Grid::get(getGlobalServiceContext())->getExecutorPool()->getFixedExecutor(),
|
||||
CancellationToken::uncancelable())
|
||||
.semi()
|
||||
.get(opCtx);
|
||||
.share();
|
||||
}
|
||||
}
|
||||
|
||||
SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadInternal() {
|
||||
SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadAsyncNoRetry() {
|
||||
// Make the next acquire do a lookup.
|
||||
auto value = _forceReloadIncrement.addAndFetch(1);
|
||||
LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
|
||||
|
||||
@ -239,6 +239,9 @@ public:
|
||||
*/
|
||||
StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId);
|
||||
|
||||
SemiFuture<std::shared_ptr<Shard>> getShard(ExecutorPtr executor,
|
||||
const ShardId& shardId) noexcept;
|
||||
|
||||
/**
|
||||
* Returns a vector containing all known shard IDs.
|
||||
* The order of the elements is not guaranteed.
|
||||
@ -438,7 +441,8 @@ private:
|
||||
|
||||
void _initializeCacheIfNecessary() const;
|
||||
|
||||
SharedSemiFuture<Cache::ValueHandle> _reloadInternal();
|
||||
SharedSemiFuture<Cache::ValueHandle> _reloadAsync();
|
||||
SharedSemiFuture<Cache::ValueHandle> _reloadAsyncNoRetry();
|
||||
|
||||
/**
|
||||
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user