SERVER-103706 Get rid of the stale errors handling on strategy.cpp (part 1) (#40386)
Co-authored-by: Tommaso Tocci <tommaso.tocci@mongodb.com> GitOrigin-RevId: 955200687cfd8ceb908088b8a8ffb9ff836fffb6
This commit is contained in:
parent
2649a7daf3
commit
f09b7a05f5
@ -69,14 +69,15 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
|
||||
(err.message &&
|
||||
(err.message.includes("CommandFailed") ||
|
||||
err.message.includes("Documents in target range may still be in use") ||
|
||||
// This error will occur as a result of trying to move a chunk with a pre-refine
|
||||
// collection epoch.
|
||||
// This error will occur as a result of trying to move a chunk with a
|
||||
// pre-refine collection epoch.
|
||||
err.message.includes("collection may have been dropped") ||
|
||||
// This error will occur if a moveChunk command has been sent with chunk boundaries
|
||||
// that represent the pre-refine chunks, but the collection has already been changed
|
||||
// to possess the post-refine chunk boundaries.
|
||||
// This error will occur if a moveChunk command has been sent with chunk
|
||||
// boundaries that represent the pre-refine chunks, but the collection has
|
||||
// already been changed to possess the post-refine chunk boundaries.
|
||||
(err.message.includes("shard key bounds") &&
|
||||
err.message.includes("are not valid for shard key pattern"))))
|
||||
err.message.includes("are not valid for shard key pattern")) ||
|
||||
(err.message.includes("bound") && err.message.includes("is not valid for shard key pattern"))))
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
@ -63,7 +63,7 @@ const sessionDB = session.getDatabase(dbName);
|
||||
sessionDB
|
||||
.getCollection(localColl)
|
||||
.aggregate([{$lookup: {from: foreignColl, localField: "x", foreignField: "_id", as: "result"}}]);
|
||||
}, [ErrorCodes.MigrationConflict, ErrorCodes.ShardCannotRefreshDueToLocksHeld]);
|
||||
}, [ErrorCodes.StaleConfig, ErrorCodes.MigrationConflict, ErrorCodes.ShardCannotRefreshDueToLocksHeld]);
|
||||
assert.contains("TransientTransactionError", err.errorLabels, tojson(err));
|
||||
|
||||
session.abortTransaction();
|
||||
|
||||
@ -225,19 +225,24 @@ public:
|
||||
|
||||
ClusterClientCursorGuard _establishCursorOnDbPrimary(OperationContext* opCtx,
|
||||
const NamespaceString& nss) {
|
||||
const CachedDatabaseInfo dbInfo =
|
||||
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.dbName()));
|
||||
|
||||
ShardsvrCheckMetadataConsistency shardsvrRequest{nss};
|
||||
shardsvrRequest.setDbName(nss.dbName());
|
||||
shardsvrRequest.setCommonFields(request().getCommonFields());
|
||||
shardsvrRequest.setCursor(request().getCursor());
|
||||
generic_argument_util::setDbVersionIfPresent(shardsvrRequest, dbInfo->getVersion());
|
||||
// Attach db and shard version;
|
||||
if (!dbInfo->getVersion().isFixed())
|
||||
shardsvrRequest.setShardVersion(ShardVersion::UNSHARDED());
|
||||
return _establishCursors(
|
||||
opCtx, nss, {{dbInfo->getPrimary(), shardsvrRequest.toBSON()}});
|
||||
sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), nss.dbName());
|
||||
return router.route(
|
||||
opCtx,
|
||||
Request::kCommandName,
|
||||
[&](OperationContext* opCtx, const CachedDatabaseInfo& dbInfo) {
|
||||
ShardsvrCheckMetadataConsistency shardsvrRequest{nss};
|
||||
shardsvrRequest.setDbName(nss.dbName());
|
||||
shardsvrRequest.setCommonFields(request().getCommonFields());
|
||||
shardsvrRequest.setCursor(request().getCursor());
|
||||
generic_argument_util::setDbVersionIfPresent(shardsvrRequest,
|
||||
dbInfo->getVersion());
|
||||
// Attach db and shard version;
|
||||
if (!dbInfo->getVersion().isFixed())
|
||||
shardsvrRequest.setShardVersion(ShardVersion::UNSHARDED());
|
||||
return _establishCursors(
|
||||
opCtx, nss, {{dbInfo->getPrimary(), shardsvrRequest.toBSON()}});
|
||||
});
|
||||
}
|
||||
|
||||
ClusterClientCursorGuard _establishCursors(
|
||||
|
||||
@ -109,6 +109,15 @@ void DBPrimaryRouter::_onException(OperationContext* opCtx, RoutingRetryInfo* re
|
||||
si->getDb() == _dbName);
|
||||
|
||||
catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted());
|
||||
} else if (s == ErrorCodes::StaleConfig) {
|
||||
// For some operations, like create, we need to check the ShardVersion to serialize with the
|
||||
// critical section. Hence, the shard may throw a StaleConfig even when the router didn't
|
||||
// attach a shardVersion. This StaleConfig must be handled by the shard itself, however the
|
||||
// retryable attempts on the shard side are just 1, so that error may bubble up until here.
|
||||
// TODO (SERVER-77402) Remove the StaleConfig retry for the DBPrimaryRouter once the
|
||||
// ShardRole loop increases the retryable attempts.
|
||||
auto si = s.extraInfo<StaleConfigInfo>();
|
||||
tassert(10370601, "StaleConfig must have extraInfo", si);
|
||||
} else {
|
||||
uassertStatusOK(s);
|
||||
}
|
||||
@ -174,9 +183,6 @@ void CollectionRouterCommon::_onException(OperationContext* opCtx,
|
||||
auto si = s.extraInfo<StaleConfigInfo>();
|
||||
tassert(6375904, "StaleConfig must have extraInfo", si);
|
||||
|
||||
if (!isNssInvolvedInRouting(si->getNss())) {
|
||||
uassertStatusOK(s);
|
||||
}
|
||||
|
||||
const bool isShardStale = [&]() {
|
||||
if (!si->getVersionWanted()) {
|
||||
@ -199,7 +205,41 @@ void CollectionRouterCommon::_onException(OperationContext* opCtx,
|
||||
uassertStatusOK(s);
|
||||
}
|
||||
|
||||
catalogCache->onStaleCollectionVersion(si->getNss(), si->getVersionWanted());
|
||||
const auto staleNs = si->getNss();
|
||||
|
||||
// When the shard is stale, the StaleConfig thrown by the shard should be handled by the
|
||||
// shard itself. However, the num of retryable attempts on the shard side is just 1, so that
|
||||
// error may bubble up until here if there is any concurrent metadata flush. In addition,
|
||||
// the StaleConfig thrown by the shard may not be related to the targeted collection. For
|
||||
// example, a lookup may throw a StaleConfig for the foreign collection.
|
||||
// TODO (SERVER-77402) Only allow StaleConfig errors for the involved namespaces once the
|
||||
// ShardRole loop increases the retryable attempts
|
||||
if (!isShardStale) {
|
||||
if (!isNssInvolvedInRouting(staleNs)) {
|
||||
uassertStatusOK(s);
|
||||
}
|
||||
}
|
||||
|
||||
// Refresh the view namespace if the stale namespace is a buckets timeseries collection.
|
||||
if (staleNs.isTimeseriesBucketsCollection()) {
|
||||
// A timeseries might've been created, so we need to invalidate the original namespace
|
||||
// version.
|
||||
catalogCache->onStaleCollectionVersion(staleNs.getTimeseriesViewNamespace(),
|
||||
boost::none);
|
||||
}
|
||||
|
||||
// Refresh the timeseries buckets nss when the command targets the buckets collection but
|
||||
// the stale namespace is its view.
|
||||
std::for_each(
|
||||
_targetedNamespaces.begin(), _targetedNamespaces.end(), [&](const auto& targetedNss) {
|
||||
if (targetedNss.isTimeseriesBucketsCollection() &&
|
||||
targetedNss.getTimeseriesViewNamespace() == staleNs) {
|
||||
catalogCache->onStaleCollectionVersion(targetedNss, boost::none);
|
||||
}
|
||||
});
|
||||
|
||||
catalogCache->onStaleCollectionVersion(staleNs, si->getVersionWanted());
|
||||
|
||||
} else if (s == ErrorCodes::StaleEpoch) {
|
||||
if (auto si = s.extraInfo<StaleEpochInfo>()) {
|
||||
if (!isNssInvolvedInRouting(si->getNss())) {
|
||||
@ -239,6 +279,11 @@ void CollectionRouterCommon::_onException(OperationContext* opCtx,
|
||||
catalogCache->onStaleCollectionVersion(nss, boost::none);
|
||||
catalogCache->onStaleDatabaseVersion(nss.dbName(), boost::none);
|
||||
}
|
||||
} else if (s == ErrorCodes::ShardCannotRefreshDueToLocksHeld) {
|
||||
// The shard is stale but it was not allowed to retry from the ServiceEntryPoint. For
|
||||
// instance, a getMore command can't be individually retried, therefore we must retry the
|
||||
// entire operation from this point.
|
||||
|
||||
} else {
|
||||
uassertStatusOK(s);
|
||||
}
|
||||
|
||||
@ -901,12 +901,14 @@ TEST_F(RouterRoleTest, MultiCollectionRouterRetryOnStaleConfig) {
|
||||
ASSERT_EQ(tries, 2);
|
||||
}
|
||||
|
||||
TEST_F(RouterRoleTest, MultiCollectionRouterDoesNotRetryOnStaleConfigNonTargetedNamespace) {
|
||||
TEST_F(RouterRoleTest,
|
||||
MultiCollectionRouterDoesNotRetryOnStaleConfigNonTargetedNamespaceWhenShardIsNotStale) {
|
||||
const OID epoch{OID::gen()};
|
||||
const Timestamp timestamp{1, 0};
|
||||
auto nss2 = NamespaceString::createNamespaceString_forTest("test.foo2");
|
||||
|
||||
// The MultiCollectionRouter should not retry on StaleConfig error with non-targeted namespace.
|
||||
// The MultiCollectionRouter should not retry on StaleConfig error with non-targeted
|
||||
// namespace when the router is stale.
|
||||
int tries = 0;
|
||||
sharding::router::MultiCollectionRouter router(getServiceContext(), {_nss, nss2});
|
||||
auto future = launchAsync([&] {
|
||||
@ -923,7 +925,7 @@ TEST_F(RouterRoleTest, MultiCollectionRouterDoesNotRetryOnStaleConfigNonTargeted
|
||||
uasserted(StaleConfigInfo(
|
||||
NamespaceString::createNamespaceString_forTest("test.foo_not_exist"),
|
||||
ShardVersionFactory::make(ChunkVersion({epoch, timestamp}, {2, 0})),
|
||||
boost::none,
|
||||
ShardVersionFactory::make(ChunkVersion({epoch, timestamp}, {2, 1})),
|
||||
ShardId{"0"}),
|
||||
"StaleConfig error");
|
||||
});
|
||||
@ -1329,12 +1331,11 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleConfigTimeseriesBucket) {
|
||||
const OID epoch{OID::gen()};
|
||||
const Timestamp timestamp{1, 0};
|
||||
|
||||
auto timeseriesNss = NamespaceString::createNamespaceString_forTest("test.timeseries");
|
||||
auto bucketNss = timeseriesNss.makeTimeseriesBucketsNamespace();
|
||||
auto bucketNss = _nss.makeTimeseriesBucketsNamespace();
|
||||
|
||||
// The CollectionRouter should retry because bucket collection maps to targeted timeseries view.
|
||||
int tries = 0;
|
||||
sharding::router::CollectionRouter router(getServiceContext(), timeseriesNss);
|
||||
sharding::router::CollectionRouter router(getServiceContext(), _nss);
|
||||
auto future = launchAsync([&] {
|
||||
router.route(operationContext(),
|
||||
"test timeseries routing",
|
||||
|
||||
@ -746,51 +746,73 @@ void Balancer::joinCurrentRound(OperationContext* opCtx) {
|
||||
});
|
||||
}
|
||||
|
||||
Status Balancer::moveRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ConfigsvrMoveRange& request,
|
||||
bool issuedByRemoteUser) {
|
||||
void Balancer::moveRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ConfigsvrMoveRange& request,
|
||||
bool issuedByRemoteUser) {
|
||||
const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
|
||||
auto coll =
|
||||
catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
|
||||
|
||||
if (coll.getUnsplittable())
|
||||
return {ErrorCodes::NamespaceNotSharded,
|
||||
str::stream() << "Can't execute moveRange on unsharded collection "
|
||||
<< nss.toStringForErrorMsg()};
|
||||
uassert(ErrorCodes::NamespaceNotSharded,
|
||||
str::stream() << "Can't execute moveRange on unsharded collection "
|
||||
<< nss.toStringForErrorMsg(),
|
||||
!coll.getUnsplittable());
|
||||
|
||||
const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll);
|
||||
|
||||
const auto fromShardId = [&]() {
|
||||
const auto cm = uassertStatusOK(getPlacementInfoForShardedCollection(opCtx, nss));
|
||||
if (request.getMin()) {
|
||||
const auto& chunk = cm.findIntersectingChunkWithSimpleCollation(*request.getMin());
|
||||
return chunk.getShardId();
|
||||
} else {
|
||||
return getChunkForMaxBound(cm, *request.getMax()).getShardId();
|
||||
}
|
||||
}();
|
||||
sharding::router::CollectionRouter router{opCtx->getServiceContext(), nss};
|
||||
router.routeWithRoutingContext(
|
||||
opCtx, "moveRange"_sd, [&](OperationContext* opCtx, RoutingContext& unusedRoutingCtx) {
|
||||
unusedRoutingCtx.skipValidation();
|
||||
|
||||
ShardsvrMoveRange shardSvrRequest(nss);
|
||||
shardSvrRequest.setDbName(DatabaseName::kAdmin);
|
||||
shardSvrRequest.setMoveRangeRequestBase(request.getMoveRangeRequestBase());
|
||||
shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize);
|
||||
shardSvrRequest.setFromShard(fromShardId);
|
||||
shardSvrRequest.setCollectionTimestamp(coll.getTimestamp());
|
||||
const auto [secondaryThrottle, wc] =
|
||||
getSecondaryThrottleAndWriteConcern(request.getSecondaryThrottle());
|
||||
shardSvrRequest.setSecondaryThrottle(secondaryThrottle);
|
||||
shardSvrRequest.setForceJumbo(request.getForceJumbo());
|
||||
const auto cm = uassertStatusOK(getPlacementInfoForShardedCollection(opCtx, nss));
|
||||
|
||||
auto response =
|
||||
_commandScheduler->requestMoveRange(opCtx, shardSvrRequest, wc, issuedByRemoteUser)
|
||||
.getNoThrow(opCtx);
|
||||
return processManualMigrationOutcome(opCtx,
|
||||
request.getMin(),
|
||||
request.getMax(),
|
||||
nss,
|
||||
shardSvrRequest.getToShard(),
|
||||
std::move(response));
|
||||
// Check if the 'min' and 'max' parameters match the shardKey pattern.
|
||||
const boost::optional<BSONObj>& minBound = request.getMin();
|
||||
const boost::optional<BSONObj>& maxBound = request.getMax();
|
||||
uassert(ErrorCodes::InvalidOptions,
|
||||
str::stream() << "The 'min' bound " << *minBound
|
||||
<< " is not valid for shard key pattern "
|
||||
<< cm.getShardKeyPattern().toBSON(),
|
||||
!minBound.has_value() || cm.getShardKeyPattern().isShardKey(*minBound));
|
||||
uassert(ErrorCodes::InvalidOptions,
|
||||
str::stream() << "The 'max' bound " << *maxBound
|
||||
<< " is not valid for shard key pattern "
|
||||
<< cm.getShardKeyPattern().toBSON(),
|
||||
!maxBound.has_value() || cm.getShardKeyPattern().isShardKey(*maxBound));
|
||||
|
||||
// Get the donor shard.
|
||||
const auto fromShardId = [&]() {
|
||||
if (minBound.has_value()) {
|
||||
const auto& chunk = cm.findIntersectingChunkWithSimpleCollation(*minBound);
|
||||
return chunk.getShardId();
|
||||
} else {
|
||||
return getChunkForMaxBound(cm, *maxBound).getShardId();
|
||||
}
|
||||
}();
|
||||
|
||||
ShardsvrMoveRange shardSvrRequest(nss);
|
||||
shardSvrRequest.setDbName(DatabaseName::kAdmin);
|
||||
shardSvrRequest.setMoveRangeRequestBase(request.getMoveRangeRequestBase());
|
||||
shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize);
|
||||
shardSvrRequest.setFromShard(fromShardId);
|
||||
shardSvrRequest.setCollectionTimestamp(cm.getVersion().getTimestamp());
|
||||
const auto [secondaryThrottle, wc] =
|
||||
getSecondaryThrottleAndWriteConcern(request.getSecondaryThrottle());
|
||||
shardSvrRequest.setSecondaryThrottle(secondaryThrottle);
|
||||
shardSvrRequest.setForceJumbo(request.getForceJumbo());
|
||||
|
||||
auto response =
|
||||
_commandScheduler->requestMoveRange(opCtx, shardSvrRequest, wc, issuedByRemoteUser)
|
||||
.getNoThrow(opCtx);
|
||||
uassertStatusOK(processManualMigrationOutcome(opCtx,
|
||||
request.getMin(),
|
||||
request.getMax(),
|
||||
nss,
|
||||
shardSvrRequest.getToShard(),
|
||||
std::move(response)));
|
||||
});
|
||||
}
|
||||
|
||||
void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
|
||||
|
||||
@ -133,16 +133,16 @@ public:
|
||||
|
||||
/**
|
||||
* Blocking call, which requests the balancer to move a range to the specified location
|
||||
* in accordance with the active balancer policy. An error will be returned if the attempt to
|
||||
* in accordance with the active balancer policy. An error will be thrown if the attempt to
|
||||
* move fails for any reason.
|
||||
*
|
||||
* NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
|
||||
* move regardless.
|
||||
*/
|
||||
Status moveRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ConfigsvrMoveRange& request,
|
||||
bool issuedByRemoteUser);
|
||||
void moveRange(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ConfigsvrMoveRange& request,
|
||||
bool issuedByRemoteUser);
|
||||
|
||||
/**
|
||||
* Appends the runtime state of the balancer instance to the specified builder.
|
||||
|
||||
@ -103,8 +103,7 @@ public:
|
||||
"Could not find destination shard");
|
||||
|
||||
try {
|
||||
uassertStatusOK(Balancer::get(opCtx)->moveRange(
|
||||
opCtx, nss, req, true /* issuedByRemoteUser */));
|
||||
Balancer::get(opCtx)->moveRange(opCtx, nss, req, true /* issuedByRemoteUser */);
|
||||
} catch (ExceptionFor<ErrorCodes::InterruptedDueToReplStateChange>& ex) {
|
||||
// Rewrite `InterruptedDueToReplStateChange` with `RetriableRemoteCommandFailure` to
|
||||
// ensure it is not forwarded to remote callers. Until we can expose the error
|
||||
|
||||
@ -85,26 +85,30 @@ public:
|
||||
const auto& nss = ns();
|
||||
uassertStatusOK(validateNamespace(nss));
|
||||
|
||||
const auto cri = uassertStatusOK(
|
||||
Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
|
||||
const auto primaryShardId = cri.getDbPrimaryShardId();
|
||||
|
||||
auto shard =
|
||||
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
|
||||
auto versionedCmdObj = makeVersionedCmdObj(
|
||||
cri, CommandHelpers::filterCommandRequestForPassthrough(request().toBSON()));
|
||||
auto swResponse = shard->runCommandWithFixedRetryAttempts(
|
||||
sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), nss.dbName());
|
||||
return router.route(
|
||||
opCtx,
|
||||
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
|
||||
DatabaseName::kAdmin,
|
||||
versionedCmdObj,
|
||||
Shard::RetryPolicy::kIdempotent);
|
||||
Request::kCommandName,
|
||||
[&](OperationContext* opCtx, const CachedDatabaseInfo& dbInfo) {
|
||||
auto cmdObj =
|
||||
CommandHelpers::filterCommandRequestForPassthrough(request().toBSON());
|
||||
|
||||
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(swResponse));
|
||||
const auto swResponse =
|
||||
executeCommandAgainstDatabasePrimaryOnlyAttachingDbVersion(
|
||||
opCtx,
|
||||
DatabaseName::kAdmin,
|
||||
dbInfo,
|
||||
cmdObj,
|
||||
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
|
||||
Shard::RetryPolicy::kIdempotent);
|
||||
|
||||
auto response = ConfigureQueryAnalyzerResponse::parse(
|
||||
IDLParserContext("clusterConfigureQueryAnalyzer"), swResponse.getValue().response);
|
||||
return response;
|
||||
uassertStatusOK(AsyncRequestsSender::Response::getEffectiveStatus(swResponse));
|
||||
|
||||
auto remoteResponse = uassertStatusOK(swResponse.swResponse).data;
|
||||
auto response = ConfigureQueryAnalyzerResponse::parse(
|
||||
IDLParserContext("clusterConfigureQueryAnalyzer"), remoteResponse);
|
||||
return response;
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@ -496,7 +496,9 @@ namespace {
|
||||
* Replaces the target namespace in the 'cmdObj' by 'bucketNss'. Also sets the
|
||||
* 'isTimeseriesNamespace' flag.
|
||||
*/
|
||||
BSONObj replaceNamespaceByBucketNss(const BSONObj& cmdObj, const NamespaceString& bucketNss) {
|
||||
BSONObj replaceNamespaceByBucketNss(OperationContext* opCtx,
|
||||
const BSONObj& cmdObj,
|
||||
const NamespaceString& bucketNss) {
|
||||
BSONObjBuilder bob;
|
||||
for (const auto& elem : cmdObj) {
|
||||
const auto name = elem.fieldNameStringData();
|
||||
@ -507,10 +509,13 @@ BSONObj replaceNamespaceByBucketNss(const BSONObj& cmdObj, const NamespaceString
|
||||
bob.append(elem);
|
||||
}
|
||||
}
|
||||
// Set this flag so that shards can differentiate a request on a time-series view from a request
|
||||
// on a time-series buckets collection since we replace the target namespace in the command with
|
||||
// the buckets namespace.
|
||||
bob.append(write_ops::FindAndModifyCommandRequest::kIsTimeseriesNamespaceFieldName, true);
|
||||
|
||||
if (!isRawDataOperation(opCtx)) {
|
||||
// Set this flag so that shards can differentiate a request on a time-series view from a
|
||||
// request on a time-series buckets collection since we replace the target namespace in the
|
||||
// command with the buckets namespace.
|
||||
bob.append(write_ops::FindAndModifyCommandRequest::kIsTimeseriesNamespaceFieldName, true);
|
||||
}
|
||||
|
||||
return bob.obj();
|
||||
}
|
||||
@ -542,15 +547,16 @@ CollectionRoutingInfo getCollectionRoutingInfo(OperationContext* opCtx,
|
||||
feature_flags::gTimeseriesUpdatesSupport.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot());
|
||||
if (!arbitraryTimeseriesWritesEnabled || maybeTsNss.isTimeseriesBucketsCollection()) {
|
||||
if ((!arbitraryTimeseriesWritesEnabled && !isRawDataOperation(opCtx)) ||
|
||||
maybeTsNss.isTimeseriesBucketsCollection()) {
|
||||
return cri;
|
||||
}
|
||||
|
||||
// If the 'maybeTsNss' namespace is not a timeseries buckets collection and is not tracked on
|
||||
// the configsvr, try to get the CollectionRoutingInfo for the corresponding timeseries buckets
|
||||
// collection to see if it's tracked and it really is a timeseries buckets collection. We should
|
||||
// do this to figure out whether we need to use the two phase write protocol or not on
|
||||
// timeseries buckets collections.
|
||||
// If the 'maybeTsNss' namespace is not a timeseries buckets collection and is not tracked
|
||||
// on the configsvr, try to get the CollectionRoutingInfo for the corresponding timeseries
|
||||
// buckets collection to see if it's tracked and it really is a timeseries buckets
|
||||
// collection. We should do this to figure out whether we need to use the two phase write
|
||||
// protocol or not on timeseries buckets collections.
|
||||
auto bucketCollNss = maybeTsNss.makeTimeseriesBucketsNamespace();
|
||||
auto bucketCollCri =
|
||||
uassertStatusOK(getCollectionRoutingInfoForTxnCmd_DEPRECATED(opCtx, bucketCollNss));
|
||||
@ -561,7 +567,7 @@ CollectionRoutingInfo getCollectionRoutingInfo(OperationContext* opCtx,
|
||||
|
||||
uassert(ErrorCodes::InvalidOptions,
|
||||
"Cannot perform findAndModify with sort on a sharded timeseries collection",
|
||||
!cmdObj.hasField("sort"));
|
||||
!cmdObj.hasField("sort") || isRawDataOperation(opCtx));
|
||||
|
||||
return bucketCollCri;
|
||||
}
|
||||
@ -632,7 +638,7 @@ Status FindAndModifyCmd::explain(OperationContext* opCtx,
|
||||
rpc::ReplyBuilderInterface* result) const {
|
||||
const DatabaseName dbName = request.parseDbName();
|
||||
auto bodyBuilder = result->getBodyBuilder();
|
||||
BSONObj cmdObj = [&]() {
|
||||
const BSONObj originalCmdObj = [&]() {
|
||||
// Check whether the query portion needs to be rewritten for FLE.
|
||||
auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse(
|
||||
IDLParserContext("ClusterFindAndModify"), request.body);
|
||||
@ -643,282 +649,325 @@ Status FindAndModifyCmd::explain(OperationContext* opCtx,
|
||||
return request.body;
|
||||
}
|
||||
}();
|
||||
NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj));
|
||||
const NamespaceString originalNss(
|
||||
CommandHelpers::parseNsCollectionRequired(dbName, originalCmdObj));
|
||||
|
||||
const auto cri = getCollectionRoutingInfo(opCtx, cmdObj, nss);
|
||||
const auto& cm = cri.getChunkManager();
|
||||
auto isTrackedTimeseries = cm.hasRoutingTable() && cm.getTimeseriesFields();
|
||||
auto isTimeseriesLogicalRequest = false;
|
||||
if (isTrackedTimeseries && !nss.isTimeseriesBucketsCollection()) {
|
||||
nss = std::move(cm.getNss());
|
||||
if (!isRawDataOperation(opCtx)) {
|
||||
isTimeseriesLogicalRequest = true;
|
||||
}
|
||||
}
|
||||
// Note: at this point, 'nss' should be the timeseries buckets collection namespace if we're
|
||||
// writing to a tracked timeseries collection.
|
||||
|
||||
boost::optional<ShardId> shardId;
|
||||
const BSONObj query = cmdObj.getObjectField("query");
|
||||
const BSONObj collation = getCollation(cmdObj);
|
||||
const auto isUpsert = cmdObj.getBoolField("upsert");
|
||||
const auto let = getLet(cmdObj);
|
||||
const auto rc = getLegacyRuntimeConstants(cmdObj);
|
||||
if (cri.hasRoutingTable()) {
|
||||
// If the request is for a view on a sharded timeseries buckets collection, we need to
|
||||
// replace the namespace by buckets collection namespace in the command object.
|
||||
if (!cri.getChunkManager().isNewTimeseriesWithoutView() && isTimeseriesLogicalRequest) {
|
||||
cmdObj = replaceNamespaceByBucketNss(cmdObj, nss);
|
||||
}
|
||||
auto expCtx = makeExpressionContextWithDefaultsForTargeter(
|
||||
opCtx, nss, cri, collation, boost::none /* verbosity */, let, rc);
|
||||
if (write_without_shard_key::useTwoPhaseProtocol(opCtx,
|
||||
nss,
|
||||
false /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
let,
|
||||
rc,
|
||||
isTimeseriesLogicalRequest)) {
|
||||
shardId = targetPotentiallySingleShard(
|
||||
expCtx, cm, query, collation, isTimeseriesLogicalRequest);
|
||||
} else {
|
||||
shardId = targetSingleShard(expCtx, cm, query, collation, isTimeseriesLogicalRequest);
|
||||
}
|
||||
} else {
|
||||
shardId = cri.getDbPrimaryShardId();
|
||||
}
|
||||
|
||||
// Time how long it takes to run the explain command on the shard.
|
||||
Timer timer;
|
||||
BSONObjBuilder bob;
|
||||
if (!shardId) {
|
||||
_runExplainWithoutShardKey(
|
||||
opCtx, nss, makeExplainCmd(opCtx, cmdObj, verbosity), verbosity, &bob);
|
||||
bodyBuilder.appendElementsUnique(bob.obj());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
auto shardVersion = cri.hasRoutingTable()
|
||||
? boost::make_optional(cri.getShardVersion(*shardId))
|
||||
: boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNSHARDED());
|
||||
auto dbVersion = cri.hasRoutingTable() ? boost::none : boost::make_optional(cri.getDbVersion());
|
||||
|
||||
_runCommand(
|
||||
sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
|
||||
return router.routeWithRoutingContext(
|
||||
opCtx,
|
||||
*shardId,
|
||||
shardVersion,
|
||||
dbVersion,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, false, false, makeExplainCmd(opCtx, cmdObj, verbosity)),
|
||||
true /* isExplain */,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesLogicalRequest,
|
||||
&bob);
|
||||
"findAndModify explain",
|
||||
[&](OperationContext* opCtx, RoutingContext& unusedRoutingCtx) {
|
||||
// Clear the BSONObjBuilder since this lambda function may be retried if the router
|
||||
// cache is
|
||||
// stale.
|
||||
bodyBuilder.resetToEmpty();
|
||||
|
||||
const auto millisElapsed = timer.millis();
|
||||
// The CollectionRouter is not capable of implicitly translating the namespace to a
|
||||
// timeseries buckets collection, which is required in this command. Hence, we'll use
|
||||
// the CollectionRouter to handle StaleConfig errors but will ignore its RoutingContext.
|
||||
// Instead, we'll use the standalone getCollectionRoutingInfo() function to properly get
|
||||
// the RoutingContext when the collection is timeseries.
|
||||
// TODO (SPM-3830) Use the RoutingContext provided by the CollectionRouter once
|
||||
// all timeseries collections become viewless.
|
||||
unusedRoutingCtx.skipValidation();
|
||||
|
||||
// We fetch an arbitrary host from the ConnectionString, since
|
||||
// ClusterExplain::buildExplainResult() doesn't use the given HostAndPort.
|
||||
auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, *shardId));
|
||||
auto host = shard->getConnString().getServers().front();
|
||||
auto nss = originalNss;
|
||||
auto cmdObj = originalCmdObj;
|
||||
|
||||
executor::RemoteCommandResponse response(host, bob.obj(), Milliseconds(millisElapsed));
|
||||
AsyncRequestsSender::Response arsResponse{*shardId, response, host};
|
||||
const auto cri = getCollectionRoutingInfo(opCtx, cmdObj, nss);
|
||||
const auto& cm = cri.getChunkManager();
|
||||
auto isTrackedTimeseries = cm.hasRoutingTable() && cm.getTimeseriesFields();
|
||||
auto isTimeseriesLogicalRequest = false;
|
||||
if (isTrackedTimeseries && !nss.isTimeseriesBucketsCollection()) {
|
||||
nss = std::move(cm.getNss());
|
||||
if (!isRawDataOperation(opCtx)) {
|
||||
isTimeseriesLogicalRequest = true;
|
||||
}
|
||||
}
|
||||
// Note: at this point, 'nss' should be the timeseries buckets collection namespace if
|
||||
// we're writing to a tracked timeseries collection.
|
||||
|
||||
return ClusterExplain::buildExplainResult(makeBlankExpressionContext(opCtx, nss),
|
||||
{arsResponse},
|
||||
ClusterExplain::kSingleShard,
|
||||
millisElapsed,
|
||||
cmdObj,
|
||||
&bodyBuilder);
|
||||
boost::optional<ShardId> shardId;
|
||||
const BSONObj query = cmdObj.getObjectField("query");
|
||||
const BSONObj collation = getCollation(cmdObj);
|
||||
const auto isUpsert = cmdObj.getBoolField("upsert");
|
||||
const auto let = getLet(cmdObj);
|
||||
const auto rc = getLegacyRuntimeConstants(cmdObj);
|
||||
if (cri.hasRoutingTable()) {
|
||||
// If the request is for a view on a sharded timeseries buckets collection, we need
|
||||
// to replace the namespace by buckets collection namespace in the command object.
|
||||
if (!cri.getChunkManager().isNewTimeseriesWithoutView() &&
|
||||
isTimeseriesLogicalRequest) {
|
||||
cmdObj = replaceNamespaceByBucketNss(opCtx, cmdObj, nss);
|
||||
}
|
||||
auto expCtx = makeExpressionContextWithDefaultsForTargeter(
|
||||
opCtx, nss, cri, collation, boost::none /* verbosity */, let, rc);
|
||||
if (write_without_shard_key::useTwoPhaseProtocol(opCtx,
|
||||
nss,
|
||||
false /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
let,
|
||||
rc,
|
||||
isTimeseriesLogicalRequest)) {
|
||||
shardId = targetPotentiallySingleShard(
|
||||
expCtx, cm, query, collation, isTimeseriesLogicalRequest);
|
||||
} else {
|
||||
shardId =
|
||||
targetSingleShard(expCtx, cm, query, collation, isTimeseriesLogicalRequest);
|
||||
}
|
||||
} else {
|
||||
shardId = cri.getDbPrimaryShardId();
|
||||
}
|
||||
|
||||
// Time how long it takes to run the explain command on the shard.
|
||||
Timer timer;
|
||||
BSONObjBuilder bob;
|
||||
if (!shardId) {
|
||||
_runExplainWithoutShardKey(
|
||||
opCtx, nss, makeExplainCmd(opCtx, cmdObj, verbosity), verbosity, &bob);
|
||||
bodyBuilder.appendElementsUnique(bob.obj());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
auto shardVersion = cri.hasRoutingTable()
|
||||
? boost::make_optional(cri.getShardVersion(*shardId))
|
||||
: boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNSHARDED());
|
||||
auto dbVersion =
|
||||
cri.hasRoutingTable() ? boost::none : boost::make_optional(cri.getDbVersion());
|
||||
|
||||
_runCommand(opCtx,
|
||||
*shardId,
|
||||
shardVersion,
|
||||
dbVersion,
|
||||
nss,
|
||||
applyReadWriteConcern(
|
||||
opCtx, false, false, makeExplainCmd(opCtx, cmdObj, verbosity)),
|
||||
true /* isExplain */,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesLogicalRequest,
|
||||
&bob);
|
||||
|
||||
const auto millisElapsed = timer.millis();
|
||||
|
||||
// We fetch an arbitrary host from the ConnectionString, since
|
||||
// ClusterExplain::buildExplainResult() doesn't use the given HostAndPort.
|
||||
auto shard =
|
||||
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, *shardId));
|
||||
auto host = shard->getConnString().getServers().front();
|
||||
|
||||
executor::RemoteCommandResponse response(host, bob.obj(), Milliseconds(millisElapsed));
|
||||
AsyncRequestsSender::Response arsResponse{*shardId, response, host};
|
||||
|
||||
return ClusterExplain::buildExplainResult(makeBlankExpressionContext(opCtx, nss),
|
||||
{arsResponse},
|
||||
ClusterExplain::kSingleShard,
|
||||
millisElapsed,
|
||||
cmdObj,
|
||||
&bodyBuilder);
|
||||
});
|
||||
}
|
||||
|
||||
bool FindAndModifyCmd::run(OperationContext* opCtx,
|
||||
const DatabaseName& dbName,
|
||||
const BSONObj& originalCmdObj,
|
||||
BSONObjBuilder& result) {
|
||||
NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, originalCmdObj));
|
||||
const NamespaceString originalNss(
|
||||
CommandHelpers::parseNsCollectionRequired(dbName, originalCmdObj));
|
||||
|
||||
if (processFLEFindAndModify(opCtx, originalCmdObj, result) == FLEBatchResult::kProcessed) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto cmdObj = [&] {
|
||||
auto rawData = OptionalBool::parseFromBSON(
|
||||
originalCmdObj[write_ops::FindAndModifyCommandRequest::kRawDataFieldName]);
|
||||
auto findAndModifyBody = [&](OperationContext* opCtx, RoutingContext& unusedRoutingCtx) {
|
||||
// Clear the BSONObjBuilder since this lambda function may be retried if the router cache is
|
||||
// stale.
|
||||
result.resetToEmpty();
|
||||
|
||||
if (!rawData ||
|
||||
!CollectionRoutingInfoTargeter{opCtx, nss}.timeseriesNamespaceNeedsRewrite(nss)) {
|
||||
return originalCmdObj;
|
||||
// The CollectionRouter is not capable of implicitly translate the namespace to a timeseries
|
||||
// buckets collection, which is required in this command. Hence, we'll use the
|
||||
// CollectionRouter to handle StaleConfig errors but will ignore its RoutingContext.
|
||||
// Instead, we'll use the standalone getCollectionRoutingInfo() function to properly get
|
||||
// the RoutingContext when the collection is timeseries.
|
||||
// TODO (SPM-3830) Use the RoutingContext provided by the CollectionRouter once
|
||||
// all timeseries collections become viewless.
|
||||
unusedRoutingCtx.skipValidation();
|
||||
|
||||
auto cri = getCollectionRoutingInfo(opCtx, originalCmdObj, originalNss);
|
||||
const auto& cm = cri.getChunkManager();
|
||||
|
||||
auto nss = originalNss;
|
||||
auto cmdObj = originalCmdObj;
|
||||
|
||||
auto isTrackedTimeseries = cri.hasRoutingTable() && cm.getTimeseriesFields();
|
||||
auto isTimeseriesViewRequest = false;
|
||||
if (isTrackedTimeseries && !nss.isTimeseriesBucketsCollection()) {
|
||||
nss = std::move(cm.getNss());
|
||||
isTimeseriesViewRequest = true;
|
||||
}
|
||||
// Note: at this point, 'nss' should be the timeseries buckets collection namespace if we're
|
||||
// writing to a sharded timeseries collection.
|
||||
|
||||
nss = nss.makeTimeseriesBucketsNamespace();
|
||||
return rewriteCommandForRawDataOperation<write_ops::FindAndModifyCommandRequest>(
|
||||
originalCmdObj, nss.coll());
|
||||
}();
|
||||
// Collect metrics.
|
||||
_updateMetrics->collectMetrics(cmdObj);
|
||||
|
||||
// Collect metrics.
|
||||
_updateMetrics->collectMetrics(cmdObj);
|
||||
// Create an RAII object that prints the collection's shard key in the case of a tassert
|
||||
// or crash.
|
||||
ScopedDebugInfo shardKeyDiagnostics(
|
||||
"ShardKeyDiagnostics",
|
||||
diagnostic_printers::ShardKeyDiagnosticPrinter{
|
||||
cm.isSharded() ? cm.getShardKeyPattern().toBSON() : BSONObj()});
|
||||
|
||||
// Append mongoS' runtime constants to the command object before forwarding it to the shard.
|
||||
auto cmdObjForShard = appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj);
|
||||
|
||||
auto cri = [&]() {
|
||||
size_t attempts = 1u;
|
||||
while (true) {
|
||||
try {
|
||||
// Technically, findAndModify should only be creating database if upsert is true,
|
||||
// but this would require that the parsing be pulled into this function.
|
||||
cluster::createDatabase(opCtx, nss.dbName());
|
||||
return getCollectionRoutingInfo(opCtx, cmdObj, nss);
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
LOGV2_INFO(8584300,
|
||||
"Failed initialization of routing info because the database has been "
|
||||
"concurrently dropped",
|
||||
logAttrs(nss.dbName()),
|
||||
"attemptNumber"_attr = attempts,
|
||||
"maxAttempts"_attr = kMaxDatabaseCreationAttempts);
|
||||
|
||||
if (attempts++ >= kMaxDatabaseCreationAttempts) {
|
||||
// The maximum number of attempts has been reached, so the procedure fails as it
|
||||
// could be a logical error. At this point, it is unlikely that the error is
|
||||
// caused by concurrent drop database operations.
|
||||
throw;
|
||||
}
|
||||
if (cri.hasRoutingTable()) {
|
||||
// If the request is for a view on a sharded timeseries buckets collection, we need to
|
||||
// replace the namespace by buckets collection namespace in the command object.
|
||||
if (isTimeseriesViewRequest && !cri.getChunkManager().isNewTimeseriesWithoutView()) {
|
||||
cmdObjForShard = replaceNamespaceByBucketNss(opCtx, cmdObjForShard, nss);
|
||||
}
|
||||
}
|
||||
}();
|
||||
|
||||
const auto& cm = cri.getChunkManager();
|
||||
auto letParams = getLet(cmdObjForShard);
|
||||
auto runtimeConstants = getLegacyRuntimeConstants(cmdObjForShard);
|
||||
BSONObj collation = getCollation(cmdObjForShard);
|
||||
auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx,
|
||||
nss,
|
||||
cri,
|
||||
collation,
|
||||
boost::none /* verbosity */,
|
||||
letParams,
|
||||
runtimeConstants);
|
||||
|
||||
// Create an RAII object that prints the collection's shard key in the case of a tassert
|
||||
// or crash.
|
||||
ScopedDebugInfo shardKeyDiagnostics(
|
||||
"ShardKeyDiagnostics",
|
||||
diagnostic_printers::ShardKeyDiagnosticPrinter{
|
||||
cm.isSharded() ? cm.getShardKeyPattern().toBSON() : BSONObj()});
|
||||
// If this command has 'let' parameters, then evaluate them once and stash them back on
|
||||
// the original command object. Note that this isn't necessary outside of the case where
|
||||
// we have a routing table because this is intended to prevent evaluating let parameters
|
||||
// multiple times (which can only happen when executing against a sharded cluster).
|
||||
if (letParams) {
|
||||
// Serialize variables before moving 'cmdObjForShard' to avoid invalid access.
|
||||
expCtx->variables.seedVariablesWithLetParameters(
|
||||
expCtx.get(), *letParams, [](const Expression* expr) {
|
||||
return expression::getDependencies(expr).hasNoRequirements();
|
||||
});
|
||||
auto letVars =
|
||||
Value(expCtx->variables.toBSON(expCtx->variablesParseState, *letParams));
|
||||
|
||||
auto isTrackedTimeseries = cri.hasRoutingTable() && cm.getTimeseriesFields();
|
||||
auto isTimeseriesViewRequest = false;
|
||||
if (isTrackedTimeseries && !nss.isTimeseriesBucketsCollection()) {
|
||||
nss = std::move(cm.getNss());
|
||||
isTimeseriesViewRequest = true;
|
||||
}
|
||||
// Note: at this point, 'nss' should be the timeseries buckets collection namespace if we're
|
||||
// writing to a sharded timeseries collection.
|
||||
MutableDocument cmdDoc(Document(std::move(cmdObjForShard)));
|
||||
cmdDoc[write_ops::FindAndModifyCommandRequest::kLetFieldName] = letVars;
|
||||
cmdObjForShard = cmdDoc.freeze().toBson();
|
||||
|
||||
// Append mongoS' runtime constants to the command object before forwarding it to the shard.
|
||||
auto cmdObjForShard = appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj);
|
||||
if (cri.hasRoutingTable()) {
|
||||
// If the request is for a view on a sharded timeseries buckets collection, we need to
|
||||
// replace the namespace by buckets collection namespace in the command object.
|
||||
if (isTimeseriesViewRequest && !cri.getChunkManager().isNewTimeseriesWithoutView()) {
|
||||
cmdObjForShard = replaceNamespaceByBucketNss(cmdObjForShard, nss);
|
||||
}
|
||||
// Reset the objects set up above as they are now invalid given that
|
||||
// 'cmdObjForShard' has been changed.
|
||||
letParams = getLet(cmdObjForShard);
|
||||
runtimeConstants = getLegacyRuntimeConstants(cmdObjForShard);
|
||||
collation = getCollation(cmdObjForShard);
|
||||
}
|
||||
|
||||
auto letParams = getLet(cmdObjForShard);
|
||||
auto runtimeConstants = getLegacyRuntimeConstants(cmdObjForShard);
|
||||
BSONObj collation = getCollation(cmdObjForShard);
|
||||
auto expCtx = makeExpressionContextWithDefaultsForTargeter(
|
||||
opCtx, nss, cri, collation, boost::none /* verbosity */, letParams, runtimeConstants);
|
||||
BSONObj query = cmdObjForShard.getObjectField("query");
|
||||
const bool isUpsert = cmdObjForShard.getBoolField("upsert");
|
||||
|
||||
// If this command has 'let' parameters, then evaluate them once and stash them back on the
|
||||
// original command object. Note that this isn't necessary outside of the case where we have
|
||||
// a routing table because this is intended to prevent evaluating let parameters multiple
|
||||
// times (which can only happen when executing against a sharded cluster).
|
||||
if (letParams) {
|
||||
// Serialize variables before moving 'cmdObjForShard' to avoid invalid access.
|
||||
expCtx->variables.seedVariablesWithLetParameters(
|
||||
expCtx.get(), *letParams, [](const Expression* expr) {
|
||||
return expression::getDependencies(expr).hasNoRequirements();
|
||||
});
|
||||
auto letVars = Value(expCtx->variables.toBSON(expCtx->variablesParseState, *letParams));
|
||||
if (write_without_shard_key::useTwoPhaseProtocol(opCtx,
|
||||
nss,
|
||||
false /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
letParams,
|
||||
runtimeConstants,
|
||||
isTimeseriesViewRequest)) {
|
||||
getQueryCounters(opCtx).findAndModifyNonTargetedShardedCount.increment(1);
|
||||
auto allowShardKeyUpdatesWithoutFullShardKeyInQuery =
|
||||
opCtx->isRetryableWrite() || opCtx->inMultiDocumentTransaction();
|
||||
|
||||
MutableDocument cmdDoc(Document(std::move(cmdObjForShard)));
|
||||
cmdDoc[write_ops::FindAndModifyCommandRequest::kLetFieldName] = letVars;
|
||||
cmdObjForShard = cmdDoc.freeze().toBson();
|
||||
if (auto shardId = targetPotentiallySingleShard(
|
||||
expCtx, cm, query, collation, isTimeseriesViewRequest)) {
|
||||
// If we can find a single shard to target, we can skip the two phase write
|
||||
// protocol.
|
||||
_runCommand(opCtx,
|
||||
*shardId,
|
||||
cri.getShardVersion(*shardId),
|
||||
boost::none,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
false /* isExplain */,
|
||||
allowShardKeyUpdatesWithoutFullShardKeyInQuery,
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
} else {
|
||||
_runCommandWithoutShardKey(opCtx,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
}
|
||||
} else {
|
||||
if (cm.isSharded()) {
|
||||
getQueryCounters(opCtx).findAndModifyTargetedShardedCount.increment(1);
|
||||
} else {
|
||||
getQueryCounters(opCtx).findAndModifyUnshardedCount.increment(1);
|
||||
}
|
||||
|
||||
// Reset the objects set up above as they are now invalid given that 'cmdObjForShard'
|
||||
// has been changed.
|
||||
letParams = getLet(cmdObjForShard);
|
||||
runtimeConstants = getLegacyRuntimeConstants(cmdObjForShard);
|
||||
collation = getCollation(cmdObjForShard);
|
||||
}
|
||||
ShardId shardId =
|
||||
targetSingleShard(expCtx, cm, query, collation, isTimeseriesViewRequest);
|
||||
|
||||
BSONObj query = cmdObjForShard.getObjectField("query");
|
||||
const bool isUpsert = cmdObjForShard.getBoolField("upsert");
|
||||
|
||||
if (write_without_shard_key::useTwoPhaseProtocol(opCtx,
|
||||
nss,
|
||||
false /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
letParams,
|
||||
runtimeConstants,
|
||||
isTimeseriesViewRequest)) {
|
||||
getQueryCounters(opCtx).findAndModifyNonTargetedShardedCount.increment(1);
|
||||
auto allowShardKeyUpdatesWithoutFullShardKeyInQuery =
|
||||
opCtx->isRetryableWrite() || opCtx->inMultiDocumentTransaction();
|
||||
|
||||
if (auto shardId = targetPotentiallySingleShard(
|
||||
expCtx, cm, query, collation, isTimeseriesViewRequest)) {
|
||||
// If we can find a single shard to target, we can skip the two phase write
|
||||
// protocol.
|
||||
_runCommand(opCtx,
|
||||
*shardId,
|
||||
cri.getShardVersion(*shardId),
|
||||
shardId,
|
||||
cri.getShardVersion(shardId),
|
||||
boost::none,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
false /* isExplain */,
|
||||
allowShardKeyUpdatesWithoutFullShardKeyInQuery,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
} else {
|
||||
_runCommandWithoutShardKey(opCtx,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
}
|
||||
} else {
|
||||
if (cm.isSharded()) {
|
||||
getQueryCounters(opCtx).findAndModifyTargetedShardedCount.increment(1);
|
||||
} else {
|
||||
getQueryCounters(opCtx).findAndModifyUnshardedCount.increment(1);
|
||||
}
|
||||
getQueryCounters(opCtx).findAndModifyUnshardedCount.increment(1);
|
||||
|
||||
ShardId shardId =
|
||||
targetSingleShard(expCtx, cm, query, collation, isTimeseriesViewRequest);
|
||||
|
||||
_runCommand(opCtx,
|
||||
shardId,
|
||||
cri.getShardVersion(shardId),
|
||||
boost::none,
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
false /* isExplain */,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
_runCommand(
|
||||
opCtx,
|
||||
cri.getDbPrimaryShardId(),
|
||||
boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNSHARDED()),
|
||||
cri.getDbVersion(),
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
false /* isExplain */,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
}
|
||||
} else {
|
||||
getQueryCounters(opCtx).findAndModifyUnshardedCount.increment(1);
|
||||
};
|
||||
|
||||
_runCommand(opCtx,
|
||||
cri.getDbPrimaryShardId(),
|
||||
boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNSHARDED()),
|
||||
cri.getDbVersion(),
|
||||
nss,
|
||||
applyReadWriteConcern(opCtx, this, cmdObjForShard),
|
||||
false /* isExplain */,
|
||||
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
|
||||
isTimeseriesViewRequest,
|
||||
&result);
|
||||
while (true) {
|
||||
size_t attempts = 1u;
|
||||
try {
|
||||
// Technically, findAndModify should only be creating database if upsert is true, but
|
||||
// this would require that the parsing be pulled into this function.
|
||||
cluster::createDatabase(opCtx, originalNss.dbName());
|
||||
|
||||
sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
|
||||
router.routeWithRoutingContext(opCtx, getName(), findAndModifyBody);
|
||||
return true;
|
||||
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
LOGV2_INFO(8584300,
|
||||
"Failed initialization of routing info because the database has been "
|
||||
"concurrently dropped",
|
||||
logAttrs(originalNss.dbName()),
|
||||
"attemptNumber"_attr = attempts,
|
||||
"maxAttempts"_attr = kMaxDatabaseCreationAttempts);
|
||||
|
||||
if (attempts++ >= kMaxDatabaseCreationAttempts) {
|
||||
// The maximum number of attempts has been reached, so the procedure fails as it
|
||||
// could be a logical error. At this point, it is unlikely that the error is
|
||||
// caused by concurrent drop database operations.
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
bool FindAndModifyCmd::getCrudProcessedFromCmd(const BSONObj& cmdObj) {
|
||||
|
||||
@ -129,34 +129,21 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
void _runAggOnNamespace(OperationContext* opCtx,
|
||||
BSONObjBuilder* result,
|
||||
boost::optional<ExplainOptions::Verbosity> verbosity,
|
||||
const NamespaceString& executionNss) {
|
||||
uassertStatusOK(ClusterAggregate::runAggregate(
|
||||
opCtx,
|
||||
ClusterAggregate::Namespaces{_aggregationRequest.getNamespace(), executionNss},
|
||||
_aggregationRequest,
|
||||
_liteParsedPipeline,
|
||||
_privileges,
|
||||
verbosity,
|
||||
result));
|
||||
}
|
||||
|
||||
void _runAggCommand(OperationContext* opCtx,
|
||||
BSONObjBuilder* result,
|
||||
boost::optional<ExplainOptions::Verbosity> verbosity) {
|
||||
const auto& nss = _aggregationRequest.getNamespace();
|
||||
|
||||
try {
|
||||
_runAggOnNamespace(opCtx,
|
||||
result,
|
||||
verbosity,
|
||||
isRawDataOperation(opCtx) &&
|
||||
CollectionRoutingInfoTargeter{opCtx, nss}
|
||||
.timeseriesNamespaceNeedsRewrite(nss)
|
||||
? nss.makeTimeseriesBucketsNamespace()
|
||||
: nss);
|
||||
uassertStatusOK(
|
||||
ClusterAggregate::runAggregate(opCtx,
|
||||
ClusterAggregate::Namespaces{nss, nss},
|
||||
_aggregationRequest,
|
||||
_liteParsedPipeline,
|
||||
_privileges,
|
||||
verbosity,
|
||||
result));
|
||||
|
||||
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
|
||||
if (!isRawDataOperation(opCtx) ||
|
||||
!ex->getNamespace().isTimeseriesBucketsCollection()) {
|
||||
@ -175,7 +162,15 @@ public:
|
||||
// was for raw data, we want to run aggregate on the buckets namespace instead
|
||||
// of as a view.
|
||||
result->resetToEmpty();
|
||||
_runAggOnNamespace(opCtx, result, verbosity, ex->getNamespace());
|
||||
uassertStatusOK(ClusterAggregate::runAggregate(
|
||||
opCtx,
|
||||
ClusterAggregate::Namespaces{_aggregationRequest.getNamespace(),
|
||||
ex->getNamespace()},
|
||||
_aggregationRequest,
|
||||
_liteParsedPipeline,
|
||||
_privileges,
|
||||
verbosity,
|
||||
result));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,6 +41,7 @@
|
||||
#include "mongo/db/fle_crud.h"
|
||||
#include "mongo/db/generic_argument_util.h"
|
||||
#include "mongo/db/global_catalog/chunk_manager.h"
|
||||
#include "mongo/db/global_catalog/ddl/cluster_ddl.h"
|
||||
#include "mongo/db/global_catalog/router_role_api/cluster_commands_helpers.h"
|
||||
#include "mongo/db/global_catalog/router_role_api/collection_routing_info_targeter.h"
|
||||
#include "mongo/db/internal_transactions_feature_flag_gen.h"
|
||||
@ -303,31 +304,6 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction(
|
||||
return UpdateShardKeyResult{sharedBlock->updatedShardKey, std::move(upsertedId)};
|
||||
}
|
||||
|
||||
static BSONObj translateCmdObjForRawData(OperationContext* opCtx,
|
||||
const BatchedCommandRequest& batchedRequest,
|
||||
const BSONObj& cmdObj,
|
||||
NamespaceString& ns) {
|
||||
if (!isRawDataOperation(opCtx) ||
|
||||
!CollectionRoutingInfoTargeter{opCtx, ns}.timeseriesNamespaceNeedsRewrite(ns)) {
|
||||
return cmdObj;
|
||||
}
|
||||
|
||||
ns = ns.makeTimeseriesBucketsNamespace();
|
||||
switch (batchedRequest.getBatchType()) {
|
||||
case BatchedCommandRequest::BatchType_Insert:
|
||||
return rewriteCommandForRawDataOperation<write_ops::InsertCommandRequest>(cmdObj,
|
||||
ns.coll());
|
||||
case BatchedCommandRequest::BatchType_Update:
|
||||
return rewriteCommandForRawDataOperation<write_ops::UpdateCommandRequest>(cmdObj,
|
||||
ns.coll());
|
||||
case BatchedCommandRequest::BatchType_Delete:
|
||||
return rewriteCommandForRawDataOperation<write_ops::DeleteCommandRequest>(cmdObj,
|
||||
ns.coll());
|
||||
}
|
||||
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool ClusterWriteCmd::handleWouldChangeOwningShardError(OperationContext* opCtx,
|
||||
@ -435,8 +411,8 @@ void ClusterWriteCmd::commandOpWrite(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const BSONObj& command,
|
||||
BatchItemRef targetingBatchItem,
|
||||
const CollectionRoutingInfoTargeter& targeter,
|
||||
std::vector<AsyncRequestsSender::Response>* results) {
|
||||
CollectionRoutingInfoTargeter targeter(opCtx, nss);
|
||||
auto endpoints = [&] {
|
||||
// Note that this implementation will not handle targeting retries and does not
|
||||
// completely emulate write behavior.
|
||||
@ -498,41 +474,85 @@ void ClusterWriteCmd::commandOpWrite(OperationContext* opCtx,
|
||||
|
||||
bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
|
||||
const BatchedCommandRequest& req,
|
||||
const NamespaceString& nss,
|
||||
const NamespaceString& originalNss,
|
||||
ExplainOptions::Verbosity verbosity,
|
||||
BSONObjBuilder* result) {
|
||||
if (req.getBatchType() == BatchedCommandRequest::BatchType_Delete ||
|
||||
req.getBatchType() == BatchedCommandRequest::BatchType_Update) {
|
||||
bool isMultiWrite = false;
|
||||
BSONObj query;
|
||||
BSONObj collation;
|
||||
bool isUpsert = false;
|
||||
if (req.getBatchType() == BatchedCommandRequest::BatchType_Update) {
|
||||
auto updateOp = req.getUpdateRequest().getUpdates().begin();
|
||||
isMultiWrite = updateOp->getMulti();
|
||||
query = updateOp->getQ();
|
||||
collation = updateOp->getCollation().value_or(BSONObj());
|
||||
isUpsert = updateOp->getUpsert();
|
||||
} else {
|
||||
auto deleteOp = req.getDeleteRequest().getDeletes().begin();
|
||||
isMultiWrite = deleteOp->getMulti();
|
||||
query = deleteOp->getQ();
|
||||
collation = deleteOp->getCollation().value_or(BSONObj());
|
||||
}
|
||||
if (req.getBatchType() != BatchedCommandRequest::BatchType_Delete &&
|
||||
req.getBatchType() != BatchedCommandRequest::BatchType_Update) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto translatedNs = nss;
|
||||
auto translatedReqBSON = translateCmdObjForRawData(opCtx, req, req.toBSON(), translatedNs);
|
||||
bool isMultiWrite = false;
|
||||
BSONObj query;
|
||||
BSONObj collation;
|
||||
bool isUpsert = false;
|
||||
if (req.getBatchType() == BatchedCommandRequest::BatchType_Update) {
|
||||
auto updateOp = req.getUpdateRequest().getUpdates().begin();
|
||||
isMultiWrite = updateOp->getMulti();
|
||||
query = updateOp->getQ();
|
||||
collation = updateOp->getCollation().value_or(BSONObj());
|
||||
isUpsert = updateOp->getUpsert();
|
||||
} else {
|
||||
auto deleteOp = req.getDeleteRequest().getDeletes().begin();
|
||||
isMultiWrite = deleteOp->getMulti();
|
||||
query = deleteOp->getQ();
|
||||
collation = deleteOp->getCollation().value_or(BSONObj());
|
||||
}
|
||||
|
||||
if (isMultiWrite) {
|
||||
return false;
|
||||
}
|
||||
|
||||
sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
|
||||
return router.routeWithRoutingContext(
|
||||
opCtx,
|
||||
"explain write"_sd,
|
||||
[&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
|
||||
auto translatedReqBSON = req.toBSON();
|
||||
auto translatedNss = originalNss;
|
||||
const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);
|
||||
auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
|
||||
opCtx,
|
||||
originalNss,
|
||||
targeter,
|
||||
originalRoutingCtx,
|
||||
[&](const NamespaceString& bucketsNss) {
|
||||
translatedNss = bucketsNss;
|
||||
switch (req.getBatchType()) {
|
||||
case BatchedCommandRequest::BatchType_Insert:
|
||||
translatedReqBSON =
|
||||
rewriteCommandForRawDataOperation<write_ops::InsertCommandRequest>(
|
||||
translatedReqBSON, translatedNss.coll());
|
||||
break;
|
||||
case BatchedCommandRequest::BatchType_Update:
|
||||
translatedReqBSON =
|
||||
rewriteCommandForRawDataOperation<write_ops::UpdateCommandRequest>(
|
||||
translatedReqBSON, translatedNss.coll());
|
||||
break;
|
||||
case BatchedCommandRequest::BatchType_Delete:
|
||||
translatedReqBSON =
|
||||
rewriteCommandForRawDataOperation<write_ops::DeleteCommandRequest>(
|
||||
translatedReqBSON, translatedNss.coll());
|
||||
break;
|
||||
default:
|
||||
MONGO_UNREACHABLE_TASSERT(10370603);
|
||||
}
|
||||
});
|
||||
unusedRoutingCtx.skipValidation();
|
||||
|
||||
if (!write_without_shard_key::useTwoPhaseProtocol(
|
||||
opCtx,
|
||||
translatedNss,
|
||||
true /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
req.getLet(),
|
||||
req.getLegacyRuntimeConstants(),
|
||||
false /* isTimeseriesViewRequest */)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!isMultiWrite &&
|
||||
write_without_shard_key::useTwoPhaseProtocol(opCtx,
|
||||
translatedNs,
|
||||
true /* isUpdateOrDelete */,
|
||||
isUpsert,
|
||||
query,
|
||||
collation,
|
||||
req.getLet(),
|
||||
req.getLegacyRuntimeConstants(),
|
||||
false /* isTimeseriesViewRequest */)) {
|
||||
// Explain currently cannot be run within a transaction, so each command is instead run
|
||||
// separately outside of a transaction, and we compose the results at the end.
|
||||
auto vts = auth::ValidatedTenancyScope::get(opCtx);
|
||||
@ -542,7 +562,7 @@ bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
|
||||
const auto explainClusterQueryWithoutShardKeyCmd = ClusterExplain::wrapAsExplain(
|
||||
clusterQueryWithoutShardKeyCommand.toBSON(), verbosity);
|
||||
auto opMsg = OpMsgRequestBuilder::create(
|
||||
vts, translatedNs.dbName(), explainClusterQueryWithoutShardKeyCmd);
|
||||
vts, translatedNss.dbName(), explainClusterQueryWithoutShardKeyCmd);
|
||||
return CommandHelpers::runCommandDirectly(opCtx, opMsg).getOwned();
|
||||
}();
|
||||
|
||||
@ -559,7 +579,7 @@ bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
|
||||
clusterWriteWithoutShardKeyCommand.toBSON(), verbosity);
|
||||
|
||||
auto opMsg = OpMsgRequestBuilder::create(
|
||||
vts, translatedNs.dbName(), explainClusterWriteWithoutShardKeyCmd);
|
||||
vts, translatedNss.dbName(), explainClusterWriteWithoutShardKeyCmd);
|
||||
return CommandHelpers::runCommandDirectly(opCtx, opMsg).getOwned();
|
||||
}();
|
||||
|
||||
@ -567,10 +587,7 @@ bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
|
||||
clusterQueryWithoutShardKeyExplainRes, clusterWriteWithoutShardKeyExplainRes);
|
||||
result->appendElementsUnique(output);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
void ClusterWriteCmd::executeWriteOpExplain(OperationContext* opCtx,
|
||||
@ -585,33 +602,104 @@ void ClusterWriteCmd::executeWriteOpExplain(OperationContext* opCtx,
|
||||
req = processFLEBatchExplain(opCtx, batchedRequest);
|
||||
}
|
||||
|
||||
auto nss = req ? req->getNS() : batchedRequest.getNS();
|
||||
const NamespaceString originalNss = req ? req->getNS() : batchedRequest.getNS();
|
||||
auto requestPtr = req ? req.get() : &batchedRequest;
|
||||
auto bodyBuilder = result->getBodyBuilder();
|
||||
auto originalRequestBSON = req ? req->toBSON() : requestObj;
|
||||
const auto originalRequestBSON = req ? req->toBSON() : requestObj;
|
||||
|
||||
// If we aren't running an explain for updateOne or deleteOne without shard key, continue and
|
||||
// run the original explain path.
|
||||
if (runExplainWithoutShardKey(opCtx, batchedRequest, nss, verbosity, &bodyBuilder)) {
|
||||
return;
|
||||
const size_t kMaxDatabaseCreationAttempts = 3;
|
||||
size_t attempts = 1;
|
||||
while (true) {
|
||||
try {
|
||||
// Implicitly create the db if it doesn't exist. There is no way right now to return an
|
||||
// explain on a sharded cluster if the database doesn't exist.
|
||||
// TODO (SERVER-108882) Stop creating the db once explain can be executed when th db
|
||||
// doesn't exist.
|
||||
cluster::createDatabase(opCtx, originalNss.dbName());
|
||||
|
||||
// If we aren't running an explain for updateOne or deleteOne without shard key,
|
||||
// continue and run the original explain path.
|
||||
if (runExplainWithoutShardKey(
|
||||
opCtx, batchedRequest, originalNss, verbosity, &bodyBuilder)) {
|
||||
return;
|
||||
}
|
||||
|
||||
sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
|
||||
router.routeWithRoutingContext(
|
||||
opCtx,
|
||||
"explain write"_sd,
|
||||
[&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
|
||||
auto translatedReqBSON = originalRequestBSON;
|
||||
auto translatedNss = originalNss;
|
||||
const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);
|
||||
|
||||
auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
|
||||
opCtx,
|
||||
originalNss,
|
||||
targeter,
|
||||
originalRoutingCtx,
|
||||
[&](const NamespaceString& bucketsNss) {
|
||||
translatedNss = bucketsNss;
|
||||
switch (batchedRequest.getBatchType()) {
|
||||
case BatchedCommandRequest::BatchType_Insert:
|
||||
translatedReqBSON = rewriteCommandForRawDataOperation<
|
||||
write_ops::InsertCommandRequest>(originalRequestBSON,
|
||||
translatedNss.coll());
|
||||
break;
|
||||
case BatchedCommandRequest::BatchType_Update:
|
||||
translatedReqBSON = rewriteCommandForRawDataOperation<
|
||||
write_ops::UpdateCommandRequest>(originalRequestBSON,
|
||||
translatedNss.coll());
|
||||
break;
|
||||
case BatchedCommandRequest::BatchType_Delete:
|
||||
translatedReqBSON = rewriteCommandForRawDataOperation<
|
||||
write_ops::DeleteCommandRequest>(originalRequestBSON,
|
||||
translatedNss.coll());
|
||||
break;
|
||||
}
|
||||
});
|
||||
unusedRoutingCtx.skipValidation();
|
||||
|
||||
const auto explainCmd =
|
||||
ClusterExplain::wrapAsExplain(translatedReqBSON, verbosity);
|
||||
|
||||
// We will time how long it takes to run the commands on the shards.
|
||||
Timer timer;
|
||||
|
||||
// Target the command to the shards based on the singleton batch item.
|
||||
BatchItemRef targetingBatchItem(requestPtr, 0);
|
||||
std::vector<AsyncRequestsSender::Response> shardResponses;
|
||||
commandOpWrite(opCtx,
|
||||
translatedNss,
|
||||
explainCmd,
|
||||
std::move(targetingBatchItem),
|
||||
targeter,
|
||||
&shardResponses);
|
||||
uassertStatusOK(ClusterExplain::buildExplainResult(
|
||||
makeBlankExpressionContext(opCtx, translatedNss),
|
||||
shardResponses,
|
||||
ClusterExplain::kWriteOnShards,
|
||||
timer.millis(),
|
||||
originalRequestBSON,
|
||||
&bodyBuilder));
|
||||
});
|
||||
break;
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
LOGV2_INFO(10370602,
|
||||
"Failed initialization of routing info because the database has been "
|
||||
"concurrently dropped",
|
||||
logAttrs(originalNss.dbName()),
|
||||
"attemptNumber"_attr = attempts,
|
||||
"maxAttempts"_attr = kMaxDatabaseCreationAttempts);
|
||||
|
||||
if (++attempts >= kMaxDatabaseCreationAttempts) {
|
||||
// The maximum number of attempts has been reached, so the procedure fails as it
|
||||
// could be a logical error. At this point, it is unlikely that the error is caused
|
||||
// by concurrent drop database operations.
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto requestBSON = translateCmdObjForRawData(opCtx, batchedRequest, originalRequestBSON, nss);
|
||||
const auto explainCmd = ClusterExplain::wrapAsExplain(requestBSON, verbosity);
|
||||
|
||||
// We will time how long it takes to run the commands on the shards.
|
||||
Timer timer;
|
||||
|
||||
// Target the command to the shards based on the singleton batch item.
|
||||
BatchItemRef targetingBatchItem(requestPtr, 0);
|
||||
std::vector<AsyncRequestsSender::Response> shardResponses;
|
||||
commandOpWrite(opCtx, nss, explainCmd, std::move(targetingBatchItem), &shardResponses);
|
||||
uassertStatusOK(ClusterExplain::buildExplainResult(makeBlankExpressionContext(opCtx, nss),
|
||||
shardResponses,
|
||||
ClusterExplain::kWriteOnShards,
|
||||
timer.millis(),
|
||||
originalRequestBSON,
|
||||
&bodyBuilder));
|
||||
}
|
||||
|
||||
bool ClusterWriteCmd::InvocationBase::runImpl(OperationContext* opCtx,
|
||||
|
||||
@ -36,6 +36,7 @@
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/commands/query_cmd/update_metrics.h"
|
||||
#include "mongo/db/commands/query_cmd/write_commands_common.h"
|
||||
#include "mongo/db/global_catalog/router_role_api/collection_routing_info_targeter.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/not_primary_error_tracker.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
@ -115,6 +116,7 @@ public:
|
||||
const NamespaceString& nss,
|
||||
const BSONObj& command,
|
||||
BatchItemRef targetingBatchItem,
|
||||
const CollectionRoutingInfoTargeter& targeter,
|
||||
std::vector<AsyncRequestsSender::Response>* results);
|
||||
|
||||
/**
|
||||
|
||||
@ -45,6 +45,7 @@
|
||||
#include "mongo/db/fle_crud.h"
|
||||
#include "mongo/db/global_catalog/catalog_cache/catalog_cache.h"
|
||||
#include "mongo/db/global_catalog/chunk_manager.h"
|
||||
#include "mongo/db/global_catalog/ddl/cluster_ddl.h"
|
||||
#include "mongo/db/global_catalog/router_role_api/cluster_commands_helpers.h"
|
||||
#include "mongo/db/global_catalog/type_collection_common_types_gen.h"
|
||||
#include "mongo/db/local_catalog/collection_uuid_mismatch_info.h"
|
||||
@ -623,8 +624,8 @@ Status _parseQueryStatsAndReturnEmptyResult(
|
||||
}
|
||||
|
||||
Status runAggregateImpl(OperationContext* opCtx,
|
||||
RoutingContext& routingCtx,
|
||||
const ClusterAggregate::Namespaces& namespaces,
|
||||
RoutingContext& originalRoutingCtx,
|
||||
ClusterAggregate::Namespaces namespaces,
|
||||
AggregateCommandRequest& req,
|
||||
const LiteParsedPipeline& liteParsedPipeline,
|
||||
const PrivilegeVector& privileges,
|
||||
@ -633,7 +634,7 @@ Status runAggregateImpl(OperationContext* opCtx,
|
||||
boost::optional<ExplainOptions::Verbosity> verbosity,
|
||||
BSONObjBuilder* res) {
|
||||
const auto pipelineDataSource = sharded_agg_helpers::getPipelineDataSource(liteParsedPipeline);
|
||||
if (!routingCtx.hasNss(namespaces.executionNss) &&
|
||||
if (!originalRoutingCtx.hasNss(namespaces.executionNss) &&
|
||||
sharded_agg_helpers::checkIfMustRunOnAllShards(namespaces.executionNss,
|
||||
pipelineDataSource)) {
|
||||
// Verify that there are shards present in the cluster. We must do this whenever we haven't
|
||||
@ -656,6 +657,22 @@ Status runAggregateImpl(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
|
||||
boost::optional<CollectionRoutingInfoTargeter> collectionTargeter;
|
||||
auto& routingCtx = std::invoke([&]() -> RoutingContext& {
|
||||
if (originalRoutingCtx.hasNss(namespaces.executionNss)) {
|
||||
collectionTargeter = CollectionRoutingInfoTargeter(opCtx, namespaces.executionNss);
|
||||
return translateNssForRawDataAccordingToRoutingInfo(
|
||||
opCtx,
|
||||
namespaces.executionNss,
|
||||
*collectionTargeter,
|
||||
originalRoutingCtx,
|
||||
[&](const NamespaceString& tranlatedNss) {
|
||||
namespaces.executionNss = tranlatedNss;
|
||||
});
|
||||
}
|
||||
return originalRoutingCtx;
|
||||
});
|
||||
|
||||
// Given that in single shard/sharded cluster unsharded collection scenarios the query doesn't
|
||||
// go through retryOnViewError mongos doesn't know it's running against a view, and then it
|
||||
// passes the desugared query to the shard, so the shard knows it is running against a view but
|
||||
@ -1030,11 +1047,31 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
const size_t kMaxDatabaseCreationAttempts = 3;
|
||||
size_t attempts = 1;
|
||||
Status status{Status::OK()};
|
||||
try {
|
||||
status = router.routeWithRoutingContext(opCtx, comment, bodyFn);
|
||||
} catch (const DBException& ex) {
|
||||
status = ex.toStatus();
|
||||
bool isExplain = verbosity.has_value();
|
||||
while (true) {
|
||||
if (isExplain) {
|
||||
// Implicitly create the database for explain commands since, right now, there is no way
|
||||
// to respond properly when the database doesn't exist. Previously the database was
|
||||
// implicitly created by CollectionRoutingInfoTargeter; now that we no longer use that
|
||||
// class, we still need to create the database explicitly.
|
||||
// TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
|
||||
// doesn't exist.
|
||||
cluster::createDatabase(opCtx, namespaces.executionNss.dbName());
|
||||
}
|
||||
try {
|
||||
status = router.routeWithRoutingContext(opCtx, comment, bodyFn);
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
|
||||
if (isExplain && ++attempts < kMaxDatabaseCreationAttempts) {
|
||||
continue;
|
||||
}
|
||||
status = ex.toStatus();
|
||||
} catch (const DBException& ex) {
|
||||
status = ex.toStatus();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (!status.isOK() && !routerBodyStarted) {
|
||||
|
||||
@ -245,26 +245,6 @@ bool processResponseFromRemote(OperationContext* opCtx,
|
||||
// Dispatch was ok, note response
|
||||
batchOp.noteBatchResponse(*batch, batchedCommandResponse, &trackedErrors);
|
||||
|
||||
// If we are in a transaction, we must fail the batch immediately to facilitate aborting
|
||||
// transaction on any error (excluding WouldChangeOwningShard, which also will fail the
|
||||
// batch but in a more graceful manner that keeps the transaction open).
|
||||
if (TransactionRouter::get(opCtx)) {
|
||||
// Note: this returns a bad status if any part of the batch failed.
|
||||
auto batchStatus = batchedCommandResponse.toStatus();
|
||||
if (!batchStatus.isOK() && batchStatus != ErrorCodes::WouldChangeOwningShard) {
|
||||
auto newStatus = batchStatus.withContext(
|
||||
str::stream() << "Encountered error from " << shardInfo << " during a transaction");
|
||||
|
||||
// Throw when there is a transient transaction error since this
|
||||
// should be a top level error and not just a write error.
|
||||
if (hasTransientTransactionError(batchedCommandResponse)) {
|
||||
uassertStatusOK(newStatus);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Note if anything was stale
|
||||
auto staleConfigErrors = trackedErrors.getErrors(ErrorCodes::StaleConfig);
|
||||
const auto& staleDbErrors = trackedErrors.getErrors(ErrorCodes::StaleDbVersion);
|
||||
@ -286,6 +266,29 @@ bool processResponseFromRemote(OperationContext* opCtx,
|
||||
opCtx, cannotImplicitlyCreateCollectionErrors, &targeter);
|
||||
}
|
||||
|
||||
// If we are in a transaction, we must fail the batch immediately to facilitate aborting
|
||||
// transaction on any error (excluding WouldChangeOwningShard, which also will fail the
|
||||
// batch but in a more graceful manner that keeps the transaction open).
|
||||
// Note that if the error belongs to the stale routing family, the routing cache must be
|
||||
// invalidated before the error is thrown. In this case, the routing cache invalidation is
|
||||
// handled via the functions `noteStaleCollVersionResponses` and `noteStaleDbVersionResponses`.
|
||||
if (TransactionRouter::get(opCtx)) {
|
||||
// Note: this returns a bad status if any part of the batch failed.
|
||||
auto batchStatus = batchedCommandResponse.toStatus();
|
||||
if (!batchStatus.isOK() && batchStatus != ErrorCodes::WouldChangeOwningShard) {
|
||||
auto newStatus = batchStatus.withContext(
|
||||
str::stream() << "Encountered error from " << shardInfo << " during a transaction");
|
||||
|
||||
// Throw when there is a transient transaction error since this
|
||||
// should be a top level error and not just a write error.
|
||||
if (hasTransientTransactionError(batchedCommandResponse)) {
|
||||
uassertStatusOK(newStatus);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
} // processResponseFromRemote
|
||||
|
||||
|
||||
@ -772,24 +772,31 @@ BulkWriteReplyInfo execute(OperationContext* opCtx,
|
||||
refreshedTargeter = true;
|
||||
} else {
|
||||
stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace;
|
||||
if (targetStatus.getValue() == WriteType::TimeseriesRetryableUpdate) {
|
||||
executeRetryableTimeseriesUpdate(opCtx, childBatches, bulkWriteOp);
|
||||
} else if (targetStatus.getValue() == WriteType::WithoutShardKeyOrId) {
|
||||
executeWriteWithoutShardKey(
|
||||
opCtx, targeters, childBatches, bulkWriteOp, errorsPerNamespace);
|
||||
} else if (targetStatus.getValue() == WriteType::WithoutShardKeyWithId) {
|
||||
executeNonTargetedWriteWithoutShardKeyWithId(
|
||||
opCtx, targeters, childBatches, bulkWriteOp, errorsPerNamespace);
|
||||
} else if (targetStatus.getValue() == WriteType::MultiWriteBlockingMigrations) {
|
||||
coordinateMultiUpdate(opCtx, childBatches, bulkWriteOp);
|
||||
} else {
|
||||
// Send the child batches and wait for responses.
|
||||
executeChildBatches(opCtx,
|
||||
targeters,
|
||||
childBatches,
|
||||
bulkWriteOp,
|
||||
errorsPerNamespace,
|
||||
/*allowShardKeyUpdatesWithoutFullShardKeyInQuery=*/boost::none);
|
||||
|
||||
try {
|
||||
if (targetStatus.getValue() == WriteType::TimeseriesRetryableUpdate) {
|
||||
executeRetryableTimeseriesUpdate(opCtx, childBatches, bulkWriteOp);
|
||||
} else if (targetStatus.getValue() == WriteType::WithoutShardKeyOrId) {
|
||||
executeWriteWithoutShardKey(
|
||||
opCtx, targeters, childBatches, bulkWriteOp, errorsPerNamespace);
|
||||
} else if (targetStatus.getValue() == WriteType::WithoutShardKeyWithId) {
|
||||
executeNonTargetedWriteWithoutShardKeyWithId(
|
||||
opCtx, targeters, childBatches, bulkWriteOp, errorsPerNamespace);
|
||||
} else if (targetStatus.getValue() == WriteType::MultiWriteBlockingMigrations) {
|
||||
coordinateMultiUpdate(opCtx, childBatches, bulkWriteOp);
|
||||
} else {
|
||||
// Send the child batches and wait for responses.
|
||||
executeChildBatches(
|
||||
opCtx,
|
||||
targeters,
|
||||
childBatches,
|
||||
bulkWriteOp,
|
||||
errorsPerNamespace,
|
||||
/*allowShardKeyUpdatesWithoutFullShardKeyInQuery=*/boost::none);
|
||||
}
|
||||
} catch (const DBException&) {
|
||||
bulkWriteOp.noteStaleResponses(targeters, errorsPerNamespace);
|
||||
throw;
|
||||
}
|
||||
|
||||
// If we saw any staleness errors, tell the targeters to invalidate their cache
|
||||
@ -1344,7 +1351,7 @@ void BulkWriteOp::processChildBatchResponseFromRemote(
|
||||
// they may be re-targeted if needed.
|
||||
noteChildBatchResponse(writeBatch, bwReply, errorsPerNamespace);
|
||||
} else {
|
||||
noteChildBatchError(writeBatch, childBatchStatus);
|
||||
noteChildBatchError(writeBatch, childBatchStatus, errorsPerNamespace);
|
||||
|
||||
// If we are in a transaction, we must abort execution on any error, excluding
|
||||
// WouldChangeOwningShard. We do not abort on WouldChangeOwningShard because the error is
|
||||
@ -1651,7 +1658,7 @@ void BulkWriteOp::processLocalChildBatchError(const TargetedWriteBatch& batch,
|
||||
: "from failing to target a host in the shard ")
|
||||
<< shardInfo);
|
||||
|
||||
noteChildBatchError(batch, status);
|
||||
noteChildBatchError(batch, status, boost::none);
|
||||
|
||||
LOGV2_DEBUG(8048100,
|
||||
4,
|
||||
@ -1662,8 +1669,10 @@ void BulkWriteOp::processLocalChildBatchError(const TargetedWriteBatch& batch,
|
||||
abortIfNeeded(responseStatus);
|
||||
}
|
||||
|
||||
void BulkWriteOp::noteChildBatchError(const TargetedWriteBatch& targetedBatch,
|
||||
const Status& status) {
|
||||
void BulkWriteOp::noteChildBatchError(
|
||||
const TargetedWriteBatch& targetedBatch,
|
||||
const Status& status,
|
||||
boost::optional<stdx::unordered_map<NamespaceString, TrackedErrors>&> errorsPerNamespace) {
|
||||
// Treat an error to get a batch response as failures of the contained write(s).
|
||||
const int numErrors =
|
||||
(_clientRequest.getOrdered() || _inTransaction) ? 1 : targetedBatch.getWrites().size();
|
||||
@ -1671,9 +1680,7 @@ void BulkWriteOp::noteChildBatchError(const TargetedWriteBatch& targetedBatch,
|
||||
createEmulatedErrorReply(status, numErrors, _clientRequest.getDbName().tenantId());
|
||||
|
||||
// The error is tracked per namespace only when the caller supplies errorsPerNamespace.
|
||||
noteChildBatchResponse(targetedBatch,
|
||||
emulatedReply,
|
||||
/* errorsPerNamespace*/ boost::none);
|
||||
noteChildBatchResponse(targetedBatch, emulatedReply, errorsPerNamespace);
|
||||
}
|
||||
|
||||
void BulkWriteOp::noteWriteOpFinalResponse(
|
||||
|
||||
@ -307,7 +307,10 @@ public:
|
||||
* unordered writes that is all writes in the batch, and for ordered writes it is only the first
|
||||
* write (since we would stop after that failed and not attempt execution of further writes.)
|
||||
*/
|
||||
void noteChildBatchError(const TargetedWriteBatch& targetedBatch, const Status& status);
|
||||
void noteChildBatchError(
|
||||
const TargetedWriteBatch& targetedBatch,
|
||||
const Status& status,
|
||||
boost::optional<stdx::unordered_map<NamespaceString, TrackedErrors>&> errorsPerNamespace);
|
||||
|
||||
/**
|
||||
* Processes a local error encountered while trying to send a child batch to a shard. This could
|
||||
|
||||
@ -1941,7 +1941,7 @@ TEST_F(BulkWriteOpTest, MultiAndTargetedOrderedOpsStaleConfig) {
|
||||
{
|
||||
auto error = Status{StaleConfigInfo(nss0, *endpointA.shardVersion, boost::none, shardIdA),
|
||||
"Mock error: shard version mismatch"};
|
||||
bulkWriteOp.noteChildBatchError(*targeted[shardIdA], error);
|
||||
bulkWriteOp.noteChildBatchError(*targeted[shardIdA], error, boost::none);
|
||||
}
|
||||
|
||||
// Simulate OK response from shardB.
|
||||
@ -2903,7 +2903,7 @@ TEST_F(BulkWriteOpTest, SuccessfulShardRepliesAreSavedAfterRetargeting) {
|
||||
Status{StaleConfigInfo(
|
||||
nss0, ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none, shardIdB),
|
||||
"Mock error: shard version mismatch"};
|
||||
op.noteChildBatchError(*targeted[shardIdB], error);
|
||||
op.noteChildBatchError(*targeted[shardIdB], error, boost::none);
|
||||
|
||||
// We should have marked the write as ready so we can retarget as needed.
|
||||
ASSERT_EQ(op.getWriteOp_forTest(0).getWriteState(), WriteOpState_Ready);
|
||||
@ -2958,7 +2958,7 @@ TEST_F(BulkWriteOpTest, ShardGetsSuccessfullyRetargetedOnCannotRefreshCacheError
|
||||
// Simulate a ShardCannotRefreshDueToLocksHeld error from the shard.
|
||||
auto error = Status{ShardCannotRefreshDueToLocksHeldInfo(nss),
|
||||
"Mock error: Catalog cache busy in refresh"};
|
||||
op.noteChildBatchError(*targeted[shardId], error);
|
||||
op.noteChildBatchError(*targeted[shardId], error, boost::none);
|
||||
|
||||
// We should have marked the write as ready so we can retarget as needed.
|
||||
ASSERT_EQ(op.getWriteOp_forTest(0).getWriteState(), WriteOpState_Ready);
|
||||
@ -3033,7 +3033,7 @@ TEST_F(BulkWriteOpTest, UnorderedBulkInsertGetsRepeatedOnCannotRefreshShardCache
|
||||
auto error = Status{ShardCannotRefreshDueToLocksHeldInfo(nss),
|
||||
"Mock error: Catalog cache busy in refresh"};
|
||||
|
||||
op.noteChildBatchError(*targeted[shardIdB], error);
|
||||
op.noteChildBatchError(*targeted[shardIdB], error, boost::none);
|
||||
|
||||
// We should have marked the remaining writes as ready so we can retarget as needed.
|
||||
ASSERT_EQ(op.getWriteOp_forTest(0).getWriteState(), WriteOpState_Completed);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user