SERVER-65859 Enable filtering of 'fromMigrate' change steam events for individual operations within applyOps array
This commit is contained in:
parent
04e49a5d48
commit
5cc18bcd68
@ -53,16 +53,16 @@ let suspendRangeDeletionShard0 = configureFailPoint(st.shard0, 'suspendRangeDele
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}}));
|
||||
const coll = st.s.getCollection(collNS);
|
||||
assert.commandWorked(coll.insert({_id: -2, name: 'emma', age: 20})); // Test case 4
|
||||
assert.commandWorked(coll.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3
|
||||
assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 30})); // Test case 1
|
||||
assert.commandWorked(coll.insert({_id: 1, name: 'matt', age: 35})); // Test case 1
|
||||
assert.commandWorked(coll.insert({_id: 2, name: 'john', age: 40})); // Test case 2
|
||||
assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 45})); // Test case 2
|
||||
assert.commandWorked(coll.insert({_id: 4, name: 'robert', age: 50})); // Test case 2
|
||||
assert.commandWorked(coll.insert({_id: 5, name: 'james', age: 55})); // Test case 3
|
||||
assert.commandWorked(coll.insert({_id: 6, name: 'liam', age: 60})); // Test case 4
|
||||
const mongosColl = st.s.getCollection(collNS);
|
||||
assert.commandWorked(mongosColl.insert({_id: -2, name: 'emma', age: 20})); // Test case 4
|
||||
assert.commandWorked(mongosColl.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3
|
||||
assert.commandWorked(mongosColl.insert({_id: 0, name: 'matt', age: 30})); // Test case 1
|
||||
assert.commandWorked(mongosColl.insert({_id: 1, name: 'matt', age: 35})); // Test case 1
|
||||
assert.commandWorked(mongosColl.insert({_id: 2, name: 'john', age: 40})); // Test case 2
|
||||
assert.commandWorked(mongosColl.insert({_id: 3, name: 'robert', age: 45})); // Test case 2
|
||||
assert.commandWorked(mongosColl.insert({_id: 4, name: 'robert', age: 50})); // Test case 2
|
||||
assert.commandWorked(mongosColl.insert({_id: 5, name: 'james', age: 55})); // Test case 3
|
||||
assert.commandWorked(mongosColl.insert({_id: 6, name: 'liam', age: 60})); // Test case 4
|
||||
|
||||
// Move the chunk to the second shard leaving orphaned documents on the first shard.
|
||||
assert.commandWorked(st.s.adminCommand({split: collNS, middle: {_id: 0}}));
|
||||
@ -70,7 +70,7 @@ assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));
|
||||
|
||||
// Setup a change stream on the collection to receive real-time events on any data changes.
|
||||
const changeStream = coll.watch([]);
|
||||
const changeStream = mongosColl.watch([]);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Test case 1: Direct operations to shard on orphaned documents
|
||||
@ -146,7 +146,7 @@ jsTest.log('A direct delete to a shard of multi-documents does not generate dele
|
||||
jsTest.log('A broadcasted update of a single document generates an update event');
|
||||
{
|
||||
// Send a broadcasted update (query on non-key field) on a single document to all the shards.
|
||||
assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 41}}, {multi: true}));
|
||||
assert.commandWorked(mongosColl.update({name: 'john'}, {$set: {age: 41}}, {multi: true}));
|
||||
|
||||
// The document is hosted by the second shard and the update event is notified. The first shard
|
||||
// still hosts the orphaned document so no additional event must be notified.
|
||||
@ -163,7 +163,7 @@ jsTest.log('A broadcasted update of a single document generates an update event'
|
||||
jsTest.log('A broadcasted delete of a single document generates a delete event');
|
||||
{
|
||||
// Send a broadcasted delete (query on non-key field) on a single document to all the shards.
|
||||
assert.commandWorked(coll.remove({name: 'john'}));
|
||||
assert.commandWorked(mongosColl.remove({name: 'john'}));
|
||||
|
||||
// The document is hosted by the second shard and the delete event is notified. The first shard
|
||||
// still hosts the orphaned document so no additional event must be notified.
|
||||
@ -180,7 +180,7 @@ jsTest.log('A broadcasted delete of a single document generates a delete event')
|
||||
jsTest.log('A broadcasted update of multi-documents generates more update events');
|
||||
{
|
||||
// Send a broadcasted update (query on non-key field) on two documents to all the shards.
|
||||
assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 46}}, {multi: true}));
|
||||
assert.commandWorked(mongosColl.update({name: 'robert'}, {$set: {age: 46}}, {multi: true}));
|
||||
|
||||
// The documents are hosted by the second shard and two delete events are notified. The first
|
||||
// shard still hosts the orphaned documents so no additional event must be notified.
|
||||
@ -201,7 +201,7 @@ jsTest.log('A broadcasted update of multi-documents generates more update events
|
||||
jsTest.log('A broadcasted delete of multi-documents generates more delete events');
|
||||
{
|
||||
// Send a broadcasted delete (query on non-key field) on two documents to all the shards.
|
||||
assert.commandWorked(coll.remove({name: 'robert'}));
|
||||
assert.commandWorked(mongosColl.remove({name: 'robert'}));
|
||||
|
||||
// The documents are hosted by the second shard and two delete events are notified. The first
|
||||
// shard still hosts the orphaned documents so no additional event must be notified.
|
||||
@ -296,14 +296,10 @@ jsTest.log('Direct updates (via a transaction to a shard) of both orphaned and o
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
session.endSession();
|
||||
|
||||
// The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one
|
||||
// update event is notified.
|
||||
// TODO (SERVER-65859): The second update event will be filtered out when the ticket is
|
||||
// completed.
|
||||
// The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
|
||||
// one update event is notified.
|
||||
assert.soon(() => changeStream.hasNext(), 'A first update event is expected');
|
||||
assert.eq(changeStream.next().operationType, 'update');
|
||||
assert.soon(() => changeStream.hasNext(), 'A second update event is expected');
|
||||
assert.eq(changeStream.next().operationType, 'update');
|
||||
assertNoChanges(changeStream);
|
||||
|
||||
// Both orphaned (liam) and owned (emma) documents on the shard have been updated.
|
||||
@ -324,14 +320,10 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
session.endSession();
|
||||
|
||||
// The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one
|
||||
// update event is notified.
|
||||
// TODO (SERVER-65859): The second delete event will be filtered out when the ticket is
|
||||
// completed.
|
||||
// The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
|
||||
// one update event is notified.
|
||||
assert.soon(() => changeStream.hasNext(), 'A first delete event is expected');
|
||||
assert.eq(changeStream.next().operationType, 'delete');
|
||||
assert.soon(() => changeStream.hasNext(), 'A second delete event is expected');
|
||||
assert.eq(changeStream.next().operationType, 'delete');
|
||||
assertNoChanges(changeStream);
|
||||
|
||||
// Both orphaned (liam) and owned (emma) documents on the shard have been removed.
|
||||
@ -343,7 +335,7 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o
|
||||
|
||||
jsTest.log('The collection drop generates a drop event');
|
||||
{
|
||||
coll.drop();
|
||||
mongosColl.drop();
|
||||
|
||||
// Essentially, this verifies that the operation before dropping the collection did not notify
|
||||
// additional and unexpected events.
|
||||
|
||||
@ -63,6 +63,13 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
|
||||
// filtered out by the default 'ns' filter this stage gets initialized with.
|
||||
auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr));
|
||||
|
||||
// To correctly handle filtering out entries of direct write operations on orphaned documents,
|
||||
// we include a filter for "fromMigrate" flagged operations, unless "fromMigrate" events are
|
||||
// explicitly requested in the spec.
|
||||
if (!expCtx->changeStreamSpec->getShowMigrationEvents()) {
|
||||
unwindFilter->add(buildNotFromMigrateFilter(expCtx, userMatch));
|
||||
}
|
||||
|
||||
// Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
|
||||
// this separately because we need to exclude certain fields from the user's filters. Unwound
|
||||
// transaction events do not have these fields until we populate them from the commitTransaction
|
||||
|
||||
Loading…
Reference in New Issue
Block a user