PYTHON-2683 Convert change stream spec tests to unified test format (#950)

This commit is contained in:
Steven Silvester 2022-06-01 18:26:52 -05:00 committed by GitHub
parent f4fc742ff3
commit 62a6302181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 6039 additions and 4486 deletions

View File

@ -1,153 +0,0 @@
{
"collection_name": "test",
"database_name": "change-stream-tests",
"collection2_name": "test2",
"database2_name": "change-stream-tests-2",
"tests": [
{
"description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"single"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [],
"expectations": null,
"result": {
"error": {
"code": 40573
}
}
},
{
"description": "Change Stream should error when an invalid aggregation stage is passed in",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [
{
"$unsupported": "foo"
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
},
{
"$unsupported": "foo"
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"error": {
"code": 40324
}
}
},
{
"description": "Change Stream should error when _id is projected out",
"minServerVersion": "4.1.11",
"target": "collection",
"topology": [
"replicaset",
"sharded",
"load-balanced"
],
"changeStreamPipeline": [
{
"$project": {
"_id": 0
}
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"result": {
"error": {
"code": 280
}
}
},
{
"description": "change stream errors on ElectionInProgress",
"minServerVersion": "4.2",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"getMore"
],
"errorCode": 216,
"closeConnection": false
}
},
"target": "collection",
"topology": [
"replicaset",
"sharded",
"load-balanced"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"result": {
"error": {
"code": 216
}
}
}
]
}

File diff suppressed because it is too large Load Diff

View File

@ -1,795 +0,0 @@
{
"collection_name": "test",
"database_name": "change-stream-tests",
"collection2_name": "test2",
"database2_name": "change-stream-tests-2",
"tests": [
{
"description": "$changeStream must be the first stage in a change stream pipeline sent to the server",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"_id": "42",
"documentKey": "42",
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
}
]
}
},
{
"description": "The server returns change stream responses in the specified server response format",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": null,
"result": {
"success": [
{
"_id": "42",
"documentKey": "42",
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Change Stream should allow valid aggregate pipeline stages",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [
{
"$match": {
"fullDocument.z": 3
}
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
},
{
"$match": {
"fullDocument.z": {
"$numberInt": "3"
}
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.",
"minServerVersion": "3.8.0",
"target": "database",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": {
"$numberInt": "1"
},
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test2"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.",
"minServerVersion": "3.8.0",
"target": "client",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": {
"$numberInt": "1"
},
"cursor": {},
"pipeline": [
{
"$changeStream": {
"allChangesForCluster": true
}
}
]
},
"command_name": "aggregate",
"database_name": "admin"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test2"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests-2",
"coll": "test"
},
"fullDocument": {
"y": {
"$numberInt": "2"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Test insert, update, replace, and delete event types",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "updateOne",
"arguments": {
"filter": {
"x": 1
},
"update": {
"$set": {
"x": 2
}
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "replaceOne",
"arguments": {
"filter": {
"x": 2
},
"replacement": {
"x": 3
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "deleteOne",
"arguments": {
"filter": {
"x": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "update",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"updateDescription": {
"updatedFields": {
"x": {
"$numberInt": "2"
}
}
}
},
{
"operationType": "replace",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "3"
}
}
},
{
"operationType": "delete",
"ns": {
"db": "change-stream-tests",
"coll": "test"
}
}
]
}
},
{
"description": "Test rename and invalidate event types",
"minServerVersion": "4.0.1",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "rename",
"arguments": {
"to": "test2"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "rename",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"to": {
"db": "change-stream-tests",
"coll": "test2"
}
},
{
"operationType": "invalidate"
}
]
}
},
{
"description": "Test drop and invalidate event types",
"minServerVersion": "4.0.1",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "drop"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "drop",
"ns": {
"db": "change-stream-tests",
"coll": "test"
}
},
{
"operationType": "invalidate"
}
]
}
},
{
"description": "Test consecutive resume",
"minServerVersion": "4.1.7",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {
"batchSize": 1
},
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
},
"data": {
"failCommands": [
"getMore"
],
"closeConnection": true
}
},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {
"batchSize": 1
},
"pipeline": [
{
"$changeStream": {}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "2"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "3"
}
}
}
]
}
}
]
}

View File

@ -0,0 +1,246 @@
{
"description": "change-streams-errors",
"schemaVersion": "1.7",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
],
"ignoreCommandMonitoringEvents": [
"killCursors"
],
"useMultipleMongoses": false
}
},
{
"client": {
"id": "globalClient",
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
},
{
"database": {
"id": "globalDatabase0",
"client": "globalClient",
"databaseName": "database0"
}
},
{
"collection": {
"id": "globalCollection0",
"database": "globalDatabase0",
"collectionName": "collection0"
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": []
}
],
"tests": [
{
"description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error",
"runOnRequirements": [
{
"minServerVersion": "3.6.0",
"topologies": [
"single"
]
}
],
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": []
},
"expectError": {
"errorCode": 40573
}
}
]
},
{
"description": "Change Stream should error when an invalid aggregation stage is passed in",
"runOnRequirements": [
{
"minServerVersion": "3.6.0",
"topologies": [
"replicaset"
]
}
],
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$unsupported": "foo"
}
]
},
"expectError": {
"errorCode": 40324
}
}
],
"expectEvents": [
{
"client": "client0",
"ignoreExtraEvents": true,
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"cursor": {},
"pipeline": [
{
"$changeStream": {}
},
{
"$unsupported": "foo"
}
]
},
"commandName": "aggregate",
"databaseName": "database0"
}
}
]
}
]
},
{
"description": "Change Stream should error when _id is projected out",
"runOnRequirements": [
{
"minServerVersion": "4.1.11",
"topologies": [
"replicaset",
"sharded-replicaset",
"load-balanced"
]
}
],
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$project": {
"_id": 0
}
}
]
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "globalCollection0",
"arguments": {
"document": {
"z": 3
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectError": {
"errorCode": 280
}
}
]
},
{
"description": "change stream errors on ElectionInProgress",
"runOnRequirements": [
{
"minServerVersion": "4.4",
"topologies": [
"replicaset",
"sharded-replicaset",
"load-balanced"
]
}
],
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "globalClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"getMore"
],
"errorCode": 216,
"closeConnection": false
}
}
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": []
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "globalCollection0",
"arguments": {
"document": {
"z": 3
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectError": {
"errorCode": 216
}
}
]
}
]
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -16,7 +16,6 @@
import os
import random
import re
import string
import sys
import threading
@ -36,7 +35,7 @@ from test.utils import (
wait_until,
)
from bson import SON, ObjectId, Timestamp, encode, json_util
from bson import SON, ObjectId, Timestamp, encode
from bson.binary import ALL_UUID_REPRESENTATIONS, PYTHON_LEGACY, STANDARD, Binary
from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument
from pymongo import MongoClient
@ -1141,131 +1140,6 @@ class TestAllLegacyScenarios(IntegrationTest):
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "change_streams")
def camel_to_snake(camel):
# Regex to convert CamelCase to snake_case.
snake = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", camel)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", snake).lower()
def get_change_stream(client, scenario_def, test):
# Get target namespace on which to instantiate change stream
target = test["target"]
if target == "collection":
db = client.get_database(scenario_def["database_name"])
cs_target = db.get_collection(scenario_def["collection_name"])
elif target == "database":
cs_target = client.get_database(scenario_def["database_name"])
elif target == "client":
cs_target = client
else:
raise ValueError("Invalid target in spec")
# Construct change stream kwargs dict
cs_pipeline = test["changeStreamPipeline"]
options = test["changeStreamOptions"]
cs_options = {}
for key, value in options.items():
cs_options[camel_to_snake(key)] = value
# Create and return change stream
return cs_target.watch(pipeline=cs_pipeline, **cs_options)
def run_operation(client, operation):
# Apply specified operations
opname = camel_to_snake(operation["name"])
arguments = operation.get("arguments", {})
if opname == "rename":
# Special case for rename operation.
arguments = {"new_name": arguments["to"]}
cmd = getattr(
client.get_database(operation["database"]).get_collection(operation["collection"]), opname
)
return cmd(**arguments)
def create_test(scenario_def, test):
def run_scenario(self):
# Set up
self.setUpCluster(scenario_def)
self.setFailPoint(test)
is_error = test["result"].get("error", False)
try:
with get_change_stream(self.client, scenario_def, test) as change_stream:
for operation in test["operations"]:
# Run specified operations
run_operation(self.client, operation)
num_expected_changes = len(test["result"].get("success", []))
changes = [change_stream.next() for _ in range(num_expected_changes)]
# Run a next() to induce an error if one is expected and
# there are no changes.
if is_error and not changes:
change_stream.next()
except OperationFailure as exc:
if not is_error:
raise
expected_code = test["result"]["error"]["code"]
self.assertEqual(exc.code, expected_code)
else:
# Check for expected output from change streams
if test["result"].get("success"):
for change, expected_changes in zip(changes, test["result"]["success"]):
self.assert_dict_is_subset(change, expected_changes)
self.assertEqual(len(changes), len(test["result"]["success"]))
finally:
# Check for expected events
results = self.listener.results
# Note: expectations may be missing, null, or a list of events.
# Extra events emitted by the test are intentionally ignored.
for idx, expectation in enumerate(test.get("expectations") or []):
for event_type, event_desc in expectation.items():
results_key = event_type.split("_")[1]
event = results[results_key][idx] if len(results[results_key]) > idx else None
self.check_event(event, event_desc)
return run_scenario
def create_tests():
for dirpath, _, filenames in os.walk(os.path.join(_TEST_PATH, "legacy")):
dirname = os.path.split(dirpath)[-1]
for filename in filenames:
with open(os.path.join(dirpath, filename)) as scenario_stream:
scenario_def = json_util.loads(scenario_stream.read())
test_type = os.path.splitext(filename)[0]
for test in scenario_def["tests"]:
new_test = create_test(scenario_def, test)
new_test = client_context.require_no_mmap(new_test)
if "minServerVersion" in test:
min_ver = tuple(int(elt) for elt in test["minServerVersion"].split("."))
new_test = client_context.require_version_min(*min_ver)(new_test)
if "maxServerVersion" in test:
max_ver = tuple(int(elt) for elt in test["maxServerVersion"].split("."))
new_test = client_context.require_version_max(*max_ver)(new_test)
topologies = test["topology"]
new_test = client_context.require_cluster_type(topologies)(new_test)
test_name = "test_%s_%s_%s" % (
dirname,
test_type.replace("-", "_"),
str(test["description"].replace(" ", "_")),
)
new_test.__name__ = test_name
setattr(TestAllLegacyScenarios, new_test.__name__, new_test)
create_tests()
globals().update(
generate_test_classes(
os.path.join(_TEST_PATH, "unified"),

View File

@ -65,6 +65,12 @@ class TestMatchEvaluatorUtil(unittest.TestCase):
for actual in [{}, {"x": {}}, {"x": {"y": 2}}]:
self.match_evaluator.match_result(spec, actual)
spec = {"y": {"$$unsetOrMatches": {"$$exists": True}}}
self.match_evaluator.match_result(spec, {})
self.match_evaluator.match_result(spec, {"y": 2})
self.match_evaluator.match_result(spec, {"x": 1})
self.match_evaluator.match_result(spec, {"y": {}})
def test_type(self):
self.match_evaluator.match_result(
{

View File

@ -0,0 +1,151 @@
{
"description": "expectedEventsForClient-ignoreExtraEvents",
"schemaVersion": "1.7",
"createEntities": [
{
"client": {
"id": "client0",
"useMultipleMongoses": true,
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0Name"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "database0Name",
"documents": []
}
],
"tests": [
{
"description": "ignoreExtraEvents can be set to false",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1
}
}
}
],
"expectEvents": [
{
"client": "client0",
"ignoreExtraEvents": false,
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "coll0",
"documents": [
{
"_id": 1
}
]
},
"commandName": "insert"
}
}
]
}
]
},
{
"description": "ignoreExtraEvents can be set to true",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 2
}
}
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 3
}
}
}
],
"expectEvents": [
{
"client": "client0",
"ignoreExtraEvents": true,
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "coll0",
"documents": [
{
"_id": 2
}
]
},
"commandName": "insert"
}
}
]
}
]
},
{
"description": "ignoreExtraEvents defaults to false if unset",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 4
}
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "coll0",
"documents": [
{
"_id": 4
}
]
},
"commandName": "insert"
}
}
]
}
]
}
]
}

View File

@ -1,6 +1,11 @@
{
"description": "poc-change-streams",
"schemaVersion": "1.0",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"createEntities": [
{
"client": {
@ -89,6 +94,42 @@
}
],
"tests": [
{
"description": "saveResultAsEntity is optional for createChangeStream",
"runOnRequirements": [
{
"minServerVersion": "3.8.0",
"topologies": [
"replicaset"
]
}
],
"operations": [
{
"name": "createChangeStream",
"object": "client0",
"arguments": {
"pipeline": []
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": 1
},
"commandName": "aggregate",
"databaseName": "admin"
}
}
]
}
]
},
{
"description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.",
"runOnRequirements": [

View File

@ -470,9 +470,15 @@ class MatchEvaluatorUtil(object):
def _operation_exists(self, spec, actual, key_to_compare):
if spec is True:
self.test.assertIn(key_to_compare, actual)
if key_to_compare is None:
assert actual is not None
else:
self.test.assertIn(key_to_compare, actual)
elif spec is False:
self.test.assertNotIn(key_to_compare, actual)
if key_to_compare is None:
assert actual is None
else:
self.test.assertNotIn(key_to_compare, actual)
else:
self.test.fail("Expected boolean value for $$exists operator, got %s" % (spec,))
@ -704,7 +710,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string("1.5")
SCHEMA_VERSION = Version.from_string("1.7")
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
TEST_SPEC: Any
@ -1181,19 +1187,32 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
events = event_spec["events"]
# Valid types: 'command', 'cmap'
event_type = event_spec.get("eventType", "command")
ignore_extra_events = event_spec.get("ignoreExtraEvents", False)
server_connection_id = event_spec.get("serverConnectionId")
has_server_connection_id = event_spec.get("hasServerConnectionId", False)
assert event_type in ("command", "cmap")
listener = self.entity_map.get_listener_for_client(client_name)
actual_events = listener.get_events(event_type)
if ignore_extra_events:
actual_events = actual_events[: len(events)]
if len(events) == 0:
self.assertEqual(actual_events, [])
continue
self.assertGreaterEqual(len(actual_events), len(events), actual_events)
self.assertEqual(len(actual_events), len(events), actual_events)
for idx, expected_event in enumerate(events):
self.match_evaluator.match_event(event_type, expected_event, actual_events[idx])
if has_server_connection_id:
assert server_connection_id is not None
assert server_connection_id >= 0
else:
assert server_connection_id is None
def verify_outcome(self, spec):
for collection_data in spec:
coll_name = collection_data["collectionName"]