PYTHON-1911 Implement missing changeStream prose tests
This commit is contained in:
parent
d0423d2d53
commit
e31a0ef95f
@ -29,6 +29,7 @@ try:
|
||||
except ImportError:
|
||||
HAVE_IPADDRESS = False
|
||||
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
from unittest import SkipTest
|
||||
|
||||
@ -585,6 +586,13 @@ class ClientContext(object):
|
||||
"Test commands must be enabled",
|
||||
func=func)
|
||||
|
||||
def require_failCommand_fail_point(self, func):
|
||||
"""Run a test only if the server supports the failCommand fail
|
||||
point."""
|
||||
return self._require(lambda: self.supports_failCommand_fail_point,
|
||||
"failCommand fail point must be supported",
|
||||
func=func)
|
||||
|
||||
def require_ssl(self, func):
|
||||
"""Run a test only if the client can connect over SSL."""
|
||||
return self._require(lambda: self.ssl,
|
||||
@ -656,6 +664,17 @@ class ClientContext(object):
|
||||
"""Does the connected server support getpreverror?"""
|
||||
return not (self.version.at_least(4, 1, 0) or self.is_mongos)
|
||||
|
||||
@property
|
||||
def supports_failCommand_fail_point(self):
|
||||
"""Does the server support the failCommand fail point?"""
|
||||
if self.is_mongos:
|
||||
return (self.version.at_least(4, 1, 5) and
|
||||
self.test_commands_enabled)
|
||||
else:
|
||||
return (self.version.at_least(4, 0) and
|
||||
self.test_commands_enabled)
|
||||
|
||||
|
||||
@property
|
||||
def requires_hint_with_min_max_queries(self):
|
||||
"""Does the server require a hint with min/max queries."""
|
||||
@ -713,6 +732,17 @@ class IntegrationTest(PyMongoTestCase):
|
||||
else:
|
||||
cls.credentials = {}
|
||||
|
||||
@contextmanager
|
||||
def fail_point(self, command_args):
|
||||
cmd_on = SON([('configureFailPoint', 'failCommand')])
|
||||
cmd_on.update(command_args)
|
||||
self.client.admin.command(cmd_on)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
cmd_off = {'configureFailPoint': cmd_on['configureFailPoint'],
|
||||
'mode': 'off'}
|
||||
self.client.admin.command(cmd_off)
|
||||
|
||||
# Use assertRaisesRegex if available, otherwise use Python 2.7's
|
||||
# deprecated assertRaisesRegexp, with a 'p'.
|
||||
|
||||
@ -23,6 +23,7 @@ import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from contextlib import contextmanager
|
||||
from itertools import product
|
||||
|
||||
sys.path[0:0] = ['']
|
||||
@ -535,26 +536,65 @@ class ProseSpecTestsMixin(object):
|
||||
self.kill_change_stream_cursor(change_stream)
|
||||
self.insert_one_and_check(change_stream, {'_id': 2})
|
||||
|
||||
# Prose test no. 4
|
||||
@client_context.require_failCommand_fail_point
|
||||
def test_no_resume_attempt_if_aggregate_command_fails(self):
|
||||
# Set non-retryable error on aggregate command.
|
||||
fail_point = {'mode': {'times': 1},
|
||||
'data': {'errorCode': 2, 'failCommands': ['aggregate']}}
|
||||
client, listener = self._client_with_listener("aggregate", "getMore")
|
||||
with self.fail_point(fail_point):
|
||||
try:
|
||||
_ = self.change_stream_with_client(client)
|
||||
except OperationFailure:
|
||||
pass
|
||||
|
||||
# Driver should have attempted aggregate command only once.
|
||||
self.assertEqual(len(listener.results['started']), 1)
|
||||
self.assertEqual(listener.results['started'][0].command_name,
|
||||
'aggregate')
|
||||
|
||||
# Prose test no. 5
|
||||
def test_does_not_resume_fatal_errors(self):
|
||||
"""ChangeStream will not attempt to resume fatal server errors."""
|
||||
for code in _NON_RESUMABLE_GETMORE_ERRORS:
|
||||
with self.change_stream() as change_stream:
|
||||
self.watched_collection().insert_one({})
|
||||
|
||||
if client_context.supports_failCommand_fail_point:
|
||||
# failCommand does not support returning no errorCode.
|
||||
TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS - {None}
|
||||
@contextmanager
|
||||
def generate_error(change_stream, code):
|
||||
fail_point = {'mode': {'times': 1}, 'data': {
|
||||
'errorCode': code, 'failCommands': ['getMore']}}
|
||||
with self.fail_point(fail_point):
|
||||
yield
|
||||
else:
|
||||
TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS
|
||||
@contextmanager
|
||||
def generate_error(change_stream, code):
|
||||
def mock_try_next(*args, **kwargs):
|
||||
change_stream._cursor.close()
|
||||
raise OperationFailure('Mock server error', code=code)
|
||||
|
||||
original_try_next = change_stream._cursor._try_next
|
||||
change_stream._cursor._try_next = mock_try_next
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
change_stream._cursor._try_next = original_try_next
|
||||
|
||||
with self.assertRaises(OperationFailure):
|
||||
next(change_stream)
|
||||
change_stream._cursor._try_next = original_try_next
|
||||
for code in TEST_ERROR_CODES:
|
||||
with self.change_stream() as change_stream:
|
||||
self.watched_collection().insert_one({})
|
||||
with generate_error(change_stream, code):
|
||||
with self.assertRaises(OperationFailure):
|
||||
next(change_stream)
|
||||
with self.assertRaises(StopIteration):
|
||||
next(change_stream)
|
||||
|
||||
# Prose test no. 6 - SKIPPED
|
||||
# readPreference is not configurable using the watch() helpers so we can
|
||||
# skip this test. Also, PyMongo performs server selection for each
|
||||
# operation which ensure compliance with this prose test.
|
||||
|
||||
# Prose test no. 7
|
||||
def test_initial_empty_batch(self):
|
||||
with self.change_stream() as change_stream:
|
||||
@ -603,6 +643,9 @@ class ProseSpecTestsMixin(object):
|
||||
"startAtOperationTime"), optime, str([k.command for k in
|
||||
listener.results['started']]))
|
||||
|
||||
# Prose test no. 10 - SKIPPED
|
||||
# This test is identical to prose test no. 3.
|
||||
|
||||
# Prose test no. 11
|
||||
@client_context.require_version_min(4, 0, 7)
|
||||
def test_resumetoken_empty_batch(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user