From 9fc7ed1e11a8019595cc082f517328c8bec29b04 Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Thu, 25 Jun 2020 10:38:12 -0700 Subject: [PATCH 1/8] PYTHON-2143 Use an allow-list to determine resumable change stream errors --- pymongo/change_stream.py | 34 ++++- pymongo/errors.py | 14 +- pymongo/helpers.py | 21 ++- pymongo/server.py | 3 +- .../change_streams/change-streams-errors.json | 53 ++++++- test/test_change_stream.py | 143 ++++++++---------- 6 files changed, 170 insertions(+), 98 deletions(-) diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index f026dd7f5..b86dca541 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -25,6 +25,7 @@ from pymongo.aggregation import (_CollectionAggregationCommand, from pymongo.collation import validate_collation_or_none from pymongo.command_cursor import CommandCursor from pymongo.errors import (ConnectionFailure, + CursorNotFound, InvalidOperation, OperationFailure, PyMongoError) @@ -32,11 +33,25 @@ from pymongo.errors import (ConnectionFailure, # The change streams spec considers the following server errors from the # getMore command non-resumable. All other getMore errors are resumable. -_NON_RESUMABLE_GETMORE_ERRORS = frozenset([ - 11601, # Interrupted - 136, # CappedPositionLost - 237, # CursorKilled - None, # No error code was returned. +_RESUMABLE_GETMORE_ERRORS = frozenset([ + 6, # HostUnreachable + 7, # HostNotFound + 89, # NetworkTimeout + 91, # ShutdownInProgress + 189, # PrimarySteppedDown + 262, # ExceededTimeLimit + 9001, # SocketException + 10107, # NotMaster + 11600, # InterruptedAtShutdown + 11602, # InterruptedDueToReplStateChange + 13435, # NotMasterNoSlaveOk + 13436, # NotMasterOrSecondary + 63, # StaleShardVersion + 150, # StaleEpoch + 13388, # StaleConfig + 234, # RetryChangeStream + 133, # FailedToSatisfyReadPreference + 216, # ElectionInProgress ]) @@ -283,12 +298,15 @@ class ChangeStream(object): # one resume attempt. try: change = self._cursor._try_next(True) - except ConnectionFailure: + except (ConnectionFailure, CursorNotFound): self._resume() change = self._cursor._try_next(False) except OperationFailure as exc: - if (exc.code in _NON_RESUMABLE_GETMORE_ERRORS or - exc.has_error_label("NonResumableChangeStreamError")): + is_resumable = ((exc.max_wire_version >= 9 and + exc.has_error_label("ResumableChangeStreamError")) or + (exc.max_wire_version < 9 and + exc.code in _RESUMABLE_GETMORE_ERRORS)) + if not is_resumable: raise self._resume() change = self._cursor._try_next(False) diff --git a/pymongo/errors.py b/pymongo/errors.py index a309a9e7a..bf31aeba1 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -140,11 +140,13 @@ class ConfigurationError(PyMongoError): class OperationFailure(PyMongoError): """Raised when a database operation fails. + .. versionadded:: 3.11 + The :attr:`max_wire_version` attribute. .. versionadded:: 2.7 The :attr:`details` attribute. """ - def __init__(self, error, code=None, details=None): + def __init__(self, error, code=None, details=None, max_wire_version=None): error_labels = None if details is not None: error_labels = details.get('errorLabels') @@ -152,6 +154,7 @@ class OperationFailure(PyMongoError): error, error_labels=error_labels) self.__code = code self.__details = details + self.__max_wire_version = max_wire_version @property def code(self): @@ -171,6 +174,15 @@ class OperationFailure(PyMongoError): """ return self.__details + @property + def max_wire_version(self): + """The latest version of the wire protocol supported by the socket + that was used to run the operation that raised this exception. + + PyMongo does not always record this value and it may be None. + """ + return self.__max_wire_version + def __str__(self): output_str = "%s, full error: %s" % (self._message, self.__details) if sys.version_info[0] == 2 and isinstance(output_str, unicode): diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 51215b4c4..0cd2b00b8 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -103,14 +103,16 @@ def _index_document(index_list): def _check_command_response(response, msg=None, allowable_errors=None, - parse_write_concern_error=False): + parse_write_concern_error=False, + max_wire_version=None): """Check the response to a command for errors. """ if "ok" not in response: # Server didn't recognize our message as a command. raise OperationFailure(response.get("$err"), response.get("code"), - response) + response, + max_wire_version) if parse_write_concern_error and 'writeConcernError' in response: _raise_write_concern_error(response['writeConcernError']) @@ -146,19 +148,24 @@ def _check_command_response(response, msg=None, allowable_errors=None, details.get("assertion", "")) raise OperationFailure(errmsg, details.get("assertionCode"), - response) + response, + max_wire_version) # Other errors # findAndModify with upsert can raise duplicate key error if code in (11000, 11001, 12582): - raise DuplicateKeyError(errmsg, code, response) + raise DuplicateKeyError(errmsg, code, response, + max_wire_version) elif code == 50: - raise ExecutionTimeout(errmsg, code, response) + raise ExecutionTimeout(errmsg, code, response, + max_wire_version) elif code == 43: - raise CursorNotFound(errmsg, code, response) + raise CursorNotFound(errmsg, code, response, + max_wire_version) msg = msg or "%s" - raise OperationFailure(msg % errmsg, code, response) + raise OperationFailure(msg % errmsg, code, response, + max_wire_version) def _check_gle_response(result): diff --git a/pymongo/server.py b/pymongo/server.py index 18919b9e2..a45b1eefd 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -133,7 +133,8 @@ class Server(object): first = docs[0] operation.client._process_response( first, operation.session) - _check_command_response(first) + _check_command_response( + first, max_wire_version=sock_info.max_wire_version) except Exception as exc: if publish: duration = datetime.now() - start diff --git a/test/change_streams/change-streams-errors.json b/test/change_streams/change-streams-errors.json index 5ebbd28f4..7b7cea30a 100644 --- a/test/change_streams/change-streams-errors.json +++ b/test/change_streams/change-streams-errors.json @@ -75,7 +75,6 @@ { "description": "Change Stream should error when _id is projected out", "minServerVersion": "4.1.11", - "maxServerVersion": "4.3.3", "target": "collection", "topology": [ "replicaset", @@ -103,10 +102,54 @@ ], "result": { "error": { - "code": 280, - "errorLabels": [ - "NonResumableChangeStreamError" - ] + "code": 280 + } + } + }, + { + "description": "change stream errors on MaxTimeMSExpired", + "minServerVersion": "4.2", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "getMore" + ], + "errorCode": 50, + "closeConnection": false + } + }, + "target": "collection", + "topology": [ + "replicaset", + "sharded" + ], + "changeStreamPipeline": [ + { + "$project": { + "_id": 0 + } + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "result": { + "error": { + "code": 50 } } } diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 669c774a3..f050d7c56 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -37,7 +37,7 @@ from bson.py3compat import iteritems from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument from pymongo import MongoClient -from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS +# from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS from pymongo.command_cursor import CommandCursor from pymongo.errors import (InvalidOperation, OperationFailure, ServerSelectionTimeoutError) @@ -555,47 +555,11 @@ class ProseSpecTestsMixin(object): self.assertEqual(listener.results['started'][0].command_name, 'aggregate') - # Prose test no. 5 - def test_does_not_resume_fatal_errors(self): - """ChangeStream will not attempt to resume fatal server errors.""" - if client_context.supports_failCommand_fail_point: - # failCommand does not support returning no errorCode. - TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS - {None} - @contextmanager - def generate_error(change_stream, code): - fail_point = {'mode': {'times': 1}, 'data': { - 'errorCode': code, 'failCommands': ['getMore']}} - with self.fail_point(fail_point): - yield - else: - TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS - @contextmanager - def generate_error(change_stream, code): - def mock_try_next(*args, **kwargs): - change_stream._cursor.close() - raise OperationFailure('Mock server error', code=code) - - original_cursor = change_stream._cursor - change_stream._cursor._try_next = mock_try_next - try: - yield - finally: - # Un patch the instance. - del original_cursor._try_next - - for code in TEST_ERROR_CODES: - with self.change_stream() as change_stream: - self.watched_collection().insert_one({}) - with generate_error(change_stream, code): - with self.assertRaises(OperationFailure): - next(change_stream) - with self.assertRaises(StopIteration): - next(change_stream) - + # Prose test no. 5 - REMOVED # Prose test no. 6 - SKIPPED - # readPreference is not configurable using the watch() helpers so we can - # skip this test. Also, PyMongo performs server selection for each - # operation which ensure compliance with this prose test. + # Reason: readPreference is not configurable using the watch() helpers + # so we can skip this test. Also, PyMongo performs server selection for + # each operation which ensure compliance with this prose test. # Prose test no. 7 def test_initial_empty_batch(self): @@ -1075,7 +1039,7 @@ class TestAllScenarios(unittest.TestCase): @classmethod @client_context.require_connection def setUpClass(cls): - cls.listener = WhiteListEventListener("aggregate") + cls.listener = WhiteListEventListener("aggregate", "getMore") cls.client = rs_or_single_client(event_listeners=[cls.listener]) @classmethod @@ -1086,14 +1050,66 @@ class TestAllScenarios(unittest.TestCase): self.listener.results.clear() def setUpCluster(self, scenario_dict): - assets = [ - (scenario_dict["database_name"], scenario_dict["collection_name"]), - (scenario_dict["database2_name"], scenario_dict["collection2_name"]), - ] + assets = [(scenario_dict["database_name"], + scenario_dict["collection_name"]), + (scenario_dict.get("database2_name", "db2"), + scenario_dict.get("collection2_name", "coll2"))] for db, coll in assets: self.client.drop_database(db) self.client[db].create_collection(coll) + def setFailPoint(self, scenario_dict): + fail_point = scenario_dict.get("failPoint") + if fail_point is None: + return + + fail_cmd = SON([('configureFailPoint', 'failCommand')]) + fail_cmd.update(fail_point) + client_context.client.admin.command(fail_cmd) + self.addCleanup( + client_context.client.admin.command, + 'configureFailPoint', fail_cmd['configureFailPoint'], mode='off') + + def assert_list_contents_are_subset(self, superlist, sublist): + """Check that each element in sublist is a subset of the corresponding + element in superlist.""" + self.assertEqual(len(superlist), len(sublist)) + for sup, sub in zip(superlist, sublist): + if isinstance(sub, dict): + self.assert_dict_is_subset(sup, sub) + continue + if isinstance(sub, (list, tuple)): + self.assert_list_contents_are_subset(sup, sub) + continue + self.assertEqual(sup, sub) + + def assert_dict_is_subset(self, superdict, subdict): + """Check that subdict is a subset of superdict.""" + exempt_fields = ["documentKey", "_id", "getMore"] + for key, value in iteritems(subdict): + if key not in superdict: + self.fail('Key %s not found in %s' % (key, superdict)) + if isinstance(value, dict): + self.assert_dict_is_subset(superdict[key], value) + continue + if isinstance(value, (list, tuple)): + self.assert_list_contents_are_subset(superdict[key], value) + continue + if key in exempt_fields: + # Only check for presence of these exempt fields, but not value. + self.assertIn(key, superdict) + else: + self.assertEqual(superdict[key], value) + + def check_event(self, event, expectation_dict): + if event is None: + self.fail() + for key, value in iteritems(expectation_dict): + if isinstance(value, dict): + self.assert_dict_is_subset(getattr(event, key), value) + else: + self.assertEqual(getattr(event, key), value) + def tearDown(self): self.listener.results.clear() @@ -1147,36 +1163,11 @@ def run_operation(client, operation): return cmd(**arguments) -def assert_dict_is_subset(superdict, subdict): - """Check that subdict is a subset of superdict.""" - exempt_fields = ["documentKey", "_id"] - for key, value in iteritems(subdict): - if key not in superdict: - assert False - if isinstance(value, dict): - assert_dict_is_subset(superdict[key], value) - continue - if key in exempt_fields: - superdict[key] = "42" - assert superdict[key] == value - - -def check_event(event, expectation_dict): - if event is None: - raise AssertionError - for key, value in iteritems(expectation_dict): - if isinstance(value, dict): - assert_dict_is_subset( - getattr(event, key), value - ) - else: - assert getattr(event, key) == value - - def create_test(scenario_def, test): def run_scenario(self): # Set up self.setUpCluster(scenario_def) + self.setFailPoint(test) is_error = test["result"].get("error", False) try: with get_change_stream( @@ -1202,17 +1193,17 @@ def create_test(scenario_def, test): else: # Check for expected output from change streams for change, expected_changes in zip(changes, test["result"]["success"]): - assert_dict_is_subset(change, expected_changes) + self.assert_dict_is_subset(change, expected_changes) self.assertEqual(len(changes), len(test["result"]["success"])) finally: # Check for expected events results = self.listener.results - for expectation in test.get("expectations", []): - for idx, (event_type, event_desc) in enumerate(iteritems(expectation)): + for idx, expectation in enumerate(test.get("expectations", [])): + for event_type, event_desc in iteritems(expectation): results_key = event_type.split("_")[1] event = results[results_key][idx] if len(results[results_key]) > idx else None - check_event(event, event_desc) + self.check_event(event, event_desc) return run_scenario From 956ce3d4b0edfa9c1d946109db743f82ed0bfc0a Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Mon, 29 Jun 2020 19:53:21 -0700 Subject: [PATCH 2/8] Incorporate review changes --- pymongo/change_stream.py | 4 ++-- pymongo/errors.py | 5 +++++ pymongo/helpers.py | 10 +++++----- pymongo/network.py | 4 ++-- pymongo/pool.py | 15 ++++++++------- pymongo/server.py | 2 +- test/test_change_stream.py | 1 - test/test_database.py | 10 +++++----- test/test_discovery_and_monitoring.py | 2 +- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index b86dca541..750931567 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -302,9 +302,9 @@ class ChangeStream(object): self._resume() change = self._cursor._try_next(False) except OperationFailure as exc: - is_resumable = ((exc.max_wire_version >= 9 and + is_resumable = ((exc._max_wire_version >= 9 and exc.has_error_label("ResumableChangeStreamError")) or - (exc.max_wire_version < 9 and + (exc._max_wire_version < 9 and exc.code in _RESUMABLE_GETMORE_ERRORS)) if not is_resumable: raise diff --git a/pymongo/errors.py b/pymongo/errors.py index bf31aeba1..80e48692a 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -156,6 +156,10 @@ class OperationFailure(PyMongoError): self.__details = details self.__max_wire_version = max_wire_version + @property + def _max_wire_version(self): + return self.__max_wire_version + @property def code(self): """The error code returned by the server, if any. @@ -189,6 +193,7 @@ class OperationFailure(PyMongoError): return output_str.encode('utf-8', errors='replace') return output_str + class CursorNotFound(OperationFailure): """Raised while iterating query results if the cursor is invalidated on the server. diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 0cd2b00b8..67b2e1584 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -102,9 +102,9 @@ def _index_document(index_list): return index -def _check_command_response(response, msg=None, allowable_errors=None, - parse_write_concern_error=False, - max_wire_version=None): +def _check_command_response(response, max_wire_version, msg=None, + allowable_errors=None, + parse_write_concern_error=False): """Check the response to a command for errors. """ if "ok" not in response: @@ -168,10 +168,10 @@ def _check_command_response(response, msg=None, allowable_errors=None, max_wire_version) -def _check_gle_response(result): +def _check_gle_response(result, max_wire_version): """Return getlasterror response as a dict, or raise OperationFailure.""" # Did getlasterror itself fail? - _check_command_response(result) + _check_command_response(result, max_wire_version) if result.get("wtimeout", False): # MongoDB versions before 1.8.0 return the error message in an "errmsg" diff --git a/pymongo/network.py b/pymongo/network.py index 3224cf649..759872ef1 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -40,7 +40,7 @@ from pymongo.socket_checker import _errno_from_exception _UNPACK_HEADER = struct.Struct(" Date: Wed, 1 Jul 2020 16:25:09 -0700 Subject: [PATCH 3/8] cleanup cruft --- pymongo/errors.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pymongo/errors.py b/pymongo/errors.py index 80e48692a..aaf51e0bc 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -140,8 +140,6 @@ class ConfigurationError(PyMongoError): class OperationFailure(PyMongoError): """Raised when a database operation fails. - .. versionadded:: 3.11 - The :attr:`max_wire_version` attribute. .. versionadded:: 2.7 The :attr:`details` attribute. """ From dd23624100842bafee1b62d4f59bdfae89803423 Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Wed, 1 Jul 2020 16:37:39 -0700 Subject: [PATCH 4/8] handle None case --- pymongo/change_stream.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 750931567..08b2043de 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -302,6 +302,8 @@ class ChangeStream(object): self._resume() change = self._cursor._try_next(False) except OperationFailure as exc: + if exc._max_wire_version is None: + raise is_resumable = ((exc._max_wire_version >= 9 and exc.has_error_label("ResumableChangeStreamError")) or (exc._max_wire_version < 9 and From 8e3fd0040e63199315cc5578593e1248bdf5be50 Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Wed, 1 Jul 2020 16:49:36 -0700 Subject: [PATCH 5/8] cruft removal 2 --- pymongo/errors.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/pymongo/errors.py b/pymongo/errors.py index aaf51e0bc..7a6902be8 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -154,6 +154,12 @@ class OperationFailure(PyMongoError): self.__details = details self.__max_wire_version = max_wire_version + def __str__(self): + output_str = "%s, full error: %s" % (self._message, self.__details) + if sys.version_info[0] == 2 and isinstance(output_str, unicode): + return output_str.encode('utf-8', errors='replace') + return output_str + @property def _max_wire_version(self): return self.__max_wire_version @@ -176,21 +182,6 @@ class OperationFailure(PyMongoError): """ return self.__details - @property - def max_wire_version(self): - """The latest version of the wire protocol supported by the socket - that was used to run the operation that raised this exception. - - PyMongo does not always record this value and it may be None. - """ - return self.__max_wire_version - - def __str__(self): - output_str = "%s, full error: %s" % (self._message, self.__details) - if sys.version_info[0] == 2 and isinstance(output_str, unicode): - return output_str.encode('utf-8', errors='replace') - return output_str - class CursorNotFound(OperationFailure): """Raised while iterating query results if the cursor is From d97a43ea9b14a8456051b1b22babf424378e5fef Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Wed, 1 Jul 2020 16:58:56 -0700 Subject: [PATCH 6/8] cleanup --- pymongo/network.py | 5 +++-- pymongo/pool.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pymongo/network.py b/pymongo/network.py index 759872ef1..d9d645fa9 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -40,7 +40,7 @@ from pymongo.socket_checker import _errno_from_exception _UNPACK_HEADER = struct.Struct(" Date: Wed, 1 Jul 2020 17:24:41 -0700 Subject: [PATCH 7/8] remove unnecessary changes --- pymongo/errors.py | 12 ++++++------ pymongo/pool.py | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pymongo/errors.py b/pymongo/errors.py index 7a6902be8..e5d52bfe3 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -154,12 +154,6 @@ class OperationFailure(PyMongoError): self.__details = details self.__max_wire_version = max_wire_version - def __str__(self): - output_str = "%s, full error: %s" % (self._message, self.__details) - if sys.version_info[0] == 2 and isinstance(output_str, unicode): - return output_str.encode('utf-8', errors='replace') - return output_str - @property def _max_wire_version(self): return self.__max_wire_version @@ -182,6 +176,12 @@ class OperationFailure(PyMongoError): """ return self.__details + def __str__(self): + output_str = "%s, full error: %s" % (self._message, self.__details) + if sys.version_info[0] == 2 and isinstance(output_str, unicode): + return output_str.encode('utf-8', errors='replace') + return output_str + class CursorNotFound(OperationFailure): """Raised while iterating query results if the cursor is diff --git a/pymongo/pool.py b/pymongo/pool.py index 166800895..74965620e 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -680,11 +680,11 @@ class SocketInfo(object): if self.op_msg_enabled: self._raise_if_not_writable(unacknowledged) try: - return command(self, dbname, spec, - slave_ok, self.is_mongos, read_preference, - codec_options, session, client, check, - allowable_errors, self.address, check_keys, - listeners, self.max_bson_size, read_concern, + return command(self, dbname, spec, slave_ok, + self.is_mongos, read_preference, codec_options, + session, client, check, allowable_errors, + self.address, check_keys, listeners, + self.max_bson_size, read_concern, parse_write_concern_error=parse_write_concern_error, collation=collation, compression_ctx=self.compression_context, From 04926c6ccdd167a6498acaeeae3f8ce7bea5a7c0 Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Wed, 1 Jul 2020 17:52:35 -0700 Subject: [PATCH 8/8] fix monitor failure --- pymongo/pool.py | 2 +- test/test_discovery_and_monitoring.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index 74965620e..b04e4bd33 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -606,7 +606,7 @@ class SocketInfo(object): self.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response() response_doc = unpacked_docs[0] - helpers._check_command_response(response_doc) + helpers._check_command_response(response_doc, self.max_wire_version) return response_doc def command(self, dbname, spec, slave_ok=False, diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 27f385d6a..ef97bcc67 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -334,7 +334,8 @@ class TestIntegration(SpecRunner): Assert the given event was published exactly `count` times. """ - self.assertEqual(self._event_count(event), count) + self.assertEqual(self._event_count(event), count, + 'expected %s not %r' % (count, event)) def wait_for_event(self, event, count): """Run the waitForEvent test operation.