SERVER-118985 Strict validation for timeseries bucket insert/update via raw API (#50631)

GitOrigin-RevId: e9ebe2934fe4ae6ad49f766326b7c6653c4d86b2
This commit is contained in:
henrikedin 2026-04-10 08:43:00 -04:00 committed by MongoDB Bot
parent 49d56ca76a
commit dff2f03fc5
30 changed files with 990 additions and 65 deletions

View File

@ -503,9 +503,9 @@ export var TimeseriesTest = class {
* Checks for log entries generated by failed document validation.
* @param {DBCollection}
* @param {object} record
* @param {number} errorId
* @param {number|number[]} errorIds
*/
static checkForDocumentValidationFailureLog(coll, record, errorId = 6698300) {
static checkForDocumentValidationFailureLog(coll, record, errorIds = [6698300, 11634800]) {
// To avoid making log checks too strict, either the buckets namespace or view namespace is acceptable in the log message.
// Due to differences in EJSON format, only a subset of record data is checked in the log message.
const oidStr = JSON.parse(toJsonForLog(record))._id["$oid"];
@ -529,12 +529,14 @@ export var TimeseriesTest = class {
},
};
const errorIdList = Array.isArray(errorIds) ? errorIds : [errorIds];
assert.soon(function () {
return (
checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherBuckets, 1, null, relaxMatch) ||
checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherView, 1, null, relaxMatch)
return errorIdList.some(
(errorId) =>
checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherBuckets, 1, null, relaxMatch) ||
checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherView, 1, null, relaxMatch),
);
}, `Could not find log entries containing the following id: ${errorId}, and attrs: ${attrsMatcherBuckets} or attrs: ${attrsMatcherView}`);
}, `Could not find log entries containing any of the following ids: ${errorIdList}, and attrs: ${attrsMatcherBuckets} or attrs: ${attrsMatcherView}`);
}
};

View File

@ -34,17 +34,6 @@ toInsert.forEach((doc) => coll.insertOne(doc));
assert.eq(1, coll.find({t2: {$exists: true}}).toArray().length);
}
// Test findAndModify rawData update queries.
{
const doc = getTimeseriesCollForRawOps(coll).findAndModify({
query: {},
update: {$set: {"control.max._id": 100}},
...kRawOperationSpec,
});
assert.eq(1, getTimeseriesCollForRawOps(coll).find({"control.max._id": 100}).rawData().toArray().length);
assert.eq(numDocs - 1, coll.find({_id: {$gt: 0}}).toArray().length);
}
// Test findAndModify delete queries.
{
const doc = coll.findAndModify({query: {t2: {$exists: true}}, remove: true});

View File

@ -76,7 +76,7 @@ assert.commandWorked(
assert.commandWorked(
getTimeseriesCollForRawOps(tsColl).update(
{[metaFieldName]: 65},
{$set: {"control": bogusBucket.control, "data": bogusBucket.data}},
{$set: {_id: incrementOID(bogusBucket._id), "control": bogusBucket.control, "data": bogusBucket.data}},
{upsert: true, ...kRawOperationSpec},
),
);

View File

@ -79,6 +79,11 @@ const runTest = function (isCorrupted = false) {
if (isCorrupted) {
jsTestLog("Corrupting the bucket by adding an extra data field.");
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
// Corrupt the uncompressed bucket by adding an extra data field to it. This
// will make the bucket uncompressible.
let res = assert.commandWorked(
@ -88,6 +93,12 @@ const runTest = function (isCorrupted = false) {
getRawOperationSpec(db),
),
);
// Re-enable strict bucket validation now that the bucket has been corrupted.
assert.commandWorked(
conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false}),
);
jsTestLog(getTimeseriesCollForRawOps(db, coll).find().rawData().toArray());
assert.eq(res.modifiedCount, 1);
}

View File

@ -0,0 +1,99 @@
/**
* Tests a repair mechanism for timeseries buckets that fail strict validation. A bucket that is rejected by
* strict validation can still be decompressed by $_internalUnpackBucket in some cases. The repair uses an aggregation
* pipeline with $_internalUnpackBucket to extract the measurements and $out to write them to a
* new valid timeseries collection.
*
* This demonstrates a repair path that customers can use to recover measurements from bucket data
* that was created before strict bucket validation was enforced.
*
* @tags: [
* requires_timeseries,
* ]
*/
import {TimeseriesTest} from "jstests/core/timeseries/libs/timeseries.js";
import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
// Start a dedicated mongod for this test and work in a database named after the test.
const conn = MongoRunner.runMongod();
const db = conn.getDB(jsTestName());

const timeField = "t";
const metaField = "m";

// Seed a timeseries collection with two measurements of known value. The single bucket created
// for them is the valid starting point for the rest of the test.
const sourceColl = db.source;
assert.commandWorked(db.createCollection(sourceColl.getName(), {timeseries: {timeField, metaField}}));

const measurements = [
    {[timeField]: ISODate("2024-01-16T20:48:00.000Z"), [metaField]: "sensor1", a: 10},
    {[timeField]: ISODate("2024-01-16T20:48:30.000Z"), [metaField]: "sensor1", a: 20},
];
assert.commandWorked(sourceColl.insertMany(measurements));

// Read back the raw bucket document holding both measurements.
const sourceBuckets = getTimeseriesCollForRawOps(db, sourceColl).find().rawData().toArray();
assert.eq(1, sourceBuckets.length, `Expected one bucket: ${tojson(sourceBuckets)}`);
const [validBucket] = sourceBuckets;

// Derive an invalid bucket: identical to the valid one except that the timestamp embedded in its
// _id no longer matches control.min.t. A raw insert of this document into the timeseries
// collection is expected to be rejected.
const wrongTimestampOID = ObjectId("63b0ec000000000000000000");
const invalidBucket = {...validBucket, _id: wrongTimestampOID};

const rawInsertCmd = {
    insert: getTimeseriesCollForRawOps(db, sourceColl).getName(),
    documents: [invalidBucket],
    ...getRawOperationSpec(db),
};
const verifyResult = db.runCommand(rawInsertCmd);
const reportedWriteErrors = verifyResult.writeErrors;
assert(
    reportedWriteErrors && reportedWriteErrors.length > 0,
    `Expected strict validation to reject bucket with mismatched OID timestamp: ${tojson(verifyResult)}`,
);

// Stage the invalid bucket in a regular (non-timeseries) collection. Timeseries bucket validation
// does not run for regular collections, so this insert is accepted.
const stagingColl = db.staging;
assert.commandWorked(stagingColl.insertOne(invalidBucket));

// Repair via the internal timeseries aggregation pipeline:
//   1. $_internalUnpackBucket expands the staged bucket into individual measurement documents.
//   2. $out writes those measurements into a new timeseries collection, which produces fresh
//      buckets with correct OID timestamps.
const outCollName = "repaired";
const unpackStage = {
    $_internalUnpackBucket: {
        timeField,
        metaField,
        bucketMaxSpanSeconds: NumberInt(3600),
    },
};
const outStage = {$out: {db: db.getName(), coll: outCollName, timeseries: {timeField, metaField}}};
assert.commandWorked(
    db.runCommand({
        aggregate: stagingColl.getName(),
        pipeline: [unpackStage, outStage],
        cursor: {},
    }),
);

// The repaired collection must hold exactly the original measurements, compared in time order.
const repairedColl = db[outCollName];
const repairedDocs = repairedColl
    .find()
    .sort({[timeField]: 1})
    .toArray();
assert.eq(measurements.length, repairedDocs.length, tojson(repairedDocs));
measurements.forEach((expected, position) => {
    const actual = repairedDocs[position];
    for (const field of [timeField, metaField, "a"]) {
        assert.eq(expected[field], actual[field], tojson(actual));
    }
});

// Finally, validate() must report the repaired collection as valid with no non-compliant documents.
const validateResult = repairedColl.validate();
assert(validateResult.valid, tojson(validateResult));
assert.eq(0, validateResult.nNonCompliantDocuments, tojson(validateResult));

MongoRunner.stopMongod(conn);

View File

@ -36,6 +36,11 @@ const runTest = function (ordered) {
const timeFieldName = "t";
const metaFieldName = "m";
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
assert.commandWorked(
db.createCollection(collName, {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}),
);

View File

@ -19,6 +19,11 @@ TimeseriesTest.run((insert) => {
const db = conn.getDB(jsTestName());
const coll = db.coll;
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
assert.commandWorked(db.createCollection(coll.getName(), {timeseries: {timeField: "t", metaField: "m"}}));
const time = ISODate("2024-01-16T20:48:39.448Z");

View File

@ -45,6 +45,8 @@ let stats = assert.commandWorked(coll.stats());
assert.eq(stats.timeseries.numBucketsArchivedDueToTimeBackward, 1, tojson(stats));
jsTestLog("Add uncompressed data field to bucket, thus corrupting a compressed bucket.");
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
let res = assert.commandWorked(
getTimeseriesCollForRawOps(db, coll).updateOne(
{_id: bucketId},
@ -54,6 +56,9 @@ let res = assert.commandWorked(
);
assert.eq(res.modifiedCount, 1);
// Re-enable strict bucket validation now that the bucket has been corrupted.
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false}));
jsTestLog(
"Insert third measurement. This will attempt to re-open the corrupted bucket, but should then freeze it and insert into a new bucket.",
);
@ -66,7 +71,7 @@ assert.eq(stats.timeseries.numBucketsReopened, 0, tojson(stats.timeseries));
assert.eq(stats.timeseries.numBucketsFrozen, 1, tojson(stats.timeseries));
assert.eq(stats.timeseries.numBucketQueriesFailed, 2, tojson(stats.timeseries));
TimeseriesTest.checkBucketReopeningsFailedCounters(stats.timeseries, {
numBucketReopeningsFailedDueToCompressionFailure: 1,
numBucketReopeningsFailedDueToValidator: 1,
});
jsTestLog("Remove the newly created bucket, so it will not be present for the next insert.");
@ -86,7 +91,7 @@ assert.eq(stats.timeseries.numBucketsReopened, 0, tojson(stats.timeseries));
assert.eq(stats.timeseries.numBucketsFrozen, 1, tojson(stats.timeseries));
assert.eq(stats.timeseries.numBucketQueriesFailed, 2, tojson(stats.timeseries));
TimeseriesTest.checkBucketReopeningsFailedCounters(stats.timeseries, {
numBucketReopeningsFailedDueToCompressionFailure: 1,
numBucketReopeningsFailedDueToValidator: 1,
numBucketReopeningsFailedDueToMarkedFrozen: 1,
});

View File

@ -19,6 +19,7 @@ const timeField = "my_time";
const metaField = "my_meta";
const timeseriesRawDoc = {
"_id": ObjectId("67cc72e03dc19da48a654135"),
"control": {
"version": 2,
"min": {

View File

@ -27,10 +27,10 @@ import {ReplSetTest} from "jstests/libs/replsettest.js";
assert.commandWorked(testDB.createCollection(collName, {timeseries: {timeField: "t"}}));
const insertDocFull = {
"_id": ObjectId("64d3c7004c83948224c45ddf"),
"_id": ObjectId("64d3c6104c83948224c45ddf"),
"control": {
"version": 1,
"min": {"_id": ObjectId("64d24b52469e18af504e506e"), "t": ISODate("2023-08-09T17:04:00Z")},
"min": {"_id": ObjectId("64d24b52469e18af504e506e"), "t": ISODate("2023-08-09T17:00:00Z")},
"max": {"_id": ObjectId("64d24b52469e18af504e506f"), "t": ISODate("2023-08-09T17:05:42.238Z")},
},
"data": {

View File

@ -23,6 +23,9 @@ assert(res.valid, tojson(res));
assert.eq(res.nNonCompliantDocuments, 0);
assert.eq(res.errors.length, 0);
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
// Sets the max timestamp to outside the bucket max span.
getTimeseriesCollForRawOps(db, coll).updateOne(
{meta: 1},

View File

@ -63,6 +63,8 @@ function validateTimeseriesBucketingParametersChangeFail(
data: {value: false},
}),
);
// Disable strict bucket validation as it relies on timeseriesBucketingParametersChanged to be set accurately, which we are not doing with the above fail point.
assert.commandWorked(db.adminCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
// This collMod should lead to timeseriesBucketingParametersChanged to True because the original
// bucketing parameters we set for our collections was different from the updated bucketing
@ -84,6 +86,7 @@ function validateTimeseriesBucketingParametersChangeFail(
assert.commandWorked(
db.adminCommand({"configureFailPoint": timeseriesBucketingParametersChangedInputValueName, "mode": "off"}),
);
assert.commandWorked(db.adminCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false}));
testCount += 1;
collName = collNamePrefix + testCount;

View File

@ -58,11 +58,20 @@ coll.insertMany(
})),
{ordered: false},
);
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
// Write the incorrect control.count
getTimeseriesCollForRawOps(db, coll).updateOne(
{"meta.sensorId": 2, "control.version": TimeseriesTest.BucketVersion.kCompressedSorted},
{"$set": {"control.count": 10}},
getRawOperationSpec(db),
);
// Re-enable strict bucket validation now that the bucket has been corrupted.
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false}));
res = coll.validate();
assert(!res.valid, tojson(res));
assert.eq(res.nNonCompliantDocuments, 1);

View File

@ -34,6 +34,11 @@ describe("Tests validate command checks indexes in the time-series buckets data
before(function () {
this.conn = MongoRunner.runMongod();
this.db = this.conn.getDB(jsTestName());
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
});
beforeEach(function () {

View File

@ -44,6 +44,9 @@ assert.eq(res.warnings.length, 0);
// Inserts documents into another bucket but manually changes the min timestamp. Expects
// warnings from validation.
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
testCount += 1;
collName = collNamePrefix + testCount;
db.getCollection(collName).drop();

View File

@ -184,6 +184,11 @@ describe("Test 'control.min' and 'control.max' match the uncompressed documents
before(function () {
this.conn = MongoRunner.runMongod();
this.db = this.conn.getDB(jsTestName());
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
});
it("Updates 'control' min temperature with corresponding update in recorded temperature, testing that valid updates do not return warnings.", function () {

View File

@ -21,6 +21,11 @@ describe("Validate Timeseries version Tests", function () {
this.conn = MongoRunner.runMongod();
this.db = this.conn.getDB(dbName);
this.db.getCollection(collName).drop();
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(
this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}),
);
});
beforeEach(function () {

View File

@ -49,6 +49,9 @@ assert(res.valid);
// Compressed bucket with the compressed time field in-order and version set to 3. This should fail,
// since this bucket's measurements are in-order on time field, meaning this bucket shouldn't have
// been promoted to v3.
// Allow setting an inconsistent state to the bucket so we can test that validate can detect it
assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}));
const invalidVersion3Doc = {
_id: ObjectId("65a6eb806ffc9fa4280ecac4"),
control: {

View File

@ -309,6 +309,11 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
jsTest.log.info("Time series collection successfully sharded with key: " + tojson(shardedCollections[0].key));
// Use a fixed base time so the test is deterministic.
const baseTime = new Date("2026-03-27T17:30:21.332Z");
const spuriousTimestamp = new Date(baseTime.getTime() + 3000);
const spuriousBucketMinTimestamp = new Date("2026-03-27T00:00:00.000Z");
// Split the collection at {sensor_id: "sensor_100", timestamp: MinKey}.
assert.commandWorked(
admin.runCommand({
@ -321,7 +326,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
assert.commandWorked(
admin.runCommand({
moveChunk: tsCollDDLOpsFullName,
find: {meta: "sensor_200", "control.min.timestamp": new Date()},
find: {meta: "sensor_200", "control.min.timestamp": baseTime},
to: st.shard1.shardName,
_waitForDelete: true,
}),
@ -330,7 +335,6 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
// Verify initial chunk distribution for time series collection.
// Insert some legitimate time series data through mongos.
const baseTime = new Date();
assert.commandWorked(
tsColl.insert({
sensor_id: "sensor_050",
@ -356,21 +360,22 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
const shard1TsColl = shard1DB.getCollection(tsColl.getName());
// Create a proper bucket document that would map to sensor_025 (which should be in first chunk)
const spuriousTimestamp = new Date(baseTime.getTime() + 3000);
assert.commandWorked(
getTimeseriesCollForRawOps(shard1DB, shard1TsColl).insert(
{
_id: ObjectId(),
_id: ObjectId("69c5c8800000000000000000"),
meta: "sensor_025", // This corresponds to sensor_id metaField
control: {
version: 1,
min: {
timestamp: spuriousTimestamp,
timestamp: spuriousBucketMinTimestamp,
temperature: 20.0,
data: "spurious_ts_document",
},
max: {
timestamp: spuriousTimestamp,
temperature: 20.0,
data: "spurious_ts_document",
},
closed: false,
},
@ -398,7 +403,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
let tsMoveResult = admin.runCommand({
moveChunk: tsCollDDLOpsFullName,
find: {meta: "sensor_050", "control.min.timestamp": new Date()}, // This finds the first chunk
find: {meta: "sensor_050", "control.min.timestamp": baseTime}, // This finds the first chunk
to: st.shard1.shardName,
_waitForDelete: true,
});
@ -431,7 +436,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st.
// Now the migration should succeed.
tsMoveResult = admin.runCommand({
moveChunk: tsCollDDLOpsFullName,
find: {meta: "sensor_050", "control.min.timestamp": new Date()},
find: {meta: "sensor_050", "control.min.timestamp": baseTime},
to: st.shard1.shardName,
_waitForDelete: true,
});

View File

@ -18,7 +18,9 @@ const dbName = "testDB";
const collName = "testColl";
const timeField = "time";
const metaField = "hostid";
const testTimestamp = ISODate();
const testTimestamp = ISODate("2023-08-09T17:05:42.238Z");
const testRoundedMinTimestamp = ISODate("2023-08-09T17:00:00Z");
const testBucketId = ObjectId("64d3c6104c83948224c45ddf");
// Connections.
const st = new ShardingTest({mongos: 2, shards: 2, rs: {nodes: 2}});
@ -165,9 +167,9 @@ runTest({
insert: getTimeseriesCollForRawOps(mongos0, collName),
documents: [
{
_id: ObjectId(),
_id: testBucketId,
control: {
min: {time: testTimestamp},
min: {time: testRoundedMinTimestamp},
max: {time: testTimestamp},
version: TimeseriesTest.BucketVersion.kUncompressed,
},

View File

@ -732,7 +732,7 @@ BSONColumn::BSONColumn(const char* buffer, size_t size)
}
BSONColumn::BSONColumn(BSONElement bin) {
tassert(5857700,
uassert(ErrorCodes::InvalidBSONColumn,
"Invalid BSON type for column",
bin.type() == BSONType::binData && bin.binDataType() == BinDataType::Column);
@ -743,7 +743,9 @@ BSONColumn::BSONColumn(BSONElement bin) {
BSONColumn::BSONColumn(BSONBinData bin)
: BSONColumn(static_cast<const char*>(bin.data), bin.length) {
tassert(6179300, "Invalid BSON type for column", bin.type == BinDataType::Column);
uassert(ErrorCodes::InvalidBSONColumn,
"Invalid BSON type for column",
bin.type == BinDataType::Column);
}
void BSONColumn::_initialValidate() {
@ -828,7 +830,9 @@ BSONColumnBlockBased::BSONColumnBlockBased(const char* buffer, size_t size)
BSONColumnBlockBased::BSONColumnBlockBased(BSONBinData bin)
: BSONColumnBlockBased(static_cast<const char*>(bin.data), bin.length) {
tassert(8471202, "Invalid BSON type for column", bin.type == BinDataType::Column);
uassert(ErrorCodes::InvalidBSONColumn,
"Invalid BSON type for column",
bin.type == BinDataType::Column);
}
BSONElement BSONColumnBlockBased::sum() const {

View File

@ -363,6 +363,7 @@ mongo_cc_library(
"//src/mongo/db/storage:storage_engine_impl",
"//src/mongo/db/storage:storage_options",
"//src/mongo/db/storage/key_string",
"//src/mongo/db/timeseries:timeseries_bucket_validation",
"//src/mongo/db/timeseries:timeseries_conversion_util",
"//src/mongo/db/timeseries:timeseries_extended_range",
"//src/mongo/db/timeseries:viewless_timeseries_collection_creation_helpers",

View File

@ -99,6 +99,7 @@
#include "mongo/db/storage/oplog_truncate_markers.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/timeseries_bucket_validation.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_extended_range.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
@ -599,32 +600,51 @@ std::pair<Collection::SchemaValidationResult, Status> CollectionImpl::checkValid
return {SchemaValidationResult::kError, status};
}
try {
if (exec::matcher::matchesBSON(validatorMatchExpr, document))
return {SchemaValidationResult::kPass, Status::OK()};
} catch (DBException&) {
};
BSONObj generatedError = doc_validation_error::generateError(*validatorMatchExpr, document);
static constexpr auto kValidationFailureErrorStr = "Document failed validation"_sd;
status = Status(doc_validation_error::DocumentValidationFailureInfo(generatedError),
kValidationFailureErrorStr);
switch (validationActionOrDefault(_metadata->options.validationAction)) {
case ValidationActionEnum::warn:
if (validationLevelOrDefault(_metadata->options.validationLevel) ==
ValidationLevelEnum::constraint) {
// Warn is prohibited for constraint validationLevel.
return {SchemaValidationResult::kError, status};
// Regular schema validation for everything except timeseries collections which uses a stricter
// validation for bucket documents.
if (!isTimeseriesCollection() || gTimeseriesDisableStrictBucketValidator.load()) {
try {
if (exec::matcher::matchesBSON(validatorMatchExpr, document)) {
return {SchemaValidationResult::kPass, Status::OK()};
}
return {SchemaValidationResult::kWarn, status};
case ValidationActionEnum::error:
return {SchemaValidationResult::kError, status};
case ValidationActionEnum::errorAndLog:
return {SchemaValidationResult::kErrorAndLog, status};
} catch (DBException&) {
};
BSONObj generatedError = doc_validation_error::generateError(*validatorMatchExpr, document);
static constexpr auto kValidationFailureErrorStr = "Document failed validation"_sd;
status = Status(doc_validation_error::DocumentValidationFailureInfo(generatedError),
kValidationFailureErrorStr);
switch (validationActionOrDefault(_metadata->options.validationAction)) {
case ValidationActionEnum::warn:
if (validationLevelOrDefault(_metadata->options.validationLevel) ==
ValidationLevelEnum::constraint) {
// Warn is prohibited for constraint validationLevel.
return {SchemaValidationResult::kError, status};
}
return {SchemaValidationResult::kWarn, status};
case ValidationActionEnum::error:
return {SchemaValidationResult::kError, status};
case ValidationActionEnum::errorAndLog:
return {SchemaValidationResult::kErrorAndLog, status};
}
MONGO_UNREACHABLE_TASSERT(7488702);
}
MONGO_UNREACHABLE_TASSERT(7488702);
// Strict validation for bucket documents in timeseries collections
try {
timeseries::validateBucketConsistency(this, document);
return {SchemaValidationResult::kPass, Status::OK()};
} catch (DBException& ex) {
// For strict timeseries validation we ensure that we only return kError or kErrorAndLog.
return {validationActionOrDefault(_metadata->options.validationAction) ==
ValidationActionEnum::errorAndLog
? SchemaValidationResult::kErrorAndLog
: SchemaValidationResult::kError,
Status(doc_validation_error::DocumentValidationFailureInfo(document),
ex.toStatus().toString())};
};
}
Status CollectionImpl::checkValidationAndParseResult(OperationContext* opCtx,

View File

@ -77,6 +77,19 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "timeseries_bucket_validation",
srcs = [
"timeseries_bucket_validation.cpp",
],
deps = [
":timeseries_extended_range",
":timeseries_options",
"//src/mongo/db:server_base",
"//src/mongo/db/timeseries/bucket_catalog:bucket_catalog_utils",
],
)
mongo_cc_library(
name = "timeseries_test_fixture",
srcs = [

View File

@ -21,7 +21,6 @@ mongo_cc_library(
"bucket_metadata.cpp",
"bucket_state_registry.cpp",
"execution_stats.cpp",
"flat_bson.cpp",
"global_bucket_catalog.cpp",
"measurement_map.cpp",
"reopening.cpp",
@ -29,6 +28,7 @@ mongo_cc_library(
"write_batch.cpp",
],
deps = [
":bucket_catalog_utils",
"//src/mongo/bson/column",
"//src/mongo/db:dbdirectclient",
"//src/mongo/db:record_id_helpers",
@ -43,6 +43,16 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "bucket_catalog_utils",
srcs = [
"flat_bson.cpp",
],
deps = [
"//src/mongo/db:server_base",
],
)
mongo_cc_unit_test(
name = "db_bucket_catalog_test",
srcs = [

View File

@ -1801,6 +1801,9 @@ TEST_F(BucketCatalogTest, SchemaChanges) {
}
TEST_F(BucketCatalogTest, ReopenMalformedBucket) {
RAIIServerParameterControllerForTest allowCorruptBucket(
"timeseriesDisableStrictBucketValidator", true);
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
"control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1},
@ -1925,7 +1928,7 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) {
TEST_F(BucketCatalogTest, ReopenMixedSchemaDataBucket) {
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"02091c2c050b7495eaef4581"},
R"({"_id":{"$oid":"63091ca4050b7495eaef4581"},
"control":{"version":1,
"min":{"_id":{"$oid":"63091c30138e9261fd70a903"},
"time":{"$date":"2022-08-26T19:19:00Z"},
@ -2193,7 +2196,7 @@ TEST_F(BucketCatalogTest, RehydrateMalformedBucket) {
TEST_F(BucketCatalogTest, RehydrateMixedSchemaDataBucket) {
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"02091c2c050b7495eaef4581"},
R"({"_id":{"$oid":"63091ca4050b7495eaef4581"},
"control":{"version":1,
"min":{"_id":{"$oid":"63091c30138e9261fd70a903"},
"time":{"$date":"2022-08-26T19:19:00Z"},
@ -2377,7 +2380,7 @@ TEST_F(BucketCatalogTest, ReopeningFailedDueToMinMaxCalculation) {
}
using BucketCatalogTestDeathTest = BucketCatalogTest;
DEATH_TEST_F(BucketCatalogTestDeathTest, ReopeningFailedDueToCompression, "invariant") {
TEST_F(BucketCatalogTest, ReopeningFailedDueToCompression) {
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
"control":{"version":2,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1},
@ -2389,7 +2392,7 @@ DEATH_TEST_F(BucketCatalogTestDeathTest, ReopeningFailedDueToCompression, "invar
AutoGetCollection autoColl(_opCtx, _resolveTimeseriesNss(_ns1), MODE_IX);
BSONObj compressedBucketDoc = _getCompressedBucketDoc(bucketDoc);
std::ignore = _reopenBucket(*autoColl, bucketDoc);
ASSERT_NOT_OK(_reopenBucket(*autoColl, bucketDoc));
}
TEST_F(BucketCatalogTest, ArchivingAndClosingUnderSideBucketCatalogMemoryPressure) {

View File

@ -180,6 +180,14 @@ server_parameters:
validator: {gte: 1}
redact: false
"timeseriesDisableStrictBucketValidator":
description: "Disable the strict bucket validator for timeseries collection and use the legacy schema validator instead."
set_at: [startup, runtime]
cpp_vartype: "AtomicWord<bool>"
cpp_varname: "gTimeseriesDisableStrictBucketValidator"
default: false
redact: false
enums:
BucketGranularity:
description:

View File

@ -0,0 +1,618 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/timeseries/timeseries_bucket_validation.h"
#include "mongo/bson/column/bsoncolumn_expressions.h"
#include "mongo/bson/column/bsoncolumn_helpers.h"
#include "mongo/db/timeseries/bucket_catalog/flat_bson.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_extended_range.h"
#include <charconv>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo::timeseries {
namespace {
/**
 * Performs a rate limited log of an exception. A burst of at most 10 logs is allowed, after which
 * logging is suppressed for 10 seconds before the next burst may start.
 *
 * The limiting is lockless and best-effort: threads racing on the counters may let a few extra
 * messages through, but the log rate stays bounded. Every call is counted in the running total
 * that is attached to each emitted log line, whether or not that call itself logs.
 */
void logExceptionRateLimited(const DBException& ex) {
    // Number of log lines allowed before the back-off window is (re)started.
    static constexpr int64_t kMaxLogsPerBurst = 10;

    // Atomics to implement lockless log rate limiting
    static AtomicWord<int64_t> lastLogTime{std::numeric_limits<int64_t>::min()};
    static AtomicWord<int64_t> numErrorsSinceAdvanceLogTime{0};
    static AtomicWord<int64_t> numErrorsTotal{0};

    // Keep track of how many times we've hit this in total.
    auto total = numErrorsTotal.addAndFetch(1);

    // Only log when the previous back-off window (10 seconds) has elapsed.
    auto now = Date_t::now();
    if (now > Date_t::fromMillisSinceEpoch(lastLogTime.load()) + Seconds(10)) {
        // Count this log against the current burst and check if the burst budget is spent.
        int64_t numErrorsSinceTimeUpdate = numErrorsSinceAdvanceLogTime.addAndFetch(1);
        if (numErrorsSinceTimeUpdate >= kMaxLogsPerBurst) {
            // Start a new back-off window and reset the burst counter. Without the reset the
            // counter would stay above the threshold forever and every later window would allow
            // only a single log line.
            lastLogTime.store(now.toMillisSinceEpoch());
            numErrorsSinceAdvanceLogTime.store(0);
        }

        // Perform log. This is internally serialized so important that we have a
        // backoff.
        LOGV2_WARNING(11898500,
                      "Strict timeseries bucket validation failed",
                      "total"_attr = total,
                      "error"_attr = ex);
    }
}
/**
 * Parses a field name as a base-10 integer using std::from_chars.
 *
 * Returns the parsed value only when the conversion succeeds and consumes every character of
 * 'idx'. In all other cases (conversion error, or trailing characters left unparsed) INT_MIN is
 * returned as the failure sentinel.
 */
int _idxInt(StringData idx) {
    const char* const first = idx.data();
    const char* const last = first + idx.size();

    int parsed = INT_MIN;
    const auto result = std::from_chars(first, last, parsed);

    // Success requires both a clean conversion and that the whole buffer was consumed.
    const bool fullyParsed = result.ec == std::errc{} && result.ptr == last;
    return fullyParsed ? parsed : INT_MIN;
}
/**
 * Validates an uncompressed column against expected min/max.
 *
 * Iterates the sub-fields of 'data' (an object whose field names are measurement indices) and
 * checks that every index is a non-negative integer, that indices are strictly increasing and
 * that none exceeds 'maxCount'. While iterating, the min and max of the values are accumulated and
 * finally compared against 'min' and 'max' (the control.min / control.max entries for this field),
 * honoring 'collator' and ignoring field order within nested objects.
 *
 * NOTE(review): the range check is inclusive ('idx <= maxCount'); confirm against the caller
 * whether 'maxCount' is the highest permitted index or a measurement count.
 *
 * Throws a DBException with ErrorCodes::BadValue on the first violated check.
 */
void _validateUncompressedMinMax(StringData fieldName,
                                 BSONElement data,
                                 BSONElement min,
                                 BSONElement max,
                                 int maxCount,
                                 const CollatorInterface* collator) {
    // The MinMax type calculates min/max for both scalars and nested objects where the results
    // needs to be a merged element-wise min/max.
    tracking::Context trackingContext;
    timeseries::bucket_catalog::MinMax minmax{trackingContext};

    // Checks that indices are in increasing order and within the correct range.
    int prevIdx = INT_MIN;
    for (const auto& metric : data.Obj()) {
        auto idx = _idxInt(metric.fieldNameStringData());

        // _idxInt() reports unparsable names as INT_MIN, so this check has to run before the
        // ordering check. Otherwise a non-numerical name would be misreported as being "not in
        // increasing order" and this message could never be produced for it.
        uassert(ErrorCodes::BadValue,
                fmt::format("The index '{}' in time-series bucket data field '{}' is "
                            "negative or non-numerical",
                            metric.fieldNameStringData(),
                            fieldName),
                idx >= 0);
        uassert(ErrorCodes::BadValue,
                fmt::format("The index '{}' in time-series bucket data field '{}' is "
                            "not in increasing order",
                            metric.fieldNameStringData(),
                            fieldName),
                idx > prevIdx);
        uassert(ErrorCodes::BadValue,
                fmt::format("The index '{}' in time-series bucket data field '{}' is "
                            "out of range",
                            metric.fieldNameStringData(),
                            fieldName),
                idx <= maxCount);

        minmax.update(metric.wrap(fieldName), boost::none, collator);
        prevIdx = idx;
    }

    uassert(ErrorCodes::BadValue,
            fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different "
                        "than control.min '{}'.",
                        fieldName,
                        minmax.min().toString(),
                        min.wrap().toString()),
            minmax.min().woCompare(min.wrap(),
                                   /*ordering=*/BSONObj(),
                                   BSONObj::ComparisonRules::kConsiderFieldName |
                                       BSONObj::ComparisonRules::kIgnoreFieldOrder,
                                   collator) == 0);
    // The message names the data max, since that is the value being printed and compared here.
    uassert(ErrorCodes::BadValue,
            fmt::format("Incorrect column data max for field '{}'. Data max '{}' is different "
                        "than control.max '{}'.",
                        fieldName,
                        minmax.max().toString(),
                        max.wrap().toString()),
            minmax.max().woCompare(max.wrap(),
                                   /*ordering=*/BSONObj(),
                                   BSONObj::ComparisonRules::kConsiderFieldName |
                                       BSONObj::ComparisonRules::kIgnoreFieldOrder,
                                   collator) == 0);
}
/**
 * Validates one compressed data column against the expected count and control min/max.
 *
 * 'data' must be a binData element of the Column subtype. Its element count must equal
 * 'expectedCount' and the min/max of its content must be equal to 'min' and 'max' from the control
 * block. 'allocator' is handed to the BSONColumn min/max helper used on the scalar fast-path.
 *
 * Throws ErrorCodes::BadValue on any violation.
 */
void _validateCompressedMinMax(boost::intrusive_ptr<BSONElementStorage>& allocator,
                               StringData fieldName,
                               BSONElement data,
                               BSONElement min,
                               BSONElement max,
                               int expectedCount,
                               const CollatorInterface* collator) {
    // The type must be verified before the binData accessors below are used.
    uassert(ErrorCodes::BadValue,
            fmt::format("Invalid bucket data type. Expected binData, but got {}.", data.type()),
            data.type() == BSONType::binData);
    int len = 0;
    const char* binary = data.binData(len);
    BinDataType type = data.binDataType();
    uassert(ErrorCodes::BadValue,
            fmt::format("Invalid bucket data binData subtype. Expected 7, but got {}.", type),
            type == BinDataType::Column);

    // All columns should have the count as stored in the control object.
    size_t cnt = bsoncolumn::count(binary, len);
    uassert(ErrorCodes::BadValue,
            fmt::format("Incorrect column data count for field '{}'. Expected {}, but found {}.",
                        fieldName,
                        expectedCount,
                        cnt),
            expectedCount == static_cast<int>(cnt));

    // Scalar types can use a basic BSON ordering to calculate min/max. However objects/arrays
    // store element-wise min/max in the control block where the data is merged from the entire
    // column content.
    if (min.type() == BSONType::object || min.type() == BSONType::array ||
        max.type() == BSONType::object || max.type() == BSONType::array) {
        tracking::Context trackingContext;
        timeseries::bucket_catalog::MinMax minmax{trackingContext};

        // Decompress the column and calculate element-wise merged min/max for this column.
        // EOO elements are skipped and do not contribute to min/max.
        for (auto&& elem : BSONColumn(binary, len)) {
            if (!elem.eoo()) {
                minmax.update(elem.wrap(fieldName), boost::none, collator);
            }
        }

        uassert(ErrorCodes::BadValue,
                fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different "
                            "than control.min '{}'.",
                            fieldName,
                            minmax.min().toString(),
                            min.wrap().toString()),
                minmax.min().woCompare(min.wrap(),
                                       /*ordering=*/BSONObj(),
                                       BSONObj::ComparisonRules::kConsiderFieldName |
                                           BSONObj::ComparisonRules::kIgnoreFieldOrder,
                                       collator) == 0);
        uassert(ErrorCodes::BadValue,
                fmt::format("Incorrect column data max for field '{}'. Data max '{}' is different "
                            "than control.max '{}'.",
                            fieldName,
                            minmax.max().toString(),
                            max.wrap().toString()),
                minmax.max().woCompare(max.wrap(),
                                       /*ordering=*/BSONObj(),
                                       BSONObj::ComparisonRules::kConsiderFieldName |
                                           BSONObj::ComparisonRules::kIgnoreFieldOrder,
                                       collator) == 0);
    } else {
        // Scalar types can use a fast-path to calculate min/max from the compressed column directly
        // without materializing the entire content.
        auto minmaxElems = bsoncolumn::minmax<bsoncolumn::BSONElementMaterializer>(
            binary, len, allocator, collator);

        uassert(ErrorCodes::BadValue,
                fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different "
                            "than control.min '{}'.",
                            fieldName,
                            minmaxElems.first.toString(),
                            min.toString()),
                minmaxElems.first.woCompare(
                    min, BSONObj::ComparisonRules::kIgnoreFieldOrder, collator) == 0);
        uassert(ErrorCodes::BadValue,
                fmt::format("Incorrect column data max for field '{}'. Data max '{}' is different "
                            "than control.max '{}'.",
                            fieldName,
                            minmaxElems.second.toString(),
                            max.toString()),
                minmaxElems.second.woCompare(
                    max, BSONObj::ComparisonRules::kIgnoreFieldOrder, collator) == 0);
    }
}
/**
 * Validates an uncompressed time column against the expected control min/max.
 *
 * The field names of 'data' must be the consecutive indices 0, 1, 2, ... The observed minimum
 * time, rounded to the collection granularity, must not be earlier than the rounded control.min
 * time. The observed maximum time must be equal to control.max, except when the observed minimum
 * is before the epoch, in which case control.max may be greater.
 *
 * Returns the number of measurements found in the time column.
 * Throws ErrorCodes::BadValue on any violation.
 */
int _validateUncompressedTimeField(const TimeseriesOptions& timeseriesOptions,
                                   BSONElement data,
                                   BSONElement min,
                                   BSONElement max,
                                   const CollatorInterface* collator) {
    const auto timeField = timeseriesOptions.getTimeField();

    tracking::Context trackingContext;
    timeseries::bucket_catalog::MinMax minmax{trackingContext};

    int cnt = 0;
    for (const auto& metric : data.Obj()) {
        // Checks that indices are consecutively increasing numbers starting from 0. The raw field
        // name is reported rather than the parsed value, since a non-numerical name parses to a
        // sentinel that would be meaningless to the reader.
        auto idx = _idxInt(metric.fieldNameStringData());
        uassert(ErrorCodes::BadValue,
                fmt::format("Time-series time field '{}' is not in consecutively increasing order. "
                            "Got index '{}' when '{}' is expected.",
                            timeField,
                            metric.fieldNameStringData(),
                            cnt),
                idx == cnt);
        minmax.update(metric.wrap(timeField), boost::none, collator);
        ++cnt;
    }

    const auto observedMin = minmax.min().getField(timeField).Date();
    const auto observedMax = minmax.max().getField(timeField).Date();

    // With measurement-level deletes (deletes with non-metafield filters) it is possible
    // that the earliest measurements got deleted. Since we keep the bucket's minTime
    // unchanged in that case, we cannot rely on the minTime always corresponding with what
    // the actual minimum measurement time is. We can, however, rely on the fact that the
    // rounded time of the earliest measurement is greater than or equal to the
    // control.min time-field.
    auto minTimestampsMatch =
        timeseries::roundTimestampToGranularity(observedMin, timeseriesOptions) >=
        timeseries::roundTimestampToGranularity(min.Date(), timeseriesOptions);

    // For the maximum check, if we had measurements that were pre-1970 (the lower end of
    // the extended range check), it is possible that the control.max value gets rounded up
    // to the epoch and is greater than the observed maximum timestamp. When the observed minimum
    // is earlier than the epoch, the check is relaxed to allow that.
    auto maxTimestampsMatch =
        (observedMin < Date_t()) ? max.Date() >= observedMax : max.Date() == observedMax;

    uassert(
        ErrorCodes::BadValue,
        fmt::format("Mismatch between time-series control and observed min or max for field {}. "
                    "Control had min {} and max {}, but observed data had min {} and max {}.",
                    timeField,
                    min.toString(),
                    max.toString(),
                    minmax.min().toString(),
                    minmax.max().toString()),
        minTimestampsMatch && maxTimestampsMatch);

    return cnt;
}
/**
 * Validates a compressed time column against the expected count and control min/max.
 *
 * 'data' must be a binData element of the Column subtype whose element count equals
 * 'expectedCount'. The observed minimum time, rounded to the collection granularity, must not be
 * earlier than the rounded control.min time. The observed maximum time must be equal to
 * control.max, except when the observed minimum is before the epoch, in which case control.max may
 * be greater.
 *
 * Throws ErrorCodes::BadValue on any violation.
 */
void _validateCompressedTimeField(boost::intrusive_ptr<BSONElementStorage>& allocator,
                                  const TimeseriesOptions& timeseriesOptions,
                                  BSONElement data,
                                  BSONElement min,
                                  BSONElement max,
                                  int expectedCount,
                                  const CollatorInterface* collator) {
    // The type must be verified before the binData accessors below are used, consistent with the
    // validation performed for the other compressed data columns.
    uassert(ErrorCodes::BadValue,
            fmt::format("Invalid bucket data type. Expected binData, but got {}.", data.type()),
            data.type() == BSONType::binData);
    int len = 0;
    const char* binary = data.binData(len);
    BinDataType type = data.binDataType();
    uassert(ErrorCodes::BadValue,
            fmt::format("Invalid bucket data binData subtype. Expected 7, but got {}.", type),
            type == BinDataType::Column);

    // The time column should have the count as stored in the control object.
    size_t cnt = bsoncolumn::count(binary, len);
    uassert(ErrorCodes::BadValue,
            fmt::format("Incorrect column data count for field '{}'. Expected {}, but found {}.",
                        timeseriesOptions.getTimeField(),
                        expectedCount,
                        cnt),
            expectedCount == static_cast<int>(cnt));

    // Time field is always a scalar so we can use fast BSON comparison to calculate min/max.
    auto minmaxElems =
        bsoncolumn::minmax<bsoncolumn::BSONElementMaterializer>(binary, len, allocator, collator);

    // With measurement-level deletes (deletes with non-metafield filters) it is possible
    // that the earliest measurements got deleted. Since we keep the bucket's minTime
    // unchanged in that case, we cannot rely on the minTime always corresponding with what
    // the actual minimum measurement time is. We can, however, rely on the fact that the
    // rounded time of the earliest measurement is greater than or equal to the
    // control.min time-field.
    auto minTimestampsMatch =
        timeseries::roundTimestampToGranularity(minmaxElems.first.Date(), timeseriesOptions) >=
        timeseries::roundTimestampToGranularity(min.Date(), timeseriesOptions);

    // For the maximum check, if we had measurements that were pre-1970 (the lower end of
    // the extended range check), it is possible that the control.max value gets rounded up
    // to the epoch and is greater than the observed maximum timestamp. When the observed minimum
    // is earlier than the epoch, the check is relaxed to allow that.
    auto maxTimestampsMatch = (minmaxElems.first.Date() < Date_t())
        ? max.Date() >= minmaxElems.second.Date()
        : max.Date() == minmaxElems.second.Date();

    uassert(
        ErrorCodes::BadValue,
        fmt::format("Mismatch between time-series control and observed min or max for field {}. "
                    "Control had min {} and max {}, but observed data had min {} and max {}.",
                    timeseriesOptions.getTimeField(),
                    min.toString(),
                    max.toString(),
                    minmaxElems.first.toString(),
                    minmaxElems.second.toString()),
        minTimestampsMatch && maxTimestampsMatch);
}
/**
* Validates an uncompressed bucket data object.
*/
void _validateUncompressedBucketData(const TimeseriesOptions& timeseriesOptions,
const CollatorInterface* collator,
const StringDataMap<BSONElement>& dataFields,
const StringDataMap<BSONElement>& controlMinFields,
const StringDataMap<BSONElement>& controlMaxFields) {
auto it = dataFields.find(timeseriesOptions.getTimeField());
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.max", timeseriesOptions.getTimeField()),
it != dataFields.end());
BSONElement time = it->second;
it = controlMinFields.find(timeseriesOptions.getTimeField());
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.min", timeseriesOptions.getTimeField()),
it != controlMinFields.end());
BSONElement min = it->second;
it = controlMaxFields.find(timeseriesOptions.getTimeField());
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.max", timeseriesOptions.getTimeField()),
it != controlMaxFields.end());
BSONElement max = it->second;
// Validate the time column first, we use this to discover the count to validate the other
// columns with.
int count = _validateUncompressedTimeField(timeseriesOptions, time, min, max, collator);
for (auto&& data : dataFields) {
// Time field is already validated.
if (data.first == timeseriesOptions.getTimeField()) {
continue;
}
it = controlMinFields.find(data.first);
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.min", data.first),
it != controlMinFields.end());
BSONElement min = it->second;
it = controlMaxFields.find(data.first);
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.max", data.first),
it != controlMaxFields.end());
BSONElement max = it->second;
// Validate this data field.
_validateUncompressedMinMax(data.first, data.second, min, max, count, collator);
}
}
/**
* Validates an uncompressed bucket data object.
*/
void _validateCompressedBucketData(const TimeseriesOptions& timeseriesOptions,
const CollatorInterface* collator,
const int bucketVersion,
const int bucketCount,
const StringDataMap<BSONElement>& dataFields,
const StringDataMap<BSONElement>& controlMinFields,
const StringDataMap<BSONElement>& controlMaxFields) {
boost::intrusive_ptr allocator{new BSONElementStorage()};
for (auto&& data : dataFields) {
auto it = controlMinFields.find(data.first);
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.min", data.first),
it != controlMinFields.end());
BSONElement min = it->second;
it = controlMaxFields.find(data.first);
uassert(ErrorCodes::BadValue,
fmt::format("Field '{}' is missing from control.max", data.first),
it != controlMaxFields.end());
BSONElement max = it->second;
if (data.first == timeseriesOptions.getTimeField()) {
_validateCompressedTimeField(
allocator, timeseriesOptions, data.second, min, max, bucketCount, collator);
} else {
_validateCompressedMinMax(
allocator, data.first, data.second, min, max, bucketCount, collator);
}
}
}
} // namespace
void validateBucketConsistency(const Collection* collection, const BSONObj& bucketDoc) {
OID bucketId;
try {
// First perform some basic schema validation and extract elements to validate more
// thoroughly.
bucketId = bucketDoc[timeseries::kBucketIdFieldName].OID();
const auto& timeseriesOptions = collection->getTimeseriesOptions().value();
BSONObj control = bucketDoc[timeseries::kBucketControlFieldName].Obj();
BSONObj data = bucketDoc[timeseries::kBucketDataFieldName].Obj();
const int version = control.getIntField(timeseries::kBucketControlVersionFieldName);
BSONObj min = control[timeseries::kBucketControlMinFieldName].Obj();
BSONObj max = control[timeseries::kBucketControlMaxFieldName].Obj();
if (version != timeseries::kTimeseriesControlUncompressedVersion &&
version != timeseries::kTimeseriesControlCompressedSortedVersion &&
version != timeseries::kTimeseriesControlCompressedUnsortedVersion) {
uasserted(
ErrorCodes::BadValue,
fmt::format("Invalid value for 'control.version'. Expected 1, 2, or 3, but got {}.",
version));
}
// Perform the actual validation
validateBucketIdTimestamp(timeseriesOptions, bucketId, min);
validateBucketTimeSpan(
timeseriesOptions, collection->areTimeseriesBucketsFixed(), min, max);
validateBucketData(timeseriesOptions,
collection->getDefaultCollator(),
version,
control[timeseries::kBucketControlCountFieldName],
min,
max,
data);
} catch (DBException& ex) {
// Catch any validation error and attach extra context to be able to debug validation errors
// or remediate corrupt buckets
ex.addContext(fmt::format("Bucket _id: {}", bucketId.toString()));
// Perform logging of occurances. This is rate limited to protect against malicious use.
logExceptionRateLimited(ex);
throw;
}
}
void validateBucketIdTimestamp(const TimeseriesOptions& timeseriesOptions,
const OID& id,
const BSONObj& controlMin) {
// Ensure the time field exists
const StringData timeField = timeseriesOptions.getTimeField();
// Compares both timestamps as Dates.
auto minTimestamp = controlMin[timeField].Date();
auto oidEmbeddedTimestamp = id.asDateT();
// If this bucket contains extended-range measurements, we cannot assert that the
// minTimestamp matches the embedded timestamp.
if (minTimestamp != oidEmbeddedTimestamp &&
!timeseries::dateOutsideStandardRange(minTimestamp)) {
uasserted(ErrorCodes::BadValue,
str::stream() << "Mismatch between the embedded timestamp "
<< oidEmbeddedTimestamp.toString()
<< " in the time-series bucket '_id' field and the timestamp "
<< minTimestamp.toString() << " in 'control.min' field.");
}
}
// Verifies the time span covered by the bucket's control block.
//
// The difference between the control.max and control.min value of the time field must be strictly
// less than the collection's bucketMaxSpanSeconds. When 'fixedBucketingEnabled' is set, the
// control.min time must additionally be equal to itself rounded to the collection granularity.
//
// Throws ErrorCodes::BadValue on any violation.
void validateBucketTimeSpan(const TimeseriesOptions& timeseriesOptions,
                            bool fixedBucketingEnabled,
                            const BSONObj& controlMin,
                            const BSONObj& controlMax) {
    auto minTimestamp = controlMin[timeseriesOptions.getTimeField()].Date();
    auto maxTimestamp = controlMax[timeseriesOptions.getTimeField()].Date();
    auto bucketMaxSpanSeconds = timeseriesOptions.getBucketMaxSpanSeconds();

    if (maxTimestamp - minTimestamp >= Seconds(*bucketMaxSpanSeconds)) {
        // The optional is dereferenced so that the configured value itself is reported.
        uasserted(ErrorCodes::BadValue,
                  str::stream() << "Time span of measurements in the bucket is too large. "
                                << "The difference between control.max and control.min is "
                                << (maxTimestamp - minTimestamp).toString()
                                << ", but the maximum allowed span is " << *bucketMaxSpanSeconds
                                << " seconds.");
    }

    // Enforce that control.min time is aligned to the fixed bucket boundary when the
    // fixed-bucketing optimization is enabled.
    if (fixedBucketingEnabled) {
        auto expectedMinTimestamp = roundTimestampToGranularity(minTimestamp, timeseriesOptions);
        uassert(ErrorCodes::BadValue,
                fmt::format("control.min.{} is not rounded to expected boundary when "
                            "fixed-bucketing is enabled. Expected {}, but got {}.",
                            timeseriesOptions.getTimeField(),
                            expectedMinTimestamp.toString(),
                            minTimestamp.toString()),
                minTimestamp == expectedMinTimestamp);
    }
}
void validateBucketData(const TimeseriesOptions& timeseriesOptions,
const CollatorInterface* collator,
int bucketVersion,
BSONElement controlCount,
const BSONObj& controlMin,
const BSONObj& controlMax,
const BSONObj& data) {
// Builds a hash map for the fields to avoid repeated traversals.
auto buildFieldTable = [](StringDataMap<BSONElement>& table, const BSONObj& fields) {
for (const auto& field : fields) {
uassert(ErrorCodes::BadValue,
str::stream() << "Duplicate field '" << field.fieldNameStringData()
<< "' detected in bucket.",
table.try_emplace(field.fieldNameStringData(), field).second);
}
};
StringDataMap<BSONElement> dataFields;
StringDataMap<BSONElement> controlMinFields;
StringDataMap<BSONElement> controlMaxFields;
buildFieldTable(dataFields, data);
buildFieldTable(controlMinFields, controlMin);
buildFieldTable(controlMaxFields, controlMax);
// Checks that the number of 'control.min' and 'control.max' fields agrees with number of 'data'
// fields.
if (dataFields.size() != controlMinFields.size() ||
controlMinFields.size() != controlMaxFields.size()) {
uasserted(
ErrorCodes::BadValue,
fmt::format("Mismatch between the number of time-series control fields and the number "
"of data fields. Control had {} min fields and {} max fields, but observed "
"data had {} fields.",
controlMinFields.size(),
controlMaxFields.size(),
dataFields.size()));
};
if (bucketVersion == timeseries::kTimeseriesControlUncompressedVersion) {
_validateUncompressedBucketData(
timeseriesOptions, collator, dataFields, controlMinFields, controlMaxFields);
} else {
int count = controlCount.numberInt();
uassert(ErrorCodes::BadValue,
"Unexpected control.count value, undefined integer representation",
count == controlCount.safeNumberInt());
_validateCompressedBucketData(timeseriesOptions,
collator,
bucketVersion,
count,
dataFields,
controlMinFields,
controlMaxFields);
}
}
} // namespace mongo::timeseries

View File

@ -0,0 +1,86 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/bson/bsonobj.h"
#include "mongo/db/shard_role/shard_catalog/collection.h"
#include "mongo/db/timeseries/timeseries_options.h"
MONGO_MOD_PUBLIC;
namespace mongo::timeseries {
/**
 * Performs strict validation of a timeseries bucket document.
 * Throws an exception on any validation failure. Before the exception is rethrown it is annotated
 * with the bucket _id and logged; the logging is rate limited.
 *
 * 'collection' supplies the timeseries options, the default collator and whether buckets are
 * fixed. 'bucketDoc' is the raw bucket document to validate.
 *
 * The strict validation validates the following properties:
 * - Bucket schema adheres to specification depending on bucket version (control.version must be
 *   1, 2 or 3).
 * - Control.min.time is identical to bucket _id time for dates within normal range
 * - Bucket time span is compatible with collection granularity/max span setting.
 * - Rounding of control.min.time for collections with fixed-bucket optimization enabled.
 * - Control.count matches count of all data columns
 * - Control.min is equal to min of data column content.
 * - Control.max is equal to max of data column content.
 *
 * Some properties that are NOT validated due to existing data and benign impact:
 * - Presence of mixed schema
 * - Rounding of control.min.time, for collections without fixed-bucket optimization.
 * - Equality of control.max.time, for buckets with extended range dates.
 * - Min/max time for dates outside of normal range.
 * - v3 bucket sortedness
 */
void validateBucketConsistency(const Collection* collection, const BSONObj& bucketDoc);
/**
 * Checks that the timestamp embedded in the bucket 'id' is equal to the value of the time field in
 * 'controlMin'. Throws ErrorCodes::BadValue on mismatch, unless the control.min time is outside of
 * the standard date range, in which case the mismatch is tolerated.
 *
 * TODO SERVER-122862: Use in validation command
 */
void validateBucketIdTimestamp(const TimeseriesOptions& timeseriesOptions,
const OID& id,
const BSONObj& controlMin);
/**
 * Checks the time span described by the time field of 'controlMin' and 'controlMax'. Throws
 * ErrorCodes::BadValue when the difference between the two is greater than or equal to the
 * bucketMaxSpanSeconds of 'timeseriesOptions'. When 'fixedBucketingEnabled' is true, it also
 * throws ErrorCodes::BadValue when the control.min time is not equal to itself rounded to the
 * collection granularity.
 *
 * TODO SERVER-122862: Use in validation command
 */
void validateBucketTimeSpan(const TimeseriesOptions& timeseriesOptions,
bool fixedBucketingEnabled,
const BSONObj& controlMin,
const BSONObj& controlMax);
/**
 * Checks the columns in 'data' against 'controlMin' and 'controlMax'. Throws ErrorCodes::BadValue
 * when any of the three objects has a duplicate field name, when they do not have the same number
 * of fields, or when a column fails validation against its control.min/control.max entries.
 *
 * 'bucketVersion' selects between the uncompressed and the compressed column validation.
 * 'controlCount' is only consulted for compressed buckets, where it is the element count every
 * column is validated against. 'collator' is passed on to the min/max comparisons.
 *
 * TODO SERVER-122862: Use in validation command
 */
void validateBucketData(const TimeseriesOptions& timeseriesOptions,
const CollatorInterface* collator,
int bucketVersion,
BSONElement controlCount,
const BSONObj& controlMin,
const BSONObj& controlMax,
const BSONObj& data);
} // namespace mongo::timeseries

View File

@ -792,7 +792,8 @@ BSONObj removeNestedField(const BSONObj& bson, std::span<const StringData> neste
class TimeseriesCollectionValidationTest : public CatalogTestFixture {
public:
TimeseriesCollectionValidationTest(Options options = {})
: CatalogTestFixture(std::move(options)) {
: CatalogTestFixture(std::move(options)),
_allowCorruptTimeseriesBuckets("timeseriesDisableStrictBucketValidator", true) {
_nss = NamespaceString::createNamespaceString_forTest("test.system.buckets.ts");
}
@ -979,6 +980,7 @@ public:
CollectionOptions _options;
CollectionValidation::ValidateMode _validateMode{
CollectionValidation::ValidateMode::kForeground};
RAIIServerParameterControllerForTest _allowCorruptTimeseriesBuckets;
protected:
void setUp() override {