Compare commits
13 Commits
main
...
socket-loa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
577c0570eb | ||
|
|
47924f253f | ||
|
|
bb6dc58eec | ||
|
|
d5a1f59be4 | ||
|
|
77a4a23da4 | ||
|
|
4166deed90 | ||
|
|
66afb6229a | ||
|
|
d9c117a2f3 | ||
|
|
0af6480dd6 | ||
|
|
2a9b20a7fa | ||
|
|
4a0785af65 | ||
|
|
1550e7a10d | ||
|
|
3e2854ac30 |
@ -135,6 +135,7 @@ Options:
|
||||
buffer of an incomplete event.
|
||||
--factory Treat APP as an application factory, i.e. a
|
||||
() -> <ASGI app> callable.
|
||||
--socket-load-balance Use kernel support for socket load balancing
|
||||
--help Show this message and exit.
|
||||
```
|
||||
|
||||
|
||||
@ -205,6 +205,7 @@ Options:
|
||||
buffer of an incomplete event.
|
||||
--factory Treat APP as an application factory, i.e. a
|
||||
() -> <ASGI app> callable.
|
||||
--socket-load-balance Use kernel support for socket load balancing
|
||||
--help Show this message and exit.
|
||||
```
|
||||
|
||||
|
||||
@ -103,6 +103,9 @@ filterwarnings = [
|
||||
source_pkgs = ["uvicorn", "tests"]
|
||||
plugins = ["coverage_conditional_plugin"]
|
||||
omit = ["uvicorn/workers.py", "uvicorn/__main__.py"]
|
||||
concurrency = ["multiprocessing", "thread"]
|
||||
parallel = true
|
||||
sigterm = true
|
||||
|
||||
[tool.coverage.report]
|
||||
precision = 2
|
||||
|
||||
@ -8,4 +8,5 @@ export SOURCE_FILES="uvicorn tests"
|
||||
|
||||
set -x
|
||||
|
||||
${PREFIX}coverage combine
|
||||
${PREFIX}coverage report
|
||||
|
||||
@ -1,17 +1,22 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import functools
|
||||
import multiprocessing.managers
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Callable
|
||||
from typing import Any, Callable, Generic, TypeVar
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from uvicorn import Config
|
||||
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn.server import Server
|
||||
from uvicorn.supervisors import Multiprocess
|
||||
from uvicorn.supervisors.multiprocess import Process
|
||||
|
||||
@ -169,3 +174,71 @@ def test_multiprocess_sigttou() -> None:
|
||||
assert len(supervisor.processes) == 1
|
||||
supervisor.signal_queue.append(signal.SIGINT)
|
||||
supervisor.join_all()
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Box(Generic[T]):
|
||||
v: T
|
||||
|
||||
|
||||
async def lb_app(
|
||||
d: multiprocessing.managers.DictProxy,
|
||||
started: threading.Event,
|
||||
scope: Scope,
|
||||
receive: ASGIReceiveCallable,
|
||||
send: ASGISendCallable,
|
||||
) -> None: # pragma: py-darwin pragma: py-win32
|
||||
if scope["type"] == "lifespan":
|
||||
await receive()
|
||||
scope["state"]["count"] = box = Box(0)
|
||||
await send({"type": "lifespan.startup.complete"})
|
||||
started.set()
|
||||
await receive()
|
||||
d[os.getpid()] = box.v
|
||||
await send({"type": "lifespan.shutdown.complete"})
|
||||
return
|
||||
|
||||
scope["state"]["count"].v += 1
|
||||
headers = [(b"content-type", b"text/plain")]
|
||||
await send({"type": "http.response.start", "status": 200, "headers": headers})
|
||||
await send({"type": "http.response.body", "body": b"hello"})
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not ((sys.platform == "linux" and hasattr(socket, "SO_REUSEPORT")) or hasattr(socket, "SO_REUSEPORT_LB")),
|
||||
reason="unsupported",
|
||||
)
|
||||
def test_multiprocess_socket_balance() -> None: # pragma: py-darwin pragma: py-win32
|
||||
with multiprocessing.Manager() as m:
|
||||
started = m.Event()
|
||||
d = m.dict()
|
||||
app = functools.partial(lb_app, d, started)
|
||||
config = Config(app=app, workers=2, socket_load_balance=True, port=0, interface="asgi3")
|
||||
server = Server(config=config)
|
||||
with config.bind_socket() as sock:
|
||||
port = sock.getsockname()[1]
|
||||
try:
|
||||
supervisor = Multiprocess(config, target=server.run, sockets=[sock])
|
||||
threading.Thread(target=supervisor.run, daemon=True).start()
|
||||
if not started.wait(timeout=5): # pragma: no cover
|
||||
raise TimeoutError
|
||||
with httpx.Client():
|
||||
for i in range(100):
|
||||
httpx.get(f"http://localhost:{port}/").raise_for_status()
|
||||
finally:
|
||||
supervisor.signal_queue.append(signal.SIGINT)
|
||||
supervisor.join_all()
|
||||
min_conn, max_conn = sorted(d.values())
|
||||
assert (max_conn - min_conn) < 25
|
||||
|
||||
|
||||
def test_multiprocess_not_supported(monkeypatch):
|
||||
monkeypatch.delattr(socket, "SO_REUSEPORT")
|
||||
config = Config(app=app, workers=2, socket_load_balance=True, port=0, interface="asgi3")
|
||||
with config.bind_socket() as sock:
|
||||
supervisor = Multiprocess(config, target=run, sockets=[sock])
|
||||
with pytest.raises(RuntimeError, match="socket_load_balance not supported"):
|
||||
supervisor.run()
|
||||
|
||||
@ -157,7 +157,7 @@ class TestBaseReload:
|
||||
app="tests.test_config:asgi_app",
|
||||
reload=True,
|
||||
reload_includes=["*"],
|
||||
reload_excludes=["*.js"],
|
||||
reload_excludes=["*.js", ".coverage.*"],
|
||||
)
|
||||
reloader = self._setup_reloader(config)
|
||||
|
||||
@ -242,7 +242,7 @@ class TestBaseReload:
|
||||
reload=True,
|
||||
# We need to add *.txt otherwise no regular files will match
|
||||
reload_includes=[".*", "*.txt"],
|
||||
reload_excludes=["*.py"],
|
||||
reload_excludes=["*.py", ".coverage.*"],
|
||||
)
|
||||
reloader = self._setup_reloader(config)
|
||||
|
||||
|
||||
@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import socket
|
||||
from unittest.mock import patch
|
||||
|
||||
from uvicorn._subprocess import SpawnProcess, get_subprocess, subprocess_started
|
||||
from uvicorn._subprocess import SocketSharePickle, SpawnProcess, get_subprocess, subprocess_started
|
||||
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn.config import Config
|
||||
|
||||
@ -36,7 +36,7 @@ def test_subprocess_started() -> None:
|
||||
|
||||
with patch("tests.test_subprocess.server_run") as mock_run:
|
||||
with patch.object(config, "configure_logging") as mock_config_logging:
|
||||
subprocess_started(config, server_run, [fdsock], None)
|
||||
subprocess_started(config, server_run, [SocketSharePickle(fdsock)], None)
|
||||
mock_run.assert_called_once()
|
||||
mock_config_logging.assert_called_once()
|
||||
|
||||
|
||||
@ -7,9 +7,9 @@ from __future__ import annotations
|
||||
|
||||
import multiprocessing
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
from multiprocessing.context import SpawnProcess
|
||||
from socket import socket
|
||||
from typing import Callable
|
||||
|
||||
from uvicorn.config import Config
|
||||
@ -18,10 +18,42 @@ multiprocessing.allow_connection_pickling()
|
||||
spawn = multiprocessing.get_context("spawn")
|
||||
|
||||
|
||||
class SocketSharePickle:
|
||||
def __init__(self, sock: socket.socket):
|
||||
self._sock = sock
|
||||
|
||||
def get(self) -> socket.socket:
|
||||
return self._sock
|
||||
|
||||
|
||||
class SocketShareRebind:
|
||||
def __init__(self, sock: socket.socket):
|
||||
if not (sys.platform == "linux" and hasattr(socket, "SO_REUSEPORT")) or hasattr(socket, "SO_REUSEPORT_LB"):
|
||||
raise RuntimeError("socket_load_balance not supported")
|
||||
else: # pragma: py-darwin pragma: py-win32
|
||||
sock.setsockopt(socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT_LB", socket.SO_REUSEPORT), 1)
|
||||
self._family = sock.family
|
||||
self._type = sock.type
|
||||
self._proto = sock.proto
|
||||
self._sockname = sock.getsockname()
|
||||
|
||||
def get(self) -> socket.socket: # pragma: py-darwin pragma: py-win32
|
||||
try:
|
||||
sock = socket.socket(family=self._family, type=self._type, proto=self._proto)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.setsockopt(socket.SOL_SOCKET, getattr(socket, "SO_REUSEPORT_LB", socket.SO_REUSEPORT), 1)
|
||||
|
||||
sock.bind(self._sockname)
|
||||
return sock
|
||||
except BaseException: # pragma: no cover
|
||||
sock.close()
|
||||
raise
|
||||
|
||||
|
||||
def get_subprocess(
|
||||
config: Config,
|
||||
target: Callable[..., None],
|
||||
sockets: list[socket],
|
||||
sockets: list[socket.socket],
|
||||
) -> SpawnProcess:
|
||||
"""
|
||||
Called in the parent process, to instantiate a new child process instance.
|
||||
@ -41,10 +73,15 @@ def get_subprocess(
|
||||
except (AttributeError, OSError):
|
||||
stdin_fileno = None
|
||||
|
||||
socket_shares: list[SocketShareRebind] | list[SocketSharePickle]
|
||||
if config.socket_load_balance: # pragma: py-darwin pragma: py-win32
|
||||
socket_shares = [SocketShareRebind(s) for s in sockets]
|
||||
else:
|
||||
socket_shares = [SocketSharePickle(s) for s in sockets]
|
||||
kwargs = {
|
||||
"config": config,
|
||||
"target": target,
|
||||
"sockets": sockets,
|
||||
"sockets": socket_shares,
|
||||
"stdin_fileno": stdin_fileno,
|
||||
}
|
||||
|
||||
@ -54,7 +91,7 @@ def get_subprocess(
|
||||
def subprocess_started(
|
||||
config: Config,
|
||||
target: Callable[..., None],
|
||||
sockets: list[socket],
|
||||
sockets: list[SocketSharePickle] | list[SocketShareRebind],
|
||||
stdin_fileno: int | None,
|
||||
) -> None:
|
||||
"""
|
||||
@ -77,7 +114,7 @@ def subprocess_started(
|
||||
|
||||
try:
|
||||
# Now we can call into `Server.run(sockets=sockets)`
|
||||
target(sockets=sockets)
|
||||
target(sockets=[s.get() for s in sockets])
|
||||
except KeyboardInterrupt: # pragma: no cover
|
||||
# supress the exception to avoid a traceback from subprocess.Popen
|
||||
# the parent already expects us to end, so no vital information is lost
|
||||
|
||||
@ -223,6 +223,7 @@ class Config:
|
||||
headers: list[tuple[str, str]] | None = None,
|
||||
factory: bool = False,
|
||||
h11_max_incomplete_event_size: int | None = None,
|
||||
socket_load_balance: bool = False,
|
||||
):
|
||||
self.app = app
|
||||
self.host = host
|
||||
@ -268,6 +269,7 @@ class Config:
|
||||
self.encoded_headers: list[tuple[bytes, bytes]] = []
|
||||
self.factory = factory
|
||||
self.h11_max_incomplete_event_size = h11_max_incomplete_event_size
|
||||
self.socket_load_balance = socket_load_balance
|
||||
|
||||
self.loaded = False
|
||||
self.configure_logging()
|
||||
|
||||
@ -360,6 +360,13 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
||||
help="Treat APP as an application factory, i.e. a () -> <ASGI app> callable.",
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
"--socket-load-balance",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Use kernel support for socket load balancing",
|
||||
show_default=True,
|
||||
)
|
||||
def main(
|
||||
app: str,
|
||||
host: str,
|
||||
@ -408,6 +415,7 @@ def main(
|
||||
app_dir: str,
|
||||
h11_max_incomplete_event_size: int | None,
|
||||
factory: bool,
|
||||
socket_load_balance: bool = False,
|
||||
) -> None:
|
||||
run(
|
||||
app,
|
||||
@ -457,6 +465,7 @@ def main(
|
||||
factory=factory,
|
||||
app_dir=app_dir,
|
||||
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
|
||||
socket_load_balance=socket_load_balance,
|
||||
)
|
||||
|
||||
|
||||
@ -509,6 +518,7 @@ def run(
|
||||
app_dir: str | None = None,
|
||||
factory: bool = False,
|
||||
h11_max_incomplete_event_size: int | None = None,
|
||||
socket_load_balance: bool = False,
|
||||
) -> None:
|
||||
if app_dir is not None:
|
||||
sys.path.insert(0, app_dir)
|
||||
@ -560,6 +570,7 @@ def run(
|
||||
use_colors=use_colors,
|
||||
factory=factory,
|
||||
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
|
||||
socket_load_balance=socket_load_balance,
|
||||
)
|
||||
server = Server(config=config)
|
||||
|
||||
@ -570,11 +581,11 @@ def run(
|
||||
|
||||
try:
|
||||
if config.should_reload:
|
||||
sock = config.bind_socket()
|
||||
ChangeReload(config, target=server.run, sockets=[sock]).run()
|
||||
with config.bind_socket() as sock:
|
||||
ChangeReload(config, target=server.run, sockets=[sock]).run()
|
||||
elif config.workers > 1:
|
||||
sock = config.bind_socket()
|
||||
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
||||
with config.bind_socket() as sock:
|
||||
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
||||
else:
|
||||
server.run()
|
||||
except KeyboardInterrupt:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user