PYTHON-5784 Nit fixes

- Replace pe_module alias with periodic_executor
- Remove underscore from base test class name
- Alpha sort test_periodic_executor.py in synchro list
- Rename ex to executor in tests
- Rename r to executor_repr in repr test
- Replace boom with error in exception messages
This commit is contained in:
Jeffrey 'Alex' Clark 2026-04-24 19:29:05 -04:00
parent 5b259b2c91
commit 7400105aeb
3 changed files with 179 additions and 179 deletions

View File

@ -26,7 +26,7 @@ sys.path[0:0] = [""]
from test.asynchronous import AsyncUnitTest, unittest
import pymongo.periodic_executor as pe_module
from pymongo import periodic_executor
from pymongo.periodic_executor import (
AsyncPeriodicExecutor,
_register_executor,
@ -47,71 +47,71 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"):
)
class _AsyncPeriodicExecutorTestBase(AsyncUnitTest):
class AsyncPeriodicExecutorTestBase(AsyncUnitTest):
async def asyncSetUp(self):
self.ex = _make_executor()
self.executor = _make_executor()
async def asyncTearDown(self):
self.ex.close()
await self.ex.join(timeout=2)
self.executor.close()
await self.executor.join(timeout=2)
class TestAsyncPeriodicExecutorRepr(AsyncUnitTest):
async def test_repr_contains_class_and_name(self):
ex = _make_executor(name="exec")
r = repr(ex)
self.assertIn("AsyncPeriodicExecutor", r)
self.assertIn("exec", r)
executor = _make_executor(name="exec")
executor_repr = repr(executor)
self.assertIn("AsyncPeriodicExecutor", executor_repr)
self.assertIn("exec", executor_repr)
class TestAsyncPeriodicExecutorBasic(_AsyncPeriodicExecutorTestBase):
class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase):
async def test_wake_sets_event(self):
self.assertFalse(self.ex._event)
self.ex.wake()
self.assertTrue(self.ex._event)
self.assertFalse(self.executor._event)
self.executor.wake()
self.assertTrue(self.executor._event)
async def test_update_interval(self):
self.ex.update_interval(60)
self.assertEqual(self.ex._interval, 60)
self.executor.update_interval(60)
self.assertEqual(self.executor._interval, 60)
async def test_skip_sleep(self):
self.assertFalse(self.ex._skip_sleep)
self.ex.skip_sleep()
self.assertTrue(self.ex._skip_sleep)
self.assertFalse(self.executor._skip_sleep)
self.executor.skip_sleep()
self.assertTrue(self.executor._skip_sleep)
class TestAsyncPeriodicExecutorLifecycle(_AsyncPeriodicExecutorTestBase):
class TestAsyncPeriodicExecutorLifecycle(AsyncPeriodicExecutorTestBase):
async def test_open_starts_worker(self):
self.ex.open()
self.executor.open()
if _IS_SYNC:
self.assertIsNotNone(self.ex._thread)
self.assertTrue(self.ex._thread.is_alive())
self.assertIsNotNone(self.executor._thread)
self.assertTrue(self.executor._thread.is_alive())
else:
self.assertIsNotNone(self.ex._task)
self.assertIsNotNone(self.executor._task)
async def test_close_sets_stopped(self):
self.ex.open()
self.ex.close()
self.assertTrue(self.ex._stopped)
await self.ex.join(timeout=1)
self.executor.open()
self.executor.close()
self.assertTrue(self.executor._stopped)
await self.executor.join(timeout=1)
async def test_join_without_open_is_safe(self):
await self.ex.join(timeout=0.01)
await self.executor.join(timeout=0.01)
async def test_multiple_open_calls_have_no_effect(self):
self.ex.open()
self.executor.open()
if _IS_SYNC:
worker_id = id(self.ex._thread)
worker_id = id(self.executor._thread)
else:
worker_id = id(self.ex._task)
self.ex.open()
worker_id = id(self.executor._task)
self.executor.open()
if _IS_SYNC:
self.assertEqual(worker_id, id(self.ex._thread))
self.assertEqual(worker_id, id(self.executor._thread))
else:
self.assertEqual(worker_id, id(self.ex._task))
self.assertEqual(worker_id, id(self.executor._task))
class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase):
async def test_target_returning_false_stops_executor(self):
if _IS_SYNC:
ran = threading.Event()
@ -122,14 +122,14 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
ran.set()
return False
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
if _IS_SYNC:
self.assertTrue(ran.wait(timeout=2), "target never ran")
else:
await asyncio.wait_for(ran.wait(), timeout=2)
await self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
await self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
async def test_target_exception_stops_executor(self):
if _IS_SYNC:
@ -145,15 +145,15 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
def target():
ran.set()
raise RuntimeError("boom")
raise RuntimeError("error")
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
self.assertTrue(ran.wait(timeout=2), "target never ran")
self.ex.join(timeout=2)
self.executor.join(timeout=2)
finally:
threading.excepthook = orig_excepthook
self.assertTrue(self.ex._stopped)
self.assertTrue(self.executor._stopped)
self.assertEqual(len(captured_exc), 1)
self.assertIsInstance(captured_exc[0], RuntimeError)
else:
@ -161,15 +161,15 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
async def target():
ran.set()
raise RuntimeError("async boom")
raise RuntimeError("error")
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
await asyncio.wait_for(ran.wait(), timeout=2)
await self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
if self.ex._task is not None and self.ex._task.done():
self.ex._task.exception()
await self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
if self.executor._task is not None and self.executor._task.done():
self.executor._task.exception()
async def test_skip_sleep_flag_skips_interval(self):
call_times = []
@ -180,10 +180,10 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
return False
return True
self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target)
self.ex.skip_sleep()
self.ex.open()
await self.ex.join(timeout=3)
self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target)
self.executor.skip_sleep()
self.executor.open()
await self.executor.join(timeout=3)
self.assertGreaterEqual(len(call_times), 2)
self.assertLess(call_times[1] - call_times[0], 5.0)
@ -202,14 +202,14 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
return False
return True
self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target)
self.ex.open()
self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target)
self.executor.open()
if _IS_SYNC:
woken.wait(timeout=2)
else:
await asyncio.wait_for(woken.wait(), timeout=2)
self.ex.wake()
await self.ex.join(timeout=3)
self.executor.wake()
await self.executor.join(timeout=3)
self.assertGreaterEqual(call_count[0], 2)
async def test_open_after_target_returns_false(self):
@ -219,53 +219,53 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
called[0] += 1
return False
self.ex = _make_executor(target=target)
self.ex.open()
await self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
self.executor = _make_executor(target=target)
self.executor.open()
await self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
if not _IS_SYNC:
first_task = self.ex._task
self.ex.open()
await self.ex.join(timeout=2)
first_task = self.executor._task
self.executor.open()
await self.executor.join(timeout=2)
self.assertGreaterEqual(called[0], 2)
if not _IS_SYNC:
self.assertIsNot(self.ex._task, first_task)
self.assertIsNot(self.executor._task, first_task)
class TestShouldStop(AsyncUnitTest):
if _IS_SYNC:
def test_returns_false_when_not_stopped(self):
ex = _make_executor()
self.assertFalse(ex._should_stop())
self.assertFalse(ex._thread_will_exit)
executor = _make_executor()
self.assertFalse(executor._should_stop())
self.assertFalse(executor._thread_will_exit)
def test_returns_true_and_sets_thread_will_exit(self):
ex = _make_executor()
ex._stopped = True
self.assertTrue(ex._should_stop())
self.assertTrue(ex._thread_will_exit)
executor = _make_executor()
executor._stopped = True
self.assertTrue(executor._should_stop())
self.assertTrue(executor._thread_will_exit)
class TestRegisterExecutor(AsyncUnitTest):
if _IS_SYNC:
def setUp(self):
self._orig = set(pe_module._EXECUTORS)
self._orig = set(periodic_executor._EXECUTORS)
def tearDown(self):
pe_module._EXECUTORS.clear()
pe_module._EXECUTORS.update(self._orig)
periodic_executor._EXECUTORS.clear()
periodic_executor._EXECUTORS.update(self._orig)
def test_register_adds_weakref(self):
ex = _make_executor()
before = len(pe_module._EXECUTORS)
_register_executor(ex)
self.assertEqual(len(pe_module._EXECUTORS), before + 1)
ref = next(r for r in pe_module._EXECUTORS if r() is ex)
del ex
executor = _make_executor()
before = len(periodic_executor._EXECUTORS)
_register_executor(executor)
self.assertEqual(len(periodic_executor._EXECUTORS), before + 1)
ref = next(r for r in periodic_executor._EXECUTORS if r() is executor)
del executor
gc.collect()
self.assertNotIn(ref, pe_module._EXECUTORS)
self.assertNotIn(ref, periodic_executor._EXECUTORS)
def test_shutdown_executors_stops_running_executors(self):
ran = threading.Event()
@ -274,15 +274,15 @@ class TestRegisterExecutor(AsyncUnitTest):
ran.set()
return True
ex = _make_executor(target=target)
ex.open()
executor = _make_executor(target=target)
executor.open()
self.assertTrue(ran.wait(timeout=2), "target never ran")
_shutdown_executors()
ex.join(timeout=2)
self.assertTrue(ex._stopped)
executor.join(timeout=2)
self.assertTrue(executor._stopped)
def test_shutdown_executors_safe_when_empty(self):
pe_module._EXECUTORS.clear()
periodic_executor._EXECUTORS.clear()
_shutdown_executors()

View File

@ -26,7 +26,7 @@ sys.path[0:0] = [""]
from test import UnitTest, unittest
import pymongo.periodic_executor as pe_module
from pymongo import periodic_executor
from pymongo.periodic_executor import (
PeriodicExecutor,
_register_executor,
@ -45,71 +45,71 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"):
return PeriodicExecutor(interval=interval, min_interval=min_interval, target=target, name=name)
class _PeriodicExecutorTestBase(UnitTest):
class PeriodicExecutorTestBase(UnitTest):
def setUp(self):
self.ex = _make_executor()
self.executor = _make_executor()
def tearDown(self):
self.ex.close()
self.ex.join(timeout=2)
self.executor.close()
self.executor.join(timeout=2)
class TestPeriodicExecutorRepr(UnitTest):
def test_repr_contains_class_and_name(self):
ex = _make_executor(name="exec")
r = repr(ex)
self.assertIn("PeriodicExecutor", r)
self.assertIn("exec", r)
executor = _make_executor(name="exec")
executor_repr = repr(executor)
self.assertIn("PeriodicExecutor", executor_repr)
self.assertIn("exec", executor_repr)
class TestPeriodicExecutorBasic(_PeriodicExecutorTestBase):
class TestPeriodicExecutorBasic(PeriodicExecutorTestBase):
def test_wake_sets_event(self):
self.assertFalse(self.ex._event)
self.ex.wake()
self.assertTrue(self.ex._event)
self.assertFalse(self.executor._event)
self.executor.wake()
self.assertTrue(self.executor._event)
def test_update_interval(self):
self.ex.update_interval(60)
self.assertEqual(self.ex._interval, 60)
self.executor.update_interval(60)
self.assertEqual(self.executor._interval, 60)
def test_skip_sleep(self):
self.assertFalse(self.ex._skip_sleep)
self.ex.skip_sleep()
self.assertTrue(self.ex._skip_sleep)
self.assertFalse(self.executor._skip_sleep)
self.executor.skip_sleep()
self.assertTrue(self.executor._skip_sleep)
class TestPeriodicExecutorLifecycle(_PeriodicExecutorTestBase):
class TestPeriodicExecutorLifecycle(PeriodicExecutorTestBase):
def test_open_starts_worker(self):
self.ex.open()
self.executor.open()
if _IS_SYNC:
self.assertIsNotNone(self.ex._thread)
self.assertTrue(self.ex._thread.is_alive())
self.assertIsNotNone(self.executor._thread)
self.assertTrue(self.executor._thread.is_alive())
else:
self.assertIsNotNone(self.ex._task)
self.assertIsNotNone(self.executor._task)
def test_close_sets_stopped(self):
self.ex.open()
self.ex.close()
self.assertTrue(self.ex._stopped)
self.ex.join(timeout=1)
self.executor.open()
self.executor.close()
self.assertTrue(self.executor._stopped)
self.executor.join(timeout=1)
def test_join_without_open_is_safe(self):
self.ex.join(timeout=0.01)
self.executor.join(timeout=0.01)
def test_multiple_open_calls_have_no_effect(self):
self.ex.open()
self.executor.open()
if _IS_SYNC:
worker_id = id(self.ex._thread)
worker_id = id(self.executor._thread)
else:
worker_id = id(self.ex._task)
self.ex.open()
worker_id = id(self.executor._task)
self.executor.open()
if _IS_SYNC:
self.assertEqual(worker_id, id(self.ex._thread))
self.assertEqual(worker_id, id(self.executor._thread))
else:
self.assertEqual(worker_id, id(self.ex._task))
self.assertEqual(worker_id, id(self.executor._task))
class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
class TestPeriodicExecutorTarget(PeriodicExecutorTestBase):
def test_target_returning_false_stops_executor(self):
if _IS_SYNC:
ran = threading.Event()
@ -120,14 +120,14 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
ran.set()
return False
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
if _IS_SYNC:
self.assertTrue(ran.wait(timeout=2), "target never ran")
else:
asyncio.wait_for(ran.wait(), timeout=2)
self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
def test_target_exception_stops_executor(self):
if _IS_SYNC:
@ -143,15 +143,15 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
def target():
ran.set()
raise RuntimeError("boom")
raise RuntimeError("error")
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
self.assertTrue(ran.wait(timeout=2), "target never ran")
self.ex.join(timeout=2)
self.executor.join(timeout=2)
finally:
threading.excepthook = orig_excepthook
self.assertTrue(self.ex._stopped)
self.assertTrue(self.executor._stopped)
self.assertEqual(len(captured_exc), 1)
self.assertIsInstance(captured_exc[0], RuntimeError)
else:
@ -159,15 +159,15 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
def target():
ran.set()
raise RuntimeError("async boom")
raise RuntimeError("error")
self.ex = _make_executor(target=target)
self.ex.open()
self.executor = _make_executor(target=target)
self.executor.open()
asyncio.wait_for(ran.wait(), timeout=2)
self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
if self.ex._task is not None and self.ex._task.done():
self.ex._task.exception()
self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
if self.executor._task is not None and self.executor._task.done():
self.executor._task.exception()
def test_skip_sleep_flag_skips_interval(self):
call_times = []
@ -178,10 +178,10 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
return False
return True
self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target)
self.ex.skip_sleep()
self.ex.open()
self.ex.join(timeout=3)
self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target)
self.executor.skip_sleep()
self.executor.open()
self.executor.join(timeout=3)
self.assertGreaterEqual(len(call_times), 2)
self.assertLess(call_times[1] - call_times[0], 5.0)
@ -200,14 +200,14 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
return False
return True
self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target)
self.ex.open()
self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target)
self.executor.open()
if _IS_SYNC:
woken.wait(timeout=2)
else:
asyncio.wait_for(woken.wait(), timeout=2)
self.ex.wake()
self.ex.join(timeout=3)
self.executor.wake()
self.executor.join(timeout=3)
self.assertGreaterEqual(call_count[0], 2)
def test_open_after_target_returns_false(self):
@ -217,53 +217,53 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase):
called[0] += 1
return False
self.ex = _make_executor(target=target)
self.ex.open()
self.ex.join(timeout=2)
self.assertTrue(self.ex._stopped)
self.executor = _make_executor(target=target)
self.executor.open()
self.executor.join(timeout=2)
self.assertTrue(self.executor._stopped)
if not _IS_SYNC:
first_task = self.ex._task
self.ex.open()
self.ex.join(timeout=2)
first_task = self.executor._task
self.executor.open()
self.executor.join(timeout=2)
self.assertGreaterEqual(called[0], 2)
if not _IS_SYNC:
self.assertIsNot(self.ex._task, first_task)
self.assertIsNot(self.executor._task, first_task)
class TestShouldStop(UnitTest):
if _IS_SYNC:
def test_returns_false_when_not_stopped(self):
ex = _make_executor()
self.assertFalse(ex._should_stop())
self.assertFalse(ex._thread_will_exit)
executor = _make_executor()
self.assertFalse(executor._should_stop())
self.assertFalse(executor._thread_will_exit)
def test_returns_true_and_sets_thread_will_exit(self):
ex = _make_executor()
ex._stopped = True
self.assertTrue(ex._should_stop())
self.assertTrue(ex._thread_will_exit)
executor = _make_executor()
executor._stopped = True
self.assertTrue(executor._should_stop())
self.assertTrue(executor._thread_will_exit)
class TestRegisterExecutor(UnitTest):
if _IS_SYNC:
def setUp(self):
self._orig = set(pe_module._EXECUTORS)
self._orig = set(periodic_executor._EXECUTORS)
def tearDown(self):
pe_module._EXECUTORS.clear()
pe_module._EXECUTORS.update(self._orig)
periodic_executor._EXECUTORS.clear()
periodic_executor._EXECUTORS.update(self._orig)
def test_register_adds_weakref(self):
ex = _make_executor()
before = len(pe_module._EXECUTORS)
_register_executor(ex)
self.assertEqual(len(pe_module._EXECUTORS), before + 1)
ref = next(r for r in pe_module._EXECUTORS if r() is ex)
del ex
executor = _make_executor()
before = len(periodic_executor._EXECUTORS)
_register_executor(executor)
self.assertEqual(len(periodic_executor._EXECUTORS), before + 1)
ref = next(r for r in periodic_executor._EXECUTORS if r() is executor)
del executor
gc.collect()
self.assertNotIn(ref, pe_module._EXECUTORS)
self.assertNotIn(ref, periodic_executor._EXECUTORS)
def test_shutdown_executors_stops_running_executors(self):
ran = threading.Event()
@ -272,15 +272,15 @@ class TestRegisterExecutor(UnitTest):
ran.set()
return True
ex = _make_executor(target=target)
ex.open()
executor = _make_executor(target=target)
executor.open()
self.assertTrue(ran.wait(timeout=2), "target never ran")
_shutdown_executors()
ex.join(timeout=2)
self.assertTrue(ex._stopped)
executor.join(timeout=2)
self.assertTrue(executor._stopped)
def test_shutdown_executors_safe_when_empty(self):
pe_module._EXECUTORS.clear()
periodic_executor._EXECUTORS.clear()
_shutdown_executors()

View File

@ -252,6 +252,7 @@ converted_tests = [
"test_monitoring.py",
"test_mongos_load_balancing.py",
"test_on_demand_csfle.py",
"test_periodic_executor.py",
"test_pooling.py",
"test_raw_bson.py",
"test_read_concern.py",
@ -279,7 +280,6 @@ converted_tests = [
"unified_format.py",
"utils_selection_tests.py",
"utils.py",
"test_periodic_executor.py",
]