diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index 15f75b0f4..571a19340 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -26,7 +26,7 @@ sys.path[0:0] = [""] from test.asynchronous import AsyncUnitTest, unittest -import pymongo.periodic_executor as pe_module +from pymongo import periodic_executor from pymongo.periodic_executor import ( AsyncPeriodicExecutor, _register_executor, @@ -47,71 +47,71 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): ) -class _AsyncPeriodicExecutorTestBase(AsyncUnitTest): +class AsyncPeriodicExecutorTestBase(AsyncUnitTest): async def asyncSetUp(self): - self.ex = _make_executor() + self.executor = _make_executor() async def asyncTearDown(self): - self.ex.close() - await self.ex.join(timeout=2) + self.executor.close() + await self.executor.join(timeout=2) class TestAsyncPeriodicExecutorRepr(AsyncUnitTest): async def test_repr_contains_class_and_name(self): - ex = _make_executor(name="exec") - r = repr(ex) - self.assertIn("AsyncPeriodicExecutor", r) - self.assertIn("exec", r) + executor = _make_executor(name="exec") + executor_repr = repr(executor) + self.assertIn("AsyncPeriodicExecutor", executor_repr) + self.assertIn("exec", executor_repr) -class TestAsyncPeriodicExecutorBasic(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase): async def test_wake_sets_event(self): - self.assertFalse(self.ex._event) - self.ex.wake() - self.assertTrue(self.ex._event) + self.assertFalse(self.executor._event) + self.executor.wake() + self.assertTrue(self.executor._event) async def test_update_interval(self): - self.ex.update_interval(60) - self.assertEqual(self.ex._interval, 60) + self.executor.update_interval(60) + self.assertEqual(self.executor._interval, 60) async def test_skip_sleep(self): - self.assertFalse(self.ex._skip_sleep) - self.ex.skip_sleep() - self.assertTrue(self.ex._skip_sleep) + self.assertFalse(self.executor._skip_sleep) + self.executor.skip_sleep() + self.assertTrue(self.executor._skip_sleep) -class TestAsyncPeriodicExecutorLifecycle(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorLifecycle(AsyncPeriodicExecutorTestBase): async def test_open_starts_worker(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - self.assertIsNotNone(self.ex._thread) - self.assertTrue(self.ex._thread.is_alive()) + self.assertIsNotNone(self.executor._thread) + self.assertTrue(self.executor._thread.is_alive()) else: - self.assertIsNotNone(self.ex._task) + self.assertIsNotNone(self.executor._task) async def test_close_sets_stopped(self): - self.ex.open() - self.ex.close() - self.assertTrue(self.ex._stopped) - await self.ex.join(timeout=1) + self.executor.open() + self.executor.close() + self.assertTrue(self.executor._stopped) + await self.executor.join(timeout=1) async def test_join_without_open_is_safe(self): - await self.ex.join(timeout=0.01) + await self.executor.join(timeout=0.01) async def test_multiple_open_calls_have_no_effect(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - worker_id = id(self.ex._thread) + worker_id = id(self.executor._thread) else: - worker_id = id(self.ex._task) - self.ex.open() + worker_id = id(self.executor._task) + self.executor.open() if _IS_SYNC: - self.assertEqual(worker_id, id(self.ex._thread)) + self.assertEqual(worker_id, id(self.executor._thread)) else: - self.assertEqual(worker_id, id(self.ex._task)) + self.assertEqual(worker_id, id(self.executor._task)) -class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): async def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() @@ -122,14 +122,14 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): ran.set() return False - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() if _IS_SYNC: self.assertTrue(ran.wait(timeout=2), "target never ran") else: await asyncio.wait_for(ran.wait(), timeout=2) - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) async def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -145,15 +145,15 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): def target(): ran.set() - raise RuntimeError("boom") + raise RuntimeError("error") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") - self.ex.join(timeout=2) + self.executor.join(timeout=2) finally: threading.excepthook = orig_excepthook - self.assertTrue(self.ex._stopped) + self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: @@ -161,15 +161,15 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): async def target(): ran.set() - raise RuntimeError("async boom") + raise RuntimeError("error") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() await asyncio.wait_for(ran.wait(), timeout=2) - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) - if self.ex._task is not None and self.ex._task.done(): - self.ex._task.exception() + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) + if self.executor._task is not None and self.executor._task.done(): + self.executor._task.exception() async def test_skip_sleep_flag_skips_interval(self): call_times = [] @@ -180,10 +180,10 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) - self.ex.skip_sleep() - self.ex.open() - await self.ex.join(timeout=3) + self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.executor.skip_sleep() + self.executor.open() + await self.executor.join(timeout=3) self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) @@ -202,14 +202,14 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) - self.ex.open() + self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.executor.open() if _IS_SYNC: woken.wait(timeout=2) else: await asyncio.wait_for(woken.wait(), timeout=2) - self.ex.wake() - await self.ex.join(timeout=3) + self.executor.wake() + await self.executor.join(timeout=3) self.assertGreaterEqual(call_count[0], 2) async def test_open_after_target_returns_false(self): @@ -219,53 +219,53 @@ class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): called[0] += 1 return False - self.ex = _make_executor(target=target) - self.ex.open() - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor = _make_executor(target=target) + self.executor.open() + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) if not _IS_SYNC: - first_task = self.ex._task - self.ex.open() - await self.ex.join(timeout=2) + first_task = self.executor._task + self.executor.open() + await self.executor.join(timeout=2) self.assertGreaterEqual(called[0], 2) if not _IS_SYNC: - self.assertIsNot(self.ex._task, first_task) + self.assertIsNot(self.executor._task, first_task) class TestShouldStop(AsyncUnitTest): if _IS_SYNC: def test_returns_false_when_not_stopped(self): - ex = _make_executor() - self.assertFalse(ex._should_stop()) - self.assertFalse(ex._thread_will_exit) + executor = _make_executor() + self.assertFalse(executor._should_stop()) + self.assertFalse(executor._thread_will_exit) def test_returns_true_and_sets_thread_will_exit(self): - ex = _make_executor() - ex._stopped = True - self.assertTrue(ex._should_stop()) - self.assertTrue(ex._thread_will_exit) + executor = _make_executor() + executor._stopped = True + self.assertTrue(executor._should_stop()) + self.assertTrue(executor._thread_will_exit) class TestRegisterExecutor(AsyncUnitTest): if _IS_SYNC: def setUp(self): - self._orig = set(pe_module._EXECUTORS) + self._orig = set(periodic_executor._EXECUTORS) def tearDown(self): - pe_module._EXECUTORS.clear() - pe_module._EXECUTORS.update(self._orig) + periodic_executor._EXECUTORS.clear() + periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): - ex = _make_executor() - before = len(pe_module._EXECUTORS) - _register_executor(ex) - self.assertEqual(len(pe_module._EXECUTORS), before + 1) - ref = next(r for r in pe_module._EXECUTORS if r() is ex) - del ex + executor = _make_executor() + before = len(periodic_executor._EXECUTORS) + _register_executor(executor) + self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) + ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) + del executor gc.collect() - self.assertNotIn(ref, pe_module._EXECUTORS) + self.assertNotIn(ref, periodic_executor._EXECUTORS) def test_shutdown_executors_stops_running_executors(self): ran = threading.Event() @@ -274,15 +274,15 @@ class TestRegisterExecutor(AsyncUnitTest): ran.set() return True - ex = _make_executor(target=target) - ex.open() + executor = _make_executor(target=target) + executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") _shutdown_executors() - ex.join(timeout=2) - self.assertTrue(ex._stopped) + executor.join(timeout=2) + self.assertTrue(executor._stopped) def test_shutdown_executors_safe_when_empty(self): - pe_module._EXECUTORS.clear() + periodic_executor._EXECUTORS.clear() _shutdown_executors() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index f07ded1fd..deba99975 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -26,7 +26,7 @@ sys.path[0:0] = [""] from test import UnitTest, unittest -import pymongo.periodic_executor as pe_module +from pymongo import periodic_executor from pymongo.periodic_executor import ( PeriodicExecutor, _register_executor, @@ -45,71 +45,71 @@ def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): return PeriodicExecutor(interval=interval, min_interval=min_interval, target=target, name=name) -class _PeriodicExecutorTestBase(UnitTest): +class PeriodicExecutorTestBase(UnitTest): def setUp(self): - self.ex = _make_executor() + self.executor = _make_executor() def tearDown(self): - self.ex.close() - self.ex.join(timeout=2) + self.executor.close() + self.executor.join(timeout=2) class TestPeriodicExecutorRepr(UnitTest): def test_repr_contains_class_and_name(self): - ex = _make_executor(name="exec") - r = repr(ex) - self.assertIn("PeriodicExecutor", r) - self.assertIn("exec", r) + executor = _make_executor(name="exec") + executor_repr = repr(executor) + self.assertIn("PeriodicExecutor", executor_repr) + self.assertIn("exec", executor_repr) -class TestPeriodicExecutorBasic(_PeriodicExecutorTestBase): +class TestPeriodicExecutorBasic(PeriodicExecutorTestBase): def test_wake_sets_event(self): - self.assertFalse(self.ex._event) - self.ex.wake() - self.assertTrue(self.ex._event) + self.assertFalse(self.executor._event) + self.executor.wake() + self.assertTrue(self.executor._event) def test_update_interval(self): - self.ex.update_interval(60) - self.assertEqual(self.ex._interval, 60) + self.executor.update_interval(60) + self.assertEqual(self.executor._interval, 60) def test_skip_sleep(self): - self.assertFalse(self.ex._skip_sleep) - self.ex.skip_sleep() - self.assertTrue(self.ex._skip_sleep) + self.assertFalse(self.executor._skip_sleep) + self.executor.skip_sleep() + self.assertTrue(self.executor._skip_sleep) -class TestPeriodicExecutorLifecycle(_PeriodicExecutorTestBase): +class TestPeriodicExecutorLifecycle(PeriodicExecutorTestBase): def test_open_starts_worker(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - self.assertIsNotNone(self.ex._thread) - self.assertTrue(self.ex._thread.is_alive()) + self.assertIsNotNone(self.executor._thread) + self.assertTrue(self.executor._thread.is_alive()) else: - self.assertIsNotNone(self.ex._task) + self.assertIsNotNone(self.executor._task) def test_close_sets_stopped(self): - self.ex.open() - self.ex.close() - self.assertTrue(self.ex._stopped) - self.ex.join(timeout=1) + self.executor.open() + self.executor.close() + self.assertTrue(self.executor._stopped) + self.executor.join(timeout=1) def test_join_without_open_is_safe(self): - self.ex.join(timeout=0.01) + self.executor.join(timeout=0.01) def test_multiple_open_calls_have_no_effect(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - worker_id = id(self.ex._thread) + worker_id = id(self.executor._thread) else: - worker_id = id(self.ex._task) - self.ex.open() + worker_id = id(self.executor._task) + self.executor.open() if _IS_SYNC: - self.assertEqual(worker_id, id(self.ex._thread)) + self.assertEqual(worker_id, id(self.executor._thread)) else: - self.assertEqual(worker_id, id(self.ex._task)) + self.assertEqual(worker_id, id(self.executor._task)) -class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): +class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() @@ -120,14 +120,14 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): ran.set() return False - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() if _IS_SYNC: self.assertTrue(ran.wait(timeout=2), "target never ran") else: asyncio.wait_for(ran.wait(), timeout=2) - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -143,15 +143,15 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): def target(): ran.set() - raise RuntimeError("boom") + raise RuntimeError("error") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") - self.ex.join(timeout=2) + self.executor.join(timeout=2) finally: threading.excepthook = orig_excepthook - self.assertTrue(self.ex._stopped) + self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: @@ -159,15 +159,15 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): def target(): ran.set() - raise RuntimeError("async boom") + raise RuntimeError("error") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() asyncio.wait_for(ran.wait(), timeout=2) - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) - if self.ex._task is not None and self.ex._task.done(): - self.ex._task.exception() + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) + if self.executor._task is not None and self.executor._task.done(): + self.executor._task.exception() def test_skip_sleep_flag_skips_interval(self): call_times = [] @@ -178,10 +178,10 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) - self.ex.skip_sleep() - self.ex.open() - self.ex.join(timeout=3) + self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.executor.skip_sleep() + self.executor.open() + self.executor.join(timeout=3) self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) @@ -200,14 +200,14 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) - self.ex.open() + self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.executor.open() if _IS_SYNC: woken.wait(timeout=2) else: asyncio.wait_for(woken.wait(), timeout=2) - self.ex.wake() - self.ex.join(timeout=3) + self.executor.wake() + self.executor.join(timeout=3) self.assertGreaterEqual(call_count[0], 2) def test_open_after_target_returns_false(self): @@ -217,53 +217,53 @@ class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): called[0] += 1 return False - self.ex = _make_executor(target=target) - self.ex.open() - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor = _make_executor(target=target) + self.executor.open() + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) if not _IS_SYNC: - first_task = self.ex._task - self.ex.open() - self.ex.join(timeout=2) + first_task = self.executor._task + self.executor.open() + self.executor.join(timeout=2) self.assertGreaterEqual(called[0], 2) if not _IS_SYNC: - self.assertIsNot(self.ex._task, first_task) + self.assertIsNot(self.executor._task, first_task) class TestShouldStop(UnitTest): if _IS_SYNC: def test_returns_false_when_not_stopped(self): - ex = _make_executor() - self.assertFalse(ex._should_stop()) - self.assertFalse(ex._thread_will_exit) + executor = _make_executor() + self.assertFalse(executor._should_stop()) + self.assertFalse(executor._thread_will_exit) def test_returns_true_and_sets_thread_will_exit(self): - ex = _make_executor() - ex._stopped = True - self.assertTrue(ex._should_stop()) - self.assertTrue(ex._thread_will_exit) + executor = _make_executor() + executor._stopped = True + self.assertTrue(executor._should_stop()) + self.assertTrue(executor._thread_will_exit) class TestRegisterExecutor(UnitTest): if _IS_SYNC: def setUp(self): - self._orig = set(pe_module._EXECUTORS) + self._orig = set(periodic_executor._EXECUTORS) def tearDown(self): - pe_module._EXECUTORS.clear() - pe_module._EXECUTORS.update(self._orig) + periodic_executor._EXECUTORS.clear() + periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): - ex = _make_executor() - before = len(pe_module._EXECUTORS) - _register_executor(ex) - self.assertEqual(len(pe_module._EXECUTORS), before + 1) - ref = next(r for r in pe_module._EXECUTORS if r() is ex) - del ex + executor = _make_executor() + before = len(periodic_executor._EXECUTORS) + _register_executor(executor) + self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) + ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) + del executor gc.collect() - self.assertNotIn(ref, pe_module._EXECUTORS) + self.assertNotIn(ref, periodic_executor._EXECUTORS) def test_shutdown_executors_stops_running_executors(self): ran = threading.Event() @@ -272,15 +272,15 @@ class TestRegisterExecutor(UnitTest): ran.set() return True - ex = _make_executor(target=target) - ex.open() + executor = _make_executor(target=target) + executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") _shutdown_executors() - ex.join(timeout=2) - self.assertTrue(ex._stopped) + executor.join(timeout=2) + self.assertTrue(executor._stopped) def test_shutdown_executors_safe_when_empty(self): - pe_module._EXECUTORS.clear() + periodic_executor._EXECUTORS.clear() _shutdown_executors() diff --git a/tools/synchro.py b/tools/synchro.py index 5b5267b85..51ec7eb24 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -252,6 +252,7 @@ converted_tests = [ "test_monitoring.py", "test_mongos_load_balancing.py", "test_on_demand_csfle.py", + "test_periodic_executor.py", "test_pooling.py", "test_raw_bson.py", "test_read_concern.py", @@ -279,7 +280,6 @@ converted_tests = [ "unified_format.py", "utils_selection_tests.py", "utils.py", - "test_periodic_executor.py", ]