diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index 571a19340..cdf82ff8a 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -17,7 +17,6 @@ from __future__ import annotations import asyncio -import gc import sys import threading import time @@ -26,12 +25,7 @@ sys.path[0:0] = [""] from test.asynchronous import AsyncUnitTest, unittest -from pymongo import periodic_executor -from pymongo.periodic_executor import ( - AsyncPeriodicExecutor, - _register_executor, - _shutdown_executors, -) +from pymongo.periodic_executor import AsyncPeriodicExecutor _IS_SYNC = False @@ -49,69 +43,28 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): class AsyncPeriodicExecutorTestBase(AsyncUnitTest): async def asyncSetUp(self): - self.executor = _make_executor() + self.executor = None async def asyncTearDown(self): - self.executor.close() - await self.executor.join(timeout=2) + if self.executor is not None: + self.executor.close() + await self.executor.join(timeout=2) -class TestAsyncPeriodicExecutorRepr(AsyncUnitTest): +class TestAsyncPeriodicExecutor(AsyncPeriodicExecutorTestBase): async def test_repr_contains_class_and_name(self): executor = _make_executor(name="exec") executor_repr = repr(executor) self.assertIn("AsyncPeriodicExecutor", executor_repr) self.assertIn("exec", executor_repr) - -class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase): - async def test_wake_sets_event(self): - self.assertFalse(self.executor._event) - self.executor.wake() - self.assertTrue(self.executor._event) - - async def test_update_interval(self): - self.executor.update_interval(60) - self.assertEqual(self.executor._interval, 60) - - async def test_skip_sleep(self): - self.assertFalse(self.executor._skip_sleep) - self.executor.skip_sleep() - self.assertTrue(self.executor._skip_sleep) - - -class TestAsyncPeriodicExecutorLifecycle(AsyncPeriodicExecutorTestBase): - async def test_open_starts_worker(self): - self.executor.open() - if _IS_SYNC: - self.assertIsNotNone(self.executor._thread) - self.assertTrue(self.executor._thread.is_alive()) - else: - self.assertIsNotNone(self.executor._task) - - async def test_close_sets_stopped(self): - self.executor.open() - self.executor.close() - self.assertTrue(self.executor._stopped) - await self.executor.join(timeout=1) - async def test_join_without_open_is_safe(self): - await self.executor.join(timeout=0.01) + self.executor = _make_executor() + try: + await self.executor.join(timeout=0.01) + except Exception as e: + self.fail(f"join() raised unexpected Exception {e}") - async def test_multiple_open_calls_have_no_effect(self): - self.executor.open() - if _IS_SYNC: - worker_id = id(self.executor._thread) - else: - worker_id = id(self.executor._task) - self.executor.open() - if _IS_SYNC: - self.assertEqual(worker_id, id(self.executor._thread)) - else: - self.assertEqual(worker_id, id(self.executor._task)) - - -class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): async def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() @@ -124,12 +77,8 @@ class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): self.executor = _make_executor(target=target) self.executor.open() - if _IS_SYNC: - self.assertTrue(ran.wait(timeout=2), "target never ran") - else: - await asyncio.wait_for(ran.wait(), timeout=2) await self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) + self.assertTrue(ran.is_set(), "target never ran") async def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -149,33 +98,30 @@ class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): self.executor = _make_executor(target=target) self.executor.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") self.executor.join(timeout=2) + self.assertTrue(ran.is_set(), "target never ran") finally: threading.excepthook = orig_excepthook - self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: - ran = asyncio.Event() + call_count = 0 async def target(): - ran.set() + nonlocal call_count + call_count += 1 raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open() - await asyncio.wait_for(ran.wait(), timeout=2) await self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) - if self.executor._task is not None and self.executor._task.done(): - self.executor._task.exception() + self.assertEqual(call_count, 1, "target should stop after exception") async def test_skip_sleep_flag_skips_interval(self): call_times = [] async def target(): - call_times.append(time.monotonic() if _IS_SYNC else asyncio.get_running_loop().time()) + call_times.append(time.monotonic()) if len(call_times) >= 2: return False return True @@ -188,19 +134,18 @@ class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): self.assertLess(call_times[1] - call_times[0], 5.0) async def test_wake_causes_early_run(self): - call_count = [0] + call_count = 0 if _IS_SYNC: woken = threading.Event() else: woken = asyncio.Event() async def target(): - call_count[0] += 1 - if call_count[0] == 1: + nonlocal call_count + call_count += 1 + if call_count == 1: woken.set() - if call_count[0] >= 2: - return False - return True + return call_count < 2 self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) self.executor.open() @@ -210,80 +155,22 @@ class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): await asyncio.wait_for(woken.wait(), timeout=2) self.executor.wake() await self.executor.join(timeout=3) - self.assertGreaterEqual(call_count[0], 2) + self.assertGreaterEqual(call_count, 2) async def test_open_after_target_returns_false(self): - called = [0] + called = 0 async def target(): - called[0] += 1 + nonlocal called + called += 1 return False self.executor = _make_executor(target=target) self.executor.open() await self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) - if not _IS_SYNC: - first_task = self.executor._task self.executor.open() await self.executor.join(timeout=2) - self.assertGreaterEqual(called[0], 2) - if not _IS_SYNC: - self.assertIsNot(self.executor._task, first_task) - - -class TestShouldStop(AsyncUnitTest): - if _IS_SYNC: - - def test_returns_false_when_not_stopped(self): - executor = _make_executor() - self.assertFalse(executor._should_stop()) - self.assertFalse(executor._thread_will_exit) - - def test_returns_true_and_sets_thread_will_exit(self): - executor = _make_executor() - executor._stopped = True - self.assertTrue(executor._should_stop()) - self.assertTrue(executor._thread_will_exit) - - -class TestRegisterExecutor(AsyncUnitTest): - if _IS_SYNC: - - def setUp(self): - self._orig = set(periodic_executor._EXECUTORS) - - def tearDown(self): - periodic_executor._EXECUTORS.clear() - periodic_executor._EXECUTORS.update(self._orig) - - def test_register_adds_weakref(self): - executor = _make_executor() - before = len(periodic_executor._EXECUTORS) - _register_executor(executor) - self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) - ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) - del executor - gc.collect() - self.assertNotIn(ref, periodic_executor._EXECUTORS) - - def test_shutdown_executors_stops_running_executors(self): - ran = threading.Event() - - def target(): - ran.set() - return True - - executor = _make_executor(target=target) - executor.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") - _shutdown_executors() - executor.join(timeout=2) - self.assertTrue(executor._stopped) - - def test_shutdown_executors_safe_when_empty(self): - periodic_executor._EXECUTORS.clear() - _shutdown_executors() + self.assertGreaterEqual(called, 2) if __name__ == "__main__": diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index deba99975..3df2dbf97 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -17,7 +17,6 @@ from __future__ import annotations import asyncio -import gc import sys import threading import time @@ -26,12 +25,7 @@ sys.path[0:0] = [""] from test import UnitTest, unittest -from pymongo import periodic_executor -from pymongo.periodic_executor import ( - PeriodicExecutor, - _register_executor, - _shutdown_executors, -) +from pymongo.periodic_executor import PeriodicExecutor _IS_SYNC = True @@ -47,69 +41,28 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): class PeriodicExecutorTestBase(UnitTest): def setUp(self): - self.executor = _make_executor() + self.executor = None def tearDown(self): - self.executor.close() - self.executor.join(timeout=2) + if self.executor is not None: + self.executor.close() + self.executor.join(timeout=2) -class TestPeriodicExecutorRepr(UnitTest): +class TestPeriodicExecutor(PeriodicExecutorTestBase): def test_repr_contains_class_and_name(self): executor = _make_executor(name="exec") executor_repr = repr(executor) self.assertIn("PeriodicExecutor", executor_repr) self.assertIn("exec", executor_repr) - -class TestPeriodicExecutorBasic(PeriodicExecutorTestBase): - def test_wake_sets_event(self): - self.assertFalse(self.executor._event) - self.executor.wake() - self.assertTrue(self.executor._event) - - def test_update_interval(self): - self.executor.update_interval(60) - self.assertEqual(self.executor._interval, 60) - - def test_skip_sleep(self): - self.assertFalse(self.executor._skip_sleep) - self.executor.skip_sleep() - self.assertTrue(self.executor._skip_sleep) - - -class TestPeriodicExecutorLifecycle(PeriodicExecutorTestBase): - def test_open_starts_worker(self): - self.executor.open() - if _IS_SYNC: - self.assertIsNotNone(self.executor._thread) - self.assertTrue(self.executor._thread.is_alive()) - else: - self.assertIsNotNone(self.executor._task) - - def test_close_sets_stopped(self): - self.executor.open() - self.executor.close() - self.assertTrue(self.executor._stopped) - self.executor.join(timeout=1) - def test_join_without_open_is_safe(self): - self.executor.join(timeout=0.01) + self.executor = _make_executor() + try: + self.executor.join(timeout=0.01) + except Exception as e: + self.fail(f"join() raised unexpected Exception {e}") - def test_multiple_open_calls_have_no_effect(self): - self.executor.open() - if _IS_SYNC: - worker_id = id(self.executor._thread) - else: - worker_id = id(self.executor._task) - self.executor.open() - if _IS_SYNC: - self.assertEqual(worker_id, id(self.executor._thread)) - else: - self.assertEqual(worker_id, id(self.executor._task)) - - -class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() @@ -122,12 +75,8 @@ class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): self.executor = _make_executor(target=target) self.executor.open() - if _IS_SYNC: - self.assertTrue(ran.wait(timeout=2), "target never ran") - else: - asyncio.wait_for(ran.wait(), timeout=2) self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) + self.assertTrue(ran.is_set(), "target never ran") def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -147,33 +96,30 @@ class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): self.executor = _make_executor(target=target) self.executor.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") self.executor.join(timeout=2) + self.assertTrue(ran.is_set(), "target never ran") finally: threading.excepthook = orig_excepthook - self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: - ran = asyncio.Event() + call_count = 0 def target(): - ran.set() + nonlocal call_count + call_count += 1 raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open() - asyncio.wait_for(ran.wait(), timeout=2) self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) - if self.executor._task is not None and self.executor._task.done(): - self.executor._task.exception() + self.assertEqual(call_count, 1, "target should stop after exception") def test_skip_sleep_flag_skips_interval(self): call_times = [] def target(): - call_times.append(time.monotonic() if _IS_SYNC else asyncio.get_running_loop().time()) + call_times.append(time.monotonic()) if len(call_times) >= 2: return False return True @@ -186,19 +132,18 @@ class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): self.assertLess(call_times[1] - call_times[0], 5.0) def test_wake_causes_early_run(self): - call_count = [0] + call_count = 0 if _IS_SYNC: woken = threading.Event() else: woken = asyncio.Event() def target(): - call_count[0] += 1 - if call_count[0] == 1: + nonlocal call_count + call_count += 1 + if call_count == 1: woken.set() - if call_count[0] >= 2: - return False - return True + return call_count < 2 self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) self.executor.open() @@ -208,80 +153,22 @@ class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): asyncio.wait_for(woken.wait(), timeout=2) self.executor.wake() self.executor.join(timeout=3) - self.assertGreaterEqual(call_count[0], 2) + self.assertGreaterEqual(call_count, 2) def test_open_after_target_returns_false(self): - called = [0] + called = 0 def target(): - called[0] += 1 + nonlocal called + called += 1 return False self.executor = _make_executor(target=target) self.executor.open() self.executor.join(timeout=2) - self.assertTrue(self.executor._stopped) - if not _IS_SYNC: - first_task = self.executor._task self.executor.open() self.executor.join(timeout=2) - self.assertGreaterEqual(called[0], 2) - if not _IS_SYNC: - self.assertIsNot(self.executor._task, first_task) - - -class TestShouldStop(UnitTest): - if _IS_SYNC: - - def test_returns_false_when_not_stopped(self): - executor = _make_executor() - self.assertFalse(executor._should_stop()) - self.assertFalse(executor._thread_will_exit) - - def test_returns_true_and_sets_thread_will_exit(self): - executor = _make_executor() - executor._stopped = True - self.assertTrue(executor._should_stop()) - self.assertTrue(executor._thread_will_exit) - - -class TestRegisterExecutor(UnitTest): - if _IS_SYNC: - - def setUp(self): - self._orig = set(periodic_executor._EXECUTORS) - - def tearDown(self): - periodic_executor._EXECUTORS.clear() - periodic_executor._EXECUTORS.update(self._orig) - - def test_register_adds_weakref(self): - executor = _make_executor() - before = len(periodic_executor._EXECUTORS) - _register_executor(executor) - self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) - ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) - del executor - gc.collect() - self.assertNotIn(ref, periodic_executor._EXECUTORS) - - def test_shutdown_executors_stops_running_executors(self): - ran = threading.Event() - - def target(): - ran.set() - return True - - executor = _make_executor(target=target) - executor.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") - _shutdown_executors() - executor.join(timeout=2) - self.assertTrue(executor._stopped) - - def test_shutdown_executors_safe_when_empty(self): - periodic_executor._EXECUTORS.clear() - _shutdown_executors() + self.assertGreaterEqual(called, 2) if __name__ == "__main__":