From dff2f03fc5846ad94d74b50e62fe7db975f11ddb Mon Sep 17 00:00:00 2001 From: henrikedin Date: Fri, 10 Apr 2026 08:43:00 -0400 Subject: [PATCH] SERVER-118985 Strict validation for timeseries bucket insert/update via raw API (#50631) GitOrigin-RevId: e9ebe2934fe4ae6ad49f766326b7c6653c4d86b2 --- jstests/core/timeseries/libs/timeseries.js | 14 +- .../write/timeseries_findAndModify_basic.js | 11 - .../txns/txn_ops_allowed_on_buckets_coll.js | 2 +- ...pen_uncompressed_bucket_for_compression.js | 11 + ...repair_invalid_bucket_strict_validation.js | 99 +++ .../timeseries_insert_compression_failure.js | 5 + ...timeseries_insert_decompression_failure.js | 5 + .../timeseries_partial_compressed_bucket.js | 9 +- .../timeseries_system_buckets_metrics.js | 1 + .../timeseries_buckets_oplog_update.js | 4 +- .../validate_timeseries_bucket_max_span.js | 3 + ..._timeseries_bucketing_parameters_change.js | 3 + .../validate/validate_timeseries_count.js | 9 + .../validate_timeseries_data_indexes.js | 5 + .../validate_timeseries_id_timestamp.js | 3 + .../validate/validate_timeseries_minmax.js | 5 + .../validate/validate_timeseries_version.js | 5 + .../validate_v3_buckets_are_unsorted.js | 3 + ...migration_fails_with_spurious_documents.js | 19 +- .../timeseries/timeseries_multiple_mongos.js | 8 +- src/mongo/bson/column/bsoncolumn.cpp | 10 +- .../db/shard_role/shard_catalog/BUILD.bazel | 1 + .../shard_catalog/collection_impl.cpp | 68 +- src/mongo/db/timeseries/BUILD.bazel | 13 + .../db/timeseries/bucket_catalog/BUILD.bazel | 12 +- .../bucket_catalog/bucket_catalog_test.cpp | 11 +- src/mongo/db/timeseries/timeseries.idl | 8 + .../timeseries_bucket_validation.cpp | 618 ++++++++++++++++++ .../timeseries/timeseries_bucket_validation.h | 86 +++ .../validate/collection_validation_test.cpp | 4 +- 30 files changed, 990 insertions(+), 65 deletions(-) create mode 100644 jstests/noPassthrough/timeseries/data_integrity/repair_invalid_bucket_strict_validation.js create mode 100644 
src/mongo/db/timeseries/timeseries_bucket_validation.cpp create mode 100644 src/mongo/db/timeseries/timeseries_bucket_validation.h diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js index 48b05d45e37..bbd97718393 100644 --- a/jstests/core/timeseries/libs/timeseries.js +++ b/jstests/core/timeseries/libs/timeseries.js @@ -503,9 +503,9 @@ export var TimeseriesTest = class { * Checks for log entries generated by failed document validation. * @param {DBCollection} * @param {object} record - * @param {number} errorId + * @param {number|number[]} errorIds */ - static checkForDocumentValidationFailureLog(coll, record, errorId = 6698300) { + static checkForDocumentValidationFailureLog(coll, record, errorIds = [6698300, 11634800]) { // To avoid making log checks too strict, either the buckets namespace or view namespace is acceptable in the log message. // Due to differences in EJSON format, only a subset of record data is checked in the log message. const oidStr = JSON.parse(toJsonForLog(record))._id["$oid"]; @@ -529,12 +529,14 @@ export var TimeseriesTest = class { }, }; + const errorIdList = Array.isArray(errorIds) ? 
errorIds : [errorIds]; assert.soon(function () { - return ( - checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherBuckets, 1, null, relaxMatch) || - checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherView, 1, null, relaxMatch) + return errorIdList.some( + (errorId) => + checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherBuckets, 1, null, relaxMatch) || + checkLog.checkContainsWithCountJson(conn, errorId, attrsMatcherView, 1, null, relaxMatch), ); - }, `Could not find log entries containing the following id: ${errorId}, and attrs: ${attrsMatcherBuckets} or attrs: ${attrsMatcherView}`); + }, `Could not find log entries containing any of the following ids: ${errorIdList}, and attrs: ${attrsMatcherBuckets} or attrs: ${attrsMatcherView}`); } }; diff --git a/jstests/core/timeseries/write/timeseries_findAndModify_basic.js b/jstests/core/timeseries/write/timeseries_findAndModify_basic.js index 2668a4201ab..f12c8ffaeeb 100644 --- a/jstests/core/timeseries/write/timeseries_findAndModify_basic.js +++ b/jstests/core/timeseries/write/timeseries_findAndModify_basic.js @@ -34,17 +34,6 @@ toInsert.forEach((doc) => coll.insertOne(doc)); assert.eq(1, coll.find({t2: {$exists: true}}).toArray().length); } -// Test findAndModify rawData update queries. -{ - const doc = getTimeseriesCollForRawOps(coll).findAndModify({ - query: {}, - update: {$set: {"control.max._id": 100}}, - ...kRawOperationSpec, - }); - assert.eq(1, getTimeseriesCollForRawOps(coll).find({"control.max._id": 100}).rawData().toArray().length); - assert.eq(numDocs - 1, coll.find({_id: {$gt: 0}}).toArray().length); -} - // Test findAndModify delete queries. 
{ const doc = coll.findAndModify({query: {t2: {$exists: true}}, remove: true}); diff --git a/jstests/core/txns/txn_ops_allowed_on_buckets_coll.js b/jstests/core/txns/txn_ops_allowed_on_buckets_coll.js index 55a840f62d3..7b3630b437a 100644 --- a/jstests/core/txns/txn_ops_allowed_on_buckets_coll.js +++ b/jstests/core/txns/txn_ops_allowed_on_buckets_coll.js @@ -76,7 +76,7 @@ assert.commandWorked( assert.commandWorked( getTimeseriesCollForRawOps(tsColl).update( {[metaFieldName]: 65}, - {$set: {"control": bogusBucket.control, "data": bogusBucket.data}}, + {$set: {_id: incrementOID(bogusBucket._id), "control": bogusBucket.control, "data": bogusBucket.data}}, {upsert: true, ...kRawOperationSpec}, ), ); diff --git a/jstests/noPassthrough/timeseries/bucket_reopening/timeseries_reopen_uncompressed_bucket_for_compression.js b/jstests/noPassthrough/timeseries/bucket_reopening/timeseries_reopen_uncompressed_bucket_for_compression.js index 10d3bbc5f84..fab5c2bbef2 100644 --- a/jstests/noPassthrough/timeseries/bucket_reopening/timeseries_reopen_uncompressed_bucket_for_compression.js +++ b/jstests/noPassthrough/timeseries/bucket_reopening/timeseries_reopen_uncompressed_bucket_for_compression.js @@ -79,6 +79,11 @@ const runTest = function (isCorrupted = false) { if (isCorrupted) { jsTestLog("Corrupting the bucket by adding an extra data field."); + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); + // Corrupt the uncompressed bucket by adding an extra data field to it. This // will make the bucket uncompressible. 
let res = assert.commandWorked( @@ -88,6 +93,12 @@ const runTest = function (isCorrupted = false) { getRawOperationSpec(db), ), ); + + // Disable allowing inconsistent state on buckets + assert.commandWorked( + conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false}), + ); + jsTestLog(getTimeseriesCollForRawOps(db, coll).find().rawData().toArray()); assert.eq(res.modifiedCount, 1); } diff --git a/jstests/noPassthrough/timeseries/data_integrity/repair_invalid_bucket_strict_validation.js b/jstests/noPassthrough/timeseries/data_integrity/repair_invalid_bucket_strict_validation.js new file mode 100644 index 00000000000..ba02eb98bfa --- /dev/null +++ b/jstests/noPassthrough/timeseries/data_integrity/repair_invalid_bucket_strict_validation.js @@ -0,0 +1,99 @@ +/** + * Tests a repair mechanism for timeseries buckets that fail strict validation. A bucket that is rejected by + * strict validation can still be decompressed by $_internalUnpackBucket in some cases. The repair uses an aggregation + * pipeline with $_internalUnpackBucket to extract the measurements and $out to write them to a + * new valid timeseries collection. + * + * This demonstrates a repair path that customers can use to recover measurements from bucket data + * that was created before strict bucket validation was enforced. + * + * @tags: [ + * requires_timeseries, + * ] + */ + +import {TimeseriesTest} from "jstests/core/timeseries/libs/timeseries.js"; +import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js"; + +const conn = MongoRunner.runMongod(); +const db = conn.getDB(jsTestName()); + +const timeField = "t"; +const metaField = "m"; + +// Create a timeseries collection and insert two measurements with known values, this gives us a valid +// bucket to base the test on. 
+const sourceColl = db.source; +assert.commandWorked(db.createCollection(sourceColl.getName(), {timeseries: {timeField, metaField}})); + +const measurements = [ + {[timeField]: ISODate("2024-01-16T20:48:00.000Z"), [metaField]: "sensor1", a: 10}, + {[timeField]: ISODate("2024-01-16T20:48:30.000Z"), [metaField]: "sensor1", a: 20}, +]; +assert.commandWorked(sourceColl.insertMany(measurements)); + +// Extract the compressed bucket that was created for these measurements. +const sourceBuckets = getTimeseriesCollForRawOps(db, sourceColl).find().rawData().toArray(); +assert.eq(1, sourceBuckets.length, "Expected one bucket: " + tojson(sourceBuckets)); +const validBucket = sourceBuckets[0]; + +// Build an invalid bucket by replacing the _id with an OID whose embedded timestamp does not match control.min.t. This bucket can no longer be inserted into a timeseries collection. +const wrongTimestampOID = ObjectId("63b0ec000000000000000000"); +const invalidBucket = Object.assign({}, validBucket, {_id: wrongTimestampOID}); + +const verifyResult = db.runCommand({ + insert: getTimeseriesCollForRawOps(db, sourceColl).getName(), + documents: [invalidBucket], + ...getRawOperationSpec(db), +}); +assert( + verifyResult.writeErrors && verifyResult.writeErrors.length > 0, + "Expected strict validation to reject bucket with mismatched OID timestamp: " + tojson(verifyResult), +); + +// Insert the invalid bucket into a regular (non-timeseries) collection. Regular collections +// do not run timeseries bucket validation, so the insert succeeds unconditionally. +const stagingColl = db.staging; +assert.commandWorked(stagingColl.insertOne(invalidBucket)); + +// Repair the invalid bucket using the internal timeseries aggregation pipeline: +// 1. $_internalUnpackBucket decompresses the bucket into individual measurement documents. +// 2. $out writes those measurements to a new timeseries collection, creating valid buckets +// with correct OID timestamps. 
+const outCollName = "repaired"; +assert.commandWorked( + db.runCommand({ + aggregate: stagingColl.getName(), + pipeline: [ + { + $_internalUnpackBucket: { + timeField, + metaField, + bucketMaxSpanSeconds: NumberInt(3600), + }, + }, + {$out: {db: db.getName(), coll: outCollName, timeseries: {timeField, metaField}}}, + ], + cursor: {}, + }), +); + +// Verify that the repaired collection contains exactly the original measurements. +const repairedColl = db[outCollName]; +const repairedDocs = repairedColl + .find() + .sort({[timeField]: 1}) + .toArray(); +assert.eq(measurements.length, repairedDocs.length, tojson(repairedDocs)); +for (let i = 0; i < measurements.length; i++) { + assert.eq(measurements[i][timeField], repairedDocs[i][timeField], tojson(repairedDocs[i])); + assert.eq(measurements[i][metaField], repairedDocs[i][metaField], tojson(repairedDocs[i])); + assert.eq(measurements[i].a, repairedDocs[i].a, tojson(repairedDocs[i])); +} + +// Confirm the repaired collection is a valid timeseries collection with no inconsistencies. 
+const validateResult = repairedColl.validate(); +assert(validateResult.valid, tojson(validateResult)); +assert.eq(0, validateResult.nNonCompliantDocuments, tojson(validateResult)); + +MongoRunner.stopMongod(conn); diff --git a/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_compression_failure.js b/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_compression_failure.js index e15397c120c..22334757a46 100644 --- a/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_compression_failure.js +++ b/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_compression_failure.js @@ -36,6 +36,11 @@ const runTest = function (ordered) { const timeFieldName = "t"; const metaFieldName = "m"; + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); + assert.commandWorked( db.createCollection(collName, {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}), ); diff --git a/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_decompression_failure.js b/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_decompression_failure.js index 1560e8c512c..a36c26870f0 100644 --- a/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_decompression_failure.js +++ b/jstests/noPassthrough/timeseries/data_integrity/timeseries_insert_decompression_failure.js @@ -19,6 +19,11 @@ TimeseriesTest.run((insert) => { const db = conn.getDB(jsTestName()); const coll = db.coll; + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); + assert.commandWorked(db.createCollection(coll.getName(), {timeseries: {timeField: "t", metaField: "m"}})); const time = 
ISODate("2024-01-16T20:48:39.448Z"); diff --git a/jstests/noPassthrough/timeseries/data_integrity/timeseries_partial_compressed_bucket.js b/jstests/noPassthrough/timeseries/data_integrity/timeseries_partial_compressed_bucket.js index f22844aed6e..a46bcec0930 100644 --- a/jstests/noPassthrough/timeseries/data_integrity/timeseries_partial_compressed_bucket.js +++ b/jstests/noPassthrough/timeseries/data_integrity/timeseries_partial_compressed_bucket.js @@ -45,6 +45,8 @@ let stats = assert.commandWorked(coll.stats()); assert.eq(stats.timeseries.numBucketsArchivedDueToTimeBackward, 1, tojson(stats)); jsTestLog("Add uncompressed data field to bucket, thus corrupting a compressed bucket."); +// Allow setting an inconsistent state to the bucket so we can test that validate can detect it +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); let res = assert.commandWorked( getTimeseriesCollForRawOps(db, coll).updateOne( {_id: bucketId}, @@ -54,6 +56,9 @@ let res = assert.commandWorked( ); assert.eq(res.modifiedCount, 1); +// Disable allowing inconsistent state on buckets +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false})); + jsTestLog( "Insert third measurement. 
This will attempt to re-open the corrupted bucket, but should then freeze it and insert into a new bucket.", ); @@ -66,7 +71,7 @@ assert.eq(stats.timeseries.numBucketsReopened, 0, tojson(stats.timeseries)); assert.eq(stats.timeseries.numBucketsFrozen, 1, tojson(stats.timeseries)); assert.eq(stats.timeseries.numBucketQueriesFailed, 2, tojson(stats.timeseries)); TimeseriesTest.checkBucketReopeningsFailedCounters(stats.timeseries, { - numBucketReopeningsFailedDueToCompressionFailure: 1, + numBucketReopeningsFailedDueToValidator: 1, }); jsTestLog("Remove the newly created bucket, so it will not be present for the next insert."); @@ -86,7 +91,7 @@ assert.eq(stats.timeseries.numBucketsReopened, 0, tojson(stats.timeseries)); assert.eq(stats.timeseries.numBucketsFrozen, 1, tojson(stats.timeseries)); assert.eq(stats.timeseries.numBucketQueriesFailed, 2, tojson(stats.timeseries)); TimeseriesTest.checkBucketReopeningsFailedCounters(stats.timeseries, { - numBucketReopeningsFailedDueToCompressionFailure: 1, + numBucketReopeningsFailedDueToValidator: 1, numBucketReopeningsFailedDueToMarkedFrozen: 1, }); diff --git a/jstests/noPassthrough/timeseries/stats/timeseries_system_buckets_metrics.js b/jstests/noPassthrough/timeseries/stats/timeseries_system_buckets_metrics.js index 4848d313ea9..d1593d3241d 100644 --- a/jstests/noPassthrough/timeseries/stats/timeseries_system_buckets_metrics.js +++ b/jstests/noPassthrough/timeseries/stats/timeseries_system_buckets_metrics.js @@ -19,6 +19,7 @@ const timeField = "my_time"; const metaField = "my_meta"; const timeseriesRawDoc = { + "_id": ObjectId("67cc72e03dc19da48a654135"), "control": { "version": 2, "min": { diff --git a/jstests/noPassthrough/timeseries/timeseries_buckets_oplog_update.js b/jstests/noPassthrough/timeseries/timeseries_buckets_oplog_update.js index cc4f3a2d092..3c1cf5f1175 100644 --- a/jstests/noPassthrough/timeseries/timeseries_buckets_oplog_update.js +++ b/jstests/noPassthrough/timeseries/timeseries_buckets_oplog_update.js 
@@ -27,10 +27,10 @@ import {ReplSetTest} from "jstests/libs/replsettest.js"; assert.commandWorked(testDB.createCollection(collName, {timeseries: {timeField: "t"}})); const insertDocFull = { - "_id": ObjectId("64d3c7004c83948224c45ddf"), + "_id": ObjectId("64d3c6104c83948224c45ddf"), "control": { "version": 1, - "min": {"_id": ObjectId("64d24b52469e18af504e506e"), "t": ISODate("2023-08-09T17:04:00Z")}, + "min": {"_id": ObjectId("64d24b52469e18af504e506e"), "t": ISODate("2023-08-09T17:00:00Z")}, "max": {"_id": ObjectId("64d24b52469e18af504e506f"), "t": ISODate("2023-08-09T17:05:42.238Z")}, }, "data": { diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucket_max_span.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucket_max_span.js index bea72b82e43..b6fbaa6d9d6 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucket_max_span.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucket_max_span.js @@ -23,6 +23,9 @@ assert(res.valid, tojson(res)); assert.eq(res.nNonCompliantDocuments, 0); assert.eq(res.errors.length, 0); +// Allow setting an inconsistent state to the bucket so we can test that validate can detect it +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); + // Sets the max timestamp to outside the bucket max span. 
getTimeseriesCollForRawOps(db, coll).updateOne( {meta: 1}, diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucketing_parameters_change.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucketing_parameters_change.js index 43698dd335a..a190cbacea5 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucketing_parameters_change.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_bucketing_parameters_change.js @@ -63,6 +63,8 @@ function validateTimeseriesBucketingParametersChangeFail( data: {value: false}, }), ); + // Disable strict bucket validation as it relies on timeseriesBucketingParametersChanged to be set accurately, which we are not doing with the above fail point. + assert.commandWorked(db.adminCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); // This collMod should lead to timeseriesBucketingParametersChanged to True because the original // bucketing parameters we set for our collections was different from the updated bucketing @@ -84,6 +86,7 @@ function validateTimeseriesBucketingParametersChangeFail( assert.commandWorked( db.adminCommand({"configureFailPoint": timeseriesBucketingParametersChangedInputValueName, "mode": "off"}), ); + assert.commandWorked(db.adminCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false})); testCount += 1; collName = collNamePrefix + testCount; diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_count.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_count.js index 83f406fa02f..01ec79f2548 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_count.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_count.js @@ -58,11 +58,20 @@ coll.insertMany( })), {ordered: false}, ); + +// Allow setting an inconsistent state to the bucket so we can test that validate can detect it 
+assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); + +// Write the incorrect control.count getTimeseriesCollForRawOps(db, coll).updateOne( {"meta.sensorId": 2, "control.version": TimeseriesTest.BucketVersion.kCompressedSorted}, {"$set": {"control.count": 10}}, getRawOperationSpec(db), ); + +// Disable allowing inconsistent state on buckets +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: false})); + res = coll.validate(); assert(!res.valid, tojson(res)); assert.eq(res.nNonCompliantDocuments, 1); diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_data_indexes.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_data_indexes.js index 939ca92e5a4..0de58234d17 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_data_indexes.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_data_indexes.js @@ -34,6 +34,11 @@ describe("Tests validate command checks indexes in the time-series buckets data before(function () { this.conn = MongoRunner.runMongod(); this.db = this.conn.getDB(jsTestName()); + + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); }); beforeEach(function () { diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_id_timestamp.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_id_timestamp.js index fbaebf7a12d..c9d8ab17826 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_id_timestamp.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_id_timestamp.js @@ -44,6 +44,9 @@ assert.eq(res.warnings.length, 0); // Inserts documents into another bucket but manually changes the min timestamp. 
Expects // warnings from validation. +// Allow setting an inconsistent state to the bucket so we can test that validate can detect it +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); + testCount += 1; collName = collNamePrefix + testCount; db.getCollection(collName).drop(); diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_minmax.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_minmax.js index 3d70a710583..4c24a36eae9 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_minmax.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_minmax.js @@ -184,6 +184,11 @@ describe("Test 'control.min' and 'control.max' match the uncompressed documents before(function () { this.conn = MongoRunner.runMongod(); this.db = this.conn.getDB(jsTestName()); + + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); }); it("Updates 'control' min temperature with corresponding update in recorded temperature, testing that valid updates do not return warnings.", function () { diff --git a/jstests/noPassthrough/timeseries/validate/validate_timeseries_version.js b/jstests/noPassthrough/timeseries/validate/validate_timeseries_version.js index 3c9bd4d3777..e84f2e42a63 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_timeseries_version.js +++ b/jstests/noPassthrough/timeseries/validate/validate_timeseries_version.js @@ -21,6 +21,11 @@ describe("Validate Timeseries version Tests", function () { this.conn = MongoRunner.runMongod(); this.db = this.conn.getDB(dbName); this.db.getCollection(collName).drop(); + + // Allow setting an inconsistent state to the bucket so we can test that validate can detect it + assert.commandWorked( + 
this.conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true}), + ); }); beforeEach(function () { diff --git a/jstests/noPassthrough/timeseries/validate/validate_v3_buckets_are_unsorted.js b/jstests/noPassthrough/timeseries/validate/validate_v3_buckets_are_unsorted.js index 5114900cb3d..23876765eb4 100644 --- a/jstests/noPassthrough/timeseries/validate/validate_v3_buckets_are_unsorted.js +++ b/jstests/noPassthrough/timeseries/validate/validate_v3_buckets_are_unsorted.js @@ -49,6 +49,9 @@ assert(res.valid); // Compressed bucket with the compressed time field in-order and version set to 3. This should fail, // since this bucket's measurements are in-order on time field, meaning this bucket shouldn't have // been promoted to v3. +// Allow setting an inconsistent state to the bucket so we can test that validate can detect it +assert.commandWorked(conn.getDB("admin").runCommand({setParameter: 1, timeseriesDisableStrictBucketValidator: true})); + const invalidVersion3Doc = { _id: ObjectId("65a6eb806ffc9fa4280ecac4"), control: { diff --git a/jstests/sharding/migration_fails_with_spurious_documents.js b/jstests/sharding/migration_fails_with_spurious_documents.js index 226d9c00f99..42f3fed64ab 100644 --- a/jstests/sharding/migration_fails_with_spurious_documents.js +++ b/jstests/sharding/migration_fails_with_spurious_documents.js @@ -309,6 +309,11 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. jsTest.log.info("Time series collection successfully sharded with key: " + tojson(shardedCollections[0].key)); + // Use a fixed base time so the test is deterministic. + const baseTime = new Date("2026-03-27T17:30:21.332Z"); + const spuriousTimestamp = new Date(baseTime.getTime() + 3000); + const spuriousBucketMinTimestamp = new Date("2026-03-27T00:00:00.000Z"); + // Split the collection at {sensor_id: "sensor_100", timestamp: MinKey}. 
assert.commandWorked( admin.runCommand({ @@ -321,7 +326,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. assert.commandWorked( admin.runCommand({ moveChunk: tsCollDDLOpsFullName, - find: {meta: "sensor_200", "control.min.timestamp": new Date()}, + find: {meta: "sensor_200", "control.min.timestamp": baseTime}, to: st.shard1.shardName, _waitForDelete: true, }), @@ -330,7 +335,6 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. // Verify initial chunk distribution for time series collection. // Insert some legitimate time series data through mongos. - const baseTime = new Date(); assert.commandWorked( tsColl.insert({ sensor_id: "sensor_050", @@ -356,21 +360,22 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. const shard1TsColl = shard1DB.getCollection(tsColl.getName()); // Create a proper bucket document that would map to sensor_025 (which should be in first chunk) - const spuriousTimestamp = new Date(baseTime.getTime() + 3000); assert.commandWorked( getTimeseriesCollForRawOps(shard1DB, shard1TsColl).insert( { - _id: ObjectId(), + _id: ObjectId("69c5c8800000000000000000"), meta: "sensor_025", // This corresponds to sensor_id metaField control: { version: 1, min: { - timestamp: spuriousTimestamp, + timestamp: spuriousBucketMinTimestamp, temperature: 20.0, + data: "spurious_ts_document", }, max: { timestamp: spuriousTimestamp, temperature: 20.0, + data: "spurious_ts_document", }, closed: false, }, @@ -398,7 +403,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. 
let tsMoveResult = admin.runCommand({ moveChunk: tsCollDDLOpsFullName, - find: {meta: "sensor_050", "control.min.timestamp": new Date()}, // This finds the first chunk + find: {meta: "sensor_050", "control.min.timestamp": baseTime}, // This finds the first chunk to: st.shard1.shardName, _waitForDelete: true, }); @@ -431,7 +436,7 @@ assert.commandWorked(admin.runCommand({enableSharding: "test", primaryShard: st. // Now the migration should succeed. tsMoveResult = admin.runCommand({ moveChunk: tsCollDDLOpsFullName, - find: {meta: "sensor_050", "control.min.timestamp": new Date()}, + find: {meta: "sensor_050", "control.min.timestamp": baseTime}, to: st.shard1.shardName, _waitForDelete: true, }); diff --git a/jstests/sharding/timeseries/timeseries_multiple_mongos.js b/jstests/sharding/timeseries/timeseries_multiple_mongos.js index 0f50904031a..be67e537a0f 100644 --- a/jstests/sharding/timeseries/timeseries_multiple_mongos.js +++ b/jstests/sharding/timeseries/timeseries_multiple_mongos.js @@ -18,7 +18,9 @@ const dbName = "testDB"; const collName = "testColl"; const timeField = "time"; const metaField = "hostid"; -const testTimestamp = ISODate(); +const testTimestamp = ISODate("2023-08-09T17:05:42.238Z"); +const testRoundedMinTimestamp = ISODate("2023-08-09T17:00:00Z"); +const testBucketId = ObjectId("64d3c6104c83948224c45ddf"); // Connections. 
const st = new ShardingTest({mongos: 2, shards: 2, rs: {nodes: 2}}); @@ -165,9 +167,9 @@ runTest({ insert: getTimeseriesCollForRawOps(mongos0, collName), documents: [ { - _id: ObjectId(), + _id: testBucketId, control: { - min: {time: testTimestamp}, + min: {time: testRoundedMinTimestamp}, max: {time: testTimestamp}, version: TimeseriesTest.BucketVersion.kUncompressed, }, diff --git a/src/mongo/bson/column/bsoncolumn.cpp b/src/mongo/bson/column/bsoncolumn.cpp index 123fce2360a..8e2071bc0fb 100644 --- a/src/mongo/bson/column/bsoncolumn.cpp +++ b/src/mongo/bson/column/bsoncolumn.cpp @@ -732,7 +732,7 @@ BSONColumn::BSONColumn(const char* buffer, size_t size) } BSONColumn::BSONColumn(BSONElement bin) { - tassert(5857700, + uassert(ErrorCodes::InvalidBSONColumn, "Invalid BSON type for column", bin.type() == BSONType::binData && bin.binDataType() == BinDataType::Column); @@ -743,7 +743,9 @@ BSONColumn::BSONColumn(BSONElement bin) { BSONColumn::BSONColumn(BSONBinData bin) : BSONColumn(static_cast(bin.data), bin.length) { - tassert(6179300, "Invalid BSON type for column", bin.type == BinDataType::Column); + uassert(ErrorCodes::InvalidBSONColumn, + "Invalid BSON type for column", + bin.type == BinDataType::Column); } void BSONColumn::_initialValidate() { @@ -828,7 +830,9 @@ BSONColumnBlockBased::BSONColumnBlockBased(const char* buffer, size_t size) BSONColumnBlockBased::BSONColumnBlockBased(BSONBinData bin) : BSONColumnBlockBased(static_cast(bin.data), bin.length) { - tassert(8471202, "Invalid BSON type for column", bin.type == BinDataType::Column); + uassert(ErrorCodes::InvalidBSONColumn, + "Invalid BSON type for column", + bin.type == BinDataType::Column); } BSONElement BSONColumnBlockBased::sum() const { diff --git a/src/mongo/db/shard_role/shard_catalog/BUILD.bazel b/src/mongo/db/shard_role/shard_catalog/BUILD.bazel index 172769a58c3..66dd199ed57 100644 --- a/src/mongo/db/shard_role/shard_catalog/BUILD.bazel +++ b/src/mongo/db/shard_role/shard_catalog/BUILD.bazel @@ 
-363,6 +363,7 @@ mongo_cc_library( "//src/mongo/db/storage:storage_engine_impl", "//src/mongo/db/storage:storage_options", "//src/mongo/db/storage/key_string", + "//src/mongo/db/timeseries:timeseries_bucket_validation", "//src/mongo/db/timeseries:timeseries_conversion_util", "//src/mongo/db/timeseries:timeseries_extended_range", "//src/mongo/db/timeseries:viewless_timeseries_collection_creation_helpers", diff --git a/src/mongo/db/shard_role/shard_catalog/collection_impl.cpp b/src/mongo/db/shard_role/shard_catalog/collection_impl.cpp index a3c3a64cdd2..96a11c8d1b4 100644 --- a/src/mongo/db/shard_role/shard_catalog/collection_impl.cpp +++ b/src/mongo/db/shard_role/shard_catalog/collection_impl.cpp @@ -99,6 +99,7 @@ #include "mongo/db/storage/oplog_truncate_markers.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_parameters_gen.h" +#include "mongo/db/timeseries/timeseries_bucket_validation.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_extended_range.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" @@ -599,32 +600,51 @@ std::pair CollectionImpl::checkValid return {SchemaValidationResult::kError, status}; } - try { - if (exec::matcher::matchesBSON(validatorMatchExpr, document)) - return {SchemaValidationResult::kPass, Status::OK()}; - } catch (DBException&) { - }; - - BSONObj generatedError = doc_validation_error::generateError(*validatorMatchExpr, document); - - static constexpr auto kValidationFailureErrorStr = "Document failed validation"_sd; - status = Status(doc_validation_error::DocumentValidationFailureInfo(generatedError), - kValidationFailureErrorStr); - - switch (validationActionOrDefault(_metadata->options.validationAction)) { - case ValidationActionEnum::warn: - if (validationLevelOrDefault(_metadata->options.validationLevel) == - ValidationLevelEnum::constraint) { - // Warn is prohibited for constraint validationLevel. 
- return {SchemaValidationResult::kError, status}; + // Regular schema validation for everything except timeseries collections which uses a stricter + // validation for bucket documents. + if (!isTimeseriesCollection() || gTimeseriesDisableStrictBucketValidator.load()) { + try { + if (exec::matcher::matchesBSON(validatorMatchExpr, document)) { + return {SchemaValidationResult::kPass, Status::OK()}; } - return {SchemaValidationResult::kWarn, status}; - case ValidationActionEnum::error: - return {SchemaValidationResult::kError, status}; - case ValidationActionEnum::errorAndLog: - return {SchemaValidationResult::kErrorAndLog, status}; + } catch (DBException&) { + }; + + BSONObj generatedError = doc_validation_error::generateError(*validatorMatchExpr, document); + + static constexpr auto kValidationFailureErrorStr = "Document failed validation"_sd; + status = Status(doc_validation_error::DocumentValidationFailureInfo(generatedError), + kValidationFailureErrorStr); + + switch (validationActionOrDefault(_metadata->options.validationAction)) { + case ValidationActionEnum::warn: + if (validationLevelOrDefault(_metadata->options.validationLevel) == + ValidationLevelEnum::constraint) { + // Warn is prohibited for constraint validationLevel. + return {SchemaValidationResult::kError, status}; + } + return {SchemaValidationResult::kWarn, status}; + case ValidationActionEnum::error: + return {SchemaValidationResult::kError, status}; + case ValidationActionEnum::errorAndLog: + return {SchemaValidationResult::kErrorAndLog, status}; + } + MONGO_UNREACHABLE_TASSERT(7488702); } - MONGO_UNREACHABLE_TASSERT(7488702); + + // Strict validation for bucket documents in timeseries collections + try { + timeseries::validateBucketConsistency(this, document); + return {SchemaValidationResult::kPass, Status::OK()}; + } catch (DBException& ex) { + // For strict timeseries validation we ensure that we only return kError or kErrorAndLog. 
+ return {validationActionOrDefault(_metadata->options.validationAction) == + ValidationActionEnum::errorAndLog + ? SchemaValidationResult::kErrorAndLog + : SchemaValidationResult::kError, + Status(doc_validation_error::DocumentValidationFailureInfo(document), + ex.toStatus().toString())}; + }; } Status CollectionImpl::checkValidationAndParseResult(OperationContext* opCtx, diff --git a/src/mongo/db/timeseries/BUILD.bazel b/src/mongo/db/timeseries/BUILD.bazel index c5ccec68a4f..a65f22592d5 100644 --- a/src/mongo/db/timeseries/BUILD.bazel +++ b/src/mongo/db/timeseries/BUILD.bazel @@ -77,6 +77,19 @@ mongo_cc_library( ], ) +mongo_cc_library( + name = "timeseries_bucket_validation", + srcs = [ + "timeseries_bucket_validation.cpp", + ], + deps = [ + ":timeseries_extended_range", + ":timeseries_options", + "//src/mongo/db:server_base", + "//src/mongo/db/timeseries/bucket_catalog:bucket_catalog_utils", + ], +) + mongo_cc_library( name = "timeseries_test_fixture", srcs = [ diff --git a/src/mongo/db/timeseries/bucket_catalog/BUILD.bazel b/src/mongo/db/timeseries/bucket_catalog/BUILD.bazel index 02501a18abe..f1308d4b924 100644 --- a/src/mongo/db/timeseries/bucket_catalog/BUILD.bazel +++ b/src/mongo/db/timeseries/bucket_catalog/BUILD.bazel @@ -21,7 +21,6 @@ mongo_cc_library( "bucket_metadata.cpp", "bucket_state_registry.cpp", "execution_stats.cpp", - "flat_bson.cpp", "global_bucket_catalog.cpp", "measurement_map.cpp", "reopening.cpp", @@ -29,6 +28,7 @@ mongo_cc_library( "write_batch.cpp", ], deps = [ + ":bucket_catalog_utils", "//src/mongo/bson/column", "//src/mongo/db:dbdirectclient", "//src/mongo/db:record_id_helpers", @@ -43,6 +43,16 @@ mongo_cc_library( ], ) +mongo_cc_library( + name = "bucket_catalog_utils", + srcs = [ + "flat_bson.cpp", + ], + deps = [ + "//src/mongo/db:server_base", + ], +) + mongo_cc_unit_test( name = "db_bucket_catalog_test", srcs = [ diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp 
b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp index 39ac7e44601..546cf875acd 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp @@ -1801,6 +1801,9 @@ TEST_F(BucketCatalogTest, SchemaChanges) { } TEST_F(BucketCatalogTest, ReopenMalformedBucket) { + RAIIServerParameterControllerForTest allowCorruptBucket( + "timeseriesDisableStrictBucketValidator", true); + BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1}, @@ -1925,7 +1928,7 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { TEST_F(BucketCatalogTest, ReopenMixedSchemaDataBucket) { BSONObj bucketDoc = ::mongo::fromjson( - R"({"_id":{"$oid":"02091c2c050b7495eaef4581"}, + R"({"_id":{"$oid":"63091ca4050b7495eaef4581"}, "control":{"version":1, "min":{"_id":{"$oid":"63091c30138e9261fd70a903"}, "time":{"$date":"2022-08-26T19:19:00Z"}, @@ -2193,7 +2196,7 @@ TEST_F(BucketCatalogTest, RehydrateMalformedBucket) { TEST_F(BucketCatalogTest, RehydrateMixedSchemaDataBucket) { BSONObj bucketDoc = ::mongo::fromjson( - R"({"_id":{"$oid":"02091c2c050b7495eaef4581"}, + R"({"_id":{"$oid":"63091ca4050b7495eaef4581"}, "control":{"version":1, "min":{"_id":{"$oid":"63091c30138e9261fd70a903"}, "time":{"$date":"2022-08-26T19:19:00Z"}, @@ -2377,7 +2380,7 @@ TEST_F(BucketCatalogTest, ReopeningFailedDueToMinMaxCalculation) { } using BucketCatalogTestDeathTest = BucketCatalogTest; -DEATH_TEST_F(BucketCatalogTestDeathTest, ReopeningFailedDueToCompression, "invariant") { +TEST_F(BucketCatalogTest, ReopeningFailedDueToCompression) { BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, "control":{"version":2,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1}, @@ -2389,7 +2392,7 @@ DEATH_TEST_F(BucketCatalogTestDeathTest, ReopeningFailedDueToCompression, "invar 
AutoGetCollection autoColl(_opCtx, _resolveTimeseriesNss(_ns1), MODE_IX); BSONObj compressedBucketDoc = _getCompressedBucketDoc(bucketDoc); - std::ignore = _reopenBucket(*autoColl, bucketDoc); + ASSERT_NOT_OK(_reopenBucket(*autoColl, bucketDoc)); } TEST_F(BucketCatalogTest, ArchivingAndClosingUnderSideBucketCatalogMemoryPressure) { diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl index f091884b98f..bdb7566e028 100644 --- a/src/mongo/db/timeseries/timeseries.idl +++ b/src/mongo/db/timeseries/timeseries.idl @@ -180,6 +180,14 @@ server_parameters: validator: {gte: 1} redact: false + "timeseriesDisableStrictBucketValidator": + description: "Disable the strict bucket validator for timeseries collection and use the legacy schema validator instead." + set_at: [startup, runtime] + cpp_vartype: "AtomicWord" + cpp_varname: "gTimeseriesDisableStrictBucketValidator" + default: false + redact: false + enums: BucketGranularity: description: diff --git a/src/mongo/db/timeseries/timeseries_bucket_validation.cpp b/src/mongo/db/timeseries/timeseries_bucket_validation.cpp new file mode 100644 index 00000000000..827a337f4f6 --- /dev/null +++ b/src/mongo/db/timeseries/timeseries_bucket_validation.cpp @@ -0,0 +1,618 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/timeseries/timeseries_bucket_validation.h" + +#include "mongo/bson/column/bsoncolumn_expressions.h" +#include "mongo/bson/column/bsoncolumn_helpers.h" +#include "mongo/db/timeseries/bucket_catalog/flat_bson.h" +#include "mongo/db/timeseries/timeseries_constants.h" +#include "mongo/db/timeseries/timeseries_extended_range.h" + +#include + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +namespace mongo::timeseries { +namespace { +/** + * Performs a rate limited log of an exception. Maximum 10 logs every 10 seconds are allowed. + */ +void logExceptionRateLimited(const DBException& ex) { + // Atomics to implement lockless log rate limiting + static AtomicWord lastLogTime{std::numeric_limits::min()}; + static AtomicWord numErrorsSinceAdvanceLogTime{0}; + static AtomicWord numErrorsTotal{0}; + + // Keep track on how many times we've hit this in total. 
+ auto total = numErrorsTotal.addAndFetch(1); + + // Perform a logging every 10 seconds + auto now = Date_t::now(); + if (now > Date_t::fromMillisSinceEpoch(lastLogTime.load()) + Seconds(10)) { + // Update number of logs we've done and check if we should advance the time + int64_t numErrorsSinceTimeUpdate = numErrorsSinceAdvanceLogTime.addAndFetch(1); + if (numErrorsSinceTimeUpdate > 10) { + lastLogTime.store(now.toMillisSinceEpoch()); + } + + // Perform log. This is internally serialized so important that we have a + // backoff. + LOGV2_WARNING(11898500, + "Strict timeseries bucket validation failed", + "total"_attr = total, + "error"_attr = ex); + } +} + +/** + * Attempts to parse the field name to integer. + */ +int _idxInt(StringData idx) { + int value = INT_MIN; + auto [ptr, ec] = std::from_chars(idx.data(), idx.data() + idx.size(), value); + // Ensure that the parsing consume the entire buffer + if (ec != std::errc{} || ptr != idx.data() + idx.size()) { + return INT_MIN; + } + return value; +} + +/** + * Validates an uncompressed column against expected min/max. + */ +void _validateUncompressedMinMax(StringData fieldName, + BSONElement data, + BSONElement min, + BSONElement max, + int maxCount, + const CollatorInterface* collator) { + // The MinMax type calculates min/max for both scalars and nested objects where the results + // needs to be a merged element-wise min/max. + tracking::Context trackingContext; + timeseries::bucket_catalog::MinMax minmax{trackingContext}; + + // Checks that indices are in increasing order and within the correct range. 
+ int prevIdx = INT_MIN; + for (const auto& metric : data.Obj()) { + auto idx = _idxInt(metric.fieldNameStringData()); + + uassert(ErrorCodes::BadValue, + fmt::format("The index '{}' in time-series bucket data field '{}' is " + "not in increasing order", + metric.fieldNameStringData(), + fieldName), + idx > prevIdx); + + uassert(ErrorCodes::BadValue, + fmt::format("The index '{}' in time-series bucket data field '{}' is " + "out of range", + metric.fieldNameStringData(), + fieldName), + idx <= maxCount); + + uassert(ErrorCodes::BadValue, + fmt::format("The index '{}' in time-series bucket data field '{}' is " + "negative or non-numerical", + metric.fieldNameStringData(), + fieldName), + idx >= 0); + + minmax.update(metric.wrap(fieldName), boost::none, collator); + prevIdx = idx; + } + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different " + "than control.min '{}'.", + fieldName, + minmax.min().toString(), + min.wrap().toString()), + minmax.min().woCompare(min.wrap(), + /*ordering=*/BSONObj(), + BSONObj::ComparisonRules::kConsiderFieldName | + BSONObj::ComparisonRules::kIgnoreFieldOrder, + collator) == 0); + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data max for field '{}'. Data max '{}' is different " + "than control.max '{}'.", + fieldName, + minmax.max().toString(), + max.wrap().toString()), + minmax.max().woCompare(max.wrap(), + /*ordering=*/BSONObj(), + BSONObj::ComparisonRules::kConsiderFieldName | + BSONObj::ComparisonRules::kIgnoreFieldOrder, + collator) == 0); +} + +/** + * Validates a compressed column against expected count and min/max. + */ +void _validateCompressedMinMax(boost::intrusive_ptr<BSONElementStorage>& allocator, + StringData fieldName, + BSONElement data, + BSONElement min, + BSONElement max, + int expectedCount, + const CollatorInterface* collator) { + uassert(ErrorCodes::BadValue, + fmt::format("Invalid bucket data type.
Expected binData, but got {}.", data.type()), + data.type() == BSONType::binData); + + int len = 0; + const char* binary = data.binData(len); + BinDataType type = data.binDataType(); + uassert(ErrorCodes::BadValue, + fmt::format("Invalid bucket data binData subtype. Expected 7, but got {}.", type), + type == BinDataType::Column); + + // All columns should have the count as stored in the control object. + size_t cnt = bsoncolumn::count(binary, len); + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data count for field '{}'. Expected {}, but found {}.", + fieldName, + expectedCount, + cnt), + expectedCount == static_cast(cnt)); + + // Scalar types can use a basic BSON ordering to calculate min/max. However objects/arrays + // stores element-wise min/max in the control block where the data is merged from the entire + // column content. + if (min.type() == BSONType::object || min.type() == BSONType::array || + max.type() == BSONType::object || max.type() == BSONType::array) { + tracking::Context trackingContext; + timeseries::bucket_catalog::MinMax minmax{trackingContext}; + + // Decompress the column and calculate element-wise merged min/max for this column. + for (auto&& elem : BSONColumn(binary, len)) { + if (!elem.eoo()) { + minmax.update(elem.wrap(fieldName), boost::none, collator); + } + } + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different " + "than control.min '{}'.", + fieldName, + minmax.min().toString(), + min.wrap().toString()), + minmax.min().woCompare(min.wrap(), + /*ordering=*/BSONObj(), + BSONObj::ComparisonRules::kConsiderFieldName | + BSONObj::ComparisonRules::kIgnoreFieldOrder, + collator) == 0); + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data max for field '{}'. 
Data max '{}' is different " + "than control.max '{}'.", + fieldName, + minmax.max().toString(), + max.wrap().toString()), + minmax.max().woCompare(max.wrap(), + /*ordering=*/BSONObj(), + BSONObj::ComparisonRules::kConsiderFieldName | + BSONObj::ComparisonRules::kIgnoreFieldOrder, + collator) == 0); + + } else { + // Scalar types can use a fast-path to calculate min/max from the compressed column directly + // without materializing the entire content. + auto minmaxElems = bsoncolumn::minmax( + binary, len, allocator, collator); + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data min for field '{}'. Data min '{}' is different " + "than control.min '{}'.", + fieldName, + minmaxElems.first.toString(), + min.toString()), + minmaxElems.first.woCompare( + min, BSONObj::ComparisonRules::kIgnoreFieldOrder, collator) == 0); + + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data max for field '{}'. Data max '{}' is different " + "than control.max '{}'.", + fieldName, + minmaxElems.second.toString(), + max.toString()), + minmaxElems.second.woCompare( + max, BSONObj::ComparisonRules::kIgnoreFieldOrder, collator) == 0); + } +} + +/** + * Validates an uncompressed time column against bucket _id, bucket time span and expected min/max. + * Returns the element count. + */ +int _validateUncompressedTimeField(const TimeseriesOptions& timeseriesOptions, + BSONElement data, + BSONElement min, + BSONElement max, + const CollatorInterface* collator) { + tracking::Context trackingContext; + timeseries::bucket_catalog::MinMax minmax{trackingContext}; + + int cnt = 0; + for (const auto& metric : data.Obj()) { + // Checks that indices are consecutively increasing numbers starting from 0. + auto idx = _idxInt(metric.fieldNameStringData()); + + uassert(ErrorCodes::BadValue, + fmt::format("Time-series time field '{}' is not in consecutively increasing order.
" + "Got index '{}' when '{}' is expected.", + metric.fieldNameStringData(), + idx, + cnt), + idx == cnt); + + minmax.update(metric.wrap(timeseriesOptions.getTimeField()), boost::none, collator); + ++cnt; + } + + // With measurement-level deletes (deletes with non-metafield filters) it is possible + // that the earliest measurements got deleted. Since we keep the bucket's minTime + // unchanged in that case, we cannot rely on the minTime always corresponding with what + // the actual minimum measurement time is. We can, however, rely on the fact that the + // rounded time of the earliest measurement is at greater than or equal to the + // control.min time-field. + auto minTimestampsMatch = + timeseries::roundTimestampToGranularity( + minmax.min().getField(timeseriesOptions.getTimeField()).Date(), timeseriesOptions) >= + timeseries::roundTimestampToGranularity(min.Date(), timeseriesOptions); + // For the maximum check, if we had measurements that were pre-1970 (the lower end of + // the extended range check), it is possible that the control.max value gets rounded up + // to the epoch and is greater than the observed maximum timestamp. In the case where + // the control.min is earlier than the epoch, we should relax the check. + auto maxTimestampsMatch = + (minmax.min().getField(timeseriesOptions.getTimeField()).Date() < Date_t()) + ? max.Date() >= minmax.max().getField(timeseriesOptions.getTimeField()).Date() + : max.Date() == minmax.max().getField(timeseriesOptions.getTimeField()).Date(); + + uassert( + ErrorCodes::BadValue, + fmt::format("Mismatch between time-series control and observed min or max for field {}. 
" + "Control had min {} and max {}, but observed data had min {} and max {}.", + timeseriesOptions.getTimeField(), + min.toString(), + max.toString(), + minmax.min().toString(), + minmax.max().toString()), + minTimestampsMatch && maxTimestampsMatch); + + return cnt; +} + +/** + * Validates a compressed time column against bucket _id, bucket time span, expected count and + * min/max. + */ +void _validateCompressedTimeField(boost::intrusive_ptr& allocator, + const TimeseriesOptions& timeseriesOptions, + BSONElement data, + BSONElement min, + BSONElement max, + int expectedCount, + const CollatorInterface* collator) { + int len = 0; + const char* binary = data.binData(len); + BinDataType type = data.binDataType(); + uassert(ErrorCodes::BadValue, + fmt::format("Invalid bucket data binData subtype. Expected 7, but got {}.", type), + type == BinDataType::Column); + + size_t cnt = bsoncolumn::count(binary, len); + uassert(ErrorCodes::BadValue, + fmt::format("Incorrect column data count for field '{}'. Expected {}, but found {}.", + timeseriesOptions.getTimeField(), + expectedCount, + cnt), + expectedCount == static_cast(cnt)); + + // Time field is always a scalar so we can use fast BSON comparison to calculate min/max. + auto minmaxElems = + bsoncolumn::minmax(binary, len, allocator, collator); + + // With measurement-level deletes (deletes with non-metafield filters) it is possible + // that the earliest measurements got deleted. Since we keep the bucket's minTime + // unchanged in that case, we cannot rely on the minTime always corresponding with what + // the actual minimum measurement time is. We can, however, rely on the fact that the + // rounded time of the earliest measurement is at greater than or equal to the + // control.min time-field. 
+ auto minTimestampsMatch = + timeseries::roundTimestampToGranularity(minmaxElems.first.Date(), timeseriesOptions) >= + timeseries::roundTimestampToGranularity(min.Date(), timeseriesOptions); + // For the maximum check, if we had measurements that were pre-1970 (the lower end of + // the extended range check), it is possible that the control.max value gets rounded up + // to the epoch and is greater than the observed maximum timestamp. In the case where + // the control.min is earlier than the epoch, we should relax the check. + auto maxTimestampsMatch = (minmaxElems.first.Date() < Date_t()) + ? max.Date() >= minmaxElems.second.Date() + : max.Date() == minmaxElems.second.Date(); + + uassert( + ErrorCodes::BadValue, + fmt::format("Mismatch between time-series control and observed min or max for field {}. " + "Control had min {} and max {}, but observed data had min {} and max {}.", + timeseriesOptions.getTimeField(), + min.toString(), + max.toString(), + minmaxElems.first.toString(), + minmaxElems.second.toString()), + minTimestampsMatch && maxTimestampsMatch); +} + +/** + * Validates an uncompressed bucket data object. 
+ */ +void _validateUncompressedBucketData(const TimeseriesOptions& timeseriesOptions, + const CollatorInterface* collator, + const StringDataMap<BSONElement>& dataFields, + const StringDataMap<BSONElement>& controlMinFields, + const StringDataMap<BSONElement>& controlMaxFields) { + auto it = dataFields.find(timeseriesOptions.getTimeField()); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from data", timeseriesOptions.getTimeField()), + it != dataFields.end()); + BSONElement time = it->second; + + it = controlMinFields.find(timeseriesOptions.getTimeField()); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.min", timeseriesOptions.getTimeField()), + it != controlMinFields.end()); + BSONElement min = it->second; + + it = controlMaxFields.find(timeseriesOptions.getTimeField()); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.max", timeseriesOptions.getTimeField()), + it != controlMaxFields.end()); + BSONElement max = it->second; + + // Validate the time column first, we use this to discover the count to validate the other + // columns with. + int count = _validateUncompressedTimeField(timeseriesOptions, time, min, max, collator); + + for (auto&& data : dataFields) { + // Time field is already validated. + if (data.first == timeseriesOptions.getTimeField()) { + continue; + } + + it = controlMinFields.find(data.first); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.min", data.first), + it != controlMinFields.end()); + BSONElement min = it->second; + + it = controlMaxFields.find(data.first); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.max", data.first), + it != controlMaxFields.end()); + BSONElement max = it->second; + + // Validate this data field. + _validateUncompressedMinMax(data.first, data.second, min, max, count, collator); + } +} + +/** + * Validates a compressed bucket data object.
+ */ +void _validateCompressedBucketData(const TimeseriesOptions& timeseriesOptions, + const CollatorInterface* collator, + const int bucketVersion, + const int bucketCount, + const StringDataMap& dataFields, + const StringDataMap& controlMinFields, + const StringDataMap& controlMaxFields) { + boost::intrusive_ptr allocator{new BSONElementStorage()}; + for (auto&& data : dataFields) { + auto it = controlMinFields.find(data.first); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.min", data.first), + it != controlMinFields.end()); + BSONElement min = it->second; + + it = controlMaxFields.find(data.first); + uassert(ErrorCodes::BadValue, + fmt::format("Field '{}' is missing from control.max", data.first), + it != controlMaxFields.end()); + BSONElement max = it->second; + + if (data.first == timeseriesOptions.getTimeField()) { + _validateCompressedTimeField( + allocator, timeseriesOptions, data.second, min, max, bucketCount, collator); + } else { + _validateCompressedMinMax( + allocator, data.first, data.second, min, max, bucketCount, collator); + } + } +} + +} // namespace + + +void validateBucketConsistency(const Collection* collection, const BSONObj& bucketDoc) { + OID bucketId; + try { + // First perform some basic schema validation and extract elements to validate more + // thoroughly. 
+ bucketId = bucketDoc[timeseries::kBucketIdFieldName].OID(); + + const auto& timeseriesOptions = collection->getTimeseriesOptions().value(); + + BSONObj control = bucketDoc[timeseries::kBucketControlFieldName].Obj(); + BSONObj data = bucketDoc[timeseries::kBucketDataFieldName].Obj(); + + const int version = control.getIntField(timeseries::kBucketControlVersionFieldName); + BSONObj min = control[timeseries::kBucketControlMinFieldName].Obj(); + BSONObj max = control[timeseries::kBucketControlMaxFieldName].Obj(); + + if (version != timeseries::kTimeseriesControlUncompressedVersion && + version != timeseries::kTimeseriesControlCompressedSortedVersion && + version != timeseries::kTimeseriesControlCompressedUnsortedVersion) { + uasserted( + ErrorCodes::BadValue, + fmt::format("Invalid value for 'control.version'. Expected 1, 2, or 3, but got {}.", + version)); + } + + // Perform the actual validation + validateBucketIdTimestamp(timeseriesOptions, bucketId, min); + + validateBucketTimeSpan( + timeseriesOptions, collection->areTimeseriesBucketsFixed(), min, max); + + validateBucketData(timeseriesOptions, + collection->getDefaultCollator(), + version, + control[timeseries::kBucketControlCountFieldName], + min, + max, + data); + } catch (DBException& ex) { + // Catch any validation error and attach extra context to be able to debug validation errors + // or remediate corrupt buckets + ex.addContext(fmt::format("Bucket _id: {}", bucketId.toString())); + // Perform logging of occurrences. This is rate limited to protect against malicious use. + logExceptionRateLimited(ex); + throw; + } +} + +void validateBucketIdTimestamp(const TimeseriesOptions& timeseriesOptions, + const OID& id, + const BSONObj& controlMin) { + // Ensure the time field exists + const StringData timeField = timeseriesOptions.getTimeField(); + + // Compares both timestamps as Dates.
+ auto minTimestamp = controlMin[timeField].Date(); + auto oidEmbeddedTimestamp = id.asDateT(); + + // If this bucket contains extended-range measurements, we cannot assert that the + // minTimestamp matches the embedded timestamp. + if (minTimestamp != oidEmbeddedTimestamp && + !timeseries::dateOutsideStandardRange(minTimestamp)) { + uasserted(ErrorCodes::BadValue, + str::stream() << "Mismatch between the embedded timestamp " + << oidEmbeddedTimestamp.toString() + << " in the time-series bucket '_id' field and the timestamp " + << minTimestamp.toString() << " in 'control.min' field."); + } +} + +void validateBucketTimeSpan(const TimeseriesOptions& timeseriesOptions, + bool fixedBucketingEnabled, + const BSONObj& controlMin, + const BSONObj& controlMax) { + auto minTimestamp = controlMin[timeseriesOptions.getTimeField()].Date(); + auto maxTimestamp = controlMax[timeseriesOptions.getTimeField()].Date(); + auto bucketMaxSpanSeconds = timeseriesOptions.getBucketMaxSpanSeconds(); + if (maxTimestamp - minTimestamp >= Seconds(*bucketMaxSpanSeconds)) { + uasserted(ErrorCodes::BadValue, + str::stream() << "Time span of measurements in the bucket is too large. " + << "The difference between control.max and control.min is " + << (maxTimestamp - minTimestamp).toString() + << ", but the maximum allowed span is " << bucketMaxSpanSeconds + << " seconds."); + } + + // Enforce that control.min time is aligned to the fixed bucket boundary when the + // fixed-bucketing optimization is enabled. + if (fixedBucketingEnabled) { + auto expectedMinTimestamp = roundTimestampToGranularity(minTimestamp, timeseriesOptions); + uassert(ErrorCodes::BadValue, + fmt::format("control.min.{} is not rounded to expected boundary when " + "fixed-bucketing is enabled. 
Expected {}, but got {}.", + timeseriesOptions.getTimeField(), + minTimestamp.toString(), + expectedMinTimestamp.toString()), + minTimestamp == expectedMinTimestamp); + } +} + +void validateBucketData(const TimeseriesOptions& timeseriesOptions, + const CollatorInterface* collator, + int bucketVersion, + BSONElement controlCount, + const BSONObj& controlMin, + const BSONObj& controlMax, + const BSONObj& data) { + + // Builds a hash map for the fields to avoid repeated traversals. + auto buildFieldTable = [](StringDataMap& table, const BSONObj& fields) { + for (const auto& field : fields) { + uassert(ErrorCodes::BadValue, + str::stream() << "Duplicate field '" << field.fieldNameStringData() + << "' detected in bucket.", + table.try_emplace(field.fieldNameStringData(), field).second); + } + }; + + StringDataMap dataFields; + StringDataMap controlMinFields; + StringDataMap controlMaxFields; + buildFieldTable(dataFields, data); + buildFieldTable(controlMinFields, controlMin); + buildFieldTable(controlMaxFields, controlMax); + + // Checks that the number of 'control.min' and 'control.max' fields agrees with number of 'data' + // fields. + if (dataFields.size() != controlMinFields.size() || + controlMinFields.size() != controlMaxFields.size()) { + uasserted( + ErrorCodes::BadValue, + fmt::format("Mismatch between the number of time-series control fields and the number " + "of data fields. 
Control had {} min fields and {} max fields, but observed " + "data had {} fields.", + controlMinFields.size(), + controlMaxFields.size(), + dataFields.size())); + }; + + if (bucketVersion == timeseries::kTimeseriesControlUncompressedVersion) { + _validateUncompressedBucketData( + timeseriesOptions, collator, dataFields, controlMinFields, controlMaxFields); + } else { + int count = controlCount.numberInt(); + uassert(ErrorCodes::BadValue, + "Unexpected control.count value, undefined integer representation", + count == controlCount.safeNumberInt()); + _validateCompressedBucketData(timeseriesOptions, + collator, + bucketVersion, + count, + dataFields, + controlMinFields, + controlMaxFields); + } +} +} // namespace mongo::timeseries diff --git a/src/mongo/db/timeseries/timeseries_bucket_validation.h b/src/mongo/db/timeseries/timeseries_bucket_validation.h new file mode 100644 index 00000000000..71a9f48cbcb --- /dev/null +++ b/src/mongo/db/timeseries/timeseries_bucket_validation.h @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/shard_role/shard_catalog/collection.h" +#include "mongo/db/timeseries/timeseries_options.h" + +MONGO_MOD_PUBLIC; + +namespace mongo::timeseries { +/** + * Performs strict validation of a timeseries bucket document. + * Throws an exception on any validation failure + * + * The strict validation validates the following properties: + * - Bucket schema adheres to specification depending on bucket version. + * - Control.min.time is identical to bucket _id time for dates within normal range + * - Bucket time span is compatible with collection granularity/max span setting. + * - Rounding of control.min.time for collections with fixed-bucket optimization enabled. + * - Control.count matches count of all data columns + * - Control.min is equal to min of data column content. + * - Control.max is equal to max of data column content. + * + * Some properties that are NOT validated due to existing data and benign impact: + * - Presence of mixed schema + * - Rounding of control.min.time, for collections without fixed-bucket optimization. + * - Equality of control.max.time, for buckets with extended range dates. + * - Min/max time for dates outside of normal range. 
+ * - v3 bucket sortedness + */ +void validateBucketConsistency(const Collection* collection, const BSONObj& bucketDoc); + +/** + * TODO SERVER-122862: Use in validation command + */ +void validateBucketIdTimestamp(const TimeseriesOptions& timeseriesOptions, + const OID& id, + const BSONObj& controlMin); + +/** + * TODO SERVER-122862: Use in validation command + */ +void validateBucketTimeSpan(const TimeseriesOptions& timeseriesOptions, + bool fixedBucketingEnabled, + const BSONObj& controlMin, + const BSONObj& controlMax); + +/** + * TODO SERVER-122862: Use in validation command + */ +void validateBucketData(const TimeseriesOptions& timeseriesOptions, + const CollatorInterface* collator, + int bucketVersion, + BSONElement controlCount, + const BSONObj& controlMin, + const BSONObj& controlMax, + const BSONObj& data); +} // namespace mongo::timeseries diff --git a/src/mongo/db/validate/collection_validation_test.cpp b/src/mongo/db/validate/collection_validation_test.cpp index a030270f1fe..8c9080920ca 100644 --- a/src/mongo/db/validate/collection_validation_test.cpp +++ b/src/mongo/db/validate/collection_validation_test.cpp @@ -792,7 +792,8 @@ BSONObj removeNestedField(const BSONObj& bson, std::span neste class TimeseriesCollectionValidationTest : public CatalogTestFixture { public: TimeseriesCollectionValidationTest(Options options = {}) - : CatalogTestFixture(std::move(options)) { + : CatalogTestFixture(std::move(options)), + _allowCorruptTimeseriesBuckets("timeseriesDisableStrictBucketValidator", true) { _nss = NamespaceString::createNamespaceString_forTest("test.system.buckets.ts"); } @@ -979,6 +980,7 @@ public: CollectionOptions _options; CollectionValidation::ValidateMode _validateMode{ CollectionValidation::ValidateMode::kForeground}; + RAIIServerParameterControllerForTest _allowCorruptTimeseriesBuckets; protected: void setUp() override {