SERVER-41183 Add test suites for change streams with transactions
This commit is contained in:
parent
a368e90685
commit
b3c26131f6
@ -0,0 +1,66 @@
|
||||
test_kind: js_test
|
||||
|
||||
selector:
|
||||
roots:
|
||||
- jstests/change_streams/**/*.js
|
||||
exclude_files:
|
||||
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
|
||||
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
|
||||
exclude_with_any_tags:
|
||||
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
|
||||
- assumes_write_concern_unchanged
|
||||
# No need to use a passthrough to add transactions to a test that already has its own
|
||||
# transactions.
|
||||
- uses_transactions
|
||||
# These tests make assumptions about change stream results that are no longer true once operations
|
||||
# get bundled into transactions.
|
||||
- change_stream_does_not_expect_txns
|
||||
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- CheckReplDBHash
|
||||
- CheckReplOplogs
|
||||
- ValidateCollections
|
||||
config:
|
||||
shell_options:
|
||||
global_vars:
|
||||
TestData:
|
||||
networkErrorAndTxnOverrideConfig:
|
||||
wrapCRUDinTransactions: true
|
||||
# Enable the transactions passthrough.
|
||||
eval: >-
|
||||
var testingReplication = true;
|
||||
load('jstests/libs/override_methods/enable_sessions.js');
|
||||
load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
|
||||
load('jstests/libs/override_methods/network_error_and_txn_override.js');
|
||||
readMode: commands
|
||||
hooks:
|
||||
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
|
||||
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
|
||||
# validating the entire contents of the collection.
|
||||
- class: CheckReplOplogs
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
fixture:
|
||||
class: ShardedClusterFixture
|
||||
# Use two shards to make sure we will only talk to the primary shard for the database and will
|
||||
# not delay changes to wait for notifications or a clock advancement from other shards.
|
||||
num_shards: 2
|
||||
mongos_options:
|
||||
bind_ip_all: ''
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
mongod_options:
|
||||
bind_ip_all: ''
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
periodicNoopIntervalSecs: 1
|
||||
writePeriodicNoops: true
|
||||
num_rs_nodes_per_shard: 1
|
||||
# This test suite doesn't actually shard any collections, but enabling sharding will prevent
|
||||
# read commands against non-existent databases from unconditionally returning a CursorId of 0.
|
||||
enable_sharding:
|
||||
- test
|
||||
@ -0,0 +1,53 @@
|
||||
test_kind: js_test
|
||||
|
||||
selector:
|
||||
roots:
|
||||
- jstests/change_streams/**/*.js
|
||||
exclude_files:
|
||||
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
|
||||
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
|
||||
exclude_with_any_tags:
|
||||
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
|
||||
- assumes_write_concern_unchanged
|
||||
# No need to use a passthrough to add transactions to a test that already has its own
|
||||
# transactions.
|
||||
- uses_transactions
|
||||
# These tests make assumptions about change stream results that are no longer true once operations
|
||||
# get bundled into transactions.
|
||||
- change_stream_does_not_expect_txns
|
||||
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- CheckReplDBHash
|
||||
- CheckReplOplogs
|
||||
- ValidateCollections
|
||||
config:
|
||||
shell_options:
|
||||
global_vars:
|
||||
TestData:
|
||||
networkErrorAndTxnOverrideConfig:
|
||||
wrapCRUDinTransactions: true
|
||||
# Enable the transactions passthrough.
|
||||
eval: >-
|
||||
var testingReplication = true;
|
||||
load('jstests/libs/override_methods/enable_sessions.js');
|
||||
load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
|
||||
load('jstests/libs/override_methods/network_error_and_txn_override.js');
|
||||
readMode: commands
|
||||
hooks:
|
||||
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
|
||||
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
|
||||
# validating the entire contents of the collection.
|
||||
- class: CheckReplOplogs
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
fixture:
|
||||
class: ReplicaSetFixture
|
||||
mongod_options:
|
||||
bind_ip_all: ''
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_nodes: 1
|
||||
@ -0,0 +1,62 @@
|
||||
test_kind: js_test
|
||||
|
||||
selector:
|
||||
roots:
|
||||
- jstests/change_streams/**/*.js
|
||||
exclude_files:
|
||||
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
|
||||
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
|
||||
exclude_with_any_tags:
|
||||
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
|
||||
- assumes_write_concern_unchanged
|
||||
# No need to use a passthrough to add transactions to a test that already has its own
|
||||
# transactions.
|
||||
- uses_transactions
|
||||
# These tests make assumptions about change stream results that are no longer true once operations
|
||||
# get bundled into transactions.
|
||||
- change_stream_does_not_expect_txns
|
||||
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- CheckReplDBHash
|
||||
- CheckReplOplogs
|
||||
- ValidateCollections
|
||||
config:
|
||||
shell_options:
|
||||
global_vars:
|
||||
TestData:
|
||||
networkErrorAndTxnOverrideConfig:
|
||||
wrapCRUDinTransactions: true
|
||||
# Enable the transactions passthrough.
|
||||
eval: >-
|
||||
var testingReplication = true;
|
||||
load('jstests/libs/override_methods/enable_sessions.js');
|
||||
load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
|
||||
load('jstests/libs/override_methods/network_error_and_txn_override.js');
|
||||
readMode: commands
|
||||
hooks:
|
||||
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
|
||||
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
|
||||
# validating the entire contents of the collection.
|
||||
- class: CheckReplOplogs
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
fixture:
|
||||
class: ShardedClusterFixture
|
||||
mongos_options:
|
||||
bind_ip_all: ''
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
mongod_options:
|
||||
bind_ip_all: ''
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
writePeriodicNoops: 1
|
||||
periodicNoopIntervalSecs: 1
|
||||
num_rs_nodes_per_shard: 1
|
||||
num_shards: 2
|
||||
enable_sharding:
|
||||
- test
|
||||
@ -5816,6 +5816,39 @@ tasks:
|
||||
vars:
|
||||
resmoke_args: --suites=change_streams_whole_cluster_sharded_collections_passthrough --storageEngine=wiredTiger
|
||||
|
||||
- <<: *task_template
|
||||
name: change_streams_multi_stmt_txn_passthrough
|
||||
tags: ["change_streams"]
|
||||
depends_on:
|
||||
- name: change_streams
|
||||
commands:
|
||||
- func: "do setup"
|
||||
- func: "run tests"
|
||||
vars:
|
||||
resmoke_args: --suites=change_streams_multi_stmt_txn_passthrough --storageEngine=wiredTiger
|
||||
|
||||
- <<: *task_template
|
||||
name: change_streams_multi_stmt_txn_mongos_passthrough
|
||||
tags: ["change_streams"]
|
||||
depends_on:
|
||||
- name: change_streams
|
||||
commands:
|
||||
- func: "do setup"
|
||||
- func: "run tests"
|
||||
vars:
|
||||
resmoke_args: --suites=change_streams_multi_stmt_txn_mongos_passthrough --storageEngine=wiredTiger
|
||||
|
||||
- <<: *task_template
|
||||
name: change_streams_multi_stmt_txn_sharded_collections_passthrough
|
||||
tags: ["change_streams"]
|
||||
depends_on:
|
||||
- name: change_streams
|
||||
commands:
|
||||
- func: "do setup"
|
||||
- func: "run tests"
|
||||
vars:
|
||||
resmoke_args: --suites=change_streams_multi_stmt_txn_sharded_collections_passthrough --storageEngine=wiredTiger
|
||||
|
||||
- <<: *task_template
|
||||
name: disk_mobile
|
||||
commands:
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
// Tests that each change in the stream will include the cluster time at which it happened.
|
||||
//
|
||||
// This test expects each change stream result to have an operationTime based on the clusterTime in
|
||||
// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
|
||||
// based on the commit oplog entry, which would cause this test to fail.
|
||||
// @tags: [change_stream_does_not_expect_txns]
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
// Tests that an aggregate with a $changeStream stage will report the latest optime read in
|
||||
// the oplog by its cursor. This is information is needed in order to correctly merge the results
|
||||
// from the various shards on mongos.
|
||||
//
|
||||
// This test expects operations timestamps from a change stream to strictly increase with each
|
||||
// operation, which does not happen when the operations get grouped into a transaction.
|
||||
// @tags: [change_stream_does_not_expect_txns]
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
// Tests that the $changeStream stage can only be present as the first stage in the pipeline.
|
||||
//
|
||||
// The passthrough logic that bundles operations into transactions needs to be able identify change
|
||||
// stream aggregations so as to avoid running them in a transaction, but that code would fail to
|
||||
// recognize the intentionally malformed aggergations that we test here.
|
||||
// @tags: [change_stream_does_not_expect_txns]
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
|
||||
@ -160,18 +160,22 @@
|
||||
const otherCollection = assertCreateCollection(db, otherCollName);
|
||||
const adminDB = db.getSiblingDB("admin");
|
||||
|
||||
// Open a stream on the test collection. Write one document to the test collection and one to
|
||||
// the unrelated collection, in order to push the postBatchResumeToken (PBRT) past the last
|
||||
// related event.
|
||||
// Open a stream on the test collection, and write a document to it.
|
||||
csCursor = testCollection.watch();
|
||||
assert.commandWorked(testCollection.insert({_id: docId++}));
|
||||
assert.commandWorked(otherCollection.insert({}));
|
||||
|
||||
// Consume all events. The PBRT of the batch should be greater than the last event, which
|
||||
// guarantees that it is a synthetic high-water-mark token.
|
||||
// Write an event to the unrelated collection in order to advance the PBRT, and then consume all
|
||||
// events. When we see a PBRT that is greater than the timestamp of the last event (stored in
|
||||
// 'relatedEvent'), we know it must be a synthetic high-water-mark token.
|
||||
//
|
||||
// Note that the first insert into the unrelated collection may not be enough to advance the
|
||||
// PBRT; some passthroughs will group the unrelated write into a transaction with the related
|
||||
// write, giving them the same timestamp. We put the unrelated insert into the assert.soon loop,
|
||||
// so that it will eventually get its own transaction with a new timestamp.
|
||||
let relatedEvent = null;
|
||||
let hwmToken = null;
|
||||
assert.soon(() => {
|
||||
assert.commandWorked(otherCollection.insert({}));
|
||||
if (csCursor.hasNext()) {
|
||||
relatedEvent = csCursor.next();
|
||||
}
|
||||
|
||||
@ -1,6 +1,11 @@
|
||||
// Test change streams related shell helpers and options passed to them. Note that, while we only
|
||||
// call the DBCollection.watch helper in this file, it will be redirected to the DB.watch or
|
||||
// Mongo.watch equivalents in the whole_db and whole_cluster passthroughs.
|
||||
//
|
||||
// This test expects each change stream result to have an operationTime based on the clusterTime in
|
||||
// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
|
||||
// based on the commit oplog entry, which would cause this test to fail.
|
||||
// @tags: [change_stream_does_not_expect_txns]
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
|
||||
@ -65,12 +65,21 @@ function assertInvalidateOp({cursor, opType}) {
|
||||
*/
|
||||
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
|
||||
const testEvent = Object.assign({}, actualEvent);
|
||||
if (expectedEvent._id == null) {
|
||||
if (!expectedEvent.hasOwnProperty("_id")) {
|
||||
delete testEvent._id; // Remove the resume token, if present.
|
||||
}
|
||||
if (expectedEvent.clusterTime == null) {
|
||||
if (!expectedEvent.hasOwnProperty("clusterTime")) {
|
||||
delete testEvent.clusterTime; // Remove the cluster time, if present.
|
||||
}
|
||||
|
||||
// The change stream transaction passthrough causes operations to have txnNumber and lsid
|
||||
// values that the test doesn't expect, which can cause comparisons to fail.
|
||||
if (!expectedEvent.hasOwnProperty("txnNumber")) {
|
||||
delete testEvent.txnNumber; // Remove the txnNumber, if present.
|
||||
}
|
||||
if (!expectedEvent.hasOwnProperty("lsid")) {
|
||||
delete testEvent.lsid; // Remove the lsid, if present.
|
||||
}
|
||||
assert.docEq(testEvent,
|
||||
expectedEvent,
|
||||
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
|
||||
|
||||
@ -250,6 +250,12 @@
|
||||
ops = [];
|
||||
}
|
||||
|
||||
// The (initially empty) set of cursors belonging to aggregation operations that executed
|
||||
// outside of a transaction. Any getMore operations on these cursors must also execute outside
|
||||
// of a transaction. The set stores key/value pairs where the key is a cursor id and the value
|
||||
// is the true boolean value.
|
||||
let nonTxnAggCursorSet = {};
|
||||
|
||||
// Set the max number of operations to run in a transaction. Once we've hit this number of
|
||||
// operations, we will commit the transaction. This is to prevent having to retry an extremely
|
||||
// long running transaction.
|
||||
@ -381,8 +387,11 @@
|
||||
shouldForceWriteConcern = false;
|
||||
}
|
||||
} else if (cmdName === "aggregate") {
|
||||
if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj)) {
|
||||
// The $listLocalSessions stage can only be used with readConcern={level: "local"}.
|
||||
if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj) ||
|
||||
OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) {
|
||||
// The $listLocalSessions stage can only be used with readConcern={level: "local"},
|
||||
// and the $changeStream stage can only be used with
|
||||
// readConcern={level: "majority"}.
|
||||
shouldForceReadConcern = false;
|
||||
}
|
||||
|
||||
@ -573,6 +582,12 @@
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true iff a command is a "getMore" on a cursor that is in the `nonTxnAggCursorSet`
|
||||
// dictionary of cursors that were created outside of any transaction.
|
||||
function isCommandNonTxnGetMore(cmdName, cmdObj) {
|
||||
return cmdName === "getMore" && nonTxnAggCursorSet[cmdObj.getMore];
|
||||
}
|
||||
|
||||
function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
||||
// We want to overwrite whatever read and write concern is already set.
|
||||
delete cmdObj.readConcern;
|
||||
@ -583,7 +598,8 @@
|
||||
const driverSession = conn.getDB(dbName).getSession();
|
||||
const commandSupportsTransaction =
|
||||
TransactionsUtil.commandSupportsTxn(dbName, cmdName, cmdObj);
|
||||
if (commandSupportsTransaction && driverSession.getSessionId() !== null) {
|
||||
if (commandSupportsTransaction && driverSession.getSessionId() !== null &&
|
||||
!isCommandNonTxnGetMore(cmdName, cmdObj)) {
|
||||
if (isNested()) {
|
||||
// Nested commands should never start a new transaction.
|
||||
} else if (ops.length === 0) {
|
||||
@ -966,6 +982,11 @@
|
||||
if (configuredForTxnOverride()) {
|
||||
logMsgFull("Override got response",
|
||||
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
|
||||
|
||||
if (!hasError(res) &&
|
||||
TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
nonTxnAggCursorSet[res.cursor.id] = true;
|
||||
}
|
||||
}
|
||||
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
|
||||
|
||||
@ -101,6 +101,7 @@ var OverrideHelpers = (function() {
|
||||
isAggregationWithListLocalSessionsStage:
|
||||
makeIsAggregationWithFirstStage("$listLocalSessions"),
|
||||
isAggregationWithOutOrMergeStage: isAggregationWithOutOrMergeStage,
|
||||
isAggregationWithChangeStreamStage: makeIsAggregationWithFirstStage("$changeStream"),
|
||||
isMapReduceWithInlineOutput: isMapReduceWithInlineOutput,
|
||||
prependOverrideInParallelShell: prependOverrideInParallelShell,
|
||||
overrideRunCommand: overrideRunCommand,
|
||||
|
||||
@ -2,6 +2,8 @@
|
||||
* Utilities for testing transactions.
|
||||
*/
|
||||
var TransactionsUtil = (function() {
|
||||
load("jstests/libs/override_methods/override_helpers.js");
|
||||
|
||||
const kCmdsSupportingTransactions = new Set([
|
||||
'aggregate',
|
||||
'delete',
|
||||
@ -21,12 +23,21 @@ var TransactionsUtil = (function() {
|
||||
'delete',
|
||||
]);
|
||||
|
||||
// Indicates an aggregation command with a pipeline that cannot run in a transaction but can
|
||||
// still execute concurrently with other transactions. Pipelines with $changeStream or $out
|
||||
// cannot run within a transaction.
|
||||
function commandIsNonTxnAggregation(cmdName, cmdObj) {
|
||||
return OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
|
||||
OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj);
|
||||
}
|
||||
|
||||
function commandSupportsTxn(dbName, cmdName, cmdObj) {
|
||||
if (cmdName === 'commitTransaction' || cmdName === 'abortTransaction') {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!kCmdsSupportingTransactions.has(cmdName)) {
|
||||
if (!kCmdsSupportingTransactions.has(cmdName) ||
|
||||
commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -89,6 +100,10 @@ var TransactionsUtil = (function() {
|
||||
}
|
||||
|
||||
return {
|
||||
commandSupportsTxn, commandTypeCanSupportTxn, deepCopyObject, isTransientTransactionError,
|
||||
commandIsNonTxnAggregation,
|
||||
commandSupportsTxn,
|
||||
commandTypeCanSupportTxn,
|
||||
deepCopyObject,
|
||||
isTransientTransactionError,
|
||||
};
|
||||
})();
|
||||
|
||||
@ -1348,6 +1348,32 @@
|
||||
testBadDBName(session, 'local');
|
||||
}
|
||||
},
|
||||
{
|
||||
name: "getMore on change stream executes outside transaction",
|
||||
test: function() {
|
||||
assert.commandWorked(testDB.createCollection(collName1));
|
||||
|
||||
// Starting a $changeStream aggregation within a transaction would fail, so the
|
||||
// override has to execute this as a standalone command.
|
||||
const changeStream = testDB.collName1.watch();
|
||||
assert.commandWorked(testDB.collName1.insert({_id: 1}));
|
||||
endCurrentTransactionIfOpen();
|
||||
|
||||
// Calling the `next` function on the change stream cursor will trigger a getmore,
|
||||
// which the override must also run as a standalone command.
|
||||
assert.eq(changeStream.next()["fullDocument"], {_id: 1});
|
||||
|
||||
// An aggregation without $changeStream runs within a transaction.
|
||||
let aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}});
|
||||
assert.eq(aggCursor.next(), {_id: 1});
|
||||
|
||||
// Creating a non-$changeStream aggregation cursor and running its getMore in a
|
||||
// different transaction will fail.
|
||||
aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}});
|
||||
endCurrentTransactionIfOpen();
|
||||
assert.throws(() => aggCursor.next());
|
||||
}
|
||||
},
|
||||
];
|
||||
|
||||
// Failpoints, overrides, and post-command functions are set by default to only run once, so
|
||||
|
||||
Loading…
Reference in New Issue
Block a user