Revert "SERVER-127220 Ensure change stream topology v2 stage respects maxAwaitTimeMS in Waiting state (#54110)" (#54351)

Co-authored-by: auto-revert-processor <devprod-si-team@mongodb.com>
GitOrigin-RevId: 3da30b19c0e4a670ea1dcf4196c7fadb8243f41c
This commit is contained in:
auto-revert-app[bot] 2026-05-22 16:19:04 +00:00 committed by MongoDB Bot
parent 7dbfe6f86f
commit 84ffb5f8ec
2 changed files with 17 additions and 238 deletions

View File

@ -45,7 +45,6 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/query/compiler/parsers/matcher/expression_parser.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/query_execution_knobs_gen.h"
#include "mongo/db/router_role/cluster_commands_helpers.h"
#include "mongo/db/service_context.h"
@ -1149,26 +1148,18 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
Seconds secondsSinceLastPoll = duration_cast<Seconds>(now - _lastAllocationToShardsRequestTime);
if (secondsSinceLastPoll < Seconds(_params->minAllocationToShardsPollPeriodSecs)) {
// Next planned time to poll again.
const Date_t nextPollTime = _lastAllocationToShardsRequestTime +
// Wait until the next poll time.
Date_t nextPollTime = _lastAllocationToShardsRequestTime +
Seconds(_params->minAllocationToShardsPollPeriodSecs);
// For awaitData getMores the caller's deadline lives in 'awaitDataState', not on opCtx.
// Cap the wait time to not be larger than the maxAwaitTimeMS.
const auto& awaitState = awaitDataState(opCtx);
const Date_t waitUntilDate = awaitState.shouldWaitForInserts
? std::min(nextPollTime, awaitState.waitForInsertsDeadline)
: nextPollTime;
try {
// Returns normally when 'waitUntilDate' is reached. Throws ExceededTimeLimit when
// the opCtx deadline expires (only set when the aggregate was issued with explicit
// 'maxTimeMS', as mongos does not set it for awaitData getMores); that is caught below.
_params->deadlineWaiter->waitUntil(opCtx, waitUntilDate);
// The following call throws if the operation got interrupted or killed, or if the
// OperationContext's own deadline (maxAwaitTimeMS) has been exceeded. Does not throw if
// waiting reached 'nextPollTime', but the OperationContext's deadline has not yet
// expired.
_params->deadlineWaiter->waitUntil(opCtx, nextPollTime);
} catch (const ExceptionFor<ErrorCategory::ExceededTimeLimitError>& ex) {
// OperationContext deadline reached. Reachable only on an initial aggregate where the
// user passed 'maxTimeMS'. Surface a clean EOF so the aggregate returns the cursor with
// an empty firstBatch.
// OperationContext deadline exceeded. Return EOF so the client gets an intermediate
// result back.
LOGV2_DEBUG(10657544,
3,
STAGE_LOG_PREFIX "Deadline time limit exceeded",
@ -1182,14 +1173,8 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
return GetNextResult::makeEOF();
}
// The wait may have ended because the awaitData deadline was reached. If so, return EOF.
if (awaitState.shouldWaitForInserts &&
awaitState.waitForInsertsDeadline <=
opCtx->getServiceContext()->getPreciseClockSource()->now()) {
return GetNextResult::makeEOF();
}
// We only waited up to 'nextPollTime'. Stay in kWaiting.
// No state change here, so we enter the state machine in the next turn with kWaiting
// again.
return boost::none;
}
@ -1201,13 +1186,9 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
uasserted(ErrorCodes::RetryChangeStream,
"Could not retrieve placement information for the specified cluster time");
case AllocationToShardsStatus::kFutureClusterTime:
// No awaitData deadline means there's nothing to time-bound a loop against (e.g. the
// initial aggregate). Surface an empty batch so the client gets the cursor back and
// can issue getMore, which will carry the awaitData deadline.
if (!awaitDataState(opCtx).shouldWaitForInserts) {
return GetNextResult::makeEOF();
}
return boost::none;
// Cluster time is still in the future. Return EOF to the client because
// maxAwaitTimeMS has expired.
return GetNextResult::makeEOF();
case AllocationToShardsStatus::kOk:
// Transition to kFetchingInitialization state.
_setState(State::kFetchingInitialization);

View File

@ -30,7 +30,6 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/db/query/find_common.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
@ -312,220 +311,18 @@ TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingFutureClusterTime) {
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
// Mark this as an awaitData getMore so the poll branch's kFutureClusterTime arm loops
// instead of EOFing (that EOF is reserved for the initial-aggregate path).
awaitDataState(getOpCtx()).shouldWaitForInserts = true;
awaitDataState(getOpCtx()).waitForInsertsDeadline = now + Hours(1);
// Set deadline to a few milliseconds in the future.
Date_t deadline = now + Milliseconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
// State should remain in kWaiting, as no progress has been made.
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
}
// Verifies that, with awaitData state enabled on the OperationContext, back-to-back
// kFutureClusterTime poll results leave the state machine in kWaiting without emitting EOF. EOF is
// expected only after the deadline waiter reports ExceededTimeLimit.
TEST_F(DSV2StateUninitializedAndWaitingTest,
       StateWaitingFutureClusterTimeRepeatsPollsUntilDeadline) {
    const Timestamp clusterTime = Timestamp(42, 0);
    getExpCtx()->setChangeStreamSpec(
        buildChangeStreamSpec(clusterTime, ChangeStreamReadMode::kStrict));

    auto readerBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
        [](OperationContext* opCtx, const ChangeStream& changeStream) {
            return std::make_unique<ChangeStreamShardTargeterMock>();
        });
    auto allocationQueryService = std::make_unique<DataToShardsAllocationQueryServiceMock>();

    constexpr int kPollPeriodSecs = 5;
    constexpr int kNumPolls = 3;
    auto stageParams =
        buildParametersForTest(getExpCtx(),
                               kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
                               readerBuilder.get(),
                               allocationQueryService.get());

    // Queue one kFutureClusterTime response for every poll iteration driven below.
    std::vector<DataToShardsAllocationQueryServiceMock::Response> futureResponses;
    for (int n = 0; n < kNumPolls; ++n) {
        futureResponses.emplace_back(clusterTime, AllocationToShardsStatus::kFutureClusterTime);
    }
    getDataToShardsAllocationQueryServiceMock(stageParams)->bufferResponses(futureResponses);

    auto stage = make_intrusive<V2Stage>(getExpCtx(), stageParams);
    stage->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);

    Date_t startTime = Date_t::now();
    getPreciseClockSource(getOpCtx()->getServiceContext())->reset(startTime);

    // Flag the operation as an awaitData getMore, with an awaitData deadline one hour away, so
    // that a kFutureClusterTime poll result is expected to loop rather than EOF.
    auto& awaitState = awaitDataState(getOpCtx());
    awaitState.shouldWaitForInserts = true;
    awaitState.waitForInsertsDeadline = startTime + Hours(1);

    // The OperationContext deadline lies well beyond kNumPolls * kPollPeriodSecs, so it cannot
    // explain an EOF in the polling loop.
    const Date_t opCtxDeadline = startTime + Seconds(kPollPeriodSecs * (kNumPolls + 10));
    getOpCtx()->setDeadlineByDate(opCtxDeadline, ErrorCodes::ExceededTimeLimit);

    // Before each turn, backdate the last poll time by a full poll period so that the stage goes
    // straight to polling instead of waiting. Every turn must yield boost::none and remain in
    // kWaiting.
    for (int pollIdx = 0; pollIdx < kNumPolls; ++pollIdx) {
        stage->setLastAllocationToShardsRequestTime_forTest(startTime - Seconds(kPollPeriodSecs));

        auto pollResult = stage->runGetNextStateMachine_forTest();
        ASSERT_FALSE(pollResult.has_value())
            << "kFutureClusterTime must not emit EOF (iteration " << pollIdx << ")";
        ASSERT_EQ(V2Stage::State::kWaiting, stage->getState_forTest())
            << "state must remain kWaiting (iteration " << pollIdx << ")";
    }

    // Finally make the waiter report ExceededTimeLimit while the stage is waiting for the next
    // poll time: mark the last poll as having just happened and arm the mock with the error.
    stage->setLastAllocationToShardsRequestTime_forTest(startTime);
    getDeadlineWaiterMock(stageParams)
        ->setStatus(Status(ErrorCodes::ExceededTimeLimit, "operation context deadline exceeded"));

    auto deadlineResult = stage->runGetNextStateMachine_forTest();
    ASSERT_TRUE(deadlineResult.has_value());
    ASSERT_TRUE(deadlineResult->isEOF())
        << "EOF must come from the deadlineWaiter ExceededTimeLimit catch, not from "
           "kFutureClusterTime";

    // EOF is returned without a state transition, so the stage is still in kWaiting.
    ASSERT_EQ(V2Stage::State::kWaiting, stage->getState_forTest());
}
// Verifies that, while in kWaiting, the stage respects the awaitData deadline kept in
// 'awaitDataState(opCtx).waitForInsertsDeadline': the wait is capped at that deadline and, once it
// has been reached, the stage returns EOF while staying in kWaiting.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingHonoursAwaitDataDeadline) {
    const Timestamp clusterTime = Timestamp(42, 0);
    getExpCtx()->setChangeStreamSpec(
        buildChangeStreamSpec(clusterTime, ChangeStreamReadMode::kStrict));

    auto readerBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
        [](OperationContext* opCtx, const ChangeStream& changeStream) {
            return std::make_unique<ChangeStreamShardTargeterMock>();
        });
    auto allocationQueryService = std::make_unique<DataToShardsAllocationQueryServiceMock>();

    constexpr int kPollPeriodSecs = 10;
    auto stageParams =
        buildParametersForTest(getExpCtx(),
                               kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
                               readerBuilder.get(),
                               allocationQueryService.get());

    auto stage = make_intrusive<V2Stage>(getExpCtx(), stageParams);
    stage->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);

    Date_t startTime = Date_t::now();
    getPreciseClockSource(getOpCtx()->getServiceContext())->reset(startTime);

    // Record the last poll as having happened right now, which selects the wait branch rather than
    // the poll branch.
    stage->setLastAllocationToShardsRequestTime_forTest(startTime);

    // This test sets no OperationContext deadline. The awaitData deadline (start + 1s) is placed
    // well ahead of the next poll time (start + 10s), making it the effective cap for the wait.
    const Date_t awaitDataDeadline = startTime + Seconds(1);
    auto& awaitState = awaitDataState(getOpCtx());
    awaitState.shouldWaitForInserts = true;
    awaitState.waitForInsertsDeadline = awaitDataDeadline;

    // Advance the mock clock to the awaitData deadline, modelling a wait that completed normally
    // at that point: the post-wait check then observes now >= waitForInsertsDeadline.
    getPreciseClockSource(getOpCtx()->getServiceContext())->reset(awaitDataDeadline);

    auto waitResult = stage->runGetNextStateMachine_forTest();
    ASSERT_TRUE(waitResult.has_value());
    ASSERT_TRUE(waitResult->isEOF())
        << "awaitData deadline expiry must surface a clean EOF (empty batch + PBRT)";

    // Only the batch ends; the state must still be kWaiting so that a later getMore resumes from
    // the same place.
    ASSERT_EQ(V2Stage::State::kWaiting, stage->getState_forTest());

    // The waiter must have been invoked with the awaitData deadline (start + 1s) rather than with
    // the next poll time (start + 10s).
    ASSERT_EQ(awaitDataDeadline.toMillisSinceEpoch(),
              getDeadlineWaiterMock(stageParams)->getLastUsedDeadline().toMillisSinceEpoch());
}
// Walks through the "initial aggregate against a future clusterTime" sequence for this stage.
// Turn 1 takes the wait branch and must call the waiter with the next poll time, since there is no
// awaitData deadline to cap it. Turn 2 (once the wait is considered elapsed) takes the poll
// branch, receives kFutureClusterTime, and must return EOF.
TEST_F(DSV2StateUninitializedAndWaitingTest,
       InitialAggregateWithFutureClusterTimeEofsAfterOnePollCycle) {
    const Timestamp clusterTime = Timestamp(42, 0);
    getExpCtx()->setChangeStreamSpec(
        buildChangeStreamSpec(clusterTime, ChangeStreamReadMode::kStrict));

    auto readerBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
        [](OperationContext* opCtx, const ChangeStream& changeStream) {
            return std::make_unique<ChangeStreamShardTargeterMock>();
        });
    auto allocationQueryService = std::make_unique<DataToShardsAllocationQueryServiceMock>();

    constexpr int kPollPeriodSecs = 10;
    auto stageParams =
        buildParametersForTest(getExpCtx(),
                               kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
                               readerBuilder.get(),
                               allocationQueryService.get());

    // A single kFutureClusterTime response is queued; the poll in turn 2 consumes it.
    std::vector<DataToShardsAllocationQueryServiceMock::Response> futureResponses;
    futureResponses.emplace_back(clusterTime, AllocationToShardsStatus::kFutureClusterTime);
    getDataToShardsAllocationQueryServiceMock(stageParams)->bufferResponses(futureResponses);

    auto stage = make_intrusive<V2Stage>(getExpCtx(), stageParams);
    stage->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);

    Date_t startTime = Date_t::now();
    getPreciseClockSource(getOpCtx()->getServiceContext())->reset(startTime);

    // The OperationContext deadline is five poll periods away, so it cannot trigger the EOF.
    const Date_t opCtxDeadline = startTime + Seconds(kPollPeriodSecs * 5);
    getOpCtx()->setDeadlineByDate(opCtxDeadline, ErrorCodes::ExceededTimeLimit);

    // awaitData state is left unset, which represents the initial aggregate path.
    ASSERT_FALSE(awaitDataState(getOpCtx()).shouldWaitForInserts);

    // Turn 1: the last poll is recorded as "now", so less than a poll period has passed and the
    // wait branch runs. With no awaitData deadline the waiter must receive the next poll time, and
    // the state machine must loop (boost::none) without EOF.
    stage->setLastAllocationToShardsRequestTime_forTest(startTime);
    {
        const Date_t expectedNextPollTime = startTime + Seconds(kPollPeriodSecs);

        auto turnResult = stage->runGetNextStateMachine_forTest();
        ASSERT_FALSE(turnResult.has_value()) << "turn 1 must loop, not EOF";
        ASSERT_EQ(V2Stage::State::kWaiting, stage->getState_forTest());
        ASSERT_EQ(expectedNextPollTime.toMillisSinceEpoch(),
                  getDeadlineWaiterMock(stageParams)->getLastUsedDeadline().toMillisSinceEpoch())
            << "waiter must be called with nextPollTime when there is no awaitData deadline";
    }

    // Turn 2: backdate the last poll by more than a poll period to select the poll branch. Given
    // kFutureClusterTime and no awaitData state, the stage must EOF rather than loop.
    stage->setLastAllocationToShardsRequestTime_forTest(startTime - Seconds(kPollPeriodSecs + 1));
    {
        auto turnResult = stage->runGetNextStateMachine_forTest();
        ASSERT_TRUE(turnResult.has_value());
        ASSERT_TRUE(turnResult->isEOF())
            << "initial aggregate with kFutureClusterTime must EOF after one poll cycle";
        // The state is still kWaiting, so a follow-up getMore carrying an awaitData deadline
        // resumes from here.
        ASSERT_EQ(V2Stage::State::kWaiting, stage->getState_forTest());
    }
}
// Tests state machine for input state kWaiting, when the data-to-shards allocation query service
// returns that the allocation is available.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingTransitioningToFetching) {
@ -788,3 +585,4 @@ TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingBehaviorWhenWaitReturns
} // namespace
} // namespace mongo