SERVER-108052 Set sorting metadata for bounded sorts [8.2] (#39265)
Co-authored-by: Lee Maguire lee.maguire@mongodb.com GitOrigin-RevId: 6cece96d93eb2e5c0b110c2f7bfbb74820b7c47e
This commit is contained in:
parent
a8b0cb11c0
commit
c2c5491b61
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Test that the $rank window function can use a bounded sort with the correct sorting metadata.
|
||||
*
|
||||
* @tags: [requires_fcv_82]
|
||||
*/
|
||||
const tsCollName = jsTestName() + "_ts_coll";
|
||||
const tsColl = db.getCollection(tsCollName);
|
||||
tsColl.drop();
|
||||
|
||||
const document = {
|
||||
metadata: {
|
||||
a: 1,
|
||||
b: 2,
|
||||
},
|
||||
time: new Date(1737331200000), // Mon Jan 21 2025
|
||||
};
|
||||
assert.commandWorked(tsColl.insert(document));
|
||||
|
||||
const indexForBoundedSort = {
|
||||
"metadata.a": 1,
|
||||
"time": -1
|
||||
};
|
||||
assert.commandWorked(tsColl.createIndex(indexForBoundedSort));
|
||||
|
||||
const boundedSortPipeline = [
|
||||
{$setWindowFields: {sortBy: {"time": 1}, output: {rank: {$documentNumber: {}}}}},
|
||||
];
|
||||
|
||||
assert.commandWorked(tsColl.runCommand("aggregate", {
|
||||
pipeline: boundedSortPipeline,
|
||||
cursor: {},
|
||||
}));
|
||||
@ -515,6 +515,7 @@ boost::intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort(
|
||||
StringData boundBase,
|
||||
long long boundOffset,
|
||||
boost::optional<long long> limit,
|
||||
bool outputSortKeyMetadata,
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
|
||||
|
||||
auto ds = DocumentSourceSort::create(expCtx, pat);
|
||||
@ -553,6 +554,8 @@ boost::intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort(
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
ds->_outputSortKeyMetadata = outputSortKeyMetadata;
|
||||
|
||||
if (pat.size() > 1) {
|
||||
SortPattern partitionKey =
|
||||
std::vector<SortPattern::SortPatternPart>(pat.begin(), pat.end() - 1);
|
||||
|
||||
@ -133,11 +133,14 @@ public:
|
||||
return create(expCtx, {sortOrder, expCtx}, kDefaultOptions);
|
||||
}
|
||||
|
||||
// TODO SERVER-108133 Consider passing in SortStageOptions instead of limit and
|
||||
// outputSortKeyMetadata.
|
||||
static boost::intrusive_ptr<DocumentSourceSort> createBoundedSort(
|
||||
SortPattern pat,
|
||||
StringData boundBase,
|
||||
long long boundOffset,
|
||||
boost::optional<long long> limit,
|
||||
bool outputSortKeyMetadata,
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx);
|
||||
|
||||
/**
|
||||
|
||||
@ -677,8 +677,12 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToReportSpillingStatsInBound
|
||||
auto expCtx = getExpCtx();
|
||||
expCtx->setTempDir(tempDir.path());
|
||||
expCtx->setAllowDiskUse(true);
|
||||
auto sort = DocumentSourceSort::createBoundedSort(
|
||||
{BSON("time" << 1), expCtx}, "min", -1, boost::none /*limit*/, expCtx);
|
||||
auto sort = DocumentSourceSort::createBoundedSort({BSON("time" << 1), expCtx},
|
||||
"min",
|
||||
-1,
|
||||
boost::none /*limit*/,
|
||||
false /*outputSortKeyMetadata*/,
|
||||
expCtx);
|
||||
|
||||
std::string largeStr(1024, 'x');
|
||||
|
||||
@ -724,6 +728,66 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToReportSpillingStatsInBound
|
||||
ASSERT_GT(sortStats->spillingStats.getSpilledDataStorageSize(), 0);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSortExecutionTest,
|
||||
ShouldBeAbleToReportSpillingStatsInBoundedSortWithSortKeyMetadata) {
|
||||
RAIIServerParameterControllerForTest sortMemoryLimit{
|
||||
"internalQueryMaxBlockingSortMemoryUsageBytes", 3 * 1024};
|
||||
|
||||
unittest::TempDir tempDir("DocumentSourceSortTest");
|
||||
auto expCtx = getExpCtx();
|
||||
expCtx->setTempDir(tempDir.path());
|
||||
expCtx->setAllowDiskUse(true);
|
||||
auto sort = DocumentSourceSort::createBoundedSort({BSON("time" << 1), expCtx},
|
||||
"min",
|
||||
-1,
|
||||
boost::none /*limit*/,
|
||||
true /*outputSortKeyMetadata*/,
|
||||
expCtx);
|
||||
|
||||
std::string largeStr(1024, 'x');
|
||||
|
||||
std::vector<Document> data = {
|
||||
Document{{"time", Date_t::fromMillisSinceEpoch(1)}, {"largeStr", largeStr}},
|
||||
Document{{"time", Date_t::fromMillisSinceEpoch(0)}, {"largeStr", largeStr}},
|
||||
Document{{"time", Date_t::fromMillisSinceEpoch(2)}, {"largeStr", largeStr}},
|
||||
Document{{"time", Date_t::fromMillisSinceEpoch(3)}, {"largeStr", largeStr}}};
|
||||
for (auto& doc : data) {
|
||||
MutableDocument mdoc{doc};
|
||||
DocumentMetadataFields metadata;
|
||||
metadata.setTimeseriesBucketMinTime(doc.getField("time").getDate());
|
||||
mdoc.setMetadata(std::move(metadata));
|
||||
doc = mdoc.freeze();
|
||||
}
|
||||
|
||||
auto mock = DocumentSourceMock::createForTest(std::move(data), expCtx);
|
||||
sort->setSource(mock.get());
|
||||
|
||||
auto next = sort->getNext();
|
||||
ASSERT_TRUE(next.isAdvanced());
|
||||
ASSERT_VALUE_EQ(next.releaseDocument()["time"], Value(Date_t::fromMillisSinceEpoch(0)));
|
||||
|
||||
next = sort->getNext();
|
||||
ASSERT_TRUE(next.isAdvanced());
|
||||
ASSERT_VALUE_EQ(next.releaseDocument()["time"], Value(Date_t::fromMillisSinceEpoch(1)));
|
||||
|
||||
next = sort->getNext();
|
||||
ASSERT_TRUE(next.isAdvanced());
|
||||
ASSERT_VALUE_EQ(next.releaseDocument()["time"], Value(Date_t::fromMillisSinceEpoch(2)));
|
||||
|
||||
next = sort->getNext();
|
||||
ASSERT_TRUE(next.isAdvanced());
|
||||
ASSERT_VALUE_EQ(next.releaseDocument()["time"], Value(Date_t::fromMillisSinceEpoch(3)));
|
||||
|
||||
ASSERT_TRUE(sort->getNext().isEOF());
|
||||
|
||||
const auto* sortStats = static_cast<const SortStats*>(sort->getSpecificStats());
|
||||
ASSERT_EQ(sortStats->spillingStats.getSpills(), 2);
|
||||
ASSERT_EQ(sortStats->spillingStats.getSpilledRecords(), 4);
|
||||
ASSERT_EQ(sortStats->spillingStats.getSpilledBytes(), 4340);
|
||||
ASSERT_LT(sortStats->spillingStats.getSpilledDataStorageSize(), 1024);
|
||||
ASSERT_GT(sortStats->spillingStats.getSpilledDataStorageSize(), 0);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSortExecutionTest,
|
||||
ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
|
||||
auto expCtx = getExpCtx();
|
||||
@ -771,7 +835,83 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPa
|
||||
TEST_F(DocumentSourceSortTest, Redaction) {
|
||||
createSort(BSON("a" << 1));
|
||||
auto boundedSort = DocumentSourceSort::createBoundedSort(
|
||||
sort()->getSortKeyPattern(), DocumentSourceSort::kMin, 1337, 10, getExpCtx());
|
||||
sort()->getSortKeyPattern(), DocumentSourceSort::kMin, 1337, 10, false, getExpCtx());
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( //
|
||||
R"({"$sort":{"HASH<a>":1}})",
|
||||
redact(*sort(), true));
|
||||
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
|
||||
R"({
|
||||
"$_internalBoundedSort": {
|
||||
"sortKey": {
|
||||
"HASH<a>": 1
|
||||
},
|
||||
"bound": {
|
||||
"base": "min",
|
||||
"offsetSeconds": "?number"
|
||||
},
|
||||
"limit": "?number"
|
||||
}
|
||||
})",
|
||||
redact(*boundedSort, true));
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( //
|
||||
R"({"$sort":{"sortKey":{"HASH<a>":1}}})",
|
||||
redact(*sort(), true, ExplainOptions::Verbosity::kQueryPlanner));
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
|
||||
R"({
|
||||
"$_internalBoundedSort": {
|
||||
"sortKey": {
|
||||
"HASH<a>": 1
|
||||
},
|
||||
"bound": {
|
||||
"base": "min",
|
||||
"offsetSeconds": "?number"
|
||||
},
|
||||
"limit": "?number"
|
||||
}
|
||||
})",
|
||||
redact(*boundedSort, true, ExplainOptions::Verbosity::kQueryPlanner));
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( //
|
||||
R"({
|
||||
"$sort": {
|
||||
"sortKey": {
|
||||
"HASH<a>": 1
|
||||
}
|
||||
},
|
||||
"totalDataSizeSortedBytesEstimate": "?number",
|
||||
"usedDisk": "?bool",
|
||||
"spills": "?number",
|
||||
"spilledDataStorageSize": "?number"
|
||||
})",
|
||||
redact(*sort(), true, ExplainOptions::Verbosity::kExecStats));
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
|
||||
R"({
|
||||
"$_internalBoundedSort": {
|
||||
"sortKey": {
|
||||
"HASH<a>": 1
|
||||
},
|
||||
"bound": {
|
||||
"base": "min",
|
||||
"offsetSeconds": "?number"
|
||||
},
|
||||
"limit": "?number"
|
||||
},
|
||||
"totalDataSizeSortedBytesEstimate": "?number",
|
||||
"usedDisk": "?bool",
|
||||
"spills": "?number",
|
||||
"spilledDataStorageSize": "?number"
|
||||
})",
|
||||
redact(*boundedSort, true, ExplainOptions::Verbosity::kExecStats));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSortTest, RedactionWithSortKeyMetadata) {
|
||||
createSort(BSON("a" << 1));
|
||||
auto boundedSort = DocumentSourceSort::createBoundedSort(
|
||||
sort()->getSortKeyPattern(), DocumentSourceSort::kMin, 1337, 10, true, getExpCtx());
|
||||
|
||||
ASSERT_BSONOBJ_EQ_AUTO( //
|
||||
R"({"$sort":{"HASH<a>":1}})",
|
||||
|
||||
@ -2035,6 +2035,7 @@ void PipelineD::performBoundedSortOptimization(PlanStage* rootStage,
|
||||
: DocumentSourceSort::kMax),
|
||||
0,
|
||||
sort->getLimit(),
|
||||
sort->shouldSetSortKeyMetadata(),
|
||||
expCtx));
|
||||
} else {
|
||||
// Since the sortPattern and the direction of the index don't agree we must use the
|
||||
@ -2050,6 +2051,7 @@ void PipelineD::performBoundedSortOptimization(PlanStage* rootStage,
|
||||
: -unpack->getBucketMaxSpanSeconds()) *
|
||||
1000,
|
||||
sort->getLimit(),
|
||||
sort->shouldSetSortKeyMetadata(),
|
||||
expCtx));
|
||||
|
||||
/**
|
||||
|
||||
Loading…
Reference in New Issue
Block a user