SERVER-127220 Ensure change stream topology v2 stage respects maxAwaitTimeMS in Waiting state (#54429)
GitOrigin-RevId: 4d06ea5720a3e7285efded27e7b3bfe47a333ff6
This commit is contained in:
parent
c5ae18cad2
commit
0f94ba8b01
@ -1171,12 +1171,14 @@ export function assertOpenCursors(st, expectedDataShards, expectedConfigCursor,
|
||||
const shardsWithOpenCursors = dataShardCursors.map((cursor) => cursor.shard);
|
||||
|
||||
// In config shard mode, the config server is also a data shard (named "config").
|
||||
// Cursors on it are reported as regular data shard cursors via mongos, so we
|
||||
// include "config" in the data shard comparison and skip the separate config
|
||||
// cursor check below.
|
||||
const dataShardsWithOpenCursors = jsTestOptions().configShard
|
||||
? shardsWithOpenCursors
|
||||
: shardsWithOpenCursors.filter((shard) => shard !== "config");
|
||||
// Cursors on it are reported as regular data shard cursors via mongos. When the
|
||||
// caller expects a config cursor, attribute any "config" entry to it and strip
|
||||
// it out before comparing against 'expectedDataShards'.
|
||||
const isConfigShard = jsTestOptions().configShard;
|
||||
const shouldRemoveConfigShardFromDataShardList = !isConfigShard || expectedConfigCursor;
|
||||
const dataShardsWithOpenCursors = shouldRemoveConfigShardFromDataShardList
|
||||
? shardsWithOpenCursors.filter((shard) => shard !== "config")
|
||||
: shardsWithOpenCursors;
|
||||
assert.sameMembers(
|
||||
expectedDataShards,
|
||||
dataShardsWithOpenCursors,
|
||||
@ -1185,11 +1187,18 @@ export function assertOpenCursors(st, expectedDataShards, expectedConfigCursor,
|
||||
|
||||
// With a dedicated config server, check for config cursors directly via
|
||||
// localOps since they don't appear in the mongos $currentOp results.
|
||||
if (!jsTestOptions().configShard) {
|
||||
if (!isConfigShard) {
|
||||
const configCursors = listIdleCursors(configAdminDB, filter, {localOps: true});
|
||||
jsTest.log.debug("Open config cursors", {configCursors});
|
||||
const configMatch = expectedConfigCursor ? configCursors.length > 0 : configCursors.length == 0;
|
||||
return configMatch;
|
||||
} else {
|
||||
// In config-shard mode the config server is one of the data shards. When the
|
||||
// caller expects a config cursor, verify "config" was in the data-shard cursor
|
||||
// list reported by mongos before we stripped it out for the sameMembers check.
|
||||
if (expectedConfigCursor && !shardsWithOpenCursors.includes("config")) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
},
|
||||
|
||||
@ -45,6 +45,7 @@
|
||||
#include "mongo/db/pipeline/pipeline.h"
|
||||
#include "mongo/db/pipeline/resume_token.h"
|
||||
#include "mongo/db/query/compiler/parsers/matcher/expression_parser.h"
|
||||
#include "mongo/db/query/find_common.h"
|
||||
#include "mongo/db/query/query_execution_knobs_gen.h"
|
||||
#include "mongo/db/router_role/cluster_commands_helpers.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
@ -1148,18 +1149,26 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
|
||||
Seconds secondsSinceLastPoll = duration_cast<Seconds>(now - _lastAllocationToShardsRequestTime);
|
||||
|
||||
if (secondsSinceLastPoll < Seconds(_params->minAllocationToShardsPollPeriodSecs)) {
|
||||
// Wait until the next poll time.
|
||||
Date_t nextPollTime = _lastAllocationToShardsRequestTime +
|
||||
// Next planned time to poll again.
|
||||
const Date_t nextPollTime = _lastAllocationToShardsRequestTime +
|
||||
Seconds(_params->minAllocationToShardsPollPeriodSecs);
|
||||
|
||||
// For awaitData getMores the caller's deadline lives in 'awaitDataState', not on opCtx.
|
||||
// Cap the wait time to not be larger than the maxAwaitTimeMS.
|
||||
const auto& awaitState = awaitDataState(opCtx);
|
||||
const Date_t waitUntilDate = awaitState.shouldWaitForInserts
|
||||
? std::min(nextPollTime, awaitState.waitForInsertsDeadline)
|
||||
: nextPollTime;
|
||||
|
||||
try {
|
||||
// The following call throws if the operation got interrupted or killed, or if the
|
||||
// OperationContext's own deadline (maxAwaitTimeMS) has been exceeded. Does not throw if
|
||||
// waiting reached 'nextPollTime', but the OperationContext's deadline has not yet
|
||||
// expired.
|
||||
_params->deadlineWaiter->waitUntil(opCtx, nextPollTime);
|
||||
// Returns normally when 'waitUntilDate' is reached. Throws ExceededTimeLimit when
|
||||
// the opCtx deadline expires (only set when the aggregate was issued with explicit
|
||||
// 'maxTimeMS', as mongos does not set it for awaitData getMores) caught below.
|
||||
_params->deadlineWaiter->waitUntil(opCtx, waitUntilDate);
|
||||
} catch (const ExceptionFor<ErrorCategory::ExceededTimeLimitError>& ex) {
|
||||
// OperationContext deadline exceeded. Return EOF so the client gets an intermediate
|
||||
// result back.
|
||||
// OperationContext deadline reached. Reachable only on an initial aggregate where the
|
||||
// user passed 'maxTimeMS'. Surface a clean EOF so the aggregate returns the cursor with
|
||||
// an empty firstBatch.
|
||||
LOGV2_DEBUG(10657544,
|
||||
3,
|
||||
STAGE_LOG_PREFIX "Deadline time limit exceeded",
|
||||
@ -1173,8 +1182,14 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
|
||||
return GetNextResult::makeEOF();
|
||||
}
|
||||
|
||||
// No state change here, so we enter the state machine in the next turn with kWaiting
|
||||
// again.
|
||||
// The wait may have ended because the awaitData deadline. Timeout is reached. Return EOF.
|
||||
if (awaitState.shouldWaitForInserts &&
|
||||
awaitState.waitForInsertsDeadline <=
|
||||
opCtx->getServiceContext()->getPreciseClockSource()->now()) {
|
||||
return GetNextResult::makeEOF();
|
||||
}
|
||||
|
||||
// We only waited up to 'nextPollTime'. Stay in kWaiting.
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
@ -1186,9 +1201,13 @@ ChangeStreamHandleTopologyChangeV2Stage::_handleStateWaiting() {
|
||||
uasserted(ErrorCodes::RetryChangeStream,
|
||||
"Could not retrieve placement information for the specified cluster time");
|
||||
case AllocationToShardsStatus::kFutureClusterTime:
|
||||
// Cluster time is still in the future. Return EOF to the client because
|
||||
// maxAwaitTimeMS has expired.
|
||||
return GetNextResult::makeEOF();
|
||||
// No awaitData deadline means there's nothing to time-bound a loop against (e.g. the
|
||||
// initial aggregate). Surface an empty batch so the client gets the cursor back and
|
||||
// can issue getMore, which will carry the awaitData deadline.
|
||||
if (!awaitDataState(opCtx).shouldWaitForInserts) {
|
||||
return GetNextResult::makeEOF();
|
||||
}
|
||||
return boost::none;
|
||||
case AllocationToShardsStatus::kOk:
|
||||
// Transition to kFetchingInitialization state.
|
||||
_setState(State::kFetchingInitialization);
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/pipeline/change_stream_read_mode.h"
|
||||
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
|
||||
#include "mongo/db/query/find_common.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/duration.h"
|
||||
|
||||
@ -311,18 +312,220 @@ TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingFutureClusterTime) {
|
||||
Date_t now = Date_t::now();
|
||||
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
|
||||
|
||||
// Mark this as an awaitData getMore so the poll branch's kFutureClusterTime arm loops
|
||||
// instead of EOFing (that EOF is reserved for the initial-aggregate path).
|
||||
awaitDataState(getOpCtx()).shouldWaitForInserts = true;
|
||||
awaitDataState(getOpCtx()).waitForInsertsDeadline = now + Hours(1);
|
||||
|
||||
// Set deadline to a few milliseconds in the future.
|
||||
Date_t deadline = now + Milliseconds(5);
|
||||
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
|
||||
|
||||
auto result = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_TRUE(result->isEOF());
|
||||
ASSERT_FALSE(result.has_value());
|
||||
|
||||
// State should remain in kWaiting, as no progress has been made.
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
|
||||
}
|
||||
|
||||
// Tests that consecutive kFutureClusterTime poll results keep the state machine in kWaiting and
|
||||
// never short-circuit awaitData. EOF must only fire when the OperationContext deadline expires
|
||||
// (i.e., the deadlineWaiter throws ExceededTimeLimit).
|
||||
TEST_F(DSV2StateUninitializedAndWaitingTest,
|
||||
StateWaitingFutureClusterTimeRepeatsPollsUntilDeadline) {
|
||||
const Timestamp ts = Timestamp(42, 0);
|
||||
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
|
||||
|
||||
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
|
||||
[](OperationContext* opCtx, const ChangeStream& changeStream) {
|
||||
return std::make_unique<ChangeStreamShardTargeterMock>();
|
||||
});
|
||||
auto dataToShardsAllocationQueryService =
|
||||
std::make_unique<DataToShardsAllocationQueryServiceMock>();
|
||||
|
||||
constexpr int kPollPeriodSecs = 5;
|
||||
constexpr int kNumPolls = 3;
|
||||
|
||||
auto params = buildParametersForTest(getExpCtx(),
|
||||
kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
|
||||
changeStreamReaderBuilder.get(),
|
||||
dataToShardsAllocationQueryService.get());
|
||||
|
||||
// Buffer kNumPolls consecutive kFutureClusterTime responses, one per loop iteration.
|
||||
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
|
||||
for (int i = 0; i < kNumPolls; ++i) {
|
||||
mockResponses.emplace_back(ts, AllocationToShardsStatus::kFutureClusterTime);
|
||||
}
|
||||
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
|
||||
|
||||
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
|
||||
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
|
||||
|
||||
Date_t now = Date_t::now();
|
||||
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
|
||||
|
||||
// Mark this as an awaitData getMore so the poll branch's kFutureClusterTime arm loops
|
||||
// instead of EOFing on the first poll (that EOF is reserved for the initial-aggregate path).
|
||||
awaitDataState(getOpCtx()).shouldWaitForInserts = true;
|
||||
awaitDataState(getOpCtx()).waitForInsertsDeadline = now + Hours(1);
|
||||
|
||||
// Generous OperationContext deadline, far past kNumPolls * kPollPeriodSecs so it cannot
|
||||
// be the reason for any EOF in the loop below.
|
||||
getOpCtx()->setDeadlineByDate(now + Seconds(kPollPeriodSecs * (kNumPolls + 10)),
|
||||
ErrorCodes::ExceededTimeLimit);
|
||||
|
||||
// Drive kNumPolls poll iterations. For each, pretend we last polled long enough ago that
|
||||
// _handleStateWaiting bypasses deadlineWaiter and goes straight to the poll branch.
|
||||
// With the fix, every poll returning kFutureClusterTime must return boost::none (loop)
|
||||
// and keep the state in kWaiting — never EOF.
|
||||
for (int i = 0; i < kNumPolls; ++i) {
|
||||
docSource->setLastAllocationToShardsRequestTime_forTest(now - Seconds(kPollPeriodSecs));
|
||||
|
||||
auto result = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_FALSE(result.has_value())
|
||||
<< "kFutureClusterTime must not emit EOF (iteration " << i << ")";
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest())
|
||||
<< "state must remain kWaiting (iteration " << i << ")";
|
||||
}
|
||||
|
||||
// Now simulate the deadline finally firing, making the waiter return ExceededTimeLimit,
|
||||
// mirroring what happens when the opCtx deadline is reached while waiting for the next poll.
|
||||
docSource->setLastAllocationToShardsRequestTime_forTest(now);
|
||||
getDeadlineWaiterMock(params)->setStatus(
|
||||
Status(ErrorCodes::ExceededTimeLimit, "operation context deadline exceeded"));
|
||||
|
||||
auto finalResult = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_TRUE(finalResult.has_value());
|
||||
ASSERT_TRUE(finalResult->isEOF())
|
||||
<< "EOF must come from the deadlineWaiter ExceededTimeLimit catch, not from "
|
||||
"kFutureClusterTime";
|
||||
|
||||
// The catch block returns EOF without transitioning state; the cursor is still alive
|
||||
// and a subsequent getMore would re-enter kWaiting.
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
|
||||
}
|
||||
|
||||
// Tests that when this stage is in waiting state, it honours the awaitData deadline stored in
|
||||
// 'awaitDataState(opCtx).waitForInsertsDeadline' by mongos. When that deadline is reached, the
|
||||
// stage must surface a clean EOF.
|
||||
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingHonoursAwaitDataDeadline) {
|
||||
const Timestamp ts = Timestamp(42, 0);
|
||||
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
|
||||
|
||||
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
|
||||
[](OperationContext* opCtx, const ChangeStream& changeStream) {
|
||||
return std::make_unique<ChangeStreamShardTargeterMock>();
|
||||
});
|
||||
auto dataToShardsAllocationQueryService =
|
||||
std::make_unique<DataToShardsAllocationQueryServiceMock>();
|
||||
|
||||
constexpr int kPollPeriodSecs = 10;
|
||||
auto params = buildParametersForTest(getExpCtx(),
|
||||
kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
|
||||
changeStreamReaderBuilder.get(),
|
||||
dataToShardsAllocationQueryService.get());
|
||||
|
||||
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
|
||||
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
|
||||
|
||||
Date_t now = Date_t::now();
|
||||
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
|
||||
|
||||
// Pretend we just polled, so the wait branch is taken (not the poll branch).
|
||||
docSource->setLastAllocationToShardsRequestTime_forTest(now);
|
||||
|
||||
// No opCtx deadline - mongos does not set one for tailable+awaitData getMores. The awaitData
|
||||
// deadline is stashed in awaitDataState instead. Set it well before nextPollTime so it is the
|
||||
// active cap.
|
||||
awaitDataState(getOpCtx()).shouldWaitForInserts = true;
|
||||
awaitDataState(getOpCtx()).waitForInsertsDeadline = now + Seconds(1);
|
||||
|
||||
// Simulate "the wait completed normally at waitForInsertsDeadline": when the stage looks at
|
||||
// 'now' after waitUntil returns, it must see now >= waitForInsertsDeadline and EOF cleanly.
|
||||
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now + Seconds(1));
|
||||
|
||||
auto result = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_TRUE(result->isEOF())
|
||||
<< "awaitData deadline expiry must surface a clean EOF (empty batch + PBRT)";
|
||||
|
||||
// The cursor stays alive; only the batch ended. State must remain kWaiting so a subsequent
|
||||
// getMore picks up here and re-enters the same flow.
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
|
||||
|
||||
// The waiter should have been clamped to the awaitData deadline (= now + 1s in the original
|
||||
// 'now'), not to nextPollTime (= now + 10s).
|
||||
ASSERT_EQ((now + Seconds(1)).toMillisSinceEpoch(),
|
||||
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch());
|
||||
}
|
||||
|
||||
// Tests the full "initial aggregate against a future clusterTime" flow through this stage:
|
||||
// turn 1 enters the wait branch and calls waitUntil with nextPollTime (no awaitData clamp);
|
||||
// turn 2 (after the wait has elapsed) takes the poll branch, sees kFutureClusterTime, and
|
||||
// emits a clean EOF so the aggregate returns the cursor with empty firstBatch.
|
||||
TEST_F(DSV2StateUninitializedAndWaitingTest,
|
||||
InitialAggregateWithFutureClusterTimeEofsAfterOnePollCycle) {
|
||||
const Timestamp ts = Timestamp(42, 0);
|
||||
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
|
||||
|
||||
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
|
||||
[](OperationContext* opCtx, const ChangeStream& changeStream) {
|
||||
return std::make_unique<ChangeStreamShardTargeterMock>();
|
||||
});
|
||||
auto dataToShardsAllocationQueryService =
|
||||
std::make_unique<DataToShardsAllocationQueryServiceMock>();
|
||||
|
||||
constexpr int kPollPeriodSecs = 10;
|
||||
auto params = buildParametersForTest(getExpCtx(),
|
||||
kPollPeriodSecs /* minAllocationToShardsPollPeriodSecs */,
|
||||
changeStreamReaderBuilder.get(),
|
||||
dataToShardsAllocationQueryService.get());
|
||||
|
||||
// Turn 2 will poll and see kFutureClusterTime.
|
||||
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
|
||||
mockResponses.emplace_back(ts, AllocationToShardsStatus::kFutureClusterTime);
|
||||
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
|
||||
|
||||
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
|
||||
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
|
||||
|
||||
Date_t now = Date_t::now();
|
||||
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
|
||||
|
||||
// Generous opCtx deadline so it cannot cause EOF.
|
||||
getOpCtx()->setDeadlineByDate(now + Seconds(kPollPeriodSecs * 5),
|
||||
ErrorCodes::ExceededTimeLimit);
|
||||
|
||||
// No awaitData state -> this is the initial aggregate path.
|
||||
ASSERT_FALSE(awaitDataState(getOpCtx()).shouldWaitForInserts);
|
||||
|
||||
// Turn 1: wait branch (lastPoll == now, so secondsSinceLastPoll < kPollPeriodSecs). Without
|
||||
// an awaitData deadline, waitUntilDate must equal nextPollTime; the state machine loops
|
||||
// (returns boost::none), no EOF.
|
||||
docSource->setLastAllocationToShardsRequestTime_forTest(now);
|
||||
{
|
||||
auto result = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_FALSE(result.has_value()) << "turn 1 must loop, not EOF";
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
|
||||
ASSERT_EQ((now + Seconds(kPollPeriodSecs)).toMillisSinceEpoch(),
|
||||
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch())
|
||||
<< "waiter must be called with nextPollTime when there is no awaitData deadline";
|
||||
}
|
||||
|
||||
// Turn 2: pretend the wait has elapsed by backdating lastPoll, so we take the poll branch.
|
||||
// With kFutureClusterTime and no awaitData state, the stage must EOF instead of looping.
|
||||
docSource->setLastAllocationToShardsRequestTime_forTest(now - Seconds(kPollPeriodSecs + 1));
|
||||
{
|
||||
auto result = docSource->runGetNextStateMachine_forTest();
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_TRUE(result->isEOF())
|
||||
<< "initial aggregate with kFutureClusterTime must EOF after one poll cycle";
|
||||
// State stays kWaiting so the follow-up getMore picks up here with its awaitData
|
||||
// deadline and takes the loop-with-clamped-wait path.
|
||||
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
|
||||
}
|
||||
}
|
||||
|
||||
// Tests state machine for input state kWaiting, when the data-to-shards allocation query service
|
||||
// returns that the allocation is available.
|
||||
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingTransitioningToFetching) {
|
||||
@ -585,4 +788,3 @@ TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingBehaviorWhenWaitReturns
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user