SERVER-114640 aggregate correctly reports empty result on dropped databases for stale routers (#46856)
GitOrigin-RevId: acbe2e416af8804ccd17ca289e0921c8f7c725e8
This commit is contained in:
parent
d7d974e095
commit
f9e1714897
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@ -152,6 +152,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/core_min_batch_repeat_queries_multiplan_single_solutions_ese_gsm.yml @10gen/query-optimization @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/core_repeat_queries.yml @10gen/query-optimization @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/replica_sets_jscore_pqs* @10gen/query-execution-query-settings @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/shard* @10gen/server-catalog-and-routing @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/sharded_collections_pqs* @10gen/query-execution-query-settings @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/sharded_collections_query_shape_hash_stability* @10gen/query-execution-query-settings @10gen/query-integration-observability @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/*legacy_timeseries_no_rawdata* @10gen/server-catalog-and-routing-shard-catalog @svc-auto-approve-bot
|
||||
|
||||
@ -29,7 +29,7 @@ executor:
|
||||
mongos_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_mongos: 3
|
||||
num_mongos: 1
|
||||
num_rs_nodes_per_shard: 2
|
||||
num_shards: 2
|
||||
old_bin_version: last_continuous
|
||||
|
||||
@ -62,6 +62,7 @@ executor:
|
||||
mongos_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_mongos: 1
|
||||
num_rs_nodes_per_shard: 2
|
||||
num_shards: 2
|
||||
old_bin_version: last_continuous
|
||||
|
||||
@ -27,7 +27,7 @@ executor:
|
||||
mongos_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_mongos: 2
|
||||
num_mongos: 1
|
||||
num_rs_nodes_per_shard: 2
|
||||
num_shards: 2
|
||||
old_bin_version: last_continuous
|
||||
|
||||
@ -60,7 +60,7 @@ executor:
|
||||
mongos_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_mongos: 2
|
||||
num_mongos: 1
|
||||
num_rs_nodes_per_shard: 3
|
||||
num_shards: 2
|
||||
old_bin_version: last_continuous
|
||||
|
||||
@ -40,6 +40,9 @@ filters:
|
||||
- "replica_sets_jscore_pqs*":
|
||||
approvers:
|
||||
- 10gen/query-execution-query-settings
|
||||
- "shard*":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing
|
||||
- "sharded_collections_pqs*":
|
||||
approvers:
|
||||
- 10gen/query-execution-query-settings
|
||||
|
||||
@ -3,3 +3,5 @@ overrides:
|
||||
- "multiversion.sharded_collections_temporarily_disable_due_to_fcv_upgrade"
|
||||
- "multiversion.sharded_fixture_last_continuous_new_old_old_new"
|
||||
- "multiversion.sharding_multiversion_mongos_testdata_last_continuous"
|
||||
# TODO (SERVER-117693) - Remove this override once SERVER-114640 is backported to 8.2
|
||||
- "multiversion.sharding_temporarily_enforce_1_mongos"
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
base_suite: sharded_retryable_writes_downgrade
|
||||
overrides:
|
||||
- "multiversion.sharded_fixture_last_continuous_new_old_old_new"
|
||||
# TODO (SERVER-117693) - Remove this override once SERVER-114640 is backported to 8.2
|
||||
- "multiversion.sharding_temporarily_enforce_1_mongos"
|
||||
|
||||
@ -3,3 +3,5 @@ overrides:
|
||||
- "multiversion.sharding_temporarily_disable_due_to_fcv_upgrade"
|
||||
- "multiversion.sharded_fixture_last_continuous_new_old_old_new"
|
||||
- "multiversion.sharding_multiversion_mongos_testdata_last_continuous"
|
||||
# TODO (SERVER-117693) - Remove this override once SERVER-114640 is backported to 8.2
|
||||
- "multiversion.sharding_temporarily_enforce_1_mongos"
|
||||
|
||||
@ -5,6 +5,8 @@ overrides:
|
||||
- "kill_primary.sharding_reconfig_archive"
|
||||
- "kill_primary.sharding_shell_options"
|
||||
- "kill_primary.all_nodes_can_be_primary"
|
||||
# TODO (SERVER-117693) - Remove this override once SERVER-114640 is backported to 8.2
|
||||
- "multiversion.sharding_temporarily_enforce_1_mongos"
|
||||
|
||||
extends:
|
||||
- "replica_sets_stepdown_selector.kill_primary_jscore_passthrough_exclude_with_any_tags"
|
||||
|
||||
@ -178,6 +178,12 @@
|
||||
- requires_profiling
|
||||
- DISABLED_TEMPORARILY_DUE_TO_FCV_UPGRADE
|
||||
|
||||
- name: sharding_temporarily_enforce_1_mongos
|
||||
value:
|
||||
executor:
|
||||
fixture:
|
||||
num_mongos: 1
|
||||
|
||||
- name: multiversion_future_git_tag_exclude_files
|
||||
value:
|
||||
exclude_with_any_tags:
|
||||
|
||||
@ -1648,6 +1648,14 @@ export const authCommandsLib = {
|
||||
},
|
||||
{
|
||||
testname: "aggregate_merge_insert_documents",
|
||||
setup: function (db) {
|
||||
assert.commandWorked(db.getSiblingDB(firstDbName).foo.insert({}));
|
||||
assert.commandWorked(db.getSiblingDB(secondDbName).foo.insert({}));
|
||||
},
|
||||
teardown: function (db) {
|
||||
assert.commandWorked(db.getSiblingDB(firstDbName).dropDatabase());
|
||||
assert.commandWorked(db.getSiblingDB(secondDbName).dropDatabase());
|
||||
},
|
||||
command: function (state, args) {
|
||||
return {
|
||||
aggregate: "foo",
|
||||
@ -1724,6 +1732,14 @@ export const authCommandsLib = {
|
||||
},
|
||||
{
|
||||
testname: "aggregate_merge_replace_documents",
|
||||
setup: function (db) {
|
||||
assert.commandWorked(db.getSiblingDB(firstDbName).foo.insert({}));
|
||||
assert.commandWorked(db.getSiblingDB(secondDbName).foo.insert({}));
|
||||
},
|
||||
teardown: function (db) {
|
||||
assert.commandWorked(db.getSiblingDB(firstDbName).dropDatabase());
|
||||
assert.commandWorked(db.getSiblingDB(secondDbName).dropDatabase());
|
||||
},
|
||||
command: function (state, args) {
|
||||
return {
|
||||
aggregate: "foo",
|
||||
|
||||
@ -13,9 +13,6 @@
|
||||
|
||||
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
||||
|
||||
// TODO SERVER-114640 Remove this line once aggregate won't report NamespaceNotFound if the db gets dropped.
|
||||
TestData.pinToSingleMongos = true;
|
||||
|
||||
let _collCounter = 0;
|
||||
function getNewColl(db) {
|
||||
const collNamePrefix = jsTestName() + "_coll_";
|
||||
|
||||
@ -20,7 +20,7 @@ import {
|
||||
getTimeseriesCollForDDLOps,
|
||||
} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
|
||||
|
||||
// TODO SERVER-114640 Remove this line once aggregate won't report NamespaceNotFound if the db gets dropped.
|
||||
// TODO (SERVER-117871) remove once the topologyTime is guaranteed to be gossiped out to all routers.
|
||||
TestData.pinToSingleMongos = true;
|
||||
|
||||
assert.commandWorked(db.dropDatabase());
|
||||
|
||||
@ -17,9 +17,6 @@
|
||||
* ]
|
||||
*/
|
||||
|
||||
// TODO SERVER-114640 Remove this line once aggregate won't report NamespaceNotFound if the db gets dropped.
|
||||
TestData.pinToSingleMongos = true;
|
||||
|
||||
import {withSkipRetryOnNetworkError} from "jstests/concurrency/fsm_workload_helpers/stepdown_suite_helpers.js";
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPointForRS} from "jstests/libs/fail_point_util.js";
|
||||
|
||||
@ -148,6 +148,10 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
bool isMergeStage() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<StageParams> getStageParams() const override {
|
||||
return std::make_unique<MergeStageParams>(_originalBson);
|
||||
}
|
||||
|
||||
@ -463,6 +463,13 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this is the merge stage.
|
||||
*/
|
||||
virtual bool isMergeStage() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this stage is an initial source and should run just once on the entire
|
||||
* cluster.
|
||||
|
||||
@ -200,6 +200,10 @@ public:
|
||||
return !_stageSpecs.empty() && _stageSpecs.back()->isWriteStage();
|
||||
}
|
||||
|
||||
bool endsWithMergeStage() const {
|
||||
return !_stageSpecs.empty() && _stageSpecs.back()->isMergeStage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the pipeline has a $changeStream stage.
|
||||
*/
|
||||
|
||||
@ -1128,27 +1128,28 @@ Status ClusterAggregate::runAggregate(
|
||||
router.createDbImplicitlyOnRoute();
|
||||
}
|
||||
|
||||
// We'll use routerBodyStarted to distinguish whether an error was thrown before or after the
|
||||
// body function was executed.
|
||||
bool routerBodyStarted = false;
|
||||
// We'll use the aggregationStatus to distinguish whether an error was thrown by the
|
||||
// aggregation command or the refresh loop.
|
||||
Status aggregationStatus = Status::OK();
|
||||
auto bodyFn = [&](OperationContext* opCtx, RoutingContext& routingCtx) {
|
||||
routerBodyStarted = true;
|
||||
uassertStatusOK(runAggregateImpl(opCtx,
|
||||
routingCtx,
|
||||
namespaces,
|
||||
request,
|
||||
liteParsedPipeline,
|
||||
privileges,
|
||||
boost::none /* resolvedView */,
|
||||
boost::none /* originalRequest */,
|
||||
verbosity,
|
||||
result,
|
||||
ifrContext));
|
||||
aggregationStatus = runAggregateImpl(opCtx,
|
||||
routingCtx,
|
||||
namespaces,
|
||||
request,
|
||||
liteParsedPipeline,
|
||||
privileges,
|
||||
boost::none /* resolvedView */,
|
||||
boost::none /* originalRequest */,
|
||||
verbosity,
|
||||
result,
|
||||
ifrContext);
|
||||
|
||||
uassertStatusOK(aggregationStatus);
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
// Route the command and capture the returned status.
|
||||
Status status = std::invoke([&]() -> Status {
|
||||
Status finalStatus = std::invoke([&]() -> Status {
|
||||
try {
|
||||
return router.routeWithRoutingContext(comment, bodyFn);
|
||||
} catch (const DBException& ex) {
|
||||
@ -1156,23 +1157,40 @@ Status ClusterAggregate::runAggregate(
|
||||
}
|
||||
});
|
||||
|
||||
// Error handling for exceptions raised prior to executing the runAggregation operation.
|
||||
if (!status.isOK() && !routerBodyStarted) {
|
||||
// Error handling for exceptions raised by the refresh loop.
|
||||
// We can infer this by comparing the 2 status received:
|
||||
// - a failed finalStatus might be either an error from the refresh loop or from the the
|
||||
// aggregate command
|
||||
// - a failed aggregationStatus can only be from the aggregate command (note this includes
|
||||
// StaleDb and StaleConfig)
|
||||
// A refresh error is calculated as follows
|
||||
// - the finalStatus fails but the aggregation doesn't
|
||||
// - both finalStatus and aggregationStatus fails, but they are different
|
||||
bool isRefreshError =
|
||||
!finalStatus.isOK() && (aggregationStatus.isOK() || aggregationStatus != finalStatus);
|
||||
if (isRefreshError) {
|
||||
uassert(CollectionUUIDMismatchInfo(request.getDbName(),
|
||||
*request.getCollectionUUID(),
|
||||
std::string{request.getNamespace().coll()},
|
||||
boost::none),
|
||||
"Database does not exist",
|
||||
status != ErrorCodes::NamespaceNotFound || !request.getCollectionUUID());
|
||||
finalStatus != ErrorCodes::NamespaceNotFound || !request.getCollectionUUID());
|
||||
|
||||
if (liteParsedPipeline.startsWithCollStats()) {
|
||||
uassertStatusOKWithContext(status,
|
||||
uassertStatusOKWithContext(finalStatus,
|
||||
"Unable to retrieve information for $collStats stage");
|
||||
}
|
||||
|
||||
// $merge is the only stage that requires to report specifically NamespaceNotFound, instead
|
||||
// of returning an empty batch with status ok.
|
||||
if (finalStatus == ErrorCodes::NamespaceNotFound &&
|
||||
liteParsedPipeline.endsWithMergeStage()) {
|
||||
return finalStatus;
|
||||
}
|
||||
|
||||
// Return an empty cursor with the given status.
|
||||
return _parseQueryStatsAndReturnEmptyResult(opCtx,
|
||||
status,
|
||||
finalStatus,
|
||||
namespaces,
|
||||
request,
|
||||
liteParsedPipeline,
|
||||
@ -1182,7 +1200,7 @@ Status ClusterAggregate::runAggregate(
|
||||
result);
|
||||
}
|
||||
|
||||
return status;
|
||||
return finalStatus;
|
||||
}
|
||||
|
||||
Status ClusterAggregate::runAggregateWithRoutingCtx(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user