SERVER-123693 Extend mixed schema PBT to more comprehensive rollover PBT (#51603)

GitOrigin-RevId: eeb30e6156abc92c219f02ceb711380e14566b75
This commit is contained in:
Binh Vo 2026-04-14 15:08:14 -04:00 committed by MongoDB Bot
parent 8d0efd689f
commit 8669c0ed31
6 changed files with 635 additions and 254 deletions

View File

@ -4,25 +4,13 @@
import {fc} from "jstests/third_party/fast_check/fc-4.6.0.js";
import {normalDistRealArb} from "jstests/write_path/timeseries/pbt/lib/arb_utils.js";
const defaultLatitudeMin = -90;
const defaultLatitudeMax = 90;
const defaultLongitudeMin = -180;
const defaultLongitudeMax = 180;
/**
* Function for producing realistic values for floating point data. fc.double has a tendency
* to cluster values at extremes.
*
* @param {number} min minimum value
* @param {number} max maximum value
* @param {number} [resolution] number of steps to break integers down into
*/
const normalDistRealArb = (min, max, resolution = 1000.0) => {
return fc
.integer({min: min * resolution, max: max * resolution})
.map((x) => Math.min(Math.max(x / resolution, min), max));
};
/**
* Make a GeoData Point arbitrary.
* @param {Object} [ranges] ranges for longitude and latitude

View File

@ -0,0 +1,23 @@
/*
* Shared arbitrary utilities for property-based timeseries tests.
*/
import {fc} from "jstests/third_party/fast_check/fc-4.6.0.js";
/**
* Produce uniformly distributed real numbers in [min, max].
*
* fc.double skews toward extreme values (±0, ±Infinity boundaries, denormals).
* This helper uses fc.integer under the hood, which is truly uniform, then
* scales into the floating-point range.
*
* @param {number} min minimum value (inclusive)
* @param {number} max maximum value (inclusive)
* @param {number} [resolution=1000] number of steps per integer unit
* @returns {fc.Arbitrary<number>}
*/
export const normalDistRealArb = (min, max, resolution = 1000.0) => {
return fc
.integer({min: min * resolution, max: max * resolution})
.map((x) => Math.min(Math.max(x / resolution, min), max));
};

View File

@ -75,7 +75,7 @@ export class InsertCommand {
run(model, real) {
const {tsColl, ctrlColl} = real;
const {resTs, resCtrl} = [tsColl, ctrlColl].map((coll) => {
const [resTs, resCtrl] = [tsColl, ctrlColl].map((coll) => {
try {
return coll.insertOne(this.doc);
} catch (e) {
@ -123,7 +123,7 @@ export class BatchInsertCommand {
if (!this.docs || this.docs.length === 0) {
return;
}
const {resTs, resCtrl} = [tsColl, ctrlColl].map((coll) => {
const [resTs, resCtrl] = [tsColl, ctrlColl].map((coll) => {
try {
return coll.insertMany(this.docs);
} catch (e) {

View File

@ -4,6 +4,7 @@
import {fc} from "jstests/third_party/fast_check/fc-4.6.0.js";
import {normalDistRealArb} from "jstests/write_path/timeseries/pbt/lib/arb_utils.js";
import {
makeMetricArb,
makeSensorDateMetricStreamArb,
@ -12,6 +13,138 @@ import {
makeRunnyMetricStreamArb,
} from "jstests/write_path/timeseries/pbt/lib/metric_arbitraries.js";
// Rollover-condition constants
// Matches server-side gTimeseriesBucketMaxCount (timeseries.idl).
// When countRollover is enabled, streams are sized to exceed this limit.
export const kBucketMaxCount = 1000;
// Field name injected into documents when sizeRolloverChance fires.
// The double-underscore prefix and length (>8 chars) guarantee it never collides
// with the 1-8 char names produced by the default fieldNameArb.
export const kSizeRolloverFieldName = "__largePayloadField";
// Per-document payload size (bytes) used for size-rollover injection.
// Chosen to exceed gTimeseriesBucketMaxSize (128 000 B) on its own, so that a single
// injected document triggers the size check regardless of prior bucket contents.
// The value stays well below kLargeMeasurementsMaxBucketSize (12 MB)
// so the document is absorbed rather than causing a kSize/kCachePressure rollover.
const kSizeRolloverFieldBytes = 130_000;
// Pre-built once at module load; reused across every injected document.
const kSizeRolloverLargeValue = "x".repeat(kSizeRolloverFieldBytes);
/**
* Build a time stream where some inter-document gaps are forced to exceed
* `bucketSpanSeconds`, triggering a kTimeForward bucket rollover.
*
* For each gap between consecutive timestamps the caller decides (via `chance`)
* whether to widen it to > bucketSpanMs. Widening is cumulative so the
* resulting stream stays monotonically non-decreasing.
*
* @param {number} docCount
* @param {Date} dateMin
* @param {Date} dateMax
* @param {number} bucketSpanSeconds
* @param {string|object} timeBucketing - forwarded to makeSensorDateMetricStreamArb
* @param {number} chance - probability per gap of injecting a forward jump [0, 1]
* @returns {fc.Arbitrary<Date[]>}
*/
function makeTimeForwardStreamArb(docCount, dateMin, dateMax, bucketSpanSeconds, timeBucketing, chance) {
const bucketSpanMs = bucketSpanSeconds * 1_000;
const baseArb = makeSensorDateMetricStreamArb(
docCount,
docCount,
{dateRange: {min: dateMin, max: dateMax}},
timeBucketing,
);
if (docCount <= 1) return baseArb;
const numGaps = docCount - 1;
const decisionsArb = fc.array(fc.double({min: 0, max: 1, noNaN: true}), {
minLength: numGaps,
maxLength: numGaps,
});
// Extra milliseconds added on top of the minimum needed to cross the boundary.
const extraJumpArb = fc.array(fc.integer({min: 0, max: bucketSpanMs * 2}), {
minLength: numGaps,
maxLength: numGaps,
});
return fc.tuple(baseArb, decisionsArb, extraJumpArb).map(([stream, decisions, extras]) => {
const result = [stream[0]];
let cumulativeOffset = 0;
for (let i = 1; i < docCount; i++) {
const gap = stream[i].getTime() - stream[i - 1].getTime();
if (decisions[i - 1] < chance) {
// Force this gap to exceed the bucket span.
cumulativeOffset += Math.max(0, bucketSpanMs + 1 - gap) + extras[i - 1];
}
const newMs = Math.min(stream[i].getTime() + cumulativeOffset, dateMax.getTime());
result.push(new Date(newMs));
}
return result;
});
}
/**
* Post-process a docs array to inject:
* - kTimeBackward: replace some timestamps with values before the stream minimum
* - kSize: add a large field to some documents
*
* Returns an fc.Arbitrary so that the random selections participate in
* fast-check's shrinking.
*
* @param {Object[]} docs
* @param {string} timeFieldname
* @param {number} timeBackwardChance - per-doc probability [0, 1]
* @param {number} sizeRolloverChance - per-doc probability [0, 1]
* @param {Date} dateMin
* @returns {fc.Arbitrary<Object[]>}
*/
function applyRolloverInjections(docs, timeFieldname, timeBackwardChance, sizeRolloverChance, dateMin) {
if (docs.length === 0 || (timeBackwardChance <= 0 && sizeRolloverChance <= 0)) {
return fc.constant(docs);
}
const n = docs.length;
const minTimestampMs = docs.reduce((min, d) => Math.min(min, d[timeFieldname].getTime()), Infinity);
// Backward-timestamp decisions + offset magnitudes.
const backDecisionsArb =
timeBackwardChance > 0
? fc.array(normalDistRealArb(0, 1), {minLength: n, maxLength: n})
: fc.constant(new Array(n).fill(1.0));
const backOffsetArb =
timeBackwardChance > 0
? fc.array(fc.integer({min: 3_600_000, max: 86_400_000 * 7}), {minLength: n, maxLength: n})
: fc.constant(new Array(n).fill(0));
// Size-field decisions.
const sizeDecisionsArb =
sizeRolloverChance > 0
? fc.array(fc.double({min: 0, max: 1, noNaN: true}), {minLength: n, maxLength: n})
: fc.constant(new Array(n).fill(1.0));
return fc
.tuple(backDecisionsArb, backOffsetArb, sizeDecisionsArb)
.map(([backDecisions, backOffsets, sizeDecisions]) => {
return docs.map((doc, i) => {
const result = {...doc};
if (backDecisions[i] < timeBackwardChance) {
const backMs = Math.max(dateMin.getTime(), minTimestampMs - backOffsets[i]);
result[timeFieldname] = new Date(backMs);
}
if (sizeDecisions[i] < sizeRolloverChance) {
result[kSizeRolloverFieldName] = kSizeRolloverLargeValue;
}
return result;
});
});
}
/**
* Make a single measurement document arbitrary.
*
@ -156,6 +289,24 @@ export function makeMeasurementDocArb(
* @param {number} [options.mixedSchemaChance=0.0] // chance that a given field will have a mixed schema across the stream
* @param {number} [options.newFieldFrequency=0.1] // per-doc probability that a new field is added to the schema going forward
* @param {fc.Arbitrary<string>} [options.ranges.fieldNameArb]
* @param {Object} [options.rolloverConditions] // composable bucket-rollover injections
* @param {number} [options.rolloverConditions.timeForwardChance=0.0]
* Per-step probability [0,1] of injecting a gap that exceeds bucketSpanSeconds,
* triggering a kTimeForward bucket rollover.
* @param {number} [options.rolloverConditions.timeBackwardChance=0.0]
* Per-doc probability [0,1] of replacing a timestamp with one earlier than the
* stream minimum, triggering a kTimeBackward bucket rollover.
* @param {boolean} [options.rolloverConditions.countRollover=false]
* When true, forces minDocs >= kBucketMaxCount+1 (1001) so that each batch
* is large enough to trigger a kCount bucket rollover.
* @param {number} [options.rolloverConditions.sizeRolloverChance=0.0]
* Per-doc probability [0,1] of adding a ~25 KB field, causing accumulated
* bucket size to exceed gTimeseriesBucketMaxSize (kSize rollover).
* NOTE: kCachePressure rollover depends on server-side memory state and cannot
* be injected at the document level; it is not represented here.
* @param {number} [options.bucketSpanSeconds=3600]
* Bucket time span used when computing time-forward jump sizes. Should match
* the collection's bucketMaxSpanSeconds (default granularity = hours = 3600 s).
*
* @returns {fc.Arbitrary<Object[]>}
*/
@ -172,6 +323,23 @@ export function makeMeasurementDocStreamArb(timeFieldname, metaFieldname, metaVa
throw new Error("makeMeasurementDocStreamArb: mixedSchemaChance must be a number in [0, 1]");
}
const rolloverConditions = options.rolloverConditions ?? {};
const timeForwardChance = rolloverConditions.timeForwardChance ?? 0.0;
const timeBackwardChance = rolloverConditions.timeBackwardChance ?? 0.0;
const countRollover = rolloverConditions.countRollover ?? false;
const sizeRolloverChance = rolloverConditions.sizeRolloverChance ?? 0.0;
const bucketSpanSeconds = options.bucketSpanSeconds ?? 3600;
for (const [key, val] of [
["timeForwardChance", timeForwardChance],
["timeBackwardChance", timeBackwardChance],
["sizeRolloverChance", sizeRolloverChance],
]) {
if (typeof val !== "number" || val < 0 || val > 1) {
throw new Error(`makeMeasurementDocStreamArb: rolloverConditions.${key} must be a number in [0, 1]`);
}
}
const {
intRange,
doubleRange,
@ -196,8 +364,16 @@ export function makeMeasurementDocStreamArb(timeFieldname, metaFieldname, metaVa
const dateMin = dateRange?.min ?? defaultDateMin;
const dateMax = dateRange?.max ?? defaultDateMax;
// Exclude the size-rollover field name so generated names never collide with it.
const baseFieldNameArb = fieldNameArb.filter(
(name) => !["_id", timeFieldname, metaFieldname, ...Object.keys(explicitArbitraries)].includes(name),
(name) =>
![
"_id",
timeFieldname,
metaFieldname,
kSizeRolloverFieldName,
...Object.keys(explicitArbitraries),
].includes(name),
);
const fieldNamesArb = fc.array(baseFieldNameArb, {
@ -205,7 +381,11 @@ export function makeMeasurementDocStreamArb(timeFieldname, metaFieldname, metaVa
maxLength: maxFields,
});
const docCountArb = fc.integer({min: minDocs, max: maxDocs});
// countRollover: ensure the batch is large enough to trigger a kCount rollover.
const effectiveMinDocs = countRollover ? Math.max(minDocs, kBucketMaxCount + 1) : minDocs;
const effectiveMaxDocs = countRollover ? Math.max(maxDocs, kBucketMaxCount + 10) : maxDocs;
const docCountArb = fc.integer({min: effectiveMinDocs, max: effectiveMaxDocs});
// Parent metric arb for meta (when not fixed) and extra fields
const parentMetricArb = makeMetricArb(options.types, {
@ -227,13 +407,24 @@ export function makeMeasurementDocStreamArb(timeFieldname, metaFieldname, metaVa
return fc.constant([]);
}
// Sensor-like increasing time stream to create meaningful bucket boundaries.
const timeStreamArb = makeSensorDateMetricStreamArb(
docCount,
docCount,
{dateRange: {min: dateMin, max: dateMax}},
timeBucketing,
);
// Time stream: use forward-jump injection when timeForwardChance > 0,
// otherwise fall back to the standard sensor-like stream.
const timeStreamArb =
timeForwardChance > 0
? makeTimeForwardStreamArb(
docCount,
dateMin,
dateMax,
bucketSpanSeconds,
timeBucketing,
timeForwardChance,
)
: makeSensorDateMetricStreamArb(
docCount,
docCount,
{dateRange: {min: dateMin, max: dateMax}},
timeBucketing,
);
const metaStreamArb = fc.array(fc.constant(chosenMetaValue), {minLength: docCount, maxLength: docCount});
@ -318,7 +509,12 @@ export function makeMeasurementDocStreamArb(timeFieldname, metaFieldname, metaVa
}
return docs;
}),
})
// Apply time-backward and size-rollover injections as a chain so that
// the random decisions participate in fast-check's shrinking.
.chain((docs) =>
applyRolloverInjections(docs, timeFieldname, timeBackwardChance, sizeRolloverChance, dateMin),
),
);
});
});

View File

@ -1,228 +0,0 @@
/**
* A property-based test that compares compatible command sequences between timeseries
* and non-timeseries collections, with mixed-schema extra field streams enabled.
*
* @tags: [
* query_intensive_pbt,
* requires_timeseries,
* requires_getmore,
* not_allowed_with_signed_security_token,
* ]
*/
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {fc} from "jstests/third_party/fast_check/fc-4.6.0.js";
import {makeEmptyModel} from "jstests/write_path/timeseries/pbt/lib/command_grammar.js";
import {makeTimeseriesCommandSequenceArb} from "jstests/write_path/timeseries/pbt/lib/command_arbitraries.js";
import {assertCollectionValid, assertCollectionsMatch} from "jstests/write_path/timeseries/pbt/lib/assertions.js";
import {getFcParams, getFcAssertArgs} from "jstests/write_path/timeseries/pbt/lib/fast_check_params.js";
import {getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
const fcParams = getFcParams();
const fcAssertArgs = getFcAssertArgs();
const ctrlCollName = jsTestName() + "_control";
const tsCollName = jsTestName() + "_timeseries";
const timeField = "ts";
const metaField = "meta";
const metaValue = "metavalue";
function classifyValueType(value) {
if (value === null) {
return "null";
}
if (value === undefined) {
return "undefined";
}
if (value instanceof Date) {
return "date";
}
if (typeof value === "number") {
return "double";
}
if (typeof value === "string") {
return "string";
}
if (typeof value === "boolean") {
return "bool";
}
if (value instanceof Timestamp) {
return "timestamp";
}
if (value instanceof NumberLong) {
return "long";
}
if (value instanceof NumberDecimal) {
return "decimal";
}
if (value instanceof ObjectId) {
return "objectId";
}
if (typeof BinData !== "undefined" && value instanceof BinData) {
return "binData";
}
if (typeof DBPointer !== "undefined" && value instanceof DBPointer) {
return "dbPointer";
}
if (typeof Code !== "undefined" && value instanceof Code) {
return "javascript";
}
return typeof value;
}
function makeOrderedTypeKey(typeSet) {
return Array.from(typeSet).sort().join("|");
}
function updateGeneratedFieldStreamStats(accumulatedStats, docs) {
const fieldTypeSets = {};
for (const doc of docs) {
for (const [fieldName, value] of Object.entries(doc)) {
if (["_id", timeField, metaField].includes(fieldName)) {
continue;
}
fieldTypeSets[fieldName] = fieldTypeSets[fieldName] || new Set();
fieldTypeSets[fieldName].add(classifyValueType(value));
}
}
for (const typeSet of Object.values(fieldTypeSets)) {
if (typeSet.size > 1) {
accumulatedStats.fieldStreamsGenerated.mixedTypeFields += 1;
const typeKey = makeOrderedTypeKey(typeSet);
accumulatedStats.insertedTypeCombinations[typeKey] =
(accumulatedStats.insertedTypeCombinations[typeKey] || 0) + 1;
} else {
accumulatedStats.fieldStreamsGenerated.singleTypeFields += 1;
}
}
return accumulatedStats;
}
function updateFinalCollectionFieldTypeStats(accumulatedStats, docs) {
const fieldTypeSets = {};
for (const doc of docs) {
for (const [fieldName, value] of Object.entries(doc)) {
if (["_id", timeField, metaField].includes(fieldName)) {
continue;
}
fieldTypeSets[fieldName] = fieldTypeSets[fieldName] || new Set();
fieldTypeSets[fieldName].add(classifyValueType(value));
}
}
for (const typeSet of Object.values(fieldTypeSets)) {
if (typeSet.size > 1) {
accumulatedStats.finalCollectionFieldTypes.mixedTypeFields += 1;
} else {
accumulatedStats.finalCollectionFieldTypes.singleTypeFields += 1;
}
}
return accumulatedStats;
}
describe("Comparative PBT for mixed-schema timeseries field streams", () => {
let tsColl;
let ctrlColl;
let bucketColl;
let stats;
const beforeHook = () => {
db[ctrlCollName].drop();
db[tsCollName].drop();
db.createCollection(ctrlCollName);
db.createCollection(tsCollName, {timeseries: {timeField, metaField}});
ctrlColl = db.getCollection(ctrlCollName);
tsColl = db.getCollection(tsCollName);
bucketColl = getTimeseriesCollForRawOps(tsColl.getDB(), tsColl);
};
beforeEach(function () {
stats = {
commands: {},
fieldStreamsGenerated: {
mixedTypeFields: 0,
singleTypeFields: 0,
},
insertedTypeCombinations: {},
finalCollectionFieldTypes: {
mixedTypeFields: 0,
singleTypeFields: 0,
},
};
});
afterEach(function () {
const percentages = {};
for (const [statSliceName, statSlice] of Object.entries(stats)) {
const sum = Object.values(statSlice).reduce((acc, val) => acc + val, 0);
percentages[statSliceName] = Object.fromEntries(
Object.entries(statSlice).map(([key, val]) => [key, sum === 0 ? 0 : (100.0 * val) / sum]),
);
}
jsTest.log.info({
"Arbitrary generation stats for this mixed-schema test run": {
stats,
percentages,
},
});
});
const commandStatsReducer = (accumulatedStats, command) => {
const commandName = command.cmd.constructor.name;
accumulatedStats.commands[commandName] = (accumulatedStats.commands[commandName] || 0) + 1;
if (commandName === "BatchInsertCommand") {
return updateGeneratedFieldStreamStats(accumulatedStats, command.cmd.docs);
}
return accumulatedStats;
};
it("keeps tsColl and ctrlColl in sync under mixed-schema field streams", () => {
const programArb = makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1, // minCommands
fcParams.maxCommands || 30, // maxCommands
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs
50, // maxDocs
{
// options
mixedSchemaChance: 0.5,
},
undefined, //fieldNameArb
fcParams.replayPath,
);
fc.assert(
fc
.property(programArb, (cmds) => {
const model = makeEmptyModel(ctrlColl, bucketColl);
fc.modelRun(() => ({model, real: {tsColl, ctrlColl}}), cmds);
assertCollectionsMatch(tsColl, ctrlColl);
assertCollectionValid(tsColl);
stats = cmds.commands.reduce(commandStatsReducer, stats);
stats = updateFinalCollectionFieldTypeStats(stats, tsColl.find().toArray());
})
.beforeEach(beforeHook),
fcAssertArgs,
);
});
});

View File

@ -0,0 +1,402 @@
/**
* A property-based test that compares compatible command sequences between timeseries
* and non-timeseries collections, exercising all bucket rollover conditions:
* kTimeForward - measurement time exceeds bucket max span
* kTimeBackward - measurement time precedes bucket start
* kCount - bucket reaches gTimeseriesBucketMaxCount (1000) measurements
* kSize - bucket exceeds gTimeseriesBucketMaxSize (125 KB)
* kSchemaChange - field type changes within a bucket (mixed-schema)
* kCachePressure - depends on server memory state; not injectable at doc level
*
* @tags: [
* query_intensive_pbt,
* requires_timeseries,
* requires_getmore,
* not_allowed_with_signed_security_token,
* ]
*/
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {fc} from "jstests/third_party/fast_check/fc-4.6.0.js";
import {makeEmptyModel} from "jstests/write_path/timeseries/pbt/lib/command_grammar.js";
import {makeTimeseriesCommandSequenceArb} from "jstests/write_path/timeseries/pbt/lib/command_arbitraries.js";
import {assertCollectionValid, assertCollectionsMatch} from "jstests/write_path/timeseries/pbt/lib/assertions.js";
import {getFcParams, getFcAssertArgs} from "jstests/write_path/timeseries/pbt/lib/fast_check_params.js";
import {getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
import {kSizeRolloverFieldName} from "jstests/write_path/timeseries/pbt/lib/measurement_arbitraries.js";
const fcParams = getFcParams();
const fcAssertArgs = getFcAssertArgs();
const ctrlCollName = jsTestName() + "_control";
const tsCollName = jsTestName() + "_timeseries";
const timeField = "ts";
const metaField = "meta";
const metaValue = "metavalue";
// Bucket span for the default "hours" granularity (3600 s).
const kBucketSpanSeconds = 3600;
function classifyValueType(value) {
if (value === null) {
return "null";
}
if (value === undefined) {
return "undefined";
}
if (value instanceof Date) {
return "date";
}
if (typeof value === "number") {
return "double";
}
if (typeof value === "string") {
return "string";
}
if (typeof value === "boolean") {
return "bool";
}
if (value instanceof Timestamp) {
return "timestamp";
}
if (value instanceof NumberLong) {
return "long";
}
if (value instanceof NumberDecimal) {
return "decimal";
}
if (value instanceof ObjectId) {
return "objectId";
}
if (typeof BinData !== "undefined" && value instanceof BinData) {
return "binData";
}
if (typeof DBPointer !== "undefined" && value instanceof DBPointer) {
return "dbPointer";
}
if (typeof Code !== "undefined" && value instanceof Code) {
return "javascript";
}
return typeof value;
}
function makeOrderedTypeKey(typeSet) {
return Array.from(typeSet).sort().join("|");
}
function updateGeneratedFieldStreamStats(accumulatedStats, docs) {
const fieldTypeSets = {};
for (const doc of docs) {
for (const [fieldName, value] of Object.entries(doc)) {
if ([timeField, metaField].includes(fieldName)) {
continue;
}
fieldTypeSets[fieldName] = fieldTypeSets[fieldName] || new Set();
fieldTypeSets[fieldName].add(classifyValueType(value));
}
}
for (const typeSet of Object.values(fieldTypeSets)) {
if (typeSet.size > 1) {
accumulatedStats.fieldStreamsGenerated.mixedTypeFields += 1;
const typeKey = makeOrderedTypeKey(typeSet);
accumulatedStats.insertedTypeCombinations[typeKey] =
(accumulatedStats.insertedTypeCombinations[typeKey] || 0) + 1;
} else {
accumulatedStats.fieldStreamsGenerated.singleTypeFields += 1;
}
}
return accumulatedStats;
}
function updateFinalCollectionFieldTypeStats(accumulatedStats, docs) {
const fieldTypeSets = {};
for (const doc of docs) {
for (const [fieldName, value] of Object.entries(doc)) {
if ([timeField, metaField].includes(fieldName)) {
continue;
}
fieldTypeSets[fieldName] = fieldTypeSets[fieldName] || new Set();
fieldTypeSets[fieldName].add(classifyValueType(value));
}
}
for (const typeSet of Object.values(fieldTypeSets)) {
if (typeSet.size > 1) {
accumulatedStats.finalCollectionFieldTypes.mixedTypeFields += 1;
} else {
accumulatedStats.finalCollectionFieldTypes.singleTypeFields += 1;
}
}
return accumulatedStats;
}
/**
* Classify which rollover conditions are present in a batch of generated docs.
* Returns counts of injected condition instances for statistics.
*/
function updateRolloverStats(accumulatedStats, docs) {
if (docs.length === 0) return accumulatedStats;
const timestamps = docs.map((d) => d[timeField].getTime());
const minTs = Math.min(...timestamps);
// kTimeForward: any consecutive gap > kBucketSpanSeconds * 1000 ms
for (let i = 1; i < timestamps.length; i++) {
if (timestamps[i] - timestamps[i - 1] > kBucketSpanSeconds * 1000) {
accumulatedStats.rollover.timeForwardGaps += 1;
}
}
// kTimeBackward: any timestamp strictly before the batch minimum
// (detect docs where timestamp < minTs, i.e. injected backward timestamps)
for (let i = 0; i < timestamps.length; i++) {
if (timestamps[i] < minTs) {
accumulatedStats.rollover.timeBackwardDocs += 1;
}
}
// kSize: presence of the large-payload sentinel field
for (const doc of docs) {
if (Object.prototype.hasOwnProperty.call(doc, kSizeRolloverFieldName)) {
accumulatedStats.rollover.sizeLargeDocs += 1;
}
}
// kCount: batch large enough to trigger count rollover on its own
if (docs.length > 1000) {
accumulatedStats.rollover.countRolloverBatches += 1;
}
return accumulatedStats;
}
function makeEmptyStats() {
return {
commands: {},
fieldStreamsGenerated: {
mixedTypeFields: 0,
singleTypeFields: 0,
},
insertedTypeCombinations: {},
finalCollectionFieldTypes: {
mixedTypeFields: 0,
singleTypeFields: 0,
},
rollover: {
timeForwardGaps: 0,
timeBackwardDocs: 0,
sizeLargeDocs: 0,
countRolloverBatches: 0,
},
};
}
describe("Comparative PBT for timeseries bucket rollover conditions", () => {
let tsColl;
let ctrlColl;
let bucketColl;
let stats;
const beforeHook = () => {
db[ctrlCollName].drop();
db[tsCollName].drop();
db.createCollection(ctrlCollName);
db.createCollection(tsCollName, {timeseries: {timeField, metaField}});
ctrlColl = db.getCollection(ctrlCollName);
tsColl = db.getCollection(tsCollName);
bucketColl = getTimeseriesCollForRawOps(tsColl.getDB(), tsColl);
};
beforeEach(function () {
stats = makeEmptyStats();
});
afterEach(function () {
jsTest.log.info({
"Arbitrary generation stats for this rollover PBT run": stats,
});
});
const commandStatsReducer = (accumulatedStats, command) => {
const commandName = command.cmd.constructor.name;
accumulatedStats.commands[commandName] = (accumulatedStats.commands[commandName] || 0) + 1;
if (commandName === "BatchInsertCommand") {
accumulatedStats = updateGeneratedFieldStreamStats(accumulatedStats, command.cmd.docs);
accumulatedStats = updateRolloverStats(accumulatedStats, command.cmd.docs);
}
return accumulatedStats;
};
/**
* Core helper: run a property-based test with the given command-sequence arbitrary.
* Asserts that tsColl and ctrlColl stay in sync and the TS collection remains valid.
*/
function runRolloverPbt(programArb, assertArgs) {
fc.assert(
fc
.property(programArb, (cmds) => {
const model = makeEmptyModel(ctrlColl, bucketColl);
fc.modelRun(() => ({model, real: {tsColl, ctrlColl}}), cmds);
assertCollectionsMatch(tsColl, ctrlColl);
assertCollectionValid(tsColl);
stats = cmds.commands.reduce(commandStatsReducer, stats);
stats = updateFinalCollectionFieldTypeStats(stats, tsColl.find().toArray());
})
.beforeEach(beforeHook),
assertArgs ?? fcAssertArgs,
);
}
// ---- Individual rollover condition tests ------------------------------------
it("keeps tsColl and ctrlColl in sync under mixed-schema field streams (kSchemaChange)", () => {
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1,
fcParams.maxCommands || 30,
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs
50, // maxDocs
{mixedSchemaChance: 0.5},
undefined,
fcParams.replayPath,
),
);
});
it("keeps tsColl and ctrlColl in sync under time-forward rollover (kTimeForward)", () => {
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1,
fcParams.maxCommands || 30,
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs
50, // maxDocs
{
rolloverConditions: {timeForwardChance: 0.3},
bucketSpanSeconds: kBucketSpanSeconds,
},
undefined,
fcParams.replayPath,
),
);
});
it("keeps tsColl and ctrlColl in sync under time-backward rollover (kTimeBackward)", () => {
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1,
fcParams.maxCommands || 30,
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
1, // minDocs (need at least one doc to inject a backward timestamp)
50, // maxDocs
{
rolloverConditions: {timeBackwardChance: 0.3},
bucketSpanSeconds: kBucketSpanSeconds,
},
undefined,
fcParams.replayPath,
),
);
});
it("keeps tsColl and ctrlColl in sync under size-based rollover (kSize)", () => {
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1,
fcParams.maxCommands || 30,
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs
50, // maxDocs
{rolloverConditions: {sizeRolloverChance: 0.3}},
undefined,
fcParams.replayPath,
),
);
});
it("keeps tsColl and ctrlColl in sync under count-based rollover (kCount)", () => {
// Each batch contains >= 1001 docs so every batch causes at least one
// kCount rollover. Reduce numRuns to keep runtime reasonable.
const countAssertArgs = {
...fcAssertArgs,
numRuns: Math.max(1, Math.min(fcAssertArgs.numRuns, 3)),
};
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
1, // minCommands
2, // maxCommands - fewer commands since each batch is large
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs (overridden to 1001 by countRollover)
50, // maxDocs (overridden to 1010 by countRollover)
{rolloverConditions: {countRollover: true}},
undefined,
fcParams.replayPath,
),
countAssertArgs,
);
});
// ---- Comprehensive test: all conditions active simultaneously ----------------
it("keeps tsColl and ctrlColl in sync with all rollover conditions active", () => {
runRolloverPbt(
makeTimeseriesCommandSequenceArb(
fcParams.minCommands || 1,
fcParams.maxCommands || 30,
timeField,
metaField,
metaValue,
1, // minFields
3, // maxFields
0, // minDocs
50, // maxDocs
{
mixedSchemaChance: 0.25,
rolloverConditions: {
timeForwardChance: 0.2,
timeBackwardChance: 0.15,
sizeRolloverChance: 0.15,
// countRollover omitted: combining 1001-doc batches with the other
// conditions is very expensive; kCount is covered by its own test above.
},
bucketSpanSeconds: kBucketSpanSeconds,
},
undefined,
fcParams.replayPath,
),
);
});
});