SERVER-111158 Split document_source_change_stream_handle_topology_change_v2_test into multiple files (#53482)

GitOrigin-RevId: 5e580f7ff5c9a526bcd009b1f031fc23774827ac
This commit is contained in:
cian-mdb 2026-05-15 10:03:39 +01:00 committed by MongoDB Bot
parent 9fc5c35203
commit 02ad24dea1
10 changed files with 4240 additions and 3663 deletions

View File

@ -43,6 +43,26 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "change_stream_stage_test_fixture",
srcs = [
"change_stream_stage_test_fixture.cpp",
],
deps = [
":aggregation_context_fixture",
":change_stream_pipeline",
":change_stream_test_helpers",
":document_source_mock",
"//src/mongo/db:change_stream_options",
"//src/mongo/db:change_stream_options_manager",
"//src/mongo/db:service_context_d_test_fixture",
"//src/mongo/db/exec/document_value",
"//src/mongo/db/exec/document_value:document_value_test_util",
"//src/mongo/db/repl:oplog_entry",
"//src/mongo/unittest",
],
)
idl_generator(
name = "change_stream_pre_and_post_images_options_gen",
src = "change_stream_pre_and_post_images_options.idl",
@ -1226,10 +1246,8 @@ mongo_cc_unit_test(
"change_stream_rewrites_test.cpp",
"change_stream_shard_targeter_test.cpp",
"change_stream_split_event_helpers_test.cpp",
"change_stream_stage_test_fixture.cpp",
"change_stream_test.cpp",
"document_source_change_stream_add_post_image_test.cpp",
"document_source_change_stream_handle_topology_change_v2_test.cpp",
"document_source_change_stream_test.cpp",
],
tags = [
@ -1241,6 +1259,7 @@ mongo_cc_unit_test(
":aggregation_context_fixture",
":change_stream_pipeline",
":change_stream_preimage",
":change_stream_stage_test_fixture",
":change_stream_test_helpers",
":document_source_mock",
"//src/mongo/db:change_stream_options",
@ -1259,6 +1278,41 @@ mongo_cc_unit_test(
],
)
mongo_cc_unit_test(
name = "change_stream_handle_topology_change_v2_stage_test",
srcs = [
"document_source_change_stream_handle_topology_change_v2_cursor_management_and_error_handling_test.cpp",
"document_source_change_stream_handle_topology_change_v2_invalid_state_transitions_test.cpp",
"document_source_change_stream_handle_topology_change_v2_stage_properties_test.cpp",
"document_source_change_stream_handle_topology_change_v2_state_fetching_initialization_and_starting_test.cpp",
"document_source_change_stream_handle_topology_change_v2_state_fetching_normal_and_degraded_test.cpp",
"document_source_change_stream_handle_topology_change_v2_state_uninitialized_and_waiting_test.cpp",
"document_source_change_stream_handle_topology_change_v2_test_helpers.cpp",
],
tags = [
"code_coverage_quarantine",
"code_coverage_quarantine_SERVER-98846",
"mongo_unittest_fifth_group",
],
deps = [
":aggregation_context_fixture",
":change_stream_pipeline",
":change_stream_stage_test_fixture",
":change_stream_test_helpers",
":document_source_mock",
"//src/mongo/db:change_stream_options",
"//src/mongo/db:change_stream_options_manager",
"//src/mongo/db:service_context_d_test_fixture",
"//src/mongo/db/exec/document_value",
"//src/mongo/db/exec/document_value:document_value_test_util",
"//src/mongo/db/repl:oplog_entry",
"//src/mongo/db/repl:oplog_entry_test_helpers",
"//src/mongo/unittest",
"//src/mongo/util:mock_periodic_runner",
"//src/mongo/util:version_impl",
],
)
mongo_cc_unit_test(
name = "db_pipeline_internal_unpack_bucket_test",
srcs = [

View File

@ -0,0 +1,148 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/timestamp.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include <memory>
namespace mongo {
namespace {
using namespace test;
class DSV2StageInvalidStateTransitionsTest : public ChangeStreamStageTestNoSetup {};
using DSV2StageInvalidStateTransitionsTestDeathTest = DSV2StageInvalidStateTransitionsTest;
// Invalid state transition tests.
// -------------------------------
// Tests that a previous exception must have been registered when running the state machine when the
// start state is kFinal.
DEATH_TEST_REGEX_F(DSV2StageInvalidStateTransitionsTestDeathTest,
StateMachineFailsOnStateFinal,
"Tripwire assertion.*10657532") {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFinal, false /* validateStateTransition */);
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(), AssertionException, 10657532);
}
// Tests that an exception is thrown when the state is set to the existing state using 'setState()'
// / 'setState_forTest()'.
DEATH_TEST_REGEX_F(DSV2StageInvalidStateTransitionsTestDeathTest,
CheckRepeatedState,
"Tripwire assertion.*10657503") {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto docSource = make_intrusive<V2Stage>(getExpCtx(), nullptr);
// Check state invalid transition from Final to kWaiting to kWaiting.
auto state = V2Stage::State::kWaiting;
docSource->setState_forTest(state, false /* validateStateTransition */);
ASSERT_THROWS_CODE(docSource->setState_forTest(state, true /* validateStateTransition */),
AssertionException,
10657503);
}
// Tests that an exception is thrown when trying to change the state from the end state kFinal to
// another state.
DEATH_TEST_REGEX_F(DSV2StageInvalidStateTransitionsTestDeathTest,
CheckStateTransitionBackFromFinalState,
"Tripwire assertion.*10657504") {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto docSource = make_intrusive<V2Stage>(getExpCtx(), nullptr);
docSource->setState_forTest(V2Stage::State::kFinal, false /* validateStateTransition */);
for (auto state : {V2Stage::State::kWaiting,
V2Stage::State::kFetchingInitialization,
V2Stage::State::kDowngrading}) {
ASSERT_THROWS_CODE(docSource->setState_forTest(state, true /* validateStateTransition */),
AssertionException,
10657504);
}
}
// Tests that an exception is thrown when trying to set the state back to kUninitialized.
DEATH_TEST_REGEX_F(DSV2StageInvalidStateTransitionsTestDeathTest,
CheckStateTransitionBackToUninitialized,
"Tripwire assertion.*10657505") {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
for (auto state : {V2Stage::State::kWaiting,
V2Stage::State::kFetchingInitialization,
V2Stage::State::kDowngrading}) {
docSource->setState_forTest(state, false /* validateStateTransition */);
ASSERT_THROWS_CODE(docSource->setState_forTest(V2Stage::State::kUninitialized,
true /* validateStateTransition */),
AssertionException,
10657505);
}
}
} // namespace
} // namespace mongo

View File

@ -0,0 +1,180 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/exec/agg/mock_stage.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/db/query/query_shape/serialization_options.h"
#include "mongo/unittest/unittest.h"
#include <memory>
#include <boost/optional/optional.hpp>
namespace mongo {
namespace {
using namespace test;
// Common stage property tests.
// ----------------------------
TEST_F(ChangeStreamStageTest, DSV2CreateFromInvalidInput) {
// Test invalid top-level BSON type.
// The stage parameters are not be needed for this.
const BSONObj spec =
BSON(DocumentSourceChangeStreamHandleTopologyChangeV2::kStageName << BSON("foo" << "bar"));
ASSERT_THROWS_CODE(DocumentSourceChangeStreamHandleTopologyChangeV2::createFromBson(
spec.firstElement(), getExpCtx()),
AssertionException,
10612600);
}
TEST_F(ChangeStreamStageTest, DSV2HandleInputs) {
const Timestamp ts = Timestamp(42, 0);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(ts,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
// Test that the stage returns all inputs as they are.
const BSONObj doc1 = BSON("operationType" << "test1" << "foo" << "bar");
const BSONObj doc2 = BSON("operationType" << "test2" << "test" << "value");
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc1),
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc2),
DocumentSource::GetNextResult::makeEOF(),
};
auto stage = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
stage);
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kOk));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto handleTopologyChangeStage = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(handleTopologyChangeStage, stage.get());
auto next = handleTopologyChangeStage->getNext();
ASSERT_TRUE(next.isPaused());
next = handleTopologyChangeStage->getNext();
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(Document::fromBsonWithMetaData(doc1), next.getDocument());
next = handleTopologyChangeStage->getNext();
ASSERT_TRUE(next.isPaused());
next = handleTopologyChangeStage->getNext();
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(Document::fromBsonWithMetaData(doc2), next.getDocument());
next = handleTopologyChangeStage->getNext();
ASSERT_TRUE(next.isEOF());
}
TEST_F(ChangeStreamStageTest, DSV2Serialization) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// The only valid spec for this stage is an empty object.
const BSONObj spec = getStageSpec();
auto handleTopologyChangeStage =
DocumentSourceChangeStreamHandleTopologyChangeV2::createFromBson(spec.firstElement(),
getExpCtx());
std::vector<Value> serialization;
handleTopologyChangeStage->serializeToArray(serialization);
ASSERT_EQ(serialization.size(), 1UL);
ASSERT_EQ(serialization[0].getType(), BSONType::object);
ASSERT_BSONOBJ_EQ(
serialization[0].getDocument().toBson(),
BSON(DocumentSourceChangeStreamHandleTopologyChangeV2::kStageName << BSONObj()));
}
TEST_F(ChangeStreamStageTestNoSetup, DSV2EmptyForQueryStats) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(42, 0), ChangeStreamReadMode::kStrict));
auto docSource = DocumentSourceChangeStreamHandleTopologyChangeV2::create(getExpCtx());
ASSERT_BSONOBJ_EQ( // NOLINT
getStageSpec(),
docSource->serialize().getDocument().toBson());
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
} // namespace
} // namespace mongo

View File

@ -0,0 +1,701 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/exec/agg/mock_stage.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include <memory>
#include <boost/optional/optional.hpp>
namespace mongo {
namespace {
using namespace test;
class DSV2StateFetchingInitializationAndStartingTest : public ChangeStreamStageTestNoSetup {};
using DSV2StateFetchingInitializationAndStartingTestDeathTest =
DSV2StateFetchingInitializationAndStartingTest;
// Tests state machine for input state kFetchingInitialization, when the shard targeter returns
// kContinue. Expects the state to transition to kFetchingGettingChangeEvent.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingInitializationStrictModeContinue) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingInitialization,
false /* validateStateTransition */);
// Assuming not to have segment start or end timestamps before calling the state machine.
ASSERT_FALSE(docSource->getSegmentStartTimestamp_forTest().has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// Assuming not to have segment start or end timestamps after the call because we are in strict
// mode.
ASSERT_FALSE(docSource->getSegmentStartTimestamp_forTest().has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingInitialization, when the shard targeter returns
// kSwitchToV1. Expects the state to transition to kDowngrading.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingInitializationStrictModeSwitchToV1) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kSwitchToV1,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingInitialization,
false /* validateStateTransition */);
// Assuming not to have a segment start timestamp before calling the state machine.
ASSERT_FALSE(docSource->getSegmentStartTimestamp_forTest().has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
// Assuming not to have segment start or end timestamps after the call because we are in strict
// mode.
ASSERT_FALSE(docSource->getSegmentStartTimestamp_forTest().has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(V2Stage::State::kDowngrading, docSource->getState_forTest());
// When state is kDowngrading, the expected result upon next state machine invocation is an
// exception.
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingInitialization for non-control events.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingInitializationStrictModeGettingChangeEventNonControlEvents) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
// Test that the stage returns all inputs as they are.
const BSONObj doc1 = BSON("operationType" << "test1" << "foo" << "bar");
const BSONObj doc2 = BSON("operationType" << "test2" << "test" << "value");
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc1),
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc2),
DocumentSource::GetNextResult::makeEOF(),
};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingGettingChangeEvent,
false /* validateStateTransition */);
// Check return value 1 (pause).
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
// Check return value 2 (doc1).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(doc1, result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
// Check return value 3 (pause).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
// Check return value 4 (doc2).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(doc2, result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
// Check return value 5 (eof).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingInitialization for a control event.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingInitializationStrictModeGettingChangeEventWithControlEvent) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
const BSONObj event = BSON("operationType" << "test" << "foo" << "bar"
<< Document::metaFieldChangeStreamControlEvent << 1);
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Document{event},
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(
Document::fromBsonWithMetaData(event))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingGettingChangeEvent,
false /* validateStateTransition */);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingInitialization in ignoreRemovedShards mode. This is
// supposed to set the start time of the change stream segment and transition to state
// kFetchingStartingChangeStreamSegment.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingInitializationIgnoreRemovedShards) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingInitialization,
false /* validateStateTransition */);
// Assuming not to have segment start or end timestamps before calling the state machine.
ASSERT_FALSE(docSource->getSegmentStartTimestamp_forTest().has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// After calling the state machine, the segment start timestamp must be set to the resume
// token's timestamp.
const auto& segmentStartTimestamp = docSource->getSegmentStartTimestamp_forTest();
ASSERT_TRUE(segmentStartTimestamp.has_value());
ASSERT_EQ(ts, *segmentStartTimestamp);
// Still assuming no end timestamp.
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, without the segment
// start timestamp being set.
DEATH_TEST_REGEX_F(DSV2StateFetchingInitializationAndStartingTestDeathTest,
StateFetchingStartingChangeStreamSegmentWithoutStartTimestamp,
"Tripwire assertion.*10657518") {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
// Intentionally do not set the segment start timestamp before entering the state to trigger the
// following tassert.
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(), AssertionException, 10657518);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, when the shard targeter
// returns kSwitchToV1.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentSwitchToV1) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kSwitchToV1,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(V2Stage::State::kDowngrading, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, when the shard targeter
// returns kContinue and no end timestamp for the segment.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentContinueWithoutEndTimestamp) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, when the shard targeter
// returns kContinue and an end timestamp for the segment.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentContinueAndEndTimestamp) {
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(42, 1);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kContinue,
segmentEndTimestamp,
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_TRUE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
// Undo mode must have been turned on when entering the degraded fetching state.
ASSERT_TRUE(*getCursorManagerMock(params)->getUndoNextMode());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, when we try to open a
// new cursor on a shard and this fails with 'ShardNotFound' exceptions.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentOpenCursorFailsWithShardNotFound) {
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(42, 1);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
ChangeStreamShardTargeterMock::Response shardTargeterResponse(
Timestamp(23, 0),
ShardTargeterDecision::kContinue,
segmentEndTimestamp,
[](ChangeStreamShardTargeterMock::TimestampOrDocument tsOrDoc,
ChangeStreamReaderContext& readerContext) {
readerContext.openCursorsOnDataShards(std::get<Timestamp>(tsOrDoc),
stdx::unordered_set<ShardId>{{"shardA"}});
});
// Add the same response twice, as we will make the cursor manager throw an exception upon the
// first attempt, and the shard targeter will be asked again.
shardTargeterResponses.push_back(shardTargeterResponse);
shardTargeterResponses.push_back(shardTargeterResponse);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
// Makes cursor manager throw a 'ShardNotFound' exception when trying to open a cursor.
getCursorManagerMock(params)->setThrowShardNotFoundExceptions(1);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// State should not have changed!
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
ASSERT_FALSE(docSource->getSegmentEndTimestamp_forTest().has_value());
result = docSource->runGetNextStateMachine_forTest();
// boost::none is returned because of the state transition.
ASSERT_FALSE(result.has_value());
ASSERT_TRUE(docSource->getSegmentEndTimestamp_forTest().has_value());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
// Undo mode must have been turned on when entering the degraded fetching state.
ASSERT_TRUE(*getCursorManagerMock(params)->getUndoNextMode());
}
// Tests state machine for input state kFetchingStartingChangeStreamSegment, when we try to open a
// new cursor on a shard and this fails with 'ShardNotFound' exceptions repeatedly until the max
// number of consecutive failures is reached.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentOpenCursorFailsWithShardNotFoundRepeatedly) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
// Insert as many responses as we need to nudge the stage into triggering the tassert for too
// many consecutive "ShardNotFound" errors.
for (int i = 0; i <= V2Stage::kMaxShardNotFoundFailuresInARow; ++i) {
shardTargeterResponses.emplace_back(
ts,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
[](ChangeStreamShardTargeterMock::TimestampOrDocument tsOrDoc,
ChangeStreamReaderContext& readerContext) {
readerContext.openCursorsOnDataShards(std::get<Timestamp>(tsOrDoc),
stdx::unordered_set<ShardId>{{"shardA"}});
});
}
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
getCursorManagerMock(params)->setThrowShardNotFoundExceptions(
V2Stage::kMaxShardNotFoundFailuresInARow);
for (int i = 0; i < V2Stage::kMaxShardNotFoundFailuresInARow - 1; ++i) {
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment,
docSource->getState_forTest());
ASSERT_EQ(boost::optional<Timestamp>(), docSource->getSegmentEndTimestamp_forTest());
}
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
}
// Tests state machine for input state kFetching for a control event, when we try to open a new
// cursor on the config server and this fails with 'RetryChangeStream' exception.
TEST_F(DSV2StateFetchingInitializationAndStartingTest,
StateFetchingStartingChangeStreamSegmentOpenConfigServerCursorFailsWitRetryChangeStream) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
ChangeStreamShardTargeterMock::ReaderContextCallback shardTargeterCallback =
[=](ChangeStreamShardTargeterMock::TimestampOrDocument tsOrDocument,
ChangeStreamReaderContext& context) {
Timestamp openTs = std::get<Timestamp>(tsOrDocument);
context.openCursorOnConfigServer(openTs);
};
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
shardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingStartingChangeStreamSegment,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(Timestamp(23, 0));
// Enable 'RetryChangeStream' exception, so opening the cursor on the config server will throw.
// This makes the stage fail.
getCursorManagerMock(params)->setThrowRetryChangeStreamExceptions(1);
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
// Calling 'getNext()' again should return the same error.
ASSERT_THROWS_CODE(docSource->getNext(), AssertionException, ErrorCodes::RetryChangeStream);
}
} // namespace
} // namespace mongo

View File

@ -0,0 +1,897 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/exec/agg/mock_stage.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include <memory>
#include <boost/optional/optional.hpp>
namespace mongo {
namespace {
using namespace test;
class DSV2StateFetchingNormalAndDegradedTest : public ChangeStreamStageTestNoSetup {};
using DSV2StateFetchingNormalAndDegradedTestDeathTest = DSV2StateFetchingNormalAndDegradedTest;
// Tests state machine for input state kFetchingNormalGettingChangeEvent for non-control events.
TEST_F(DSV2StateFetchingNormalAndDegradedTest,
StateFetchingNormalGettingChangeEventNonControlEvents) {
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(42, 1);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Timestamp(23, 0),
ShardTargeterDecision::kContinue,
segmentEndTimestamp,
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
// Test that the stage returns all inputs as they are.
const BSONObj doc1 = BSON("operationType" << "test1" << "foo" << "bar");
const BSONObj doc2 = BSON("operationType" << "test2" << "test" << "value");
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc1),
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc2),
DocumentSource::GetNextResult::makeEOF(),
};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingNormalGettingChangeEvent,
false /* validateStateTransition */);
// Check return value 1 (pause).
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
// Check return value 2 (doc1).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(doc1, result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
// Check return value 3 (pause).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
// Check return value 4 (doc2).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(doc2, result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
// Check return value 5 (eof).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetching for a control event.
TEST_F(DSV2StateFetchingNormalAndDegradedTest, StateFetchingNormalGettingChangeEventControlEvent) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event = BSON("operationType" << "test" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 1))
<< Document::metaFieldChangeStreamControlEvent << 1);
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(Document{event},
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(
Document::fromBsonWithMetaData(event))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingNormalGettingChangeEvent,
false /* validateStateTransition */);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetching for a control event, when we try to open a new
// cursor on a shard and this fails with 'ShardNotFound' exceptions. The state then transitions into
// degraded mode.
TEST_F(
DSV2StateFetchingNormalAndDegradedTest,
StateFetchingNormalGettingChangeEventControlEventOpenCursorFailsWithShardNotFoundTransitionToDegradedFetching) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event =
BSON("operationType" << "test" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 1)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(23, 1)))
<< Document::metaFieldChangeStreamControlEvent << 1);
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(
Document{event},
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
[](ChangeStreamShardTargeterMock::TimestampOrDocument tsOrDoc,
ChangeStreamReaderContext& readerContext) {
readerContext.openCursorsOnDataShards(
V2Stage::extractTimestampFromDocument(std::get<Document>(tsOrDoc)),
stdx::unordered_set<ShardId>{{"shardA"}});
});
shardTargeterResponses.emplace_back(
Timestamp(23, 2), ShardTargeterDecision::kContinue, boost::none);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(
Document::fromBsonWithMetaData(event))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingNormalGettingChangeEvent,
false /* validateStateTransition */);
// Before executing the stage, fake that there is already another data-shard cursor open.
getCursorManagerMock(params)->openCursorsOnDataShards(
getExpCtx(), getOpCtx(), Timestamp(23, 0), {ShardId{"shardB"}});
// Enable 'ShardNotFound' exceptions, so opening the next cursor will throw. This makes the
// stage go into degraded fetching mode.
getCursorManagerMock(params)->setThrowShardNotFoundExceptions(1);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 2), *docSource->getSegmentEndTimestamp_forTest());
// Undo mode must have been turned on when entering the degraded fetching state.
ASSERT_TRUE(*getCursorManagerMock(params)->getUndoNextMode());
// Calling the state machine will start a new segment.
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(23, 3));
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 2), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>(), docSource->getSegmentEndTimestamp_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
const stdx::unordered_set<ShardId> expectedShardCursors = {ShardId("shardB")};
ASSERT_FALSE(params->cursorManager->isCursorOnConfigServerOpen());
ASSERT_EQ(expectedShardCursors, params->cursorManager->getCurrentlyTargetedDataShards());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 2), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>(), docSource->getSegmentEndTimestamp_forTest());
}
// Tests state machine for input state kFetching for a control event, when we try to open a new
// cursor on a shard and this fails with 'ShardNotFound' exceptions. When there are no other data
// shard cursors open, the state transitions to starting a new segment.
TEST_F(
DSV2StateFetchingNormalAndDegradedTest,
StateFetchingNormalGettingChangeEventControlEventOpenCursorFailsWithShardNotFoundStartNewSegment) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event =
BSON("operationType" << "test" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 1)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(23, 1)))
<< Document::metaFieldChangeStreamControlEvent << 1);
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(
Document{event},
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
[](ChangeStreamShardTargeterMock::TimestampOrDocument tsOrDoc,
ChangeStreamReaderContext& readerContext) {
readerContext.closeCursorOnConfigServer();
readerContext.openCursorsOnDataShards(
V2Stage::extractTimestampFromDocument(std::get<Document>(tsOrDoc)),
stdx::unordered_set<ShardId>{{"shardA"}});
});
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(
Document::fromBsonWithMetaData(event))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
// Before executing the stage, fake that there is already a cursor open for the config server.
getCursorManagerMock(params)->openCursorOnConfigServer(
getExpCtx(), getOpCtx(), Timestamp(23, 0));
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingNormalGettingChangeEvent,
false /* validateStateTransition */);
// Enable 'ShardNotFound' exceptions, so opening the next cursor will throw. This makes the
// stage start a new segment.
getCursorManagerMock(params)->setThrowShardNotFoundExceptions(1);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// We must have transitioned to starting a new segment.
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>(), docSource->getSegmentEndTimestamp_forTest());
// The cursor on the config server should have been closed, the new data shard cursor should not
// have been opened.
ASSERT_FALSE(params->cursorManager->isCursorOnConfigServerOpen());
ASSERT_TRUE(params->cursorManager->getCurrentlyTargetedDataShards().empty());
}
// Tests state machine for input state kFetchingNormalGettingChangeEvent and the shard targeter
// returning 'kSwitchToV1'.
TEST_F(DSV2StateFetchingNormalAndDegradedTest,
StateFetchingNormalGettingChangeEventShardTargeterReturnsDowngrading) {
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(23, 99);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event =
BSON("operationType" << "test1" << "foo" << "bar" << "clusterTime" << Timestamp(23, 2));
MutableDocument docBuilder(Document::fromBsonWithMetaData(event));
docBuilder.metadata().setChangeStreamControlEvent();
Document doc = docBuilder.freeze();
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(doc,
ShardTargeterDecision::kSwitchToV1,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(std::move(doc))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingNormalGettingChangeEvent,
false /* validateStateTransition */);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kDowngrading, docSource->getState_forTest());
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
// Calling 'getNext()' again should return the same error.
ASSERT_THROWS_CODE(docSource->getNext(), AssertionException, ErrorCodes::RetryChangeStream);
}
// Tests state machine for input state kFetchingDegradedGettingChangeEvent for non-control events.
TEST_F(DSV2StateFetchingNormalAndDegradedTest,
StateFetchingDegradedGettingChangeEventNonControlEvents) {
// The change stream segments in this test are [ts(23, 0), ts(42, 1)) and [ts(42, 1), inf).
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(42, 1);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(segmentEndTimestamp,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
// Test that the stage returns all inputs as they are.
const BSONObj doc1 =
BSON("operationType" << "test1" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 1)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(23, 1))));
const BSONObj doc2 =
BSON("operationType" << "test2" << "test" << "value" << "_id"
<< buildHighWaterMarkToken(Timestamp(42, 1)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(42, 1))));
const BSONObj doc3 =
BSON("operationType" << "test3" << "test" << "value" << "_id"
<< buildHighWaterMarkToken(Timestamp(43, 1)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(43, 1))));
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc1),
DocumentSource::GetNextResult::makePauseExecution(),
Document::fromBsonWithMetaData(doc2),
Document::fromBsonWithMetaData(doc3),
DocumentSource::GetNextResult::makeEOF(),
};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingDegradedGettingChangeEvent,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
docSource->setSegmentEndTimestamp_forTest(segmentEndTimestamp);
// Check return value 1 (pause).
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(23, 1));
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
// Check return value 2 (doc1).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(Document::fromBsonWithMetaData(doc1).toBson(),
result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
// Check return value 3 (pause).
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(23, 2));
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
// Check return value 4 (doc2). This also transitions the state.
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(42, 1));
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isAdvanced());
ASSERT_BSONOBJ_EQ(Document::fromBsonWithMetaData(doc2).toBson(),
result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
// Segment start timestamp should change here.
ASSERT_EQ(Timestamp(42, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
// 'undoNextReady()' should have been called on the 'CursorManager' for this transition.
ASSERT_TRUE(getCursorManagerMock(params)->undoGetNextCalled());
ASSERT_EQ(Timestamp(42, 1), *getCursorManagerMock(params)->getRestoredHighWaterMark());
// Undo mode must have been turned off when exiting the degraded fetching state.
ASSERT_FALSE(*getCursorManagerMock(params)->getUndoNextMode());
// Check return value 5 (doc3).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_BSONOBJ_EQ(Document::fromBsonWithMetaData(doc3).toBson(),
result->getDocument().toBson());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(42, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
// Check return value 6 (eof).
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(43, 2));
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(42, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
}
// Tests state machine for input state kFetchingDegradedGettingChangeEvent for Pause and EOF events.
TEST_F(DSV2StateFetchingNormalAndDegradedTest,
StateFetchingDegradedGettingChangeEventPauseAndEOFEvents) {
// The change stream segments in this test are [ts(23, 0), ts(23, 1)) and [ts(23, 1), inf).
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(23, 1);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(segmentEndTimestamp,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makePauseExecution(),
DocumentSource::GetNextResult::makeEOF(),
DocumentSource::GetNextResult::makePauseExecution(),
DocumentSource::GetNextResult::makeEOF(),
};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
// Must enable undo mode when entering the degraded fetching mode.
getCursorManagerMock(params)->enableUndoNextMode();
docSource->setState_forTest(V2Stage::State::kFetchingDegradedGettingChangeEvent,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
docSource->setSegmentEndTimestamp_forTest(segmentEndTimestamp);
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(23, 0));
// Check return value 1 (pause).
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isPaused());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
// Undo mode must have been turned on while still in the degraded fetching state.
ASSERT_TRUE(*getCursorManagerMock(params)->getUndoNextMode());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, docSource->getSegmentEndTimestamp_forTest());
// 'undoNextReady()' should not have been called on the 'CursorManager' for pause events.
ASSERT_FALSE(getCursorManagerMock(params)->undoGetNextCalled());
// Check return value 2 (EOF).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
// Undo mode must have been turned on while still in the degraded fetching state.
ASSERT_TRUE(*getCursorManagerMock(params)->getUndoNextMode());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, docSource->getSegmentEndTimestamp_forTest());
// 'undoNextReady()' should not have been called on the 'CursorManager' for EOF events.
ASSERT_FALSE(getCursorManagerMock(params)->undoGetNextCalled());
// Move high water mark forward.
getCursorManagerMock(params)->setTimestampForCurrentHighWaterMark(Timestamp(23, 1));
// Consume pause event (it is not undone).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
// Segment start timestamp should change here.
ASSERT_EQ(Timestamp(23, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>(), docSource->getSegmentEndTimestamp_forTest());
// 'undoNextReady()' should not have been called.
ASSERT_FALSE(getCursorManagerMock(params)->undoGetNextCalled());
ASSERT_EQ(Timestamp(23, 1), getCursorManagerMock(params)->getRestoredHighWaterMark());
// Undo mode must have been turned off when exiting the degraded fetching state.
ASSERT_FALSE(*getCursorManagerMock(params)->getUndoNextMode());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
// Check return value 4 (EOF).
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 1), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
// No more results.
result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
}
// Tests state machine for input state kFetchingDegradedGettingChangeEvent for control events.
TEST_F(DSV2StateFetchingNormalAndDegradedTest,
StateFetchingDegradedGettingChangeEventControlEvent) {
// The change stream segments in this test are [ts(23, 0), ts(23, 99)) and [ts(23, 99), inf).
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(23, 99);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event1 =
BSON("operationType" << "test1" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 2)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(23, 2))));
const BSONObj event2 =
BSON("operationType" << "test2" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(24, 0)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(24, 0))));
MutableDocument docBuilder(Document::fromBsonWithMetaData(event1));
docBuilder.metadata().setChangeStreamControlEvent();
Document doc1 = docBuilder.freeze();
docBuilder.reset(Document::fromBsonWithMetaData(event2));
docBuilder.metadata().setChangeStreamControlEvent();
Document doc2 = docBuilder.freeze();
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(doc1,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
shardTargeterResponses.emplace_back(Timestamp(23, 99),
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
shardTargeterResponses.emplace_back(doc2,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(std::move(doc1)),
DocumentSource::GetNextResult::makeAdvancedControlDocument(std::move(doc2)),
DocumentSource::GetNextResult::makeEOF()};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingDegradedGettingChangeEvent,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
docSource->setSegmentEndTimestamp_forTest(segmentEndTimestamp);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingDegradedGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(ts, *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(segmentEndTimestamp, *docSource->getSegmentEndTimestamp_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingStartingChangeStreamSegment, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 99), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
// 'undoNextReady()' should have been called on the 'CursorManager' for this transition.
ASSERT_TRUE(getCursorManagerMock(params)->undoGetNextCalled());
ASSERT_EQ(Timestamp(23, 99), *getCursorManagerMock(params)->getRestoredHighWaterMark());
// Undo mode must have been turned off when exiting the degraded fetching state.
ASSERT_FALSE(*getCursorManagerMock(params)->getUndoNextMode());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_EQ(Timestamp(23, 99), *docSource->getSegmentStartTimestamp_forTest());
ASSERT_EQ(boost::optional<Timestamp>{}, docSource->getSegmentEndTimestamp_forTest());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_FALSE(result.has_value());
result = docSource->runGetNextStateMachine_forTest();
ASSERT_EQ(V2Stage::State::kFetchingNormalGettingChangeEvent, docSource->getState_forTest());
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
}
// Tests state machine for input state kFetchingDegradedGettingChangeEvent and the shard targeter
// returning 'kSwitchToV1', which it shouldn't.
DEATH_TEST_REGEX_F(DSV2StateFetchingNormalAndDegradedTestDeathTest,
StateFetchingDegradedGettingChangeEventShardTargeterReturnsDowngrading,
"Tripwire assertion.*10922904") {
const Timestamp ts = Timestamp(23, 0);
const Timestamp segmentEndTimestamp = Timestamp(23, 99);
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(ts, ChangeStreamReadMode::kIgnoreRemovedShards));
const BSONObj event =
BSON("operationType" << "test1" << "foo" << "bar" << "_id"
<< buildHighWaterMarkToken(Timestamp(23, 2)) << "$sortKey"
<< BSON_ARRAY(buildHighWaterMarkToken(Timestamp(23, 2))));
MutableDocument docBuilder(Document::fromBsonWithMetaData(event));
docBuilder.metadata().setChangeStreamControlEvent();
Document doc = docBuilder.freeze();
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(doc,
ShardTargeterDecision::kSwitchToV1,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
std::deque<DocumentSource::GetNextResult> inputDocs = {
DocumentSource::GetNextResult::makeAdvancedControlDocument(std::move(doc))};
auto source = MockWithUndoStage::createForTest(inputDocs, getExpCtx());
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get(),
source);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
exec::agg::MockStage::setSource_forTest(docSource, source.get());
docSource->setState_forTest(V2Stage::State::kFetchingDegradedGettingChangeEvent,
false /* validateStateTransition */);
docSource->setSegmentStartTimestamp_forTest(ts);
docSource->setSegmentEndTimestamp_forTest(segmentEndTimestamp);
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(), AssertionException, 10922904);
}
// Tests state machine for input state kDowngrading. The change stream is expected to fail with an
// error in this case.
TEST_F(DSV2StateFetchingNormalAndDegradedTest, StateDowngrading) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kDowngrading, false /* validateStateTransition */);
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
// Calling 'getNext()' again should return the same error.
ASSERT_THROWS_CODE(docSource->getNext(), AssertionException, ErrorCodes::RetryChangeStream);
}
} // namespace
} // namespace mongo

View File

@ -0,0 +1,588 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/timestamp.h"
#include "mongo/db/pipeline/change_stream_read_mode.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
#include <memory>
#include <boost/optional/optional.hpp>
namespace mongo {
namespace {
using namespace test;
class DSV2StateUninitializedAndWaitingTest : public ChangeStreamStageTestNoSetup {};
// State tests.
// ------------
// Tests state machine for input state kUninitialized, for a cluster time for which there is no
// data-to-shards allocation information present. The change stream is expected to fail in this
// case.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateUninitializedAllocationNotAvailable) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
mockResponses.push_back(
std::make_pair(Timestamp(23, 0), AllocationToShardsStatus::kNotAvailable));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kUninitialized,
false /* validateStateTransition */);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
ASSERT_FALSE(getCursorManagerMock(params)->isInitialized());
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
// Cursor manager should not have been initialized.
ASSERT_FALSE(getCursorManagerMock(params)->isInitialized());
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
// Last request time should have been updated due to the query-to-shards-allocation request.
ASSERT_EQ(now.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
}
// Tests state machine for input state kUninitialized, for a cluster time for which there is
// data-to-shards allocation information present. The state machine is supposed to go into state
// kFetchingInitialization.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateUninitializedAllocationOk) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
// Timestamp of ResumeToken is in the past. We simulate that no data-to-shards allocation is
// available anymore.
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kOk));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kUninitialized,
false /* validateStateTransition */);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
ASSERT_FALSE(getCursorManagerMock(params)->isInitialized());
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kFetchingInitialization, docSource->getState_forTest());
// Cursor manager should have been initialized.
ASSERT_TRUE(getCursorManagerMock(params)->isInitialized());
ASSERT_BSONOBJ_EQ(ResumeToken::makeHighWaterMarkToken(ts, 1 /* version */).toBSON(),
getCursorManagerMock(params)->getResumeToken().toBSON());
// Last request time should have been updated due to the query-to-shards-allocation request.
ASSERT_EQ(now.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
}
// Tests state machine for input state kUninitialized, for a cluster time which is in the future.
// The state machine is supposed to go into state kWaiting.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateUninitializedAllocationFutureClusterTime) {
const Timestamp ts = Timestamp(42, 23);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params =
buildParametersForTest(getExpCtx(),
V2StageTestHelpers::kDefaultMinAllocationToShardsPollPeriodSecs,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
// Timestamp of ResumeToken is in the past. We simulate that no data-to-shards allocation is
// available anymore.
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kFutureClusterTime));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kUninitialized,
false /* validateStateTransition */);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
ASSERT_FALSE(getCursorManagerMock(params)->isInitialized());
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
// Cursor manager should have been initialized.
ASSERT_TRUE(getCursorManagerMock(params)->isInitialized());
ASSERT_BSONOBJ_EQ(ResumeToken::makeHighWaterMarkToken(ts, 1 /* version */).toBSON(),
getCursorManagerMock(params)->getResumeToken().toBSON());
// Last request time should have been updated due to the query-to-shards-allocation request.
ASSERT_EQ(now.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
}
// Tests state machine for input state kWaiting, when the data-to-shards allocation query service
// returns that no placement information is available.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingNoPlacementInfoAvailable) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(ts,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(
getExpCtx(), 10, changeStreamReaderBuilder.get(), dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kNotAvailable));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
// Set deadline to a few milliseconds in the future.
Date_t deadline = now + Milliseconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
// Calling 'getNext()' again must return the same pre-recorded error:
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::RetryChangeStream);
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when the data-to-shards allocation query service
// returns that the cluster time is still in the future.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingFutureClusterTime) {
const Timestamp ts = Timestamp(42, 0);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(ts,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* minAllocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kFutureClusterTime));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
// Set deadline to a few milliseconds in the future.
Date_t deadline = now + Milliseconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
// State should remain in kWaiting, as no progress has been made.
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when the data-to-shards allocation query service
// returns that the allocation is available.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingTransitioningToFetching) {
const Timestamp ts = Timestamp(23, 0);
getExpCtx()->setChangeStreamSpec(buildChangeStreamSpec(ts, ChangeStreamReadMode::kStrict));
// Prepare ShardTargeterMock responses.
std::vector<ChangeStreamShardTargeterMock::Response> shardTargeterResponses;
shardTargeterResponses.emplace_back(ts,
ShardTargeterDecision::kContinue,
boost::optional<Timestamp>{},
V2StageTestHelpers::kEmptyShardTargeterCallback);
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[=](OperationContext* opCtx, const ChangeStream& changeStream) {
auto shardTargeter = std::make_unique<ChangeStreamShardTargeterMock>();
shardTargeter->bufferResponses(shardTargeterResponses);
return shardTargeter;
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* allocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
// Prepare DataToShardsAllocationQueryServiceMock.
std::vector<DataToShardsAllocationQueryServiceMock::Response> mockResponses;
mockResponses.push_back(std::make_pair(ts, AllocationToShardsStatus::kOk));
getDataToShardsAllocationQueryServiceMock(params)->bufferResponses(mockResponses);
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
// Last request time should have no value initially.
ASSERT_EQ(Date_t().toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
// Set deadline to a few milliseconds in the future.
Date_t deadline = now + Milliseconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// State should have transitioned to kFetchingInitialization.
ASSERT_EQ(V2Stage::State::kFetchingInitialization, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when the deadline for the next data-to-shards
// allocation query is earlier than the deadline on the OperationContext.
TEST_F(DSV2StateUninitializedAndWaitingTest,
StateWaitingBehaviorPollBeforeOperationContextDeadline) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* minAllocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
Date_t lastAllocationToShardsRequestTime = now - Seconds(2);
docSource->setLastAllocationToShardsRequestTime_forTest(lastAllocationToShardsRequestTime);
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// Set deadline to further away than the next poll date/time.
Date_t deadline = now + Seconds(params->minAllocationToShardsPollPeriodSecs);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// The waiter should have been called with a deadline that is equal to now plus the poll period.
ASSERT_EQ(
(lastAllocationToShardsRequestTime + Seconds(params->minAllocationToShardsPollPeriodSecs))
.toMillisSinceEpoch(),
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch());
// Last request time shouldn't have been modified.
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when the deadline for the next data-to-shards
// allocation query is later than the deadline on the OperationContext.
TEST_F(DSV2StateUninitializedAndWaitingTest,
StateWaitingBehaviorPollAfterOperationContextDeadline) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* minAllocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
Date_t lastAllocationToShardsRequestTime = now - Seconds(2);
docSource->setLastAllocationToShardsRequestTime_forTest(lastAllocationToShardsRequestTime);
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// Set deadline to earlier than the next poll date/time.
Date_t deadline = now + Seconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_FALSE(result.has_value());
// The waiter should have been called with a deadline that is equal to the next poll time.
ASSERT_EQ(
(lastAllocationToShardsRequestTime + Seconds(params->minAllocationToShardsPollPeriodSecs))
.toMillisSinceEpoch(),
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch());
// Last request time shouldn't have been modified.
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when we wait on the OperationContext and the wait
// function returns a timeout error status.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingBehaviorWhenWaitReturnsTimeoutError) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* minAllocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
Date_t lastAllocationToShardsRequestTime = now;
docSource->setLastAllocationToShardsRequestTime_forTest(lastAllocationToShardsRequestTime);
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// Set arbitrary deadline.
Date_t deadline = now + Seconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
// Make waiting fail with a non-OK status.
getDeadlineWaiterMock(params)->setStatus(
Status(ErrorCodes::ExceededTimeLimit, "timelimit exceeded!"));
auto result = docSource->runGetNextStateMachine_forTest();
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result->isEOF());
// The waiter should have been called with a deadline that is equal to the next poll time.
ASSERT_EQ((now + Seconds(params->minAllocationToShardsPollPeriodSecs)).toMillisSinceEpoch(),
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch());
// Last request time shouldn't have been modified.
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// State should not have changed.
ASSERT_EQ(V2Stage::State::kWaiting, docSource->getState_forTest());
}
// Tests state machine for input state kWaiting, when we wait on the OperationContext and the wait
// function returns a non-timeout error status.
TEST_F(DSV2StateUninitializedAndWaitingTest, StateWaitingBehaviorWhenWaitReturnsNonTimeoutError) {
getExpCtx()->setChangeStreamSpec(
buildChangeStreamSpec(Timestamp(23, 0), ChangeStreamReadMode::kStrict));
auto changeStreamReaderBuilder = std::make_shared<ChangeStreamReaderBuilderMock>(
[](OperationContext* opCtx, const ChangeStream& changeStream) {
return std::make_unique<ChangeStreamShardTargeterMock>();
});
auto dataToShardsAllocationQueryService =
std::make_unique<DataToShardsAllocationQueryServiceMock>();
auto params = buildParametersForTest(getExpCtx(),
10 /* minAllocationToShardsPollPeriodSecs */,
changeStreamReaderBuilder.get(),
dataToShardsAllocationQueryService.get());
auto docSource = make_intrusive<V2Stage>(getExpCtx(), params);
docSource->setState_forTest(V2Stage::State::kWaiting, false /* validateStateTransition */);
Date_t now = Date_t::now();
getPreciseClockSource(getOpCtx()->getServiceContext())->reset(now);
Date_t lastAllocationToShardsRequestTime = now;
docSource->setLastAllocationToShardsRequestTime_forTest(lastAllocationToShardsRequestTime);
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// Set arbitrary deadline.
Date_t deadline = now + Seconds(5);
getOpCtx()->setDeadlineByDate(deadline, ErrorCodes::ExceededTimeLimit);
// Make waiting fail with a non-OK status.
getDeadlineWaiterMock(params)->setStatus(Status(ErrorCodes::ShutdownInProgress, "shutdown!"));
ASSERT_THROWS_CODE(docSource->runGetNextStateMachine_forTest(),
AssertionException,
ErrorCodes::ShutdownInProgress);
// The waiter should have been called with a deadline that is equal to the next poll time.
ASSERT_EQ((now + Seconds(params->minAllocationToShardsPollPeriodSecs)).toMillisSinceEpoch(),
getDeadlineWaiterMock(params)->getLastUsedDeadline().toMillisSinceEpoch());
// Last request time shouldn't have been modified.
ASSERT_EQ(lastAllocationToShardsRequestTime.toMillisSinceEpoch(),
docSource->getLastAllocationToShardsRequestTime_forTest().toMillisSinceEpoch());
// State should have transitioned to kFinal, as waiting threw an exception.
ASSERT_EQ(V2Stage::State::kFinal, docSource->getState_forTest());
}
} // namespace
} // namespace mongo

View File

@ -0,0 +1,293 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2_test_helpers.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/change_stream_helpers.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2.h"
namespace mongo {
namespace test {
const ChangeStreamShardTargeterMock::ReaderContextCallback
V2StageTestHelpers::kEmptyShardTargeterCallback =
[](ChangeStreamShardTargeterMock::TimestampOrDocument, ChangeStreamReaderContext&) {
};
// MockWithUndoStage
MockWithUndoStage::MockWithUndoStage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::deque<exec::agg::GetNextResult> results)
: Stage("DocumentSourceWithUndoMock"_sd, expCtx), _queue(std::move(results)) {}
boost::intrusive_ptr<MockWithUndoStage> MockWithUndoStage::createForTest(
std::deque<exec::agg::GetNextResult> results,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return make_intrusive<MockWithUndoStage>(expCtx, std::move(results));
}
void MockWithUndoStage::undo() {
invariant(_undoResult);
_queue.push_front(std::move(*_undoResult));
_undoResult.reset();
}
exec::agg::GetNextResult MockWithUndoStage::doGetNext() {
exec::agg::GetNextResult next = exec::agg::GetNextResult::makeEOF();
if (!_queue.empty()) {
next = std::move(_queue.front());
_queue.pop_front();
}
_undoResult = next;
return next;
}
// CursorManagerMock
CursorManagerMock::CursorManagerMock(
const ChangeStream& changeStream,
ChangeStreamReaderBuilder* readerBuilder,
boost::optional<boost::intrusive_ptr<MockWithUndoStage>> stageForUndo)
: _changeStream(changeStream), _readerBuilder(readerBuilder), _stageForUndo(stageForUndo) {}
void CursorManagerMock::initialize(const boost::intrusive_ptr<ExpressionContext>& expCtx,
V2Stage* stage,
const ResumeTokenData& resumeTokenData) {
_resumeToken.emplace(ResumeToken(resumeTokenData));
}
void CursorManagerMock::openCursorsOnDataShards(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
OperationContext* opCtx,
Timestamp atClusterTime,
const stdx::unordered_set<ShardId>& shardIds) {
throwShardNotFoundExceptionIfRequired();
std::for_each(shardIds.begin(), shardIds.end(), [&](const ShardId& shardId) {
_currentlyTargetedDataShards.insert(shardId);
});
}
void CursorManagerMock::openCursorOnConfigServer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
OperationContext* opCtx,
Timestamp atClusterTime) {
throwShardNotFoundExceptionIfRequired();
throwRetryChangeStreamExceptionIfRequired();
tassert(12013807,
"expecting no prior call to openCursorOnConfigServer()",
!_cursorOpenedOnConfigServer);
_cursorOpenedOnConfigServer = true;
}
void CursorManagerMock::closeCursorsOnDataShards(const stdx::unordered_set<ShardId>& shardIds) {
std::for_each(shardIds.begin(), shardIds.end(), [&](const ShardId& shardId) {
_currentlyTargetedDataShards.erase(shardId);
});
}
void CursorManagerMock::closeCursorOnConfigServer(OperationContext* opCtx) {
tassert(12013808,
"expecting prior call to openCursorOnConfigServer()",
_cursorOpenedOnConfigServer);
_cursorOpenedOnConfigServer = false;
}
bool CursorManagerMock::isCursorOnConfigServerOpen() const {
return _cursorOpenedOnConfigServer;
}
const stdx::unordered_set<ShardId>& CursorManagerMock::getCurrentlyTargetedDataShards() const {
return _currentlyTargetedDataShards;
}
const ChangeStream& CursorManagerMock::getChangeStream() const {
return _changeStream;
}
void CursorManagerMock::enableUndoNextMode() {
_undoNextMode.emplace(true);
}
void CursorManagerMock::disableUndoNextMode() {
_undoNextMode.emplace(false);
}
void CursorManagerMock::undoGetNext() {
_undoGetNextCalled = true;
invariant(_stageForUndo);
(*_stageForUndo)->undo();
}
void CursorManagerMock::setHighWaterMark(Timestamp highWaterMark) {
_restoredHighWaterMark.emplace(highWaterMark);
}
Timestamp CursorManagerMock::getTimestampFromCurrentHighWaterMark() const {
tassert(10657540,
"expecting high watermark timestamp to be set in test",
_highWaterMarkTimestamp.has_value());
return *_highWaterMarkTimestamp;
}
bool CursorManagerMock::undoGetNextCalled() const {
return _undoGetNextCalled;
}
boost::optional<bool> CursorManagerMock::getUndoNextMode() const {
return _undoNextMode;
}
boost::optional<Timestamp> CursorManagerMock::getRestoredHighWaterMark() const {
return _restoredHighWaterMark;
}
void CursorManagerMock::setTimestampForCurrentHighWaterMark(Timestamp ts) {
_highWaterMarkTimestamp = ts;
}
bool CursorManagerMock::isInitialized() const {
return _resumeToken.has_value();
}
ResumeToken CursorManagerMock::getResumeToken() const {
return *_resumeToken;
}
bool CursorManagerMock::cursorOpenedOnConfigServer() const {
return _cursorOpenedOnConfigServer;
}
void CursorManagerMock::setThrowShardNotFoundExceptions(int value) {
_throwShardNotFoundExceptions = value;
}
void CursorManagerMock::setThrowRetryChangeStreamExceptions(int value) {
_throwRetryChangeStreamExceptions = value;
}
void CursorManagerMock::throwShardNotFoundExceptionIfRequired() {
if (_throwShardNotFoundExceptions > 0) {
_throwShardNotFoundExceptions--;
error_details::throwExceptionForStatus(
Status(ErrorCodes::ShardNotFound, "shard not found"));
}
}
void CursorManagerMock::throwRetryChangeStreamExceptionIfRequired() {
if (_throwRetryChangeStreamExceptions > 0) {
_throwRetryChangeStreamExceptions--;
error_details::throwExceptionForStatus(
Status(ErrorCodes::RetryChangeStream, "please retry change stream"));
}
}
// DeadlineWaiterMock
void DeadlineWaiterMock::waitUntil(OperationContext* opCtx, Date_t deadline) {
_lastUsedDeadline = deadline;
if (!_status.isOK()) {
error_details::throwExceptionForStatus(_status);
}
}
Date_t DeadlineWaiterMock::getLastUsedDeadline() const {
return _lastUsedDeadline;
}
void DeadlineWaiterMock::setStatus(Status status) {
_status = status;
}
// Helper functions used in the tests.
// -----------------------------------
BSONObj buildHighWaterMarkToken(Timestamp ts) {
return ResumeToken::makeHighWaterMarkToken(ts, 1 /* version */).toDocument().toBson();
}
DocumentSourceChangeStreamSpec buildChangeStreamSpec(Timestamp ts, ChangeStreamReadMode mode) {
DocumentSourceChangeStreamSpec spec;
spec.setIgnoreRemovedShards(mode == ChangeStreamReadMode::kIgnoreRemovedShards);
spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken(ts, 1 /* version */));
return spec;
}
BSONObj getStageSpec() {
return BSON(DocumentSourceChangeStreamHandleTopologyChangeV2::kStageName << BSONObj());
}
std::shared_ptr<V2Stage::Parameters> buildParametersForTest(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
int minAllocationToShardsPollPeriodSecs,
ChangeStreamReaderBuilder* changeStreamReaderBuilder,
DataToShardsAllocationQueryService* dataToShardsAllocationQueryService,
boost::optional<boost::intrusive_ptr<MockWithUndoStage>> stageForUndo) {
ChangeStream changeStream = ChangeStream::buildFromExpressionContext(expCtx);
return std::make_shared<V2Stage::Parameters>(
changeStream,
change_stream::resolveResumeTokenFromSpec(expCtx, *expCtx->getChangeStreamSpec()),
minAllocationToShardsPollPeriodSecs,
std::make_unique<DeadlineWaiterMock>(),
std::make_unique<CursorManagerMock>(changeStream, changeStreamReaderBuilder, stageForUndo),
changeStreamReaderBuilder,
dataToShardsAllocationQueryService);
}
DataToShardsAllocationQueryServiceMock* getDataToShardsAllocationQueryServiceMock(
std::shared_ptr<V2Stage::Parameters>& params) {
return static_cast<DataToShardsAllocationQueryServiceMock*>(
params->dataToShardsAllocationQueryService);
}
ChangeStreamReaderBuilderMock* getChangeStreamReaderBuilderMock(
std::shared_ptr<V2Stage::Parameters>& params) {
return static_cast<ChangeStreamReaderBuilderMock*>(params->changeStreamReaderBuilder);
}
ChangeStreamShardTargeterMock* getChangeStreamShardTargeterMock(
std::shared_ptr<V2Stage::Parameters>& params) {
return static_cast<ChangeStreamShardTargeterMock*>(
getChangeStreamReaderBuilderMock(params)->getShardTargeter());
}
CursorManagerMock* getCursorManagerMock(std::shared_ptr<V2Stage::Parameters>& params) {
return static_cast<CursorManagerMock*>(params->cursorManager.get());
}
DeadlineWaiterMock* getDeadlineWaiterMock(std::shared_ptr<V2Stage::Parameters>& params) {
return static_cast<DeadlineWaiterMock*>(params->deadlineWaiter.get());
}
ClockSourceMock* getPreciseClockSource(ServiceContext* serviceContext) {
return dynamic_cast<ClockSourceMock*>(serviceContext->getPreciseClockSource());
}
} // namespace test
} // namespace mongo

View File

@ -0,0 +1,229 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/change_stream.h"
#include "mongo/db/pipeline/change_stream_reader_builder_mock.h"
#include "mongo/db/pipeline/change_stream_shard_targeter_mock.h"
#include "mongo/db/pipeline/change_stream_stage_test_fixture.h"
#include "mongo/db/pipeline/data_to_shards_allocation_query_service_mock.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/util/clock_source_mock.h"
#include <deque>
#include <utility>
#include <boost/optional/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo {
namespace test {
using V2Stage = exec::agg::ChangeStreamHandleTopologyChangeV2Stage;
struct V2StageTestHelpers {
static const ChangeStreamShardTargeterMock::ReaderContextCallback kEmptyShardTargeterCallback;
static constexpr int kDefaultMinAllocationToShardsPollPeriodSecs = 1;
};
// Simple mock aggregation stage that supports an "undo" operation. Needed because the
// 'DocumentSourceMock' agg stage does not support undoing of already returned results.
class MockWithUndoStage : public exec::agg::Stage {
public:
MockWithUndoStage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::deque<exec::agg::GetNextResult> results);
static boost::intrusive_ptr<MockWithUndoStage> createForTest(
std::deque<exec::agg::GetNextResult> results,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
void undo();
protected:
exec::agg::GetNextResult doGetNext() override;
private:
std::deque<exec::agg::GetNextResult> _queue;
boost::optional<exec::agg::GetNextResult> _undoResult;
};
class CursorManagerMock : public V2Stage::CursorManager {
public:
CursorManagerMock(
const ChangeStream& changeStream,
ChangeStreamReaderBuilder* readerBuilder,
boost::optional<boost::intrusive_ptr<MockWithUndoStage>> stageForUndo = boost::none);
void initialize(const boost::intrusive_ptr<ExpressionContext>& expCtx,
V2Stage* stage,
const ResumeTokenData& resumeTokenData) override;
void openCursorsOnDataShards(const boost::intrusive_ptr<ExpressionContext>& expCtx,
OperationContext* opCtx,
Timestamp atClusterTime,
const stdx::unordered_set<ShardId>& shardIds) override;
void openCursorOnConfigServer(const boost::intrusive_ptr<ExpressionContext>& expCtx,
OperationContext* opCtx,
Timestamp atClusterTime) override;
void closeCursorsOnDataShards(const stdx::unordered_set<ShardId>& shardIds) override;
void closeCursorOnConfigServer(OperationContext* opCtx) override;
bool isCursorOnConfigServerOpen() const override;
const stdx::unordered_set<ShardId>& getCurrentlyTargetedDataShards() const override;
const ChangeStream& getChangeStream() const override;
void enableUndoNextMode() override;
void disableUndoNextMode() override;
void undoGetNext() override;
void setHighWaterMark(Timestamp highWaterMark) override;
Timestamp getTimestampFromCurrentHighWaterMark() const override;
bool undoGetNextCalled() const;
boost::optional<bool> getUndoNextMode() const;
boost::optional<Timestamp> getRestoredHighWaterMark() const;
void setTimestampForCurrentHighWaterMark(Timestamp ts);
bool isInitialized() const;
ResumeToken getResumeToken() const;
bool cursorOpenedOnConfigServer() const;
void setThrowShardNotFoundExceptions(int value);
void setThrowRetryChangeStreamExceptions(int value);
private:
void throwShardNotFoundExceptionIfRequired();
void throwRetryChangeStreamExceptionIfRequired();
const ChangeStream _changeStream;
ChangeStreamReaderBuilder* _readerBuilder;
stdx::unordered_set<ShardId> _currentlyTargetedDataShards;
// If set, this many attempts to open a cursor will throw a 'ShardNotFound' exception.
int _throwShardNotFoundExceptions = 0;
// If set, this many attempts to open a config server cursor will throw a 'RetryChangeStream'
// exception.
int _throwRetryChangeStreamExceptions = 0;
// Will be set to true if a request was made to open a cursor on the config server.
bool _cursorOpenedOnConfigServer = false;
// If set, any attempt to open a cursor will throw a 'ShardNotFound' exception.
bool _throwShardNotFoundException = false;
// Will be set to true if 'undoGetNext()' was called.
bool _undoGetNextCalled = false;
// Resume token used when initializing the 'CursorManager'.
boost::optional<ResumeToken> _resumeToken;
boost::optional<Timestamp> _highWaterMarkTimestamp;
// Calls to enable/disable undo mode will be recorded here.
boost::optional<bool> _undoNextMode;
// The timestamp used in a call to 'setHighWaterMark()' will be recorded here after overfetching
// in degraded mode.
boost::optional<Timestamp> _restoredHighWaterMark;
// The aggregation stage that is used as input for the v2 stage. Necessary here so we can
// perform an "undo" operation it if necessary.
boost::optional<boost::intrusive_ptr<MockWithUndoStage>> _stageForUndo;
};
class DeadlineWaiterMock : public V2Stage::DeadlineWaiter {
public:
void waitUntil(OperationContext* opCtx, Date_t deadline) override;
Date_t getLastUsedDeadline() const;
void setStatus(Status status);
private:
Status _status = Status::OK();
Date_t _lastUsedDeadline;
};
// Helper functions used in the tests.
// -----------------------------------
BSONObj buildHighWaterMarkToken(Timestamp ts);
DocumentSourceChangeStreamSpec buildChangeStreamSpec(Timestamp ts, ChangeStreamReadMode mode);
BSONObj getStageSpec();
std::shared_ptr<V2Stage::Parameters> buildParametersForTest(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
int minAllocationToShardsPollPeriodSecs,
ChangeStreamReaderBuilder* changeStreamReaderBuilder,
DataToShardsAllocationQueryService* dataToShardsAllocationQueryService,
boost::optional<boost::intrusive_ptr<MockWithUndoStage>> stageForUndo = boost::none);
DataToShardsAllocationQueryServiceMock* getDataToShardsAllocationQueryServiceMock(
std::shared_ptr<V2Stage::Parameters>& params);
ChangeStreamReaderBuilderMock* getChangeStreamReaderBuilderMock(
std::shared_ptr<V2Stage::Parameters>& params);
ChangeStreamShardTargeterMock* getChangeStreamShardTargeterMock(
std::shared_ptr<V2Stage::Parameters>& params);
CursorManagerMock* getCursorManagerMock(std::shared_ptr<V2Stage::Parameters>& params);
DeadlineWaiterMock* getDeadlineWaiterMock(std::shared_ptr<V2Stage::Parameters>& params);
ClockSourceMock* getPreciseClockSource(ServiceContext* serviceContext);
} // namespace test
} // namespace mongo