SERVER-88167 Add 'fromMigrate' field to migration change events (#47612)
GitOrigin-RevId: a30548c43c03a9f39ddda2e972e1acf0fd246d83
This commit is contained in:
parent
147e4a0f32
commit
deac476b89
@ -380,6 +380,11 @@ fields include:
|
||||
"update" events if the change stream is opened with the `fullDocument` parameter set to any other
|
||||
value than `default`.
|
||||
- `updateDescription` / `rawUpdateDescription`: contains details for "update" events.
|
||||
- `fromMigrate`: a boolean field set to `true` for events that originate from a data migration
|
||||
(e.g., chunk migrations, `moveCollection`, `unshardCollection`, `movePrimary`). This field is only
|
||||
present when both the change stream was opened with `showMigrationEvents: true` **and** the server
|
||||
parameter `changeStreamsEmitFromMigrate` is enabled (see below). Regular non-migration events
|
||||
never carry this field.
|
||||
|
||||
The majority of change stream event fields are emitted by the
|
||||
`ChangeStreamDefaultEventTransformation` object
|
||||
@ -540,6 +545,36 @@ happening during chunk migrations. If set to `true`, insert and delete events re
|
||||
migrations will be reported as if they were regular events. The flag defaults to `false` and is
|
||||
internal.
|
||||
|
||||
When `showMigrationEvents` is `true` and the `changeStreamsEmitFromMigrate` server parameter is also
|
||||
enabled (the default), emitted migration events include a `fromMigrate: true` field. This allows
|
||||
consumers to distinguish migration-originated events from user-initiated writes.
|
||||
|
||||
The `fromMigrate` field is emitted for:
|
||||
|
||||
- **DML events** (insert, delete) caused by chunk migration, `moveCollection`, `unshardCollection`,
|
||||
or `movePrimary` data cloning. TODO SERVER-107688: add 'fromMigrate' for migration events stemming
|
||||
from cross-database renames.
|
||||
- **DDL events** (create, createIndexes, rename, reshardDoneCatchUp) generated by the resharding
|
||||
machinery or `movePrimary` on the recipient shard.
|
||||
|
||||
The `fromMigrate` field is intentionally **not** emitted for inserts into temporary resharding
|
||||
system collections (e.g. `system.resharding.*`), because those documents are owned by the shard they
|
||||
are written to — they are not orphans.
|
||||
|
||||
#### `changeStreamsEmitFromMigrate` server parameter
|
||||
|
||||
This is a server parameter (not a change stream flag), configurable at startup or runtime via
|
||||
`setParameter`. When set to `true` (the default), change streams opened with
|
||||
`showMigrationEvents: true` will include the `fromMigrate: true` field on events that originate from
|
||||
a migration. When set to `false`, migration events still appear in the stream (assuming
|
||||
`showMigrationEvents: true`) but the `fromMigrate` field is omitted.
|
||||
|
||||
To disable the `fromMigrate` field at runtime, run the following on each relevant shard:
|
||||
|
||||
```js
|
||||
db.adminCommand({setParameter: 1, changeStreamsEmitFromMigrate: false});
|
||||
```
|
||||
|
||||
#### `showCommitTimestamp` (internal)
|
||||
|
||||
The `showCommitTimestamp` flag can be used to include the transaction commit timestamp inside DML
|
||||
|
||||
@ -97,7 +97,7 @@ describe("$changeStream", function () {
|
||||
// history. Before movePrimary, the recipient shard (shard1) has no data for this
|
||||
// database, so v2 does not open a cursor there. The fromMigrate:true
|
||||
// create/createIndexes events are written to shard1's oplog during the clone phase
|
||||
// — before the v2 reader detects the movePrimary control event and opens a new
|
||||
// - before the v2 reader detects the movePrimary control event and opens a new
|
||||
// cursor on shard1 at clusterTime+1 (after the control event), missing the earlier
|
||||
// fromMigrate events. With v1, cursors are opened on all shards from the start, so
|
||||
// shard1's cursor captures those events as they are written.
|
||||
@ -122,10 +122,6 @@ describe("$changeStream", function () {
|
||||
assert.commandWorked(db[untrackedCollName].insertOne(sentinelDoc));
|
||||
|
||||
if (showSystemEvents) {
|
||||
// TODO SERVER-88167: Once fromMigrate is exposed in change stream event
|
||||
// documents, add fromMigrate: true to these expected events to directly
|
||||
// verify the flag.
|
||||
//
|
||||
// movePrimary replays create and createIndexes on the recipient shard.
|
||||
// Untracked collections get both create + createIndexes (full clone).
|
||||
// Tracked collections (unsplittable, sharded) get only create
|
||||
|
||||
@ -2,24 +2,40 @@
|
||||
//
|
||||
// @tags: [
|
||||
// requires_majority_read_concern,
|
||||
// requires_sharding,
|
||||
// uses_change_streams,
|
||||
// ]
|
||||
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
const isMultiversion = Boolean(jsTest.options().useRandomBinVersionsWithinReplicaSet);
|
||||
const is90OrHigher = MongoRunner.compareBinVersions(lastLTSFCV, "9.0") >= 0;
|
||||
|
||||
function checkEvents(changeStream, cursor, expectedEvents) {
|
||||
expectedEvents.forEach((event) => {
|
||||
expectedEvents.forEach((event, i) => {
|
||||
let next = changeStream.getOneChange(cursor);
|
||||
assert.eq(next.operationType, event["operationType"]);
|
||||
assert.eq(next.documentKey, {_id: event["_id"]});
|
||||
|
||||
if (!isMultiversion || is90OrHigher) {
|
||||
assert.eq(
|
||||
next.fromMigrate,
|
||||
event["fromMigrate"],
|
||||
`got wrong 'fromMigrate' value. actual event: ${tojsononeline(next)}, expected event: ${tojsononeline(event)}`,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function makeEvent(docId, opType) {
|
||||
function makeEvent(docId, opType, fromMigrate = undefined) {
|
||||
assert(typeof docId === "number");
|
||||
assert(typeof opType === "string" && (opType === "insert" || opType === "delete"));
|
||||
return {_id: docId, operationType: opType};
|
||||
let doc = {_id: docId, operationType: opType};
|
||||
if ((!isMultiversion || is90OrHigher) && fromMigrate) {
|
||||
doc.fromMigrate = fromMigrate;
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
// TODO WT-3864: Re-enable test for LSM once transaction visibility bug in LSM is resolved.
|
||||
@ -87,9 +103,9 @@ assert.commandWorked(
|
||||
}),
|
||||
);
|
||||
|
||||
let shardZeroEventsBeforeNewShard = [makeEvent(0, "insert"), makeEvent(20, "insert")];
|
||||
let shardZeroEventsAfterNewShard = [makeEvent(20, "delete")];
|
||||
let shardOneEvents = [makeEvent(20, "insert")];
|
||||
let shardZeroEventsBeforeNewShard = [makeEvent(0, "insert", undefined), makeEvent(20, "insert", undefined)];
|
||||
let shardZeroEventsAfterNewShard = [makeEvent(20, "delete", true)];
|
||||
let shardOneEvents = [makeEvent(20, "insert", true)];
|
||||
|
||||
// Check that each change stream returns the expected events.
|
||||
checkEvents(changeStreamTestShardZero, changeStreamShardZero, shardZeroEventsBeforeNewShard);
|
||||
@ -118,13 +134,18 @@ assert.commandWorked(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}
|
||||
assert.commandWorked(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
|
||||
assert.commandWorked(mongosColl.insert({_id: 22}, {writeConcern: {w: "majority"}}));
|
||||
|
||||
let shardZeroEvents = [makeEvent(1, "insert"), makeEvent(0, "delete"), makeEvent(1, "delete"), makeEvent(-2, "insert")];
|
||||
let shardZeroEvents = [
|
||||
makeEvent(1, "insert", undefined),
|
||||
makeEvent(0, "delete", true),
|
||||
makeEvent(1, "delete", true),
|
||||
makeEvent(-2, "insert", undefined),
|
||||
];
|
||||
shardOneEvents = [
|
||||
makeEvent(21, "insert"),
|
||||
makeEvent(0, "insert"),
|
||||
makeEvent(1, "insert"),
|
||||
makeEvent(2, "insert"),
|
||||
makeEvent(22, "insert"),
|
||||
makeEvent(21, "insert", undefined),
|
||||
makeEvent(0, "insert", true),
|
||||
makeEvent(1, "insert", true),
|
||||
makeEvent(2, "insert", undefined),
|
||||
makeEvent(22, "insert", undefined),
|
||||
];
|
||||
|
||||
// Check that each change stream returns the expected events.
|
||||
@ -164,25 +185,25 @@ assert.commandWorked(mongosColl.insert({_id: 24}, {writeConcern: {w: "majority"}
|
||||
const clustered = mongosColl.getIndexes()[0].clustered;
|
||||
|
||||
// Check that each change stream returns the expected events.
|
||||
shardZeroEvents = [makeEvent(-3, "insert"), makeEvent(-3, "delete"), makeEvent(-2, "delete")];
|
||||
shardZeroEvents = [makeEvent(-3, "insert", undefined), makeEvent(-3, "delete", true), makeEvent(-2, "delete", true)];
|
||||
shardOneEvents = clustered
|
||||
? [
|
||||
makeEvent(3, "insert"),
|
||||
makeEvent(23, "insert"),
|
||||
makeEvent(-3, "insert"), // Clustered order.
|
||||
makeEvent(-2, "insert"),
|
||||
makeEvent(-4, "insert"),
|
||||
makeEvent(4, "insert"),
|
||||
makeEvent(24, "insert"),
|
||||
makeEvent(3, "insert", undefined),
|
||||
makeEvent(23, "insert", undefined),
|
||||
makeEvent(-3, "insert", true), // Clustered order.
|
||||
makeEvent(-2, "insert", true),
|
||||
makeEvent(-4, "insert", undefined),
|
||||
makeEvent(4, "insert", undefined),
|
||||
makeEvent(24, "insert", undefined),
|
||||
]
|
||||
: [
|
||||
makeEvent(3, "insert"),
|
||||
makeEvent(23, "insert"),
|
||||
makeEvent(-2, "insert"),
|
||||
makeEvent(-3, "insert"), // Non-clustered order.
|
||||
makeEvent(-4, "insert"),
|
||||
makeEvent(4, "insert"),
|
||||
makeEvent(24, "insert"),
|
||||
makeEvent(3, "insert", undefined),
|
||||
makeEvent(23, "insert", undefined),
|
||||
makeEvent(-2, "insert", true),
|
||||
makeEvent(-3, "insert", true), // Non-clustered order.
|
||||
makeEvent(-4, "insert", undefined),
|
||||
makeEvent(4, "insert", undefined),
|
||||
makeEvent(24, "insert", undefined),
|
||||
];
|
||||
|
||||
checkEvents(changeStreamTestShardZero, changeStreamShardZero, shardZeroEvents);
|
||||
@ -207,7 +228,11 @@ assert.commandWorked(mongosColl.insert({_id: -5}, {writeConcern: {w: "majority"}
|
||||
assert.commandWorked(mongosColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));
|
||||
assert.commandWorked(mongosColl.insert({_id: 25}, {writeConcern: {w: "majority"}}));
|
||||
|
||||
shardOneEvents = [makeEvent(-5, "insert"), makeEvent(5, "insert"), makeEvent(25, "insert")];
|
||||
shardOneEvents = [
|
||||
makeEvent(-5, "insert", undefined),
|
||||
makeEvent(5, "insert", undefined),
|
||||
makeEvent(25, "insert", undefined),
|
||||
];
|
||||
|
||||
changeStreamTestShardZero.assertNoChange(changeStreamShardZero);
|
||||
checkEvents(changeStreamTestShardOne, changeStreamShardOne, shardOneEvents);
|
||||
@ -225,38 +250,38 @@ assert.commandWorked(mongosColl.insert({_id: -6}, {writeConcern: {w: "majority"}
|
||||
assert.commandWorked(mongosColl.insert({_id: 6}, {writeConcern: {w: "majority"}}));
|
||||
assert.commandWorked(mongosColl.insert({_id: 26}, {writeConcern: {w: "majority"}}));
|
||||
|
||||
let shardOneEventsBeforeNewShard = [makeEvent(16, "insert")];
|
||||
let shardOneEventsBeforeNewShard = [makeEvent(16, "insert", undefined)];
|
||||
let shardOneEventsAfterNewShard = [
|
||||
makeEvent(16, "delete"),
|
||||
makeEvent(20, "delete"),
|
||||
makeEvent(21, "delete"),
|
||||
makeEvent(22, "delete"),
|
||||
makeEvent(23, "delete"),
|
||||
makeEvent(24, "delete"),
|
||||
makeEvent(25, "delete"),
|
||||
makeEvent(-6, "insert"),
|
||||
makeEvent(6, "insert"),
|
||||
makeEvent(16, "delete", true),
|
||||
makeEvent(20, "delete", true),
|
||||
makeEvent(21, "delete", true),
|
||||
makeEvent(22, "delete", true),
|
||||
makeEvent(23, "delete", true),
|
||||
makeEvent(24, "delete", true),
|
||||
makeEvent(25, "delete", true),
|
||||
makeEvent(-6, "insert", undefined),
|
||||
makeEvent(6, "insert", undefined),
|
||||
];
|
||||
let newShardEvents = clustered
|
||||
? [
|
||||
makeEvent(16, "insert"), // Clustered order.
|
||||
makeEvent(20, "insert"),
|
||||
makeEvent(21, "insert"),
|
||||
makeEvent(22, "insert"),
|
||||
makeEvent(23, "insert"),
|
||||
makeEvent(24, "insert"),
|
||||
makeEvent(25, "insert"),
|
||||
makeEvent(26, "insert"),
|
||||
makeEvent(16, "insert", true), // Clustered order.
|
||||
makeEvent(20, "insert", true),
|
||||
makeEvent(21, "insert", true),
|
||||
makeEvent(22, "insert", true),
|
||||
makeEvent(23, "insert", true),
|
||||
makeEvent(24, "insert", true),
|
||||
makeEvent(25, "insert", true),
|
||||
makeEvent(26, "insert", undefined),
|
||||
]
|
||||
: [
|
||||
makeEvent(20, "insert"),
|
||||
makeEvent(21, "insert"),
|
||||
makeEvent(22, "insert"),
|
||||
makeEvent(23, "insert"),
|
||||
makeEvent(24, "insert"),
|
||||
makeEvent(25, "insert"),
|
||||
makeEvent(16, "insert"), // Non-clustered order.
|
||||
makeEvent(26, "insert"),
|
||||
makeEvent(20, "insert", true),
|
||||
makeEvent(21, "insert", true),
|
||||
makeEvent(22, "insert", true),
|
||||
makeEvent(23, "insert", true),
|
||||
makeEvent(24, "insert", true),
|
||||
makeEvent(25, "insert", true),
|
||||
makeEvent(16, "insert", true), // Non-clustered order.
|
||||
makeEvent(26, "insert", undefined),
|
||||
];
|
||||
|
||||
// Check that each change stream returns the expected events.
|
||||
|
||||
@ -0,0 +1,516 @@
|
||||
/**
|
||||
* Tests 'showMigrationEvents' behavior for DDL operations that cause data migration
|
||||
* (moveCollection, unshardCollection, moveChunk, movePrimary) and verifies the
|
||||
* 'changeStreamsEmitFromMigrate' server parameter.
|
||||
*
|
||||
* When 'showMigrationEvents: true' is set on a shard-level change stream, insert and delete events
|
||||
* triggered by migrations become visible. When 'changeStreamsEmitFromMigrate' is true (the
|
||||
* default), those events carry a 'fromMigrate: true' field. When the parameter is false the events
|
||||
* still appear but the 'fromMigrate' field is omitted.
|
||||
*
|
||||
* For 'movePrimary', untracked collections on the old primary shard are cloned onto the new primary
|
||||
* shard. This generates 'create' and 'createIndexes' DDL events on the recipient shard that also
|
||||
* carry 'fromMigrate: true' when 'changeStreamsEmitFromMigrate' is enabled.
|
||||
*
|
||||
* NOTE: 'showMigrationEvents: true' is only accepted on individual shard mongod nodes; mongos
|
||||
* rejects it with error 31123.
|
||||
*
|
||||
* @tags: [
|
||||
* assumes_balancer_off,
|
||||
* does_not_support_stepdowns,
|
||||
* requires_fcv_90,
|
||||
* requires_sharding,
|
||||
* uses_change_streams,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {runWithParamsAllNonConfigNodes} from "jstests/noPassthrough/libs/server_parameter_helpers.js";
|
||||
import {describe, it, before, after} from "jstests/libs/mochalite.js";
|
||||
|
||||
// Returns a unique database name so each test starts with a clean slate.
|
||||
let testIdx = 0;
|
||||
const freshDbName = () => {
|
||||
return `test${testIdx++}`;
|
||||
};
|
||||
|
||||
// Inserts a sentinel to delimit the stream, then reads change stream events one at a time until
|
||||
// the sentinel is found. Returns all collected events, including the sentinel.
|
||||
const collectEventsUntilSentinel = (coll, csTest, cursor) => {
|
||||
const sentinelId = "sentinel";
|
||||
assert.commandWorked(coll.insert({_id: sentinelId}));
|
||||
|
||||
const isSentinel = (e) =>
|
||||
e.operationType === "insert" && e.ns.coll === coll.getName() && e.documentKey._id === sentinelId;
|
||||
|
||||
const events = [];
|
||||
while (true) {
|
||||
const event = csTest.getOneChange(cursor);
|
||||
events.push(event);
|
||||
if (isSentinel(event)) {
|
||||
// The sentinel itself must not carry 'fromMigrate'.
|
||||
assert(!event.hasOwnProperty("fromMigrate"), `Sentinel must not have 'fromMigrate': ${tojson(event)}`);
|
||||
return events;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Opens a DB-level change stream on shard1's primary and returns {csTest, cursor}. Creates an
|
||||
// unsplittable collection on shard1 first (and immediately drops it) so that the database exists on
|
||||
// the shard before the change stream is opened.
|
||||
const openRecipientStream = (st, dbName, showExpandedEvents = false) => {
|
||||
const mongos = st.s;
|
||||
const db = mongos.getDB(dbName);
|
||||
|
||||
// Ensure the database is initialized on shard1 before the change stream is opened.
|
||||
assert.commandWorked(db.runCommand({createUnsplittableCollection: "_warmup", dataShard: st.shard1.shardName}));
|
||||
assert.commandWorked(db.runCommand({drop: "_warmup"}));
|
||||
|
||||
const shard1Primary = st.rs1.getPrimary();
|
||||
const csTest = new ChangeStreamTest(shard1Primary.getDB(dbName));
|
||||
const cursor = csTest.startWatchingChanges({
|
||||
pipeline: [
|
||||
{
|
||||
$changeStream: {
|
||||
showMigrationEvents: true,
|
||||
showSystemEvents: true,
|
||||
showExpandedEvents,
|
||||
allowToRunOnSystemNS: true,
|
||||
},
|
||||
},
|
||||
],
|
||||
collection: 1,
|
||||
});
|
||||
return {csTest, cursor};
|
||||
};
|
||||
|
||||
// Asserts that 'migrationInserts' (events into system.resharding.*) do not carry a 'fromMigrate'
|
||||
// flag.
|
||||
// This is intentional because 'fromMigrate' is about orphan documents, and resharding does not
|
||||
// write orphan documents. All documents written by resharding are owned documents by the shards
|
||||
// they are inserted into.
|
||||
const assertMigrationInserts = (allEvents) => {
|
||||
const migrationInserts = allEvents.filter(
|
||||
(e) => e.operationType === "insert" && e.ns.coll.startsWith("system.resharding."),
|
||||
);
|
||||
assert.gt(
|
||||
migrationInserts.length,
|
||||
0,
|
||||
`Expected at least one migration insert from resharding machinery; ` + `all events: ${tojson(allEvents)}`,
|
||||
);
|
||||
|
||||
migrationInserts.forEach((e) => {
|
||||
assert(!e.hasOwnProperty("fromMigrate"), `Expected no fromMigrate field on migration insert: ${tojson(e)}`);
|
||||
});
|
||||
};
|
||||
|
||||
const assertReshardingDoneCatchUpAndRenameEvents = (allEvents, collName, emitFromMigrate) => {
|
||||
const catchUpEvents = allEvents.filter(
|
||||
(e) => e.operationType === "reshardDoneCatchUp" && e.ns.coll.startsWith("system.resharding."),
|
||||
);
|
||||
assert.eq(
|
||||
catchUpEvents.length,
|
||||
1,
|
||||
`Expected at least one 'reshardDoneCatchUp' event for '${collName}'; all events: ${tojson(allEvents)}`,
|
||||
);
|
||||
catchUpEvents.forEach((e) => {
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
e.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate on reshardDoneCatchUp event: ${tojson(e)}`,
|
||||
);
|
||||
});
|
||||
|
||||
const renameEvents = allEvents.filter(
|
||||
(e) =>
|
||||
e.operationType === "rename" &&
|
||||
e.ns.coll.startsWith("system.resharding.") &&
|
||||
e.operationDescription.to.coll == collName,
|
||||
);
|
||||
assert.eq(
|
||||
renameEvents.length,
|
||||
1,
|
||||
`Expected at least one 'reshardDoneCatchUp' event for '${collName}'; all events: ${tojson(allEvents)}`,
|
||||
);
|
||||
renameEvents.forEach((e) => {
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
e.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate on rename event: ${tojson(e)}`,
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
describe("$changeStream showMigrationEvents", () => {
|
||||
let st;
|
||||
|
||||
before(() => {
|
||||
st = new ShardingTest({
|
||||
shards: 2,
|
||||
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
|
||||
});
|
||||
});
|
||||
|
||||
after(() => {
|
||||
st.stop();
|
||||
});
|
||||
|
||||
describe("moveChunk/moveCollection/unshardCollection", () => {
|
||||
// ---------------------------------------------------------------------------
|
||||
// moveChunk
|
||||
// ---------------------------------------------------------------------------
|
||||
function runMoveChunkTest(emitFromMigrate) {
|
||||
const dbName = freshDbName();
|
||||
const collName = "coll";
|
||||
const mongos = st.s;
|
||||
const mongosColl = mongos.getCollection(`${dbName}.${collName}`);
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(mongos.adminCommand({shardCollection: `${dbName}.${collName}`, key: {_id: 1}}));
|
||||
assert.commandWorked(mongosColl.insert({_id: 0}));
|
||||
assert.commandWorked(mongosColl.insert({_id: 20}));
|
||||
assert.commandWorked(mongos.adminCommand({split: `${dbName}.${collName}`, middle: {_id: 10}}));
|
||||
|
||||
runWithParamsAllNonConfigNodes(st.s.getDB(dbName), {changeStreamsEmitFromMigrate: emitFromMigrate}, () => {
|
||||
// Open change streams on both shard primaries after the initial inserts.
|
||||
// The streams start from 'now' so they will not see the pre-migration inserts;
|
||||
// the first events they observe will be the migration events from moveChunk.
|
||||
const csTestShard0 = new ChangeStreamTest(st.shard0.getDB(dbName));
|
||||
const cursorShard0 = csTestShard0.startWatchingChanges({
|
||||
pipeline: [{$changeStream: {showMigrationEvents: true}}],
|
||||
collection: st.shard0.getCollection(`${dbName}.${collName}`),
|
||||
});
|
||||
const csTestShard1 = new ChangeStreamTest(st.shard1.getDB(dbName));
|
||||
const cursorShard1 = csTestShard1.startWatchingChanges({
|
||||
pipeline: [{$changeStream: {showMigrationEvents: true}}],
|
||||
collection: st.shard1.getCollection(`${dbName}.${collName}`),
|
||||
});
|
||||
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({
|
||||
moveChunk: `${dbName}.${collName}`,
|
||||
find: {_id: 20},
|
||||
to: st.shard1.shardName,
|
||||
_waitForDelete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
// Donor shard (shard0) should see a delete for '{_id: 20}'.
|
||||
const donorDeleteEvent = csTestShard0.getOneChange(cursorShard0);
|
||||
assert.eq(donorDeleteEvent.operationType, "delete", tojson(donorDeleteEvent));
|
||||
assert.eq(donorDeleteEvent.documentKey, {_id: 20}, tojson(donorDeleteEvent));
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
donorDeleteEvent.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate field value on donor delete: ${tojson(donorDeleteEvent)}`,
|
||||
);
|
||||
|
||||
// Recipient shard (shard1) should see an insert for '{_id: 20}'.
|
||||
const recipientInsertEvent = csTestShard1.getOneChange(cursorShard1);
|
||||
assert.eq(recipientInsertEvent.operationType, "insert", tojson(recipientInsertEvent));
|
||||
assert.eq(recipientInsertEvent.documentKey, {_id: 20}, tojson(recipientInsertEvent));
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
recipientInsertEvent.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate field value on recipient insert: ${tojson(recipientInsertEvent)}`,
|
||||
);
|
||||
|
||||
csTestShard0.assertNoChange(cursorShard0);
|
||||
csTestShard1.assertNoChange(cursorShard1);
|
||||
|
||||
csTestShard0.cleanUp();
|
||||
csTestShard1.cleanUp();
|
||||
});
|
||||
}
|
||||
|
||||
it("moveChunk: migration events visible with fromMigrate:true", () => {
|
||||
runMoveChunkTest(true /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
it("moveChunk: migration events not visible with fromMigrate:false", () => {
|
||||
runMoveChunkTest(false /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// moveCollection
|
||||
// ---------------------------------------------------------------------------
|
||||
function runMoveCollectionTest(emitFromMigrate) {
|
||||
const dbName = freshDbName();
|
||||
const collName = "coll";
|
||||
const mongos = st.s;
|
||||
const db = mongos.getDB(dbName);
|
||||
const coll = db[collName];
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
|
||||
// Create an unsplittable (tracked, single-shard) collection on shard0.
|
||||
assert.commandWorked(
|
||||
db.runCommand({createUnsplittableCollection: collName, dataShard: st.shard0.shardName}),
|
||||
);
|
||||
for (let i = 0; i < 3; ++i) {
|
||||
assert.commandWorked(coll.insert({_id: i}));
|
||||
}
|
||||
|
||||
// 'changeStreamsEmitFromMigrate' is evaluated when a change stream is opened, so the
|
||||
// parameter must be set before openRecipientStream is called.
|
||||
runWithParamsAllNonConfigNodes(db, {changeStreamsEmitFromMigrate: emitFromMigrate}, () => {
|
||||
// Watch shard1 (the recipient) at the DB level before the move.
|
||||
const {csTest, cursor} = openRecipientStream(st, dbName, true /* showExpandedEvents */);
|
||||
|
||||
// Move the collection to shard1.
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({moveCollection: `${dbName}.${collName}`, toShard: st.shard1.shardName}),
|
||||
);
|
||||
|
||||
// After the move, inserts go to shard1.
|
||||
const events = collectEventsUntilSentinel(coll, csTest, cursor);
|
||||
|
||||
// Migration inserts land in the temporary resharding namespace on shard1.
|
||||
assertMigrationInserts(events);
|
||||
|
||||
assertReshardingDoneCatchUpAndRenameEvents(events, collName, emitFromMigrate);
|
||||
csTest.cleanUp();
|
||||
});
|
||||
|
||||
db.dropDatabase();
|
||||
}
|
||||
|
||||
it("moveCollection: recipient sees migration inserts with fromMigrate:true", () => {
|
||||
runMoveCollectionTest(true /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
it("moveCollection: recipient does not see migration inserts with fromMigrate:false", () => {
|
||||
runMoveCollectionTest(false /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// unshardCollection
|
||||
//
|
||||
// 'unshardCollection' converts a sharded collection into an unsplittable one on a specified
|
||||
// shard using the same resharding machinery, so the recipient shard also sees the cloned
|
||||
// documents as migration inserts.
|
||||
// ---------------------------------------------------------------------------
|
||||
function runUnshardCollectionTest(emitFromMigrate) {
|
||||
const dbName = freshDbName();
|
||||
const collName = "coll";
|
||||
const mongos = st.s;
|
||||
const db = mongos.getDB(dbName);
|
||||
const coll = db[collName];
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
|
||||
// Shard the collection with all chunks initially on shard0.
|
||||
assert.commandWorked(mongos.adminCommand({shardCollection: `${dbName}.${collName}`, key: {_id: 1}}));
|
||||
for (let i = 0; i < 3; ++i) {
|
||||
assert.commandWorked(coll.insert({_id: i}));
|
||||
}
|
||||
|
||||
runWithParamsAllNonConfigNodes(db, {changeStreamsEmitFromMigrate: emitFromMigrate}, () => {
|
||||
// Watch shard1 (the recipient) at the DB level before the unshard.
|
||||
const {csTest, cursor} = openRecipientStream(st, dbName, true /* showExpandedEvents */);
|
||||
|
||||
// Unshard the collection onto shard1.
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({
|
||||
unshardCollection: `${dbName}.${collName}`,
|
||||
toShard: st.shard1.shardName,
|
||||
}),
|
||||
);
|
||||
|
||||
// After the unshard, inserts go to shard1.
|
||||
const events = collectEventsUntilSentinel(coll, csTest, cursor);
|
||||
const createEvents = events.filter(
|
||||
(e) => e.operationType === "create" && e.ns.coll.startsWith("system.resharding."),
|
||||
);
|
||||
assert.eq(
|
||||
createEvents.length,
|
||||
1,
|
||||
`Expected at least one 'create' event for '${collName}'; all events: ${tojson(events)}`,
|
||||
);
|
||||
createEvents.forEach((e) => {
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
e.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate on create event: ${tojson(e)}`,
|
||||
);
|
||||
});
|
||||
|
||||
// Migration inserts land in the temporary resharding namespace on shard1.
|
||||
assertMigrationInserts(events);
|
||||
|
||||
assertReshardingDoneCatchUpAndRenameEvents(events, collName, emitFromMigrate);
|
||||
|
||||
csTest.cleanUp();
|
||||
});
|
||||
|
||||
db.dropDatabase();
|
||||
}
|
||||
|
||||
it("unshardCollection: recipient sees migration inserts with fromMigrate:true", () => {
|
||||
runUnshardCollectionTest(true /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
it("unshardCollection: recipient does not see migration inserts with fromMigrate:false", () => {
|
||||
runUnshardCollectionTest(false /* emitFromMigrate */);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// reshardCollection
|
||||
//
|
||||
// 'reshardCollection' changes the shard key and redistributes data via the same resharding
|
||||
// machinery. By placing all initial chunks on shard0 and assigning the new key range
|
||||
// exclusively to shard1 via a zone, we force a deterministic cross-shard clone so that shard1
|
||||
// is always the recipient.
|
||||
// ---------------------------------------------------------------------------
|
||||
describe("reshardColllection", () => {
|
||||
function runReshardCollectionTest(emitFromMigrate) {
|
||||
const dbName = freshDbName();
|
||||
const collName = "coll";
|
||||
const mongos = st.s;
|
||||
const db = mongos.getDB(dbName);
|
||||
const coll = db[collName];
|
||||
const ns = `${dbName}.${collName}`;
|
||||
const zoneName = `zone_${dbName}`;
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
// Insert documents with both old and new shard key fields.
|
||||
// All chunks start on shard0 because it is the primary shard.
|
||||
for (let i = 0; i < 3; ++i) {
|
||||
assert.commandWorked(coll.insert({_id: i, a: i}));
|
||||
}
|
||||
|
||||
// Tag shard1 with a zone so that the 'zones' field in 'reshardCollection' forces all
|
||||
// chunks of the new key {a:1} onto shard1.
|
||||
assert.commandWorked(mongos.adminCommand({addShardToZone: st.shard1.shardName, zone: zoneName}));
|
||||
|
||||
runWithParamsAllNonConfigNodes(db, {changeStreamsEmitFromMigrate: emitFromMigrate}, () => {
|
||||
// Watch shard1 (the recipient) at the DB level before the reshard.
|
||||
const {csTest, cursor} = openRecipientStream(st, dbName, true /* showExpandedEvents */);
|
||||
|
||||
// Reshard to the new key {a:1}; the zone forces all chunks onto shard1.
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({
|
||||
reshardCollection: ns,
|
||||
key: {a: 1},
|
||||
numInitialChunks: 1,
|
||||
zones: [{min: {a: MinKey}, max: {a: MaxKey}, zone: zoneName}],
|
||||
}),
|
||||
);
|
||||
|
||||
// After resharding, inserts go to shard1.
|
||||
const events = collectEventsUntilSentinel(coll, csTest, cursor);
|
||||
|
||||
// Migration inserts land in the temporary resharding namespace on shard1.
|
||||
assertMigrationInserts(events);
|
||||
|
||||
assertReshardingDoneCatchUpAndRenameEvents(events, collName, emitFromMigrate);
|
||||
|
||||
csTest.cleanUp();
|
||||
});
|
||||
|
||||
db.dropDatabase();
|
||||
|
||||
// Remove the zone tag from shard1 (zone key ranges on the collection will be removed
|
||||
// when the database is dropped implicitly between tests).
|
||||
assert.commandWorked(mongos.adminCommand({removeShardFromZone: st.shard1.shardName, zone: zoneName}));
|
||||
}
|
||||
|
||||
it("reshardCollection: recipient does not sees migration inserts with fromMigrate:true", () => {
|
||||
runReshardCollectionTest(true /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
it("reshardCollection: recipient does not sees migration inserts with fromMigrate:false", () => {
|
||||
runReshardCollectionTest(false /* emitFromMigrate */);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// movePrimary
|
||||
//
|
||||
// 'movePrimary' moves the primary shard of a database. Untracked collections on the old primary
|
||||
// are cloned onto the new primary via the migration path, generating 'create' and
|
||||
// 'createIndexes' DDL events on the recipient shard.
|
||||
// ---------------------------------------------------------------------------
|
||||
describe("movePrimary", () => {
|
||||
function runMovePrimaryTest(emitFromMigrate) {
|
||||
const dbName = freshDbName();
|
||||
const collName = "coll";
|
||||
const mongos = st.s;
|
||||
const db = mongos.getDB(dbName);
|
||||
|
||||
// Create the database with shard0 as primary.
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
|
||||
// Create an untracked collection directly on shard0 (bypassing mongos so it is not
|
||||
// registered in the sharding catalog, making it eligible for relocation by movePrimary).
|
||||
assert.commandWorked(db.createCollection(collName));
|
||||
for (let i = 0; i < 3; ++i) {
|
||||
assert.commandWorked(db[collName].insert({_id: i}));
|
||||
}
|
||||
assert.commandWorked(db[collName].createIndexes([{a: 1}]));
|
||||
|
||||
runWithParamsAllNonConfigNodes(db, {changeStreamsEmitFromMigrate: emitFromMigrate}, () => {
|
||||
// Watch shard1 (the recipient) at the DB level before the move.
|
||||
const {csTest, cursor} = openRecipientStream(st, dbName, true /* showExpandedEvents */);
|
||||
|
||||
// Move the primary shard from shard0 to shard1.
|
||||
assert.commandWorked(mongos.adminCommand({movePrimary: dbName, to: st.shard1.shardName}));
|
||||
|
||||
// After the move the collection lives on shard1 (inserts through mongos now route
|
||||
// to shard1 as the new primary).
|
||||
const events = collectEventsUntilSentinel(db[collName], csTest, cursor);
|
||||
|
||||
// movePrimary clones the collection onto the recipient shard, which must emit a
|
||||
// 'create' event. The event must carry 'fromMigrate: true' iff emitFromMigrate is
|
||||
// enabled.
|
||||
const createEvents = events.filter((e) => e.operationType === "create" && e.ns.coll === collName);
|
||||
assert.gt(
|
||||
createEvents.length,
|
||||
0,
|
||||
`Expected at least one 'create' event for '${collName}'; all events: ${tojson(events)}`,
|
||||
);
|
||||
createEvents.forEach((e) => {
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
e.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate on create event: ${tojson(e)}`,
|
||||
);
|
||||
});
|
||||
|
||||
// movePrimary also creates the collection's indexes on the recipient shard, which
|
||||
// must emit a 'createIndexes' event with the same fromMigrate semantics.
|
||||
const createIndexesEvents = events.filter(
|
||||
(e) => e.operationType === "createIndexes" && e.ns.coll === collName,
|
||||
);
|
||||
assert.gt(
|
||||
createIndexesEvents.length,
|
||||
0,
|
||||
`Expected at least one 'createIndexes' event for '${collName}'; all events: ${tojson(events)}`,
|
||||
);
|
||||
createIndexesEvents.forEach((e) => {
|
||||
assert.eq(
|
||||
emitFromMigrate,
|
||||
e.hasOwnProperty("fromMigrate"),
|
||||
`Invalid fromMigrate on createIndexes event: ${tojson(e)}`,
|
||||
);
|
||||
});
|
||||
|
||||
csTest.cleanUp();
|
||||
});
|
||||
|
||||
db.dropDatabase();
|
||||
}
|
||||
|
||||
it("movePrimary: recipient sees 'create' and 'createIndexes' events with fromMigrate:true", () => {
|
||||
runMovePrimaryTest(true /* emitFromMigrate */);
|
||||
});
|
||||
|
||||
it("movePrimary: recipient sees 'create' and 'createIndexes' events without fromMigrate:false", () => {
|
||||
runMovePrimaryTest(false /* emitFromMigrate */);
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -14,6 +14,8 @@ import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {assertChangeStreamEventEq, ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
|
||||
const isMultiversion = Boolean(jsTest.options().useRandomBinVersionsWithinReplicaSet);
|
||||
|
||||
// Use a higher frequency for periodic noops to speed up the test.
|
||||
const reshardingTest = new ReshardingTest({
|
||||
numDonors: 2,
|
||||
@ -91,12 +93,18 @@ reshardingTest.withReshardingInBackground(
|
||||
operationType: "reshardBegin",
|
||||
ns: {db: kDbName, coll: collName},
|
||||
};
|
||||
if (!isMultiversion) {
|
||||
expectedReshardBeginEvent.fromMigrate = true;
|
||||
}
|
||||
|
||||
const reshardBeginDonor0Event = cstDonor0.getNextChanges(
|
||||
changeStreamsCursorDonor0,
|
||||
1,
|
||||
false /* skipFirstBatch */,
|
||||
);
|
||||
if (isMultiversion) {
|
||||
delete reshardBeginDonor0Event[0].fromMigrate;
|
||||
}
|
||||
|
||||
assertChangeStreamEventEq(reshardBeginDonor0Event[0], expectedReshardBeginEvent);
|
||||
|
||||
@ -105,6 +113,10 @@ reshardingTest.withReshardingInBackground(
|
||||
1,
|
||||
false /* skipFirstBatch */,
|
||||
);
|
||||
if (isMultiversion) {
|
||||
delete reshardBeginDonor1Event[0].fromMigrate;
|
||||
}
|
||||
|
||||
assertChangeStreamEventEq(reshardBeginDonor1Event[0], expectedReshardBeginEvent);
|
||||
},
|
||||
{
|
||||
@ -138,12 +150,18 @@ reshardingTest.withReshardingInBackground(
|
||||
reshardingUUID: reshardingUUID,
|
||||
operationType: "reshardDoneCatchUp",
|
||||
};
|
||||
if (!isMultiversion) {
|
||||
expectedReshardDoneCatchUpEvent.fromMigrate = true;
|
||||
}
|
||||
|
||||
const reshardDoneCatchUpEvent = cstRecipient0.getNextChanges(
|
||||
changeStreamsCursorRecipient0,
|
||||
1,
|
||||
false /* skipFirstBatch */,
|
||||
)[0];
|
||||
if (isMultiversion) {
|
||||
delete reshardDoneCatchUpEvent.fromMigrate;
|
||||
}
|
||||
|
||||
// Ensure that the 'reshardingDoneCatchUp' event has an 'ns' field of the format
|
||||
// '{ns: kDbName, coll: "system.resharding.<>"}.
|
||||
|
||||
@ -41,6 +41,7 @@
|
||||
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
|
||||
#include "mongo/db/pipeline/document_source_change_stream.h"
|
||||
#include "mongo/db/pipeline/resume_token.h"
|
||||
#include "mongo/db/query/query_execution_knobs_gen.h"
|
||||
#include "mongo/db/repl/oplog_entry.h"
|
||||
#include "mongo/db/repl/oplog_entry_gen.h"
|
||||
#include "mongo/db/tenant_id.h"
|
||||
@ -229,18 +230,7 @@ void addTransactionIdFieldsIfPresent(const Document& input, MutableDocument& out
|
||||
ChangeStreamEventTransformation::ChangeStreamEventTransformation(
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
||||
const DocumentSourceChangeStreamSpec& spec)
|
||||
: _changeStreamSpec(spec), _expCtx(expCtx), _resumeToken(resolveResumeToken(expCtx, spec)) {
|
||||
// Determine whether the user requested a point-in-time pre-image, which will affect this
|
||||
// stage's output.
|
||||
_preImageRequested =
|
||||
_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff;
|
||||
|
||||
// Determine whether the user requested a point-in-time post-image, which will affect this
|
||||
// stage's output.
|
||||
_postImageRequested =
|
||||
_changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kWhenAvailable ||
|
||||
_changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kRequired;
|
||||
}
|
||||
: _changeStreamSpec(spec), _expCtx(expCtx), _resumeToken(resolveResumeToken(expCtx, spec)) {}
|
||||
|
||||
ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
|
||||
Value txnOpIndexVal,
|
||||
@ -267,9 +257,19 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
|
||||
ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
||||
const DocumentSourceChangeStreamSpec& spec)
|
||||
: ChangeStreamEventTransformation(expCtx, spec) {
|
||||
_supportedEvents = buildSupportedEvents();
|
||||
}
|
||||
: ChangeStreamEventTransformation(expCtx, spec),
|
||||
_supportedEvents(buildSupportedEvents()),
|
||||
// Determine whether the user requested a point-in-time pre-image, which will affect this
|
||||
// stage's output.
|
||||
_preImageRequested(_changeStreamSpec.getFullDocumentBeforeChange() !=
|
||||
FullDocumentBeforeChangeModeEnum::kOff),
|
||||
// Determine whether the user requested a point-in-time post-image, which will affect this
|
||||
// stage's output.
|
||||
_postImageRequested(_changeStreamSpec.getFullDocument() ==
|
||||
FullDocumentModeEnum::kWhenAvailable ||
|
||||
_changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kRequired),
|
||||
_emitFromMigrateField(_changeStreamSpec.getShowMigrationEvents() &&
|
||||
changeStreamsEmitFromMigrate.loadRelaxed()) {}
|
||||
|
||||
ChangeStreamEventTransformation::SupportedEvents
|
||||
ChangeStreamDefaultEventTransformation::buildSupportedEvents() const {
|
||||
@ -297,6 +297,7 @@ ChangeStreamDefaultEventTransformation::buildSupportedEvents() const {
|
||||
}
|
||||
|
||||
std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDependencies() const {
|
||||
// Fields that are accessed by default.
|
||||
std::set<std::string> accessedFields = {
|
||||
std::string{repl::OplogEntry::kOpTypeFieldName},
|
||||
std::string{repl::OplogEntry::kTimestampFieldName},
|
||||
@ -308,13 +309,26 @@ std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDepend
|
||||
std::string{repl::OplogEntry::kTxnNumberFieldName},
|
||||
std::string{DocumentSourceChangeStream::kTxnOpIndexField},
|
||||
std::string{repl::OplogEntry::kWallClockTimeFieldName},
|
||||
std::string{DocumentSourceChangeStream::kCommitTimestampField},
|
||||
std::string{repl::OplogEntry::kTidFieldName}};
|
||||
|
||||
// Fields that are only accessed when pre- or post-images are selected.
|
||||
if (_preImageRequested || _postImageRequested) {
|
||||
accessedFields.insert(std::string{DocumentSourceChangeStream::kApplyOpsIndexField});
|
||||
accessedFields.insert(std::string{DocumentSourceChangeStream::kApplyOpsTsField});
|
||||
}
|
||||
|
||||
// 'fromMigrate' field is only accessed if the change stream is opened with the flag
|
||||
// 'showMigrationEvents' and the server parameter 'changeStreamsEmitFromMigrate' is enabled.
|
||||
if (_emitFromMigrateField) {
|
||||
accessedFields.insert(std::string{DocumentSourceChangeStream::kFromMigrateField});
|
||||
}
|
||||
|
||||
// The commit timestamp is only needed if the change stream is opened with the flag
|
||||
// 'showCommitTimestamp'.
|
||||
if (_changeStreamSpec.getShowCommitTimestamp()) {
|
||||
accessedFields.insert(std::string{DocumentSourceChangeStream::kCommitTimestampField});
|
||||
}
|
||||
|
||||
return accessedFields;
|
||||
}
|
||||
|
||||
@ -745,6 +759,15 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
|
||||
doc.addField(DocumentSourceChangeStream::kNsTypeField, Value(nsType));
|
||||
}
|
||||
|
||||
// If migration events should be returned, add a field 'fromMigrate' to the result event if case
|
||||
// the 'fromMigrate' field is set for the oplog entry.
|
||||
if (_emitFromMigrateField) {
|
||||
if (auto value = input.getField(DocumentSourceChangeStream::kFromMigrateField);
|
||||
value.getType() == BSONType::boolean) {
|
||||
doc.addField(DocumentSourceChangeStream::kFromMigrateField, Value{value.getBool()});
|
||||
}
|
||||
}
|
||||
|
||||
return doc.freeze();
|
||||
}
|
||||
|
||||
|
||||
@ -79,12 +79,6 @@ protected:
|
||||
const DocumentSourceChangeStreamSpec _changeStreamSpec;
|
||||
boost::intrusive_ptr<ExpressionContext> _expCtx;
|
||||
ResumeTokenData _resumeToken;
|
||||
|
||||
// Set to true if the pre-image should be included in the output documents.
|
||||
bool _preImageRequested = false;
|
||||
|
||||
// Set to true if the post-image should be included in the output documents.
|
||||
bool _postImageRequested = false;
|
||||
};
|
||||
|
||||
/*
|
||||
@ -129,6 +123,24 @@ private:
|
||||
* 'o2' field value.
|
||||
*/
|
||||
ChangeStreamEventTransformation::SupportedEvents _supportedEvents;
|
||||
|
||||
/**
|
||||
* Set to true if the pre-image should be included in the output documents.
|
||||
*/
|
||||
const bool _preImageRequested = false;
|
||||
|
||||
/**
|
||||
* Set to true if the post-image should be included in the output documents.
|
||||
*/
|
||||
const bool _postImageRequested = false;
|
||||
|
||||
/**
|
||||
* If set to 'true', the change stream will emit a 'fromMigrate' field with a value of 'true'
|
||||
* for all events originating from a migration. This requires the change stream to be opened
|
||||
* with the 'showMigrationEvents' flag and also the 'changeStreamsEmitFromMigrate' server
|
||||
* parameter to be enabled.
|
||||
*/
|
||||
const bool _emitFromMigrateField = false;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -237,6 +237,10 @@ public:
|
||||
// "collection", "view" or "timeseries". Will only be exposed if 'showExpandedEvents' is used.
|
||||
static constexpr StringData kNsTypeField = "nsType"_sd;
|
||||
|
||||
// The name of both the "migration marker" field in oplog entries and the output field in change
|
||||
// events if the change stream is opened with 'showMigrationEvents=true'.
|
||||
static constexpr StringData kFromMigrateField = "fromMigrate"_sd;
|
||||
|
||||
// The name of this stage.
|
||||
static constexpr StringData kStageName = "$changeStream"_sd;
|
||||
|
||||
|
||||
@ -1324,7 +1324,36 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) {
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
|
||||
bool fromMigrate = true;
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", true);
|
||||
|
||||
constexpr bool fromMigrate = true;
|
||||
auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
|
||||
nss, // namespace
|
||||
BSON("x" << 2 << "_id" << 1), // o
|
||||
testUuid(), // uuid
|
||||
fromMigrate, // fromMigrate
|
||||
BSON("_id" << 1 << "x" << 2)); // o2
|
||||
|
||||
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
|
||||
Document expectedInsert{
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(
|
||||
kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2), DSChangeStream::kInsertOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
|
||||
{DSChangeStream::kFromMigrateField, true},
|
||||
};
|
||||
checkTransformation(insert, expectedInsert, spec);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrationsDontEmitFromMigrate) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", false);
|
||||
|
||||
constexpr bool fromMigrate = true;
|
||||
auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
|
||||
nss, // namespace
|
||||
BSON("x" << 2 << "_id" << 1), // o
|
||||
@ -1688,7 +1717,36 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrate) {
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
|
||||
bool fromMigrate = true;
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", true);
|
||||
|
||||
constexpr bool fromMigrate = true;
|
||||
BSONObj o = BSON("_id" << 1);
|
||||
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type
|
||||
nss, // namespace
|
||||
o, // o
|
||||
testUuid(), // uuid
|
||||
fromMigrate, // fromMigrate
|
||||
BSON("_id" << 1)); // o2
|
||||
|
||||
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
|
||||
Document expectedDelete{
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
|
||||
{DSChangeStream::kFromMigrateField, true},
|
||||
};
|
||||
|
||||
checkTransformation(deleteEntry, expectedDelete, spec);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrationsDontEmitFromMigrate) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", false);
|
||||
|
||||
constexpr bool fromMigrate = true;
|
||||
BSONObj o = BSON("_id" << 1);
|
||||
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type
|
||||
nss, // namespace
|
||||
@ -1956,6 +2014,7 @@ TEST_F(ChangeStreamStageTest, TransformShardingEvents) {
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kOperationDescriptionField, opDesc},
|
||||
{DSChangeStream::kFromMigrateField, false},
|
||||
};
|
||||
|
||||
if (eventType == DSChangeStream::kNewShardDetectedOpType) {
|
||||
@ -1973,6 +2032,40 @@ TEST_F(ChangeStreamStageTest, TransformShardingEvents) {
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", true);
|
||||
|
||||
auto uuid = UUID::gen();
|
||||
auto reshardingUuid = UUID::gen();
|
||||
|
||||
ReshardBeginChangeEventO2Field o2Field{nss, reshardingUuid};
|
||||
auto reshardingBegin = makeOplogEntry(OpTypeEnum::kNoop,
|
||||
nss,
|
||||
BSONObj(),
|
||||
uuid,
|
||||
true, // fromMigrate
|
||||
o2Field.toBSON());
|
||||
|
||||
auto spec = fromjson("{$changeStream: {showMigrationEvents: true, showExpandedEvents: true}}");
|
||||
|
||||
const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
|
||||
Document expectedReshardingBegin{
|
||||
{DSChangeStream::kReshardingUuidField, reshardingUuid},
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(kDefaultTs, uuid, opDesc, DSChangeStream::kReshardBeginOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kCollectionUuidField, uuid},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kOperationDescriptionField, opDesc},
|
||||
{DSChangeStream::kFromMigrateField, true},
|
||||
};
|
||||
checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformReshardBeginDontEmitFromMigrate) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", false);
|
||||
|
||||
auto uuid = UUID::gen();
|
||||
auto reshardingUuid = UUID::gen();
|
||||
|
||||
@ -2034,6 +2127,48 @@ TEST_F(ChangeStreamStageTest, TransformReshardBlockingWrites) {
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", true);
|
||||
|
||||
auto existingUuid = UUID::gen();
|
||||
auto reshardingUuid = UUID::gen();
|
||||
auto temporaryNs = resharding::constructTemporaryReshardingNss(nss, existingUuid);
|
||||
|
||||
ReshardDoneCatchUpChangeEventO2Field o2Field{temporaryNs, reshardingUuid};
|
||||
auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop,
|
||||
temporaryNs,
|
||||
BSONObj(),
|
||||
reshardingUuid,
|
||||
true, // fromMigrate
|
||||
o2Field.toBSON());
|
||||
|
||||
auto spec = fromjson(
|
||||
"{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true, "
|
||||
"showExpandedEvents: true}}");
|
||||
auto expCtx = getExpCtx();
|
||||
expCtx->setNamespaceString(temporaryNs);
|
||||
|
||||
const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
|
||||
Document expectedReshardingDoneCatchUp{
|
||||
{DSChangeStream::kReshardingUuidField, reshardingUuid},
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(
|
||||
kDefaultTs, reshardingUuid, opDesc, DSChangeStream::kReshardDoneCatchUpOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kCollectionUuidField, reshardingUuid},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kNamespaceField,
|
||||
D{{"db", temporaryNs.db_forTest()}, {"coll", temporaryNs.coll()}}},
|
||||
{DSChangeStream::kOperationDescriptionField, opDesc},
|
||||
{DSChangeStream::kFromMigrateField, true},
|
||||
};
|
||||
|
||||
checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpDontEmitFromMigrate) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", false);
|
||||
|
||||
auto existingUuid = UUID::gen();
|
||||
auto reshardingUuid = UUID::gen();
|
||||
auto temporaryNs = resharding::constructTemporaryReshardingNss(nss, existingUuid);
|
||||
@ -4659,6 +4794,36 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrate) {
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", true);
|
||||
|
||||
bool fromMigrate = true;
|
||||
BSONObj o = BSON("_id" << 1 << "x" << 2);
|
||||
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type
|
||||
nss, // namespace
|
||||
o, // o
|
||||
testUuid(), // uuid
|
||||
fromMigrate, // fromMigrate
|
||||
boost::none); // o2
|
||||
|
||||
// Delete
|
||||
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
|
||||
Document expectedDelete{
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
|
||||
{DSChangeStream::kFromMigrateField, true},
|
||||
};
|
||||
|
||||
checkTransformation(deleteEntry, expectedDelete, spec);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrationsDontEmitFromMigrate) {
|
||||
RAIIServerParameterControllerForTest emitFromMigrate("changeStreamsEmitFromMigrate", false);
|
||||
|
||||
bool fromMigrate = true;
|
||||
BSONObj o = BSON("_id" << 1 << "x" << 2);
|
||||
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type
|
||||
|
||||
@ -639,6 +639,17 @@ server_parameters:
|
||||
lte: 600
|
||||
redact: false
|
||||
|
||||
changeStreamsEmitFromMigrate:
|
||||
description: >-
|
||||
If enabled, exposes a 'fromMigrate' field with a value of 'true' in change stream
|
||||
events that originate from a migration. The field is only emitted for change streams
|
||||
that are opened with the 'showMigrationEvents' flag.
|
||||
set_at: [startup, runtime]
|
||||
cpp_varname: "changeStreamsEmitFromMigrate"
|
||||
cpp_vartype: AtomicWord<bool>
|
||||
default: true
|
||||
redact: false
|
||||
|
||||
internalChangeStreamUseTenantIdForTesting:
|
||||
description: >-
|
||||
If true, then change streams will operate upon an internal tenant id for testing
|
||||
|
||||
Loading…
Reference in New Issue
Block a user