SERVER-111406 Fix change stream results for top-level $v fields (#42273)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
GitOrigin-RevId: 4398df22b0422da5739f4537fc1b95c03b6a4289
This commit is contained in:
Jan 2025-10-15 16:22:00 +02:00 committed by MongoDB Bot
parent 4be6f775aa
commit 7d97af9679
4 changed files with 263 additions and 11 deletions

View File

@ -474,6 +474,8 @@ last-continuous:
ticket: SERVER-95570
- test_file: jstests/aggregation/expressions/reduce_overflow.js
ticket: SERVER-102364
- test_file: jstests/change_streams/events_containing_version_literals.js
ticket: SERVER-111406
suites: null
last-lts:
all:
@ -1005,4 +1007,6 @@ last-lts:
ticket: SERVER-95570
- test_file: jstests/aggregation/expressions/reduce_overflow.js
ticket: SERVER-102364
- test_file: jstests/change_streams/events_containing_version_literals.js
ticket: SERVER-111406
suites: null

View File

@ -0,0 +1,173 @@
/**
* Tests that change stream events containg a '$v' field work as expected.
* @tags: [
* uses_change_streams,
* ]
*/
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
describe("change streams correctly return documents containing $v attributes", () => {
const kCollName = jsTestName();
let cst;
let cursor;
beforeEach(() => {
assertDropAndRecreateCollection(db, kCollName);
cst = new ChangeStreamTest(db);
cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {}}],
collection: db[kCollName],
});
// Insert 5 documents that will be used in the following update tests.
[1, 2, "1", "2", "test"].forEach((v, i) => {
assert.commandWorked(db[kCollName].insert({_id: i, $v: v}));
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
afterEach(() => {
cst.cleanUp();
});
// Test update operations using $v field name, and check that they are either
// We update in different order here than update, so that we don't cause no-op updates.
it("tests that updates using $v inside an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, {$v: v}),
ErrorCodes.FailedToParse);
});
});
it("tests that updates using $v inside an object literal succeed with upsert when the source documents do not exist",
() => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i + 10}, {$v: v}, {upsert: true}),
ErrorCodes.FailedToParse,
);
});
});
it("tests that updates using $v inside $set and an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i}, {$set: {$v: v}}),
ErrorCodes.DollarPrefixedFieldName,
);
});
});
it("tests that updates using $v inside $set and an object literal succeed with upsert when the source documents do not exist",
() => {
// Target documents do not yet exist, so the upsert creates them using inserts.
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update({_id: i + 10}, {$set: {$v: v}}, {upsert: true}));
let expected = {
documentKey: {_id: i + 10},
fullDocument: {_id: i + 10, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v inside $replaceWith and $literal succeed", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update({_id: i}, [{$replaceWith: {$literal: {_id: i, $v: v}}}]));
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v inside using $replaceWith and an object literal fail",
() => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i}, [{$replaceWith: {_id: i, $v: v}}]), 16410);
});
});
it("tests that pipeline updates using $v inside using $replaceRoot and $literal succeed",
() => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update({_id: i},
[{$replaceRoot: {newRoot: {$literal: {_id: i, $v: v}}}}]),
);
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v using $replaceRoot and an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i}, [{$replaceRoot: {newRoot: {_id: i, $v: v}}}]),
16410,
);
});
});
it("tests that update using $v inside $inc fails", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, {$inc: {$v: 1}}), [
ErrorCodes.DollarPrefixedFieldName,
ErrorCodes.TypeMismatch,
]);
});
});
it("tests that updates using $v inside $addFields fails", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, [{$addFields: {$v: i}}]),
16410);
});
});
it("tests that updates using $v inside $replaceWith and $setField and an object literal succeed",
() => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update(
{_id: i},
[
{
$replaceWith:
{$setField: {field: {$literal: "$v"}, input: "$$ROOT", value: v}}
},
]),
);
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
});

View File

@ -330,7 +330,26 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// indicates the delta oplog entry.
Value oplogVersion =
input[repl::OplogEntry::kObjectFieldName][kUpdateOplogEntryVersionFieldName];
if (!oplogVersion.missing() && oplogVersion.getInt() == 2) {
// Check that the oplog entry format is as expected:
// - if there is an '_id' field, it is a replace.
// - if there is no '_id' field and the '$v' is 2, it is a delta (diff) update.
// If there is no '_id' field and the '$v' is not 2, it is an old-style modifier
// update. This is unsupported.
// It is important to check for '_id' field first, because a replacement style update
// can still have a '$v' field in the object.
const bool isUpdateEntry = id.missing();
uassert(
6741200,
str::stream() << "Expected _id field, or $v field missing, or $v equal to "
<< static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)
<< " (kDeltaV2), but got oplog version $v: "
<< oplogVersion.toString(),
!isUpdateEntry ||
(!oplogVersion.missing() && oplogVersion.getType() == BSONType::numberInt &&
oplogVersion.getInt() == static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)));
if (isUpdateEntry) {
// Parsing the delta oplog entry.
operationType = DocumentSourceChangeStream::kUpdateOpType;
Value diffObj = input[repl::OplogEntry::kObjectFieldName]
@ -362,12 +381,8 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
{"truncatedArrays", std::move(deltaDesc.truncatedArrays)}});
}
}
} else if (!oplogVersion.missing() || id.missing()) {
// This is not a replacement op, and we did not see a valid update version number.
uasserted(6741200,
str::stream() << "Unsupported or missing oplog version, $v: "
<< oplogVersion.toString());
} else {
// Replace.
operationType = DocumentSourceChangeStream::kReplaceOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
}
@ -609,7 +624,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
boost::optional<std::pair<StringData, Value>>
ChangeStreamDefaultEventTransformation::handleSupportedEvent(const Document& o2Field) const {
for (auto&& supportedEvent : _supportedEvents) {
if (auto lookup = o2Field[supportedEvent]; !o2Field[supportedEvent].missing()) {
if (auto lookup = o2Field[supportedEvent]; !lookup.missing()) {
// Known event.
return std::make_pair(supportedEvent,
Value{copyDocExceptFields(o2Field, {supportedEvent})});

View File

@ -3625,15 +3625,44 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
}
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingNotSupported) {
// A missing $v field in the update oplog entry implies $v:1, which is no longer supported.
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithId) {
// An _id field in the update oplog entry implies a replace.
BSONObj o = BSON("_id" << 1 << "$set" << BSON("y" << 1));
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
Document expectedReplace{
{DSChangeStream::kIdField,
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"$set", D{{"y", 1}}}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithoutId) {
// A missing _id field and a missing $v field in the update oplog entry implies a $v:1 update
// oplog entry, which is no longer supported.
BSONObj o = BSON("$set" << BSON("y" << 1));
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsNonV2NotSupported) {
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsInvalidV) {
// A $v field in the update oplog entry without an _id field requires '$v:2'.
BSONObj o = BSON("$v" << "ABC" << "diff" << BSON("i" << BSON("v" << 5)));
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdateNonV2NotSupported) {
BSONObj diff = BSON("u" << BSON("y" << 1));
BSONObj o = BSON("diff" << diff << "$v" << 3);
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
@ -3641,7 +3670,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsNonV2NotSupported) {
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdate) {
BSONObj diff = BSON("u" << BSON("y" << 1));
BSONObj o = BSON("diff" << diff << "$v" << 2);
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
@ -3657,6 +3686,13 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
checkTransformation(updateField, expectedUpdateField);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsModifierUpdate) {
BSONObj o = BSON("x" << 2 << "y" << 1);
BSONObj o2 = BSON("_id" << 1);
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
}
TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
BSONObj diff = BSON("d" << BSON("y" << false));
BSONObj o = BSON("diff" << diff << "$v" << 2);
@ -3702,6 +3738,30 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformReplaceWithVField) {
BSONObj o = BSON("_id" << 1 << "x" << 2 << "y" << 1 << "$v" << 2);
BSONObj o2 = BSON("_id" << 1);
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, // op type
nss, // namespace
o, // o
testUuid(), // uuid
boost::none, // fromMigrate
o2); // o2
// Replace
Document expectedReplace{
{DSChangeStream::kIdField,
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}, {"$v", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformDelete) {
BSONObj o = BSON("_id" << 1 << "x" << 2);
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type