Compare commits
3 Commits
main
...
codex/thre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f274d2a4e | ||
|
|
7e901a0d2b | ||
|
|
eea9b41c99 |
@ -70,15 +70,26 @@ A process manager will handle the socket setup, start-up multiple server process
|
|||||||
|
|
||||||
### Built-in
|
### Built-in
|
||||||
|
|
||||||
Uvicorn includes a `--workers` option that allows you to run multiple worker processes.
|
Uvicorn includes a `--workers` option that allows you to run multiple worker instances.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ uvicorn main:app --workers 4
|
$ uvicorn main:app --workers 4
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The default worker class is `process`. Uvicorn also includes a `thread` worker class:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$ uvicorn main:app --workers 4 --worker-class thread
|
||||||
|
```
|
||||||
|
|
||||||
|
The `thread` worker class was built specifically for free-threaded Python 3.14 runtimes.
|
||||||
|
It requires a free-threaded build (`Py_GIL_DISABLED=1`) with the GIL disabled at runtime.
|
||||||
|
|
||||||
Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows.
|
Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows.
|
||||||
|
|
||||||
The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
|
The default `process` manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
|
||||||
|
|
||||||
|
The `thread` worker class automatically restarts worker threads that exit unexpectedly and uses cooperative healthchecks to replace stale worker threads. Unlike the `process` worker class, it cannot force-kill a hung thread. When a thread fails its healthcheck, Uvicorn starts a replacement thread and lets the previous thread continue draining if it is still running.
|
||||||
|
|
||||||
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
|
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
|
||||||
|
|
||||||
|
|||||||
@ -73,13 +73,21 @@ Using Uvicorn with watchfiles will enable the following options (which are other
|
|||||||
|
|
||||||
## Production
|
## Production
|
||||||
|
|
||||||
* `--workers <int>` - Number of worker processes. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
|
* `--workers <int>` - Number of worker instances. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
|
||||||
|
* `--worker-class [process|thread]` - Worker implementation to use when running multiple workers. `process` is the default. `thread` was built specifically for free-threaded Python 3.14 runtimes and requires `Py_GIL_DISABLED=1` with the GIL disabled at runtime.
|
||||||
* `--env-file <path>` - Environment configuration file for the ASGI application. **Default:** *None*.
|
* `--env-file <path>` - Environment configuration file for the ASGI application. **Default:** *None*.
|
||||||
* `--timeout-worker-healthcheck <int>` - Maximum number of seconds to wait for a worker to respond to a healthcheck. **Default:** *5*.
|
* `--timeout-worker-healthcheck <int>` - Maximum number of seconds to wait for a worker to respond to a healthcheck. **Default:** *5*.
|
||||||
|
|
||||||
!!! note
|
!!! note
|
||||||
The `--reload` and `--workers` arguments are mutually exclusive. You cannot use both at the same time.
|
The `--reload` and `--workers` arguments are mutually exclusive. You cannot use both at the same time.
|
||||||
|
|
||||||
|
!!! note
|
||||||
|
The `thread` worker class was built specifically for free-threaded Python 3.14.
|
||||||
|
It uses cooperative healthchecks to detect and replace stale worker threads.
|
||||||
|
Unlike the `process` worker class, it cannot force-kill a hung thread.
|
||||||
|
When a thread fails its healthcheck, Uvicorn starts a replacement thread and
|
||||||
|
lets the previous thread continue draining if it is still running.
|
||||||
|
|
||||||
## Logging
|
## Logging
|
||||||
|
|
||||||
* `--log-config <path>` - Logging configuration file. **Options:** *`dictConfig()` formats: .json, .yaml*. Any other format will be processed with `fileConfig()`. Set the `formatters.default.use_colors` and `formatters.access.use_colors` values to override the auto-detected behavior.
|
* `--log-config <path>` - Logging configuration file. **Options:** *`dictConfig()` formats: .json, .yaml*. Any other format will be processed with `fileConfig()`. Set the `formatters.default.use_colors` and `formatters.access.use_colors` values to override the auto-detected behavior.
|
||||||
|
|||||||
323
tests/supervisors/test_multithread.py
Normal file
323
tests/supervisors/test_multithread.py
Normal file
@ -0,0 +1,323 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import signal
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
|
from uvicorn import Config
|
||||||
|
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||||
|
from uvicorn.server import Server
|
||||||
|
from uvicorn.supervisors.multithread import Multithread, Thread
|
||||||
|
|
||||||
|
|
||||||
|
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||||
|
pass # pragma: no cover
|
||||||
|
|
||||||
|
|
||||||
|
class FakeThread:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: Config,
|
||||||
|
target: Callable[[list[socket.socket] | None], None],
|
||||||
|
sockets: list[socket.socket],
|
||||||
|
) -> None:
|
||||||
|
self.config = config
|
||||||
|
self.target = target
|
||||||
|
self.sockets = sockets
|
||||||
|
self.started = False
|
||||||
|
self.alive = True
|
||||||
|
self.terminated = False
|
||||||
|
self.joined = False
|
||||||
|
self.healthy = True
|
||||||
|
self.join_result = True
|
||||||
|
self.ready_for_healthcheck = True
|
||||||
|
|
||||||
|
def is_alive(self) -> bool:
|
||||||
|
return self.alive
|
||||||
|
|
||||||
|
def is_healthy(self, timeout: float) -> bool:
|
||||||
|
return self.healthy
|
||||||
|
|
||||||
|
def is_ready_for_healthcheck(self) -> bool:
|
||||||
|
return self.ready_for_healthcheck
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self.started = True
|
||||||
|
|
||||||
|
def terminate(self) -> None:
|
||||||
|
self.terminated = True
|
||||||
|
self.alive = False
|
||||||
|
|
||||||
|
def join(self, timeout: float | None = None) -> bool:
|
||||||
|
self.joined = True
|
||||||
|
if self.join_result:
|
||||||
|
self.alive = False
|
||||||
|
return self.join_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_target_passes_duplicated_sockets() -> None:
|
||||||
|
captured_sockets: list[socket.socket] | None = None
|
||||||
|
|
||||||
|
def target(sockets: list[socket.socket] | None) -> None:
|
||||||
|
nonlocal captured_sockets
|
||||||
|
captured_sockets = sockets
|
||||||
|
|
||||||
|
sock = socket.socket()
|
||||||
|
try:
|
||||||
|
thread = Thread(Config(app=app), target=target, sockets=[sock])
|
||||||
|
thread.target()
|
||||||
|
finally:
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
assert captured_sockets is not None
|
||||||
|
assert len(captured_sockets) == 1
|
||||||
|
assert captured_sockets[0].fileno() == -1
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_terminate_sets_server_exit_flag() -> None:
|
||||||
|
config = Config(app=app)
|
||||||
|
thread = Thread(config, target=Server(config).run, sockets=[])
|
||||||
|
target = thread._get_target()
|
||||||
|
|
||||||
|
assert thread.server is not None
|
||||||
|
assert isinstance(thread.server, Server)
|
||||||
|
assert target.__self__ is thread.server
|
||||||
|
assert thread.server.should_exit is False
|
||||||
|
assert thread.config.callback_progress is not None
|
||||||
|
assert thread.config.callback_progress.__self__ is thread
|
||||||
|
assert thread.config.callback_progress.__func__ is thread.record_heartbeat.__func__
|
||||||
|
|
||||||
|
thread.terminate()
|
||||||
|
|
||||||
|
assert thread.server.should_exit is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_record_heartbeat_and_is_healthy() -> None:
|
||||||
|
thread = Thread(Config(app=app, timeout_worker_healthcheck=1), target=lambda sockets: None, sockets=[])
|
||||||
|
thread.last_heartbeat -= 5
|
||||||
|
|
||||||
|
assert thread.is_healthy(1) is False
|
||||||
|
|
||||||
|
thread.record_heartbeat()
|
||||||
|
|
||||||
|
assert thread.is_healthy(1) is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_server_progress_callback_records_heartbeat_on_tick() -> None:
|
||||||
|
thread = Thread(Config(app=app), target=lambda sockets: None, sockets=[])
|
||||||
|
config = Config(app=app, callback_progress=thread.record_heartbeat)
|
||||||
|
server = Server(config=config)
|
||||||
|
before = thread.last_heartbeat
|
||||||
|
|
||||||
|
thread.last_heartbeat -= 5
|
||||||
|
await server.on_tick(10)
|
||||||
|
|
||||||
|
assert thread.last_heartbeat > before
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_start_and_join() -> None:
|
||||||
|
finished = threading.Event()
|
||||||
|
|
||||||
|
def target(sockets: list[socket.socket] | None) -> None:
|
||||||
|
finished.set()
|
||||||
|
|
||||||
|
thread = Thread(Config(app=app), target=target, sockets=[])
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
assert thread.join() is True
|
||||||
|
assert finished.is_set()
|
||||||
|
assert thread.is_alive() is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_join_timeout_returns_false_for_hung_thread() -> None:
|
||||||
|
blocker = threading.Event()
|
||||||
|
|
||||||
|
def target(sockets: list[socket.socket] | None) -> None:
|
||||||
|
blocker.wait()
|
||||||
|
|
||||||
|
thread = Thread(Config(app=app), target=target, sockets=[])
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
assert thread.join(timeout=0.01) is False
|
||||||
|
assert thread.is_alive() is True
|
||||||
|
finally:
|
||||||
|
blocker.set()
|
||||||
|
assert thread.join(timeout=1) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_init_terminate_join_and_restart(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||||
|
|
||||||
|
supervisor.init_threads()
|
||||||
|
original_threads = list(supervisor.threads)
|
||||||
|
|
||||||
|
assert len(supervisor.threads) == 2
|
||||||
|
assert all(thread.started for thread in supervisor.threads)
|
||||||
|
|
||||||
|
supervisor.terminate_all()
|
||||||
|
assert all(thread.terminated for thread in original_threads)
|
||||||
|
|
||||||
|
supervisor.join_all()
|
||||||
|
assert all(thread.joined for thread in original_threads)
|
||||||
|
|
||||||
|
supervisor.restart_all()
|
||||||
|
assert len(supervisor.threads) == 2
|
||||||
|
assert all(thread is not old for thread, old in zip(supervisor.threads, original_threads))
|
||||||
|
assert all(thread.started for thread in supervisor.threads)
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_keep_subthread_alive_replaces_dead_thread(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
|
||||||
|
dead_thread = supervisor.threads[0]
|
||||||
|
dead_thread.alive = False
|
||||||
|
|
||||||
|
supervisor.keep_subthread_alive()
|
||||||
|
|
||||||
|
assert supervisor.threads[0] is not dead_thread
|
||||||
|
assert supervisor.threads[0].started is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
|
||||||
|
unhealthy_thread = supervisor.threads[0]
|
||||||
|
unhealthy_thread.healthy = False
|
||||||
|
|
||||||
|
supervisor.keep_subthread_alive()
|
||||||
|
|
||||||
|
assert supervisor.threads[0] is not unhealthy_thread
|
||||||
|
assert supervisor.threads[0].started is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_keep_subthread_alive_skips_healthcheck_until_ready(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
|
||||||
|
thread = supervisor.threads[0]
|
||||||
|
thread.healthy = False
|
||||||
|
thread.ready_for_healthcheck = False
|
||||||
|
|
||||||
|
supervisor.keep_subthread_alive()
|
||||||
|
|
||||||
|
assert supervisor.threads[0] is thread
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread_without_blocking_join(
|
||||||
|
mocker: MockerFixture,
|
||||||
|
) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
|
||||||
|
unhealthy_thread = supervisor.threads[0]
|
||||||
|
unhealthy_thread.healthy = False
|
||||||
|
unhealthy_thread.join_result = False
|
||||||
|
|
||||||
|
supervisor.keep_subthread_alive()
|
||||||
|
|
||||||
|
assert supervisor.threads[0] is not unhealthy_thread
|
||||||
|
assert unhealthy_thread in supervisor.stale_threads
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_keep_subthread_alive_noop_when_exiting(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
dead_thread = supervisor.threads[0]
|
||||||
|
dead_thread.alive = False
|
||||||
|
supervisor.should_exit.set()
|
||||||
|
|
||||||
|
supervisor.keep_subthread_alive()
|
||||||
|
|
||||||
|
assert supervisor.threads[0] is dead_thread
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_signal_handlers(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.init_threads()
|
||||||
|
|
||||||
|
supervisor.handle_ttin()
|
||||||
|
assert len(supervisor.threads) == 3
|
||||||
|
|
||||||
|
removed_thread = supervisor.threads[-1]
|
||||||
|
supervisor.handle_ttou()
|
||||||
|
assert len(supervisor.threads) == 2
|
||||||
|
assert removed_thread.terminated is True
|
||||||
|
assert removed_thread.joined is True
|
||||||
|
|
||||||
|
supervisor.handle_ttou()
|
||||||
|
supervisor.handle_ttou()
|
||||||
|
assert len(supervisor.threads) == 1
|
||||||
|
|
||||||
|
original_threads = list(supervisor.threads)
|
||||||
|
supervisor.handle_hup()
|
||||||
|
assert len(supervisor.threads) == 1
|
||||||
|
assert supervisor.threads[0] is not original_threads[0]
|
||||||
|
|
||||||
|
supervisor.handle_term()
|
||||||
|
assert supervisor.should_exit.is_set()
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_join_all_uses_timeout_and_warns(mocker: MockerFixture, caplog: pytest.LogCaptureFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(
|
||||||
|
Config(app=app, workers=1, timeout_worker_healthcheck=2, timeout_graceful_shutdown=3),
|
||||||
|
target=lambda sockets: None,
|
||||||
|
sockets=[],
|
||||||
|
)
|
||||||
|
supervisor.init_threads()
|
||||||
|
thread = supervisor.threads[0]
|
||||||
|
thread.join_result = False
|
||||||
|
|
||||||
|
supervisor.join_all()
|
||||||
|
|
||||||
|
assert thread.joined is True
|
||||||
|
assert "Worker thread did not exit within 3.00 seconds." in caplog.records[-1].message
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_thread_shutdown_timeout_defaults_to_healthcheck(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(
|
||||||
|
Config(app=app, workers=1, timeout_worker_healthcheck=7),
|
||||||
|
target=lambda sockets: None,
|
||||||
|
sockets=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert supervisor._thread_shutdown_timeout == 7.0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK")
|
||||||
|
def test_multithread_handle_break() -> None: # pragma: py-not-win32
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.handle_break()
|
||||||
|
assert supervisor.should_exit.is_set()
|
||||||
|
|
||||||
|
|
||||||
|
def test_multithread_handle_signals_and_run(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.signal_queue.extend([signal.SIGINT, signal.SIGUSR1])
|
||||||
|
|
||||||
|
supervisor.handle_signals()
|
||||||
|
assert supervisor.should_exit.is_set()
|
||||||
|
assert supervisor.signal_queue == []
|
||||||
|
|
||||||
|
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||||
|
supervisor.signal_queue.append(signal.SIGINT)
|
||||||
|
supervisor.run()
|
||||||
|
|
||||||
|
assert supervisor.should_exit.is_set()
|
||||||
@ -15,7 +15,7 @@ import uvicorn
|
|||||||
from uvicorn.config import Config
|
from uvicorn.config import Config
|
||||||
from uvicorn.main import main as cli
|
from uvicorn.main import main as cli
|
||||||
from uvicorn.server import Server
|
from uvicorn.server import Server
|
||||||
from uvicorn.supervisors import ChangeReload, Multiprocess
|
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
|
||||||
|
|
||||||
HEADERS = "Content-Security-Policy:default-src 'self'; script-src https://example.com"
|
HEADERS = "Content-Security-Policy:default-src 'self'; script-src https://example.com"
|
||||||
main = importlib.import_module("uvicorn.main")
|
main = importlib.import_module("uvicorn.main")
|
||||||
@ -101,6 +101,19 @@ def test_cli_call_multiprocess_run() -> None:
|
|||||||
mock_run.assert_called_once()
|
mock_run.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_cli_call_multithread_run() -> None:
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
with mock.patch("uvicorn.config.is_free_threaded_runtime", return_value=True):
|
||||||
|
with mock.patch.object(Config, "bind_socket") as mock_bind_socket:
|
||||||
|
with mock.patch.object(Multithread, "run") as mock_run:
|
||||||
|
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
mock_bind_socket.assert_called_once()
|
||||||
|
mock_run.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=(True, False))
|
@pytest.fixture(params=(True, False))
|
||||||
def uds_file(tmp_path: Path, request: pytest.FixtureRequest) -> Path: # pragma: py-win32
|
def uds_file(tmp_path: Path, request: pytest.FixtureRequest) -> Path: # pragma: py-win32
|
||||||
file = tmp_path / "uvicorn.sock"
|
file = tmp_path / "uvicorn.sock"
|
||||||
@ -125,6 +138,15 @@ def test_cli_uds(uds_file: Path) -> None: # pragma: py-win32
|
|||||||
assert not uds_file.exists()
|
assert not uds_file.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_cli_thread_worker_class_requires_free_threaded_runtime() -> None:
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
|
||||||
|
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert 'Worker class "thread" requires a free-threaded Python 3.14 runtime' in str(result.exception)
|
||||||
|
|
||||||
|
|
||||||
def test_cli_incomplete_app_parameter() -> None:
|
def test_cli_incomplete_app_parameter() -> None:
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|||||||
@ -536,12 +536,24 @@ def test_config_use_subprocess(reload: bool, workers: int, expected: bool):
|
|||||||
assert config.use_subprocess == expected
|
assert config.use_subprocess == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_config_use_subprocess_with_thread_worker_class(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
|
||||||
|
config = Config(app=asgi_app, workers=2, worker_class="thread")
|
||||||
|
config.load()
|
||||||
|
assert config.use_subprocess is False
|
||||||
|
|
||||||
|
|
||||||
def test_warn_when_using_reload_and_workers(caplog: pytest.LogCaptureFixture) -> None:
|
def test_warn_when_using_reload_and_workers(caplog: pytest.LogCaptureFixture) -> None:
|
||||||
Config(app=asgi_app, reload=True, workers=2)
|
Config(app=asgi_app, reload=True, workers=2)
|
||||||
assert len(caplog.records) == 1
|
assert len(caplog.records) == 1
|
||||||
assert '"workers" flag is ignored when reloading is enabled.' in caplog.records[0].message
|
assert '"workers" flag is ignored when reloading is enabled.' in caplog.records[0].message
|
||||||
|
|
||||||
|
|
||||||
|
def test_thread_worker_class_requires_free_threaded_runtime() -> None:
|
||||||
|
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
|
||||||
|
Config(app=asgi_app, worker_class="thread")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
("loop_type", "expected_loop_factory"),
|
("loop_type", "expected_loop_factory"),
|
||||||
[
|
[
|
||||||
|
|||||||
@ -5,6 +5,7 @@ from logging import WARNING
|
|||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
import pytest
|
import pytest
|
||||||
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
import uvicorn.server
|
import uvicorn.server
|
||||||
from tests.utils import run_server
|
from tests.utils import run_server
|
||||||
@ -12,6 +13,7 @@ from uvicorn import Server
|
|||||||
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||||
from uvicorn.config import Config
|
from uvicorn.config import Config
|
||||||
from uvicorn.main import run
|
from uvicorn.main import run
|
||||||
|
from uvicorn.supervisors import Multithread
|
||||||
|
|
||||||
pytestmark = pytest.mark.anyio
|
pytestmark = pytest.mark.anyio
|
||||||
|
|
||||||
@ -85,6 +87,22 @@ def test_run_invalid_app_config_combination(caplog: pytest.LogCaptureFixture) ->
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_invalid_thread_worker_class_config() -> None:
|
||||||
|
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
|
||||||
|
run("tests.test_main:app", workers=2, worker_class="thread")
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_multithread(mocker: MockerFixture) -> None:
|
||||||
|
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
|
||||||
|
mock_bind_socket = mocker.patch.object(Config, "bind_socket")
|
||||||
|
mock_run = mocker.patch.object(Multithread, "run")
|
||||||
|
|
||||||
|
run("tests.test_main:app", workers=2, worker_class="thread")
|
||||||
|
|
||||||
|
mock_bind_socket.assert_called_once()
|
||||||
|
mock_run.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
|
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
|
||||||
async def app(scope, receive, send):
|
async def app(scope, receive, send):
|
||||||
assert scope["type"] == "lifespan"
|
assert scope["type"] == "lifespan"
|
||||||
@ -101,7 +119,7 @@ def test_run_match_config_params() -> None:
|
|||||||
config_params = {
|
config_params = {
|
||||||
key: repr(value)
|
key: repr(value)
|
||||||
for key, value in inspect.signature(Config.__init__).parameters.items()
|
for key, value in inspect.signature(Config.__init__).parameters.items()
|
||||||
if key not in ("self", "timeout_notify", "callback_notify")
|
if key not in ("self", "timeout_notify", "callback_notify", "callback_progress")
|
||||||
}
|
}
|
||||||
run_params = {
|
run_params = {
|
||||||
key: repr(value) for key, value in inspect.signature(run).parameters.items() if key not in ("app_dir",)
|
key: repr(value) for key, value in inspect.signature(run).parameters.items() if key not in ("app_dir",)
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import os
|
|||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import sys
|
import sys
|
||||||
|
import sysconfig
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from configparser import RawConfigParser
|
from configparser import RawConfigParser
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -30,6 +31,7 @@ WSProtocolType = Literal["auto", "none", "websockets", "websockets-sansio", "wsp
|
|||||||
LifespanType = Literal["auto", "on", "off"]
|
LifespanType = Literal["auto", "on", "off"]
|
||||||
LoopFactoryType = Literal["none", "auto", "asyncio", "uvloop"]
|
LoopFactoryType = Literal["none", "auto", "asyncio", "uvloop"]
|
||||||
InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"]
|
InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"]
|
||||||
|
WorkerClassType = Literal["process", "thread"]
|
||||||
|
|
||||||
LOG_LEVELS: dict[str, int] = {
|
LOG_LEVELS: dict[str, int] = {
|
||||||
"critical": logging.CRITICAL,
|
"critical": logging.CRITICAL,
|
||||||
@ -63,6 +65,7 @@ LOOP_FACTORIES: dict[str, str | None] = {
|
|||||||
"uvloop": "uvicorn.loops.uvloop:uvloop_loop_factory",
|
"uvloop": "uvicorn.loops.uvloop:uvloop_loop_factory",
|
||||||
}
|
}
|
||||||
INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"]
|
INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"]
|
||||||
|
WORKER_CLASSES: list[WorkerClassType] = ["process", "thread"]
|
||||||
|
|
||||||
SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER
|
SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER
|
||||||
|
|
||||||
@ -175,6 +178,21 @@ def _normalize_dirs(dirs: list[str] | str | None) -> list[str]:
|
|||||||
return list(set(dirs))
|
return list(set(dirs))
|
||||||
|
|
||||||
|
|
||||||
|
def is_free_threaded_runtime() -> bool:
|
||||||
|
# CPython documents these as separate checks:
|
||||||
|
# - `sysconfig.get_config_var("Py_GIL_DISABLED")` for whether the build supports free threading
|
||||||
|
# - `sys._is_gil_enabled()` for whether the GIL is actually disabled in the running process
|
||||||
|
# https://docs.python.org/3/howto/free-threading-python.html#identifying-free-threaded-python
|
||||||
|
if sys.version_info < (3, 14):
|
||||||
|
return False
|
||||||
|
if sysconfig.get_config_var("Py_GIL_DISABLED") != 1:
|
||||||
|
return False
|
||||||
|
is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
|
||||||
|
if is_gil_enabled is None:
|
||||||
|
return True
|
||||||
|
return not is_gil_enabled()
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -204,6 +222,7 @@ class Config:
|
|||||||
reload_includes: list[str] | str | None = None,
|
reload_includes: list[str] | str | None = None,
|
||||||
reload_excludes: list[str] | str | None = None,
|
reload_excludes: list[str] | str | None = None,
|
||||||
workers: int | None = None,
|
workers: int | None = None,
|
||||||
|
worker_class: WorkerClassType = "process",
|
||||||
proxy_headers: bool = True,
|
proxy_headers: bool = True,
|
||||||
server_header: bool = True,
|
server_header: bool = True,
|
||||||
date_header: bool = True,
|
date_header: bool = True,
|
||||||
@ -218,6 +237,7 @@ class Config:
|
|||||||
timeout_graceful_shutdown: int | None = None,
|
timeout_graceful_shutdown: int | None = None,
|
||||||
timeout_worker_healthcheck: int = 5,
|
timeout_worker_healthcheck: int = 5,
|
||||||
callback_notify: Callable[..., Awaitable[None]] | None = None,
|
callback_notify: Callable[..., Awaitable[None]] | None = None,
|
||||||
|
callback_progress: Callable[[], None] | None = None,
|
||||||
ssl_keyfile: str | os.PathLike[str] | None = None,
|
ssl_keyfile: str | os.PathLike[str] | None = None,
|
||||||
ssl_certfile: str | os.PathLike[str] | None = None,
|
ssl_certfile: str | os.PathLike[str] | None = None,
|
||||||
ssl_keyfile_password: str | None = None,
|
ssl_keyfile_password: str | None = None,
|
||||||
@ -251,6 +271,7 @@ class Config:
|
|||||||
self.reload = reload
|
self.reload = reload
|
||||||
self.reload_delay = reload_delay
|
self.reload_delay = reload_delay
|
||||||
self.workers = workers or 1
|
self.workers = workers or 1
|
||||||
|
self.worker_class = worker_class
|
||||||
self.proxy_headers = proxy_headers
|
self.proxy_headers = proxy_headers
|
||||||
self.server_header = server_header
|
self.server_header = server_header
|
||||||
self.date_header = date_header
|
self.date_header = date_header
|
||||||
@ -264,6 +285,7 @@ class Config:
|
|||||||
self.timeout_graceful_shutdown = timeout_graceful_shutdown
|
self.timeout_graceful_shutdown = timeout_graceful_shutdown
|
||||||
self.timeout_worker_healthcheck = timeout_worker_healthcheck
|
self.timeout_worker_healthcheck = timeout_worker_healthcheck
|
||||||
self.callback_notify = callback_notify
|
self.callback_notify = callback_notify
|
||||||
|
self.callback_progress = callback_progress
|
||||||
self.ssl_keyfile = ssl_keyfile
|
self.ssl_keyfile = ssl_keyfile
|
||||||
self.ssl_certfile = ssl_certfile
|
self.ssl_certfile = ssl_certfile
|
||||||
self.ssl_keyfile_password = ssl_keyfile_password
|
self.ssl_keyfile_password = ssl_keyfile_password
|
||||||
@ -344,6 +366,11 @@ class Config:
|
|||||||
if self.reload and self.workers > 1:
|
if self.reload and self.workers > 1:
|
||||||
logger.warning('"workers" flag is ignored when reloading is enabled.')
|
logger.warning('"workers" flag is ignored when reloading is enabled.')
|
||||||
|
|
||||||
|
if self.worker_class == "thread" and not is_free_threaded_runtime():
|
||||||
|
raise ValueError(
|
||||||
|
'Worker class "thread" requires a free-threaded Python 3.14 runtime (Py_GIL_DISABLED=1, GIL off).'
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def asgi_version(self) -> Literal["2.0", "3.0"]:
|
def asgi_version(self) -> Literal["2.0", "3.0"]:
|
||||||
mapping: dict[str, Literal["2.0", "3.0"]] = {
|
mapping: dict[str, Literal["2.0", "3.0"]] = {
|
||||||
@ -359,7 +386,7 @@ class Config:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def use_subprocess(self) -> bool:
|
def use_subprocess(self) -> bool:
|
||||||
return bool(self.reload or self.workers > 1)
|
return bool(self.reload or (self.workers > 1 and self.worker_class == "process"))
|
||||||
|
|
||||||
def configure_logging(self) -> None:
|
def configure_logging(self) -> None:
|
||||||
logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")
|
logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")
|
||||||
|
|||||||
@ -21,19 +21,22 @@ from uvicorn.config import (
|
|||||||
LOG_LEVELS,
|
LOG_LEVELS,
|
||||||
LOGGING_CONFIG,
|
LOGGING_CONFIG,
|
||||||
SSL_PROTOCOL_VERSION,
|
SSL_PROTOCOL_VERSION,
|
||||||
|
WORKER_CLASSES,
|
||||||
Config,
|
Config,
|
||||||
HTTPProtocolType,
|
HTTPProtocolType,
|
||||||
InterfaceType,
|
InterfaceType,
|
||||||
LifespanType,
|
LifespanType,
|
||||||
LoopFactoryType,
|
LoopFactoryType,
|
||||||
|
WorkerClassType,
|
||||||
WSProtocolType,
|
WSProtocolType,
|
||||||
)
|
)
|
||||||
from uvicorn.server import Server
|
from uvicorn.server import Server
|
||||||
from uvicorn.supervisors import ChangeReload, Multiprocess
|
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
|
||||||
|
|
||||||
LEVEL_CHOICES = click.Choice(list(LOG_LEVELS.keys()))
|
LEVEL_CHOICES = click.Choice(list(LOG_LEVELS.keys()))
|
||||||
LIFESPAN_CHOICES = click.Choice(list(LIFESPAN.keys()))
|
LIFESPAN_CHOICES = click.Choice(list(LIFESPAN.keys()))
|
||||||
INTERFACE_CHOICES = click.Choice(INTERFACES)
|
INTERFACE_CHOICES = click.Choice(INTERFACES)
|
||||||
|
WORKER_CLASS_CHOICES = click.Choice(WORKER_CLASSES)
|
||||||
|
|
||||||
|
|
||||||
def _metavar_from_type(_type: Any) -> str:
|
def _metavar_from_type(_type: Any) -> str:
|
||||||
@ -113,9 +116,16 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
|||||||
"--workers",
|
"--workers",
|
||||||
default=None,
|
default=None,
|
||||||
type=int,
|
type=int,
|
||||||
help="Number of worker processes. Defaults to the $WEB_CONCURRENCY environment"
|
help="Number of worker instances. Defaults to the $WEB_CONCURRENCY environment"
|
||||||
" variable if available, or 1. Not valid with --reload.",
|
" variable if available, or 1. Not valid with --reload.",
|
||||||
)
|
)
|
||||||
|
@click.option(
|
||||||
|
"--worker-class",
|
||||||
|
type=WORKER_CLASS_CHOICES,
|
||||||
|
default="process",
|
||||||
|
help="Worker implementation to use when running multiple workers.",
|
||||||
|
show_default=True,
|
||||||
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--loop",
|
"--loop",
|
||||||
type=str,
|
type=str,
|
||||||
@ -401,6 +411,7 @@ def main(
|
|||||||
reload_excludes: list[str],
|
reload_excludes: list[str],
|
||||||
reload_delay: float,
|
reload_delay: float,
|
||||||
workers: int,
|
workers: int,
|
||||||
|
worker_class: WorkerClassType,
|
||||||
env_file: str,
|
env_file: str,
|
||||||
log_config: str,
|
log_config: str,
|
||||||
log_level: str,
|
log_level: str,
|
||||||
@ -456,6 +467,7 @@ def main(
|
|||||||
reload_excludes=reload_excludes or None,
|
reload_excludes=reload_excludes or None,
|
||||||
reload_delay=reload_delay,
|
reload_delay=reload_delay,
|
||||||
workers=workers,
|
workers=workers,
|
||||||
|
worker_class=worker_class,
|
||||||
proxy_headers=proxy_headers,
|
proxy_headers=proxy_headers,
|
||||||
server_header=server_header,
|
server_header=server_header,
|
||||||
date_header=date_header,
|
date_header=date_header,
|
||||||
@ -506,6 +518,7 @@ def run(
|
|||||||
reload_excludes: list[str] | str | None = None,
|
reload_excludes: list[str] | str | None = None,
|
||||||
reload_delay: float = 0.25,
|
reload_delay: float = 0.25,
|
||||||
workers: int | None = None,
|
workers: int | None = None,
|
||||||
|
worker_class: WorkerClassType = "process",
|
||||||
env_file: str | os.PathLike[str] | None = None,
|
env_file: str | os.PathLike[str] | None = None,
|
||||||
log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
||||||
log_level: str | int | None = None,
|
log_level: str | int | None = None,
|
||||||
@ -560,6 +573,7 @@ def run(
|
|||||||
reload_excludes=reload_excludes,
|
reload_excludes=reload_excludes,
|
||||||
reload_delay=reload_delay,
|
reload_delay=reload_delay,
|
||||||
workers=workers,
|
workers=workers,
|
||||||
|
worker_class=worker_class,
|
||||||
env_file=env_file,
|
env_file=env_file,
|
||||||
log_config=log_config,
|
log_config=log_config,
|
||||||
log_level=log_level,
|
log_level=log_level,
|
||||||
@ -601,7 +615,10 @@ def run(
|
|||||||
ChangeReload(config, target=server.run, sockets=[sock]).run()
|
ChangeReload(config, target=server.run, sockets=[sock]).run()
|
||||||
elif config.workers > 1:
|
elif config.workers > 1:
|
||||||
sock = config.bind_socket()
|
sock = config.bind_socket()
|
||||||
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
if config.worker_class == "process":
|
||||||
|
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
||||||
|
else:
|
||||||
|
Multithread(config, target=server.run, sockets=[sock]).run()
|
||||||
else:
|
else:
|
||||||
server.run()
|
server.run()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@ -241,6 +241,9 @@ class Server:
|
|||||||
async def on_tick(self, counter: int) -> bool:
|
async def on_tick(self, counter: int) -> bool:
|
||||||
# Update the default headers, once per second.
|
# Update the default headers, once per second.
|
||||||
if counter % 10 == 0:
|
if counter % 10 == 0:
|
||||||
|
if self.config.callback_progress is not None:
|
||||||
|
self.config.callback_progress()
|
||||||
|
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
current_date = formatdate(current_time, usegmt=True).encode()
|
current_date = formatdate(current_time, usegmt=True).encode()
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,7 @@ from typing import TYPE_CHECKING
|
|||||||
|
|
||||||
from uvicorn.supervisors.basereload import BaseReload
|
from uvicorn.supervisors.basereload import BaseReload
|
||||||
from uvicorn.supervisors.multiprocess import Multiprocess
|
from uvicorn.supervisors.multiprocess import Multiprocess
|
||||||
|
from uvicorn.supervisors.multithread import Multithread
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
ChangeReload: type[BaseReload]
|
ChangeReload: type[BaseReload]
|
||||||
@ -13,4 +14,4 @@ else:
|
|||||||
except ImportError: # pragma: no cover
|
except ImportError: # pragma: no cover
|
||||||
from uvicorn.supervisors.statreload import StatReload as ChangeReload
|
from uvicorn.supervisors.statreload import StatReload as ChangeReload
|
||||||
|
|
||||||
__all__ = ["Multiprocess", "ChangeReload"]
|
__all__ = ["Multiprocess", "Multithread", "ChangeReload"]
|
||||||
|
|||||||
221
uvicorn/supervisors/multithread.py
Normal file
221
uvicorn/supervisors/multithread.py
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import inspect
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from collections.abc import Callable
|
||||||
|
from socket import socket
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from uvicorn.config import Config
|
||||||
|
from uvicorn.server import Server
|
||||||
|
from uvicorn.supervisors.multiprocess import SIGNALS
|
||||||
|
|
||||||
|
logger = logging.getLogger("uvicorn.error")
|
||||||
|
|
||||||
|
|
||||||
|
class Thread:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: Config,
|
||||||
|
target: Callable[[list[socket] | None], None],
|
||||||
|
sockets: list[socket],
|
||||||
|
) -> None:
|
||||||
|
self.config = copy.copy(config)
|
||||||
|
self.real_target = target
|
||||||
|
self.sockets = sockets
|
||||||
|
self.server: Server | None = None
|
||||||
|
self.last_heartbeat = time.monotonic()
|
||||||
|
self.thread = threading.Thread(target=self.target, daemon=True)
|
||||||
|
|
||||||
|
def _get_target(self) -> Callable[[list[socket] | None], None]:
|
||||||
|
if inspect.ismethod(self.real_target) and isinstance(self.real_target.__self__, Server):
|
||||||
|
self.config.callback_progress = self.record_heartbeat
|
||||||
|
self.server = Server(config=self.config)
|
||||||
|
return self.server.run
|
||||||
|
return self.real_target
|
||||||
|
|
||||||
|
def record_heartbeat(self) -> None:
|
||||||
|
self.last_heartbeat = time.monotonic()
|
||||||
|
|
||||||
|
def target(self, sockets: list[socket] | None = None) -> Any:
|
||||||
|
sockets = [sock.dup() for sock in self.sockets]
|
||||||
|
try:
|
||||||
|
return self._get_target()(sockets)
|
||||||
|
finally:
|
||||||
|
for sock in sockets:
|
||||||
|
if sock.fileno() != -1:
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
def is_alive(self) -> bool:
|
||||||
|
return self.thread.is_alive()
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def terminate(self) -> None:
|
||||||
|
if self.server is not None:
|
||||||
|
self.server.should_exit = True
|
||||||
|
|
||||||
|
def join(self, timeout: float | None = None) -> bool:
|
||||||
|
self.thread.join(timeout=timeout)
|
||||||
|
return not self.is_alive()
|
||||||
|
|
||||||
|
def is_healthy(self, timeout: float) -> bool:
|
||||||
|
return time.monotonic() - self.last_heartbeat <= timeout
|
||||||
|
|
||||||
|
def is_ready_for_healthcheck(self) -> bool:
|
||||||
|
return self.server is None or self.server.started
|
||||||
|
|
||||||
|
|
||||||
|
class Multithread:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: Config,
|
||||||
|
target: Callable[[list[socket] | None], None],
|
||||||
|
sockets: list[socket],
|
||||||
|
) -> None:
|
||||||
|
self.config = config
|
||||||
|
self.target = target
|
||||||
|
self.sockets = sockets
|
||||||
|
|
||||||
|
self.threads_num = config.workers
|
||||||
|
self.threads: list[Thread] = []
|
||||||
|
self.stale_threads: list[Thread] = []
|
||||||
|
|
||||||
|
self.should_exit = threading.Event()
|
||||||
|
|
||||||
|
self.signal_queue: list[int] = []
|
||||||
|
for sig in SIGNALS:
|
||||||
|
signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig))
|
||||||
|
|
||||||
|
def init_threads(self) -> None:
|
||||||
|
for _ in range(self.threads_num):
|
||||||
|
self.threads.append(self._start_thread())
|
||||||
|
|
||||||
|
def terminate_all(self) -> None:
|
||||||
|
for thread in self._all_threads():
|
||||||
|
thread.terminate()
|
||||||
|
|
||||||
|
def join_all(self) -> None:
|
||||||
|
timeout = self._thread_shutdown_timeout
|
||||||
|
for thread in self._all_threads():
|
||||||
|
joined = thread.join(timeout=timeout)
|
||||||
|
if not joined:
|
||||||
|
logger.warning("Worker thread did not exit within %.2f seconds.", timeout)
|
||||||
|
|
||||||
|
def restart_all(self) -> None:
|
||||||
|
for idx, thread in enumerate(self.threads):
|
||||||
|
self._replace_thread(idx, thread, reason="Worker thread restarted")
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
message = f"Started parent process [{os.getpid()}]"
|
||||||
|
color_message = "Started parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
|
||||||
|
logger.info(message, extra={"color_message": color_message})
|
||||||
|
|
||||||
|
self.init_threads()
|
||||||
|
|
||||||
|
while not self.should_exit.wait(0.5):
|
||||||
|
self.handle_signals()
|
||||||
|
self.keep_subthread_alive()
|
||||||
|
|
||||||
|
self.terminate_all()
|
||||||
|
self.join_all()
|
||||||
|
|
||||||
|
message = f"Stopping parent process [{os.getpid()}]"
|
||||||
|
color_message = "Stopping parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
|
||||||
|
logger.info(message, extra={"color_message": color_message})
|
||||||
|
|
||||||
|
def keep_subthread_alive(self) -> None:
|
||||||
|
if self.should_exit.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
|
for idx, thread in enumerate(self.threads):
|
||||||
|
if self.should_exit.is_set(): # pragma: no cover
|
||||||
|
return
|
||||||
|
|
||||||
|
if not thread.is_alive():
|
||||||
|
self._replace_thread(idx, thread, reason="Child thread died")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not thread.is_ready_for_healthcheck():
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not thread.is_healthy(timeout=self.config.timeout_worker_healthcheck):
|
||||||
|
self._replace_thread(idx, thread, reason="Worker thread failed healthcheck")
|
||||||
|
|
||||||
|
def handle_signals(self) -> None:
|
||||||
|
for sig in tuple(self.signal_queue):
|
||||||
|
self.signal_queue.remove(sig)
|
||||||
|
sig_name = SIGNALS[sig]
|
||||||
|
sig_handler = getattr(self, f"handle_{sig_name.lower()}", None)
|
||||||
|
if sig_handler is not None:
|
||||||
|
sig_handler()
|
||||||
|
else: # pragma: no cover
|
||||||
|
logger.debug(f"Received signal {sig_name}, but no handler is defined for it.")
|
||||||
|
|
||||||
|
def handle_int(self) -> None:
|
||||||
|
logger.info("Received SIGINT, exiting.")
|
||||||
|
self.should_exit.set()
|
||||||
|
|
||||||
|
def handle_term(self) -> None:
|
||||||
|
logger.info("Received SIGTERM, exiting.")
|
||||||
|
self.should_exit.set()
|
||||||
|
|
||||||
|
def handle_break(self) -> None: # pragma: py-not-win32
|
||||||
|
logger.info("Received SIGBREAK, exiting.")
|
||||||
|
self.should_exit.set()
|
||||||
|
|
||||||
|
def handle_hup(self) -> None: # pragma: py-win32
|
||||||
|
logger.info("Received SIGHUP, restarting threads.")
|
||||||
|
self.restart_all()
|
||||||
|
|
||||||
|
def handle_ttin(self) -> None: # pragma: py-win32
|
||||||
|
logger.info("Received SIGTTIN, increasing the number of threads.")
|
||||||
|
self.threads_num += 1
|
||||||
|
self.threads.append(self._start_thread())
|
||||||
|
|
||||||
|
def handle_ttou(self) -> None: # pragma: py-win32
|
||||||
|
logger.info("Received SIGTTOU, decreasing number of threads.")
|
||||||
|
if self.threads_num <= 1:
|
||||||
|
logger.info("Already reached one thread, cannot decrease the number of threads anymore.")
|
||||||
|
return
|
||||||
|
self.threads_num -= 1
|
||||||
|
thread = self.threads.pop()
|
||||||
|
thread.terminate()
|
||||||
|
if not thread.join(timeout=self._thread_shutdown_timeout):
|
||||||
|
logger.warning("Worker thread did not exit within %.2f seconds.", self._thread_shutdown_timeout)
|
||||||
|
self.stale_threads.append(thread)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _thread_shutdown_timeout(self) -> float:
|
||||||
|
if self.config.timeout_graceful_shutdown is not None:
|
||||||
|
return float(self.config.timeout_graceful_shutdown)
|
||||||
|
return float(self.config.timeout_worker_healthcheck)
|
||||||
|
|
||||||
|
def _start_thread(self) -> Thread:
|
||||||
|
thread = Thread(self.config, self.target, self.sockets)
|
||||||
|
thread.start()
|
||||||
|
return thread
|
||||||
|
|
||||||
|
def _replace_thread(self, idx: int, thread: Thread, *, reason: str) -> None:
|
||||||
|
thread.terminate()
|
||||||
|
if not thread.join(timeout=self._thread_shutdown_timeout):
|
||||||
|
logger.warning("%s; starting a replacement thread while the previous thread is still running.", reason)
|
||||||
|
self.stale_threads.append(thread)
|
||||||
|
else:
|
||||||
|
logger.info(reason)
|
||||||
|
self.threads[idx] = self._start_thread()
|
||||||
|
|
||||||
|
def _all_threads(self) -> list[Thread]:
|
||||||
|
threads = list(self.threads)
|
||||||
|
for thread in self.stale_threads:
|
||||||
|
if thread not in threads:
|
||||||
|
threads.append(thread)
|
||||||
|
return threads
|
||||||
Loading…
Reference in New Issue
Block a user