663 lines
22 KiB
Python
663 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
import configparser
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sys
|
|
from collections.abc import Callable, Iterator
|
|
from contextlib import closing
|
|
from pathlib import Path
|
|
from typing import IO, Any, Literal
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
import yaml
|
|
from pytest_mock import MockerFixture
|
|
|
|
from tests.custom_loop_utils import CustomLoop
|
|
from tests.utils import as_cwd, get_asyncio_default_loop_per_os
|
|
from uvicorn._types import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Environ, Scope, StartResponse
|
|
from uvicorn.config import Config, LoopFactoryType
|
|
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
|
|
from uvicorn.middleware.wsgi import WSGIMiddleware
|
|
from uvicorn.protocols.http.h11_impl import H11Protocol
|
|
|
|
|
|
@pytest.fixture
|
|
def mocked_logging_config_module(mocker: MockerFixture) -> MagicMock:
|
|
return mocker.patch("logging.config")
|
|
|
|
|
|
@pytest.fixture
|
|
def json_logging_config(logging_config: dict) -> str:
|
|
return json.dumps(logging_config)
|
|
|
|
|
|
@pytest.fixture
|
|
def yaml_logging_config(logging_config: dict) -> str:
|
|
return yaml.dump(logging_config)
|
|
|
|
|
|
async def asgi_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
|
pass # pragma: nocover
|
|
|
|
|
|
def wsgi_app(environ: Environ, start_response: StartResponse) -> None:
|
|
pass # pragma: nocover
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"app, expected_should_reload",
|
|
[(asgi_app, False), ("tests.test_config:asgi_app", True)],
|
|
)
|
|
def test_config_should_reload_is_set(app: ASGIApplication, expected_should_reload: bool) -> None:
|
|
config = Config(app=app, reload=True)
|
|
assert config.reload is True
|
|
assert config.should_reload is expected_should_reload
|
|
|
|
|
|
def test_should_warn_on_invalid_reload_configuration(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None:
|
|
config_class = Config(app=asgi_app, reload_dirs=[str(tmp_path)])
|
|
assert not config_class.should_reload
|
|
assert len(caplog.records) == 1
|
|
assert (
|
|
caplog.records[-1].message == "Current configuration will not reload as not all conditions are met, "
|
|
"please refer to documentation."
|
|
)
|
|
|
|
config_no_reload = Config(app="tests.test_config:asgi_app", reload_dirs=[str(tmp_path)])
|
|
assert not config_no_reload.should_reload
|
|
assert len(caplog.records) == 2
|
|
assert (
|
|
caplog.records[-1].message == "Current configuration will not reload as not all conditions are met, "
|
|
"please refer to documentation."
|
|
)
|
|
|
|
|
|
def test_reload_dir_is_set(reload_directory_structure: Path, caplog: pytest.LogCaptureFixture) -> None:
|
|
app_dir = reload_directory_structure / "app"
|
|
with caplog.at_level(logging.INFO):
|
|
config = Config(app="tests.test_config:asgi_app", reload=True, reload_dirs=[str(app_dir)])
|
|
assert len(caplog.records) == 1
|
|
assert caplog.records[-1].message == f"Will watch for changes in these directories: {[str(app_dir)]}"
|
|
assert config.reload_dirs == [app_dir]
|
|
config = Config(app="tests.test_config:asgi_app", reload=True, reload_dirs=str(app_dir))
|
|
assert config.reload_dirs == [app_dir]
|
|
|
|
|
|
def test_non_existant_reload_dir_is_not_set(reload_directory_structure: Path, caplog: pytest.LogCaptureFixture) -> None:
|
|
with as_cwd(reload_directory_structure), caplog.at_level(logging.WARNING):
|
|
config = Config(app="tests.test_config:asgi_app", reload=True, reload_dirs=["reload"])
|
|
assert config.reload_dirs == [reload_directory_structure]
|
|
assert (
|
|
caplog.records[-1].message
|
|
== "Provided reload directories ['reload'] did not contain valid "
|
|
+ "directories, watching current working directory."
|
|
)
|
|
|
|
|
|
def test_reload_subdir_removal(reload_directory_structure: Path) -> None:
|
|
app_dir = reload_directory_structure / "app"
|
|
|
|
reload_dirs = [str(reload_directory_structure), "app", str(app_dir)]
|
|
|
|
with as_cwd(reload_directory_structure):
|
|
config = Config(app="tests.test_config:asgi_app", reload=True, reload_dirs=reload_dirs)
|
|
assert config.reload_dirs == [reload_directory_structure]
|
|
|
|
|
|
def test_reload_included_dir_is_added_to_reload_dirs(
|
|
reload_directory_structure: Path,
|
|
) -> None:
|
|
app_dir = reload_directory_structure / "app"
|
|
ext_dir = reload_directory_structure / "ext"
|
|
|
|
with as_cwd(reload_directory_structure):
|
|
config = Config(
|
|
app="tests.test_config:asgi_app",
|
|
reload=True,
|
|
reload_dirs=[str(app_dir)],
|
|
reload_includes=["*.js", str(ext_dir)],
|
|
)
|
|
assert frozenset(config.reload_dirs), frozenset([app_dir, ext_dir])
|
|
assert frozenset(config.reload_includes) == frozenset(["*.js", str(ext_dir)])
|
|
|
|
|
|
def test_reload_dir_subdirectories_are_removed(
|
|
reload_directory_structure: Path,
|
|
) -> None:
|
|
app_dir = reload_directory_structure / "app"
|
|
app_sub_dir = app_dir / "sub"
|
|
ext_dir = reload_directory_structure / "ext"
|
|
ext_sub_dir = ext_dir / "sub"
|
|
|
|
with as_cwd(reload_directory_structure):
|
|
config = Config(
|
|
app="tests.test_config:asgi_app",
|
|
reload=True,
|
|
reload_dirs=[
|
|
str(app_dir),
|
|
str(app_sub_dir),
|
|
str(ext_sub_dir),
|
|
str(ext_dir),
|
|
],
|
|
)
|
|
assert frozenset(config.reload_dirs) == frozenset([app_dir, ext_dir])
|
|
|
|
|
|
def test_reload_excluded_subdirectories_are_removed(
|
|
reload_directory_structure: Path,
|
|
) -> None:
|
|
app_dir = reload_directory_structure / "app"
|
|
app_sub_dir = app_dir / "sub"
|
|
|
|
with as_cwd(reload_directory_structure):
|
|
config = Config(
|
|
app="tests.test_config:asgi_app",
|
|
reload=True,
|
|
reload_excludes=[str(app_dir), str(app_sub_dir)],
|
|
)
|
|
assert frozenset(config.reload_dirs) == frozenset([reload_directory_structure])
|
|
assert frozenset(config.reload_dirs_excludes) == frozenset([app_dir])
|
|
assert frozenset(config.reload_excludes) == frozenset([str(app_dir), str(app_sub_dir)])
|
|
|
|
|
|
def test_reload_includes_exclude_dir_patterns_are_matched(
|
|
reload_directory_structure: Path, caplog: pytest.LogCaptureFixture
|
|
) -> None:
|
|
with caplog.at_level(logging.INFO):
|
|
first_app_dir = reload_directory_structure / "app_first" / "src"
|
|
second_app_dir = reload_directory_structure / "app_second" / "src"
|
|
|
|
with as_cwd(reload_directory_structure):
|
|
config = Config(
|
|
app="tests.test_config:asgi_app",
|
|
reload=True,
|
|
reload_includes=["*/src"],
|
|
reload_excludes=["app", "*third*"],
|
|
)
|
|
assert len(caplog.records) == 1
|
|
assert (
|
|
caplog.records[-1].message == "Will watch for changes in these directories: "
|
|
f"{sorted([str(first_app_dir), str(second_app_dir)])}"
|
|
)
|
|
assert frozenset(config.reload_dirs) == frozenset([first_app_dir, second_app_dir])
|
|
assert config.reload_includes == ["*/src"]
|
|
|
|
|
|
def test_wsgi_app() -> None:
|
|
config = Config(app=wsgi_app, interface="wsgi", proxy_headers=False)
|
|
config.load()
|
|
|
|
assert isinstance(config.loaded_app, WSGIMiddleware)
|
|
assert config.interface == "wsgi"
|
|
assert config.asgi_version == "3.0"
|
|
|
|
|
|
def test_proxy_headers() -> None:
|
|
config = Config(app=asgi_app)
|
|
config.load()
|
|
|
|
assert config.proxy_headers is True
|
|
assert isinstance(config.loaded_app, ProxyHeadersMiddleware)
|
|
|
|
|
|
def test_app_unimportable_module() -> None:
|
|
config = Config(app="no.such:app")
|
|
with pytest.raises(ImportError):
|
|
config.load()
|
|
|
|
|
|
def test_app_unimportable_other(caplog: pytest.LogCaptureFixture) -> None:
|
|
config = Config(app="tests.test_config:app")
|
|
with pytest.raises(SystemExit):
|
|
config.load()
|
|
error_messages = [
|
|
record.message for record in caplog.records if record.name == "uvicorn.error" and record.levelname == "ERROR"
|
|
]
|
|
assert (
|
|
'Error loading ASGI app. Attribute "app" not found in module "tests.test_config".' # noqa: E501
|
|
== error_messages.pop(0)
|
|
)
|
|
|
|
|
|
def test_app_factory(caplog: pytest.LogCaptureFixture) -> None:
|
|
def create_app() -> ASGIApplication:
|
|
return asgi_app
|
|
|
|
config = Config(app=create_app, factory=True, proxy_headers=False)
|
|
config.load()
|
|
assert config.loaded_app is asgi_app
|
|
|
|
# Flag not passed. In this case, successfully load the app, but issue a warning
|
|
# to indicate that an explicit flag is preferred.
|
|
caplog.clear()
|
|
config = Config(app=create_app, proxy_headers=False)
|
|
with caplog.at_level(logging.WARNING):
|
|
config.load()
|
|
assert config.loaded_app is asgi_app
|
|
assert len(caplog.records) == 1
|
|
assert "--factory" in caplog.records[0].message
|
|
|
|
# App not a no-arguments callable.
|
|
config = Config(app=asgi_app, factory=True)
|
|
with pytest.raises(SystemExit):
|
|
config.load()
|
|
|
|
|
|
def test_concrete_http_class() -> None:
|
|
config = Config(app=asgi_app, http=H11Protocol)
|
|
config.load()
|
|
assert config.http_protocol_class is H11Protocol
|
|
|
|
|
|
def test_socket_bind() -> None:
|
|
config = Config(app=asgi_app)
|
|
config.load()
|
|
sock = config.bind_socket()
|
|
assert isinstance(sock, socket.socket)
|
|
sock.close()
|
|
|
|
|
|
def test_ssl_config(
|
|
tls_ca_certificate_pem_path: str,
|
|
tls_ca_certificate_private_key_path: str,
|
|
) -> None:
|
|
config = Config(
|
|
app=asgi_app,
|
|
ssl_certfile=tls_ca_certificate_pem_path,
|
|
ssl_keyfile=tls_ca_certificate_private_key_path,
|
|
)
|
|
config.load()
|
|
|
|
assert config.is_ssl is True
|
|
|
|
|
|
def test_ssl_config_combined(tls_certificate_key_and_chain_path: str) -> None:
|
|
config = Config(
|
|
app=asgi_app,
|
|
ssl_certfile=tls_certificate_key_and_chain_path,
|
|
)
|
|
config.load()
|
|
|
|
assert config.is_ssl is True
|
|
|
|
|
|
def asgi2_app(scope: Scope) -> Callable:
|
|
async def asgi(receive: ASGIReceiveCallable, send: ASGISendCallable) -> None: # pragma: nocover
|
|
pass
|
|
|
|
return asgi # pragma: nocover
|
|
|
|
|
|
@pytest.mark.parametrize("app, expected_interface", [(asgi_app, "3.0"), (asgi2_app, "2.0")])
|
|
def test_asgi_version(app: ASGIApplication, expected_interface: Literal["2.0", "3.0"]) -> None:
|
|
config = Config(app=app)
|
|
config.load()
|
|
assert config.asgi_version == expected_interface
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"use_colors, expected",
|
|
[
|
|
pytest.param(None, None, id="use_colors_not_provided"),
|
|
pytest.param("invalid", None, id="use_colors_invalid_value"),
|
|
pytest.param(True, True, id="use_colors_enabled"),
|
|
pytest.param(False, False, id="use_colors_disabled"),
|
|
],
|
|
)
|
|
def test_log_config_default(
|
|
mocked_logging_config_module: MagicMock,
|
|
use_colors: bool | None,
|
|
expected: bool | None,
|
|
logging_config: dict[str, Any],
|
|
) -> None:
|
|
"""
|
|
Test that one can specify the use_colors option when using the default logging
|
|
config.
|
|
"""
|
|
config = Config(app=asgi_app, use_colors=use_colors, log_config=logging_config)
|
|
config.load()
|
|
|
|
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
|
|
|
(provided_dict_config,), _ = mocked_logging_config_module.dictConfig.call_args
|
|
assert provided_dict_config["formatters"]["default"]["use_colors"] == expected
|
|
|
|
|
|
def test_log_config_json(
|
|
mocked_logging_config_module: MagicMock,
|
|
logging_config: dict[str, Any],
|
|
json_logging_config: str,
|
|
mocker: MockerFixture,
|
|
) -> None:
|
|
"""
|
|
Test that one can load a json config from disk.
|
|
"""
|
|
mocked_open = mocker.patch("uvicorn.config.open", mocker.mock_open(read_data=json_logging_config))
|
|
|
|
config = Config(app=asgi_app, log_config="log_config.json")
|
|
config.load()
|
|
|
|
mocked_open.assert_called_once_with("log_config.json")
|
|
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
|
|
|
|
|
@pytest.mark.parametrize("config_filename", ["log_config.yml", "log_config.yaml"])
|
|
def test_log_config_yaml(
|
|
mocked_logging_config_module: MagicMock,
|
|
logging_config: dict[str, Any],
|
|
yaml_logging_config: str,
|
|
mocker: MockerFixture,
|
|
config_filename: str,
|
|
) -> None:
|
|
"""
|
|
Test that one can load a yaml config from disk.
|
|
"""
|
|
mocked_open = mocker.patch("uvicorn.config.open", mocker.mock_open(read_data=yaml_logging_config))
|
|
|
|
config = Config(app=asgi_app, log_config=config_filename)
|
|
config.load()
|
|
|
|
mocked_open.assert_called_once_with(config_filename)
|
|
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
|
|
|
|
|
def test_log_config_yaml_missing_pyyaml(mocked_logging_config_module: MagicMock, mocker: MockerFixture) -> None:
|
|
"""
|
|
Test that a helpful error is raised when PyYAML is not installed.
|
|
"""
|
|
mocker.patch.dict(sys.modules, {"yaml": None})
|
|
with pytest.raises(ImportError, match=r"Install the PyYAML package or uvicorn\[standard\]"):
|
|
Config(app=asgi_app, log_config="log_config.yaml")
|
|
|
|
|
|
def test_log_config_pathlike(
|
|
mocked_logging_config_module: MagicMock,
|
|
logging_config: dict[str, Any],
|
|
json_logging_config: str,
|
|
mocker: MockerFixture,
|
|
tmp_path: Path,
|
|
) -> None:
|
|
"""
|
|
Test that one can pass a `os.PathLike` (e.g. `pathlib.Path`) as the log config path.
|
|
"""
|
|
path = tmp_path / "log_config.json"
|
|
mocked_open = mocker.patch("uvicorn.config.open", mocker.mock_open(read_data=json_logging_config))
|
|
|
|
config = Config(app=asgi_app, log_config=path)
|
|
config.load()
|
|
|
|
mocked_open.assert_called_once_with(os.fspath(path))
|
|
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
|
|
|
|
|
@pytest.mark.parametrize("config_file", ["log_config.ini", configparser.ConfigParser(), io.StringIO()])
|
|
def test_log_config_file(
|
|
mocked_logging_config_module: MagicMock,
|
|
config_file: str | configparser.RawConfigParser | IO[Any],
|
|
) -> None:
|
|
"""
|
|
Test that one can load a configparser config from disk.
|
|
"""
|
|
config = Config(app=asgi_app, log_config=config_file)
|
|
config.load()
|
|
|
|
mocked_logging_config_module.fileConfig.assert_called_once_with(config_file, disable_existing_loggers=False)
|
|
|
|
|
|
@pytest.fixture(params=[0, 1])
|
|
def web_concurrency(request: pytest.FixtureRequest) -> Iterator[int]:
|
|
yield request.param
|
|
if os.getenv("WEB_CONCURRENCY"):
|
|
del os.environ["WEB_CONCURRENCY"]
|
|
|
|
|
|
@pytest.fixture(params=["127.0.0.1", "127.0.0.2"])
|
|
def forwarded_allow_ips(request: pytest.FixtureRequest) -> Iterator[str]:
|
|
yield request.param
|
|
if os.getenv("FORWARDED_ALLOW_IPS"):
|
|
del os.environ["FORWARDED_ALLOW_IPS"]
|
|
|
|
|
|
def test_env_file(
|
|
web_concurrency: int,
|
|
forwarded_allow_ips: str,
|
|
caplog: pytest.LogCaptureFixture,
|
|
tmp_path: Path,
|
|
) -> None:
|
|
"""
|
|
Test that one can load environment variables using an env file.
|
|
"""
|
|
fp = tmp_path / ".env"
|
|
content = f"WEB_CONCURRENCY={web_concurrency}\nFORWARDED_ALLOW_IPS={forwarded_allow_ips}\n"
|
|
fp.write_text(content)
|
|
with caplog.at_level(logging.INFO):
|
|
config = Config(app=asgi_app, env_file=fp)
|
|
config.load()
|
|
|
|
assert config.workers == int(str(os.getenv("WEB_CONCURRENCY")))
|
|
assert config.forwarded_allow_ips == os.getenv("FORWARDED_ALLOW_IPS")
|
|
assert len(caplog.records) == 1
|
|
assert f"Loading environment from '{fp}'" in caplog.records[0].message
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"access_log, handlers",
|
|
[
|
|
pytest.param(True, 1, id="access log enabled should have single handler"),
|
|
pytest.param(False, 0, id="access log disabled shouldn't have handlers"),
|
|
],
|
|
)
|
|
def test_config_access_log(access_log: bool, handlers: int) -> None:
|
|
config = Config(app=asgi_app, access_log=access_log)
|
|
config.load()
|
|
|
|
assert len(logging.getLogger("uvicorn.access").handlers) == handlers
|
|
assert config.access_log == access_log
|
|
|
|
|
|
@pytest.mark.parametrize("log_level", [5, 10, 20, 30, 40, 50])
|
|
def test_config_log_level(log_level: int) -> None:
|
|
config = Config(app=asgi_app, log_level=log_level)
|
|
config.load()
|
|
|
|
assert logging.getLogger("uvicorn.error").level == log_level
|
|
assert logging.getLogger("uvicorn.access").level == log_level
|
|
assert logging.getLogger("uvicorn.asgi").level == log_level
|
|
assert config.log_level == log_level
|
|
|
|
|
|
@pytest.mark.parametrize("log_level", [None, 0, 5, 10, 20, 30, 40, 50])
|
|
@pytest.mark.parametrize("uvicorn_logger_level", [0, 5, 10, 20, 30, 40, 50])
|
|
def test_config_log_effective_level(log_level: int, uvicorn_logger_level: int) -> None:
|
|
default_level = 30
|
|
log_config = {
|
|
"version": 1,
|
|
"disable_existing_loggers": False,
|
|
"loggers": {
|
|
"uvicorn": {"level": uvicorn_logger_level},
|
|
},
|
|
}
|
|
config = Config(app=asgi_app, log_level=log_level, log_config=log_config)
|
|
config.load()
|
|
|
|
effective_level = log_level or uvicorn_logger_level or default_level
|
|
assert logging.getLogger("uvicorn.error").getEffectiveLevel() == effective_level
|
|
assert logging.getLogger("uvicorn.access").getEffectiveLevel() == effective_level
|
|
assert logging.getLogger("uvicorn.asgi").getEffectiveLevel() == effective_level
|
|
|
|
|
|
@pytest.mark.parametrize("log_level", ["INFO", "Info", "info"])
|
|
def test_config_log_level_case_insensitive(log_level: str) -> None:
|
|
config = Config(app=asgi_app, log_level=log_level)
|
|
config.load()
|
|
assert logging.getLogger("uvicorn.error").level == logging.INFO
|
|
|
|
|
|
def test_ws_max_size() -> None:
|
|
config = Config(app=asgi_app, ws_max_size=1000)
|
|
config.load()
|
|
assert config.ws_max_size == 1000
|
|
|
|
|
|
def test_ws_max_queue() -> None:
|
|
config = Config(app=asgi_app, ws_max_queue=64)
|
|
config.load()
|
|
assert config.ws_max_queue == 64
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"reload, workers",
|
|
[
|
|
(True, 1),
|
|
(False, 2),
|
|
],
|
|
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
|
|
)
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
|
|
def test_bind_unix_socket_works_with_reload_or_workers(
|
|
tmp_path: Path, reload: bool, workers: int, short_socket_name: str
|
|
): # pragma: py-win32
|
|
config = Config(app=asgi_app, uds=short_socket_name, reload=reload, workers=workers)
|
|
config.load()
|
|
sock = config.bind_socket()
|
|
assert isinstance(sock, socket.socket)
|
|
assert sock.family == socket.AF_UNIX
|
|
assert sock.getsockname() == short_socket_name
|
|
sock.close()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"reload, workers",
|
|
[
|
|
(True, 1),
|
|
(False, 2),
|
|
],
|
|
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
|
|
)
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
|
|
def test_bind_fd_works_with_reload_or_workers(reload: bool, workers: int): # pragma: py-win32
|
|
fdsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
fd = fdsock.fileno()
|
|
config = Config(app=asgi_app, fd=fd, reload=reload, workers=workers)
|
|
config.load()
|
|
sock = config.bind_socket()
|
|
assert isinstance(sock, socket.socket)
|
|
assert sock.family == socket.AF_UNIX
|
|
assert sock.getsockname() == ""
|
|
sock.close()
|
|
fdsock.close()
|
|
|
|
|
|
@pytest.fixture
|
|
def stdin_socket() -> Iterator[socket.socket]: # pragma: py-win32
|
|
with closing(socket.socket(socket.AF_INET)) as sock:
|
|
sock.bind(("127.0.0.1", 0))
|
|
saved_stdin = os.dup(0)
|
|
os.dup2(sock.fileno(), 0)
|
|
try:
|
|
yield sock
|
|
finally:
|
|
os.dup2(saved_stdin, 0)
|
|
os.close(saved_stdin)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"reload, workers",
|
|
[
|
|
(True, 1),
|
|
(False, 2),
|
|
],
|
|
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
|
|
)
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
|
|
def test_bind_stdin_works_with_reload_or_workers(
|
|
reload: bool, workers: int, stdin_socket: socket.socket
|
|
): # pragma: py-win32
|
|
config = Config(app=asgi_app, fd=0, reload=reload, workers=workers)
|
|
config.load()
|
|
with closing(config.bind_socket()) as sock:
|
|
assert sock.getsockname() == stdin_socket.getsockname()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"reload, workers, expected",
|
|
[
|
|
(True, 1, True),
|
|
(False, 2, True),
|
|
(False, 1, False),
|
|
],
|
|
ids=[
|
|
"--reload=True --workers=1",
|
|
"--reload=False --workers=2",
|
|
"--reload=False --workers=1",
|
|
],
|
|
)
|
|
def test_config_use_subprocess(reload: bool, workers: int, expected: bool):
|
|
config = Config(app=asgi_app, reload=reload, workers=workers)
|
|
config.load()
|
|
assert config.use_subprocess == expected
|
|
|
|
|
|
def test_warn_when_using_reload_and_workers(caplog: pytest.LogCaptureFixture) -> None:
|
|
Config(app=asgi_app, reload=True, workers=2)
|
|
assert len(caplog.records) == 1
|
|
assert '"workers" flag is ignored when reloading is enabled.' in caplog.records[0].message
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("loop_type", "expected_loop_factory"),
|
|
[
|
|
("none", None),
|
|
("asyncio", get_asyncio_default_loop_per_os()),
|
|
],
|
|
)
|
|
def test_get_loop_factory(loop_type: LoopFactoryType, expected_loop_factory: Any):
|
|
config = Config(app=asgi_app, loop=loop_type)
|
|
loop_factory = config.get_loop_factory()
|
|
if loop_factory is None:
|
|
assert expected_loop_factory is loop_factory
|
|
else:
|
|
loop = loop_factory()
|
|
with closing(loop):
|
|
assert loop is not None
|
|
assert isinstance(loop, expected_loop_factory)
|
|
|
|
|
|
def test_custom_loop__importable_custom_loop_setup_function() -> None:
|
|
config = Config(app=asgi_app, loop="tests.custom_loop_utils:CustomLoop")
|
|
config.load()
|
|
loop_factory = config.get_loop_factory()
|
|
assert loop_factory, "Loop factory should be set"
|
|
event_loop = loop_factory()
|
|
with closing(event_loop):
|
|
assert event_loop is not None
|
|
assert isinstance(event_loop, CustomLoop)
|
|
|
|
|
|
@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
|
|
def test_custom_loop__not_importable_custom_loop_setup_function(caplog: pytest.LogCaptureFixture) -> None:
|
|
config = Config(app=asgi_app, loop="tests.test_config:non_existing_setup_function")
|
|
config.load()
|
|
with pytest.raises(SystemExit):
|
|
config.get_loop_factory()
|
|
error_messages = [
|
|
record.message for record in caplog.records if record.name == "uvicorn.error" and record.levelname == "ERROR"
|
|
]
|
|
assert (
|
|
'Error loading custom loop setup function. Attribute "non_existing_setup_function" not found in module "tests.test_config".' # noqa: E501
|
|
== error_messages.pop(0)
|
|
)
|
|
|
|
|
|
def test_setup_event_loop_is_removed(caplog: pytest.LogCaptureFixture) -> None:
|
|
config = Config(app=asgi_app)
|
|
with pytest.raises(
|
|
AttributeError, match="The `setup_event_loop` method was replaced by `get_loop_factory` in uvicorn 0.36.0."
|
|
):
|
|
config.setup_event_loop()
|