From e31a0ef95f8cc397eb7d0b4bc83916817acc0a53 Mon Sep 17 00:00:00 2001 From: Prashant Mital Date: Mon, 18 Nov 2019 14:47:28 -0800 Subject: [PATCH] PYTHON-1911 Implement missing changeStream prose tests --- test/__init__.py | 30 ++++++++++++++++++++ test/test_change_stream.py | 57 +++++++++++++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index 463a05b75..603f158ff 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -29,6 +29,7 @@ try: except ImportError: HAVE_IPADDRESS = False +from contextlib import contextmanager from functools import wraps from unittest import SkipTest @@ -585,6 +586,13 @@ class ClientContext(object): "Test commands must be enabled", func=func) + def require_failCommand_fail_point(self, func): + """Run a test only if the server supports the failCommand fail + point.""" + return self._require(lambda: self.supports_failCommand_fail_point, + "failCommand fail point must be supported", + func=func) + def require_ssl(self, func): """Run a test only if the client can connect over SSL.""" return self._require(lambda: self.ssl, @@ -656,6 +664,17 @@ class ClientContext(object): """Does the connected server support getpreverror?""" return not (self.version.at_least(4, 1, 0) or self.is_mongos) + @property + def supports_failCommand_fail_point(self): + """Does the server support the failCommand fail point?""" + if self.is_mongos: + return (self.version.at_least(4, 1, 5) and + self.test_commands_enabled) + else: + return (self.version.at_least(4, 0) and + self.test_commands_enabled) + + @property def requires_hint_with_min_max_queries(self): """Does the server require a hint with min/max queries.""" @@ -713,6 +732,17 @@ class IntegrationTest(PyMongoTestCase): else: cls.credentials = {} + @contextmanager + def fail_point(self, command_args): + cmd_on = SON([('configureFailPoint', 'failCommand')]) + cmd_on.update(command_args) + self.client.admin.command(cmd_on) + try: + yield + finally: + cmd_off = {'configureFailPoint': cmd_on['configureFailPoint'], + 'mode': 'off'} + self.client.admin.command(cmd_off) # Use assertRaisesRegex if available, otherwise use Python 2.7's # deprecated assertRaisesRegexp, with a 'p'. diff --git a/test/test_change_stream.py b/test/test_change_stream.py index ced8af8cc..97b67fb69 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -23,6 +23,7 @@ import threading import time import uuid +from contextlib import contextmanager from itertools import product sys.path[0:0] = [''] @@ -535,26 +536,65 @@ class ProseSpecTestsMixin(object): self.kill_change_stream_cursor(change_stream) self.insert_one_and_check(change_stream, {'_id': 2}) + # Prose test no. 4 + @client_context.require_failCommand_fail_point + def test_no_resume_attempt_if_aggregate_command_fails(self): + # Set non-retryable error on aggregate command. + fail_point = {'mode': {'times': 1}, + 'data': {'errorCode': 2, 'failCommands': ['aggregate']}} + client, listener = self._client_with_listener("aggregate", "getMore") + with self.fail_point(fail_point): + try: + _ = self.change_stream_with_client(client) + except OperationFailure: + pass + + # Driver should have attempted aggregate command only once. + self.assertEqual(len(listener.results['started']), 1) + self.assertEqual(listener.results['started'][0].command_name, + 'aggregate') + # Prose test no. 5 def test_does_not_resume_fatal_errors(self): """ChangeStream will not attempt to resume fatal server errors.""" - for code in _NON_RESUMABLE_GETMORE_ERRORS: - with self.change_stream() as change_stream: - self.watched_collection().insert_one({}) - + if client_context.supports_failCommand_fail_point: + # failCommand does not support returning no errorCode. + TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS - {None} + @contextmanager + def generate_error(change_stream, code): + fail_point = {'mode': {'times': 1}, 'data': { + 'errorCode': code, 'failCommands': ['getMore']}} + with self.fail_point(fail_point): + yield + else: + TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS + @contextmanager + def generate_error(change_stream, code): def mock_try_next(*args, **kwargs): change_stream._cursor.close() raise OperationFailure('Mock server error', code=code) original_try_next = change_stream._cursor._try_next change_stream._cursor._try_next = mock_try_next + try: + yield + finally: + change_stream._cursor._try_next = original_try_next - with self.assertRaises(OperationFailure): - next(change_stream) - change_stream._cursor._try_next = original_try_next + for code in TEST_ERROR_CODES: + with self.change_stream() as change_stream: + self.watched_collection().insert_one({}) + with generate_error(change_stream, code): + with self.assertRaises(OperationFailure): + next(change_stream) with self.assertRaises(StopIteration): next(change_stream) + # Prose test no. 6 - SKIPPED + # readPreference is not configurable using the watch() helpers so we can + # skip this test. Also, PyMongo performs server selection for each + # operation which ensure compliance with this prose test. + # Prose test no. 7 def test_initial_empty_batch(self): with self.change_stream() as change_stream: @@ -603,6 +643,9 @@ class ProseSpecTestsMixin(object): "startAtOperationTime"), optime, str([k.command for k in listener.results['started']])) + # Prose test no. 10 - SKIPPED + # This test is identical to prose test no. 3. + # Prose test no. 11 @client_context.require_version_min(4, 0, 7) def test_resumetoken_empty_batch(self):