SERVER-123711 scan collection for non-compliant docs on constraint upgrade sharded (#53220)
Co-authored-by: Mickey J. Winters <mickey.winters@mongodb.com> GitOrigin-RevId: a933f9c34f76d5bfc6c97a50d4ff7d50c9ba04b5
This commit is contained in:
parent
650d9f458c
commit
25e034e700
@ -6,8 +6,6 @@
|
||||
* @tags: [
|
||||
* requires_fcv_90,
|
||||
* featureFlagConstraintValidationLevel,
|
||||
* assumes_unsharded_collection,
|
||||
* assumes_against_mongod_not_mongos,
|
||||
* ]
|
||||
*/
|
||||
|
||||
@ -19,11 +17,19 @@ const validator = {a: {$exists: true}};
|
||||
describe("collMod upgrade to constraint validationLevel", function () {
|
||||
beforeEach(() => {
|
||||
db[collName].drop();
|
||||
// Explicit create so the collection exists on standalone; in sharded passthroughs the
|
||||
// drop() hook may have already re-created it, in which case this is a no-op.
|
||||
assert.commandWorked(db.createCollection(collName));
|
||||
});
|
||||
|
||||
it("blocks upgrade when a document violates the validator", () => {
|
||||
assert.commandWorked(
|
||||
db.createCollection(collName, {validator: validator, validationLevel: "strict", validationAction: "warn"}),
|
||||
db.runCommand({
|
||||
collMod: collName,
|
||||
validator: validator,
|
||||
validationLevel: "strict",
|
||||
validationAction: "warn",
|
||||
}),
|
||||
);
|
||||
// Insert a non-compliant document. Succeeds because validationAction is "warn".
|
||||
assert.commandWorked(db[collName].insert({b: 1}));
|
||||
@ -40,18 +46,21 @@ describe("collMod upgrade to constraint validationLevel", function () {
|
||||
// Verify the scan-based error fired and surfaces a find query for the user.
|
||||
assert(
|
||||
res.errmsg.includes("Cannot upgrade validationLevel to 'constraint'"),
|
||||
"expected scan-based error message: " + res.errmsg,
|
||||
"expected scan-based error message",
|
||||
{res},
|
||||
);
|
||||
assert(
|
||||
res.errmsg.includes("db." + collName + ".find("),
|
||||
"expected find-query suggestion in errmsg: " + res.errmsg,
|
||||
);
|
||||
assert(res.errmsg.includes("$nor"), "expected $nor in suggested query: " + res.errmsg);
|
||||
assert(res.errmsg.includes("db." + collName + ".find("), "expected find-query suggestion in errmsg", {res});
|
||||
assert(res.errmsg.includes("$nor"), "expected $nor in suggested query", {res});
|
||||
});
|
||||
|
||||
it("allows upgrade when all documents conform to the validator", () => {
|
||||
assert.commandWorked(
|
||||
db.createCollection(collName, {validator: validator, validationLevel: "strict", validationAction: "warn"}),
|
||||
db.runCommand({
|
||||
collMod: collName,
|
||||
validator: validator,
|
||||
validationLevel: "strict",
|
||||
validationAction: "warn",
|
||||
}),
|
||||
);
|
||||
assert.commandWorked(db[collName].insert({a: 1}));
|
||||
|
||||
@ -71,13 +80,18 @@ describe("collMod upgrade to constraint validationLevel", function () {
|
||||
});
|
||||
|
||||
it("allows upgrade on an empty collection with a validator", () => {
|
||||
assert.commandWorked(db.createCollection(collName, {validator: validator, validationLevel: "strict"}));
|
||||
assert.commandWorked(db.runCommand({collMod: collName, validator: validator, validationLevel: "strict"}));
|
||||
assert.commandWorked(db.runCommand({collMod: collName, validationLevel: "constraint"}));
|
||||
});
|
||||
|
||||
it("allows re-upgrade after downgrade and validator change", () => {
|
||||
assert.commandWorked(
|
||||
db.createCollection(collName, {validator: validator, validationLevel: "strict", validationAction: "warn"}),
|
||||
db.runCommand({
|
||||
collMod: collName,
|
||||
validator: validator,
|
||||
validationLevel: "strict",
|
||||
validationAction: "warn",
|
||||
}),
|
||||
);
|
||||
// Insert a document with both 'a' and 'b' so it satisfies the updated validator later.
|
||||
assert.commandWorked(db[collName].insert({a: 1, b: 1}));
|
||||
|
||||
@ -3,6 +3,9 @@ filters:
|
||||
- "*":
|
||||
approvers:
|
||||
- 10gen/query
|
||||
- "collmod_upgrade_to_constraint_sharded.js":
|
||||
approvers:
|
||||
- 10gen/query-execution
|
||||
- "bulk_write*":
|
||||
approvers:
|
||||
- 10gen/query-execution-router
|
||||
|
||||
116
jstests/sharding/query/collmod_upgrade_to_constraint_sharded.js
Normal file
116
jstests/sharding/query/collmod_upgrade_to_constraint_sharded.js
Normal file
@ -0,0 +1,116 @@
|
||||
/**
|
||||
* Tests that collMod blocks upgrading validationLevel to 'constraint' when the sharded
|
||||
* collection contains documents that violate the validator, and allows the upgrade when all
|
||||
* documents conform. The scan is performed by the coordinator before pausing migrations.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_fcv_90,
|
||||
* featureFlagConstraintValidationLevel,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {after, before, describe, it} from "jstests/libs/mochalite.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
describe("collMod upgrade to constraint validationLevel on sharded collections", function () {
|
||||
let st;
|
||||
let testDb;
|
||||
const dbName = jsTestName();
|
||||
const collName = "test";
|
||||
const validator = {a: {$exists: true}};
|
||||
|
||||
before(function () {
|
||||
st = new ShardingTest({shards: 2});
|
||||
testDb = st.s.getDB(dbName);
|
||||
|
||||
// Pin the coordinator to shard0 by making it the primary shard.
|
||||
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(
|
||||
testDb.createCollection(collName, {
|
||||
validator: validator,
|
||||
validationLevel: "strict",
|
||||
validationAction: "warn",
|
||||
}),
|
||||
);
|
||||
|
||||
// Split: _id < 0 on shard0 (coordinator), _id >= 0 on shard1.
|
||||
st.shardColl(collName, {_id: 1}, {_id: 0}, {_id: 1}, dbName, true);
|
||||
|
||||
assert.commandWorked(testDb[collName].insert({_id: -1, a: 1}));
|
||||
assert.commandWorked(testDb[collName].insert({_id: 1, a: 1}));
|
||||
});
|
||||
|
||||
after(function () {
|
||||
st.stop();
|
||||
});
|
||||
|
||||
it("blocks upgrade when a document on the non-coordinating shard violates the validator", function () {
|
||||
// _id: 2 >= 0 routes to shard1 (non-coordinator). Succeeds because action is "warn".
|
||||
assert.commandWorked(testDb[collName].insert({_id: 2, b: 1}));
|
||||
|
||||
const res = assert.commandFailedWithCode(
|
||||
testDb.runCommand({
|
||||
collMod: collName,
|
||||
validationLevel: "constraint",
|
||||
validationAction: "error",
|
||||
}),
|
||||
12370902,
|
||||
);
|
||||
assert(res.errmsg.includes("Cannot upgrade validationLevel to 'constraint'"), "expected scan-based error", {
|
||||
res,
|
||||
});
|
||||
assert(res.errmsg.includes("db." + collName + ".find("), "expected find-query suggestion", {res});
|
||||
|
||||
assert.commandWorked(testDb[collName].deleteOne({_id: 2}));
|
||||
});
|
||||
|
||||
it("allows upgrade when all documents conform", function () {
|
||||
assert.commandWorked(
|
||||
testDb.runCommand({collMod: collName, validationLevel: "strict", validationAction: "warn"}),
|
||||
);
|
||||
assert.commandWorked(
|
||||
testDb.runCommand({
|
||||
collMod: collName,
|
||||
validationLevel: "constraint",
|
||||
validationAction: "error",
|
||||
}),
|
||||
);
|
||||
|
||||
// After upgrade, compliant inserts still work.
|
||||
assert.commandWorked(testDb[collName].insert({_id: 3, a: 1}));
|
||||
|
||||
// After upgrade, non-compliant inserts are rejected.
|
||||
assert.commandFailedWithCode(testDb[collName].insert({_id: 4, b: 1}), ErrorCodes.DocumentValidationFailure);
|
||||
});
|
||||
|
||||
it("blocks upgrade when a document on the coordinator shard violates the validator", function () {
|
||||
assert.commandWorked(
|
||||
testDb.runCommand({
|
||||
collMod: collName,
|
||||
validationLevel: "strict",
|
||||
validationAction: "warn",
|
||||
}),
|
||||
);
|
||||
|
||||
// _id: -2 < 0 routes to shard0, which is the coordinator (primary) shard.
|
||||
assert.commandWorked(testDb[collName].insert({_id: -2, b: 1}));
|
||||
|
||||
assert.commandFailedWithCode(
|
||||
testDb.runCommand({
|
||||
collMod: collName,
|
||||
validationLevel: "constraint",
|
||||
validationAction: "error",
|
||||
}),
|
||||
12370902,
|
||||
);
|
||||
|
||||
assert.commandWorked(testDb[collName].deleteOne({_id: -2}));
|
||||
assert.commandWorked(
|
||||
testDb.runCommand({
|
||||
collMod: collName,
|
||||
validationLevel: "constraint",
|
||||
validationAction: "error",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
@ -841,8 +841,6 @@ mongo_cc_library(
|
||||
"//src/mongo/db/commands/query_cmd:run_aggregate.cpp",
|
||||
"//src/mongo/db/commands/query_cmd:write_commands.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shuffle_list_command_results.cpp",
|
||||
"//src/mongo/db/matcher/doc_validation:constraint_validation_level_upgrade.cpp",
|
||||
"//src/mongo/db/shard_role/ddl:collmod_cmd.cpp",
|
||||
"//src/mongo/db/shard_role/ddl:create_command.cpp",
|
||||
"//src/mongo/db/shard_role/ddl:create_indexes_cmd.cpp",
|
||||
"//src/mongo/db/shard_role/ddl:drop_collection_cmd.cpp",
|
||||
@ -889,7 +887,6 @@ mongo_cc_library(
|
||||
"//src/mongo/db/commands/query_cmd:plan_cache_commands",
|
||||
"//src/mongo/db/commands/query_cmd:release_memory_cmd",
|
||||
"//src/mongo/db/commands/query_cmd:search_index_commands",
|
||||
"//src/mongo/db/index_builds:index_build_knobs_idl",
|
||||
"//src/mongo/db/index_builds:index_builds_coordinator",
|
||||
"//src/mongo/db/index_builds:multi_index_block",
|
||||
"//src/mongo/db/memory_tracking",
|
||||
@ -952,6 +949,17 @@ mongo_cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "collmod_command",
|
||||
srcs = [
|
||||
"//src/mongo/db/shard_role/ddl:collmod_cmd.cpp",
|
||||
],
|
||||
deps = [
|
||||
":standalone",
|
||||
"//src/mongo/db/matcher/doc_validation:constraint_validation_level_upgrade",
|
||||
],
|
||||
)
|
||||
|
||||
# Commands that should only be present in mongod
|
||||
mongo_cc_library(
|
||||
name = "mongod",
|
||||
@ -995,6 +1003,7 @@ mongo_cc_library(
|
||||
],
|
||||
deps = [
|
||||
"cluster_server_parameter_commands_invocation",
|
||||
"collmod_command",
|
||||
"core",
|
||||
"create_command",
|
||||
"dbcheck_command",
|
||||
|
||||
@ -43,6 +43,8 @@
|
||||
#include "mongo/db/global_catalog/sharding_catalog_client.h"
|
||||
#include "mongo/db/global_catalog/type_collection.h"
|
||||
#include "mongo/db/global_catalog/type_database_gen.h"
|
||||
#include "mongo/db/matcher/doc_validation/constraint_validation_level_upgrade.h"
|
||||
#include "mongo/db/repl/read_concern_args.h"
|
||||
#include "mongo/db/router_role/cluster_commands_helpers.h"
|
||||
#include "mongo/db/router_role/routing_cache/catalog_cache.h"
|
||||
#include "mongo/db/s/forwardable_operation_metadata.h"
|
||||
@ -55,6 +57,7 @@
|
||||
#include "mongo/db/shard_role/shard_catalog/coll_mod.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/participant_block_gen.h"
|
||||
#include "mongo/db/shard_role/shard_role.h"
|
||||
#include "mongo/db/sharding_environment/client/shard.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/timeseries/catalog_helper.h"
|
||||
@ -343,6 +346,16 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
|
||||
|
||||
_saveCollectionInfoOnCoordinatorIfNecessary(opCtx);
|
||||
|
||||
if (_doc.getPhase() == Phase::kUnset &&
|
||||
_request.getValidationLevel() == ValidationLevelEnum::constraint) {
|
||||
// Only scan on the first execution.
|
||||
// TODO SERVER-125951: skip scan if collection is already at constraint level.
|
||||
uassertStatusOK(noDocumentsViolatingValidator(opCtx,
|
||||
_collInfo->nsForTargeting,
|
||||
PlacementConcern::kPretendUnsharded,
|
||||
makeClusterValidatorScanFn(opCtx)));
|
||||
}
|
||||
|
||||
auto isGranularityUpdate = (_request.getTimeseries().has_value() &&
|
||||
!_request.getTimeseries()->toBSON().isEmpty());
|
||||
uassert(6201808,
|
||||
|
||||
@ -21,6 +21,25 @@ mongo_cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "constraint_validation_level_upgrade",
|
||||
srcs = [
|
||||
"constraint_validation_level_upgrade.cpp",
|
||||
],
|
||||
hdrs = [
|
||||
"constraint_validation_level_upgrade.h",
|
||||
],
|
||||
deps = [
|
||||
"//src/mongo/db/auth:authprivilege",
|
||||
"//src/mongo/db/commands:standalone",
|
||||
"//src/mongo/db/pipeline:aggregation_request_helper",
|
||||
"//src/mongo/db/query/client_cursor:cursor_response_idl",
|
||||
"//src/mongo/db/shard_role",
|
||||
"//src/mongo/rpc",
|
||||
"//src/mongo/s/query/planner:cluster_aggregate",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "constraint_validation_level_upgrade_test",
|
||||
srcs = [
|
||||
@ -28,8 +47,8 @@ mongo_cc_unit_test(
|
||||
],
|
||||
tags = ["mongo_unittest_sixth_group"],
|
||||
deps = [
|
||||
":constraint_validation_level_upgrade",
|
||||
"//src/mongo/db:dbhelpers",
|
||||
"//src/mongo/db/commands:standalone",
|
||||
"//src/mongo/db/shard_role/shard_catalog:catalog_test_fixture",
|
||||
],
|
||||
)
|
||||
|
||||
@ -29,18 +29,17 @@
|
||||
|
||||
#include "mongo/db/matcher/doc_validation/constraint_validation_level_upgrade.h"
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/auth/action_type.h"
|
||||
#include "mongo/db/auth/privilege.h"
|
||||
#include "mongo/db/auth/resource_pattern.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/commands/query_cmd/run_aggregate.h"
|
||||
#include "mongo/db/pipeline/aggregate_command_gen.h"
|
||||
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
|
||||
#include "mongo/db/query/client_cursor/cursor_response.h"
|
||||
#include "mongo/db/shard_role/shard_role.h"
|
||||
#include "mongo/rpc/op_msg_rpc_impls.h"
|
||||
#include "mongo/s/query/planner/cluster_aggregate.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
#include <vector>
|
||||
@ -48,10 +47,62 @@
|
||||
#include <boost/none.hpp>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
Status makeViolatingValidatorStatus(const BSONObj& validator, StringData collName) {
|
||||
str::stream msg;
|
||||
msg << "Cannot upgrade validationLevel to 'constraint': the collection contains documents "
|
||||
"that do not satisfy the validator.";
|
||||
constexpr size_t kMaxValidatorInErrorMessage = 10 * 1024;
|
||||
StringBuilder validatorStr;
|
||||
validator.toString(validatorStr, /*isArray=*/false, /*full=*/true);
|
||||
auto validatorStrMaterialized =
|
||||
static_cast<size_t>(validatorStr.len()) < kMaxValidatorInErrorMessage
|
||||
? validatorStr.str()
|
||||
: "<your collection's validator>";
|
||||
msg << " Run db." << collName << ".find({\"$nor\": [" << validatorStrMaterialized
|
||||
<< "]}) to find non-compliant documents.";
|
||||
return {ErrorCodes::Error(12370902), msg};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// TODO SERVER-127395: Replace runAggregate/ClusterAggregate::runAggregate with public API calls
|
||||
// (e.g. CommandHelpers::runCommandDirectly) and pass isSharded as a bool instead of a function.
|
||||
ValidatorScanFn makeLocalValidatorScanFn(OperationContext* opCtx) {
|
||||
return
|
||||
[opCtx](AggregateCommandRequest& req, const PrivilegeVector& privs) -> StatusWith<BSONObj> {
|
||||
rpc::OpMsgReplyBuilder reply;
|
||||
if (auto s = runAggregate(
|
||||
opCtx, req, LiteParsedPipeline{req}, req.toBSON(), privs, boost::none, &reply);
|
||||
!s.isOK()) {
|
||||
return s;
|
||||
}
|
||||
auto b = reply.getBodyBuilder();
|
||||
CommandHelpers::appendSimpleCommandStatus(b, true);
|
||||
b.doneFast();
|
||||
return reply.releaseBody();
|
||||
};
|
||||
}
|
||||
|
||||
ValidatorScanFn makeClusterValidatorScanFn(OperationContext* opCtx) {
|
||||
return [opCtx](AggregateCommandRequest& req,
|
||||
const PrivilegeVector& privs) -> StatusWith<BSONObj> {
|
||||
auto ns = req.getNamespace();
|
||||
BSONObjBuilder result;
|
||||
if (auto s =
|
||||
ClusterAggregate::runAggregate(opCtx, {ns, ns}, req, privs, boost::none, &result);
|
||||
!s.isOK()) {
|
||||
return s;
|
||||
}
|
||||
return result.obj();
|
||||
};
|
||||
}
|
||||
|
||||
Status noDocumentsViolatingValidator(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
PlacementConcern placementConcern) {
|
||||
PlacementConcern placementConcern,
|
||||
ValidatorScanFn runAgg) {
|
||||
BSONObj validator;
|
||||
{
|
||||
auto coll =
|
||||
@ -80,29 +131,14 @@ Status noDocumentsViolatingValidator(OperationContext* opCtx,
|
||||
AggregateCommandRequest aggRequest(nss, std::move(pipeline));
|
||||
aggRequest.setHint(BSON("$natural" << 1));
|
||||
|
||||
auto aggCmdObj = aggRequest.toBSON();
|
||||
rpc::OpMsgReplyBuilder replyBuilder;
|
||||
PrivilegeVector privs{Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)};
|
||||
|
||||
// We use runAggregate here instead of something like DBDirectClient because in a sharded
|
||||
// environment we want the DDL coordinating shard to do targeting and send sub aggregations to
|
||||
// the other participants.
|
||||
auto status =
|
||||
runAggregate(opCtx,
|
||||
aggRequest,
|
||||
{aggRequest},
|
||||
aggCmdObj,
|
||||
{Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)},
|
||||
boost::none,
|
||||
&replyBuilder);
|
||||
if (!status.isOK()) {
|
||||
return status;
|
||||
auto resultOrStatus = runAgg(aggRequest, privs);
|
||||
if (!resultOrStatus.isOK()) {
|
||||
return resultOrStatus.getStatus();
|
||||
}
|
||||
|
||||
auto bodyBuilder = replyBuilder.getBodyBuilder();
|
||||
CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
|
||||
bodyBuilder.doneFast();
|
||||
|
||||
auto cursorResponse = CursorResponse::parseFromBSON(replyBuilder.releaseBody(),
|
||||
auto cursorResponse = CursorResponse::parseFromBSON(resultOrStatus.getValue(),
|
||||
nullptr,
|
||||
nss.tenantId(),
|
||||
SerializationContext::stateCommandReply());
|
||||
@ -114,19 +150,7 @@ Status noDocumentsViolatingValidator(OperationContext* opCtx,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
str::stream msg;
|
||||
msg << "Cannot upgrade validationLevel to 'constraint': the collection contains documents "
|
||||
"that do not satisfy the validator.";
|
||||
constexpr size_t kMaxValidatorInErrorMessage = 10 * 1024;
|
||||
StringBuilder validatorStr;
|
||||
validator.toString(validatorStr, /*isArray=*/false, /*full=*/true);
|
||||
auto validatorStrMaterialized =
|
||||
static_cast<size_t>(validatorStr.len()) < kMaxValidatorInErrorMessage
|
||||
? validatorStr.str()
|
||||
: "<your collection's validator>";
|
||||
msg << " Run db." << nss.coll() << ".find({\"$nor\": [" << validatorStrMaterialized
|
||||
<< "]}) to find non-compliant documents.";
|
||||
return {ErrorCodes::Error(12370902), msg};
|
||||
return makeViolatingValidatorStatus(validator, nss.coll());
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -30,23 +30,52 @@
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/status.h"
|
||||
#include "mongo/base/status_with.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/auth/privilege.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/pipeline/aggregate_command_gen.h"
|
||||
#include "mongo/db/shard_role/transaction_resources.h"
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
#include <functional>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Reads the collection validator from the local catalog and runs an aggregation to find any
|
||||
* document that violates it. Returns a non-OK status if such a document exists, or Status::OK()
|
||||
* if all documents conform or if the collection has no validator.
|
||||
* Callback type for executing the violation-scan aggregate. Receives the pre-built request and
|
||||
* the required privilege vector; returns the raw cursor-response BSONObj or a non-OK status.
|
||||
* The shared impl handles everything else (collection read, pipeline, privilege construction,
|
||||
* cursor parsing, and error formatting).
|
||||
*/
|
||||
using ValidatorScanFn =
|
||||
std::function<StatusWith<BSONObj>(AggregateCommandRequest&, const PrivilegeVector&)>;
|
||||
|
||||
/**
|
||||
* Returns a ValidatorScanFn that runs the aggregate locally via runAggregate.
|
||||
* Use on standalone nodes and shard-local collMod execution.
|
||||
*/
|
||||
MONGO_MOD_PUBLIC ValidatorScanFn makeLocalValidatorScanFn(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Returns a ValidatorScanFn that runs the aggregate via ClusterAggregate, fanning out to all
|
||||
* shards. Use on the DDL coordinator for sharded collections.
|
||||
*/
|
||||
MONGO_MOD_PUBLIC ValidatorScanFn makeClusterValidatorScanFn(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Reads the collection validator from the local catalog, builds a violation-scan aggregate, and
|
||||
* executes it via 'runAgg'. Returns a non-OK status if any document violates the validator, or
|
||||
* Status::OK() if all documents conform or the collection has no validator.
|
||||
*
|
||||
* 'placementConcern' is used only when acquiring the collection to read its validator; the
|
||||
* subsequent aggregation acquires the collection itself or sends commands to remote shards.
|
||||
* 'placementConcern' is passed to the collection acquisition used to read the validator.
|
||||
* 'runAgg' is responsible only for dispatching the aggregate and returning the raw response;
|
||||
* the caller supplies the appropriate executor (local runAggregate or ClusterAggregate).
|
||||
*/
|
||||
MONGO_MOD_PUBLIC Status noDocumentsViolatingValidator(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
PlacementConcern placementConcern);
|
||||
PlacementConcern placementConcern,
|
||||
ValidatorScanFn runAgg);
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -39,9 +39,10 @@
|
||||
#include "mongo/db/shard_role/shard_role.h"
|
||||
#include "mongo/db/storage/write_unit_of_work.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#include <boost/none.hpp>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
@ -59,8 +60,12 @@ class ConstraintValidationLevelUpgradeTest : public CatalogTestFixture {};
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKForNonExistentCollection) {
|
||||
const auto nss = NamespaceString::createNamespaceString_forTest("testdb", "testcoll");
|
||||
ASSERT_OK(noDocumentsViolatingValidator(
|
||||
operationContext(), nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED())));
|
||||
auto* opCtx = operationContext();
|
||||
ASSERT_OK(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx)));
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKForCollectionWithNoValidator) {
|
||||
@ -71,8 +76,31 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKForCollectionWithNoValidat
|
||||
options.uuid = UUID::gen();
|
||||
ASSERT_OK(createCollection(opCtx, nss, options, boost::none));
|
||||
|
||||
ASSERT_OK(noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED())));
|
||||
ASSERT_OK(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx)));
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, PropagatesAggregateError) {
|
||||
const auto nss = NamespaceString::createNamespaceString_forTest("testdb", "testcoll");
|
||||
auto* opCtx = operationContext();
|
||||
|
||||
CollectionOptions options;
|
||||
options.validator = fromjson("{a: {$exists: true}}");
|
||||
options.uuid = UUID::gen();
|
||||
ASSERT_OK(createCollection(opCtx, nss, options, boost::none));
|
||||
|
||||
auto injectedStatus = Status{ErrorCodes::InternalError, "injected aggregate failure"};
|
||||
auto status = noDocumentsViolatingValidator(
|
||||
opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
[&](AggregateCommandRequest&, const PrivilegeVector&) -> StatusWith<BSONObj> {
|
||||
return injectedStatus;
|
||||
});
|
||||
ASSERT_EQ(status, injectedStatus);
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKForEmptyCollectionWithValidator) {
|
||||
@ -85,8 +113,11 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKForEmptyCollectionWithVali
|
||||
|
||||
ASSERT_OK(createCollection(opCtx, nss, options, boost::none));
|
||||
|
||||
ASSERT_OK(noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED())));
|
||||
ASSERT_OK(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx)));
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsErrorWhenDocumentViolatesValidator) {
|
||||
@ -107,10 +138,13 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsErrorWhenDocumentViolatesVal
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
ASSERT_EQ(noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED()))
|
||||
.code(),
|
||||
12370902);
|
||||
ASSERT_EQ(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx))
|
||||
.code(),
|
||||
12370902);
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKWhenAllDocumentsConformToValidator) {
|
||||
@ -131,8 +165,11 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsOKWhenAllDocumentsConformToV
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
ASSERT_OK(noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED())));
|
||||
ASSERT_OK(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx)));
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsErrorWhenDocumentViolatesJsonSchemaValidator) {
|
||||
@ -153,10 +190,13 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ReturnsErrorWhenDocumentViolatesJso
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
ASSERT_EQ(noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED()))
|
||||
.code(),
|
||||
12370902);
|
||||
ASSERT_EQ(
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx))
|
||||
.code(),
|
||||
12370902);
|
||||
}
|
||||
|
||||
TEST_F(ConstraintValidationLevelUpgradeTest, ErrorMessageTruncatesLargeValidator) {
|
||||
@ -180,8 +220,11 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ErrorMessageTruncatesLargeValidator
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
auto status = noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED()));
|
||||
auto status =
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx));
|
||||
ASSERT_EQ(status.code(), 12370902);
|
||||
ASSERT_STRING_CONTAINS(status.reason(), "<your collection's validator>");
|
||||
ASSERT_STRING_OMITS(status.reason(), longTitle);
|
||||
@ -205,8 +248,11 @@ TEST_F(ConstraintValidationLevelUpgradeTest, ErrorMessageContainsValidatorAndCol
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
auto status = noDocumentsViolatingValidator(
|
||||
opCtx, nss, PlacementConcern(boost::none, ShardVersion::UNTRACKED()));
|
||||
auto status =
|
||||
noDocumentsViolatingValidator(opCtx,
|
||||
nss,
|
||||
PlacementConcern(boost::none, ShardVersion::UNTRACKED()),
|
||||
makeLocalValidatorScanFn(opCtx));
|
||||
ASSERT_EQ(status.code(), 12370902);
|
||||
ASSERT_STRING_CONTAINS(status.reason(), "Run db.testcoll.find({\"$nor\": [");
|
||||
StringBuilder validatorStr;
|
||||
|
||||
@ -772,6 +772,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/commands/server_status:server_status_core",
|
||||
"//src/mongo/db/global_catalog/ddl:notify_sharding_event_idl",
|
||||
"//src/mongo/db/index_builds:index_builds_coordinator",
|
||||
"//src/mongo/db/matcher/doc_validation:constraint_validation_level_upgrade",
|
||||
"//src/mongo/db/pipeline/process_interface:shardsvr_process_interface",
|
||||
"//src/mongo/db/query/query_settings:query_settings_service",
|
||||
"//src/mongo/db/repl:change_stream_oplog_notification",
|
||||
|
||||
@ -186,12 +186,14 @@ public:
|
||||
.isEnabledUseLastLTSFCVWhenUninitialized(
|
||||
VersionContext::getDecoration(opCtx), fcvSnapshot));
|
||||
|
||||
// TODO SERVER-125951: skip scan if collection is already at
|
||||
// 'constraint' level, since existing docs must already conform.
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
uassertStatusOK(noDocumentsViolatingValidator(
|
||||
opCtx,
|
||||
nss,
|
||||
PlacementConcern{oss.getDbVersion(nss.dbName()),
|
||||
oss.getShardVersion(nss)}));
|
||||
PlacementConcern{oss.getDbVersion(nss.dbName()), oss.getShardVersion(nss)},
|
||||
makeLocalValidatorScanFn(opCtx)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user