GitOrigin-RevId: 3d321696148279c1e4253f1fac9407d86d024c8e
This commit is contained in:
parent
4d44dff8b3
commit
ca08e60b44
@ -19,6 +19,7 @@ mongo_cc_library(
|
||||
"bsoncolumn.cpp",
|
||||
"bsoncolumn_interleaved.cpp",
|
||||
"bsoncolumnbuilder.cpp",
|
||||
"interleaved_schema.cpp",
|
||||
"simple8b_type_util.cpp",
|
||||
],
|
||||
features = ["-pgo_profile_use"],
|
||||
@ -80,6 +81,7 @@ mongo_cc_unit_test(
|
||||
"bson_column_compressed_data.inl",
|
||||
"bsoncolumn_blockbased_test.cpp",
|
||||
"bsoncolumn_test.cpp",
|
||||
"interleaved_schema_test.cpp",
|
||||
"simple8b_test.cpp",
|
||||
"simple8b_type_util_test.cpp",
|
||||
],
|
||||
|
||||
@ -71,6 +71,39 @@ constexpr int kMaxCapacity = BSONObjMaxUserSize;
|
||||
constexpr int kElementValueOffset = 2;
|
||||
|
||||
|
||||
// Write a BSON sub-object header into the allocator: type byte + field name + null + 4-byte size
|
||||
// placeholder. Returns the offset of the size field for later fill-in.
|
||||
int writeSubObjHeader(BSONElementStorage& allocator, StringData fieldName, BSONType type) {
|
||||
auto fieldNameSize = fieldName.size();
|
||||
char* objdata = allocator.allocate(6 + fieldNameSize);
|
||||
objdata[0] = stdx::to_underlying(type);
|
||||
if (fieldNameSize > 0) {
|
||||
memcpy(objdata + 1, fieldName.data(), fieldNameSize);
|
||||
}
|
||||
objdata[fieldNameSize + 1] = '\0';
|
||||
|
||||
// The 4 bytes after the header are reserved for the sub-object size, filled in by
|
||||
// writeSubObjFooter once all fields are written.
|
||||
return allocator.position() - allocator.contiguous() - 4;
|
||||
}
|
||||
|
||||
// Finalize a sub-object: write EOO + fill in size, or deallocate if empty and not allowed.
|
||||
void writeSubObjFooter(BSONElementStorage& allocator,
|
||||
int sizeOffset,
|
||||
StringData fieldName,
|
||||
bool allowEmpty) {
|
||||
// No scalars were written — empty subobject not present in this element.
|
||||
if (!allowEmpty && allocator.position() == allocator.contiguous() + sizeOffset + 4) {
|
||||
allocator.deallocate(fieldName.size() + 6);
|
||||
return;
|
||||
}
|
||||
|
||||
auto eoo = allocator.allocate(1);
|
||||
*eoo = '\0';
|
||||
int32_t size = allocator.position() - allocator.contiguous() - sizeOffset;
|
||||
DataView(allocator.contiguous() + sizeOffset).write<LittleEndian<uint32_t>>(size);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
BSONElementStorage::Element::Element(char* buffer, int nameSize, int valueSize)
|
||||
@ -275,130 +308,46 @@ void BSONColumn::Iterator::_incrementRegular(Regular& regular) {
|
||||
}
|
||||
|
||||
void BSONColumn::Iterator::_incrementInterleaved(Interleaved& interleaved) {
|
||||
// Notify the internal allocator to keep all allocations in contigous memory. That way we can
|
||||
// produce the full BSONObj that we need to return.
|
||||
if (!interleaved.schema) {
|
||||
interleaved.schema.emplace(
|
||||
interleaved.referenceObj, interleaved.rootType, interleaved.arrays);
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Wrong number of interleaved states in BSON Column encoding",
|
||||
interleaved.schema->scalarCount() == interleaved.states.size());
|
||||
}
|
||||
using Op = InterleavedSchema::Op;
|
||||
|
||||
auto contiguous = _allocator->startContiguous();
|
||||
|
||||
// Iterate over the reference interleaved object. We match scalar subfields with our interleaved
|
||||
// states in order. Internally the necessary recursion is performed and the second lambda below
|
||||
// is called for scalar fields. Every element writes its data to the allocator so a full BSONObj
|
||||
// is produced, this usually happens within _loadDelta() but must explicitly be done in the
|
||||
// cases where re-materialization of the Element wasn't required (same as previous for example).
|
||||
// The first lambda outputs an RAII object that is instantiated every time we recurse deeper.
|
||||
// This handles writing the BSONObj size and EOO bytes for subobjects.
|
||||
auto stateIt = interleaved.states.begin();
|
||||
auto stateEnd = interleaved.states.end();
|
||||
int processed = 0;
|
||||
BSONObjTraversal t(
|
||||
interleaved.arrays,
|
||||
interleaved.rootType,
|
||||
[this](StringData fieldName, const BSONObj& obj, BSONType type) {
|
||||
// Called every time we recurse into a subobject. It makes sure we write the size and
|
||||
// EOO bytes.
|
||||
return BSONSubObjectAllocator(*_allocator, fieldName, obj, type);
|
||||
},
|
||||
[this, &stateIt, &stateEnd, &processed](const BSONElement& referenceField) {
|
||||
// Called for every scalar field in the reference interleaved BSONObj. We have as many
|
||||
// decoding states as scalars.
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Wrong number of interleaved states in BSON Column encoding",
|
||||
stateIt != stateEnd);
|
||||
auto& state = *(stateIt++);
|
||||
// Stack of sizeOffsets for nested sub-objects. Nesting is typically 1-2 levels deep.
|
||||
boost::container::small_vector<int, 4> subObjStack;
|
||||
|
||||
// Remember the iterator position before writing anything. This is to detect that
|
||||
// nothing was written and we need to copy the element into the allocator position.
|
||||
auto allocatorPosition = _allocator->position();
|
||||
BSONElement elem;
|
||||
// Load deltas if decoders are setup. nullptr is always used for "current". So even if
|
||||
// we are iterating the second time we are going to allocate new memory. This is a
|
||||
// tradeoff to avoid a decoded list of literals for every state that will only be used
|
||||
// if we iterate multiple times.
|
||||
if (auto d64 = get_if<DecodingState::Decoder64>(&state.decoder);
|
||||
d64 && d64->pos.valid() && (++d64->pos).more()) {
|
||||
elem = state.loadDelta(*_allocator, *d64);
|
||||
} else if (auto d128 = get_if<DecodingState::Decoder128>(&state.decoder);
|
||||
d128 && d128->pos.valid() && (++d128->pos).more()) {
|
||||
elem = state.loadDelta(*_allocator, *d128);
|
||||
} else if (*_control == stdx::to_underlying(BSONType::eoo)) {
|
||||
// Decoders are exhausted and the next control byte was EOO then we should exit
|
||||
// interleaved mode. Return false to end the recursion early.
|
||||
++_control;
|
||||
return false;
|
||||
} else {
|
||||
// Decoders are exhausted so we need to load the next control byte that by
|
||||
// definition belong to this decoder state as we iterate in the same known order.
|
||||
auto result = state.loadControl(*_allocator, _control, _end);
|
||||
_control += result.size;
|
||||
elem = result.element;
|
||||
|
||||
// If the loaded control byte was a literal it is stored without field name. We need
|
||||
// to create a new BSONElement with the field name added as this is a sub-field in
|
||||
// an object.
|
||||
auto fieldName = referenceField.fieldNameStringData();
|
||||
if (!elem.eoo() && elem.fieldNameStringData() != fieldName) {
|
||||
auto allocatedElem =
|
||||
_allocator->allocate(elem.type(), fieldName, elem.valuesize());
|
||||
memcpy(allocatedElem.value(), elem.value(), elem.valuesize());
|
||||
elem = allocatedElem.element();
|
||||
state.lastValue = elem;
|
||||
for (const auto& entry : interleaved.schema->entries()) {
|
||||
switch (entry.op) {
|
||||
case Op::kEnterSubObj:
|
||||
subObjStack.push_back(writeSubObjHeader(*_allocator, entry.fieldName, entry.type));
|
||||
break;
|
||||
case Op::kScalar:
|
||||
if (!_processScalar(interleaved.states[entry.stateIdx], entry.fieldName)) {
|
||||
// EOO encountered — exit interleaved mode. Must be the first scalar.
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Cannot load regular mode after processing interleaved mode",
|
||||
entry.stateIdx == 0);
|
||||
auto stateIdx = entry.stateIdx; // save before reset invalidates entry
|
||||
interleaved.schema.reset();
|
||||
_drainAndVerifyDecoders(interleaved.states, stateIdx + 1);
|
||||
_exitInterleavedMode();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// If the encoded element wasn't stored in the allocator above we need to copy it here
|
||||
// as we're building a full BSONObj.
|
||||
if (!elem.eoo()) {
|
||||
if (_allocator->position() == allocatorPosition) {
|
||||
auto size = elem.size();
|
||||
memcpy(_allocator->allocate(size), elem.rawdata(), size);
|
||||
}
|
||||
|
||||
// Remember last known value, needed for further decompression.
|
||||
state.lastValue = elem;
|
||||
}
|
||||
|
||||
++processed;
|
||||
return true;
|
||||
});
|
||||
|
||||
// Traverse interleaved reference object, we will match interleaved states with literals.
|
||||
auto res = t.traverse(interleaved.referenceObj);
|
||||
if (!res) {
|
||||
// Exit interleaved mode and load as regular. Re-instantiate the state and set last known
|
||||
// value.
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Cannot load regular mode after processing interleaved mode",
|
||||
processed == 0);
|
||||
|
||||
// Before exiting interleaved mode, verify all of the decoders are exhausted.
|
||||
while (stateIt != stateEnd) {
|
||||
auto& state = *stateIt;
|
||||
if (auto d64 = get_if<DecodingState::Decoder64>(&state.decoder);
|
||||
d64 && d64->pos.valid()) {
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Not all 64-bit BSON Column interleaved encoding decoders are exhausted",
|
||||
!((++d64->pos).more()));
|
||||
} else if (auto d128 = get_if<DecodingState::Decoder128>(&state.decoder);
|
||||
d128 && d128->pos.valid()) {
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Not all 128-bit BSON Column interleaved encoding decoders are exhausted",
|
||||
!((++d128->pos).more()));
|
||||
}
|
||||
stateIt++;
|
||||
break;
|
||||
case Op::kExitSubObj:
|
||||
writeSubObjFooter(
|
||||
*_allocator, subObjStack.back(), entry.fieldName, entry.allowEmpty);
|
||||
subObjStack.pop_back();
|
||||
break;
|
||||
}
|
||||
|
||||
// This invalidates 'interleaved' reference, may no longer be dereferenced.
|
||||
Regular& regular = _mode.emplace<Regular>();
|
||||
get<0>(regular.state.decoder).deltaOfDelta = false;
|
||||
regular.state.lastValue = _decompressed;
|
||||
_incrementRegular(regular);
|
||||
return;
|
||||
}
|
||||
|
||||
// There should have been as many interleaved states as scalar fields.
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Too many interleaved states in BSON Column encoding",
|
||||
stateIt == stateEnd);
|
||||
|
||||
// Store built BSONObj in the decompressed list
|
||||
auto [objdata, objsize] = contiguous.done();
|
||||
|
||||
@ -414,6 +363,81 @@ void BSONColumn::Iterator::_incrementInterleaved(Interleaved& interleaved) {
|
||||
_decompressed = obj;
|
||||
}
|
||||
|
||||
bool BSONColumn::Iterator::_processScalar(DecodingState& state, StringData fieldName) {
|
||||
auto allocatorPosition = _allocator->position();
|
||||
BSONElement elem;
|
||||
|
||||
// Load deltas if decoders are setup. nullptr is always used for "current". So even if
|
||||
// we are iterating the second time we are going to allocate new memory. This is a
|
||||
// tradeoff to avoid a decoded list of literals for every state that will only be used
|
||||
// if we iterate multiple times.
|
||||
if (auto d64 = get_if<DecodingState::Decoder64>(&state.decoder);
|
||||
d64 && d64->pos.valid() && (++d64->pos).more()) {
|
||||
elem = state.loadDelta(*_allocator, *d64);
|
||||
} else if (auto d128 = get_if<DecodingState::Decoder128>(&state.decoder);
|
||||
d128 && d128->pos.valid() && (++d128->pos).more()) {
|
||||
elem = state.loadDelta(*_allocator, *d128);
|
||||
} else if (*_control == stdx::to_underlying(BSONType::eoo)) {
|
||||
// Decoders are exhausted and the next control byte was EOO — signal caller to exit
|
||||
// interleaved mode.
|
||||
++_control;
|
||||
return false;
|
||||
} else {
|
||||
// Decoders are exhausted so we need to load the next control byte that by
|
||||
// definition belong to this decoder state as we iterate in the same known order.
|
||||
auto result = state.loadControl(*_allocator, _control, _end);
|
||||
_control += result.size;
|
||||
elem = result.element;
|
||||
|
||||
// If the loaded control byte was a literal it is stored without field name. We need
|
||||
// to create a new BSONElement with the field name added as this is a sub-field in
|
||||
// an object.
|
||||
if (!elem.eoo() && elem.fieldNameStringData() != fieldName) {
|
||||
auto allocatedElem = _allocator->allocate(elem.type(), fieldName, elem.valuesize());
|
||||
memcpy(allocatedElem.value(), elem.value(), elem.valuesize());
|
||||
elem = allocatedElem.element();
|
||||
}
|
||||
}
|
||||
|
||||
if (elem.eoo()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// If the encoded element wasn't stored in the allocator above we need to copy it here
|
||||
// as we're building a full BSONObj.
|
||||
if (_allocator->position() == allocatorPosition) {
|
||||
auto size = elem.size();
|
||||
memcpy(_allocator->allocate(size), elem.rawdata(), size);
|
||||
}
|
||||
state.lastValue = elem;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void BSONColumn::Iterator::_drainAndVerifyDecoders(std::vector<DecodingState>& states,
|
||||
size_t startIdx) {
|
||||
for (size_t i = startIdx; i < states.size(); ++i) {
|
||||
auto& state = states[i];
|
||||
if (auto d64 = get_if<DecodingState::Decoder64>(&state.decoder); d64 && d64->pos.valid()) {
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Not all 64-bit BSON Column interleaved encoding decoders are exhausted",
|
||||
!((++d64->pos).more()));
|
||||
} else if (auto d128 = get_if<DecodingState::Decoder128>(&state.decoder);
|
||||
d128 && d128->pos.valid()) {
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
"Not all 128-bit BSON Column interleaved encoding decoders are exhausted",
|
||||
!((++d128->pos).more()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BSONColumn::Iterator::_exitInterleavedMode() {
|
||||
Regular& regular = _mode.emplace<Regular>();
|
||||
get<0>(regular.state.decoder).deltaOfDelta = false;
|
||||
regular.state.lastValue = _decompressed;
|
||||
_incrementRegular(regular);
|
||||
}
|
||||
|
||||
void BSONColumn::Iterator::_handleEOO() {
|
||||
++_control;
|
||||
uassert(ErrorCodes::InvalidBSONColumn,
|
||||
@ -764,7 +788,7 @@ bool BSONColumn::contains_forTest(BSONType elementType) const {
|
||||
if (control == stdx::to_underlying(elementType)) {
|
||||
return true;
|
||||
} else if (control == stdx::to_underlying(BSONType::eoo)) {
|
||||
// TODO: check for valid encoding
|
||||
// TODO(SERVER-74926): check for valid encoding
|
||||
// reached end of column
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@
|
||||
#include "mongo/bson/column/bson_element_storage.h"
|
||||
#include "mongo/bson/column/bsoncolumn_interleaved.h"
|
||||
#include "mongo/bson/column/bsoncolumn_util.h"
|
||||
#include "mongo/bson/column/interleaved_schema.h"
|
||||
#include "mongo/bson/column/simple8b.h"
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/platform/int128.h"
|
||||
@ -254,12 +255,26 @@ public:
|
||||
|
||||
// Type for root object/reference object. May be Object or Array.
|
||||
BSONType rootType;
|
||||
|
||||
// Schema discovered from the reference object. Populated on first decode.
|
||||
boost::optional<InterleavedSchema> schema;
|
||||
};
|
||||
|
||||
// Advance and verify all decoders in [startIdx, states.size()) are exhausted. Throws on
|
||||
// violation.
|
||||
static void _drainAndVerifyDecoders(std::vector<DecodingState>& states, size_t startIdx);
|
||||
|
||||
// Helpers to increment the iterator in regular and interleaved mode.
|
||||
void _incrementRegular(Regular& regular);
|
||||
void _incrementInterleaved(Interleaved& interleaved);
|
||||
|
||||
// Exit interleaved mode and switch to regular decoding.
|
||||
void _exitInterleavedMode();
|
||||
|
||||
// Process a single scalar field: advance its decoder, load delta or control byte,
|
||||
// copy into allocator if needed. Returns false if EOO encountered (exit interleaved).
|
||||
bool _processScalar(DecodingState& state, StringData fieldName);
|
||||
|
||||
std::variant<Regular, Interleaved> _mode = Regular{};
|
||||
};
|
||||
|
||||
|
||||
64
src/mongo/bson/column/interleaved_schema.cpp
Normal file
64
src/mongo/bson/column/interleaved_schema.cpp
Normal file
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/bson/column/interleaved_schema.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
InterleavedSchema::InterleavedSchema(const BSONObj& referenceObj, BSONType rootType, bool arrays)
|
||||
: _arrays(arrays) {
|
||||
uint16_t scalarIdx = 0;
|
||||
_discover(referenceObj, ""_sd, rootType, referenceObj.isEmpty(), scalarIdx);
|
||||
_scalarCount = scalarIdx;
|
||||
}
|
||||
|
||||
void InterleavedSchema::_discover(
|
||||
const BSONObj& obj, StringData fieldName, BSONType type, bool allowEmpty, uint16_t& scalarIdx) {
|
||||
_entries.push_back({Op::kEnterSubObj, fieldName, 0, type, allowEmpty});
|
||||
|
||||
for (auto&& elem : obj) {
|
||||
bool isSubObj = _arrays
|
||||
? (elem.type() == BSONType::object || elem.type() == BSONType::array)
|
||||
: (elem.type() == BSONType::object);
|
||||
if (isSubObj) {
|
||||
_discover(elem.Obj(),
|
||||
elem.fieldNameStringData(),
|
||||
elem.type(),
|
||||
elem.Obj().isEmpty(),
|
||||
scalarIdx);
|
||||
} else {
|
||||
_entries.push_back(
|
||||
{Op::kScalar, elem.fieldNameStringData(), scalarIdx++, BSONType::eoo, false});
|
||||
}
|
||||
}
|
||||
|
||||
_entries.push_back({Op::kExitSubObj, fieldName, 0, type, allowEmpty});
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
91
src/mongo/bson/column/interleaved_schema.h
Normal file
91
src/mongo/bson/column/interleaved_schema.h
Normal file
@ -0,0 +1,91 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsontypes.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Represents the fixed field layout of an interleaved BSONColumn section, discovered from the
|
||||
* reference object. The reference object defines the field names, nesting, and types for all
|
||||
* elements in the section. We discover this structure once and decode all subsequent elements
|
||||
* against it.
|
||||
*
|
||||
* Caller must ensure the reference BSONObj outlives this schema — Entry::fieldName points into it.
|
||||
*/
|
||||
class InterleavedSchema {
|
||||
public:
|
||||
enum class Op : uint8_t {
|
||||
kEnterSubObj, // Write subobj header (type + fieldName + null + 4-byte size)
|
||||
kScalar, // Decode next scalar via DecodingState
|
||||
kExitSubObj, // Write EOO + fill in size
|
||||
};
|
||||
|
||||
struct Entry {
|
||||
Op op;
|
||||
StringData fieldName; // Points into referenceObj
|
||||
uint16_t stateIdx; // kScalar: index into decoder states
|
||||
BSONType type; // kEnterSubObj/kExitSubObj: sub-object type
|
||||
bool allowEmpty; // kEnterSubObj/kExitSubObj: whether empty subobj is valid
|
||||
};
|
||||
|
||||
/**
|
||||
* Discovers the schema by walking the reference object. 'arrays' controls whether array
|
||||
* fields are treated as sub-objects (true) or scalars (false).
|
||||
*/
|
||||
InterleavedSchema(const BSONObj& referenceObj, BSONType rootType, bool arrays);
|
||||
|
||||
const std::vector<Entry>& entries() const {
|
||||
return _entries;
|
||||
}
|
||||
|
||||
uint16_t scalarCount() const {
|
||||
return _scalarCount;
|
||||
}
|
||||
|
||||
private:
|
||||
void _discover(const BSONObj& obj,
|
||||
StringData fieldName,
|
||||
BSONType type,
|
||||
bool allowEmpty,
|
||||
uint16_t& scalarIdx);
|
||||
|
||||
std::vector<Entry> _entries;
|
||||
uint16_t _scalarCount = 0;
|
||||
bool _arrays;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
185
src/mongo/bson/column/interleaved_schema_test.cpp
Normal file
185
src/mongo/bson/column/interleaved_schema_test.cpp
Normal file
@ -0,0 +1,185 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/bson/column/interleaved_schema.h"
|
||||
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
using Op = InterleavedSchema::Op;
|
||||
|
||||
TEST(InterleavedSchemaTest, FlatObject) {
|
||||
auto ref = BSON("a" << 1 << "b" << 2);
|
||||
InterleavedSchema schema(ref, BSONType::object, false);
|
||||
|
||||
ASSERT_EQ(schema.scalarCount(), 2);
|
||||
|
||||
auto& e = schema.entries();
|
||||
ASSERT_EQ(e.size(), 4); // Enter, Scalar(a), Scalar(b), Exit
|
||||
|
||||
ASSERT_EQ(e[0].op, Op::kEnterSubObj);
|
||||
ASSERT_EQ(e[0].fieldName, ""_sd);
|
||||
ASSERT_EQ(e[0].type, BSONType::object);
|
||||
|
||||
ASSERT_EQ(e[1].op, Op::kScalar);
|
||||
ASSERT_EQ(e[1].fieldName, "a"_sd);
|
||||
ASSERT_EQ(e[1].stateIdx, 0);
|
||||
|
||||
ASSERT_EQ(e[2].op, Op::kScalar);
|
||||
ASSERT_EQ(e[2].fieldName, "b"_sd);
|
||||
ASSERT_EQ(e[2].stateIdx, 1);
|
||||
|
||||
ASSERT_EQ(e[3].op, Op::kExitSubObj);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, NestedObject) {
|
||||
auto ref = BSON("a" << 1 << "obj" << BSON("b" << 2 << "c" << 3));
|
||||
InterleavedSchema schema(ref, BSONType::object, false);
|
||||
|
||||
ASSERT_EQ(schema.scalarCount(), 3);
|
||||
|
||||
auto& e = schema.entries();
|
||||
// Enter(root), Scalar(a,0), Enter(obj), Scalar(b,1), Scalar(c,2), Exit(obj), Exit(root)
|
||||
ASSERT_EQ(e.size(), 7);
|
||||
|
||||
ASSERT_EQ(e[0].op, Op::kEnterSubObj);
|
||||
ASSERT_EQ(e[1].op, Op::kScalar);
|
||||
ASSERT_EQ(e[1].fieldName, "a"_sd);
|
||||
ASSERT_EQ(e[1].stateIdx, 0);
|
||||
|
||||
ASSERT_EQ(e[2].op, Op::kEnterSubObj);
|
||||
ASSERT_EQ(e[2].fieldName, "obj"_sd);
|
||||
ASSERT_EQ(e[2].type, BSONType::object);
|
||||
|
||||
ASSERT_EQ(e[3].op, Op::kScalar);
|
||||
ASSERT_EQ(e[3].stateIdx, 1);
|
||||
ASSERT_EQ(e[4].op, Op::kScalar);
|
||||
ASSERT_EQ(e[4].stateIdx, 2);
|
||||
|
||||
ASSERT_EQ(e[5].op, Op::kExitSubObj);
|
||||
ASSERT_EQ(e[5].fieldName, "obj"_sd);
|
||||
ASSERT_EQ(e[6].op, Op::kExitSubObj);
|
||||
ASSERT_EQ(e[6].fieldName, ""_sd);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, DeepNesting) {
|
||||
auto ref = BSON("a" << BSON("b" << BSON("c" << BSON("d" << 1)) << "e" << 2) << "f" << 3);
|
||||
InterleavedSchema schema(ref, BSONType::object, false);
|
||||
|
||||
ASSERT_EQ(schema.scalarCount(), 3);
|
||||
|
||||
// Verify stateIdx is sequential across depth: d=0, e=1, f=2
|
||||
auto& e = schema.entries();
|
||||
int scalarsSeen = 0;
|
||||
for (auto& entry : e) {
|
||||
if (entry.op == Op::kScalar) {
|
||||
ASSERT_EQ(entry.stateIdx, scalarsSeen);
|
||||
scalarsSeen++;
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(scalarsSeen, 3);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, EmptySubObject) {
|
||||
auto ref = BSON("obj" << BSONObj());
|
||||
InterleavedSchema schema(ref, BSONType::object, false);
|
||||
|
||||
ASSERT_EQ(schema.scalarCount(), 0);
|
||||
|
||||
auto& e = schema.entries();
|
||||
// Enter(root), Enter(obj), Exit(obj), Exit(root)
|
||||
ASSERT_EQ(e.size(), 4);
|
||||
|
||||
ASSERT_EQ(e[1].op, Op::kEnterSubObj);
|
||||
ASSERT_EQ(e[1].fieldName, "obj"_sd);
|
||||
ASSERT_TRUE(e[1].allowEmpty);
|
||||
|
||||
ASSERT_EQ(e[2].op, Op::kExitSubObj);
|
||||
ASSERT_TRUE(e[2].allowEmpty);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, EmptyRootObject) {
|
||||
InterleavedSchema schema(BSONObj(), BSONType::object, false);
|
||||
|
||||
ASSERT_EQ(schema.scalarCount(), 0);
|
||||
|
||||
auto& e = schema.entries();
|
||||
ASSERT_EQ(e.size(), 2); // Enter + Exit
|
||||
ASSERT_EQ(e[0].op, Op::kEnterSubObj);
|
||||
ASSERT_TRUE(e[0].allowEmpty);
|
||||
ASSERT_EQ(e[1].op, Op::kExitSubObj);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, ArraysFlagTrue) {
|
||||
auto ref = BSON("a" << 1 << "arr" << BSON_ARRAY(10 << 20));
|
||||
InterleavedSchema schema(ref, BSONType::object, true);
|
||||
|
||||
// With arrays=true, the array should be treated as a sub-object
|
||||
auto& e = schema.entries();
|
||||
bool foundArrayEnter = false;
|
||||
for (auto& entry : e) {
|
||||
if (entry.fieldName == "arr"_sd && entry.op == Op::kEnterSubObj) {
|
||||
foundArrayEnter = true;
|
||||
ASSERT_EQ(entry.type, BSONType::array);
|
||||
}
|
||||
}
|
||||
ASSERT_TRUE(foundArrayEnter);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, ArraysFlagFalse) {
|
||||
auto ref = BSON("a" << 1 << "arr" << BSON_ARRAY(10 << 20));
|
||||
InterleavedSchema schema(ref, BSONType::object, false);
|
||||
|
||||
// With arrays=false, the array should be treated as a scalar
|
||||
auto& e = schema.entries();
|
||||
bool foundArrayScalar = false;
|
||||
for (auto& entry : e) {
|
||||
if (entry.fieldName == "arr"_sd) {
|
||||
ASSERT_EQ(entry.op, Op::kScalar);
|
||||
foundArrayScalar = true;
|
||||
}
|
||||
}
|
||||
ASSERT_TRUE(foundArrayScalar);
|
||||
}
|
||||
|
||||
TEST(InterleavedSchemaTest, RootTypeArray) {
|
||||
auto ref = BSON("a" << 1);
|
||||
InterleavedSchema schema(ref, BSONType::array, false);
|
||||
|
||||
auto& e = schema.entries();
|
||||
ASSERT_EQ(e[0].op, Op::kEnterSubObj);
|
||||
ASSERT_EQ(e[0].type, BSONType::array);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -463,6 +463,18 @@ mongo_cc_benchmark(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_benchmark(
|
||||
name = "sbe_ts_block_bm",
|
||||
srcs = [
|
||||
"ts_block_bm.cpp",
|
||||
],
|
||||
deps = [
|
||||
":query_sbe",
|
||||
"//src/mongo:base",
|
||||
"//src/mongo/db/timeseries:bucket_compression",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_benchmark(
|
||||
name = "sbe_hash_lookup_bm",
|
||||
srcs = [
|
||||
|
||||
216
src/mongo/db/exec/sbe/ts_block_bm.cpp
Normal file
216
src/mongo/db/exec/sbe/ts_block_bm.cpp
Normal file
@ -0,0 +1,216 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/exec/sbe/values/ts_block.h"
|
||||
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/bson/column/bsoncolumn.h"
|
||||
#include "mongo/db/exec/sbe/values/bson.h"
|
||||
#include "mongo/db/timeseries/bucket_compression.h"
|
||||
#include "mongo/db/timeseries/timeseries_constants.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <random>
|
||||
#include <vector>
|
||||
|
||||
#include <benchmark/benchmark.h>
|
||||
|
||||
namespace mongo::sbe {
|
||||
namespace {
|
||||
|
||||
constexpr int kMeasurementsPerBucket = 1000;
|
||||
|
||||
// Generates a v1 (uncompressed) timeseries bucket matching the MatchGroupSort genny workload data
|
||||
// shape: timestamps incrementing by 100ms, nested obj with measurement1 (int 0-10),
|
||||
// measurement2 (double 0-100), measurement3 (double 0-1000).
|
||||
BSONObj generateUncompressedBucket(int numMeasurements, std::mt19937& gen) {
|
||||
std::uniform_int_distribution<int> intDist(0, 10);
|
||||
std::uniform_real_distribution<double> doubleDist2(0.0, 100.0);
|
||||
std::uniform_real_distribution<double> doubleDist3(0.0, 1000.0);
|
||||
|
||||
auto startTime = Date_t::fromMillisSinceEpoch(1640995200000LL); // 2022-01-01
|
||||
|
||||
BSONObjBuilder bucket;
|
||||
bucket.append("_id", OID::gen());
|
||||
|
||||
// Control section.
|
||||
{
|
||||
BSONObjBuilder control(bucket.subobjStart("control"));
|
||||
control.append("version", 1);
|
||||
|
||||
// Min/max for time.
|
||||
{
|
||||
BSONObjBuilder minBuilder(control.subobjStart("min"));
|
||||
minBuilder.append("time", startTime);
|
||||
minBuilder.append(
|
||||
"obj", BSON("measurement1" << 0 << "measurement2" << 0.0 << "measurement3" << 0.0));
|
||||
}
|
||||
{
|
||||
BSONObjBuilder maxBuilder(control.subobjStart("max"));
|
||||
maxBuilder.append("time", startTime + Milliseconds(100 * (numMeasurements - 1)));
|
||||
maxBuilder.append(
|
||||
"obj",
|
||||
BSON("measurement1" << 10 << "measurement2" << 100.0 << "measurement3" << 1000.0));
|
||||
}
|
||||
}
|
||||
|
||||
bucket.append("meta", "AAAAAA");
|
||||
|
||||
// Data section (v1 format: field -> {0: val, 1: val, ...}).
|
||||
{
|
||||
BSONObjBuilder data(bucket.subobjStart("data"));
|
||||
|
||||
// Time field.
|
||||
{
|
||||
BSONObjBuilder timeBuilder(data.subobjStart("time"));
|
||||
for (int i = 0; i < numMeasurements; ++i) {
|
||||
timeBuilder.append(std::to_string(i), startTime + Milliseconds(100 * i));
|
||||
}
|
||||
}
|
||||
|
||||
// Nested obj field with measurement1/2/3.
|
||||
{
|
||||
BSONObjBuilder objBuilder(data.subobjStart("obj"));
|
||||
for (int i = 0; i < numMeasurements; ++i) {
|
||||
objBuilder.append(std::to_string(i),
|
||||
BSON("measurement1" << intDist(gen) << "measurement2"
|
||||
<< doubleDist2(gen) << "measurement3"
|
||||
<< doubleDist3(gen)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bucket.obj();
|
||||
}
|
||||
|
||||
// Compress a v1 bucket into v2 format.
|
||||
BSONObj compressBucket(const BSONObj& uncompressed) {
|
||||
auto result = timeseries::compressBucket(uncompressed, "time"_sd, {}, false);
|
||||
invariant(result.compressedBucket);
|
||||
return *result.compressedBucket;
|
||||
}
|
||||
|
||||
int getBucketVersion(const BSONObj& bucket) {
|
||||
return bucket[timeseries::kBucketControlFieldName].embeddedObject().getIntField(
|
||||
timeseries::kBucketControlVersionFieldName);
|
||||
}
|
||||
|
||||
// Construct a TsBlock from a bucket for a given field. The bucket must stay alive.
|
||||
std::unique_ptr<value::TsBlock> makeTsBlock(const BSONObj& bucket,
|
||||
StringData fieldName,
|
||||
int count) {
|
||||
auto bucketElem = bucket["data"][fieldName];
|
||||
auto [columnTag, columnVal] = bson::convertFrom<true>(bucketElem);
|
||||
|
||||
auto minElem = bucket["control"]["min"][fieldName];
|
||||
auto maxElem = bucket["control"]["max"][fieldName];
|
||||
|
||||
auto min = minElem ? bson::convertFrom<true>(minElem)
|
||||
: std::pair{value::TypeTags::Nothing, value::Value{0u}};
|
||||
auto max = maxElem ? bson::convertFrom<true>(maxElem)
|
||||
: std::pair{value::TypeTags::Nothing, value::Value{0u}};
|
||||
|
||||
return std::make_unique<value::TsBlock>(count,
|
||||
false /* owned */,
|
||||
columnTag,
|
||||
columnVal,
|
||||
getBucketVersion(bucket),
|
||||
fieldName == "time",
|
||||
min,
|
||||
max);
|
||||
}
|
||||
|
||||
class TsBlockDeblockFixture : public benchmark::Fixture {
|
||||
public:
|
||||
void SetUp(benchmark::State&) override {
|
||||
std::mt19937 gen(42);
|
||||
_uncompressedBucket = generateUncompressedBucket(kMeasurementsPerBucket, gen);
|
||||
_compressedBucket = compressBucket(_uncompressedBucket);
|
||||
}
|
||||
|
||||
const BSONObj& uncompressedBucket() const {
|
||||
return _uncompressedBucket;
|
||||
}
|
||||
|
||||
const BSONObj& compressedBucket() const {
|
||||
return _compressedBucket;
|
||||
}
|
||||
|
||||
private:
|
||||
BSONObj _uncompressedBucket;
|
||||
BSONObj _compressedBucket;
|
||||
};
|
||||
|
||||
// Scalar fast path: timestamp column (BSONColumn, no arrays/objects).
|
||||
BENCHMARK_DEFINE_F(TsBlockDeblockFixture, BM_DeblockTimestampColumn)(benchmark::State& state) {
|
||||
uint64_t totalElements = 0;
|
||||
for (auto _ : state) {
|
||||
auto tsBlock = makeTsBlock(compressedBucket(), "time", kMeasurementsPerBucket);
|
||||
boost::optional<value::DeblockedTagValStorage> storage;
|
||||
benchmark::DoNotOptimize(tsBlock->deblock(storage));
|
||||
totalElements += kMeasurementsPerBucket;
|
||||
benchmark::ClobberMemory();
|
||||
}
|
||||
state.SetItemsProcessed(totalElements);
|
||||
}
|
||||
|
||||
// Interleaved object slow path: nested obj column (BSONColumn with objects).
|
||||
BENCHMARK_DEFINE_F(TsBlockDeblockFixture, BM_DeblockInterleavedObjectColumn)
|
||||
(benchmark::State& state) {
|
||||
uint64_t totalElements = 0;
|
||||
for (auto _ : state) {
|
||||
auto tsBlock = makeTsBlock(compressedBucket(), "obj", kMeasurementsPerBucket);
|
||||
boost::optional<value::DeblockedTagValStorage> storage;
|
||||
benchmark::DoNotOptimize(tsBlock->deblock(storage));
|
||||
totalElements += kMeasurementsPerBucket;
|
||||
benchmark::ClobberMemory();
|
||||
}
|
||||
state.SetItemsProcessed(totalElements);
|
||||
}
|
||||
|
||||
// V1 uncompressed path: BSONObj-based deblock.
|
||||
BENCHMARK_DEFINE_F(TsBlockDeblockFixture, BM_DeblockBsonObjColumn)(benchmark::State& state) {
|
||||
uint64_t totalElements = 0;
|
||||
for (auto _ : state) {
|
||||
auto tsBlock = makeTsBlock(uncompressedBucket(), "time", kMeasurementsPerBucket);
|
||||
boost::optional<value::DeblockedTagValStorage> storage;
|
||||
benchmark::DoNotOptimize(tsBlock->deblock(storage));
|
||||
totalElements += kMeasurementsPerBucket;
|
||||
benchmark::ClobberMemory();
|
||||
}
|
||||
state.SetItemsProcessed(totalElements);
|
||||
}
|
||||
|
||||
BENCHMARK_REGISTER_F(TsBlockDeblockFixture, BM_DeblockTimestampColumn);
|
||||
BENCHMARK_REGISTER_F(TsBlockDeblockFixture, BM_DeblockInterleavedObjectColumn);
|
||||
BENCHMARK_REGISTER_F(TsBlockDeblockFixture, BM_DeblockBsonObjColumn);
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo::sbe
|
||||
Loading…
Reference in New Issue
Block a user