PYTHON-3516 Improve test EventListener api (#1114)

This commit is contained in:
Shane Harvey 2022-11-14 08:50:08 -08:00 committed by GitHub
parent d0568042fa
commit 79aa5e6757
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 460 additions and 486 deletions

View File

@ -392,7 +392,7 @@ class TestSCRAM(IntegrationTest):
if client_context.version < (4, 4, -1):
# Assert we sent the skipEmptyExchange option.
first_event = listener.results["started"][0]
first_event = listener.started_events[0]
self.assertEqual(first_event.command_name, "saslStart")
self.assertEqual(first_event.command["options"], {"skipEmptyExchange": True})
@ -449,7 +449,7 @@ class TestSCRAM(IntegrationTest):
)
client.testscram.command("dbstats")
self.listener.results.clear()
self.listener.reset()
client = rs_or_single_client_noauth(
username="both", password="pwd", authSource="testscram", event_listeners=[self.listener]
)
@ -457,9 +457,9 @@ class TestSCRAM(IntegrationTest):
if client_context.version.at_least(4, 4, -1):
# Speculative authentication in 4.4+ sends saslStart with the
# handshake.
self.assertEqual(self.listener.results["started"], [])
self.assertEqual(self.listener.started_events, [])
else:
started = self.listener.results["started"][0]
started = self.listener.started_events[0]
self.assertEqual(started.command.get("mechanism"), "SCRAM-SHA-256")
# Step 3: verify auth failure conditions

View File

@ -167,7 +167,7 @@ class APITestsMixin(object):
client = rs_or_single_client(event_listeners=[listener])
# Connect to the cluster.
client.admin.command("ping")
listener.results.clear()
listener.reset()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(write_concern=WriteConcern("majority"))
coll.drop()
@ -177,25 +177,25 @@ class APITestsMixin(object):
self.addCleanup(coll.drop)
with self.change_stream_with_client(client, max_await_time_ms=250) as stream:
self.assertEqual(listener.started_command_names(), ["aggregate"])
listener.results.clear()
listener.reset()
# Confirm that only a single getMore is run even when no documents
# are returned.
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
listener.reset()
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
listener.reset()
# Get at least one change before resuming.
coll.insert_one({"_id": 2})
wait_until(lambda: stream.try_next() is not None, "get change from try_next")
listener.results.clear()
listener.reset()
# Cause the next request to initiate the resume process.
self.kill_change_stream_cursor(stream)
listener.results.clear()
listener.reset()
# The sequence should be:
# - getMore, fail
@ -203,7 +203,7 @@ class APITestsMixin(object):
# - no results, return immediately without another getMore
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore", "aggregate"])
listener.results.clear()
listener.reset()
# Stream still works after a resume.
coll.insert_one({"_id": 3})
@ -217,7 +217,7 @@ class APITestsMixin(object):
client = rs_or_single_client(event_listeners=[listener])
# Connect to the cluster.
client.admin.command("ping")
listener.results.clear()
listener.reset()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(write_concern=WriteConcern("majority"))
coll.drop()
@ -229,12 +229,12 @@ class APITestsMixin(object):
expected = {"batchSize": 23}
with self.change_stream_with_client(client, max_await_time_ms=250, batch_size=23) as stream:
# Confirm that batchSize is honored for initial batch.
cmd = listener.results["started"][0].command
cmd = listener.started_events[0].command
self.assertEqual(cmd["cursor"], expected)
listener.results.clear()
listener.reset()
# Confirm that batchSize is honored by getMores.
self.assertIsNone(stream.try_next())
cmd = listener.results["started"][0].command
cmd = listener.started_events[0].command
key = next(iter(expected))
self.assertEqual(expected[key], cmd[key])
@ -255,12 +255,11 @@ class APITestsMixin(object):
@no_type_check
def _test_full_pipeline(self, expected_cs_stage):
client, listener = self.client_with_listener("aggregate")
results = listener.results
with self.change_stream_with_client(client, [{"$project": {"foo": 0}}]) as _:
pass
self.assertEqual(1, len(results["started"]))
command = results["started"][0]
self.assertEqual(1, len(listener.started_events))
command = listener.started_events[0]
self.assertEqual("aggregate", command.command_name)
self.assertEqual(
[{"$changeStream": expected_cs_stage}, {"$project": {"foo": 0}}],
@ -464,7 +463,7 @@ class ProseSpecTestsMixin(object):
versions that don't support postBatchResumeToken. Assumes the stream
has never returned any changes if previous_change is None."""
if previous_change is None:
agg_cmd = listener.results["started"][0]
agg_cmd = listener.started_events[0]
stage = agg_cmd.command["pipeline"][0]["$changeStream"]
return stage.get("resumeAfter") or stage.get("startAfter")
@ -481,7 +480,7 @@ class ProseSpecTestsMixin(object):
if token is not None:
return token
response = listener.results["succeeded"][-1].reply
response = listener.succeeded_events[-1].reply
return response["cursor"]["postBatchResumeToken"]
@no_type_check
@ -558,8 +557,8 @@ class ProseSpecTestsMixin(object):
pass
# Driver should have attempted aggregate command only once.
self.assertEqual(len(listener.results["started"]), 1)
self.assertEqual(listener.results["started"][0].command_name, "aggregate")
self.assertEqual(len(listener.started_events), 1)
self.assertEqual(listener.started_events[0].command_name, "aggregate")
# Prose test no. 5 - REMOVED
# Prose test no. 6 - SKIPPED
@ -603,20 +602,20 @@ class ProseSpecTestsMixin(object):
with self.change_stream_with_client(client) as cs:
self.kill_change_stream_cursor(cs)
cs.try_next()
cmd = listener.results["started"][-1].command
cmd = listener.started_events[-1].command
self.assertIsNotNone(cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime"))
# Case 2: change stream started with startAtOperationTime
listener.results.clear()
listener.reset()
optime = self.get_start_at_operation_time()
with self.change_stream_with_client(client, start_at_operation_time=optime) as cs:
self.kill_change_stream_cursor(cs)
cs.try_next()
cmd = listener.results["started"][-1].command
cmd = listener.started_events[-1].command
self.assertEqual(
cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime"),
optime,
str([k.command for k in listener.results["started"]]),
str([k.command for k in listener.started_events]),
)
# Prose test no. 10 - SKIPPED
@ -631,7 +630,7 @@ class ProseSpecTestsMixin(object):
self.assertIsNone(change_stream.try_next())
resume_token = change_stream.resume_token
response = listener.results["succeeded"][0].reply
response = listener.succeeded_events[0].reply
self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"])
# Prose test no. 11
@ -643,7 +642,7 @@ class ProseSpecTestsMixin(object):
self._populate_and_exhaust_change_stream(change_stream)
resume_token = change_stream.resume_token
response = listener.results["succeeded"][-1].reply
response = listener.succeeded_events[-1].reply
self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"])
# Prose test no. 12
@ -737,7 +736,7 @@ class ProseSpecTestsMixin(object):
self.kill_change_stream_cursor(change_stream)
change_stream.try_next() # Resume attempt
response = listener.results["started"][-1]
response = listener.started_events[-1]
self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter"))
self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("startAfter"))
@ -756,7 +755,7 @@ class ProseSpecTestsMixin(object):
self.kill_change_stream_cursor(change_stream)
change_stream.try_next() # Resume attempt
response = listener.results["started"][-1]
response = listener.started_events[-1]
self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter"))
self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("startAfter"))
@ -1056,7 +1055,7 @@ class TestAllLegacyScenarios(IntegrationTest):
def setUp(self):
super(TestAllLegacyScenarios, self).setUp()
self.listener.results.clear()
self.listener.reset()
def setUpCluster(self, scenario_dict):
assets = [
@ -1128,7 +1127,7 @@ class TestAllLegacyScenarios(IntegrationTest):
self.assertEqual(getattr(event, key), value)
def tearDown(self):
self.listener.results.clear()
self.listener.reset()
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "change_streams")

View File

@ -113,11 +113,11 @@ class TestCollation(IntegrationTest):
super(TestCollation, cls).tearDownClass()
def tearDown(self):
self.listener.results.clear()
self.listener.reset()
super(TestCollation, self).tearDown()
def last_command_started(self):
return self.listener.results["started"][-1].command
return self.listener.started_events[-1].command
def assertCollationInLastCommand(self):
self.assertEqual(self.collation.document, self.last_command_started()["collation"])
@ -129,7 +129,7 @@ class TestCollation(IntegrationTest):
# Test passing collation as a dict as well.
self.db.test.drop()
self.listener.results.clear()
self.listener.reset()
self.db.create_collection("test", collation=self.collation.document)
self.assertCollationInLastCommand()
@ -139,7 +139,7 @@ class TestCollation(IntegrationTest):
def test_create_index(self):
self.db.test.create_index("foo", collation=self.collation)
ci_cmd = self.listener.results["started"][0].command
ci_cmd = self.listener.started_events[0].command
self.assertEqual(self.collation.document, ci_cmd["indexes"][0]["collation"])
def test_aggregate(self):
@ -154,18 +154,18 @@ class TestCollation(IntegrationTest):
self.db.test.distinct("foo", collation=self.collation)
self.assertCollationInLastCommand()
self.listener.results.clear()
self.listener.reset()
self.db.test.find(collation=self.collation).distinct("foo")
self.assertCollationInLastCommand()
def test_find_command(self):
self.db.test.insert_one({"is this thing on?": True})
self.listener.results.clear()
self.listener.reset()
next(self.db.test.find(collation=self.collation))
self.assertCollationInLastCommand()
def test_explain_command(self):
self.listener.results.clear()
self.listener.reset()
self.db.test.find(collation=self.collation).explain()
# The collation should be part of the explained command.
self.assertEqual(
@ -174,40 +174,40 @@ class TestCollation(IntegrationTest):
def test_delete(self):
self.db.test.delete_one({"foo": 42}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["deletes"][0]["collation"])
self.listener.results.clear()
self.listener.reset()
self.db.test.delete_many({"foo": 42}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["deletes"][0]["collation"])
def test_update(self):
self.db.test.replace_one({"foo": 42}, {"foo": 43}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])
self.listener.results.clear()
self.listener.reset()
self.db.test.update_one({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])
self.listener.results.clear()
self.listener.reset()
self.db.test.update_many({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])
def test_find_and(self):
self.db.test.find_one_and_delete({"foo": 42}, collation=self.collation)
self.assertCollationInLastCommand()
self.listener.results.clear()
self.listener.reset()
self.db.test.find_one_and_update(
{"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation
)
self.assertCollationInLastCommand()
self.listener.results.clear()
self.listener.reset()
self.db.test.find_one_and_replace({"foo": 42}, {"foo": 43}, collation=self.collation)
self.assertCollationInLastCommand()
@ -229,8 +229,8 @@ class TestCollation(IntegrationTest):
]
)
delete_cmd = self.listener.results["started"][0].command
update_cmd = self.listener.results["started"][1].command
delete_cmd = self.listener.started_events[0].command
update_cmd = self.listener.started_events[1].command
def check_ops(ops):
for op in ops:

View File

@ -1986,21 +1986,20 @@ class TestCollection(IntegrationTest):
c_w0 = db.get_collection("test", write_concern=WriteConcern(w=0))
# default WriteConcern.
c_default = db.get_collection("test", write_concern=WriteConcern())
results = listener.results
# Authenticate the client and throw out auth commands from the listener.
db.command("ping")
results.clear()
listener.reset()
c_w0.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}})
self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"])
results.clear()
self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
listener.reset()
c_w0.find_one_and_replace({"_id": 1}, {"foo": "bar"})
self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"])
results.clear()
self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
listener.reset()
c_w0.find_one_and_delete({"_id": 1})
self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"])
results.clear()
self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
listener.reset()
# Test write concern errors.
if client_context.is_rs:
@ -2017,27 +2016,27 @@ class TestCollection(IntegrationTest):
WriteConcernError,
c_wc_error.find_one_and_replace,
{"w": 0},
results["started"][0].command["writeConcern"],
listener.started_events[0].command["writeConcern"],
)
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_delete,
{"w": 0},
results["started"][0].command["writeConcern"],
listener.started_events[0].command["writeConcern"],
)
results.clear()
listener.reset()
c_default.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}})
self.assertNotIn("writeConcern", results["started"][0].command)
results.clear()
self.assertNotIn("writeConcern", listener.started_events[0].command)
listener.reset()
c_default.find_one_and_replace({"_id": 1}, {"foo": "bar"})
self.assertNotIn("writeConcern", results["started"][0].command)
results.clear()
self.assertNotIn("writeConcern", listener.started_events[0].command)
listener.reset()
c_default.find_one_and_delete({"_id": 1})
self.assertNotIn("writeConcern", results["started"][0].command)
results.clear()
self.assertNotIn("writeConcern", listener.started_events[0].command)
listener.reset()
def test_find_with_nested(self):
c = self.db.test

View File

@ -54,17 +54,7 @@ class TestAllScenarios(unittest.TestCase):
cls.client.close()
def tearDown(self):
self.listener.results.clear()
def format_actual_results(results):
started = results["started"]
succeeded = results["succeeded"]
failed = results["failed"]
msg = "\nStarted: %r" % (started[0].command if len(started) else None,)
msg += "\nSucceeded: %r" % (succeeded[0].reply if len(succeeded) else None,)
msg += "\nFailed: %r" % (failed[0].failure if len(failed) else None,)
return msg
self.listener.reset()
def create_test(scenario_def, test):
@ -75,7 +65,7 @@ def create_test(scenario_def, test):
coll = self.client[dbname][collname]
coll.drop()
coll.insert_many(scenario_def["data"])
self.listener.results.clear()
self.listener.reset()
name = camel_to_snake(test["operation"]["name"])
if "read_preference" in test["operation"]:
coll = coll.with_options(
@ -127,11 +117,13 @@ def create_test(scenario_def, test):
except OperationFailure:
pass
res = self.listener.results
started_events = self.listener.started_events
succeeded_events = self.listener.succeeded_events
failed_events = self.listener.failed_events
for expectation in test["expectations"]:
event_type = next(iter(expectation))
if event_type == "command_started_event":
event = res["started"][0] if len(res["started"]) else None
event = started_events[0] if len(started_events) else None
if event is not None:
# The tests substitute 42 for any number other than 0.
if event.command_name == "getMore" and event.command["getMore"]:
@ -147,7 +139,7 @@ def create_test(scenario_def, test):
update.setdefault("upsert", False)
update.setdefault("multi", False)
elif event_type == "command_succeeded_event":
event = res["succeeded"].pop(0) if len(res["succeeded"]) else None
event = succeeded_events.pop(0) if len(succeeded_events) else None
if event is not None:
reply = event.reply
# The tests substitute 42 for any number other than 0,
@ -171,12 +163,12 @@ def create_test(scenario_def, test):
reply.pop("cursorsKilled")
reply["cursorsUnknown"] = [42]
# Found succeeded event. Pop related started event.
res["started"].pop(0)
started_events.pop(0)
elif event_type == "command_failed_event":
event = res["failed"].pop(0) if len(res["failed"]) else None
event = failed_events.pop(0) if len(failed_events) else None
if event is not None:
# Found failed event. Pop related started event.
res["started"].pop(0)
started_events.pop(0)
else:
self.fail("Unknown event type")
@ -184,11 +176,11 @@ def create_test(scenario_def, test):
event_name = event_type.split("_")[1]
self.fail(
"Expected %s event for %s command. Actual "
"results:%s"
"results:\n%s"
% (
event_name,
expectation[event_type]["command_name"],
format_actual_results(res),
"\n".join(str(e) for e in self.listener.events),
)
)

View File

@ -43,12 +43,11 @@ class TestComment(IntegrationTest):
def _test_ops(
self, helpers, already_supported, listener, db=Empty(), coll=Empty() # noqa: B008
):
results = listener.results
for h, args in helpers:
c = "testing comment with " + h.__name__
with self.subTest("collection-" + h.__name__ + "-comment"):
for cc in [c, {"key": c}, ["any", 1]]:
results.clear()
listener.reset()
kwargs = {"comment": cc}
if h == coll.rename:
_ = db.get_collection("temp_temp_temp").drop()
@ -77,7 +76,7 @@ class TestComment(IntegrationTest):
tested = False
# For some reason collection.list_indexes creates two commands and the first
# one doesn't contain 'comment'.
for i in results["started"]:
for i in listener.started_events:
if cc == i.command.get("comment", ""):
self.assertEqual(cc, i.command["comment"])
tested = True
@ -98,7 +97,7 @@ class TestComment(IntegrationTest):
h.__doc__,
)
results.clear()
listener.reset()
@client_context.require_version_min(4, 7, -1)
@client_context.require_replica_set

View File

@ -218,79 +218,78 @@ class TestCursor(IntegrationTest):
listener = AllowListEventListener("find", "getMore")
coll = rs_or_single_client(event_listeners=[listener])[self.db.name].pymongo_test
results = listener.results
# Tailable_await defaults.
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT))
# find
self.assertFalse("maxTimeMS" in results["started"][0].command)
self.assertFalse("maxTimeMS" in listener.started_events[0].command)
# getMore
self.assertFalse("maxTimeMS" in results["started"][1].command)
results.clear()
self.assertFalse("maxTimeMS" in listener.started_events[1].command)
listener.reset()
# Tailable_await with max_await_time_ms set.
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertFalse("maxTimeMS" in results["started"][0].command)
self.assertEqual("find", listener.started_events[0].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[0].command)
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertTrue("maxTimeMS" in results["started"][1].command)
self.assertEqual(99, results["started"][1].command["maxTimeMS"])
results.clear()
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[1].command)
self.assertEqual(99, listener.started_events[1].command["maxTimeMS"])
listener.reset()
# Tailable_await with max_time_ms
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertTrue("maxTimeMS" in results["started"][0].command)
self.assertEqual(99, results["started"][0].command["maxTimeMS"])
self.assertEqual("find", listener.started_events[0].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[0].command)
self.assertEqual(99, listener.started_events[0].command["maxTimeMS"])
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertFalse("maxTimeMS" in results["started"][1].command)
results.clear()
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[1].command)
listener.reset()
# Tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99).max_await_time_ms(99))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertTrue("maxTimeMS" in results["started"][0].command)
self.assertEqual(99, results["started"][0].command["maxTimeMS"])
self.assertEqual("find", listener.started_events[0].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[0].command)
self.assertEqual(99, listener.started_events[0].command["maxTimeMS"])
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertTrue("maxTimeMS" in results["started"][1].command)
self.assertEqual(99, results["started"][1].command["maxTimeMS"])
results.clear()
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[1].command)
self.assertEqual(99, listener.started_events[1].command["maxTimeMS"])
listener.reset()
# Non tailable_await with max_await_time_ms
list(coll.find(batch_size=1).max_await_time_ms(99))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertFalse("maxTimeMS" in results["started"][0].command)
self.assertEqual("find", listener.started_events[0].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[0].command)
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertFalse("maxTimeMS" in results["started"][1].command)
results.clear()
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[1].command)
listener.reset()
# Non tailable_await with max_time_ms
list(coll.find(batch_size=1).max_time_ms(99))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertTrue("maxTimeMS" in results["started"][0].command)
self.assertEqual(99, results["started"][0].command["maxTimeMS"])
self.assertEqual("find", listener.started_events[0].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[0].command)
self.assertEqual(99, listener.started_events[0].command["maxTimeMS"])
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertFalse("maxTimeMS" in results["started"][1].command)
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[1].command)
# Non tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88))
# find
self.assertEqual("find", results["started"][0].command_name)
self.assertTrue("maxTimeMS" in results["started"][0].command)
self.assertEqual(99, results["started"][0].command["maxTimeMS"])
self.assertEqual("find", listener.started_events[0].command_name)
self.assertTrue("maxTimeMS" in listener.started_events[0].command)
self.assertEqual(99, listener.started_events[0].command["maxTimeMS"])
# getMore
self.assertEqual("getMore", results["started"][1].command_name)
self.assertFalse("maxTimeMS" in results["started"][1].command)
self.assertEqual("getMore", listener.started_events[1].command_name)
self.assertFalse("maxTimeMS" in listener.started_events[1].command)
@client_context.require_test_commands
@client_context.require_no_mongos
@ -329,7 +328,7 @@ class TestCursor(IntegrationTest):
self.addCleanup(client.close)
coll = client.pymongo_test.test.with_options(read_concern=ReadConcern(level="local"))
self.assertTrue(coll.find().explain())
started = listener.results["started"]
started = listener.started_events
self.assertEqual(len(started), 1)
self.assertNotIn("readConcern", started[0].command)
@ -1169,7 +1168,6 @@ class TestCursor(IntegrationTest):
self.client._process_periodic_tasks()
listener = AllowListEventListener("killCursors")
results = listener.results
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
coll = client[self.db.name].test_close_kills_cursors
@ -1178,7 +1176,7 @@ class TestCursor(IntegrationTest):
docs_inserted = 1000
coll.insert_many([{"i": i} for i in range(docs_inserted)])
results.clear()
listener.reset()
# Close a cursor while it's still open on the server.
cursor = coll.find().batch_size(10)
@ -1187,13 +1185,13 @@ class TestCursor(IntegrationTest):
cursor.close()
def assertCursorKilled():
self.assertEqual(1, len(results["started"]))
self.assertEqual("killCursors", results["started"][0].command_name)
self.assertEqual(1, len(results["succeeded"]))
self.assertEqual("killCursors", results["succeeded"][0].command_name)
self.assertEqual(1, len(listener.started_events))
self.assertEqual("killCursors", listener.started_events[0].command_name)
self.assertEqual(1, len(listener.succeeded_events))
self.assertEqual("killCursors", listener.succeeded_events[0].command_name)
assertCursorKilled()
results.clear()
listener.reset()
# Close a command cursor while it's still open on the server.
cursor = coll.aggregate([], batchSize=10)
@ -1204,7 +1202,7 @@ class TestCursor(IntegrationTest):
if cursor.cursor_id:
assertCursorKilled()
else:
self.assertEqual(0, len(results["started"]))
self.assertEqual(0, len(listener.started_events))
def test_delete_not_initialized(self):
# Creating a cursor with invalid arguments will not run __init__
@ -1226,7 +1224,7 @@ class TestCursor(IntegrationTest):
self.addCleanup(coll.drop)
list(coll.find(batch_size=3))
started = listener.results["started"]
started = listener.started_events
self.assertEqual(2, len(started))
self.assertEqual("find", started[0].command_name)
if client_context.is_rs or client_context.is_mongos:
@ -1261,13 +1259,13 @@ class TestRawBatchCursor(IntegrationTest):
batches = list(
client[self.db.name].test.find_raw_batches(session=session).sort("_id")
)
cmd = listener.results["started"][0]
cmd = listener.started_events[0]
self.assertEqual(cmd.command_name, "find")
self.assertIn("$clusterTime", cmd.command)
self.assertEqual(cmd.command["startTransaction"], True)
self.assertEqual(cmd.command["txnNumber"], 1)
# Ensure we update $clusterTime from the command response.
last_cmd = listener.results["succeeded"][-1]
last_cmd = listener.succeeded_events[-1]
self.assertEqual(
last_cmd.reply["$clusterTime"]["clusterTime"],
session.cluster_time["clusterTime"],
@ -1293,8 +1291,8 @@ class TestRawBatchCursor(IntegrationTest):
self.assertEqual(1, len(batches))
self.assertEqual(docs, decode_all(batches[0]))
self.assertEqual(len(listener.results["started"]), 2)
for cmd in listener.results["started"]:
self.assertEqual(len(listener.started_events), 2)
for cmd in listener.started_events:
self.assertEqual(cmd.command_name, "find")
@client_context.require_version_min(5, 0, 0)
@ -1314,7 +1312,7 @@ class TestRawBatchCursor(IntegrationTest):
self.assertEqual(1, len(batches))
self.assertEqual(docs, decode_all(batches[0]))
find_cmd = listener.results["started"][1].command
find_cmd = listener.started_events[1].command
self.assertEqual(find_cmd["readConcern"]["level"], "snapshot")
self.assertIsNotNone(find_cmd["readConcern"]["atClusterTime"])
@ -1372,15 +1370,15 @@ class TestRawBatchCursor(IntegrationTest):
c.drop()
c.insert_many([{"_id": i} for i in range(10)])
listener.results.clear()
listener.reset()
cursor = c.find_raw_batches(batch_size=4)
# First raw batch of 4 documents.
next(cursor)
started = listener.results["started"][0]
succeeded = listener.results["succeeded"][0]
self.assertEqual(0, len(listener.results["failed"]))
started = listener.started_events[0]
succeeded = listener.succeeded_events[0]
self.assertEqual(0, len(listener.failed_events))
self.assertEqual("find", started.command_name)
self.assertEqual("pymongo_test", started.database_name)
self.assertEqual("find", succeeded.command_name)
@ -1391,15 +1389,14 @@ class TestRawBatchCursor(IntegrationTest):
self.assertEqual(len(csr["firstBatch"]), 1)
self.assertEqual(decode_all(csr["firstBatch"][0]), [{"_id": i} for i in range(0, 4)])
listener.results.clear()
listener.reset()
# Next raw batch of 4 documents.
next(cursor)
try:
results = listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = listener.started_events[0]
succeeded = listener.succeeded_events[0]
self.assertEqual(0, len(listener.failed_events))
self.assertEqual("getMore", started.command_name)
self.assertEqual("pymongo_test", started.database_name)
self.assertEqual("getMore", succeeded.command_name)
@ -1442,13 +1439,13 @@ class TestRawBatchCommandCursor(IntegrationTest):
[{"$sort": {"_id": 1}}], session=session
)
)
cmd = listener.results["started"][0]
cmd = listener.started_events[0]
self.assertEqual(cmd.command_name, "aggregate")
self.assertIn("$clusterTime", cmd.command)
self.assertEqual(cmd.command["startTransaction"], True)
self.assertEqual(cmd.command["txnNumber"], 1)
# Ensure we update $clusterTime from the command response.
last_cmd = listener.results["succeeded"][-1]
last_cmd = listener.succeeded_events[-1]
self.assertEqual(
last_cmd.reply["$clusterTime"]["clusterTime"],
session.cluster_time["clusterTime"],
@ -1473,8 +1470,8 @@ class TestRawBatchCommandCursor(IntegrationTest):
self.assertEqual(1, len(batches))
self.assertEqual(docs, decode_all(batches[0]))
self.assertEqual(len(listener.results["started"]), 3)
cmds = listener.results["started"]
self.assertEqual(len(listener.started_events), 3)
cmds = listener.started_events
self.assertEqual(cmds[0].command_name, "aggregate")
self.assertEqual(cmds[1].command_name, "aggregate")
@ -1495,7 +1492,7 @@ class TestRawBatchCommandCursor(IntegrationTest):
self.assertEqual(1, len(batches))
self.assertEqual(docs, decode_all(batches[0]))
find_cmd = listener.results["started"][1].command
find_cmd = listener.started_events[1].command
self.assertEqual(find_cmd["readConcern"]["level"], "snapshot")
self.assertIsNotNone(find_cmd["readConcern"]["atClusterTime"])
@ -1536,13 +1533,13 @@ class TestRawBatchCommandCursor(IntegrationTest):
c.drop()
c.insert_many([{"_id": i} for i in range(10)])
listener.results.clear()
listener.reset()
cursor = c.aggregate_raw_batches([{"$sort": {"_id": 1}}], batchSize=4)
# Start cursor, no initial batch.
started = listener.results["started"][0]
succeeded = listener.results["succeeded"][0]
self.assertEqual(0, len(listener.results["failed"]))
started = listener.started_events[0]
succeeded = listener.succeeded_events[0]
self.assertEqual(0, len(listener.failed_events))
self.assertEqual("aggregate", started.command_name)
self.assertEqual("pymongo_test", started.database_name)
self.assertEqual("aggregate", succeeded.command_name)
@ -1551,15 +1548,14 @@ class TestRawBatchCommandCursor(IntegrationTest):
# First batch is empty.
self.assertEqual(len(csr["firstBatch"]), 0)
listener.results.clear()
listener.reset()
# Batches of 4 documents.
n = 0
for batch in cursor:
results = listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = listener.started_events[0]
succeeded = listener.succeeded_events[0]
self.assertEqual(0, len(listener.failed_events))
self.assertEqual("getMore", started.command_name)
self.assertEqual("pymongo_test", started.database_name)
self.assertEqual("getMore", succeeded.command_name)
@ -1570,7 +1566,7 @@ class TestRawBatchCommandCursor(IntegrationTest):
self.assertEqual(decode_all(batch), [{"_id": i} for i in range(n, min(n + 4, 10))])
n += 4
listener.results.clear()
listener.reset()
if __name__ == "__main__":

View File

@ -62,16 +62,16 @@ class TestDataLakeProse(IntegrationTest):
next(cursor)
# find command assertions
find_cmd = listener.results["succeeded"][-1]
find_cmd = listener.succeeded_events[-1]
self.assertEqual(find_cmd.command_name, "find")
cursor_id = find_cmd.reply["cursor"]["id"]
cursor_ns = find_cmd.reply["cursor"]["ns"]
# killCursors command assertions
cursor.close()
started = listener.results["started"][-1]
started = listener.started_events[-1]
self.assertEqual(started.command_name, "killCursors")
succeeded = listener.results["succeeded"][-1]
succeeded = listener.succeeded_events[-1]
self.assertEqual(succeeded.command_name, "killCursors")
self.assertIn(cursor_id, started.command["cursors"])

View File

@ -193,7 +193,6 @@ class TestDatabase(IntegrationTest):
def test_list_collection_names_filter(self):
listener = OvertCommandListener()
results = listener.results
client = rs_or_single_client(event_listeners=[listener])
db = client[self.db.name]
db.capped.drop()
@ -204,24 +203,23 @@ class TestDatabase(IntegrationTest):
filter: Union[None, dict]
# Should not send nameOnly.
for filter in ({"options.capped": True}, {"options.capped": True, "name": "capped"}):
results.clear()
listener.reset()
names = db.list_collection_names(filter=filter)
self.assertEqual(names, ["capped"])
self.assertNotIn("nameOnly", results["started"][0].command)
self.assertNotIn("nameOnly", listener.started_events[0].command)
# Should send nameOnly (except on 2.6).
for filter in (None, {}, {"name": {"$in": ["capped", "non_capped"]}}):
results.clear()
listener.reset()
names = db.list_collection_names(filter=filter)
self.assertIn("capped", names)
self.assertIn("non_capped", names)
command = results["started"][0].command
command = listener.started_events[0].command
self.assertIn("nameOnly", command)
self.assertTrue(command["nameOnly"])
def test_check_exists(self):
listener = OvertCommandListener()
results = listener.results
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
db = client[self.db.name]
@ -231,7 +229,7 @@ class TestDatabase(IntegrationTest):
listener.reset()
db.drop_collection("unique")
db.create_collection("unique", check_exists=False)
self.assertTrue(len(results["started"]) > 0)
self.assertTrue(len(listener.started_events) > 0)
self.assertNotIn("listCollections", listener.started_command_names())
def test_list_collections(self):

View File

@ -814,7 +814,7 @@ class TestDataKeyDoubleEncryption(EncryptionIntegrationTest):
provider_name, master_key=master_key, key_alt_names=["%s_altname" % (provider_name,)]
)
self.assertBinaryUUID(datakey_id)
cmd = self.listener.results["started"][-1]
cmd = self.listener.started_events[-1]
self.assertEqual("insert", cmd.command_name)
self.assertEqual({"w": "majority"}, cmd.command.get("writeConcern"))
docs = list(self.vault.find({"_id": datakey_id}))
@ -1489,7 +1489,7 @@ class AzureGCPEncryptionTestMixin(object):
expected_document = json_util.loads(expectation_extjson, json_options=JSON_OPTS)
coll.insert_one(payload)
event = insert_listener.results["started"][0]
event = insert_listener.started_events[0]
inserted_doc = event.command["documents"][0]
for key, value in expected_document.items():
@ -1622,7 +1622,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 4)
self.assertEqual(cev[0].command_name, "listCollections")
self.assertEqual(cev[0].database_name, "db")
@ -1643,7 +1643,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 3)
self.assertEqual(cev[0].command_name, "listCollections")
self.assertEqual(cev[0].database_name, "db")
@ -1652,7 +1652,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
self.assertEqual(cev[2].command_name, "find")
self.assertEqual(cev[2].database_name, "db")
cev = self.client_keyvault_listener.results["started"]
cev = self.client_keyvault_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "keyvault")
@ -1667,7 +1667,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 2)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "db")
@ -1684,12 +1684,12 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "db")
cev = self.client_keyvault_listener.results["started"]
cev = self.client_keyvault_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "keyvault")
@ -1704,7 +1704,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 5)
self.assertEqual(cev[0].command_name, "listCollections")
self.assertEqual(cev[0].database_name, "db")
@ -1727,7 +1727,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 3)
self.assertEqual(cev[0].command_name, "listCollections")
self.assertEqual(cev[0].database_name, "db")
@ -1736,7 +1736,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
self.assertEqual(cev[2].command_name, "find")
self.assertEqual(cev[2].database_name, "db")
cev = self.client_keyvault_listener.results["started"]
cev = self.client_keyvault_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "keyvault")
@ -1751,7 +1751,7 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 2)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "db")
@ -1768,12 +1768,12 @@ class TestDeadlockProse(EncryptionIntegrationTest):
),
)
cev = self.client_listener.results["started"]
cev = self.client_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "db")
cev = self.client_keyvault_listener.results["started"]
cev = self.client_keyvault_listener.started_events
self.assertEqual(len(cev), 1)
self.assertEqual(cev[0].command_name, "find")
self.assertEqual(cev[0].database_name, "keyvault")
@ -1821,8 +1821,8 @@ class TestDecryptProse(EncryptionIntegrationTest):
):
with self.assertRaises(OperationFailure):
self.encrypted_client.db.decryption_events.aggregate([])
self.assertEqual(len(self.listener.results["failed"]), 1)
for event in self.listener.results["failed"]:
self.assertEqual(len(self.listener.failed_events), 1)
for event in self.listener.failed_events:
self.assertEqual(event.failure["code"], 123)
def test_02_network_error(self):
@ -1834,8 +1834,8 @@ class TestDecryptProse(EncryptionIntegrationTest):
):
with self.assertRaises(AutoReconnect):
self.encrypted_client.db.decryption_events.aggregate([])
self.assertEqual(len(self.listener.results["failed"]), 1)
self.assertEqual(self.listener.results["failed"][0].command_name, "aggregate")
self.assertEqual(len(self.listener.failed_events), 1)
self.assertEqual(self.listener.failed_events[0].command_name, "aggregate")
def test_03_decrypt_error(self):
self.encrypted_client.db.decryption_events.insert_one(
@ -1843,8 +1843,8 @@ class TestDecryptProse(EncryptionIntegrationTest):
)
with self.assertRaises(EncryptionError):
next(self.encrypted_client.db.decryption_events.aggregate([]))
event = self.listener.results["succeeded"][0]
self.assertEqual(len(self.listener.results["failed"]), 0)
event = self.listener.succeeded_events[0]
self.assertEqual(len(self.listener.failed_events), 0)
self.assertEqual(
event.reply["cursor"]["firstBatch"][0]["encrypted"], self.malformed_cipher_text
)
@ -1852,8 +1852,8 @@ class TestDecryptProse(EncryptionIntegrationTest):
def test_04_decrypt_success(self):
self.encrypted_client.db.decryption_events.insert_one({"encrypted": self.cipher_text})
next(self.encrypted_client.db.decryption_events.aggregate([]))
event = self.listener.results["succeeded"][0]
self.assertEqual(len(self.listener.results["failed"]), 0)
event = self.listener.succeeded_events[0]
self.assertEqual(len(self.listener.failed_events), 0)
self.assertEqual(event.reply["cursor"]["firstBatch"][0]["encrypted"], self.cipher_text)

View File

@ -49,15 +49,14 @@ class TestCommandMonitoring(IntegrationTest):
super(TestCommandMonitoring, cls).tearDownClass()
def tearDown(self):
self.listener.results.clear()
self.listener.reset()
super(TestCommandMonitoring, self).tearDown()
def test_started_simple(self):
self.client.pymongo_test.command("ping")
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([("ping", 1)]), started.command)
@ -68,10 +67,9 @@ class TestCommandMonitoring(IntegrationTest):
def test_succeeded_simple(self):
self.client.pymongo_test.command("ping")
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertEqual("ping", succeeded.command_name)
@ -85,10 +83,9 @@ class TestCommandMonitoring(IntegrationTest):
self.client.pymongo_test.command("oops!")
except OperationFailure:
pass
results = self.listener.results
started = results["started"][0]
failed = results["failed"][0]
self.assertEqual(0, len(results["succeeded"]))
started = self.listener.started_events[0]
failed = self.listener.failed_events[0]
self.assertEqual(0, len(self.listener.succeeded_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent))
self.assertEqual("oops!", failed.command_name)
@ -99,10 +96,9 @@ class TestCommandMonitoring(IntegrationTest):
def test_find_one(self):
self.client.pymongo_test.test.find_one()
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
@ -117,15 +113,14 @@ class TestCommandMonitoring(IntegrationTest):
def test_find_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
self.listener.results.clear()
self.listener.reset()
cursor = self.client.pymongo_test.test.find(projection={"_id": False}, batch_size=4)
for _ in range(4):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON(
@ -147,15 +142,14 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(csr["ns"], "pymongo_test.test")
self.assertEqual(csr["firstBatch"], [{} for _ in range(4)])
self.listener.results.clear()
self.listener.reset()
# Next batch. Exhausting the cursor could cause a getMore
# that returns id of 0 and no results.
next(cursor)
try:
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]),
@ -182,16 +176,15 @@ class TestCommandMonitoring(IntegrationTest):
cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))])
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
self.listener.reset()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos:
coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED)
res = coll.find().explain()
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(cmd, started.command)
self.assertEqual("explain", started.command_name)
@ -212,7 +205,7 @@ class TestCommandMonitoring(IntegrationTest):
coll.insert_many([{"x": i} for i in range(5)])
# Test that we publish the unwrapped command.
self.listener.results.clear()
self.listener.reset()
if self.client.is_mongos:
coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED)
@ -220,10 +213,9 @@ class TestCommandMonitoring(IntegrationTest):
next(cursor)
try:
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(expected_cmd, started.command)
self.assertEqual("find", started.command_name)
@ -293,7 +285,7 @@ class TestCommandMonitoring(IntegrationTest):
def test_command_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{"x": 1} for _ in range(10)])
self.listener.results.clear()
self.listener.reset()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos:
@ -302,10 +294,9 @@ class TestCommandMonitoring(IntegrationTest):
for _ in range(4):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON(
@ -333,13 +324,12 @@ class TestCommandMonitoring(IntegrationTest):
}
self.assertEqualCommand(expected_cursor, succeeded.reply.get("cursor"))
self.listener.results.clear()
self.listener.reset()
next(cursor)
try:
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]),
@ -377,10 +367,9 @@ class TestCommandMonitoring(IntegrationTest):
next(cursor)
except Exception:
pass
results = self.listener.results
started = results["started"][0]
self.assertEqual(0, len(results["succeeded"]))
failed = results["failed"][0]
started = self.listener.started_events[0]
self.assertEqual(0, len(self.listener.succeeded_events))
failed = self.listener.failed_events[0]
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([("getMore", cursor_id), ("collection", "test")]), started.command
@ -403,16 +392,15 @@ class TestCommandMonitoring(IntegrationTest):
client = single_client(*address, event_listeners=[self.listener])
# Clear authentication command results from the listener.
client.admin.command("ping")
self.listener.results.clear()
self.listener.reset()
error = None
try:
client.pymongo_test.test.find_one_and_delete({})
except NotPrimaryError as exc:
error = exc.errors
results = self.listener.results
started = results["started"][0]
failed = results["failed"][0]
self.assertEqual(0, len(results["succeeded"]))
started = self.listener.started_events[0]
failed = self.listener.failed_events[0]
self.assertEqual(0, len(self.listener.succeeded_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent))
self.assertEqual("findAndModify", failed.command_name)
@ -426,16 +414,15 @@ class TestCommandMonitoring(IntegrationTest):
def test_exhaust(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(11)])
self.listener.results.clear()
self.listener.reset()
cursor = self.client.pymongo_test.test.find(
projection={"_id": False}, batch_size=5, cursor_type=CursorType.EXHAUST
)
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON(
@ -462,11 +449,10 @@ class TestCommandMonitoring(IntegrationTest):
}
self.assertEqualReply(expected_result, succeeded.reply)
self.listener.results.clear()
self.listener.reset()
tuple(cursor)
results = self.listener.results
self.assertEqual(0, len(results["failed"]))
for event in results["started"]:
self.assertEqual(0, len(self.listener.failed_events))
for event in self.listener.started_events:
self.assertTrue(isinstance(event, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 5)]),
@ -476,14 +462,14 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(cursor.address, event.connection_id)
self.assertEqual("pymongo_test", event.database_name)
self.assertTrue(isinstance(event.request_id, int))
for event in results["succeeded"]:
for event in self.listener.succeeded_events:
self.assertTrue(isinstance(event, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(event.duration_micros, int))
self.assertEqual("getMore", event.command_name)
self.assertTrue(isinstance(event.request_id, int))
self.assertEqual(cursor.address, event.connection_id)
# Last getMore receives a response with cursor id 0.
self.assertEqual(0, results["succeeded"][-1].reply["cursor"]["id"])
self.assertEqual(0, self.listener.succeeded_events[-1].reply["cursor"]["id"])
def test_kill_cursors(self):
with client_knobs(kill_cursor_frequency=0.01):
@ -492,13 +478,12 @@ class TestCommandMonitoring(IntegrationTest):
cursor = self.client.pymongo_test.test.find().batch_size(5)
next(cursor)
cursor_id = cursor.cursor_id
self.listener.results.clear()
self.listener.reset()
cursor.close()
time.sleep(2)
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
# There could be more than one cursor_id here depending on
# when the thread last ran.
@ -524,14 +509,13 @@ class TestCommandMonitoring(IntegrationTest):
def test_non_bulk_writes(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
self.listener.reset()
# Implied write concern insert_one
res = coll.insert_one({"x": 1})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -555,13 +539,12 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get("n"))
# Unacknowledged insert_one
self.listener.results.clear()
self.listener.reset()
coll = coll.with_options(write_concern=WriteConcern(w=0))
res = coll.insert_one({"x": 1})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -584,13 +567,12 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqualReply(succeeded.reply, {"ok": 1})
# Explicit write concern insert_one
self.listener.results.clear()
self.listener.reset()
coll = coll.with_options(write_concern=WriteConcern(w=1))
res = coll.insert_one({"x": 1})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -615,12 +597,11 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get("n"))
# delete_many
self.listener.results.clear()
self.listener.reset()
res = coll.delete_many({"x": 1})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -645,13 +626,12 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(res.deleted_count, reply.get("n"))
# replace_one
self.listener.results.clear()
self.listener.reset()
oid = ObjectId()
res = coll.replace_one({"_id": oid}, {"_id": oid, "x": 1}, upsert=True)
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -689,12 +669,11 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual([{"index": 0, "_id": oid}], reply.get("upserted"))
# update_one
self.listener.results.clear()
self.listener.reset()
res = coll.update_one({"x": 1}, {"$inc": {"x": 1}})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -731,12 +710,11 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get("n"))
# update_many
self.listener.results.clear()
self.listener.reset()
res = coll.update_many({"x": 2}, {"$inc": {"x": 1}})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -773,12 +751,11 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get("n"))
# delete_one
self.listener.results.clear()
self.listener.reset()
_ = coll.delete_one({"x": 3})
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -807,14 +784,13 @@ class TestCommandMonitoring(IntegrationTest):
# write errors
coll.insert_one({"_id": 1})
try:
self.listener.results.clear()
self.listener.reset()
coll.insert_one({"_id": 1})
except OperationFailure:
pass
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON(
[
@ -848,15 +824,14 @@ class TestCommandMonitoring(IntegrationTest):
# This always uses the bulk API.
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
self.listener.reset()
big = "x" * (1024 * 1024 * 4)
docs = [{"_id": i, "big": big} for i in range(6)]
coll.insert_many(docs)
results = self.listener.results
started = results["started"]
succeeded = results["succeeded"]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events
succeeded = self.listener.succeeded_events
self.assertEqual(0, len(self.listener.failed_events))
documents = []
count = 0
operation_id = started[0].operation_id
@ -889,16 +864,15 @@ class TestCommandMonitoring(IntegrationTest):
coll = self.client.pymongo_test.test
coll.drop()
unack_coll = coll.with_options(write_concern=WriteConcern(w=0))
self.listener.results.clear()
self.listener.reset()
# Force two batches on legacy servers.
big = "x" * (1024 * 1024 * 12)
docs = [{"_id": i, "big": big} for i in range(6)]
unack_coll.insert_many(docs)
results = self.listener.results
started = results["started"]
succeeded = results["succeeded"]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events
succeeded = self.listener.succeeded_events
self.assertEqual(0, len(self.listener.failed_events))
documents = []
operation_id = started[0].operation_id
self.assertIsInstance(operation_id, int)
@ -928,7 +902,7 @@ class TestCommandMonitoring(IntegrationTest):
def test_bulk_write(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
self.listener.reset()
coll.bulk_write(
[
@ -937,10 +911,9 @@ class TestCommandMonitoring(IntegrationTest):
DeleteOne({"_id": 1}),
]
)
results = self.listener.results
started = results["started"]
succeeded = results["succeeded"]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events
succeeded = self.listener.succeeded_events
self.assertEqual(0, len(self.listener.failed_events))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
self.assertEqual(3, len(pairs))
@ -991,7 +964,7 @@ class TestCommandMonitoring(IntegrationTest):
@client_context.require_failCommand_fail_point
def test_bulk_write_command_network_error(self):
coll = self.client.pymongo_test.test
self.listener.results.clear()
self.listener.reset()
insert_network_error = {
"configureFailPoint": "failCommand",
@ -1004,7 +977,7 @@ class TestCommandMonitoring(IntegrationTest):
with self.fail_point(insert_network_error):
with self.assertRaises(AutoReconnect):
coll.bulk_write([InsertOne({"_id": 1})])
failed = self.listener.results["failed"]
failed = self.listener.failed_events
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, "insert")
@ -1015,7 +988,7 @@ class TestCommandMonitoring(IntegrationTest):
@client_context.require_failCommand_fail_point
def test_bulk_write_command_error(self):
coll = self.client.pymongo_test.test
self.listener.results.clear()
self.listener.reset()
insert_command_error = {
"configureFailPoint": "failCommand",
@ -1029,7 +1002,7 @@ class TestCommandMonitoring(IntegrationTest):
with self.fail_point(insert_command_error):
with self.assertRaises(NotPrimaryError):
coll.bulk_write([InsertOne({"_id": 1})])
failed = self.listener.results["failed"]
failed = self.listener.failed_events
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, "insert")
@ -1040,7 +1013,7 @@ class TestCommandMonitoring(IntegrationTest):
def test_write_errors(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
self.listener.reset()
try:
coll.bulk_write(
@ -1054,10 +1027,9 @@ class TestCommandMonitoring(IntegrationTest):
)
except OperationFailure:
pass
results = self.listener.results
started = results["started"]
succeeded = results["succeeded"]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events
succeeded = self.listener.succeeded_events
self.assertEqual(0, len(self.listener.failed_events))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
errors = []
@ -1084,12 +1056,11 @@ class TestCommandMonitoring(IntegrationTest):
def test_first_batch_helper(self):
# Regardless of server version and use of helpers._first_batch
# this test should still pass.
self.listener.results.clear()
self.listener.reset()
tuple(self.client.pymongo_test.test.list_indexes())
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([("listIndexes", "test"), ("cursor", {})])
self.assertEqualCommand(expected, started.command)
@ -1105,22 +1076,21 @@ class TestCommandMonitoring(IntegrationTest):
self.assertTrue("cursor" in succeeded.reply)
self.assertTrue("ok" in succeeded.reply)
self.listener.results.clear()
self.listener.reset()
def test_sensitive_commands(self):
listeners = self.client._event_listeners
self.listener.results.clear()
self.listener.reset()
cmd = SON([("getnonce", 1)])
listeners.publish_command_start(cmd, "pymongo_test", 12345, self.client.address)
delta = datetime.timedelta(milliseconds=100)
listeners.publish_command_success(
delta, {"nonce": "e474f4561c5eb40b", "ok": 1.0}, "getnonce", 12345, self.client.address
)
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
self.assertEqual({}, started.command)
self.assertEqual("pymongo_test", started.database_name)
@ -1159,14 +1129,13 @@ class TestGlobalListener(IntegrationTest):
def setUp(self):
super(TestGlobalListener, self).setUp()
self.listener.results.clear()
self.listener.reset()
def test_simple(self):
self.client.pymongo_test.command("ping")
results = self.listener.results
started = results["started"][0]
succeeded = results["succeeded"][0]
self.assertEqual(0, len(results["failed"]))
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
self.assertEqual(0, len(self.listener.failed_events))
self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([("ping", 1)]), started.command)

View File

@ -14,6 +14,11 @@
"""Test the read_concern module."""
import sys
import unittest
sys.path[0:0] = [""]
from test import IntegrationTest, client_context
from test.utils import OvertCommandListener, rs_or_single_client, single_client
@ -41,7 +46,7 @@ class TestReadConcern(IntegrationTest):
super(TestReadConcern, cls).tearDownClass()
def tearDown(self):
self.listener.results.clear()
self.listener.reset()
super(TestReadConcern, self).tearDown()
def test_read_concern(self):
@ -74,9 +79,9 @@ class TestReadConcern(IntegrationTest):
# readConcern not sent in command if not specified.
coll = self.db.coll
tuple(coll.find({"field": "value"}))
self.assertNotIn("readConcern", self.listener.results["started"][0].command)
self.assertNotIn("readConcern", self.listener.started_events[0].command)
self.listener.results.clear()
self.listener.reset()
# Explicitly set readConcern to 'local'.
coll = self.db.get_collection("coll", read_concern=ReadConcern("local"))
@ -89,23 +94,21 @@ class TestReadConcern(IntegrationTest):
("readConcern", {"level": "local"}),
]
),
self.listener.results["started"][0].command,
self.listener.started_events[0].command,
)
def test_command_cursor(self):
# readConcern not sent in command if not specified.
coll = self.db.coll
tuple(coll.aggregate([{"$match": {"field": "value"}}]))
self.assertNotIn("readConcern", self.listener.results["started"][0].command)
self.assertNotIn("readConcern", self.listener.started_events[0].command)
self.listener.results.clear()
self.listener.reset()
# Explicitly set readConcern to 'local'.
coll = self.db.get_collection("coll", read_concern=ReadConcern("local"))
tuple(coll.aggregate([{"$match": {"field": "value"}}]))
self.assertEqual(
{"level": "local"}, self.listener.results["started"][0].command["readConcern"]
)
self.assertEqual({"level": "local"}, self.listener.started_events[0].command["readConcern"])
def test_aggregate_out(self):
coll = self.db.get_collection("coll", read_concern=ReadConcern("local"))
@ -113,6 +116,10 @@ class TestReadConcern(IntegrationTest):
# Aggregate with $out supports readConcern MongoDB 4.2 onwards.
if client_context.version >= (4, 1):
self.assertIn("readConcern", self.listener.results["started"][0].command)
self.assertIn("readConcern", self.listener.started_events[0].command)
else:
self.assertNotIn("readConcern", self.listener.results["started"][0].command)
self.assertNotIn("readConcern", self.listener.started_events[0].command)
if __name__ == "__main__":
unittest.main()

View File

@ -541,7 +541,7 @@ class TestMongosAndReadPreference(IntegrationTest):
coll = client.test.get_collection("test", read_preference=pref)
listener.reset()
coll.find_one()
started = listener.results["started"]
started = listener.started_events
self.assertEqual(len(started), 1, started)
cmd = started[0].command
if client_context.is_rs or client_context.is_mongos:

View File

@ -85,11 +85,11 @@ class TestReadWriteConcernSpec(IntegrationTest):
]
for name, f in ops:
listener.results.clear()
listener.reset()
f()
self.assertGreaterEqual(len(listener.results["started"]), 1)
for i, event in enumerate(listener.results["started"]):
self.assertGreaterEqual(len(listener.started_events), 1)
for i, event in enumerate(listener.started_events):
self.assertNotIn(
"readConcern",
event.command,
@ -221,7 +221,7 @@ class TestReadWriteConcernSpec(IntegrationTest):
self.assertIsNotNone(ctx.exception.details)
assert ctx.exception.details is not None
self.assertIsNotNone(ctx.exception.details.get("errInfo"))
for event in listener.results["succeeded"]:
for event in listener.succeeded_events:
if event.command_name == "insert":
self.assertEqual(event.reply["writeErrors"][0], ctx.exception.details)
break

View File

@ -208,12 +208,12 @@ class TestPoolPausedError(IntegrationTest):
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
started = cmd_listener.results["started"]
started = cmd_listener.started_events
msg = pprint.pformat(cmd_listener.results)
self.assertEqual(3, len(started), msg)
succeeded = cmd_listener.results["succeeded"]
succeeded = cmd_listener.succeeded_events
self.assertEqual(2, len(succeeded), msg)
failed = cmd_listener.results["failed"]
failed = cmd_listener.failed_events
self.assertEqual(1, len(failed), msg)

View File

@ -227,9 +227,9 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
self.addCleanup(client.close)
for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
listener.results.clear()
listener.reset()
method(*args, **kwargs)
for event in listener.results["started"]:
for event in listener.started_events:
self.assertNotIn(
"txnNumber",
event.command,
@ -240,10 +240,10 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
def test_supported_single_statement_supported_cluster(self):
for method, args, kwargs in retryable_single_statement_ops(self.db.retryable_write_test):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
self.listener.results.clear()
self.listener.reset()
method(*args, **kwargs)
commands_started = self.listener.results["started"]
self.assertEqual(len(self.listener.results["succeeded"]), 1, msg)
commands_started = self.listener.started_events
self.assertEqual(len(self.listener.succeeded_events), 1, msg)
first_attempt = commands_started[0]
self.assertIn(
"lsid",
@ -283,10 +283,10 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
for method, args, kwargs in retryable_single_statement_ops(self.db.retryable_write_test):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
self.listener.results.clear()
self.listener.reset()
method(*args, **kwargs)
for event in self.listener.results["started"]:
for event in self.listener.started_events:
self.assertNotIn(
"txnNumber",
event.command,
@ -301,11 +301,11 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
coll
) + retryable_single_statement_ops(coll_w0):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
self.listener.results.clear()
self.listener.reset()
method(*args, **kwargs)
started_events = self.listener.results["started"]
self.assertEqual(len(self.listener.results["succeeded"]), len(started_events), msg)
self.assertEqual(len(self.listener.results["failed"]), 0, msg)
started_events = self.listener.started_events
self.assertEqual(len(self.listener.succeeded_events), len(started_events), msg)
self.assertEqual(len(self.listener.failed_events), 0, msg)
for event in started_events:
self.assertNotIn(
"txnNumber",
@ -324,10 +324,10 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
)
for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
listener.results.clear()
listener.reset()
with self.assertRaises(ServerSelectionTimeoutError, msg=msg):
method(*args, **kwargs)
self.assertEqual(len(listener.results["started"]), 0, msg)
self.assertEqual(len(listener.started_events), 0, msg)
@client_context.require_replica_set
@client_context.require_test_commands
@ -353,11 +353,11 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test):
msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs)
listener.results.clear()
listener.reset()
topology.select_server = mock_select_server
with self.assertRaises(ConnectionFailure, msg=msg):
method(*args, **kwargs)
self.assertEqual(len(listener.results["started"]), 1, msg)
self.assertEqual(len(listener.started_events), 1, msg)
@client_context.require_replica_set
@client_context.require_test_commands
@ -366,7 +366,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
large = "s" * 1024 * 1024 * 15
coll = self.db.retryable_write_test
coll.delete_many({})
self.listener.results.clear()
self.listener.reset()
bulk_result = coll.bulk_write(
[
InsertOne({"_id": 1, "l": large}),
@ -381,7 +381,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
# Each command should fail and be retried.
# With OP_MSG 3 inserts are one batch. 2 updates another.
# 2 deletes a third.
self.assertEqual(len(self.listener.results["started"]), 6)
self.assertEqual(len(self.listener.started_events), 6)
self.assertEqual(coll.find_one(), {"_id": 1, "count": 1})
# Assert the final result
expected_result = {
@ -412,7 +412,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
]
)
)
self.listener.results.clear()
self.listener.reset()
with self.client.start_session() as session:
initial_txn = session._server_session._transaction_id
try:
@ -430,9 +430,9 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
else:
self.fail("bulk_write should have failed")
started = self.listener.results["started"]
started = self.listener.started_events
self.assertEqual(len(started), 3)
self.assertEqual(len(self.listener.results["succeeded"]), 1)
self.assertEqual(len(self.listener.succeeded_events), 1)
expected_txn = Int64(initial_txn + 1)
self.assertEqual(started[0].command["txnNumber"], expected_txn)
self.assertEqual(started[0].command["lsid"], session.session_id)
@ -483,9 +483,7 @@ class TestWriteConcernError(IntegrationTest):
if client_context.version >= Version(4, 4):
# In MongoDB 4.4+ we rely on the server returning the error label.
self.assertIn(
"RetryableWriteError", listener.results["succeeded"][-1].reply["errorLabels"]
)
self.assertIn("RetryableWriteError", listener.succeeded_events[-1].reply["errorLabels"])
@client_context.require_version_min(4, 4)
def test_RetryableWriteError_error_label_RawBSONDocument(self):
@ -575,12 +573,12 @@ class TestPoolPausedError(IntegrationTest):
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
started = cmd_listener.results["started"]
started = cmd_listener.started_events
msg = pprint.pformat(cmd_listener.results)
self.assertEqual(3, len(started), msg)
succeeded = cmd_listener.results["succeeded"]
succeeded = cmd_listener.succeeded_events
self.assertEqual(2, len(succeeded), msg)
failed = cmd_listener.results["failed"]
failed = cmd_listener.failed_events
self.assertEqual(1, len(failed), msg)
@ -605,7 +603,7 @@ class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest):
raise ConnectionFailure("Connection refused")
for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test):
listener.results.clear()
listener.reset()
topology.select_server = raise_connection_err_select_server
with client.start_session() as session:
kwargs = copy.deepcopy(kwargs)
@ -616,8 +614,8 @@ class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest):
# Each operation should fail on the first attempt and succeed
# on the second.
method(*args, **kwargs)
self.assertEqual(len(listener.results["started"]), 1, msg)
retry_cmd = listener.results["started"][0].command
self.assertEqual(len(listener.started_events), 1, msg)
retry_cmd = listener.started_events[0].command
sent_txn_id = retry_cmd["txnNumber"]
final_txn_id = session._server_session.transaction_id
self.assertEqual(Int64(initial_txn_id + 1), sent_txn_id, msg)

View File

@ -97,7 +97,7 @@ class TestCustomServerSelectorFunction(IntegrationTest):
coll.find_one({"name": "John Doe"})
# Confirm all find commands are run against appropriate host.
for command in listener.results["started"]:
for command in listener.started_events:
if command.command_name == "find":
self.assertEqual(command.connection_id[1], expected_port)

View File

@ -115,7 +115,7 @@ class TestProse(IntegrationTest):
for thread in threads:
self.assertTrue(thread.passed)
events = listener.results["started"]
events = listener.started_events
self.assertEqual(len(events), n_finds * N_THREADS)
nodes = client.nodes
self.assertEqual(len(nodes), 2)

View File

@ -58,9 +58,9 @@ class SessionTestListener(EventListener):
super(SessionTestListener, self).failed(event)
def first_command_started(self):
assert len(self.results["started"]) >= 1, "No command-started events"
assert len(self.started_events) >= 1, "No command-started events"
return self.results["started"][0]
return self.started_events[0]
def session_ids(client):
@ -103,7 +103,7 @@ class TestSession(IntegrationTest):
"""All sessions used in the test must be returned to the pool."""
self.client.drop_database("pymongo_test")
used_lsids = self.initial_lsids.copy()
for event in self.session_checker_listener.results["started"]:
for event in self.session_checker_listener.started_events:
if "lsid" in event.command:
used_lsids.add(event.command["lsid"]["id"])
@ -118,15 +118,15 @@ class TestSession(IntegrationTest):
last_use = s._server_session.last_use
start = time.monotonic()
self.assertLessEqual(last_use, start)
listener.results.clear()
listener.reset()
# In case "f" modifies its inputs.
args = copy.copy(args)
kw = copy.copy(kw)
kw["session"] = s
f(*args, **kw)
self.assertGreaterEqual(s._server_session.last_use, start)
self.assertGreaterEqual(len(listener.results["started"]), 1)
for event in listener.results["started"]:
self.assertGreaterEqual(len(listener.started_events), 1)
for event in listener.started_events:
self.assertTrue(
"lsid" in event.command,
"%s sent no lsid with %s" % (f.__name__, event.command_name),
@ -157,11 +157,11 @@ class TestSession(IntegrationTest):
# No explicit session.
for f, args, kw in ops:
listener.results.clear()
listener.reset()
f(*args, **kw)
self.assertGreaterEqual(len(listener.results["started"]), 1)
self.assertGreaterEqual(len(listener.started_events), 1)
lsids = []
for event in listener.results["started"]:
for event in listener.started_events:
self.assertTrue(
"lsid" in event.command,
"%s sent no lsid with %s" % (f.__name__, event.command_name),
@ -205,7 +205,7 @@ class TestSession(IntegrationTest):
(client.db.list_collections, []),
]
threads = []
listener.results.clear()
listener.reset()
def thread_target(op, *args):
res = op(*args)
@ -225,7 +225,7 @@ class TestSession(IntegrationTest):
self.assertIsNone(thread.exc)
client.close()
lsid_set.clear()
for i in listener.results["started"]:
for i in listener.started_events:
if i.command.get("lsid"):
lsid_set.add(i.command.get("lsid")["id"])
if len(lsid_set) == 1:
@ -280,13 +280,13 @@ class TestSession(IntegrationTest):
self.assertEqual(len(client._topology._session_pool), _MAX_END_SESSIONS + 1)
client.close()
self.assertEqual(len(client._topology._session_pool), 0)
end_sessions = [e for e in listener.results["started"] if e.command_name == "endSessions"]
end_sessions = [e for e in listener.started_events if e.command_name == "endSessions"]
self.assertEqual(len(end_sessions), 2)
# Closing again should not send any commands.
listener.results.clear()
listener.reset()
client.close()
self.assertEqual(len(listener.results["started"]), 0)
self.assertEqual(len(listener.started_events), 0)
def test_client(self):
client = self.client
@ -399,10 +399,10 @@ class TestSession(IntegrationTest):
for name, f in ops:
with client.start_session() as s:
listener.results.clear()
listener.reset()
f(session=s)
self.assertGreaterEqual(len(listener.results["started"]), 1)
for event in listener.results["started"]:
self.assertGreaterEqual(len(listener.started_events), 1)
for event in listener.started_events:
self.assertTrue(
"lsid" in event.command,
"%s sent no lsid with %s" % (name, event.command_name),
@ -419,7 +419,7 @@ class TestSession(IntegrationTest):
# No explicit session.
for name, f in ops:
listener.results.clear()
listener.reset()
f(session=None)
event0 = listener.first_command_started()
self.assertTrue(
@ -428,7 +428,7 @@ class TestSession(IntegrationTest):
lsid = event0.command["lsid"]
for event in listener.results["started"][1:]:
for event in listener.started_events[1:]:
self.assertTrue(
"lsid" in event.command, "%s sent no lsid with %s" % (name, event.command_name)
)
@ -600,7 +600,7 @@ class TestSession(IntegrationTest):
# 3.6.0 mongos only validates the aggregate pipeline when the
# database exists.
coll.insert_one({})
listener.results.clear()
listener.reset()
with self.assertRaises(OperationFailure):
coll.aggregate([{"$badOperation": {"bar": 1}}])
@ -687,7 +687,7 @@ class TestSession(IntegrationTest):
for f, args, kw in ops:
with client.start_session() as s:
listener.results.clear()
listener.reset()
# In case "f" modifies its inputs.
args = copy.copy(args)
kw = copy.copy(kw)
@ -698,7 +698,7 @@ class TestSession(IntegrationTest):
f(*args, **kw)
if f.__name__ == "create_collection":
# create_collection runs listCollections first.
event = listener.results["started"].pop(0)
event = listener.started_events.pop(0)
self.assertEqual("listCollections", event.command_name)
self.assertIn(
"lsid",
@ -707,19 +707,19 @@ class TestSession(IntegrationTest):
)
# Should not run any command before raising an error.
self.assertFalse(listener.results["started"], "%s sent command" % (f.__name__,))
self.assertFalse(listener.started_events, "%s sent command" % (f.__name__,))
self.assertTrue(s.has_ended)
# Unacknowledged write without a session does not send an lsid.
for f, args, kw in ops:
listener.results.clear()
listener.reset()
f(*args, **kw)
self.assertGreaterEqual(len(listener.results["started"]), 1)
self.assertGreaterEqual(len(listener.started_events), 1)
if f.__name__ == "create_collection":
# create_collection runs listCollections first.
event = listener.results["started"].pop(0)
event = listener.started_events.pop(0)
self.assertEqual("listCollections", event.command_name)
self.assertIn(
"lsid",
@ -727,7 +727,7 @@ class TestSession(IntegrationTest):
"%s sent no lsid with %s" % (f.__name__, event.command_name),
)
for event in listener.results["started"]:
for event in listener.started_events:
self.assertNotIn(
"lsid", event.command, "%s sent lsid with %s" % (f.__name__, event.command_name)
)
@ -799,26 +799,26 @@ class TestCausalConsistency(unittest.TestCase):
with self.client.start_session() as sess:
self.assertIsNone(sess.cluster_time)
self.assertIsNone(sess.operation_time)
self.listener.results.clear()
self.listener.reset()
self.client.pymongo_test.test.find_one(session=sess)
started = self.listener.results["started"][0]
started = self.listener.started_events[0]
cmd = started.command
self.assertIsNone(cmd.get("readConcern"))
op_time = sess.operation_time
self.assertIsNotNone(op_time)
succeeded = self.listener.results["succeeded"][0]
succeeded = self.listener.succeeded_events[0]
reply = succeeded.reply
self.assertEqual(op_time, reply.get("operationTime"))
# No explicit session
self.client.pymongo_test.test.insert_one({})
self.assertEqual(sess.operation_time, op_time)
self.listener.results.clear()
self.listener.reset()
try:
self.client.pymongo_test.command("doesntexist", session=sess)
except:
pass
failed = self.listener.results["failed"][0]
failed = self.listener.failed_events[0]
failed_op_time = failed.failure.get("operationTime")
# Some older builds of MongoDB 3.5 / 3.6 return None for
# operationTime when a command fails. Make sure we don't
@ -848,14 +848,14 @@ class TestCausalConsistency(unittest.TestCase):
coll.find_one({}, session=sess)
operation_time = sess.operation_time
self.assertIsNotNone(operation_time)
self.listener.results.clear()
self.listener.reset()
if exception:
with self.assertRaises(exception):
op(coll, sess)
else:
op(coll, sess)
act = (
self.listener.results["started"][0]
self.listener.started_events[0]
.command.get("readConcern", {})
.get("afterClusterTime")
)
@ -887,10 +887,10 @@ class TestCausalConsistency(unittest.TestCase):
op(coll, sess)
operation_time = sess.operation_time
self.assertIsNotNone(operation_time)
self.listener.results.clear()
self.listener.reset()
coll.find_one({}, session=sess)
act = (
self.listener.results["started"][0]
self.listener.started_events[0]
.command.get("readConcern", {})
.get("afterClusterTime")
)
@ -938,9 +938,9 @@ class TestCausalConsistency(unittest.TestCase):
coll.find_one({}, session=sess)
operation_time = sess.operation_time
self.assertIsNotNone(operation_time)
self.listener.results.clear()
self.listener.reset()
op(coll, sess)
rc = self.listener.results["started"][0].command.get("readConcern")
rc = self.listener.started_events[0].command.get("readConcern")
self.assertIsNone(rc)
@client_context.require_no_standalone
@ -1001,19 +1001,19 @@ class TestCausalConsistency(unittest.TestCase):
coll.insert_many([{}, {}])
cursor = coll.find({}).batch_size(1)
next(cursor)
self.listener.results.clear()
self.listener.reset()
list(cursor)
started = self.listener.results["started"][0]
started = self.listener.started_events[0]
self.assertEqual(started.command_name, "getMore")
self.assertIsNone(started.command.get("readConcern"))
def test_session_not_causal(self):
with self.client.start_session(causal_consistency=False) as s:
self.client.pymongo_test.test.insert_one({}, session=s)
self.listener.results.clear()
self.listener.reset()
self.client.pymongo_test.test.find_one({}, session=s)
act = (
self.listener.results["started"][0]
self.listener.started_events[0]
.command.get("readConcern", {})
.get("afterClusterTime")
)
@ -1023,10 +1023,10 @@ class TestCausalConsistency(unittest.TestCase):
def test_server_not_causal(self):
with self.client.start_session(causal_consistency=True) as s:
self.client.pymongo_test.test.insert_one({}, session=s)
self.listener.results.clear()
self.listener.reset()
self.client.pymongo_test.test.find_one({}, session=s)
act = (
self.listener.results["started"][0]
self.listener.started_events[0]
.command.get("readConcern", {})
.get("afterClusterTime")
)
@ -1038,17 +1038,17 @@ class TestCausalConsistency(unittest.TestCase):
with self.client.start_session(causal_consistency=True) as s:
coll = self.client.pymongo_test.test
coll.insert_one({}, session=s)
self.listener.results.clear()
self.listener.reset()
coll.find_one({}, session=s)
read_concern = self.listener.results["started"][0].command.get("readConcern")
read_concern = self.listener.started_events[0].command.get("readConcern")
self.assertIsNotNone(read_concern)
self.assertIsNone(read_concern.get("level"))
self.assertIsNotNone(read_concern.get("afterClusterTime"))
coll = coll.with_options(read_concern=ReadConcern("majority"))
self.listener.results.clear()
self.listener.reset()
coll.find_one({}, session=s)
read_concern = self.listener.results["started"][0].command.get("readConcern")
read_concern = self.listener.started_events[0].command.get("readConcern")
self.assertIsNotNone(read_concern)
self.assertEqual(read_concern.get("level"), "majority")
self.assertIsNotNone(read_concern.get("afterClusterTime"))
@ -1056,17 +1056,17 @@ class TestCausalConsistency(unittest.TestCase):
@client_context.require_no_standalone
def test_cluster_time_with_server_support(self):
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
self.listener.reset()
self.client.pymongo_test.test.find_one({})
after_cluster_time = self.listener.results["started"][0].command.get("$clusterTime")
after_cluster_time = self.listener.started_events[0].command.get("$clusterTime")
self.assertIsNotNone(after_cluster_time)
@client_context.require_standalone
def test_cluster_time_no_server_support(self):
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
self.listener.reset()
self.client.pymongo_test.test.find_one({})
after_cluster_time = self.listener.results["started"][0].command.get("$clusterTime")
after_cluster_time = self.listener.started_events[0].command.get("$clusterTime")
self.assertIsNone(after_cluster_time)
@ -1129,22 +1129,22 @@ class TestClusterTime(IntegrationTest):
]
for name, f in ops:
listener.results.clear()
listener.reset()
# Call f() twice, insert to advance clusterTime, call f() again.
f()
f()
collection.insert_one({})
f()
self.assertGreaterEqual(len(listener.results["started"]), 1)
for i, event in enumerate(listener.results["started"]):
self.assertGreaterEqual(len(listener.started_events), 1)
for i, event in enumerate(listener.started_events):
self.assertTrue(
"$clusterTime" in event.command,
"%s sent no $clusterTime with %s" % (f.__name__, event.command_name),
)
if i > 0:
succeeded = listener.results["succeeded"][i - 1]
succeeded = listener.succeeded_events[i - 1]
self.assertTrue(
"$clusterTime" in succeeded.reply,
"%s received no $clusterTime with %s"

View File

@ -343,11 +343,11 @@ class TestTransactions(TransactionsBase):
self.assertEqual(
["insert", "insert", "commitTransaction"], listener.started_command_names()
)
first_cmd = listener.results["started"][0].command
first_cmd = listener.started_events[0].command
self.assertTrue(first_cmd["startTransaction"])
lsid = first_cmd["lsid"]
txn_number = first_cmd["txnNumber"]
for event in listener.results["started"][1:]:
for event in listener.started_events[1:]:
self.assertNotIn("startTransaction", event.command)
self.assertEqual(lsid, event.command["lsid"])
self.assertEqual(txn_number, event.command["txnNumber"])
@ -459,7 +459,7 @@ class TestTransactionsConvenientAPI(TransactionsBase):
# Create the collection.
coll.insert_one({})
listener.results.clear()
listener.reset()
with client.start_session() as s:
with PatchSessionTimeout(0):
with self.assertRaises(OperationFailure):
@ -491,7 +491,7 @@ class TestTransactionsConvenientAPI(TransactionsBase):
}
)
self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"})
listener.results.clear()
listener.reset()
with client.start_session() as s:
with PatchSessionTimeout(0):
@ -521,7 +521,7 @@ class TestTransactionsConvenientAPI(TransactionsBase):
}
)
self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"})
listener.results.clear()
listener.reset()
with client.start_session() as s:
with PatchSessionTimeout(0):

View File

@ -83,7 +83,7 @@ class TestServerApi(IntegrationTest):
self.addCleanup(coll.delete_many, {})
list(coll.find(batch_size=25))
client.admin.command("ping")
self.assertServerApiInAllCommands(listener.results["started"])
self.assertServerApiInAllCommands(listener.started_events)
@client_context.require_version_min(4, 7)
@client_context.require_transactions
@ -100,7 +100,7 @@ class TestServerApi(IntegrationTest):
coll.insert_many([{} for _ in range(100)], session=s)
list(coll.find(batch_size=25, session=s))
client.test.command("find", "test", session=s)
self.assertServerApiInAllCommands(listener.results["started"])
self.assertServerApiInAllCommands(listener.started_events)
if __name__ == "__main__":

View File

@ -29,7 +29,7 @@ import warnings
from collections import abc, defaultdict
from functools import partial
from test import client_context, db_pwd, db_user
from typing import Any
from typing import Any, List
from bson import json_util
from bson.objectid import ObjectId
@ -140,26 +140,43 @@ class CMAPListener(BaseListener, monitoring.ConnectionPoolListener):
self.add_event(event)
class EventListener(monitoring.CommandListener):
class EventListener(BaseListener, monitoring.CommandListener):
def __init__(self):
super(EventListener, self).__init__()
self.results = defaultdict(list)
def started(self, event):
self.results["started"].append(event)
@property
def started_events(self) -> List[monitoring.CommandStartedEvent]:
return self.results["started"]
def succeeded(self, event):
self.results["succeeded"].append(event)
@property
def succeeded_events(self) -> List[monitoring.CommandSucceededEvent]:
return self.results["succeeded"]
def failed(self, event):
self.results["failed"].append(event)
@property
def failed_events(self) -> List[monitoring.CommandFailedEvent]:
return self.results["failed"]
def started_command_names(self):
def started(self, event: monitoring.CommandStartedEvent) -> None:
self.started_events.append(event)
self.add_event(event)
def succeeded(self, event: monitoring.CommandSucceededEvent) -> None:
self.succeeded_events.append(event)
self.add_event(event)
def failed(self, event: monitoring.CommandFailedEvent) -> None:
self.failed_events.append(event)
self.add_event(event)
def started_command_names(self) -> List[str]:
"""Return list of command names started."""
return [event.command_name for event in self.results["started"]]
return [event.command_name for event in self.started_events]
def reset(self):
def reset(self) -> None:
"""Reset the state of this listener."""
self.results.clear()
super(EventListener, self).reset()
class TopologyEventListener(monitoring.TopologyListener):

View File

@ -371,16 +371,16 @@ class SpecRunner(IntegrationTest):
# TODO: factor with test_command_monitoring.py
def check_events(self, test, listener, session_ids):
res = listener.results
events = listener.started_events
if not len(test["expectations"]):
return
# Give a nicer message when there are missing or extra events
cmds = decode_raw([event.command for event in res["started"]])
self.assertEqual(len(res["started"]), len(test["expectations"]), cmds)
cmds = decode_raw([event.command for event in events])
self.assertEqual(len(events), len(test["expectations"]), cmds)
for i, expectation in enumerate(test["expectations"]):
event_type = next(iter(expectation))
event = res["started"][i]
event = events[i]
# The tests substitute 42 for any number other than 0.
if event.command_name == "getMore" and event.command["getMore"]: