mongo-python-driver/test/test_periodic_executor.py
Jeffrey 'Alex' Clark 71a240c576 Copilot + Noah review
2026-05-06 22:41:37 -04:00

131 lines
3.6 KiB
Python

# Copyright 2026-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for periodic_executor.py."""
from __future__ import annotations
import asyncio
import sys
import threading
import time
sys.path[0:0] = [""]
from test.synchronous import UnitTest, unittest
from pymongo.periodic_executor import PeriodicExecutor
# Presumably the marker that tells the sync and async flavours of this test
# module apart -- TODO confirm against the code-generation tooling.
# NOTE(review): the value is False even though every test in this module is a
# plain ``def`` and nothing is awaited; confirm that this is intended.
_IS_SYNC = False
class TestAsyncPeriodicExecutor(UnitTest):
    """Lifecycle tests for ``PeriodicExecutor``.

    Exercises ``open()``, ``join()``, ``close()``, ``skip_sleep()`` and
    ``wake()`` using small in-test target callables.

    All signalling between a target and the test body uses
    ``threading.Event``: the test methods are synchronous, so they can block
    on ``Event.wait(timeout)`` directly, whereas an ``asyncio.Event`` can only
    be waited on with ``await`` and is not thread-safe.
    """

    def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"):
        """Create a ``PeriodicExecutor`` and schedule its shutdown as a cleanup.

        Args:
            interval: Forwarded to ``PeriodicExecutor`` as ``interval``.
            min_interval: Forwarded to ``PeriodicExecutor`` as ``min_interval``.
            target: Callable run by the executor. When omitted, a callable
                that always returns ``True`` is used.
            name: Forwarded to ``PeriodicExecutor`` as ``name``.

        Returns:
            The newly constructed executor (not yet opened).
        """
        if target is None:

            def target():
                return True

        executor = PeriodicExecutor(
            interval=interval, min_interval=min_interval, target=target, name=name
        )
        # Registered immediately so the executor is shut down even when the
        # calling test fails part-way through.
        self.addCleanup(self._close_executor, executor)
        return executor

    def _close_executor(self, executor):
        """Close *executor* and wait up to two seconds for it to finish."""
        executor.close()
        executor.join(timeout=2)

    def test_join_without_open_is_safe(self):
        """``join()`` on an executor that was never opened must not raise."""
        executor = self._make_executor()
        try:
            executor.join(timeout=0.01)
        except Exception as e:
            # Report as a test failure (not an error) with the cause attached.
            self.fail(f"join() raised unexpected Exception {e}")

    def test_target_returning_false_stops_executor(self):
        """A target that returns ``False`` runs and lets ``join()`` return."""
        ran = threading.Event()

        def target():
            ran.set()
            return False

        executor = self._make_executor(target=target)
        executor.open()
        executor.join(timeout=2)
        self.assertTrue(ran.is_set(), "target never ran")

    def test_skip_sleep_flag_skips_interval(self):
        """After ``skip_sleep()`` the second run does not wait a full interval."""
        call_times = []

        def target():
            call_times.append(time.monotonic())
            # Keep going after the first call only; stop once two timestamps
            # have been recorded.
            return len(call_times) < 2

        executor = self._make_executor(interval=30.0, min_interval=0.001, target=target)
        executor.skip_sleep()
        executor.open()
        executor.join(timeout=3)
        self.assertGreaterEqual(len(call_times), 2)
        # The configured interval is 30s, so a gap under 5s shows that the
        # interval sleep was skipped between the two runs.
        self.assertLess(call_times[1] - call_times[0], 5.0)

    def test_wake_causes_early_run(self):
        """``wake()`` triggers a second run well before the 30s interval ends."""
        call_count = 0
        woken = threading.Event()

        def target():
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                # Tell the test body that the first run has happened.
                woken.set()
            return call_count < 2

        executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
        executor.open()
        # Actually block until the first run has been observed; only then is
        # wake() exercised against an executor that is between runs.
        self.assertTrue(woken.wait(timeout=2), "target never ran before wake()")
        executor.wake()
        executor.join(timeout=3)
        self.assertGreaterEqual(call_count, 2)

    def test_open_after_target_returns_false(self):
        """An executor can be opened again after its target returned ``False``."""
        called = 0

        def target():
            nonlocal called
            called += 1
            return False

        executor = self._make_executor(target=target)
        executor.open()
        executor.join(timeout=2)
        # Second open/join cycle: the target is expected to run once more.
        executor.open()
        executor.join(timeout=2)
        self.assertGreaterEqual(called, 2)
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()