SERVER-116464 Ensure change stream does not emit change events when isTimeseries flag is set and rawData is absent (#46205)

GitOrigin-RevId: 6bbcab97cce7270d3554948f49084199d1561044
This commit is contained in:
Denis Grebennicov 2026-01-20 12:39:57 +01:00 committed by MongoDB Bot
parent 5f30e00dba
commit 58eb569074
19 changed files with 1221 additions and 469 deletions

View File

@ -15,6 +15,9 @@ selector:
# TODO: SERVER-114511 re-enable this test.
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO: SERVER-117391 Ensure change_streams/timeseries.js test passes when running change streams in v2 mode.
- jstests/change_streams/timeseries.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the

View File

@ -14,6 +14,9 @@ selector:
# TODO: SERVER-114511 re-enable this test.
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO: SERVER-117391 Ensure change_streams/timeseries.js test passes when running change streams in v2 mode.
- jstests/change_streams/timeseries.js
exclude_with_any_tags:
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged

View File

@ -13,6 +13,9 @@ selector:
# TODO: SERVER-114511 re-enable this test.
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO: SERVER-117391 Ensure change_streams/timeseries.js test passes when running change streams in v2 mode.
- jstests/change_streams/timeseries.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the

View File

@ -9379,6 +9379,37 @@ export const authCommandsLib = {
},
],
},
{
testname: "aggregate_$changeStream_rawData",
command: {
aggregate: "foo",
pipeline: [{$changeStream: {}}],
cursor: {},
rawData: true,
},
skipTest: (conn) => {
const isStandalone =
!FixtureHelpers.isReplSet(conn.getDB(adminDbName)) &&
!FixtureHelpers.isMongos(conn.getDB(adminDbName));
return isStandalone || !isFeatureEnabled(conn, "featureFlagRawDataCrudOperations");
},
testcases: [
{
runOnDb: firstDbName,
privileges: [
{
resource: {db: firstDbName, collection: ""},
actions: ["performRawDataOperations", "changeStream", "find"],
},
],
},
{
runOnDb: firstDbName,
privileges: [],
expectAuthzFailure: true,
},
],
},
],
/************* SHARED TEST LOGIC ****************/

View File

@ -2,17 +2,16 @@
* Test that change streams returns DDL operation on views.
*
* @tags: [
* requires_fcv_60,
* # TODO SERVER-111733 re-enable this test in viewless timeseries suites
* featureFlagCreateViewlessTimeseriesCollections_incompatible,
* ]
*/
import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {assertCreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {
assertChangeStreamEventEq,
canonicalizeEventForTesting,
ChangeStreamTest,
} from "jstests/libs/query/change_stream_util.js";
import {isRawOperationSupported} from "jstests/libs/raw_operation_utils.js";
import {describe, afterEach, it} from "jstests/libs/mochalite.js";
const testDB = db.getSiblingDB(jsTestName());
@ -20,7 +19,7 @@ const dbName = testDB.getName();
const viewPipeline = [{$match: {a: 2}}, {$project: {a: 1}}];
function runViewEventAndResumeTest(showSystemEvents) {
jsTest.log("runViewEventAndResumeTest(showSystemEvents=" + showSystemEvents + ")");
jsTest.log.info("runViewEventAndResumeTest(showSystemEvents=" + showSystemEvents + ")");
// Drop all the namespaces accessed in the test.
assertDropCollection(testDB, "base");
@ -113,6 +112,38 @@ function runViewEventAndResumeTest(showSystemEvents) {
assertChangeStreamEventEq(dropEventView, {operationType: "drop", ns: {db: dbName, coll: "viewOnView"}});
// Generate a dummy event so that we can test all events for resumability.
assert.commandWorked(testDB.createView("dummyView", "view", viewPipeline));
assert.soon(() => cursor.hasNext());
const dummyEvent = cursor.next();
events.push(dummyEvent);
assertDropCollection(testDB, "dummyView");
assert.soon(() => cursor.hasNext());
// Test that for all the commands we can resume the change stream using a resume token.
for (let idx = 0; idx < events.length - 1; idx++) {
const event = events[idx];
const subsequent = events[idx + 1];
const newCursor = testDB.aggregate([
{
$changeStream: {
resumeAfter: event._id,
showExpandedEvents: true,
showSystemEvents: showSystemEvents,
},
},
]);
assert.soon(() => newCursor.hasNext());
assertChangeStreamEventEq(newCursor.next(), subsequent);
}
}
function runViewEventForTsCollectionTest(showSystemEvents) {
const events = [];
let cursor = testDB.aggregate([{$changeStream: {showExpandedEvents: true, showSystemEvents: showSystemEvents}}]);
// Test view change stream events on a timeseries collection.
assert.commandWorked(testDB.createCollection("timeseries_coll", {timeseries: {timeField: "time"}}));
@ -226,132 +257,149 @@ function runViewEventAndResumeTest(showSystemEvents) {
ns: {db: dbName, coll: "system.views"},
});
}
// Generate a dummy event so that we can test all events for resumability.
assert.commandWorked(testDB.createView("dummyView", "view", viewPipeline));
assert.soon(() => cursor.hasNext());
const dummyEvent = cursor.next();
events.push(dummyEvent);
assertDropCollection(testDB, "dummyView");
assert.soon(() => cursor.hasNext());
// Test that for all the commands we can resume the change stream using a resume token.
for (let idx = 0; idx < events.length - 1; idx++) {
const event = events[idx];
const subsequent = events[idx + 1];
const newCursor = testDB.aggregate([
{
$changeStream: {
resumeAfter: event._id,
showExpandedEvents: true,
showSystemEvents: showSystemEvents,
},
},
]);
assert.soon(() => newCursor.hasNext());
assertChangeStreamEventEq(newCursor.next(), subsequent);
}
}
runViewEventAndResumeTest(false /* showSystemEvents */);
runViewEventAndResumeTest(true /* showSystemEvents */);
describe("$changeStream", function () {
describe("can emit view DDL events", function () {
it("runViewEventAndResumeTest without showSystemEvents", function () {
runViewEventAndResumeTest(false /* showSystemEvents */);
});
const cst = new ChangeStreamTest(testDB);
it("runViewEventAndResumeTest with showSystemEvents", function () {
runViewEventAndResumeTest(true /* showSystemEvents */);
});
});
// Cannot start a change stream on a view namespace.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assert.soon(() => {
try {
cst.startWatchingChanges({
describe("can emit view DDL events for timeseries collections", function () {
if (isRawOperationSupported(db)) {
jsTest.log.info(
"If raw operations are supported, skipping running tests as timeseries collection will not be implemented via view",
);
return;
}
it("runViewEventAndResumeTestForTsCollection without showSystemEvents", function () {
runViewEventForTsCollectionTest(false /* showSystemEvents */);
});
it("runViewEventAndResumeTestForTsCollection with showSystemEvents", function () {
runViewEventForTsCollectionTest(true /* showSystemEvents */);
});
});
const cst = new ChangeStreamTest(testDB);
afterEach(function () {
assertDropCollection(testDB, "view");
assertDropCollection(testDB, "base");
cst.cleanUp();
});
it("can not be opened on views", function () {
// Cannot start a change stream on a view namespace.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assert.soon(() => {
try {
cst.startWatchingChanges({
pipeline: [{$changeStream: {showExpandedEvents: true}}],
collection: "view",
doNotModifyInPassthroughs: true,
});
} catch (e) {
assert.commandFailedWithCode(e, ErrorCodes.CommandNotSupportedOnView);
return true;
}
return false;
});
});
it("creating and dropping a view with the same name as an existing collection does not emit view events", function () {
let cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {showExpandedEvents: true}}],
collection: "view",
doNotModifyInPassthroughs: true,
});
} catch (e) {
assert.commandFailedWithCode(e, ErrorCodes.CommandNotSupportedOnView);
return true;
}
return false;
});
// Creating a collection level change stream before creating a view with the same name, does not
// produce any view related events.
assertDropCollection(testDB, "view");
// Create a view, then drop it and create a normal collection with the same name.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assertDropCollection(testDB, "view");
assert.commandWorked(testDB.createCollection("view"));
let cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {showExpandedEvents: true}}],
collection: "view",
doNotModifyInPassthroughs: true,
});
// Create a view, then drop it and create a normal collection with the same name.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assertDropCollection(testDB, "view");
assert.commandWorked(testDB.createCollection("view"));
// Confirm that the stream only sees the normal collection creation, not the view events.
let event = cst.getNextChanges(cursor, 1)[0];
assert(event.collectionUUID, event);
assertChangeStreamEventEq(event, {
operationType: "create",
ns: {db: dbName, coll: "view"},
operationDescription: {idIndex: {v: 2, key: {_id: 1}, name: "_id_"}},
nsType: "collection",
});
// Change stream on a single collection does not produce view events.
assertDropCollection(testDB, "view");
cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {showExpandedEvents: true}}],
collection: "base",
doNotModifyInPassthroughs: true,
});
// Verify that the view related operations are ignored, and only the event for insert on the base
// collection is returned.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assertDropCollection(testDB, "view");
assert.commandWorked(testDB["base"].insert({_id: 0}));
event = cst.getNextChanges(cursor, 1)[0];
assert(event.collectionUUID, event);
assertChangeStreamEventEq(event, {
operationType: "insert",
ns: {db: dbName, coll: "base"},
fullDocument: {_id: 0},
documentKey: {_id: 0},
});
// Verify that collection names such as 'system_views' do not trigger fake view creation events.
["system_views", "systemxviews", "systemviews"].forEach((collName) => {
assertDropCollection(testDB, collName);
assert.commandWorked(testDB.createCollection(collName));
// Note: excluding "createIndexes" and "shardCollection" events is necessary here because some
// passthroughs add index creation events to the change stream.
cursor = cst.startWatchingChanges({
pipeline: [
{$changeStream: {showExpandedEvents: true}},
{$match: {operationType: {$nin: ["shardCollection", "createIndexes"]}}},
],
collection: 1,
doNotModifyInPassthroughs: true,
// Confirm that the stream only sees the normal collection creation, not the view events.
const event = cst.getNextChanges(cursor, 1)[0];
assert(event.collectionUUID, event);
assertChangeStreamEventEq(event, {
operationType: "create",
ns: {db: dbName, coll: "view"},
operationDescription: {idIndex: {v: 2, key: {_id: 1}, name: "_id_"}},
nsType: "collection",
});
});
// Insert a document into a collection with a name similar to 'system.views'.
testDB[collName].insert({_id: "test"});
it("view change events are not emitted for change streams opened on the underlying collection", function () {
assertCreateCollection(testDB, "base");
// Verify that we only see the insert and no view creation event.
event = cst.getNextChanges(cursor, 1)[0];
assertChangeStreamEventEq(event, {
operationType: "insert",
ns: {db: dbName, coll: collName},
fullDocument: {_id: "test"},
documentKey: {_id: "test"},
// Change stream on a single collection does not produce view events.
const cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {showExpandedEvents: true}}],
collection: "base",
doNotModifyInPassthroughs: true,
});
// Create a view on the base collection, then drop it and implicitly create a collection by inserting a document.
assert.commandWorked(testDB.createView("view", "base", viewPipeline));
assertDropCollection(testDB, "view");
assert.commandWorked(testDB["base"].insert({_id: 0}));
// Verify that the view related operations are ignored, and only the event for insert on the base
// collection is returned.
cst.assertNextChangesEqual({
cursor,
expectedChanges: [
{
operationType: "insert",
ns: {db: dbName, coll: "base"},
fullDocument: {_id: 0},
documentKey: {_id: 0},
},
],
});
});
cst.assertNoChange(cursor);
// Verify that collection names such as 'system_views' do not trigger fake view creation events.
["system_views", "systemxviews", "systemviews"].forEach((collName) => {
it("creating collection with name " + collName + " does not emit view events", function () {
assertCreateCollection(testDB, collName);
assertDropCollection(testDB, collName);
// Note: excluding "createIndexes" and "shardCollection" events is necessary here because some
// passthroughs add index creation events to the change stream.
const cursor = cst.startWatchingChanges({
pipeline: [
{$changeStream: {showExpandedEvents: true}},
{$match: {operationType: {$nin: ["shardCollection", "createIndexes"]}}},
],
collection: 1,
doNotModifyInPassthroughs: true,
});
// Insert a document into a collection with a name similar to 'system.views'.
testDB[collName].insert({_id: "test"});
// Verify that we only see the insert and no view creation event.
cst.assertNextChangesEqual({
cursor,
expectedChanges: [
{
operationType: "insert",
ns: {db: dbName, coll: collName},
fullDocument: {_id: "test"},
documentKey: {_id: "test"},
},
],
});
cst.assertNoChange(cursor);
assertDropCollection(testDB, collName);
});
});
});

File diff suppressed because it is too large Load Diff

View File

@ -1,50 +0,0 @@
// Tests that $changeStream aggregations against time-series collections fail cleanly.
// @tags: [
// requires_timeseries,
// requires_replication,
// # TODO SERVER-111733 remove this tag once change streams properly interact with viewless timeseries
// featureFlagCreateViewlessTimeseriesCollections_incompatible,
// ]
import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const timeFieldName = "time";
const metaFieldName = "tags";
const testDB = rst.getPrimary().getDB(jsTestName());
assert.commandWorked(testDB.dropDatabase());
const tsColl = testDB.getCollection("ts_point_data");
tsColl.drop();
assert.commandWorked(
testDB.createCollection(tsColl.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}),
);
const nMeasurements = 10;
for (let i = 0; i < nMeasurements; i++) {
const docToInsert = {
time: ISODate(),
tags: i.toString(),
value: i + nMeasurements,
};
assert.commandWorked(tsColl.insert(docToInsert));
}
// Test that a changeStream cannot be opened on a time-series collection because it's a view, both
// with and without rawData.
assert.throwsWithCode(
() => getTimeseriesCollForRawOps(testDB, tsColl).aggregate([{$changeStream: {}}], getRawOperationSpec(testDB)),
ErrorCodes.CommandNotSupportedOnView,
);
assert.commandFailedWithCode(
testDB.runCommand({aggregate: tsColl.getName(), pipeline: [{$changeStream: {}}], cursor: {}}),
ErrorCodes.CommandNotSupportedOnView,
);
rst.stopSet();

View File

@ -8,6 +8,8 @@
*
* @tags: [
* uses_change_streams,
* # Change streams must be opened with rawData: true flag to watch the underlying buckets collection, which was introduced in 8.3.
* requires_fcv_83,
* ]
*/
@ -106,7 +108,7 @@ assert.commandWorked(
//
// Note: the change stream is on the database because watching the change stream events on the
// raw buckets is not allowed.
const mongosDbChangeStream = db.watch([], {showSystemEvents: true});
const mongosDbChangeStream = db.watch([], {showSystemEvents: true, rawData: true});
const shard0DB = st.shard0.getDB(dbName);
const shard0Coll = shard0DB.getCollection(collName);

View File

@ -49,6 +49,8 @@
#include "mongo/db/version_context.h"
#include "mongo/db/views/view_catalog_helpers.h"
#include <fmt/format.h>
namespace mongo {
namespace {
@ -276,22 +278,36 @@ public:
void validate() const override {
AggCatalogState::validate();
// Raise an error if original nss is a view. We do not need to check this if we are opening
// a stream on an entire db or across the cluster.
// Raise an error if original nss is a view or timeseries collection. We do not need to
// check this if we are opening a stream on an entire db or across the cluster.
if (!_aggExState.getOriginalNss().isCollectionlessAggregateNS()) {
auto view = _catalog->lookupView(_aggExState.getOpCtx(), _aggExState.getOriginalNss());
uassert(ErrorCodes::CommandNotSupportedOnView,
str::stream() << "Cannot run aggregation on timeseries with namespace "
<< _aggExState.getOriginalNss().toStringForErrorMsg(),
!view || !view->timeseries());
// Only allow change streams over timeseries collections when 'rawData' set to true.
// This will result in change streams emitting events in the internal bucket format.
if (!isRawDataOperation(_aggExState.getOpCtx())) {
uassert(ErrorCodes::CommandNotSupported,
fmt::format("Cannot run aggregation on timeseries with namespace {}",
_aggExState.getOriginalNss().toStringForErrorMsg()),
!isTimeseries());
}
uassert(ErrorCodes::CommandNotSupportedOnView,
str::stream() << "Namespace "
<< _aggExState.getOriginalNss().toStringForErrorMsg()
<< " is a view, not a collection",
!view);
!_catalog->lookupView(_aggExState.getOpCtx(), _aggExState.getOriginalNss()));
}
}
/**
* Returns true if the collection is a timeseries collection (backed by view or viewless).
*/
bool isTimeseries() const override {
if (auto collPtr = _catalog->lookupCollectionByNamespace(_aggExState.getOpCtx(),
_aggExState.getOriginalNss())) {
return collPtr->isTimeseriesCollection();
}
return false;
}
std::pair<std::unique_ptr<CollatorInterface>, ExpressionContextCollationMatchesDefault>
resolveCollator() const override {
// If the user specified an explicit collation, adopt it; otherwise, use the simple

View File

@ -269,8 +269,9 @@ protected:
/**
* Create an AggExState instance that one might see for change stream query.
*/
std::unique_ptr<AggExState> createOplogAggExState(StringData coll) {
std::unique_ptr<AggExState> createOplogAggExState(StringData coll, bool rawData = false) {
auto opCtx = operationContext();
isRawDataOperation(opCtx) = rawData;
// We will wait indefinitely in this unit test for the read concern to be set unless we set
// it explicitly here.
@ -899,6 +900,77 @@ TEST_F(AggregationExecutionStateTest,
aggExState->createAggCatalogState(), DBException, ErrorCodes::CommandNotSupportedOnView);
}
TEST_F(AggregationExecutionStateTest,
Given_OplogAggCatalogStateWithViewlessTimeseriesColl_Then_IsTimeseries) {
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagCreateViewlessTimeseriesCollections", true);
StringData timeseriesColl{"timeseries"};
createTimeseriesCollection(
timeseriesColl, false /*sharded*/, false /*requiresExtendedRangeSupport*/);
auto aggExState = createOplogAggExState(timeseriesColl, true /*rawData*/);
auto state = aggExState->createAggCatalogState();
ASSERT_TRUE(state->isTimeseries());
}
TEST_F(
AggregationExecutionStateTest,
Given_OplogAggCatalogStateWithViewTimeseriesColl_When_CallingValidate_Then_ExceptionIsThrown) {
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagCreateViewlessTimeseriesCollections", false);
StringData timeseriesColl{"timeseries"};
createTimeseriesCollection(
timeseriesColl, false /*sharded*/, false /*requiresExtendedRangeSupport*/);
{
auto aggExState = createOplogAggExState(timeseriesColl, false /*rawData*/);
// Do not allow opening a change stream on a timeseries collection.
ASSERT_THROWS_CODE(aggExState->createAggCatalogState(),
DBException,
ErrorCodes::CommandNotSupportedOnView);
}
{
auto aggExState = createOplogAggExState(timeseriesColl, true /*rawData*/);
// Do not allow opening a change stream on a timeseries collection.
ASSERT_THROWS_CODE(aggExState->createAggCatalogState(),
DBException,
ErrorCodes::CommandNotSupportedOnView);
}
}
TEST_F(
AggregationExecutionStateTest,
Given_OplogAggCatalogStateWithViewlessTimeseriesCollAndNoRawData_When_CallingValidate_Then_ExceptionIsThrown) {
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagCreateViewlessTimeseriesCollections", true);
StringData timeseriesColl{"timeseries"};
createTimeseriesCollection(
timeseriesColl, false /*sharded*/, false /*requiresExtendedRangeSupport*/);
auto aggExState = createOplogAggExState(timeseriesColl, false /*rawData*/);
// Do not allow opening a change stream on a timeseries collection.
ASSERT_THROWS_CODE(
aggExState->createAggCatalogState(), DBException, ErrorCodes::CommandNotSupported);
}
TEST_F(
AggregationExecutionStateTest,
Given_OplogAggCatalogStateWithViewlessTimeseriesCollAndRawData_When_CallingValidate_Then_NoExceptionIsThrown) {
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagCreateViewlessTimeseriesCollections", true);
StringData timeseriesColl{"timeseries"};
createTimeseriesCollection(
timeseriesColl, false /*sharded*/, false /*requiresExtendedRangeSupport*/);
auto aggExState = createOplogAggExState(timeseriesColl, true /*rawData*/);
// Do allow opening a change stream on a timeseries collection if rawData is set to true.
ASSERT_DOES_NOT_THROW(aggExState->createAggCatalogState());
}
} // namespace
} // namespace mongo

View File

@ -132,16 +132,36 @@ CollectionType determineCollectionType(const Document& data, const DatabaseName&
tassert(8814203,
"'viewOn' should either be missing or a non-empty string",
viewOn.missing() || viewOn.getType() == BSONType::string);
if (viewOn.missing()) {
return CollectionType::kCollection;
}
StringData viewOnNss = viewOn.getStringData();
tassert(8814204, "'viewOn' should be a non-empty string", !viewOnNss.empty());
if (NamespaceString nss = NamespaceStringUtil::deserialize(dbName, viewOnNss);
nss.isTimeseriesBucketsCollection()) {
const bool isTimeseriesCollection = [&]() {
if (viewOn.missing()) {
const bool hasTimeseriesAttribute = !data.getField("timeseries"_sd).missing();
// For backwards compatibility do not classify buckets collections as timeseries.
const bool isBucketsCollection = [&]() {
auto createField = data.getField("create"_sd);
if (createField.missing()) {
return false;
}
return NamespaceStringUtil::deserialize(dbName, createField.getStringData())
.isTimeseriesBucketsCollection();
}();
return hasTimeseriesAttribute && !isBucketsCollection;
}
StringData viewOnNss = viewOn.getStringData();
tassert(8814204, "'viewOn' should be a non-empty string", !viewOnNss.empty());
return NamespaceStringUtil::deserialize(dbName, viewOnNss).isTimeseriesBucketsCollection();
}();
if (isTimeseriesCollection) {
return CollectionType::kTimeseries;
} else if (viewOn.missing()) {
return CollectionType::kCollection;
} else {
return CollectionType::kView;
}
return CollectionType::kView;
}
Document copyDocExceptFields(const Document& source, std::initializer_list<StringData> fieldNames) {
@ -453,12 +473,13 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
Document opDesc = copyDocExceptFields(oField, {"create"_sd});
operationDescription = Value(opDesc);
// Populate 'nsType' field with collection type (always "collection" here).
// Populate 'nsType' field with collection type.
auto collectionType = determineCollectionType(oField, nss.dbName());
tassert(8814201,
"'operationDescription.type' should always resolve to 'collection' for "
"collection create events",
collectionType == CollectionType::kCollection);
"'nsType' field should always resolve to 'collection' or 'timeseries' for "
"collection creation event",
collectionType == CollectionType::kCollection ||
collectionType == CollectionType::kTimeseries);
nsType = toString(collectionType);
} else if (auto nssField = oField.getField("createIndexes"_sd); !nssField.missing()) {
operationType = DocumentSourceChangeStream::kCreateIndexesOpType;

View File

@ -266,6 +266,31 @@ TEST(ChangeStreamEventTransformTest, TestCreateTimeseriesTransform) {
expectedDoc);
}
TEST(
ChangeStreamEventTransformTest,
Given_CreateTimeseriesCollectionEvent_When_ApplyingTransformation_Then_CollectionTypeIsTimeseries) {
const NamespaceString nss =
NamespaceString::createNamespaceString_forTest(boost::none, "testDB.coll.name");
auto serviceContext = std::make_unique<QueryTestServiceContext>();
auto opCtx = serviceContext->makeOperationContext();
auto oplogEntry =
makeOplogEntry(repl::OpTypeEnum::kCommand, // op type
nss, // namespace
BSON("create" << nss.coll() << "timeseries"
<< BSON("timeField" << "time"
<< "metaField"
<< "meta"
<< "granularity"
<< "seconds"
<< "bucketMaxSpanSeconds" << 3600)), // o
testUuid(), // uuid
boost::none, // fromMigrate
boost::none); // o2
Document resultDoc = applyTransformation(oplogEntry, nss);
ASSERT_EQ(resultDoc[DocumentSourceChangeStream::kNsTypeField].getStringData(), "timeseries"_sd);
}
TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) {
const NamespaceString systemViewNss = NamespaceString::makeSystemDotViewsNamespace(
DatabaseName::createDatabaseName_forTest(boost::none, "viewDB"));

View File

@ -45,6 +45,7 @@
#include "mongo/db/query/compiler/parsers/matcher/expression_parser.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/shard_role/shard_catalog/raw_data_operation.h"
#include <set>
#include <string>
@ -76,6 +77,15 @@ std::unique_ptr<MatchExpression> buildTsFilter(
backingBsonObjs.emplace_back(BSON("ts" << GTE << startFromInclusive)), expCtx);
}
std::unique_ptr<MatchExpression> buildNotViewlessTimeSeriesFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
std::vector<BSONObj>& backingBsonObjs) {
auto isTimeseriesFilter = BSON(repl::OplogEntry::kIsTimeseriesFieldName << BSON("$ne" << true));
return MatchExpressionParser::parseAndNormalize(
backingBsonObjs.emplace_back(isTimeseriesFilter), expCtx);
}
std::unique_ptr<MatchExpression> buildFromMigrateSystemOpFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
@ -301,6 +311,14 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
// "o.applyOps" stores the list of operations, so it must be an array.
applyOpsBuilder.append("o.applyOps", BSON("$type" << "array"));
// If the change stream is opened without the 'rawData' flag, we must filter out oplog entries
// with 'isTimeseries' set to true.
if (!isRawDataOperation(expCtx->getOperationContext())) {
auto isTimeseriesFieldPath =
fmt::format("o.applyOps.{}", repl::OplogEntry::kIsTimeseriesFieldName);
applyOpsBuilder.append(isTimeseriesFieldPath, BSON("$ne" << true));
}
BSONObj nsMatch = DocumentSourceChangeStream::getNsMatchObjForChangeStream(expCtx);
{

View File

@ -52,6 +52,16 @@ std::unique_ptr<MatchExpression> buildTsFilter(
const MatchExpression* userMatch,
std::vector<BSONObj>& backingBsonObjs);
/**
* Produce a filter that excludes time-series oplog entries when rawData flag is not set. These
* unsupported operations are marked in the oplog with the "isTimeseries" field. Also populates the
* 'backingBsonObjs' vector to store BSONObjs referenced in the returned MatchExpression.
*/
std::unique_ptr<MatchExpression> buildNotViewlessTimeSeriesFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
std::vector<BSONObj>& backingBsonObjs);
/**
* Produce a filter that rejects any operations marked with the "fromMigrate" flag. These operations
* occur as part of chunk migration and should not be visible to user change streams, because they

View File

@ -40,6 +40,7 @@
#include "mongo/db/pipeline/optimization/optimize.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/query/compiler/rewrites/matcher/expression_optimizer.h"
#include "mongo/db/shard_role/shard_catalog/raw_data_operation.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/util/assert_util.h"
@ -92,6 +93,12 @@ std::unique_ptr<MatchExpression> buildOplogMatchFilter(
oplogFilter->add(buildNotFromMigrateFilter(expCtx, userMatch, backingBsonObjs));
}
// If the change stream is opened without the 'rawData' flag, we must filter out events
// with 'isTimeseries' set to true.
if (!isRawDataOperation(expCtx->getOperationContext())) {
oplogFilter->add(buildNotViewlessTimeSeriesFilter(expCtx, userMatch, backingBsonObjs));
}
// Create an $or filter which only captures relevant events in the oplog.
auto eventFilter = std::make_unique<OrMatchExpression>();
eventFilter->add(buildOperationFilter(expCtx, userMatch, backingBsonObjs));

View File

@ -951,6 +951,13 @@ TEST_F(ChangeStreamStageTest, BuildTransactionFilterForV1ChangeStream) {
"$eq": "c"
}
},
{
"o.applyOps.isTimeseries": {
"$not": {
"$eq": true
}
}
},
{
"o.partialTxn": {
"$not": {
@ -1094,6 +1101,13 @@ TEST_F(ChangeStreamStageTest, BuildTransactionFilterForV2ChangeStream) {
"$eq": "c"
}
},
{
"o.applyOps.isTimeseries": {
"$not": {
"$eq": true
}
}
},
{
"o.partialTxn": {
"$not": {

View File

@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/query/compiler/parsers/matcher/expression_parser.h"
#include "mongo/db/query/compiler/rewrites/matcher/expression_optimizer.h"
#include "mongo/db/shard_role/shard_catalog/raw_data_operation.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
@ -96,6 +97,14 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
unwindFilter->add(buildNotFromMigrateFilter(expCtx, userMatch, bsonObj));
}
// If the change stream is opened without the 'rawData' flag, we must filter out events with
// 'isTimeseries' set to true.
// While currently transactions cannot contain time-series writes, this filter is included for
// future-proofing, in case that changes.
if (!isRawDataOperation(expCtx->getOperationContext())) {
unwindFilter->add(buildNotViewlessTimeSeriesFilter(expCtx, userMatch, bsonObj));
}
// Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
// this separately because we need to exclude certain fields from the user's filters. Unwound
// transaction events do not have these fields until we populate them from the commitTransaction

View File

@ -37,6 +37,7 @@
#include "mongo/db/query/client_cursor/cursor_response.h"
#include "mongo/db/query/client_cursor/kill_cursors_gen.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@ -301,6 +302,12 @@ AggregateCommandRequest ReshardingChangeStreamsMonitor::makeAggregateCommandRequ
aggRequest.setComment(
mongo::IDLAnyTypeOwned(BSON("" << makeAggregateComment(_reshardingUUID)).firstElement()));
// TODO: SERVER-107180 always set rawData once 9.0 becomes last LTS.
if (gFeatureFlagAllBinariesSupportRawDataOperations.isEnabled(
kNoVersionContext, serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
aggRequest.setRawData(true);
}
return aggRequest;
}

View File

@ -1225,6 +1225,10 @@ TEST_F(ReshardingChangeStreamsMonitorTest, TestChangeStreamMonitorSettingsForDon
ASSERT_EQ(donorRequest.getNamespace(), sourceNss.makeTimeseriesBucketsNamespace());
// Ensure that 'rawData' is passed for change stream aggregate request over timeseries
// collection.
ASSERT_TRUE(donorRequest.getRawData());
ASSERT_TRUE(!donorRequest.getPipeline().empty());
BSONObj donorFirstStage = donorRequest.getPipeline().front();