Compare commits

...

13 Commits

Author SHA1 Message Date
Thomas Grainger
577c0570eb
ignore coverage files that show up in reloader tests 2024-10-01 12:06:29 +01:00
Thomas Grainger
47924f253f
enable sigterm hook 2024-10-01 12:05:48 +01:00
Thomas Grainger
bb6dc58eec
try to collect coverage 2024-10-01 11:25:33 +01:00
Thomas Grainger
d5a1f59be4
test unsupported 2024-10-01 10:58:59 +01:00
Thomas Grainger
77a4a23da4
change box to dataclass to avoid coverage requirement 2024-10-01 10:54:08 +01:00
Thomas Grainger
4166deed90
add coverage pragmas 2024-10-01 10:48:00 +01:00
Thomas Grainger
66afb6229a
remove print and sleep 2024-10-01 10:39:16 +01:00
Thomas Grainger
d9c117a2f3
add cli usage 2024-10-01 10:26:06 +01:00
Thomas Grainger
0af6480dd6
mypy fix 2 2024-10-01 10:19:45 +01:00
Thomas Grainger
2a9b20a7fa
mypy fix 2024-10-01 10:16:04 +01:00
Thomas Grainger
4a0785af65
add a test 2024-10-01 10:12:27 +01:00
Thomas Grainger
1550e7a10d
fix it so it actually runs 2024-10-01 10:12:10 +01:00
Thomas Grainger
3e2854ac30
add socket-load-balance flag 2024-09-30 15:30:39 +01:00
10 changed files with 143 additions and 14 deletions

View File

@ -135,6 +135,7 @@ Options:
buffer of an incomplete event.
--factory Treat APP as an application factory, i.e. a
() -> <ASGI app> callable.
--socket-load-balance Use kernel support for socket load balancing
--help Show this message and exit.
```

View File

@ -205,6 +205,7 @@ Options:
buffer of an incomplete event.
--factory Treat APP as an application factory, i.e. a
() -> <ASGI app> callable.
--socket-load-balance Use kernel support for socket load balancing
--help Show this message and exit.
```

View File

@ -103,6 +103,9 @@ filterwarnings = [
source_pkgs = ["uvicorn", "tests"]
plugins = ["coverage_conditional_plugin"]
omit = ["uvicorn/workers.py", "uvicorn/__main__.py"]
concurrency = ["multiprocessing", "thread"]
parallel = true
sigterm = true
[tool.coverage.report]
precision = 2

View File

@ -8,4 +8,5 @@ export SOURCE_FILES="uvicorn tests"
set -x
${PREFIX}coverage combine
${PREFIX}coverage report

View File

@ -1,17 +1,22 @@
from __future__ import annotations
import dataclasses
import functools
import multiprocessing.managers
import os
import signal
import socket
import sys
import threading
import time
from typing import Any, Callable
from typing import Any, Callable, Generic, TypeVar
import httpx
import pytest
from uvicorn import Config
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.server import Server
from uvicorn.supervisors import Multiprocess
from uvicorn.supervisors.multiprocess import Process
@ -169,3 +174,71 @@ def test_multiprocess_sigttou() -> None:
assert len(supervisor.processes) == 1
supervisor.signal_queue.append(signal.SIGINT)
supervisor.join_all()
T = TypeVar("T")
@dataclasses.dataclass
class Box(Generic[T]):
v: T
async def lb_app(
d: multiprocessing.managers.DictProxy,
started: threading.Event,
scope: Scope,
receive: ASGIReceiveCallable,
send: ASGISendCallable,
) -> None: # pragma: py-darwin pragma: py-win32
if scope["type"] == "lifespan":
await receive()
scope["state"]["count"] = box = Box(0)
await send({"type": "lifespan.startup.complete"})
started.set()
await receive()
d[os.getpid()] = box.v
await send({"type": "lifespan.shutdown.complete"})
return
scope["state"]["count"].v += 1
headers = [(b"content-type", b"text/plain")]
await send({"type": "http.response.start", "status": 200, "headers": headers})
await send({"type": "http.response.body", "body": b"hello"})
@pytest.mark.skipif(
not ((sys.platform == "linux" and hasattr(socket, "SO_REUSEPORT")) or hasattr(socket, "SO_REUSEPORT_LB")),
reason="unsupported",
)
def test_multiprocess_socket_balance() -> None: # pragma: py-darwin pragma: py-win32
with multiprocessing.Manager() as m:
started = m.Event()
d = m.dict()
app = functools.partial(lb_app, d, started)
config = Config(app=app, workers=2, socket_load_balance=True, port=0, interface="asgi3")
server = Server(config=config)
with config.bind_socket() as sock:
port = sock.getsockname()[1]
try:
supervisor = Multiprocess(config, target=server.run, sockets=[sock])
threading.Thread(target=supervisor.run, daemon=True).start()
if not started.wait(timeout=5): # pragma: no cover
raise TimeoutError
with httpx.Client():
for i in range(100):
httpx.get(f"http://localhost:{port}/").raise_for_status()
finally:
supervisor.signal_queue.append(signal.SIGINT)
supervisor.join_all()
min_conn, max_conn = sorted(d.values())
assert (max_conn - min_conn) < 25
def test_multiprocess_not_supported(monkeypatch):
monkeypatch.delattr(socket, "SO_REUSEPORT")
config = Config(app=app, workers=2, socket_load_balance=True, port=0, interface="asgi3")
with config.bind_socket() as sock:
supervisor = Multiprocess(config, target=run, sockets=[sock])
with pytest.raises(RuntimeError, match="socket_load_balance not supported"):
supervisor.run()

View File

@ -157,7 +157,7 @@ class TestBaseReload:
app="tests.test_config:asgi_app",
reload=True,
reload_includes=["*"],
reload_excludes=["*.js"],
reload_excludes=["*.js", ".coverage.*"],
)
reloader = self._setup_reloader(config)
@ -242,7 +242,7 @@ class TestBaseReload:
reload=True,
# We need to add *.txt otherwise no regular files will match
reload_includes=[".*", "*.txt"],
reload_excludes=["*.py"],
reload_excludes=["*.py", ".coverage.*"],
)
reloader = self._setup_reloader(config)

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import socket
from unittest.mock import patch
from uvicorn._subprocess import SpawnProcess, get_subprocess, subprocess_started
from uvicorn._subprocess import SocketSharePickle, SpawnProcess, get_subprocess, subprocess_started
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.config import Config
@ -36,7 +36,7 @@ def test_subprocess_started() -> None:
with patch("tests.test_subprocess.server_run") as mock_run:
with patch.object(config, "configure_logging") as mock_config_logging:
subprocess_started(config, server_run, [fdsock], None)
subprocess_started(config, server_run, [SocketSharePickle(fdsock)], None)
mock_run.assert_called_once()
mock_config_logging.assert_called_once()

View File

@ -7,9 +7,9 @@ from __future__ import annotations
import multiprocessing
import os
import socket
import sys
from multiprocessing.context import SpawnProcess
from socket import socket
from typing import Callable
from uvicorn.config import Config
@ -18,10 +18,42 @@ multiprocessing.allow_connection_pickling()
spawn = multiprocessing.get_context("spawn")
class SocketSharePickle:
def __init__(self, sock: socket.socket):
self._sock = sock
def get(self) -> socket.socket:
return self._sock
class SocketShareRebind:
def __init__(self, sock: socket.socket):
if not (sys.platform == "linux" and hasattr(socket, "SO_REUSEPORT")) or hasattr(socket, "SO_REUSEPORT_LB"):
raise RuntimeError("socket_load_balance not supported")
else: # pragma: py-darwin pragma: py-win32
sock.setsockopt(socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT_LB", socket.SO_REUSEPORT), 1)
self._family = sock.family
self._type = sock.type
self._proto = sock.proto
self._sockname = sock.getsockname()
def get(self) -> socket.socket: # pragma: py-darwin pragma: py-win32
try:
sock = socket.socket(family=self._family, type=self._type, proto=self._proto)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT_LB", socket.SO_REUSEPORT), 1)
sock.bind(self._sockname)
return sock
except BaseException: # pragma: no cover
sock.close()
raise
def get_subprocess(
config: Config,
target: Callable[..., None],
sockets: list[socket],
sockets: list[socket.socket],
) -> SpawnProcess:
"""
Called in the parent process, to instantiate a new child process instance.
@ -41,10 +73,15 @@ def get_subprocess(
except (AttributeError, OSError):
stdin_fileno = None
socket_shares: list[SocketShareRebind] | list[SocketSharePickle]
if config.socket_load_balance: # pragma: py-darwin pragma: py-win32
socket_shares = [SocketShareRebind(s) for s in sockets]
else:
socket_shares = [SocketSharePickle(s) for s in sockets]
kwargs = {
"config": config,
"target": target,
"sockets": sockets,
"sockets": socket_shares,
"stdin_fileno": stdin_fileno,
}
@ -54,7 +91,7 @@ def get_subprocess(
def subprocess_started(
config: Config,
target: Callable[..., None],
sockets: list[socket],
sockets: list[SocketSharePickle] | list[SocketShareRebind],
stdin_fileno: int | None,
) -> None:
"""
@ -77,7 +114,7 @@ def subprocess_started(
try:
# Now we can call into `Server.run(sockets=sockets)`
target(sockets=sockets)
target(sockets=[s.get() for s in sockets])
except KeyboardInterrupt: # pragma: no cover
# supress the exception to avoid a traceback from subprocess.Popen
# the parent already expects us to end, so no vital information is lost

View File

@ -223,6 +223,7 @@ class Config:
headers: list[tuple[str, str]] | None = None,
factory: bool = False,
h11_max_incomplete_event_size: int | None = None,
socket_load_balance: bool = False,
):
self.app = app
self.host = host
@ -268,6 +269,7 @@ class Config:
self.encoded_headers: list[tuple[bytes, bytes]] = []
self.factory = factory
self.h11_max_incomplete_event_size = h11_max_incomplete_event_size
self.socket_load_balance = socket_load_balance
self.loaded = False
self.configure_logging()

View File

@ -360,6 +360,13 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
help="Treat APP as an application factory, i.e. a () -> <ASGI app> callable.",
show_default=True,
)
@click.option(
"--socket-load-balance",
is_flag=True,
default=False,
help="Use kernel support for socket load balancing",
show_default=True,
)
def main(
app: str,
host: str,
@ -408,6 +415,7 @@ def main(
app_dir: str,
h11_max_incomplete_event_size: int | None,
factory: bool,
socket_load_balance: bool = False,
) -> None:
run(
app,
@ -457,6 +465,7 @@ def main(
factory=factory,
app_dir=app_dir,
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
socket_load_balance=socket_load_balance,
)
@ -509,6 +518,7 @@ def run(
app_dir: str | None = None,
factory: bool = False,
h11_max_incomplete_event_size: int | None = None,
socket_load_balance: bool = False,
) -> None:
if app_dir is not None:
sys.path.insert(0, app_dir)
@ -560,6 +570,7 @@ def run(
use_colors=use_colors,
factory=factory,
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
socket_load_balance=socket_load_balance,
)
server = Server(config=config)
@ -570,11 +581,11 @@ def run(
try:
if config.should_reload:
sock = config.bind_socket()
ChangeReload(config, target=server.run, sockets=[sock]).run()
with config.bind_socket() as sock:
ChangeReload(config, target=server.run, sockets=[sock]).run()
elif config.workers > 1:
sock = config.bind_socket()
Multiprocess(config, target=server.run, sockets=[sock]).run()
with config.bind_socket() as sock:
Multiprocess(config, target=server.run, sockets=[sock]).run()
else:
server.run()
except KeyboardInterrupt: