PYTHON-5120 Reduce configureFailPoint duplication in tests (#2131)

This commit is contained in:
Shane Harvey 2025-02-26 11:16:44 -08:00 committed by GitHub
parent eaae22c63b
commit 2b667df14f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 34 additions and 62 deletions

View File

@ -933,17 +933,22 @@ class PyMongoTestCase(unittest.TestCase):
def assertEqualReply(self, expected, actual, msg=None):
self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg)
@staticmethod
def configure_fail_point(client, command_args, off=False):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
client.admin.command(cmd)
@contextmanager
def fail_point(self, command_args):
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
client_context.client.admin.command(cmd_on)
self.configure_fail_point(client_context.client, command_args)
try:
yield
finally:
client_context.client.admin.command(
"configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
self.configure_fail_point(client_context.client, command_args, off=True)
@contextmanager
def fork(

View File

@ -935,17 +935,22 @@ class AsyncPyMongoTestCase(unittest.TestCase):
def assertEqualReply(self, expected, actual, msg=None):
self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg)
@staticmethod
async def configure_fail_point(client, command_args, off=False):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
await client.admin.command(cmd)
@asynccontextmanager
async def fail_point(self, command_args):
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
await async_client_context.client.admin.command(cmd_on)
await self.configure_fail_point(async_client_context.client, command_args)
try:
yield
finally:
await async_client_context.client.admin.command(
"configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
await self.configure_fail_point(async_client_context.client, command_args, off=True)
@contextmanager
def fork(

View File

@ -211,15 +211,10 @@ class AsyncTestCMAP(AsyncIntegrationTest):
self.check_object(actual, expected)
self.assertIn(message, str(actual))
async def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
if not async_client_context.supports_failCommand_fail_point:
self.skipTest("failCommand fail point must be supported")
await self._set_fail_point(self.client, command_args)
await self.configure_fail_point(self.client, command_args)
async def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""

View File

@ -410,15 +410,10 @@ class TestTransactionsConvenientAPI(AsyncTransactionsBase):
for address in async_client_context.mongoses:
self.mongos_clients.append(await self.async_single_client("{}:{}".format(*address)))
async def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
await self._set_fail_point(client, command_args)
await self.configure_fail_point(client, command_args)
@async_client_context.require_transactions
async def test_callback_raises_custom_error(self):

View File

@ -1008,12 +1008,8 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
if not async_client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled")
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
await client.admin.command(cmd_on)
self.addAsyncCleanup(
client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
await self.configure_fail_point(client, command_args)
self.addAsyncCleanup(self.configure_fail_point, client, command_args, off=True)
async def _testOperation_failPoint(self, spec):
await self.__set_fail_point(

View File

@ -264,15 +264,10 @@ class AsyncSpecRunner(AsyncIntegrationTest):
async def asyncTearDown(self) -> None:
self.knobs.disable()
async def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
await self._set_fail_point(client, command_args)
await self.configure_fail_point(client, command_args)
async def targeted_fail_point(self, session, fail_point):
"""Run the targetedFailPoint test operation.
@ -281,7 +276,7 @@ class AsyncSpecRunner(AsyncIntegrationTest):
"""
clients = {c.address: c for c in self.mongos_clients}
client = clients[session._pinned_address]
await self._set_fail_point(client, fail_point)
await self.configure_fail_point(client, fail_point)
self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})
def assert_session_pinned(self, session):

View File

@ -211,15 +211,10 @@ class TestCMAP(IntegrationTest):
self.check_object(actual, expected)
self.assertIn(message, str(actual))
def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
if not client_context.supports_failCommand_fail_point:
self.skipTest("failCommand fail point must be supported")
self._set_fail_point(self.client, command_args)
self.configure_fail_point(self.client, command_args)
def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""

View File

@ -402,15 +402,10 @@ class TestTransactionsConvenientAPI(TransactionsBase):
for address in client_context.mongoses:
self.mongos_clients.append(self.single_client("{}:{}".format(*address)))
def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
self._set_fail_point(client, command_args)
self.configure_fail_point(client, command_args)
@client_context.require_transactions
def test_callback_raises_custom_error(self):

View File

@ -999,12 +999,8 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
if not client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled")
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
client.admin.command(cmd_on)
self.addCleanup(
client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
self.configure_fail_point(client, command_args)
self.addCleanup(self.configure_fail_point, client, command_args, off=True)
def _testOperation_failPoint(self, spec):
self.__set_fail_point(

View File

@ -264,15 +264,10 @@ class SpecRunner(IntegrationTest):
def tearDown(self) -> None:
self.knobs.disable()
def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
self._set_fail_point(client, command_args)
self.configure_fail_point(client, command_args)
def targeted_fail_point(self, session, fail_point):
"""Run the targetedFailPoint test operation.
@ -281,7 +276,7 @@ class SpecRunner(IntegrationTest):
"""
clients = {c.address: c for c in self.mongos_clients}
client = clients[session._pinned_address]
self._set_fail_point(client, fail_point)
self.configure_fail_point(client, fail_point)
self.addCleanup(self.set_fail_point, {"mode": "off"})
def assert_session_pinned(self, session):