SERVER-116461 Propagate deadline timestamp to extensions (#48716)

GitOrigin-RevId: 73bb84c9e3d63541d2710dd1dceb476f02bdd346
This commit is contained in:
Santiago Roche 2026-03-05 20:13:30 -05:00 committed by MongoDB Bot
parent f0016fd637
commit 40bd5f34d8
11 changed files with 427 additions and 1 deletions

View File

@ -0,0 +1,139 @@
/**
* Tests that the operation deadline is correctly carried forward and refreshed
* during getMore commands.
*
* Uses the $assertDeadlineIncreaseAfterBatch extension stage which verifies that:
* - Within a batch, the deadline does not change between getNext() calls.
* - At the batch boundary (when a getMore is issued), the deadline increases.
*
* @tags: [featureFlagExtensionsAPI]
*/
import {checkPlatformCompatibleWithExtensions, withExtensions} from "jstests/noPassthrough/libs/extension_helpers.js";
checkPlatformCompatibleWithExtensions();
const kExtensionLib = "libassert_deadline_increased_after_batch_extension.so";
const NUM_DOCS = 10;
function setupCollection(conn, db, coll) {
if (conn.isMongos()) {
// If were on mongos, set up sharding for this collection. Distribute documents across the two shards to ensure we don't target a single shard during execution.
const res = db.adminCommand({listShards: 1});
assert.commandWorked(res);
const shardIds = res.shards.map((s) => s._id);
assert.commandWorked(db.adminCommand({enableSharding: db.getName()}));
assert.commandWorked(coll.createIndex({x: 1}));
assert.commandWorked(db.adminCommand({shardCollection: coll.getFullName(), key: {x: 1}}));
assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {x: Math.floor(NUM_DOCS / 2)}}));
assert(shardIds.length > 1);
assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 2}, to: shardIds[0]}));
assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 7}, to: shardIds[1]}));
}
const bulkDocs = [];
for (let i = 0; i < NUM_DOCS; i++) {
bulkDocs.push({_id: i, x: i});
}
coll.insertMany(bulkDocs);
}
function runDeadlineTests(conn, _shardingTest) {
const db = conn.getDB("test");
const collName = jsTestName();
const coll = db[collName];
coll.drop();
setupCollection(conn, db, coll);
const sleepTimeBetweenGetMore = 50;
const maxTimeMs = 5 * 1000;
// Test 1: With a batchSize of 3 and maxTimeMS (5 seconds), iterating through multiple getMore
// batches should succeed because the deadline is refreshed at each batch boundary.
(function testDeadlineRefreshedAcrossGetMores() {
jsTest.log("Test: deadline is refreshed across getMore batches");
const batchSize = 3;
const result = db.runCommand({
aggregate: collName,
pipeline: [{$sort: {x: 1}}, {$assertDeadlineIncreaseAfterBatch: {batchSize: batchSize}}],
cursor: {batchSize: batchSize},
maxTimeMS: maxTimeMs,
});
assert.commandWorked(result);
let cursorId = result.cursor.id;
// The initial batch should have returned 'batchSize' documents.
assert.eq(result.cursor.firstBatch.length, batchSize);
// Sanity check, sleep for maxTimeMs, query succeeds since deadline is updated each getMore.
sleep(maxTimeMs);
// Iterate through the remaining documents via getMore.
let totalDocs = result.cursor.firstBatch.length;
while (cursorId != 0) {
// Sleep to give the ensure the deadline always moves forward.
sleep(sleepTimeBetweenGetMore);
const getMoreResult = assert.commandWorked(
db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}),
);
totalDocs += getMoreResult.cursor.nextBatch.length;
cursorId = getMoreResult.cursor.id;
}
assert.eq(totalDocs, NUM_DOCS, "Expected all documents to be returned");
})();
// Test 2: Using the shell cursor helper with maxTimeMS and batchSize to verify
// deadline propagation across multiple getMore operations.
(function testDeadlineWithCursorHelper() {
jsTest.log("Test: deadline propagation via shell cursor helper");
const batchSize = 3;
const cursor = coll.aggregate([{$sort: {x: 1}}, {$assertDeadlineIncreaseAfterBatch: {batchSize: batchSize}}], {
cursor: {batchSize: batchSize},
maxTimeMS: maxTimeMs,
});
// Exhaust the cursor; if the deadline is not refreshed correctly,
// the extension stage will trigger a uassert.
let count = 0;
while (cursor.hasNext()) {
sleep(sleepTimeBetweenGetMore);
cursor.next();
count++;
}
assert.eq(count, NUM_DOCS, "Expected all documents to be returned");
})();
// Test 3: Multiple batch boundaries with a small batchSize to ensure
// deadline refresh happens at each getMore, not just the first.
(function testMultipleBatchBoundaries() {
jsTest.log("Test: multiple batch boundaries with batchSize=1");
const batchSize = 1;
const result = db.runCommand({
aggregate: collName,
pipeline: [{$sort: {x: 1}}, {$assertDeadlineIncreaseAfterBatch: {batchSize: batchSize}}],
cursor: {batchSize: batchSize},
maxTimeMS: maxTimeMs,
});
assert.commandWorked(result);
let cursorId = result.cursor.id;
let totalDocs = result.cursor.firstBatch.length;
// Issue multiple getMore commands, each fetching 1 document.
while (cursorId != 0) {
sleep(sleepTimeBetweenGetMore);
const getMoreResult = assert.commandWorked(
db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}),
);
totalDocs += getMoreResult.cursor.nextBatch.length;
cursorId = getMoreResult.cursor.id;
}
assert.eq(totalDocs, NUM_DOCS, "Expected all documents to be returned");
})();
}
withExtensions({[kExtensionLib]: {}}, runDeadlineTests, ["sharded", "standalone"], {shards: 2});

View File

@ -62,6 +62,10 @@ public:
return opDebugMetrics.getOrCreateMetrics(stageName, execStage);
}
int64_t getDeadlineTimestampMs() const override {
return _ctx->getOperationContext()->getDeadline().asInt64();
}
private:
const ExpressionContext* _ctx;
};

View File

@ -60,4 +60,11 @@ MongoExtensionStatus* QueryExecutionContextAdapter::_extGetMetrics(
});
}
MongoExtensionStatus* QueryExecutionContextAdapter::_extGetDeadlineTimestampMs(
const MongoExtensionQueryExecutionContext* ctx, int64_t* deadlineTimestampMs) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
const auto& execCtx = static_cast<const QueryExecutionContextAdapter*>(ctx)->getCtxImpl();
*deadlineTimestampMs = execCtx.getDeadlineTimestampMs();
});
}
} // namespace mongo::extension::host_connector

View File

@ -57,6 +57,7 @@ public:
virtual Status checkForInterrupt() const = 0;
virtual UnownedOperationMetricsHandle getMetrics(
const std::string& stageName, const UnownedExecAggStageHandle& execStage) const = 0;
virtual int64_t getDeadlineTimestampMs() const = 0;
};
/**
@ -79,6 +80,10 @@ public:
return *_ctx;
}
static ::MongoExtensionQueryExecutionContextVTable getVTable() {
return VTABLE;
}
private:
static MongoExtensionStatus* _extCheckForInterrupt(
const MongoExtensionQueryExecutionContext* ctx, MongoExtensionStatus* queryStatus) noexcept;
@ -87,8 +92,13 @@ private:
MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics) noexcept;
static MongoExtensionStatus* _extGetDeadlineTimestampMs(
const MongoExtensionQueryExecutionContext* ctx, int64_t* deadlineTimestampMs) noexcept;
static constexpr ::MongoExtensionQueryExecutionContextVTable VTABLE = {
.check_for_interrupt = &_extCheckForInterrupt, .get_metrics = &_extGetMetrics};
.check_for_interrupt = &_extCheckForInterrupt,
.get_metrics = &_extGetMetrics,
.get_deadline_timestamp_ms = &_extGetDeadlineTimestampMs};
std::unique_ptr<QueryExecutionContextBase> _ctx;
};

View File

@ -942,6 +942,13 @@ typedef struct MongoExtensionQueryExecutionContextVTable {
MongoExtensionStatus* (*get_metrics)(const MongoExtensionQueryExecutionContext* ctx,
MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics);
/**
* Returns the deadline UNIX timestamp expressed in milliseconds. If the operation has no
* deadline, returns the maximum representable int64_t value.
*/
MongoExtensionStatus* (*get_deadline_timestamp_ms)(
const MongoExtensionQueryExecutionContext* ctx, int64_t* deadlineTimestampMs);
} MongoExtensionQueryExecutionContextVTable;
////////////////////////////////////////////////////////////////

View File

@ -50,4 +50,10 @@ UnownedOperationMetricsHandle QueryExecutionContextAPI::getMetrics(
return UnownedOperationMetricsHandle(metrics);
}
int64_t QueryExecutionContextAPI::getDeadlineTimestampMs() const {
int64_t deadlineTimestampMs{0};
invokeCAndConvertStatusToException(
[&]() { return _vtable().get_deadline_timestamp_ms(get(), &deadlineTimestampMs); });
return deadlineTimestampMs;
}
} // namespace mongo::extension::sdk

View File

@ -60,6 +60,8 @@ public:
UnownedOperationMetricsHandle getMetrics(MongoExtensionExecAggStage* execStage) const;
int64_t getDeadlineTimestampMs() const;
static void assertVTableConstraints(const VTable_t& vtable) {
sdk_tassert(11098300,
"QueryExecutionContext' 'check_for_interrupt' is null",
@ -67,6 +69,9 @@ public:
sdk_tassert(11213507,
"QueryExecutionContext' 'get_metrics' is null",
vtable.get_metrics != nullptr);
sdk_tassert(11646100,
"QueryExecutionContext' 'get_deadline_timestamp_ms' is null",
vtable.get_deadline_timestamp_ms != nullptr);
};
};

View File

@ -29,10 +29,12 @@
#include "mongo/db/extension/host/query_execution_context.h"
#include "mongo/db/extension/host_connector/adapter/host_services_adapter.h"
#include "mongo/db/extension/host_connector/adapter/query_execution_context_adapter.h"
#include "mongo/db/extension/sdk/query_execution_context_handle.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include <string>
@ -90,5 +92,56 @@ TEST_F(QueryExecutionContextTestFixture, CheckForInterruptCustomKillCode) {
ExtensionGenericStatus(customKillCode, "operation was interrupted"));
}
TEST_F(QueryExecutionContextTestFixture, CheckForDeadline) {
_opCtx->setDeadlineAfterNowBy(Seconds{5}, ErrorCodes::ExceededTimeLimit);
const auto deadlineTimestampMs = _opCtx->getDeadline().asInt64();
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(_expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
sdk::QueryExecutionContextHandle handle(&adapter);
ASSERT_EQ(handle->getDeadlineTimestampMs(), deadlineTimestampMs);
}
TEST_F(QueryExecutionContextTestFixture, GetDeadlineTimestampMsWhenNoDeadline) {
// Do not set a deadline; OperationContext defaults to Date_t::max() (no deadline).
const auto deadlineTimestampMs = _opCtx->getDeadline().asInt64();
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(_expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
sdk::QueryExecutionContextHandle handle(&adapter);
ASSERT_EQ(handle->getDeadlineTimestampMs(), deadlineTimestampMs);
ASSERT_EQ(handle->getDeadlineTimestampMs(), std::numeric_limits<int64_t>::max());
}
class QueryExecutionContextVTableDeathTest : public unittest::Test {
public:
void setUp() override {
// Initialize HostServices so that aggregation stages will be able to access member
// functions, e.g. to run assertions.
extension::sdk::HostServicesAPI::setHostServices(
&extension::host_connector::HostServicesAdapter::get());
}
};
DEATH_TEST_F(QueryExecutionContextVTableDeathTest, InvalidCheckForInterrupt, "11098300") {
auto vtable = mongo::extension::host_connector::QueryExecutionContextAdapter::getVTable();
vtable.check_for_interrupt = nullptr;
sdk::QueryExecutionContextAPI::assertVTableConstraints(vtable);
};
DEATH_TEST_F(QueryExecutionContextVTableDeathTest, InvalidGetMetrics, "11213507") {
auto vtable = mongo::extension::host_connector::QueryExecutionContextAdapter::getVTable();
vtable.get_metrics = nullptr;
sdk::QueryExecutionContextAPI::assertVTableConstraints(vtable);
};
DEATH_TEST_F(QueryExecutionContextVTableDeathTest, InvalidGetDeadlineTimestampMs, "11646100") {
auto vtable = mongo::extension::host_connector::QueryExecutionContextAdapter::getVTable();
vtable.get_deadline_timestamp_ms = nullptr;
sdk::QueryExecutionContextAPI::assertVTableConstraints(vtable);
};
} // namespace
} // namespace mongo::extension

View File

@ -952,6 +952,10 @@ public:
const std::string& stageName, const UnownedExecAggStageHandle& execStage) const override {
return UnownedOperationMetricsHandle{nullptr};
}
int64_t getDeadlineTimestampMs() const override {
return std::numeric_limits<int64_t>::max();
}
};
static constexpr std::string_view kMergeOnlyDPLStageName = "$mergeOnlyDPL";

View File

@ -66,6 +66,7 @@ extensions_with_config(
":duplicate_stage_descriptor_bad_extension_signed_lib",
":foo_extension_v2_signed_lib",
":vector_search_extension_signed_lib",
":assert_deadline_increased_after_batch_extension_signed_lib",
],
)
@ -289,6 +290,13 @@ signed_mongo_cc_extension_shared_library(
srcs = ["vector_search.cpp"],
)
# "assert_deadline_increased_after_batch" is used to test that the operation deadline
# is correctly refreshed when issuing a getMore command.
signed_mongo_cc_extension_shared_library(
name = "assert_deadline_increased_after_batch_extension",
srcs = ["assert_deadline_increased_after_batch.cpp"],
)
signed_mongo_cc_extension_shared_library(
name = "mongothost_extension",
srcs = [pkg_name + "extension_options:mongothost.cpp"],

View File

@ -0,0 +1,183 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/distributed_plan_logic.h"
#include "mongo/db/extension/sdk/dpl_array_container.h"
#include "mongo/db/extension/sdk/extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_factory.h"
#include <memory>
namespace sdk = mongo::extension::sdk;
using namespace mongo;
/**
* AssertDeadlineIncreaseAfterBatchExecStage asserts that the operation deadline increases between
* batches. The stage is not aware of what the actual batch size is, and instead relies on the user
* to provide the correct batch size as part of the stage's spec. While the current iteration is
* deemed to be within the current batch, this stage asserts that the deadline remains unchanged.
*
*/
class AssertDeadlineIncreaseAfterBatchExecStage : public sdk::TestExecStage {
public:
static inline const std::string kBatchSizeField = "batchSize";
AssertDeadlineIncreaseAfterBatchExecStage(std::string_view stageName, BSONObj arguments)
: sdk::TestExecStage(stageName, arguments),
_batchSize([&]() {
sdk_uassert(11650001,
"$assertDeadlineIncreaseAfterBatch requires a 'batchSize' integer field",
arguments.hasField(kBatchSizeField) &&
arguments[kBatchSizeField].isNumber());
const auto batchSize = arguments[kBatchSizeField].numberInt();
sdk_uassert(11650002,
"$assertDeadlineIncreaseAfterBatch batchSize must be positive",
batchSize > 0);
return batchSize;
}()),
_callCount(0),
_lastDeadline(0) {}
mongo::extension::ExtensionGetNextResult getNext(
const sdk::QueryExecutionContextHandle& execCtx,
MongoExtensionExecAggStage* execStage) override {
// Get the current deadline from the execution context.
const int64_t currentDeadline = execCtx->getDeadlineTimestampMs();
++_callCount;
if (_callCount % _batchSize == 1 || _batchSize == 1) {
// We've started a new batch. The deadline should have increased (been refreshed for the
// new getMore).
sdk_uassert(
11650003,
fmt::format("$assertDeadlineIncreaseAfterBatch: expected deadline to increase at "
"batch boundary, but it did not. currentDeadline={}, lastDeadline={}",
currentDeadline,
_lastDeadline),
currentDeadline > _lastDeadline);
} else {
// Within a batch: the deadline should not have changed.
sdk_uassert(
11650004,
fmt::format(
"$assertDeadlineIncreaseAfterBatch: expected deadline to remain the same "
"within a batch, but it changed. currentDeadline={}, lastDeadline={}",
currentDeadline,
_lastDeadline),
currentDeadline == _lastDeadline);
}
_lastDeadline = currentDeadline;
auto input =
_getSource()->getNext(const_cast<MongoExtensionQueryExecutionContext*>(execCtx.get()));
if (input.code != mongo::extension::GetNextCode::kAdvanced) {
return input;
}
auto doc = input.resultDocument->getUnownedBSONObj();
return mongo::extension::ExtensionGetNextResult::advanced(
mongo::extension::ExtensionBSONObj::makeAsByteBuf(doc));
}
private:
const size_t _batchSize;
size_t _callCount;
int64_t _lastDeadline;
};
/**
* AssertDeadlineIncreaseAfterBatchLogicalStage specifies a DPL with only a merge pipeline, since
* this stage must always run on the router node. We do this because in a sharded cluster, mongos
* sets the internal batch size to 0 instead of propagating the client provided batch size, which
* would cause the stage to fail if running on a shard.
*/
class AssertDeadlineIncreaseAfterBatchLogicalStage
: public sdk::TestLogicalStage<AssertDeadlineIncreaseAfterBatchExecStage> {
public:
AssertDeadlineIncreaseAfterBatchLogicalStage(std::string_view stageName,
const mongo::BSONObj& arguments)
: sdk::TestLogicalStage<AssertDeadlineIncreaseAfterBatchExecStage>(stageName, arguments) {}
std::unique_ptr<sdk::LogicalAggStage> clone() const override {
return std::make_unique<AssertDeadlineIncreaseAfterBatchLogicalStage>(_name, _arguments);
}
boost::optional<sdk::DistributedPlanLogic> getDistributedPlanLogic() const override {
sdk::DistributedPlanLogic dpl;
{
std::vector<mongo::extension::VariantDPLHandle> pipeline;
pipeline.emplace_back(mongo::extension::LogicalAggStageHandle{
new sdk::ExtensionLogicalAggStageAdapter(clone())});
dpl.mergingPipeline = sdk::DPLArrayContainer(std::move(pipeline));
}
return dpl;
}
};
DEFAULT_AST_NODE(AssertDeadlineIncreaseAfterBatch);
DEFAULT_PARSE_NODE(AssertDeadlineIncreaseAfterBatch);
/**
* $assertDeadlineIncreaseAfterBatch aggregation stage.
*
* Syntax: {$assertDeadlineIncreaseAfterBatch: {batchSize: <integer>}}
*
* This stage passes through documents from its source unchanged. During
* execution, it tracks the number of getNext() calls per batch and asserts:
* - Within a batch (calls < batchSize): the deadline has not changed.
* - At the batch boundary (call == batchSize): the deadline has increased.
*
* This is used to verify that the server correctly refreshes the operation deadline when issuing
* a getMore command.
*/
using AssertDeadlineIncreaseAfterBatchStageDescriptorBase =
sdk::TestStageDescriptor<"$assertDeadlineIncreaseAfterBatch",
AssertDeadlineIncreaseAfterBatchParseNode>;
class AssertDeadlineIncreaseAfterBatchStageDescriptor
: public AssertDeadlineIncreaseAfterBatchStageDescriptorBase {
void validate(const mongo::BSONObj& arguments) const override {
sdk_uassert(
11650005,
"$assertDeadlineIncreaseAfterBatch requires a 'batchSize' field that is a number",
arguments.hasField(AssertDeadlineIncreaseAfterBatchExecStage::kBatchSizeField) &&
arguments[AssertDeadlineIncreaseAfterBatchExecStage::kBatchSizeField].isNumber());
sdk_uassert(
11650006,
"$assertDeadlineIncreaseAfterBatch 'batchSize' must be a positive integer",
arguments[AssertDeadlineIncreaseAfterBatchExecStage::kBatchSizeField].numberInt() > 0);
}
};
DEFAULT_EXTENSION(AssertDeadlineIncreaseAfterBatch)
REGISTER_EXTENSION(AssertDeadlineIncreaseAfterBatchExtension)
DEFINE_GET_EXTENSION()