Compare commits

...

3 Commits

Author SHA1 Message Date
Marcelo Trylesinski
0f274d2a4e Refine thread worker heartbeat handling 2026-04-12 17:11:40 +02:00
Marcelo Trylesinski
7e901a0d2b Document free-threaded runtime detection 2026-04-12 16:36:23 +02:00
Marcelo Trylesinski
eea9b41c99 Add free-threaded thread worker class 2026-04-12 16:21:59 +02:00
11 changed files with 673 additions and 10 deletions

View File

@ -70,15 +70,26 @@ A process manager will handle the socket setup, start-up multiple server process
### Built-in
Uvicorn includes a `--workers` option that allows you to run multiple worker processes.
Uvicorn includes a `--workers` option that allows you to run multiple worker instances.
```bash
$ uvicorn main:app --workers 4
```
The default worker class is `process`. Uvicorn also includes a `thread` worker class:
```bash
$ uvicorn main:app --workers 4 --worker-class thread
```
The `thread` worker class was built specifically for free-threaded Python 3.14 runtimes.
It requires a free-threaded build (`Py_GIL_DISABLED=1`) with the GIL disabled at runtime.
Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows.
The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
The default `process` manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
The `thread` worker class automatically restarts worker threads that exit unexpectedly and uses cooperative healthchecks to replace stale worker threads. Unlike the `process` worker class, it cannot force-kill a hung thread. When a thread fails its healthcheck, Uvicorn starts a replacement thread and lets the previous thread continue draining if it is still running.
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)

View File

@ -73,13 +73,21 @@ Using Uvicorn with watchfiles will enable the following options (which are other
## Production
* `--workers <int>` - Number of worker processes. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
* `--workers <int>` - Number of worker instances. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
* `--worker-class [process|thread]` - Worker implementation to use when running multiple workers. `process` is the default. `thread` was built specifically for free-threaded Python 3.14 runtimes and requires `Py_GIL_DISABLED=1` with the GIL disabled at runtime.
* `--env-file <path>` - Environment configuration file for the ASGI application. **Default:** *None*.
* `--timeout-worker-healthcheck <int>` - Maximum number of seconds to wait for a worker to respond to a healthcheck. **Default:** *5*.
!!! note
The `--reload` and `--workers` arguments are mutually exclusive. You cannot use both at the same time.
!!! note
The `thread` worker class was built specifically for free-threaded Python 3.14.
It uses cooperative healthchecks to detect and replace stale worker threads.
Unlike the `process` worker class, it cannot force-kill a hung thread.
When a thread fails its healthcheck, Uvicorn starts a replacement thread and
lets the previous thread continue draining if it is still running.
## Logging
* `--log-config <path>` - Logging configuration file. **Options:** *`dictConfig()` formats: .json, .yaml*. Any other format will be processed with `fileConfig()`. Set the `formatters.default.use_colors` and `formatters.access.use_colors` values to override the auto-detected behavior.

View File

@ -0,0 +1,323 @@
from __future__ import annotations
import signal
import socket
import threading
from collections.abc import Callable
import pytest
from pytest_mock import MockerFixture
from uvicorn import Config
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.server import Server
from uvicorn.supervisors.multithread import Multithread, Thread
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
pass # pragma: no cover
class FakeThread:
def __init__(
self,
config: Config,
target: Callable[[list[socket.socket] | None], None],
sockets: list[socket.socket],
) -> None:
self.config = config
self.target = target
self.sockets = sockets
self.started = False
self.alive = True
self.terminated = False
self.joined = False
self.healthy = True
self.join_result = True
self.ready_for_healthcheck = True
def is_alive(self) -> bool:
return self.alive
def is_healthy(self, timeout: float) -> bool:
return self.healthy
def is_ready_for_healthcheck(self) -> bool:
return self.ready_for_healthcheck
def start(self) -> None:
self.started = True
def terminate(self) -> None:
self.terminated = True
self.alive = False
def join(self, timeout: float | None = None) -> bool:
self.joined = True
if self.join_result:
self.alive = False
return self.join_result
def test_thread_target_passes_duplicated_sockets() -> None:
captured_sockets: list[socket.socket] | None = None
def target(sockets: list[socket.socket] | None) -> None:
nonlocal captured_sockets
captured_sockets = sockets
sock = socket.socket()
try:
thread = Thread(Config(app=app), target=target, sockets=[sock])
thread.target()
finally:
sock.close()
assert captured_sockets is not None
assert len(captured_sockets) == 1
assert captured_sockets[0].fileno() == -1
def test_thread_terminate_sets_server_exit_flag() -> None:
config = Config(app=app)
thread = Thread(config, target=Server(config).run, sockets=[])
target = thread._get_target()
assert thread.server is not None
assert isinstance(thread.server, Server)
assert target.__self__ is thread.server
assert thread.server.should_exit is False
assert thread.config.callback_progress is not None
assert thread.config.callback_progress.__self__ is thread
assert thread.config.callback_progress.__func__ is thread.record_heartbeat.__func__
thread.terminate()
assert thread.server.should_exit is True
def test_thread_record_heartbeat_and_is_healthy() -> None:
thread = Thread(Config(app=app, timeout_worker_healthcheck=1), target=lambda sockets: None, sockets=[])
thread.last_heartbeat -= 5
assert thread.is_healthy(1) is False
thread.record_heartbeat()
assert thread.is_healthy(1) is True
@pytest.mark.anyio
async def test_server_progress_callback_records_heartbeat_on_tick() -> None:
thread = Thread(Config(app=app), target=lambda sockets: None, sockets=[])
config = Config(app=app, callback_progress=thread.record_heartbeat)
server = Server(config=config)
before = thread.last_heartbeat
thread.last_heartbeat -= 5
await server.on_tick(10)
assert thread.last_heartbeat > before
def test_thread_start_and_join() -> None:
finished = threading.Event()
def target(sockets: list[socket.socket] | None) -> None:
finished.set()
thread = Thread(Config(app=app), target=target, sockets=[])
thread.start()
assert thread.join() is True
assert finished.is_set()
assert thread.is_alive() is False
def test_thread_join_timeout_returns_false_for_hung_thread() -> None:
blocker = threading.Event()
def target(sockets: list[socket.socket] | None) -> None:
blocker.wait()
thread = Thread(Config(app=app), target=target, sockets=[])
thread.start()
try:
assert thread.join(timeout=0.01) is False
assert thread.is_alive() is True
finally:
blocker.set()
assert thread.join(timeout=1) is True
def test_multithread_init_terminate_join_and_restart(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
original_threads = list(supervisor.threads)
assert len(supervisor.threads) == 2
assert all(thread.started for thread in supervisor.threads)
supervisor.terminate_all()
assert all(thread.terminated for thread in original_threads)
supervisor.join_all()
assert all(thread.joined for thread in original_threads)
supervisor.restart_all()
assert len(supervisor.threads) == 2
assert all(thread is not old for thread, old in zip(supervisor.threads, original_threads))
assert all(thread.started for thread in supervisor.threads)
def test_multithread_keep_subthread_alive_replaces_dead_thread(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
dead_thread = supervisor.threads[0]
dead_thread.alive = False
supervisor.keep_subthread_alive()
assert supervisor.threads[0] is not dead_thread
assert supervisor.threads[0].started is True
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
unhealthy_thread = supervisor.threads[0]
unhealthy_thread.healthy = False
supervisor.keep_subthread_alive()
assert supervisor.threads[0] is not unhealthy_thread
assert supervisor.threads[0].started is True
def test_multithread_keep_subthread_alive_skips_healthcheck_until_ready(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
thread = supervisor.threads[0]
thread.healthy = False
thread.ready_for_healthcheck = False
supervisor.keep_subthread_alive()
assert supervisor.threads[0] is thread
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread_without_blocking_join(
mocker: MockerFixture,
) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
unhealthy_thread = supervisor.threads[0]
unhealthy_thread.healthy = False
unhealthy_thread.join_result = False
supervisor.keep_subthread_alive()
assert supervisor.threads[0] is not unhealthy_thread
assert unhealthy_thread in supervisor.stale_threads
def test_multithread_keep_subthread_alive_noop_when_exiting(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
dead_thread = supervisor.threads[0]
dead_thread.alive = False
supervisor.should_exit.set()
supervisor.keep_subthread_alive()
assert supervisor.threads[0] is dead_thread
def test_multithread_signal_handlers(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
supervisor.init_threads()
supervisor.handle_ttin()
assert len(supervisor.threads) == 3
removed_thread = supervisor.threads[-1]
supervisor.handle_ttou()
assert len(supervisor.threads) == 2
assert removed_thread.terminated is True
assert removed_thread.joined is True
supervisor.handle_ttou()
supervisor.handle_ttou()
assert len(supervisor.threads) == 1
original_threads = list(supervisor.threads)
supervisor.handle_hup()
assert len(supervisor.threads) == 1
assert supervisor.threads[0] is not original_threads[0]
supervisor.handle_term()
assert supervisor.should_exit.is_set()
def test_multithread_join_all_uses_timeout_and_warns(mocker: MockerFixture, caplog: pytest.LogCaptureFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(
Config(app=app, workers=1, timeout_worker_healthcheck=2, timeout_graceful_shutdown=3),
target=lambda sockets: None,
sockets=[],
)
supervisor.init_threads()
thread = supervisor.threads[0]
thread.join_result = False
supervisor.join_all()
assert thread.joined is True
assert "Worker thread did not exit within 3.00 seconds." in caplog.records[-1].message
def test_multithread_thread_shutdown_timeout_defaults_to_healthcheck(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(
Config(app=app, workers=1, timeout_worker_healthcheck=7),
target=lambda sockets: None,
sockets=[],
)
assert supervisor._thread_shutdown_timeout == 7.0
@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK")
def test_multithread_handle_break() -> None: # pragma: py-not-win32
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.handle_break()
assert supervisor.should_exit.is_set()
def test_multithread_handle_signals_and_run(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.signal_queue.extend([signal.SIGINT, signal.SIGUSR1])
supervisor.handle_signals()
assert supervisor.should_exit.is_set()
assert supervisor.signal_queue == []
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
supervisor.signal_queue.append(signal.SIGINT)
supervisor.run()
assert supervisor.should_exit.is_set()

View File

@ -15,7 +15,7 @@ import uvicorn
from uvicorn.config import Config
from uvicorn.main import main as cli
from uvicorn.server import Server
from uvicorn.supervisors import ChangeReload, Multiprocess
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
HEADERS = "Content-Security-Policy:default-src 'self'; script-src https://example.com"
main = importlib.import_module("uvicorn.main")
@ -101,6 +101,19 @@ def test_cli_call_multiprocess_run() -> None:
mock_run.assert_called_once()
def test_cli_call_multithread_run() -> None:
runner = CliRunner()
with mock.patch("uvicorn.config.is_free_threaded_runtime", return_value=True):
with mock.patch.object(Config, "bind_socket") as mock_bind_socket:
with mock.patch.object(Multithread, "run") as mock_run:
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
assert result.exit_code == 0
mock_bind_socket.assert_called_once()
mock_run.assert_called_once()
@pytest.fixture(params=(True, False))
def uds_file(tmp_path: Path, request: pytest.FixtureRequest) -> Path: # pragma: py-win32
file = tmp_path / "uvicorn.sock"
@ -125,6 +138,15 @@ def test_cli_uds(uds_file: Path) -> None: # pragma: py-win32
assert not uds_file.exists()
def test_cli_thread_worker_class_requires_free_threaded_runtime() -> None:
runner = CliRunner()
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
assert result.exit_code == 1
assert 'Worker class "thread" requires a free-threaded Python 3.14 runtime' in str(result.exception)
def test_cli_incomplete_app_parameter() -> None:
runner = CliRunner()

View File

@ -536,12 +536,24 @@ def test_config_use_subprocess(reload: bool, workers: int, expected: bool):
assert config.use_subprocess == expected
def test_config_use_subprocess_with_thread_worker_class(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
config = Config(app=asgi_app, workers=2, worker_class="thread")
config.load()
assert config.use_subprocess is False
def test_warn_when_using_reload_and_workers(caplog: pytest.LogCaptureFixture) -> None:
Config(app=asgi_app, reload=True, workers=2)
assert len(caplog.records) == 1
assert '"workers" flag is ignored when reloading is enabled.' in caplog.records[0].message
def test_thread_worker_class_requires_free_threaded_runtime() -> None:
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
Config(app=asgi_app, worker_class="thread")
@pytest.mark.parametrize(
("loop_type", "expected_loop_factory"),
[

View File

@ -5,6 +5,7 @@ from logging import WARNING
import httpx
import pytest
from pytest_mock import MockerFixture
import uvicorn.server
from tests.utils import run_server
@ -12,6 +13,7 @@ from uvicorn import Server
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.config import Config
from uvicorn.main import run
from uvicorn.supervisors import Multithread
pytestmark = pytest.mark.anyio
@ -85,6 +87,22 @@ def test_run_invalid_app_config_combination(caplog: pytest.LogCaptureFixture) ->
)
def test_run_invalid_thread_worker_class_config() -> None:
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
run("tests.test_main:app", workers=2, worker_class="thread")
def test_run_multithread(mocker: MockerFixture) -> None:
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
mock_bind_socket = mocker.patch.object(Config, "bind_socket")
mock_run = mocker.patch.object(Multithread, "run")
run("tests.test_main:app", workers=2, worker_class="thread")
mock_bind_socket.assert_called_once()
mock_run.assert_called_once()
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
async def app(scope, receive, send):
assert scope["type"] == "lifespan"
@ -101,7 +119,7 @@ def test_run_match_config_params() -> None:
config_params = {
key: repr(value)
for key, value in inspect.signature(Config.__init__).parameters.items()
if key not in ("self", "timeout_notify", "callback_notify")
if key not in ("self", "timeout_notify", "callback_notify", "callback_progress")
}
run_params = {
key: repr(value) for key, value in inspect.signature(run).parameters.items() if key not in ("app_dir",)

View File

@ -9,6 +9,7 @@ import os
import socket
import ssl
import sys
import sysconfig
from collections.abc import Awaitable, Callable
from configparser import RawConfigParser
from pathlib import Path
@ -30,6 +31,7 @@ WSProtocolType = Literal["auto", "none", "websockets", "websockets-sansio", "wsp
LifespanType = Literal["auto", "on", "off"]
LoopFactoryType = Literal["none", "auto", "asyncio", "uvloop"]
InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"]
WorkerClassType = Literal["process", "thread"]
LOG_LEVELS: dict[str, int] = {
"critical": logging.CRITICAL,
@ -63,6 +65,7 @@ LOOP_FACTORIES: dict[str, str | None] = {
"uvloop": "uvicorn.loops.uvloop:uvloop_loop_factory",
}
INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"]
WORKER_CLASSES: list[WorkerClassType] = ["process", "thread"]
SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER
@ -175,6 +178,21 @@ def _normalize_dirs(dirs: list[str] | str | None) -> list[str]:
return list(set(dirs))
def is_free_threaded_runtime() -> bool:
# CPython documents these as separate checks:
# - `sysconfig.get_config_var("Py_GIL_DISABLED")` for whether the build supports free threading
# - `sys._is_gil_enabled()` for whether the GIL is actually disabled in the running process
# https://docs.python.org/3/howto/free-threading-python.html#identifying-free-threaded-python
if sys.version_info < (3, 14):
return False
if sysconfig.get_config_var("Py_GIL_DISABLED") != 1:
return False
is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
if is_gil_enabled is None:
return True
return not is_gil_enabled()
class Config:
def __init__(
self,
@ -204,6 +222,7 @@ class Config:
reload_includes: list[str] | str | None = None,
reload_excludes: list[str] | str | None = None,
workers: int | None = None,
worker_class: WorkerClassType = "process",
proxy_headers: bool = True,
server_header: bool = True,
date_header: bool = True,
@ -218,6 +237,7 @@ class Config:
timeout_graceful_shutdown: int | None = None,
timeout_worker_healthcheck: int = 5,
callback_notify: Callable[..., Awaitable[None]] | None = None,
callback_progress: Callable[[], None] | None = None,
ssl_keyfile: str | os.PathLike[str] | None = None,
ssl_certfile: str | os.PathLike[str] | None = None,
ssl_keyfile_password: str | None = None,
@ -251,6 +271,7 @@ class Config:
self.reload = reload
self.reload_delay = reload_delay
self.workers = workers or 1
self.worker_class = worker_class
self.proxy_headers = proxy_headers
self.server_header = server_header
self.date_header = date_header
@ -264,6 +285,7 @@ class Config:
self.timeout_graceful_shutdown = timeout_graceful_shutdown
self.timeout_worker_healthcheck = timeout_worker_healthcheck
self.callback_notify = callback_notify
self.callback_progress = callback_progress
self.ssl_keyfile = ssl_keyfile
self.ssl_certfile = ssl_certfile
self.ssl_keyfile_password = ssl_keyfile_password
@ -344,6 +366,11 @@ class Config:
if self.reload and self.workers > 1:
logger.warning('"workers" flag is ignored when reloading is enabled.')
if self.worker_class == "thread" and not is_free_threaded_runtime():
raise ValueError(
'Worker class "thread" requires a free-threaded Python 3.14 runtime (Py_GIL_DISABLED=1, GIL off).'
)
@property
def asgi_version(self) -> Literal["2.0", "3.0"]:
mapping: dict[str, Literal["2.0", "3.0"]] = {
@ -359,7 +386,7 @@ class Config:
@property
def use_subprocess(self) -> bool:
return bool(self.reload or self.workers > 1)
return bool(self.reload or (self.workers > 1 and self.worker_class == "process"))
def configure_logging(self) -> None:
logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")

View File

@ -21,19 +21,22 @@ from uvicorn.config import (
LOG_LEVELS,
LOGGING_CONFIG,
SSL_PROTOCOL_VERSION,
WORKER_CLASSES,
Config,
HTTPProtocolType,
InterfaceType,
LifespanType,
LoopFactoryType,
WorkerClassType,
WSProtocolType,
)
from uvicorn.server import Server
from uvicorn.supervisors import ChangeReload, Multiprocess
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
LEVEL_CHOICES = click.Choice(list(LOG_LEVELS.keys()))
LIFESPAN_CHOICES = click.Choice(list(LIFESPAN.keys()))
INTERFACE_CHOICES = click.Choice(INTERFACES)
WORKER_CLASS_CHOICES = click.Choice(WORKER_CLASSES)
def _metavar_from_type(_type: Any) -> str:
@ -113,9 +116,16 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
"--workers",
default=None,
type=int,
help="Number of worker processes. Defaults to the $WEB_CONCURRENCY environment"
help="Number of worker instances. Defaults to the $WEB_CONCURRENCY environment"
" variable if available, or 1. Not valid with --reload.",
)
@click.option(
"--worker-class",
type=WORKER_CLASS_CHOICES,
default="process",
help="Worker implementation to use when running multiple workers.",
show_default=True,
)
@click.option(
"--loop",
type=str,
@ -401,6 +411,7 @@ def main(
reload_excludes: list[str],
reload_delay: float,
workers: int,
worker_class: WorkerClassType,
env_file: str,
log_config: str,
log_level: str,
@ -456,6 +467,7 @@ def main(
reload_excludes=reload_excludes or None,
reload_delay=reload_delay,
workers=workers,
worker_class=worker_class,
proxy_headers=proxy_headers,
server_header=server_header,
date_header=date_header,
@ -506,6 +518,7 @@ def run(
reload_excludes: list[str] | str | None = None,
reload_delay: float = 0.25,
workers: int | None = None,
worker_class: WorkerClassType = "process",
env_file: str | os.PathLike[str] | None = None,
log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
log_level: str | int | None = None,
@ -560,6 +573,7 @@ def run(
reload_excludes=reload_excludes,
reload_delay=reload_delay,
workers=workers,
worker_class=worker_class,
env_file=env_file,
log_config=log_config,
log_level=log_level,
@ -601,7 +615,10 @@ def run(
ChangeReload(config, target=server.run, sockets=[sock]).run()
elif config.workers > 1:
sock = config.bind_socket()
Multiprocess(config, target=server.run, sockets=[sock]).run()
if config.worker_class == "process":
Multiprocess(config, target=server.run, sockets=[sock]).run()
else:
Multithread(config, target=server.run, sockets=[sock]).run()
else:
server.run()
except KeyboardInterrupt:

View File

@ -241,6 +241,9 @@ class Server:
async def on_tick(self, counter: int) -> bool:
# Update the default headers, once per second.
if counter % 10 == 0:
if self.config.callback_progress is not None:
self.config.callback_progress()
current_time = time.time()
current_date = formatdate(current_time, usegmt=True).encode()

View File

@ -4,6 +4,7 @@ from typing import TYPE_CHECKING
from uvicorn.supervisors.basereload import BaseReload
from uvicorn.supervisors.multiprocess import Multiprocess
from uvicorn.supervisors.multithread import Multithread
if TYPE_CHECKING:
ChangeReload: type[BaseReload]
@ -13,4 +14,4 @@ else:
except ImportError: # pragma: no cover
from uvicorn.supervisors.statreload import StatReload as ChangeReload
__all__ = ["Multiprocess", "ChangeReload"]
__all__ = ["Multiprocess", "Multithread", "ChangeReload"]

View File

@ -0,0 +1,221 @@
from __future__ import annotations
import copy
import inspect
import logging
import os
import signal
import threading
import time
from collections.abc import Callable
from socket import socket
from typing import Any
import click
from uvicorn.config import Config
from uvicorn.server import Server
from uvicorn.supervisors.multiprocess import SIGNALS
logger = logging.getLogger("uvicorn.error")
class Thread:
def __init__(
self,
config: Config,
target: Callable[[list[socket] | None], None],
sockets: list[socket],
) -> None:
self.config = copy.copy(config)
self.real_target = target
self.sockets = sockets
self.server: Server | None = None
self.last_heartbeat = time.monotonic()
self.thread = threading.Thread(target=self.target, daemon=True)
def _get_target(self) -> Callable[[list[socket] | None], None]:
if inspect.ismethod(self.real_target) and isinstance(self.real_target.__self__, Server):
self.config.callback_progress = self.record_heartbeat
self.server = Server(config=self.config)
return self.server.run
return self.real_target
def record_heartbeat(self) -> None:
self.last_heartbeat = time.monotonic()
def target(self, sockets: list[socket] | None = None) -> Any:
sockets = [sock.dup() for sock in self.sockets]
try:
return self._get_target()(sockets)
finally:
for sock in sockets:
if sock.fileno() != -1:
sock.close()
def is_alive(self) -> bool:
return self.thread.is_alive()
def start(self) -> None:
self.thread.start()
def terminate(self) -> None:
if self.server is not None:
self.server.should_exit = True
def join(self, timeout: float | None = None) -> bool:
self.thread.join(timeout=timeout)
return not self.is_alive()
def is_healthy(self, timeout: float) -> bool:
return time.monotonic() - self.last_heartbeat <= timeout
def is_ready_for_healthcheck(self) -> bool:
return self.server is None or self.server.started
class Multithread:
def __init__(
self,
config: Config,
target: Callable[[list[socket] | None], None],
sockets: list[socket],
) -> None:
self.config = config
self.target = target
self.sockets = sockets
self.threads_num = config.workers
self.threads: list[Thread] = []
self.stale_threads: list[Thread] = []
self.should_exit = threading.Event()
self.signal_queue: list[int] = []
for sig in SIGNALS:
signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig))
def init_threads(self) -> None:
for _ in range(self.threads_num):
self.threads.append(self._start_thread())
def terminate_all(self) -> None:
for thread in self._all_threads():
thread.terminate()
def join_all(self) -> None:
timeout = self._thread_shutdown_timeout
for thread in self._all_threads():
joined = thread.join(timeout=timeout)
if not joined:
logger.warning("Worker thread did not exit within %.2f seconds.", timeout)
def restart_all(self) -> None:
for idx, thread in enumerate(self.threads):
self._replace_thread(idx, thread, reason="Worker thread restarted")
def run(self) -> None:
message = f"Started parent process [{os.getpid()}]"
color_message = "Started parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
logger.info(message, extra={"color_message": color_message})
self.init_threads()
while not self.should_exit.wait(0.5):
self.handle_signals()
self.keep_subthread_alive()
self.terminate_all()
self.join_all()
message = f"Stopping parent process [{os.getpid()}]"
color_message = "Stopping parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
logger.info(message, extra={"color_message": color_message})
def keep_subthread_alive(self) -> None:
if self.should_exit.is_set():
return
for idx, thread in enumerate(self.threads):
if self.should_exit.is_set(): # pragma: no cover
return
if not thread.is_alive():
self._replace_thread(idx, thread, reason="Child thread died")
continue
if not thread.is_ready_for_healthcheck():
continue
if not thread.is_healthy(timeout=self.config.timeout_worker_healthcheck):
self._replace_thread(idx, thread, reason="Worker thread failed healthcheck")
def handle_signals(self) -> None:
for sig in tuple(self.signal_queue):
self.signal_queue.remove(sig)
sig_name = SIGNALS[sig]
sig_handler = getattr(self, f"handle_{sig_name.lower()}", None)
if sig_handler is not None:
sig_handler()
else: # pragma: no cover
logger.debug(f"Received signal {sig_name}, but no handler is defined for it.")
def handle_int(self) -> None:
logger.info("Received SIGINT, exiting.")
self.should_exit.set()
def handle_term(self) -> None:
logger.info("Received SIGTERM, exiting.")
self.should_exit.set()
def handle_break(self) -> None: # pragma: py-not-win32
logger.info("Received SIGBREAK, exiting.")
self.should_exit.set()
def handle_hup(self) -> None: # pragma: py-win32
logger.info("Received SIGHUP, restarting threads.")
self.restart_all()
def handle_ttin(self) -> None: # pragma: py-win32
logger.info("Received SIGTTIN, increasing the number of threads.")
self.threads_num += 1
self.threads.append(self._start_thread())
def handle_ttou(self) -> None: # pragma: py-win32
logger.info("Received SIGTTOU, decreasing number of threads.")
if self.threads_num <= 1:
logger.info("Already reached one thread, cannot decrease the number of threads anymore.")
return
self.threads_num -= 1
thread = self.threads.pop()
thread.terminate()
if not thread.join(timeout=self._thread_shutdown_timeout):
logger.warning("Worker thread did not exit within %.2f seconds.", self._thread_shutdown_timeout)
self.stale_threads.append(thread)
@property
def _thread_shutdown_timeout(self) -> float:
if self.config.timeout_graceful_shutdown is not None:
return float(self.config.timeout_graceful_shutdown)
return float(self.config.timeout_worker_healthcheck)
def _start_thread(self) -> Thread:
thread = Thread(self.config, self.target, self.sockets)
thread.start()
return thread
def _replace_thread(self, idx: int, thread: Thread, *, reason: str) -> None:
thread.terminate()
if not thread.join(timeout=self._thread_shutdown_timeout):
logger.warning("%s; starting a replacement thread while the previous thread is still running.", reason)
self.stale_threads.append(thread)
else:
logger.info(reason)
self.threads[idx] = self._start_thread()
def _all_threads(self) -> list[Thread]:
threads = list(self.threads)
for thread in self.stale_threads:
if thread not in threads:
threads.append(thread)
return threads