Compare commits
27 Commits
codex/thre
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
479a2c0c89 | ||
|
|
89347fd166 | ||
|
|
767315b38a | ||
|
|
f25ee43e68 | ||
|
|
8782666189 | ||
|
|
ad5ff87c86 | ||
|
|
6761b2c8f9 | ||
|
|
438f64834d | ||
|
|
10ddc6dd29 | ||
|
|
b499bc4510 | ||
|
|
b224045f59 | ||
|
|
7375b5bf66 | ||
|
|
d438fb16fe | ||
|
|
3e6b964466 | ||
|
|
2c423bd82b | ||
|
|
7f027f8e25 | ||
|
|
73a80c3cc8 | ||
|
|
45c0b568d3 | ||
|
|
850d92656d | ||
|
|
fdcacb4b83 | ||
|
|
70f247f9ee | ||
|
|
18edfa7012 | ||
|
|
77843e06dc | ||
|
|
3703339cdc | ||
|
|
fda70f37b0 | ||
|
|
f05fc928c0 | ||
|
|
6cdd61d15e |
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@ -4,6 +4,8 @@ updates:
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "monthly"
|
||||
cooldown:
|
||||
default-days: 7
|
||||
groups:
|
||||
python-packages:
|
||||
patterns:
|
||||
@ -12,6 +14,8 @@ updates:
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: monthly
|
||||
cooldown:
|
||||
default-days: 7
|
||||
groups:
|
||||
github-actions:
|
||||
patterns:
|
||||
|
||||
2
.github/workflows/benchmark.yml
vendored
2
.github/workflows/benchmark.yml
vendored
@ -30,7 +30,7 @@ jobs:
|
||||
shell: bash
|
||||
|
||||
- name: Run the benchmarks
|
||||
uses: CodSpeedHQ/action@d872884a306dd4853acf0f584f4b706cf0cc72a2 # v4
|
||||
uses: CodSpeedHQ/action@db35df748deb45fdef0960669f57d627c1956c30 # v4
|
||||
with:
|
||||
mode: instrumentation
|
||||
run: uv run pytest tests/benchmarks/ --codspeed -n 0
|
||||
|
||||
2
.github/workflows/zizmor.yml
vendored
2
.github/workflows/zizmor.yml
vendored
@ -14,8 +14,6 @@ jobs:
|
||||
|
||||
permissions:
|
||||
security-events: write # Required for upload-sarif (used by zizmor-action) to upload SARIF files.
|
||||
contents: read # Only needed for private repos. Needed to clone the repo.
|
||||
actions: read # Only needed for private repos. Needed for upload-sarif to read workflow run info.
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
|
||||
319
docs/concepts/logging.md
Normal file
319
docs/concepts/logging.md
Normal file
@ -0,0 +1,319 @@
|
||||
Uvicorn uses Python's built-in [`logging`](https://docs.python.org/3/library/logging.html)
|
||||
module, and provides three loggers out of the box:
|
||||
|
||||
| Logger name | Purpose |
|
||||
|------------------|----------------------------------------------------|
|
||||
| `uvicorn` | Parent logger (rarely used directly) |
|
||||
| `uvicorn.error` | Server-level messages (startup, shutdown, errors) |
|
||||
| `uvicorn.access` | Per-request access log lines |
|
||||
|
||||
!!! note
|
||||
Despite its name, `uvicorn.error` is **not** limited to error messages.
|
||||
It is the general-purpose server logger, similar to how Gunicorn names its
|
||||
main logger. See [#562](https://github.com/encode/uvicorn/issues/562) for
|
||||
background.
|
||||
|
||||
## Default Configuration
|
||||
|
||||
By default, Uvicorn applies the following
|
||||
[`dictConfig()`](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig)
|
||||
configuration:
|
||||
|
||||
```python
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
"formatters": {
|
||||
"default": {
|
||||
"()": "uvicorn.logging.DefaultFormatter",
|
||||
"fmt": "%(levelprefix)s %(message)s",
|
||||
"use_colors": None,
|
||||
},
|
||||
"access": {
|
||||
"()": "uvicorn.logging.AccessFormatter",
|
||||
"fmt": '%(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"formatter": "default",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stderr",
|
||||
},
|
||||
"access": {
|
||||
"formatter": "access",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stdout",
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
"uvicorn": {"handlers": ["default"], "level": "INFO", "propagate": False},
|
||||
"uvicorn.error": {"level": "INFO"},
|
||||
"uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
## Custom Logging Configuration
|
||||
|
||||
You can supply a custom logging configuration file with the `--log-config`
|
||||
option (or `log_config` when calling `uvicorn.run()`).
|
||||
|
||||
Uvicorn supports three file formats:
|
||||
|
||||
| Extension | Loader | Notes |
|
||||
|----------------|------------------------------|---------------------------------------------|
|
||||
| `.json` | `logging.config.dictConfig` | Standard JSON `dictConfig` schema. |
|
||||
| `.yaml`/`.yml` | `logging.config.dictConfig` | Requires **PyYAML** (`uvicorn[standard]`). |
|
||||
| Any other | `logging.config.fileConfig` | Classic INI-style format. |
|
||||
|
||||
### YAML Example
|
||||
|
||||
Create a file named `log_config.yaml`:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
disable_existing_loggers: false
|
||||
formatters:
|
||||
default:
|
||||
"()": uvicorn.logging.DefaultFormatter
|
||||
fmt: "%(asctime)s - %(levelprefix)s %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
use_colors: null
|
||||
access:
|
||||
"()": uvicorn.logging.AccessFormatter
|
||||
fmt: '%(asctime)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s'
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
handlers:
|
||||
default:
|
||||
formatter: default
|
||||
class: logging.StreamHandler
|
||||
stream: ext://sys.stderr
|
||||
access:
|
||||
formatter: access
|
||||
class: logging.StreamHandler
|
||||
stream: ext://sys.stdout
|
||||
loggers:
|
||||
uvicorn:
|
||||
handlers:
|
||||
- default
|
||||
level: INFO
|
||||
propagate: false
|
||||
uvicorn.error:
|
||||
level: INFO
|
||||
uvicorn.access:
|
||||
handlers:
|
||||
- access
|
||||
level: INFO
|
||||
propagate: false
|
||||
```
|
||||
|
||||
Then pass it to Uvicorn:
|
||||
|
||||
=== "CLI"
|
||||
|
||||
```bash
|
||||
uvicorn main:app --log-config log_config.yaml
|
||||
```
|
||||
|
||||
=== "Programmatic"
|
||||
|
||||
```python
|
||||
uvicorn.run("main:app", log_config="log_config.yaml")
|
||||
```
|
||||
|
||||
### JSON Example
|
||||
|
||||
Create a file named `log_config.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 1,
|
||||
"disable_existing_loggers": false,
|
||||
"formatters": {
|
||||
"default": {
|
||||
"()": "uvicorn.logging.DefaultFormatter",
|
||||
"fmt": "%(asctime)s - %(levelprefix)s %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
"use_colors": null
|
||||
},
|
||||
"access": {
|
||||
"()": "uvicorn.logging.AccessFormatter",
|
||||
"fmt": "%(asctime)s - %(levelprefix)s %(client_addr)s - \"%(request_line)s\" %(status_code)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S"
|
||||
}
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"formatter": "default",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stderr"
|
||||
},
|
||||
"access": {
|
||||
"formatter": "access",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stdout"
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"uvicorn": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": false
|
||||
},
|
||||
"uvicorn.error": {
|
||||
"level": "INFO"
|
||||
},
|
||||
"uvicorn.access": {
|
||||
"handlers": ["access"],
|
||||
"level": "INFO",
|
||||
"propagate": false
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Programmatic `dictConfig`
|
||||
|
||||
You can also pass a dictionary directly when running programmatically:
|
||||
|
||||
```python
|
||||
import uvicorn
|
||||
|
||||
log_config = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
"formatters": {
|
||||
"default": {
|
||||
"()": "uvicorn.logging.DefaultFormatter",
|
||||
"fmt": "%(asctime)s - %(levelprefix)s %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
},
|
||||
"access": {
|
||||
"()": "uvicorn.logging.AccessFormatter",
|
||||
"fmt": '%(asctime)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"formatter": "default",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stderr",
|
||||
},
|
||||
"access": {
|
||||
"formatter": "access",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stdout",
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
"uvicorn": {"handlers": ["default"], "level": "INFO", "propagate": False},
|
||||
"uvicorn.error": {"level": "INFO"},
|
||||
"uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
|
||||
},
|
||||
}
|
||||
|
||||
uvicorn.run("main:app", log_config=log_config)
|
||||
```
|
||||
|
||||
## Common Recipes
|
||||
|
||||
### Writing Logs to a File
|
||||
|
||||
To write Uvicorn's server logs to a file in addition to the console, add a `FileHandler` to the `uvicorn` logger:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
disable_existing_loggers: false
|
||||
formatters:
|
||||
default:
|
||||
"()": uvicorn.logging.DefaultFormatter
|
||||
fmt: "%(asctime)s - %(levelprefix)s %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
use_colors: false
|
||||
access:
|
||||
"()": uvicorn.logging.AccessFormatter
|
||||
fmt: '%(asctime)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s'
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
handlers:
|
||||
default:
|
||||
formatter: default
|
||||
class: logging.StreamHandler
|
||||
stream: ext://sys.stderr
|
||||
access:
|
||||
formatter: access
|
||||
class: logging.StreamHandler
|
||||
stream: ext://sys.stdout
|
||||
file:
|
||||
formatter: default
|
||||
class: logging.FileHandler
|
||||
filename: uvicorn.log
|
||||
loggers:
|
||||
uvicorn:
|
||||
handlers:
|
||||
- default
|
||||
- file
|
||||
level: INFO
|
||||
propagate: false
|
||||
uvicorn.error:
|
||||
level: INFO
|
||||
uvicorn.access:
|
||||
handlers:
|
||||
- access
|
||||
level: INFO
|
||||
propagate: false
|
||||
```
|
||||
|
||||
In this example, `uvicorn.access` still writes to stdout only. To write access
|
||||
logs to the file as well, add `file` to the `uvicorn.access.handlers` list.
|
||||
|
||||
### Disabling Access Logs
|
||||
|
||||
Use the `--no-access-log` CLI flag, or set `access_log=False` programmatically.
|
||||
This removes all handlers from `uvicorn.access` without affecting the
|
||||
`uvicorn.error` logger.
|
||||
|
||||
### Disabling Colors
|
||||
|
||||
Pass `--no-use-colors` on the command line, or set `use_colors=False`
|
||||
programmatically. When using a custom `--log-config`, set `use_colors: false`
|
||||
on each formatter that extends `uvicorn.logging.ColourizedFormatter`.
|
||||
|
||||
### Using a Standard Formatter
|
||||
|
||||
If you do not need Uvicorn's colorized output, you can use the standard
|
||||
`logging.Formatter` instead:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
disable_existing_loggers: false
|
||||
formatters:
|
||||
default:
|
||||
format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
handlers:
|
||||
default:
|
||||
formatter: default
|
||||
class: logging.StreamHandler
|
||||
stream: ext://sys.stderr
|
||||
loggers:
|
||||
uvicorn:
|
||||
handlers:
|
||||
- default
|
||||
level: INFO
|
||||
propagate: false
|
||||
uvicorn.error:
|
||||
level: INFO
|
||||
uvicorn.access:
|
||||
handlers:
|
||||
- default
|
||||
level: INFO
|
||||
propagate: false
|
||||
```
|
||||
|
||||
!!! warning
|
||||
When using a standard `logging.Formatter` for the access logger, the
|
||||
`%(client_addr)s`, `%(request_line)s`, and `%(status_code)s` placeholders
|
||||
are **not** available. The access log line will be formatted using only the
|
||||
standard `%(message)s` field.
|
||||
57
docs/css/extra.css
Normal file
57
docs/css/extra.css
Normal file
@ -0,0 +1,57 @@
|
||||
.md-nav__sponsors {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
margin: 1.2rem 0.4rem 0.6rem;
|
||||
padding: 0.9rem 0.6rem 0.8rem;
|
||||
background-color: color-mix(in srgb, var(--md-primary-fg-color) 8%, transparent);
|
||||
border-radius: 0.4rem;
|
||||
}
|
||||
|
||||
.md-nav__sponsors-title {
|
||||
margin: 0 0 0.1rem;
|
||||
font-size: 0.6rem;
|
||||
font-weight: 700;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.05em;
|
||||
color: var(--md-default-fg-color--light);
|
||||
}
|
||||
|
||||
.md-nav__sponsor {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
width: 100%;
|
||||
padding: 0.25rem;
|
||||
border-radius: 0.2rem;
|
||||
transition: opacity 0.15s;
|
||||
}
|
||||
|
||||
.md-nav__sponsor:hover {
|
||||
opacity: 0.75;
|
||||
}
|
||||
|
||||
.md-nav__sponsor img {
|
||||
max-width: 100%;
|
||||
max-height: 1.6rem;
|
||||
object-fit: contain;
|
||||
}
|
||||
|
||||
.md-nav__sponsor-cta {
|
||||
display: inline-block;
|
||||
margin-top: 0.15rem;
|
||||
padding: 0.25rem 0.6rem;
|
||||
font-size: 0.65rem;
|
||||
font-weight: 600;
|
||||
color: var(--md-primary-bg-color);
|
||||
background-color: var(--md-primary-fg-color);
|
||||
border-radius: 0.2rem;
|
||||
text-decoration: none;
|
||||
transition: opacity 0.15s;
|
||||
}
|
||||
|
||||
.md-nav__sponsor-cta:hover {
|
||||
opacity: 0.85;
|
||||
color: var(--md-primary-bg-color);
|
||||
}
|
||||
@ -70,30 +70,19 @@ A process manager will handle the socket setup, start-up multiple server process
|
||||
|
||||
### Built-in
|
||||
|
||||
Uvicorn includes a `--workers` option that allows you to run multiple worker instances.
|
||||
Uvicorn includes a `--workers` option that allows you to run multiple worker processes.
|
||||
|
||||
```bash
|
||||
$ uvicorn main:app --workers 4
|
||||
```
|
||||
|
||||
The default worker class is `process`. Uvicorn also includes a `thread` worker class:
|
||||
|
||||
```bash
|
||||
$ uvicorn main:app --workers 4 --worker-class thread
|
||||
```
|
||||
|
||||
The `thread` worker class was built specifically for free-threaded Python 3.14 runtimes.
|
||||
It requires a free-threaded build (`Py_GIL_DISABLED=1`) with the GIL disabled at runtime.
|
||||
|
||||
Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows.
|
||||
|
||||
The default `process` manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
|
||||
|
||||
The `thread` worker class automatically restarts worker threads that exit unexpectedly and uses cooperative healthchecks to replace stale worker threads. Unlike the `process` worker class, it cannot force-kill a hung thread. When a thread fails its healthcheck, Uvicorn starts a replacement thread and lets the previous thread continue draining if it is still running.
|
||||
The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface.
|
||||
|
||||
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
|
||||
|
||||
- `SIGHUP`: Work processeses are graceful restarted one after another. If you update the code, the new worker process will use the new code.
|
||||
- `SIGHUP`: Work processes are graceful restarted one after another. If you update the code, the new worker process will use the new code.
|
||||
- `SIGTTIN`: Increase the number of worker processes by one.
|
||||
- `SIGTTOU`: Decrease the number of worker processes by one.
|
||||
|
||||
@ -236,6 +225,36 @@ It's also possible to use certificates with uvicorn's worker for gunicorn.
|
||||
$ gunicorn --keyfile=./key.pem --certfile=./cert.pem -k uvicorn.workers.UvicornWorker main:app
|
||||
```
|
||||
|
||||
### Customizing the SSL context
|
||||
|
||||
For TLS scenarios that the `--ssl-*` flags don't cover (e.g., mutual TLS, custom `SSLContext.options`, bumping `minimum_version`, loading certificates from memory), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`.
|
||||
|
||||
The factory receives the `Config` instance and a `default_ssl_context_factory` callable that builds the standard context from the `ssl_*` settings on `Config`. Use it to start from uvicorn's default and mutate it, or ignore it and build your own context from scratch - the `ssl_*` settings are only consumed by the default factory, so if you don't call it they're effectively unused.
|
||||
|
||||
```python
|
||||
import ssl
|
||||
from collections.abc import Callable
|
||||
|
||||
import uvicorn
|
||||
from uvicorn.config import Config
|
||||
|
||||
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: Callable[[], ssl.SSLContext]) -> ssl.SSLContext:
|
||||
context = default_ssl_context_factory()
|
||||
context.minimum_version = ssl.TLSVersion.TLSv1_3
|
||||
return context
|
||||
|
||||
|
||||
uvicorn.run(
|
||||
"main:app",
|
||||
ssl_keyfile="key.pem",
|
||||
ssl_certfile="cert.pem",
|
||||
ssl_context_factory=ssl_context_factory,
|
||||
)
|
||||
```
|
||||
|
||||
The factory is called inside each worker process, so it works with `--reload` and `--workers > 1`. The factory itself must be picklable in those modes (a top-level function is fine; lambdas and local closures are not). The `ssl_*` settings on `Config` are only consumed by `default_ssl_context_factory()`; if you build the context yourself without calling it, those settings are ignored.
|
||||
|
||||
## Proxies and Forwarded Headers
|
||||
|
||||
When running an application behind one or more proxies, certain information about the request is lost.
|
||||
|
||||
BIN
docs/img/fastapi-logo.png
Normal file
BIN
docs/img/fastapi-logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 19 KiB |
@ -44,6 +44,18 @@ and means we're now able to start building a common set of tooling usable across
|
||||
|
||||
Uvicorn currently supports **HTTP/1.1** and **WebSockets**.
|
||||
|
||||
## Sponsorship
|
||||
|
||||
Help us keep Uvicorn maintained and sustainable by [becoming a sponsor](https://github.com/sponsors/Kludex).
|
||||
|
||||
**Current sponsors:**
|
||||
|
||||
<div style="display: flex; flex-wrap: wrap; gap: 2rem; align-items: center; margin: 1rem 0;">
|
||||
<a href="https://fastapi.tiangolo.com">
|
||||
<img src="img/fastapi-logo.png" alt="FastAPI" style="height: 80px;">
|
||||
</a>
|
||||
</div>
|
||||
|
||||
## Quickstart
|
||||
|
||||
**Uvicorn** is available on [PyPI](https://pypi.org/project/uvicorn/) so installation is as simple as:
|
||||
|
||||
@ -44,4 +44,15 @@
|
||||
{{ item.render(nav_item, path, 1) }}
|
||||
{% endfor %}
|
||||
</ul>
|
||||
|
||||
<!-- Sponsors -->
|
||||
<div class="md-nav__sponsors">
|
||||
<p class="md-nav__sponsors-title">Sponsors</p>
|
||||
<a href="https://fastapi.tiangolo.com" title="FastAPI" class="md-nav__sponsor">
|
||||
<img src="{{ 'img/fastapi-logo.png' | url }}" alt="FastAPI">
|
||||
</a>
|
||||
<a href="https://github.com/sponsors/Kludex" class="md-nav__sponsor-cta">
|
||||
Become a sponsor! ❤️
|
||||
</a>
|
||||
</div>
|
||||
</nav>
|
||||
|
||||
@ -2,6 +2,49 @@
|
||||
toc_depth: 2
|
||||
---
|
||||
|
||||
## 0.47.0 (May 14, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
* Add `ssl_context_factory` for custom `SSLContext` configuration (#2920)
|
||||
|
||||
### Changed
|
||||
|
||||
* Eagerly import the ASGI app in the parent process (#2919)
|
||||
|
||||
### Fixed
|
||||
|
||||
* Treat `fd=0` as a valid file descriptor with reload/workers (#2927)
|
||||
|
||||
## 0.46.0 (April 23, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
* Support `ws_max_size` in `wsproto` implementation (#2915)
|
||||
* Support `ws_ping_interval` and `ws_ping_timeout` in `wsproto` implementation (#2916)
|
||||
|
||||
### Changed
|
||||
|
||||
* Use `bytearray` for incoming WebSocket message buffer in `websockets-sansio` (#2917)
|
||||
|
||||
## 0.45.0 (April 21, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
* Add `--reset-contextvars` flag to isolate ASGI request context (#2912)
|
||||
* Accept `os.PathLike` for `log_config` (#2905)
|
||||
* Accept `log_level` strings case-insensitively (#2907)
|
||||
|
||||
### Changed
|
||||
|
||||
* Revert "Emit `http.disconnect` on server shutdown for streaming responses" (#2913)
|
||||
* Revert "Explicitly start ASGI run with empty context" (#2911)
|
||||
|
||||
### Fixed
|
||||
|
||||
* Preserve forwarded client ports in proxy headers middleware (#2903)
|
||||
* Raise helpful `ImportError` when PyYAML is missing for YAML log config (#2906)
|
||||
|
||||
## 0.44.0 (April 6, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
@ -39,6 +39,7 @@ uvicorn itself.
|
||||
* `APP` - The ASGI application to run, in the format `"<module>:<attribute>"`.
|
||||
* `--factory` - Treat `APP` as an application factory, i.e. a `() -> <ASGI app>` callable.
|
||||
* `--app-dir <path>` - Look for APP in the specified directory by adding it to the PYTHONPATH. **Default:** *Current working directory*.
|
||||
* `--reset-contextvars` - Run each ASGI request in a fresh `contextvars.Context`. Workaround for a [context leak in asyncio](https://github.com/python/cpython/issues/140947); only relevant when using the `asyncio` event loop (uvloop is not affected). Enabling this hides any context set in the lifespan or by external instrumentation from ASGI handlers. **Default:** *False*.
|
||||
|
||||
## Socket Binding
|
||||
|
||||
@ -73,21 +74,13 @@ Using Uvicorn with watchfiles will enable the following options (which are other
|
||||
|
||||
## Production
|
||||
|
||||
* `--workers <int>` - Number of worker instances. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
|
||||
* `--worker-class [process|thread]` - Worker implementation to use when running multiple workers. `process` is the default. `thread` was built specifically for free-threaded Python 3.14 runtimes and requires `Py_GIL_DISABLED=1` with the GIL disabled at runtime.
|
||||
* `--workers <int>` - Number of worker processes. Defaults to the `$WEB_CONCURRENCY` environment variable if available, or 1. Not valid with `--reload`.
|
||||
* `--env-file <path>` - Environment configuration file for the ASGI application. **Default:** *None*.
|
||||
* `--timeout-worker-healthcheck <int>` - Maximum number of seconds to wait for a worker to respond to a healthcheck. **Default:** *5*.
|
||||
|
||||
!!! note
|
||||
The `--reload` and `--workers` arguments are mutually exclusive. You cannot use both at the same time.
|
||||
|
||||
!!! note
|
||||
The `thread` worker class was built specifically for free-threaded Python 3.14.
|
||||
It uses cooperative healthchecks to detect and replace stale worker threads.
|
||||
Unlike the `process` worker class, it cannot force-kill a hung thread.
|
||||
When a thread fails its healthcheck, Uvicorn starts a replacement thread and
|
||||
lets the previous thread continue draining if it is still running.
|
||||
|
||||
## Logging
|
||||
|
||||
* `--log-config <path>` - Logging configuration file. **Options:** *`dictConfig()` formats: .json, .yaml*. Any other format will be processed with `fileConfig()`. Set the `formatters.default.use_colors` and `formatters.access.use_colors` values to override the auto-detected behavior.
|
||||
@ -101,10 +94,10 @@ Using Uvicorn with watchfiles will enable the following options (which are other
|
||||
* `--loop <str>` - Set the event loop implementation. The uvloop implementation provides greater performance, but is not compatible with Windows or PyPy. **Options:** *'auto', 'asyncio', 'uvloop'.* **Default:** *'auto'*.
|
||||
* `--http <str>` - Set the HTTP protocol implementation. The httptools implementation provides greater performance, but it not compatible with PyPy. **Options:** *'auto', 'h11', 'httptools'.* **Default:** *'auto'*.
|
||||
* `--ws <str>` - Set the WebSockets protocol implementation. Either of the `websockets` and `wsproto` packages are supported. There are two versions of `websockets` supported: `websockets` and `websockets-sansio`. Use `'none'` to ignore all websocket requests. **Options:** *'auto', 'none', 'websockets', 'websockets-sansio', 'wsproto'.* **Default:** *'auto'*.
|
||||
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. Only available with the `websockets` protocol. **Default:** *16777216* (16 MB).
|
||||
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. **Default:** *16777216* (16 MB).
|
||||
* `--ws-max-queue <int>` - Set the maximum length of the WebSocket incoming message queue. Only available with the `websockets` protocol. **Default:** *32*.
|
||||
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
|
||||
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
|
||||
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. **Default:** *20.0*.
|
||||
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. **Default:** *20.0*.
|
||||
* `--ws-per-message-deflate <bool>` - Enable/disable WebSocket per-message-deflate compression. Only available with the `websockets` protocol. **Default:** *True*.
|
||||
* `--lifespan <str>` - Set the Lifespan protocol implementation. **Options:** *'auto', 'on', 'off'.* **Default:** *'auto'*.
|
||||
* `--h11-max-incomplete-event-size <int>` - Set the maximum number of bytes to buffer of an incomplete event. Only available for `h11` HTTP protocol implementation. **Default:** *16384* (16 KB).
|
||||
@ -145,6 +138,8 @@ The [SSL context](https://docs.python.org/3/library/ssl.html#ssl.SSLContext) can
|
||||
|
||||
To understand more about the SSL context options, please refer to the [Python documentation](https://docs.python.org/3/library/ssl.html).
|
||||
|
||||
For advanced TLS scenarios that the flags above don't cover (e.g., mutual TLS, certificate pinning, custom `SSLContext.options`), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`. See [Running with HTTPS](deployment/index.md#customizing-the-ssl-context) for details.
|
||||
|
||||
## Resource Limits
|
||||
|
||||
* `--limit-concurrency <int>` - Maximum number of concurrent connections or tasks to allow, before issuing HTTP 503 responses. Useful for ensuring known memory usage patterns even under over-resourced loads.
|
||||
|
||||
@ -1,184 +0,0 @@
|
||||
# ✨ Sponsor Starlette & Uvicorn ✨
|
||||
|
||||
Thank you for your interest in sponsoring Starlette and Uvicorn! ❤️
|
||||
|
||||
Your support *directly* contributes to the ongoing development, maintenance, and long-term sustainability of both projects.
|
||||
|
||||
<div style="display: flex; justify-content: center; gap: 4rem; margin: 2rem 0; text-align: center;">
|
||||
<div style="padding: 1rem;">
|
||||
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">67M+</h3>
|
||||
<p>Starlette Downloads/Month</p>
|
||||
</div>
|
||||
<div style="padding: 1rem;">
|
||||
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">57M+</h3>
|
||||
<p>Uvicorn Downloads/Month</p>
|
||||
</div>
|
||||
<div style="padding: 1rem;">
|
||||
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">19K+</h3>
|
||||
<p>Combined GitHub Stars</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
## Why Sponsor?
|
||||
|
||||
While Starlette and Uvicorn are part of the [Encode](https://github.com/encode) organization,
|
||||
they have been primarily maintained by [**Marcelo Trylesinski (Kludex)**](https://github.com/Kludex)
|
||||
for the past several years. His dedication and consistent work have been instrumental in keeping
|
||||
these projects robust, secure, and up-to-date.
|
||||
|
||||
This sponsorship page was created to give the community an opportunity to support Marcelo's continued
|
||||
efforts in maintaining and improving both projects. Your sponsorship directly enables him to
|
||||
dedicate more time and resources to maintaining and improving these essential tools:
|
||||
|
||||
- [x] **Active Development:** Developing new features, enhancing existing ones, and
|
||||
keeping both projects aligned with the latest developments in the Python and ASGI ecosystems. 💻
|
||||
- [x] **Community Support:** Providing better support, addressing user issues,
|
||||
and cultivating a welcoming environment for contributors. 🤝
|
||||
- [x] **Long-Term Stability:** Ensuring the long-term viability of both projects through strategic
|
||||
planning and addressing technical debt. 🌳
|
||||
- [x] **Bug Fixes & Maintenance:** Providing prompt attention to bug reports and
|
||||
general maintenance to keep the projects reliable. 🔨
|
||||
- [x] **Security:** Ensuring robust security practices, conducting regular security audits, and
|
||||
promptly addressing vulnerabilities to protect millions of production deployments. 🔒
|
||||
- [x] **Documentation:** Creating comprehensive guides, tutorials, and examples to help users of all skill levels. 📖
|
||||
|
||||
## How Sponsorship Works
|
||||
|
||||
We currently manage sponsorships *exclusively* through **GitHub Sponsors**. This platform integrates seamlessly with the GitHub ecosystem, making it easy for organizations to contribute.
|
||||
|
||||
<div style="text-align: center; padding: 2rem; margin: 2rem 0; background: linear-gradient(135deg, #6e5494, #24292e); border-radius: 10px; color: white;">
|
||||
<h2 style="color: white; margin-bottom: 1rem;">🌟 Become a Sponsor Today! 🌟</h2>
|
||||
<p style="margin-bottom: 1.5rem; font-size: 1.1em;">Your support helps keep Starlette and Uvicorn growing stronger!</p>
|
||||
<a href="https://github.com/sponsors/Kludex"
|
||||
style="display: inline-block; padding: 1rem 2rem; background-color: #238636; color: white; text-decoration: none; border-radius: 6px; font-size: 1.2em; font-weight: bold; transition: all 0.3s ease-in-out;"
|
||||
onmouseover="this.style.backgroundColor='#2ea043';this.style.transform='translateY(-2px)'"
|
||||
onmouseout="this.style.backgroundColor='#238636';this.style.transform='translateY(0)'">
|
||||
❤️ Sponsor on GitHub
|
||||
</a>
|
||||
</div>
|
||||
|
||||
## Sponsorship Tiers 🎁
|
||||
|
||||
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 1.5rem; margin: 2rem 0;">
|
||||
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
|
||||
<h3 style="color: #cd7f32;">🥉 Bronze Sponsor</h3>
|
||||
<div style="font-size: 1.5em; margin: 1rem 0;">$100<span style="font-size: 0.6em;">/month</span></div>
|
||||
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
|
||||
<li>✓ Company name on Sponsors page</li>
|
||||
<li>✓ Small logo with link</li>
|
||||
<li>✓ Our eternal gratitude</li>
|
||||
</ul>
|
||||
<div style="text-align: center; margin-top: auto;">
|
||||
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #cd7f32; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
|
||||
Become a Bronze Sponsor
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
|
||||
<h3 style="color: #c0c0c0;">🥈 Silver Sponsor</h3>
|
||||
<div style="font-size: 1.5em; margin: 1rem 0;">$250<span style="font-size: 0.6em;">/month</span></div>
|
||||
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
|
||||
<li>✓ All Bronze benefits</li>
|
||||
<li>✓ Medium-sized logo</li>
|
||||
<li>✓ Release notes mention</li>
|
||||
</ul>
|
||||
<div style="text-align: center; margin-top: auto;">
|
||||
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #c0c0c0; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
|
||||
Become a Silver Sponsor
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; position: relative; overflow: hidden; display: flex; flex-direction: column;">
|
||||
<div style="position: absolute; top: 10px; right: -25px; background: #238636; color: white; padding: 5px 30px; transform: rotate(45deg);">
|
||||
Popular
|
||||
</div>
|
||||
<h3 style="color: #ffd700;">🥇 Gold Sponsor</h3>
|
||||
<div style="font-size: 1.5em; margin: 1rem 0;">$500<span style="font-size: 0.6em;">/month</span></div>
|
||||
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
|
||||
<li>✓ All Silver benefits</li>
|
||||
<li>✓ Large logo on main pages</li>
|
||||
<li>✓ Priority support</li>
|
||||
</ul>
|
||||
<div style="text-align: center; margin-top: auto;">
|
||||
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #ffd700; color: black; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
|
||||
Become a Gold Sponsor
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div style="text-align: center; margin: 2rem 0;">
|
||||
<h3>🤝 Custom Sponsor</h3>
|
||||
<p>Looking for something different? <a href="mailto:marcelotryle@gmail.com">Contact us</a> to discuss custom sponsorship options!</p>
|
||||
</div>
|
||||
|
||||
## Current Sponsors
|
||||
|
||||
**Thank you to our generous sponsors!** 🙏
|
||||
|
||||
<div style="display: flex; flex-direction: column; gap: 3rem; margin: 2rem 0;">
|
||||
<div>
|
||||
<h3 style="text-align: center; color: #ffd700; margin-bottom: 1.5rem;">🏆 Gold Sponsors</h3>
|
||||
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
|
||||
<a href="https://fastapi.tiangolo.com" style="text-decoration: none;">
|
||||
<div style="width: 200px; background: #f6f8fa; border-radius: 8px; padding: 1rem; text-align: center;">
|
||||
<div style="height: 100px; display: flex; align-items: center; justify-content: center; margin-bottom: 0.75rem;">
|
||||
<img src="https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png" alt="FastAPI" style="max-width: 100%; max-height: 100%; object-fit: contain;">
|
||||
</div>
|
||||
<p style="margin: 0; color: #57606a; font-size: 0.9em;">Modern, fast web framework for building APIs with Python 3.8+</p>
|
||||
</div>
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div>
|
||||
<h3 style="text-align: center; color: #c0c0c0; margin-bottom: 1.5rem;">🥈 Silver Sponsors</h3>
|
||||
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
|
||||
<!-- Add Silver Sponsors here -->
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div>
|
||||
<h3 style="text-align: center; color: #cd7f32; margin-bottom: 1.5rem;">🥉 Bronze Sponsors</h3>
|
||||
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
|
||||
<!-- Add Bronze Sponsors here -->
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
## Alternative Sponsorship Platforms
|
||||
|
||||
<div style="background: #f6f8fa; padding: 1.5rem; border-radius: 8px; margin: 2rem 0;">
|
||||
<h3>📢 We Want Your Input!</h3>
|
||||
<p>We are currently evaluating whether to expand our sponsorship options beyond GitHub Sponsors. If your company would be interested in sponsoring Starlette and Uvicorn but prefers to use a different platform (e.g., Open Collective, direct invoicing), please let us know!</p>
|
||||
<p>Your feedback is invaluable in helping us make sponsorship as accessible as possible. Share your thoughts by:</p>
|
||||
<ul>
|
||||
<li>Opening a discussion on our <a href="https://github.com/Kludex/starlette/discussions">GitHub repository</a></li>
|
||||
<li>Contacting us directly at <a href="mailto:marcelotryle@gmail.com">marcelotryle@gmail.com</a></li>
|
||||
</ul>
|
||||
</div>
|
||||
|
||||
<a id="acknowledgments"></a>
|
||||
|
||||
## Community & Future Plans 🌟
|
||||
|
||||
We want to express our deepest gratitude to all the contributors who have helped shape Starlette and
|
||||
Uvicorn over the years. These projects wouldn't be what they are today without the incredible work of
|
||||
every single contributor.
|
||||
|
||||
Special thanks to some of our most impactful contributors:
|
||||
|
||||
- **Tom Christie** ([@tomchristie](https://github.com/tomchristie)) - The original creator of Starlette and Uvicorn.
|
||||
- **Adrian Garcia Badaracco** ([@adriangb](https://github.com/adriangb)) - Major contributor to Starlette.
|
||||
- **Thomas Grainger** ([@graingert](https://github.com/graingert)) - Major contributor to AnyIO, and significant contributions to Starlette and Uvicorn.
|
||||
- **Alex Grönholm** ([@agronholm](https://github.com/agronholm)) - Creator of AnyIO.
|
||||
- **Florimond Manca** ([@florimondmanca](https://github.com/florimondmanca)) - Important contributions to Starlette and Uvicorn.
|
||||
|
||||
If you want your name removed from the list above, or if I forgot a significant contributor, please let me know.
|
||||
You can view all contributors on GitHub:
|
||||
[Starlette Contributors](https://github.com/Kludex/starlette/graphs/contributors) / [Uvicorn Contributors](https://github.com/Kludex/uvicorn/graphs/contributors).
|
||||
|
||||
While the current sponsorship program directly supports Marcelo's maintenance work, we are exploring ways
|
||||
to distribute funding to other key contributors in the future. This initiative is still in early planning
|
||||
stages, as we want to ensure a fair and sustainable model that recognizes the valuable contributions of
|
||||
our community members.
|
||||
@ -54,6 +54,7 @@ nav:
|
||||
- Concepts:
|
||||
- ASGI: concepts/asgi.md
|
||||
- Lifespan: concepts/lifespan.md
|
||||
- Logging: concepts/logging.md
|
||||
- WebSockets: concepts/websockets.md
|
||||
- Event Loop: concepts/event-loop.md
|
||||
- Deployment:
|
||||
@ -61,7 +62,6 @@ nav:
|
||||
- Docker: deployment/docker.md
|
||||
- Release Notes: release-notes.md
|
||||
- Contributing: contributing.md
|
||||
- Sponsorship: sponsorship.md
|
||||
|
||||
extra:
|
||||
analytics:
|
||||
@ -79,6 +79,9 @@ extra:
|
||||
- icon: fontawesome/solid/globe
|
||||
link: https://fastapiexpert.com
|
||||
|
||||
extra_css:
|
||||
- css/extra.css
|
||||
|
||||
markdown_extensions:
|
||||
- attr_list
|
||||
- admonition
|
||||
|
||||
@ -53,7 +53,7 @@ dev = [
|
||||
# We add uvicorn[standard] so `uv sync` considers the extras.
|
||||
"uvicorn[standard]",
|
||||
"ruff==0.15.1",
|
||||
"pytest==9.0.2",
|
||||
"pytest==9.0.3",
|
||||
"pytest-mock==3.15.1",
|
||||
"pytest-xdist[psutil]==3.8.0",
|
||||
"pytest-codspeed>=4.1.1",
|
||||
@ -82,7 +82,8 @@ docs = [
|
||||
|
||||
[tool.uv]
|
||||
default-groups = ["dev", "docs"]
|
||||
required-version = ">=0.8.6"
|
||||
required-version = ">=0.9.17"
|
||||
exclude-newer = "7 days"
|
||||
|
||||
[project.scripts]
|
||||
uvicorn = "uvicorn.main:main"
|
||||
|
||||
@ -131,7 +131,7 @@ class MockLoop:
|
||||
self._tasks: list[asyncio.Task[Any]] = []
|
||||
self._later: list[MockTimerHandle] = []
|
||||
|
||||
def create_task(self, coroutine: Any, **kwargs: Any) -> Any:
|
||||
def create_task(self, coroutine: Any) -> Any:
|
||||
self._tasks.insert(0, coroutine)
|
||||
return MockTask()
|
||||
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import ipaddress
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import httpx
|
||||
import httpx._transports.asgi
|
||||
import pytest
|
||||
import websockets.client
|
||||
|
||||
@ -30,6 +31,9 @@ async def default_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISend
|
||||
client_addr = "NONE" # pragma: no cover
|
||||
else:
|
||||
host, port = client
|
||||
with contextlib.suppress(ValueError):
|
||||
if ipaddress.ip_address(host).version == 6:
|
||||
host = f"[{host}]"
|
||||
client_addr = f"{host}:{port}"
|
||||
|
||||
response = Response(f"{scheme}://{client_addr}", media_type="text/plain")
|
||||
@ -426,6 +430,31 @@ async def test_proxy_headers_multiple_proxies(trusted_hosts: str | list[str], ex
|
||||
assert response.text == expected
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize(
|
||||
("trusted_hosts", "expected"),
|
||||
[
|
||||
# always trust
|
||||
("*", "https://1.2.3.4:1234"),
|
||||
# all proxies are trusted
|
||||
(["127.0.0.1", "2001:db8::1", "192.168.0.2"], "https://1.2.3.4:1234"),
|
||||
# should set first untrusted as remote address
|
||||
(["192.168.0.2", "127.0.0.1"], "https://[2001:db8::1]:8080"),
|
||||
# Mixed literals and networks
|
||||
(["127.0.0.1", "2001:db8::/32", "192.168.0.2"], "https://1.2.3.4:1234"),
|
||||
],
|
||||
)
|
||||
async def test_proxy_headers_multiple_proxies_with_ports(trusted_hosts: str | list[str], expected: str) -> None:
|
||||
async with make_httpx_client(trusted_hosts) as client:
|
||||
headers = {
|
||||
X_FORWARDED_FOR: "1.2.3.4:1234, [2001:db8::1]:8080, 192.168.0.2:9000",
|
||||
X_FORWARDED_PROTO: "https",
|
||||
}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert response.text == expected
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_proxy_headers_invalid_x_forwarded_for() -> None:
|
||||
async with make_httpx_client("*") as client:
|
||||
@ -441,6 +470,38 @@ async def test_proxy_headers_invalid_x_forwarded_for() -> None:
|
||||
assert response.text == "https://1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize(
|
||||
("forwarded_for", "expected"),
|
||||
[
|
||||
# IPv4 without port
|
||||
("1.2.3.4", "https://1.2.3.4:0"),
|
||||
# IPv4 with port
|
||||
("1.2.3.4:1234", "https://1.2.3.4:1234"),
|
||||
# Bracketed IPv6 with port
|
||||
("[2001:db8::1]:443", "https://[2001:db8::1]:443"),
|
||||
# Bracketed IPv6 without port
|
||||
("[2001:db8::1]", "https://[2001:db8::1]:0"),
|
||||
# Bare IPv6 without port
|
||||
("2001:db8::1", "https://[2001:db8::1]:0"),
|
||||
# Invalid IPv4 port falls back to the original host value
|
||||
("1.2.3.4:notaport", "https://1.2.3.4:notaport:0"),
|
||||
# Invalid bracketed IPv6 port keeps the host and drops the port
|
||||
("[2001:db8::1]:notaport", "https://[2001:db8::1]:0"),
|
||||
# Trailing data after a bracketed IPv6 host is left untouched
|
||||
("[2001:db8::1]extra", "https://[2001:db8::1]extra:0"),
|
||||
# Malformed bracket is left untouched
|
||||
("[2001:db8::1", "https://[2001:db8::1:0"),
|
||||
],
|
||||
)
|
||||
async def test_proxy_headers_x_forwarded_for_port_shapes(forwarded_for: str, expected: str) -> None:
|
||||
async with make_httpx_client("*") as client:
|
||||
headers = {X_FORWARDED_FOR: forwarded_for, X_FORWARDED_PROTO: "https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert response.text == expected
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize(
|
||||
"forwarded_proto,expected",
|
||||
|
||||
@ -226,7 +226,7 @@ class MockLoop:
|
||||
self._tasks: list[asyncio.Task[Any]] = []
|
||||
self._later: list[MockTimerHandle] = []
|
||||
|
||||
def create_task(self, coroutine: Any, **kwargs: Any) -> Any:
|
||||
def create_task(self, coroutine: Any) -> Any:
|
||||
self._tasks.insert(0, coroutine)
|
||||
return MockTask()
|
||||
|
||||
@ -775,76 +775,6 @@ async def test_shutdown_during_idle(http_protocol_cls: type[HTTPProtocol]):
|
||||
assert protocol.transport.is_closing()
|
||||
|
||||
|
||||
async def test_shutdown_during_streaming_sends_disconnect(http_protocol_cls: type[HTTPProtocol]):
|
||||
"""When the server shuts down during an SSE/streaming response,
|
||||
receive() should return http.disconnect so the ASGI app can stop."""
|
||||
got_disconnect_event = False
|
||||
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
nonlocal got_disconnect_event
|
||||
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
"status": 200,
|
||||
"headers": [(b"content-type", b"text/event-stream")],
|
||||
}
|
||||
)
|
||||
await send({"type": "http.response.body", "body": b"data: hello\n\n", "more_body": True})
|
||||
|
||||
# This simulates an SSE app waiting for disconnect
|
||||
message = await receive()
|
||||
if message["type"] == "http.disconnect":
|
||||
got_disconnect_event = True
|
||||
|
||||
protocol = get_connected_protocol(app, http_protocol_cls)
|
||||
protocol.data_received(SIMPLE_GET_REQUEST)
|
||||
# Trigger server shutdown while the app is streaming
|
||||
protocol.shutdown() # type: ignore[attr-defined]
|
||||
await protocol.loop.run_one()
|
||||
assert got_disconnect_event
|
||||
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
|
||||
assert b"data: hello" in protocol.transport.buffer
|
||||
assert protocol.transport.is_closing()
|
||||
|
||||
|
||||
async def test_shutdown_during_streaming_allows_send_before_exit(http_protocol_cls: type[HTTPProtocol]):
|
||||
"""During server shutdown, the app should still be able to send() data
|
||||
(e.g., a farewell SSE event) before returning."""
|
||||
farewell_sent = False
|
||||
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
nonlocal farewell_sent
|
||||
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
"status": 200,
|
||||
"headers": [
|
||||
(b"content-type", b"text/event-stream"),
|
||||
(b"transfer-encoding", b"chunked"),
|
||||
],
|
||||
}
|
||||
)
|
||||
await send({"type": "http.response.body", "body": b"data: hello\n\n", "more_body": True})
|
||||
|
||||
# Wait for disconnect
|
||||
message = await receive()
|
||||
assert message["type"] == "http.disconnect"
|
||||
|
||||
# Send a farewell event — this should still work since the transport is open
|
||||
await send({"type": "http.response.body", "body": b"data: goodbye\n\n", "more_body": True})
|
||||
farewell_sent = True
|
||||
|
||||
protocol = get_connected_protocol(app, http_protocol_cls)
|
||||
protocol.data_received(SIMPLE_GET_REQUEST)
|
||||
protocol.shutdown() # type: ignore[attr-defined]
|
||||
await protocol.loop.run_one()
|
||||
assert farewell_sent
|
||||
assert b"data: hello" in protocol.transport.buffer
|
||||
assert b"data: goodbye" in protocol.transport.buffer
|
||||
|
||||
|
||||
async def test_100_continue_sent_when_body_consumed(http_protocol_cls: type[HTTPProtocol]):
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
body = b""
|
||||
|
||||
@ -10,6 +10,7 @@ import websockets
|
||||
import websockets.client
|
||||
import websockets.exceptions
|
||||
from websockets.extensions.permessage_deflate import ClientPerMessageDeflateFactory
|
||||
from websockets.frames import Opcode
|
||||
from websockets.typing import Subprotocol
|
||||
|
||||
from tests.response import Response
|
||||
@ -43,6 +44,7 @@ if TYPE_CHECKING:
|
||||
|
||||
HTTPProtocol: TypeAlias = "type[H11Protocol | HttpToolsProtocol]"
|
||||
WSProtocol: TypeAlias = "type[_WSProtocol | WebSocketProtocol]"
|
||||
KeepaliveWSProtocol: TypeAlias = "type[_WSProtocol | WebSocketsSansIOProtocol]"
|
||||
|
||||
pytestmark = pytest.mark.anyio
|
||||
|
||||
@ -751,6 +753,61 @@ async def test_send_binary_data_to_server_bigger_than_default_on_websockets(
|
||||
assert ws.close_code == expected_result
|
||||
|
||||
|
||||
async def test_fragmented_message_exceeding_max_size(
|
||||
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
"""Stream non-FIN fragments past `ws_max_size` - the server must close with 1009."""
|
||||
|
||||
class App(WebSocketResponse):
|
||||
async def websocket_connect(self, message: WebSocketConnectEvent):
|
||||
await self.send({"type": "websocket.accept"})
|
||||
|
||||
config = Config(
|
||||
app=App, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", ws_max_size=2048, port=unused_tcp_port
|
||||
)
|
||||
async with run_server(config):
|
||||
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
|
||||
payload = b"A" * 1024
|
||||
with pytest.raises(websockets.exceptions.ConnectionClosed) as exc_info:
|
||||
await ws.write_frame(False, Opcode.BINARY, payload)
|
||||
for _ in range(63): # 64 KiB total, well past 2 KiB budget
|
||||
await ws.write_frame(False, Opcode.CONT, payload)
|
||||
await ws.recv()
|
||||
assert exc_info.value.rcvd is not None
|
||||
assert exc_info.value.rcvd.code == 1009
|
||||
|
||||
|
||||
async def test_fragmented_message_reassembly(
|
||||
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
"""Server reassembles a fragmented message and delivers it to the app intact."""
|
||||
|
||||
received: list[bytes] = []
|
||||
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
assert scope["type"] == "websocket"
|
||||
connect = await receive()
|
||||
assert connect["type"] == "websocket.connect"
|
||||
await send({"type": "websocket.accept"})
|
||||
message = await receive()
|
||||
assert message["type"] == "websocket.receive"
|
||||
payload = message.get("bytes")
|
||||
assert payload is not None
|
||||
received.append(payload)
|
||||
await send({"type": "websocket.close"})
|
||||
|
||||
config = Config(app=app, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", port=unused_tcp_port)
|
||||
async with run_server(config):
|
||||
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
|
||||
payload = b"A" * 512
|
||||
await ws.write_frame(False, Opcode.BINARY, payload)
|
||||
for _ in range(4):
|
||||
await ws.write_frame(False, Opcode.CONT, payload)
|
||||
await ws.write_frame(True, Opcode.CONT, payload)
|
||||
|
||||
assert received == [b"A" * 512 * 6]
|
||||
|
||||
|
||||
async def test_server_reject_connection(
|
||||
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
@ -1205,7 +1262,27 @@ async def test_lifespan_state(ws_protocol_cls: WSProtocol, http_protocol_cls: HT
|
||||
assert expected_states == actual_states
|
||||
|
||||
|
||||
async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
pytest.param(
|
||||
"uvicorn.protocols.websockets.wsproto_impl:WSProtocol",
|
||||
marks=skip_if_no_wsproto,
|
||||
id="wsproto",
|
||||
),
|
||||
pytest.param(
|
||||
"uvicorn.protocols.websockets.websockets_sansio_impl:WebSocketsSansIOProtocol", id="websockets-sansio"
|
||||
),
|
||||
]
|
||||
)
|
||||
def keepalive_ws_protocol_cls(request: pytest.FixtureRequest):
|
||||
from uvicorn.importer import import_from_string
|
||||
|
||||
return import_from_string(request.param)
|
||||
|
||||
|
||||
async def test_server_keepalive_ping_pong(
|
||||
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
while True:
|
||||
message = await receive()
|
||||
@ -1216,7 +1293,7 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
ws=WebSocketsSansIOProtocol,
|
||||
ws=keepalive_ws_protocol_cls,
|
||||
http=http_protocol_cls,
|
||||
lifespan="off",
|
||||
ws_ping_interval=0.1,
|
||||
@ -1227,18 +1304,24 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
|
||||
# The websockets client auto-responds to ping frames, keeping the connection alive.
|
||||
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
|
||||
protocol = list(server.server_state.connections)[0]
|
||||
assert isinstance(protocol, WebSocketsSansIOProtocol)
|
||||
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
|
||||
|
||||
# Wait until at least one ping/pong roundtrip completes.
|
||||
async def ping_roundtrip() -> None:
|
||||
while protocol.last_ping_rtt == 0.0:
|
||||
await asyncio.sleep(0.1)
|
||||
# Wait until the server sends at least one keepalive ping, then
|
||||
# sleep past the timeout window and ensure the connection stays open.
|
||||
# This verifies that the client answered the ping without depending
|
||||
# on clock granularity for the measured RTT.
|
||||
async def ping_sent() -> None:
|
||||
while protocol.ping_sent_at == 0.0:
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
await asyncio.wait_for(ping_roundtrip(), timeout=5.0)
|
||||
assert protocol.last_ping_rtt > 0
|
||||
await asyncio.wait_for(ping_sent(), timeout=5.0)
|
||||
await asyncio.sleep(0.2)
|
||||
assert not protocol.transport.is_closing()
|
||||
|
||||
|
||||
async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
|
||||
async def test_server_keepalive_ping_timeout(
|
||||
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
while True:
|
||||
message = await receive()
|
||||
@ -1249,7 +1332,7 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
ws=WebSocketsSansIOProtocol,
|
||||
ws=keepalive_ws_protocol_cls,
|
||||
http=http_protocol_cls,
|
||||
lifespan="off",
|
||||
ws_ping_interval=0.1,
|
||||
@ -1268,7 +1351,9 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
|
||||
assert exc_info.value.rcvd.reason == "keepalive ping timeout"
|
||||
|
||||
|
||||
async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
|
||||
async def test_server_keepalive_disabled(
|
||||
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
|
||||
):
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
while True:
|
||||
message = await receive()
|
||||
@ -1279,7 +1364,7 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
ws=WebSocketsSansIOProtocol,
|
||||
ws=keepalive_ws_protocol_cls,
|
||||
http=http_protocol_cls,
|
||||
lifespan="off",
|
||||
ws_ping_interval=None,
|
||||
@ -1288,5 +1373,5 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
|
||||
async with run_server(config) as server:
|
||||
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
|
||||
protocol = list(server.server_state.connections)[0]
|
||||
assert isinstance(protocol, WebSocketsSansIOProtocol)
|
||||
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
|
||||
assert protocol.ping_timer is None
|
||||
|
||||
@ -1,323 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from uvicorn import Config
|
||||
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn.server import Server
|
||||
from uvicorn.supervisors.multithread import Multithread, Thread
|
||||
|
||||
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
pass # pragma: no cover
|
||||
|
||||
|
||||
class FakeThread:
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
target: Callable[[list[socket.socket] | None], None],
|
||||
sockets: list[socket.socket],
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.target = target
|
||||
self.sockets = sockets
|
||||
self.started = False
|
||||
self.alive = True
|
||||
self.terminated = False
|
||||
self.joined = False
|
||||
self.healthy = True
|
||||
self.join_result = True
|
||||
self.ready_for_healthcheck = True
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
return self.alive
|
||||
|
||||
def is_healthy(self, timeout: float) -> bool:
|
||||
return self.healthy
|
||||
|
||||
def is_ready_for_healthcheck(self) -> bool:
|
||||
return self.ready_for_healthcheck
|
||||
|
||||
def start(self) -> None:
|
||||
self.started = True
|
||||
|
||||
def terminate(self) -> None:
|
||||
self.terminated = True
|
||||
self.alive = False
|
||||
|
||||
def join(self, timeout: float | None = None) -> bool:
|
||||
self.joined = True
|
||||
if self.join_result:
|
||||
self.alive = False
|
||||
return self.join_result
|
||||
|
||||
|
||||
def test_thread_target_passes_duplicated_sockets() -> None:
|
||||
captured_sockets: list[socket.socket] | None = None
|
||||
|
||||
def target(sockets: list[socket.socket] | None) -> None:
|
||||
nonlocal captured_sockets
|
||||
captured_sockets = sockets
|
||||
|
||||
sock = socket.socket()
|
||||
try:
|
||||
thread = Thread(Config(app=app), target=target, sockets=[sock])
|
||||
thread.target()
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
assert captured_sockets is not None
|
||||
assert len(captured_sockets) == 1
|
||||
assert captured_sockets[0].fileno() == -1
|
||||
|
||||
|
||||
def test_thread_terminate_sets_server_exit_flag() -> None:
|
||||
config = Config(app=app)
|
||||
thread = Thread(config, target=Server(config).run, sockets=[])
|
||||
target = thread._get_target()
|
||||
|
||||
assert thread.server is not None
|
||||
assert isinstance(thread.server, Server)
|
||||
assert target.__self__ is thread.server
|
||||
assert thread.server.should_exit is False
|
||||
assert thread.config.callback_progress is not None
|
||||
assert thread.config.callback_progress.__self__ is thread
|
||||
assert thread.config.callback_progress.__func__ is thread.record_heartbeat.__func__
|
||||
|
||||
thread.terminate()
|
||||
|
||||
assert thread.server.should_exit is True
|
||||
|
||||
|
||||
def test_thread_record_heartbeat_and_is_healthy() -> None:
|
||||
thread = Thread(Config(app=app, timeout_worker_healthcheck=1), target=lambda sockets: None, sockets=[])
|
||||
thread.last_heartbeat -= 5
|
||||
|
||||
assert thread.is_healthy(1) is False
|
||||
|
||||
thread.record_heartbeat()
|
||||
|
||||
assert thread.is_healthy(1) is True
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_server_progress_callback_records_heartbeat_on_tick() -> None:
|
||||
thread = Thread(Config(app=app), target=lambda sockets: None, sockets=[])
|
||||
config = Config(app=app, callback_progress=thread.record_heartbeat)
|
||||
server = Server(config=config)
|
||||
before = thread.last_heartbeat
|
||||
|
||||
thread.last_heartbeat -= 5
|
||||
await server.on_tick(10)
|
||||
|
||||
assert thread.last_heartbeat > before
|
||||
|
||||
|
||||
def test_thread_start_and_join() -> None:
|
||||
finished = threading.Event()
|
||||
|
||||
def target(sockets: list[socket.socket] | None) -> None:
|
||||
finished.set()
|
||||
|
||||
thread = Thread(Config(app=app), target=target, sockets=[])
|
||||
thread.start()
|
||||
|
||||
assert thread.join() is True
|
||||
assert finished.is_set()
|
||||
assert thread.is_alive() is False
|
||||
|
||||
|
||||
def test_thread_join_timeout_returns_false_for_hung_thread() -> None:
|
||||
blocker = threading.Event()
|
||||
|
||||
def target(sockets: list[socket.socket] | None) -> None:
|
||||
blocker.wait()
|
||||
|
||||
thread = Thread(Config(app=app), target=target, sockets=[])
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
assert thread.join(timeout=0.01) is False
|
||||
assert thread.is_alive() is True
|
||||
finally:
|
||||
blocker.set()
|
||||
assert thread.join(timeout=1) is True
|
||||
|
||||
|
||||
def test_multithread_init_terminate_join_and_restart(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||
|
||||
supervisor.init_threads()
|
||||
original_threads = list(supervisor.threads)
|
||||
|
||||
assert len(supervisor.threads) == 2
|
||||
assert all(thread.started for thread in supervisor.threads)
|
||||
|
||||
supervisor.terminate_all()
|
||||
assert all(thread.terminated for thread in original_threads)
|
||||
|
||||
supervisor.join_all()
|
||||
assert all(thread.joined for thread in original_threads)
|
||||
|
||||
supervisor.restart_all()
|
||||
assert len(supervisor.threads) == 2
|
||||
assert all(thread is not old for thread, old in zip(supervisor.threads, original_threads))
|
||||
assert all(thread.started for thread in supervisor.threads)
|
||||
|
||||
|
||||
def test_multithread_keep_subthread_alive_replaces_dead_thread(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
|
||||
dead_thread = supervisor.threads[0]
|
||||
dead_thread.alive = False
|
||||
|
||||
supervisor.keep_subthread_alive()
|
||||
|
||||
assert supervisor.threads[0] is not dead_thread
|
||||
assert supervisor.threads[0].started is True
|
||||
|
||||
|
||||
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
|
||||
unhealthy_thread = supervisor.threads[0]
|
||||
unhealthy_thread.healthy = False
|
||||
|
||||
supervisor.keep_subthread_alive()
|
||||
|
||||
assert supervisor.threads[0] is not unhealthy_thread
|
||||
assert supervisor.threads[0].started is True
|
||||
|
||||
|
||||
def test_multithread_keep_subthread_alive_skips_healthcheck_until_ready(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
|
||||
thread = supervisor.threads[0]
|
||||
thread.healthy = False
|
||||
thread.ready_for_healthcheck = False
|
||||
|
||||
supervisor.keep_subthread_alive()
|
||||
|
||||
assert supervisor.threads[0] is thread
|
||||
|
||||
|
||||
def test_multithread_keep_subthread_alive_replaces_unhealthy_thread_without_blocking_join(
|
||||
mocker: MockerFixture,
|
||||
) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
|
||||
unhealthy_thread = supervisor.threads[0]
|
||||
unhealthy_thread.healthy = False
|
||||
unhealthy_thread.join_result = False
|
||||
|
||||
supervisor.keep_subthread_alive()
|
||||
|
||||
assert supervisor.threads[0] is not unhealthy_thread
|
||||
assert unhealthy_thread in supervisor.stale_threads
|
||||
|
||||
|
||||
def test_multithread_keep_subthread_alive_noop_when_exiting(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
dead_thread = supervisor.threads[0]
|
||||
dead_thread.alive = False
|
||||
supervisor.should_exit.set()
|
||||
|
||||
supervisor.keep_subthread_alive()
|
||||
|
||||
assert supervisor.threads[0] is dead_thread
|
||||
|
||||
|
||||
def test_multithread_signal_handlers(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=2), target=lambda sockets: None, sockets=[])
|
||||
supervisor.init_threads()
|
||||
|
||||
supervisor.handle_ttin()
|
||||
assert len(supervisor.threads) == 3
|
||||
|
||||
removed_thread = supervisor.threads[-1]
|
||||
supervisor.handle_ttou()
|
||||
assert len(supervisor.threads) == 2
|
||||
assert removed_thread.terminated is True
|
||||
assert removed_thread.joined is True
|
||||
|
||||
supervisor.handle_ttou()
|
||||
supervisor.handle_ttou()
|
||||
assert len(supervisor.threads) == 1
|
||||
|
||||
original_threads = list(supervisor.threads)
|
||||
supervisor.handle_hup()
|
||||
assert len(supervisor.threads) == 1
|
||||
assert supervisor.threads[0] is not original_threads[0]
|
||||
|
||||
supervisor.handle_term()
|
||||
assert supervisor.should_exit.is_set()
|
||||
|
||||
|
||||
def test_multithread_join_all_uses_timeout_and_warns(mocker: MockerFixture, caplog: pytest.LogCaptureFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(
|
||||
Config(app=app, workers=1, timeout_worker_healthcheck=2, timeout_graceful_shutdown=3),
|
||||
target=lambda sockets: None,
|
||||
sockets=[],
|
||||
)
|
||||
supervisor.init_threads()
|
||||
thread = supervisor.threads[0]
|
||||
thread.join_result = False
|
||||
|
||||
supervisor.join_all()
|
||||
|
||||
assert thread.joined is True
|
||||
assert "Worker thread did not exit within 3.00 seconds." in caplog.records[-1].message
|
||||
|
||||
|
||||
def test_multithread_thread_shutdown_timeout_defaults_to_healthcheck(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(
|
||||
Config(app=app, workers=1, timeout_worker_healthcheck=7),
|
||||
target=lambda sockets: None,
|
||||
sockets=[],
|
||||
)
|
||||
|
||||
assert supervisor._thread_shutdown_timeout == 7.0
|
||||
|
||||
|
||||
@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK")
|
||||
def test_multithread_handle_break() -> None: # pragma: py-not-win32
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.handle_break()
|
||||
assert supervisor.should_exit.is_set()
|
||||
|
||||
|
||||
def test_multithread_handle_signals_and_run(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.supervisors.multithread.Thread", FakeThread)
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.signal_queue.extend([signal.SIGINT, signal.SIGUSR1])
|
||||
|
||||
supervisor.handle_signals()
|
||||
assert supervisor.should_exit.is_set()
|
||||
assert supervisor.signal_queue == []
|
||||
|
||||
supervisor = Multithread(Config(app=app, workers=1), target=lambda sockets: None, sockets=[])
|
||||
supervisor.signal_queue.append(signal.SIGINT)
|
||||
supervisor.run()
|
||||
|
||||
assert supervisor.should_exit.is_set()
|
||||
@ -15,7 +15,7 @@ import uvicorn
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.main import main as cli
|
||||
from uvicorn.server import Server
|
||||
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
|
||||
from uvicorn.supervisors import ChangeReload, Multiprocess
|
||||
|
||||
HEADERS = "Content-Security-Policy:default-src 'self'; script-src https://example.com"
|
||||
main = importlib.import_module("uvicorn.main")
|
||||
@ -101,19 +101,6 @@ def test_cli_call_multiprocess_run() -> None:
|
||||
mock_run.assert_called_once()
|
||||
|
||||
|
||||
def test_cli_call_multithread_run() -> None:
|
||||
runner = CliRunner()
|
||||
|
||||
with mock.patch("uvicorn.config.is_free_threaded_runtime", return_value=True):
|
||||
with mock.patch.object(Config, "bind_socket") as mock_bind_socket:
|
||||
with mock.patch.object(Multithread, "run") as mock_run:
|
||||
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
|
||||
|
||||
assert result.exit_code == 0
|
||||
mock_bind_socket.assert_called_once()
|
||||
mock_run.assert_called_once()
|
||||
|
||||
|
||||
@pytest.fixture(params=(True, False))
|
||||
def uds_file(tmp_path: Path, request: pytest.FixtureRequest) -> Path: # pragma: py-win32
|
||||
file = tmp_path / "uvicorn.sock"
|
||||
@ -138,15 +125,6 @@ def test_cli_uds(uds_file: Path) -> None: # pragma: py-win32
|
||||
assert not uds_file.exists()
|
||||
|
||||
|
||||
def test_cli_thread_worker_class_requires_free_threaded_runtime() -> None:
|
||||
runner = CliRunner()
|
||||
|
||||
result = runner.invoke(cli, ["tests.test_cli:App", "--workers=2", "--worker-class=thread"])
|
||||
|
||||
assert result.exit_code == 1
|
||||
assert 'Worker class "thread" requires a free-threaded Python 3.14 runtime' in str(result.exception)
|
||||
|
||||
|
||||
def test_cli_incomplete_app_parameter() -> None:
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
@ -366,6 +366,35 @@ def test_log_config_yaml(
|
||||
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
||||
|
||||
|
||||
def test_log_config_yaml_missing_pyyaml(mocked_logging_config_module: MagicMock, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
Test that a helpful error is raised when PyYAML is not installed.
|
||||
"""
|
||||
mocker.patch.dict(sys.modules, {"yaml": None})
|
||||
with pytest.raises(ImportError, match=r"Install the PyYAML package or uvicorn\[standard\]"):
|
||||
Config(app=asgi_app, log_config="log_config.yaml")
|
||||
|
||||
|
||||
def test_log_config_pathlike(
|
||||
mocked_logging_config_module: MagicMock,
|
||||
logging_config: dict[str, Any],
|
||||
json_logging_config: str,
|
||||
mocker: MockerFixture,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""
|
||||
Test that one can pass a `os.PathLike` (e.g. `pathlib.Path`) as the log config path.
|
||||
"""
|
||||
path = tmp_path / "log_config.json"
|
||||
mocked_open = mocker.patch("uvicorn.config.open", mocker.mock_open(read_data=json_logging_config))
|
||||
|
||||
config = Config(app=asgi_app, log_config=path)
|
||||
config.load()
|
||||
|
||||
mocked_open.assert_called_once_with(os.fspath(path))
|
||||
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config_file", ["log_config.ini", configparser.ConfigParser(), io.StringIO()])
|
||||
def test_log_config_file(
|
||||
mocked_logging_config_module: MagicMock,
|
||||
@ -462,6 +491,13 @@ def test_config_log_effective_level(log_level: int, uvicorn_logger_level: int) -
|
||||
assert logging.getLogger("uvicorn.asgi").getEffectiveLevel() == effective_level
|
||||
|
||||
|
||||
@pytest.mark.parametrize("log_level", ["INFO", "Info", "info"])
|
||||
def test_config_log_level_case_insensitive(log_level: str) -> None:
|
||||
config = Config(app=asgi_app, log_level=log_level)
|
||||
config.load()
|
||||
assert logging.getLogger("uvicorn.error").level == logging.INFO
|
||||
|
||||
|
||||
def test_ws_max_size() -> None:
|
||||
config = Config(app=asgi_app, ws_max_size=1000)
|
||||
config.load()
|
||||
@ -517,6 +553,37 @@ def test_bind_fd_works_with_reload_or_workers(reload: bool, workers: int): # pr
|
||||
fdsock.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stdin_socket() -> Iterator[socket.socket]: # pragma: py-win32
|
||||
with closing(socket.socket(socket.AF_INET)) as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
saved_stdin = os.dup(0)
|
||||
os.dup2(sock.fileno(), 0)
|
||||
try:
|
||||
yield sock
|
||||
finally:
|
||||
os.dup2(saved_stdin, 0)
|
||||
os.close(saved_stdin)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"reload, workers",
|
||||
[
|
||||
(True, 1),
|
||||
(False, 2),
|
||||
],
|
||||
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
|
||||
)
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
|
||||
def test_bind_stdin_works_with_reload_or_workers(
|
||||
reload: bool, workers: int, stdin_socket: socket.socket
|
||||
): # pragma: py-win32
|
||||
config = Config(app=asgi_app, fd=0, reload=reload, workers=workers)
|
||||
config.load()
|
||||
with closing(config.bind_socket()) as sock:
|
||||
assert sock.getsockname() == stdin_socket.getsockname()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"reload, workers, expected",
|
||||
[
|
||||
@ -536,24 +603,12 @@ def test_config_use_subprocess(reload: bool, workers: int, expected: bool):
|
||||
assert config.use_subprocess == expected
|
||||
|
||||
|
||||
def test_config_use_subprocess_with_thread_worker_class(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
|
||||
config = Config(app=asgi_app, workers=2, worker_class="thread")
|
||||
config.load()
|
||||
assert config.use_subprocess is False
|
||||
|
||||
|
||||
def test_warn_when_using_reload_and_workers(caplog: pytest.LogCaptureFixture) -> None:
|
||||
Config(app=asgi_app, reload=True, workers=2)
|
||||
assert len(caplog.records) == 1
|
||||
assert '"workers" flag is ignored when reloading is enabled.' in caplog.records[0].message
|
||||
|
||||
|
||||
def test_thread_worker_class_requires_free_threaded_runtime() -> None:
|
||||
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
|
||||
Config(app=asgi_app, worker_class="thread")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("loop_type", "expected_loop_factory"),
|
||||
[
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
import importlib
|
||||
import inspect
|
||||
import socket
|
||||
import sys
|
||||
from logging import WARNING
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
import uvicorn.server
|
||||
from tests.utils import run_server
|
||||
@ -13,7 +14,7 @@ from uvicorn import Server
|
||||
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.main import run
|
||||
from uvicorn.supervisors import Multithread
|
||||
from uvicorn.supervisors import Multiprocess
|
||||
|
||||
pytestmark = pytest.mark.anyio
|
||||
|
||||
@ -87,20 +88,59 @@ def test_run_invalid_app_config_combination(caplog: pytest.LogCaptureFixture) ->
|
||||
)
|
||||
|
||||
|
||||
def test_run_invalid_thread_worker_class_config() -> None:
|
||||
with pytest.raises(ValueError, match='Worker class "thread" requires a free-threaded Python 3.14 runtime'):
|
||||
run("tests.test_main:app", workers=2, worker_class="thread")
|
||||
def test_run_fails_fast_in_parent_on_bad_app_path(
|
||||
caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Bad app path with `--workers > 1` exits in the parent.
|
||||
|
||||
Regression for https://github.com/encode/uvicorn/discussions/2440: without
|
||||
parent-side validation the supervisor restarts dying workers forever.
|
||||
"""
|
||||
|
||||
def fail(*args: object, **kwargs: object) -> None: # pragma: no cover
|
||||
pytest.fail("parent reached supervisor; should have exited on bad app path")
|
||||
|
||||
monkeypatch.setattr(Config, "bind_socket", fail)
|
||||
monkeypatch.setattr(Multiprocess, "run", fail)
|
||||
|
||||
with pytest.raises(SystemExit) as exit_exception:
|
||||
run("tests.test_main:nonexistent_attr", workers=2)
|
||||
assert exit_exception.value.code == 1
|
||||
assert any("Error loading ASGI app" in record.message for record in caplog.records)
|
||||
|
||||
|
||||
def test_run_multithread(mocker: MockerFixture) -> None:
|
||||
mocker.patch("uvicorn.config.is_free_threaded_runtime", return_value=True)
|
||||
mock_bind_socket = mocker.patch.object(Config, "bind_socket")
|
||||
mock_run = mocker.patch.object(Multithread, "run")
|
||||
def test_run_imports_app_before_starting_event_loop(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""`uvicorn.run()` imports the app before `Server.run` opens the event loop.
|
||||
|
||||
run("tests.test_main:app", workers=2, worker_class="thread")
|
||||
Regression for https://github.com/encode/uvicorn/issues/941: an app whose
|
||||
module body calls `asyncio.run(...)` crashes with "loop already running"
|
||||
if Uvicorn imports it inside the server's event loop. The parent must
|
||||
import the app synchronously, before `Server.run` enters `asyncio.run`.
|
||||
"""
|
||||
module = tmp_path / "eager_async_app.py"
|
||||
module.write_text(
|
||||
"import asyncio\n"
|
||||
"async def _build():\n"
|
||||
" async def app(scope, receive, send):\n"
|
||||
" pass\n"
|
||||
" return app\n"
|
||||
"app = asyncio.run(_build())\n"
|
||||
)
|
||||
monkeypatch.syspath_prepend(str(tmp_path))
|
||||
|
||||
mock_bind_socket.assert_called_once()
|
||||
mock_run.assert_called_once()
|
||||
imported_before_server_run: list[bool] = []
|
||||
|
||||
def tracking_run(self: Server, sockets: object = None) -> None:
|
||||
imported_before_server_run.append("eager_async_app" in sys.modules)
|
||||
self.started = True
|
||||
|
||||
monkeypatch.setattr(Server, "run", tracking_run)
|
||||
|
||||
# The import side effect (`eager_async_app` lands in `sys.modules`) must
|
||||
# happen before `Server.run`, which is where the event loop opens.
|
||||
run("eager_async_app:app")
|
||||
|
||||
assert imported_before_server_run == [True]
|
||||
|
||||
|
||||
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
|
||||
@ -119,7 +159,7 @@ def test_run_match_config_params() -> None:
|
||||
config_params = {
|
||||
key: repr(value)
|
||||
for key, value in inspect.signature(Config.__init__).parameters.items()
|
||||
if key not in ("self", "timeout_notify", "callback_notify", "callback_progress")
|
||||
if key not in ("self", "timeout_notify", "callback_notify")
|
||||
}
|
||||
run_params = {
|
||||
key: repr(value) for key, value in inspect.signature(run).parameters.items() if key not in ("app_dir",)
|
||||
|
||||
@ -15,12 +15,12 @@ import pytest
|
||||
|
||||
from tests.protocols.test_http import SIMPLE_GET_REQUEST
|
||||
from tests.utils import run_server
|
||||
from uvicorn import Server
|
||||
from uvicorn._types import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.protocols.http.flow_control import HIGH_WATER_LIMIT
|
||||
from uvicorn.protocols.http.h11_impl import H11Protocol
|
||||
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
|
||||
from uvicorn.server import Server
|
||||
|
||||
pytestmark = pytest.mark.anyio
|
||||
|
||||
@ -142,23 +142,25 @@ async def test_limit_max_requests_jitter(
|
||||
config = Config(
|
||||
app=app, limit_max_requests=1, limit_max_requests_jitter=2, port=unused_tcp_port, http=http_protocol_cls
|
||||
)
|
||||
server = Server(config=config)
|
||||
limit = server.limit_max_requests
|
||||
assert limit is not None
|
||||
assert 1 <= limit <= 3
|
||||
task = asyncio.create_task(server.serve())
|
||||
while not server.started:
|
||||
await asyncio.sleep(0.01)
|
||||
async with httpx.AsyncClient() as client:
|
||||
for _ in range(limit + 1):
|
||||
await client.get(f"http://127.0.0.1:{unused_tcp_port}")
|
||||
await task
|
||||
async with run_server(config) as server:
|
||||
limit = server.limit_max_requests
|
||||
assert limit is not None
|
||||
assert 1 <= limit <= 3
|
||||
async with httpx.AsyncClient() as client:
|
||||
tasks = [client.get(f"http://127.0.0.1:{unused_tcp_port}") for _ in range(limit + 1)]
|
||||
await asyncio.gather(*tasks)
|
||||
assert f"Maximum request limit of {limit} exceeded. Terminating process." in caplog.text
|
||||
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def server(*, app: ASGIApplication, port: int, http_protocol_cls: type[H11Protocol | HttpToolsProtocol]):
|
||||
config = Config(app=app, port=port, loop="asyncio", http=http_protocol_cls)
|
||||
async def _raw_server(
|
||||
*,
|
||||
app: ASGIApplication,
|
||||
port: int,
|
||||
http_protocol_cls: type[H11Protocol | HttpToolsProtocol],
|
||||
reset_contextvars: bool = False,
|
||||
):
|
||||
config = Config(app=app, port=port, loop="asyncio", http=http_protocol_cls, reset_contextvars=reset_contextvars)
|
||||
server = Server(config=config)
|
||||
task = asyncio.create_task(server.serve())
|
||||
|
||||
@ -186,10 +188,36 @@ async def server(*, app: ASGIApplication, port: int, http_protocol_cls: type[H11
|
||||
await task
|
||||
|
||||
|
||||
async def test_no_contextvars_pollution_asyncio(
|
||||
async def test_contextvars_preserved_by_default(
|
||||
http_protocol_cls: type[H11Protocol | HttpToolsProtocol], unused_tcp_port: int
|
||||
):
|
||||
"""Non-regression test for https://github.com/encode/uvicorn/issues/2167."""
|
||||
"""By default, context set outside the ASGI task is visible inside it."""
|
||||
ctx: contextvars.ContextVar[str] = contextvars.ContextVar("ctx")
|
||||
ctx.set("outer-value")
|
||||
|
||||
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
|
||||
assert scope["type"] == "http"
|
||||
while True:
|
||||
message = await receive()
|
||||
assert message["type"] == "http.request"
|
||||
if not message["more_body"]:
|
||||
break
|
||||
body = json.dumps({"ctx": ctx.get("MISSING")}).encode("utf-8")
|
||||
headers = [(b"content-type", b"application/json"), (b"content-length", str(len(body)).encode("utf-8"))]
|
||||
await send({"type": "http.response.start", "status": 200, "headers": headers})
|
||||
await send({"type": "http.response.body", "body": body})
|
||||
|
||||
async with _raw_server(app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port) as extract_json_body:
|
||||
assert await extract_json_body(SIMPLE_GET_REQUEST) == {"ctx": "outer-value"}
|
||||
|
||||
|
||||
async def test_reset_contextvars_asyncio(
|
||||
http_protocol_cls: type[H11Protocol | HttpToolsProtocol], unused_tcp_port: int
|
||||
):
|
||||
"""With reset_contextvars=True, each ASGI run starts with a fresh context.
|
||||
|
||||
Non-regression test for https://github.com/encode/uvicorn/issues/2167.
|
||||
"""
|
||||
default_contextvars = {c.name for c in contextvars.copy_context().keys()}
|
||||
ctx: contextvars.ContextVar[str] = contextvars.ContextVar("ctx")
|
||||
|
||||
@ -209,14 +237,13 @@ async def test_no_contextvars_pollution_asyncio(
|
||||
if not message["more_body"]:
|
||||
break
|
||||
|
||||
# return the initial context for empty assertion
|
||||
body = json.dumps(initial_context).encode("utf-8")
|
||||
headers = [(b"content-type", b"application/json"), (b"content-length", str(len(body)).encode("utf-8"))]
|
||||
await send({"type": "http.response.start", "status": 200, "headers": headers})
|
||||
await send({"type": "http.response.body", "body": body})
|
||||
|
||||
# body has to be larger than HIGH_WATER_LIMIT to trigger a reading pause on the main thread
|
||||
# and a resumption inside the ASGI task
|
||||
# body larger than HIGH_WATER_LIMIT forces a reading pause on the main thread
|
||||
# and a resumption inside the ASGI task, which is where the original pollution showed up.
|
||||
large_body = b"a" * (HIGH_WATER_LIMIT + 1)
|
||||
large_request = b"\r\n".join(
|
||||
[
|
||||
@ -229,6 +256,8 @@ async def test_no_contextvars_pollution_asyncio(
|
||||
]
|
||||
)
|
||||
|
||||
async with server(app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port) as extract_json_body:
|
||||
async with _raw_server(
|
||||
app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port, reset_contextvars=True
|
||||
) as extract_json_body:
|
||||
assert await extract_json_body(large_request) == {}
|
||||
assert await extract_json_body(SIMPLE_GET_REQUEST) == {}
|
||||
|
||||
@ -1,9 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ssl
|
||||
from collections.abc import Callable
|
||||
from typing import TypeAlias
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from tests.utils import run_server
|
||||
from uvicorn.config import Config
|
||||
|
||||
DefaultFactory: TypeAlias = Callable[[], ssl.SSLContext]
|
||||
|
||||
|
||||
async def app(scope, receive, send):
|
||||
assert scope["type"] == "http"
|
||||
@ -92,3 +100,108 @@ async def test_run_password(
|
||||
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
|
||||
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_run_ssl_context_factory_default(
|
||||
tls_ca_ssl_context: ssl.SSLContext,
|
||||
tls_certificate_server_cert_path: str,
|
||||
tls_certificate_private_key_path: str,
|
||||
unused_tcp_port: int,
|
||||
) -> None:
|
||||
"""A factory that just delegates to the default factory should produce a working server."""
|
||||
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
|
||||
return default_ssl_context_factory()
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
loop="asyncio",
|
||||
limit_max_requests=1,
|
||||
ssl_keyfile=tls_certificate_private_key_path,
|
||||
ssl_certfile=tls_certificate_server_cert_path,
|
||||
ssl_context_factory=ssl_context_factory,
|
||||
port=unused_tcp_port,
|
||||
)
|
||||
async with run_server(config):
|
||||
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
|
||||
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_run_ssl_context_factory_custom(
|
||||
tls_ca_ssl_context: ssl.SSLContext,
|
||||
tls_certificate_server_cert_path: str,
|
||||
tls_certificate_private_key_path: str,
|
||||
unused_tcp_port: int,
|
||||
) -> None:
|
||||
"""A factory that builds its own SSLContext from scratch should work without ssl_keyfile/ssl_certfile."""
|
||||
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
ctx.load_cert_chain(tls_certificate_server_cert_path, tls_certificate_private_key_path)
|
||||
return ctx
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
loop="asyncio",
|
||||
limit_max_requests=1,
|
||||
ssl_context_factory=ssl_context_factory,
|
||||
port=unused_tcp_port,
|
||||
)
|
||||
async with run_server(config):
|
||||
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
|
||||
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
def test_ssl_context_factory_mutates_default(
|
||||
tls_certificate_server_cert_path: str,
|
||||
tls_certificate_private_key_path: str,
|
||||
) -> None:
|
||||
"""The factory can call the default and mutate the result (e.g., bump TLS minimum version)."""
|
||||
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
|
||||
ctx = default_ssl_context_factory()
|
||||
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
|
||||
return ctx
|
||||
|
||||
config = Config(
|
||||
app=app,
|
||||
ssl_keyfile=tls_certificate_private_key_path,
|
||||
ssl_certfile=tls_certificate_server_cert_path,
|
||||
ssl_context_factory=ssl_context_factory,
|
||||
)
|
||||
config.load()
|
||||
assert config.is_ssl
|
||||
assert isinstance(config.ssl, ssl.SSLContext)
|
||||
assert config.ssl.minimum_version == ssl.TLSVersion.TLSv1_3
|
||||
|
||||
|
||||
def test_default_ssl_context_factory_requires_ssl_certfile() -> None:
|
||||
"""Calling `default_ssl_context_factory()` without `ssl_certfile` raises a clear error."""
|
||||
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
|
||||
return default_ssl_context_factory()
|
||||
|
||||
config = Config(app=app, ssl_context_factory=ssl_context_factory)
|
||||
with pytest.raises(RuntimeError, match="requires `ssl_certfile`"):
|
||||
config.load()
|
||||
|
||||
|
||||
def test_ssl_context_factory_must_return_ssl_context() -> None:
|
||||
def bad_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> object:
|
||||
return "not an SSLContext"
|
||||
|
||||
config = Config(app=app, ssl_context_factory=bad_factory) # type: ignore[arg-type]
|
||||
with pytest.raises(TypeError, match="must return an `ssl.SSLContext`"):
|
||||
config.load()
|
||||
|
||||
|
||||
def test_is_ssl_true_when_only_factory_set() -> None:
|
||||
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
|
||||
return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # pragma: no cover
|
||||
|
||||
config = Config(app=app, ssl_context_factory=ssl_context_factory)
|
||||
assert config.is_ssl is True
|
||||
|
||||
14
uv.lock
generated
14
uv.lock
generated
@ -1344,7 +1344,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "pytest"
|
||||
version = "9.0.2"
|
||||
version = "9.0.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "colorama", marker = "sys_platform == 'win32'" },
|
||||
@ -1355,9 +1355,9 @@ dependencies = [
|
||||
{ name = "pygments" },
|
||||
{ name = "tomli", marker = "python_full_version < '3.11'" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1757,11 +1757,11 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "2.6.3"
|
||||
version = "2.7.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1837,7 +1837,7 @@ dev = [
|
||||
{ name = "cryptography", specifier = ">=44.0.3" },
|
||||
{ name = "httpx", specifier = "==0.28.1" },
|
||||
{ name = "mypy", specifier = "==1.19.1" },
|
||||
{ name = "pytest", specifier = "==9.0.2" },
|
||||
{ name = "pytest", specifier = "==9.0.3" },
|
||||
{ name = "pytest-codspeed", specifier = ">=4.1.1" },
|
||||
{ name = "pytest-mock", specifier = "==3.15.1" },
|
||||
{ name = "pytest-xdist", extras = ["psutil"], specifier = "==3.8.0" },
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.main import Server, main, run
|
||||
|
||||
__version__ = "0.44.0"
|
||||
__version__ = "0.47.0"
|
||||
__all__ = ["main", "run", "Config", "Server"]
|
||||
|
||||
@ -9,7 +9,6 @@ import os
|
||||
import socket
|
||||
import ssl
|
||||
import sys
|
||||
import sysconfig
|
||||
from collections.abc import Awaitable, Callable
|
||||
from configparser import RawConfigParser
|
||||
from pathlib import Path
|
||||
@ -31,7 +30,6 @@ WSProtocolType = Literal["auto", "none", "websockets", "websockets-sansio", "wsp
|
||||
LifespanType = Literal["auto", "on", "off"]
|
||||
LoopFactoryType = Literal["none", "auto", "asyncio", "uvloop"]
|
||||
InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"]
|
||||
WorkerClassType = Literal["process", "thread"]
|
||||
|
||||
LOG_LEVELS: dict[str, int] = {
|
||||
"critical": logging.CRITICAL,
|
||||
@ -65,7 +63,6 @@ LOOP_FACTORIES: dict[str, str | None] = {
|
||||
"uvloop": "uvicorn.loops.uvloop:uvloop_loop_factory",
|
||||
}
|
||||
INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"]
|
||||
WORKER_CLASSES: list[WorkerClassType] = ["process", "thread"]
|
||||
|
||||
SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER
|
||||
|
||||
@ -178,21 +175,6 @@ def _normalize_dirs(dirs: list[str] | str | None) -> list[str]:
|
||||
return list(set(dirs))
|
||||
|
||||
|
||||
def is_free_threaded_runtime() -> bool:
|
||||
# CPython documents these as separate checks:
|
||||
# - `sysconfig.get_config_var("Py_GIL_DISABLED")` for whether the build supports free threading
|
||||
# - `sys._is_gil_enabled()` for whether the GIL is actually disabled in the running process
|
||||
# https://docs.python.org/3/howto/free-threading-python.html#identifying-free-threaded-python
|
||||
if sys.version_info < (3, 14):
|
||||
return False
|
||||
if sysconfig.get_config_var("Py_GIL_DISABLED") != 1:
|
||||
return False
|
||||
is_gil_enabled = getattr(sys, "_is_gil_enabled", None)
|
||||
if is_gil_enabled is None:
|
||||
return True
|
||||
return not is_gil_enabled()
|
||||
|
||||
|
||||
class Config:
|
||||
def __init__(
|
||||
self,
|
||||
@ -211,7 +193,7 @@ class Config:
|
||||
ws_per_message_deflate: bool = True,
|
||||
lifespan: LifespanType = "auto",
|
||||
env_file: str | os.PathLike[str] | None = None,
|
||||
log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
||||
log_config: dict[str, Any] | str | os.PathLike[str] | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
||||
log_level: str | int | None = None,
|
||||
access_log: bool = True,
|
||||
use_colors: bool | None = None,
|
||||
@ -222,7 +204,6 @@ class Config:
|
||||
reload_includes: list[str] | str | None = None,
|
||||
reload_excludes: list[str] | str | None = None,
|
||||
workers: int | None = None,
|
||||
worker_class: WorkerClassType = "process",
|
||||
proxy_headers: bool = True,
|
||||
server_header: bool = True,
|
||||
date_header: bool = True,
|
||||
@ -237,7 +218,6 @@ class Config:
|
||||
timeout_graceful_shutdown: int | None = None,
|
||||
timeout_worker_healthcheck: int = 5,
|
||||
callback_notify: Callable[..., Awaitable[None]] | None = None,
|
||||
callback_progress: Callable[[], None] | None = None,
|
||||
ssl_keyfile: str | os.PathLike[str] | None = None,
|
||||
ssl_certfile: str | os.PathLike[str] | None = None,
|
||||
ssl_keyfile_password: str | None = None,
|
||||
@ -245,9 +225,11 @@ class Config:
|
||||
ssl_cert_reqs: int = ssl.CERT_NONE,
|
||||
ssl_ca_certs: str | os.PathLike[str] | None = None,
|
||||
ssl_ciphers: str = "TLSv1",
|
||||
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
|
||||
headers: list[tuple[str, str]] | None = None,
|
||||
factory: bool = False,
|
||||
h11_max_incomplete_event_size: int | None = None,
|
||||
reset_contextvars: bool = False,
|
||||
):
|
||||
self.app = app
|
||||
self.host = host
|
||||
@ -271,7 +253,6 @@ class Config:
|
||||
self.reload = reload
|
||||
self.reload_delay = reload_delay
|
||||
self.workers = workers or 1
|
||||
self.worker_class = worker_class
|
||||
self.proxy_headers = proxy_headers
|
||||
self.server_header = server_header
|
||||
self.date_header = date_header
|
||||
@ -285,7 +266,6 @@ class Config:
|
||||
self.timeout_graceful_shutdown = timeout_graceful_shutdown
|
||||
self.timeout_worker_healthcheck = timeout_worker_healthcheck
|
||||
self.callback_notify = callback_notify
|
||||
self.callback_progress = callback_progress
|
||||
self.ssl_keyfile = ssl_keyfile
|
||||
self.ssl_certfile = ssl_certfile
|
||||
self.ssl_keyfile_password = ssl_keyfile_password
|
||||
@ -293,10 +273,12 @@ class Config:
|
||||
self.ssl_cert_reqs = ssl_cert_reqs
|
||||
self.ssl_ca_certs = ssl_ca_certs
|
||||
self.ssl_ciphers = ssl_ciphers
|
||||
self.ssl_context_factory = ssl_context_factory
|
||||
self.headers: list[tuple[str, str]] = headers or []
|
||||
self.encoded_headers: list[tuple[bytes, bytes]] = []
|
||||
self.factory = factory
|
||||
self.h11_max_incomplete_event_size = h11_max_incomplete_event_size
|
||||
self.reset_contextvars = reset_contextvars
|
||||
|
||||
self.loaded = False
|
||||
self.configure_logging()
|
||||
@ -366,11 +348,6 @@ class Config:
|
||||
if self.reload and self.workers > 1:
|
||||
logger.warning('"workers" flag is ignored when reloading is enabled.')
|
||||
|
||||
if self.worker_class == "thread" and not is_free_threaded_runtime():
|
||||
raise ValueError(
|
||||
'Worker class "thread" requires a free-threaded Python 3.14 runtime (Py_GIL_DISABLED=1, GIL off).'
|
||||
)
|
||||
|
||||
@property
|
||||
def asgi_version(self) -> Literal["2.0", "3.0"]:
|
||||
mapping: dict[str, Literal["2.0", "3.0"]] = {
|
||||
@ -382,16 +359,19 @@ class Config:
|
||||
|
||||
@property
|
||||
def is_ssl(self) -> bool:
|
||||
return bool(self.ssl_keyfile or self.ssl_certfile)
|
||||
return bool(self.ssl_keyfile or self.ssl_certfile or self.ssl_context_factory)
|
||||
|
||||
@property
|
||||
def use_subprocess(self) -> bool:
|
||||
return bool(self.reload or (self.workers > 1 and self.worker_class == "process"))
|
||||
return bool(self.reload or self.workers > 1)
|
||||
|
||||
def configure_logging(self) -> None:
|
||||
logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")
|
||||
|
||||
if self.log_config is not None:
|
||||
if isinstance(self.log_config, os.PathLike):
|
||||
self.log_config = os.fspath(self.log_config)
|
||||
|
||||
if isinstance(self.log_config, dict):
|
||||
if self.use_colors in (True, False):
|
||||
self.log_config["formatters"]["default"]["use_colors"] = self.use_colors
|
||||
@ -402,9 +382,12 @@ class Config:
|
||||
loaded_config = json.load(file)
|
||||
logging.config.dictConfig(loaded_config)
|
||||
elif isinstance(self.log_config, str) and self.log_config.endswith((".yaml", ".yml")):
|
||||
# Install the PyYAML package or the uvicorn[standard] optional
|
||||
# dependencies to enable this functionality.
|
||||
import yaml
|
||||
try:
|
||||
import yaml
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
"Install the PyYAML package or uvicorn[standard] to use `--log-config` with YAML files."
|
||||
) from e
|
||||
|
||||
with open(self.log_config) as file:
|
||||
loaded_config = yaml.safe_load(file)
|
||||
@ -416,7 +399,7 @@ class Config:
|
||||
|
||||
if self.log_level is not None:
|
||||
if isinstance(self.log_level, str):
|
||||
log_level = LOG_LEVELS[self.log_level]
|
||||
log_level = LOG_LEVELS[self.log_level.lower()]
|
||||
else:
|
||||
log_level = self.log_level
|
||||
logging.getLogger("uvicorn.error").setLevel(log_level)
|
||||
@ -426,12 +409,43 @@ class Config:
|
||||
logging.getLogger("uvicorn.access").handlers = []
|
||||
logging.getLogger("uvicorn.access").propagate = False
|
||||
|
||||
def load_app(self) -> Any:
|
||||
"""Import the app and return it. Exits on failure."""
|
||||
try:
|
||||
return import_from_string(self.app)
|
||||
except ImportFromStringError as exc:
|
||||
logger.error("Error loading ASGI app. %s" % exc)
|
||||
sys.exit(1)
|
||||
|
||||
def load(self) -> None:
|
||||
assert not self.loaded
|
||||
|
||||
if self.is_ssl:
|
||||
if self.ssl_context_factory is not None:
|
||||
|
||||
def default_factory() -> ssl.SSLContext:
|
||||
if not self.ssl_certfile:
|
||||
raise RuntimeError(
|
||||
"`default_ssl_context_factory()` requires `ssl_certfile` to be set on `Config`. "
|
||||
"Either pass `ssl_certfile` (and optionally `ssl_keyfile`) or build the `SSLContext` "
|
||||
"directly inside `ssl_context_factory` without calling the default factory."
|
||||
)
|
||||
return create_ssl_context(
|
||||
keyfile=self.ssl_keyfile,
|
||||
certfile=self.ssl_certfile,
|
||||
password=self.ssl_keyfile_password,
|
||||
ssl_version=self.ssl_version,
|
||||
cert_reqs=self.ssl_cert_reqs,
|
||||
ca_certs=self.ssl_ca_certs,
|
||||
ciphers=self.ssl_ciphers,
|
||||
)
|
||||
|
||||
context = self.ssl_context_factory(self, default_factory)
|
||||
if not isinstance(context, ssl.SSLContext):
|
||||
raise TypeError(f"`ssl_context_factory` must return an `ssl.SSLContext`, got {type(context).__name__}")
|
||||
self.ssl: ssl.SSLContext | None = context
|
||||
elif self.is_ssl:
|
||||
assert self.ssl_certfile
|
||||
self.ssl: ssl.SSLContext | None = create_ssl_context(
|
||||
self.ssl = create_ssl_context(
|
||||
keyfile=self.ssl_keyfile,
|
||||
certfile=self.ssl_certfile,
|
||||
password=self.ssl_keyfile_password,
|
||||
@ -464,11 +478,7 @@ class Config:
|
||||
|
||||
self.lifespan_class = import_from_string(LIFESPAN[self.lifespan])
|
||||
|
||||
try:
|
||||
self.loaded_app = import_from_string(self.app)
|
||||
except ImportFromStringError as exc:
|
||||
logger.error("Error loading ASGI app. %s" % exc)
|
||||
sys.exit(1)
|
||||
self.loaded_app = self.load_app()
|
||||
|
||||
try:
|
||||
self.loaded_app = self.loaded_app()
|
||||
@ -527,7 +537,7 @@ class Config:
|
||||
|
||||
def bind_socket(self) -> socket.socket:
|
||||
logger_args: list[str | int]
|
||||
if self.uds: # pragma: py-win32
|
||||
if self.uds is not None: # pragma: py-win32
|
||||
path = self.uds
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
try:
|
||||
@ -542,7 +552,7 @@ class Config:
|
||||
sock_name_format = "%s"
|
||||
color_message = "Uvicorn running on " + click.style(sock_name_format, bold=True) + " (Press CTRL+C to quit)"
|
||||
logger_args = [self.uds]
|
||||
elif self.fd: # pragma: py-win32
|
||||
elif self.fd is not None: # pragma: py-win32
|
||||
sock = socket.fromfd(self.fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
message = "Uvicorn running on socket %s (Press CTRL+C to quit)"
|
||||
fd_name_format = "%s"
|
||||
|
||||
@ -21,22 +21,19 @@ from uvicorn.config import (
|
||||
LOG_LEVELS,
|
||||
LOGGING_CONFIG,
|
||||
SSL_PROTOCOL_VERSION,
|
||||
WORKER_CLASSES,
|
||||
Config,
|
||||
HTTPProtocolType,
|
||||
InterfaceType,
|
||||
LifespanType,
|
||||
LoopFactoryType,
|
||||
WorkerClassType,
|
||||
WSProtocolType,
|
||||
)
|
||||
from uvicorn.server import Server
|
||||
from uvicorn.supervisors import ChangeReload, Multiprocess, Multithread
|
||||
from uvicorn.supervisors import ChangeReload, Multiprocess
|
||||
|
||||
LEVEL_CHOICES = click.Choice(list(LOG_LEVELS.keys()))
|
||||
LIFESPAN_CHOICES = click.Choice(list(LIFESPAN.keys()))
|
||||
INTERFACE_CHOICES = click.Choice(INTERFACES)
|
||||
WORKER_CLASS_CHOICES = click.Choice(WORKER_CLASSES)
|
||||
|
||||
|
||||
def _metavar_from_type(_type: Any) -> str:
|
||||
@ -116,16 +113,9 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
||||
"--workers",
|
||||
default=None,
|
||||
type=int,
|
||||
help="Number of worker instances. Defaults to the $WEB_CONCURRENCY environment"
|
||||
help="Number of worker processes. Defaults to the $WEB_CONCURRENCY environment"
|
||||
" variable if available, or 1. Not valid with --reload.",
|
||||
)
|
||||
@click.option(
|
||||
"--worker-class",
|
||||
type=WORKER_CLASS_CHOICES,
|
||||
default="process",
|
||||
help="Worker implementation to use when running multiple workers.",
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
"--loop",
|
||||
type=str,
|
||||
@ -382,6 +372,13 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
||||
default=None,
|
||||
help="For h11, the maximum number of bytes to buffer of an incomplete event.",
|
||||
)
|
||||
@click.option(
|
||||
"--reset-contextvars",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Run each ASGI request in a fresh contextvars.Context. Hides context set in the lifespan.",
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
"--factory",
|
||||
is_flag=True,
|
||||
@ -411,7 +408,6 @@ def main(
|
||||
reload_excludes: list[str],
|
||||
reload_delay: float,
|
||||
workers: int,
|
||||
worker_class: WorkerClassType,
|
||||
env_file: str,
|
||||
log_config: str,
|
||||
log_level: str,
|
||||
@ -439,6 +435,7 @@ def main(
|
||||
use_colors: bool,
|
||||
app_dir: str,
|
||||
h11_max_incomplete_event_size: int | None,
|
||||
reset_contextvars: bool,
|
||||
factory: bool,
|
||||
) -> None:
|
||||
run(
|
||||
@ -467,7 +464,6 @@ def main(
|
||||
reload_excludes=reload_excludes or None,
|
||||
reload_delay=reload_delay,
|
||||
workers=workers,
|
||||
worker_class=worker_class,
|
||||
proxy_headers=proxy_headers,
|
||||
server_header=server_header,
|
||||
date_header=date_header,
|
||||
@ -492,6 +488,7 @@ def main(
|
||||
factory=factory,
|
||||
app_dir=app_dir,
|
||||
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
|
||||
reset_contextvars=reset_contextvars,
|
||||
)
|
||||
|
||||
|
||||
@ -518,9 +515,8 @@ def run(
|
||||
reload_excludes: list[str] | str | None = None,
|
||||
reload_delay: float = 0.25,
|
||||
workers: int | None = None,
|
||||
worker_class: WorkerClassType = "process",
|
||||
env_file: str | os.PathLike[str] | None = None,
|
||||
log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
||||
log_config: dict[str, Any] | str | os.PathLike[str] | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
|
||||
log_level: str | int | None = None,
|
||||
access_log: bool = True,
|
||||
proxy_headers: bool = True,
|
||||
@ -542,11 +538,13 @@ def run(
|
||||
ssl_cert_reqs: int = ssl.CERT_NONE,
|
||||
ssl_ca_certs: str | os.PathLike[str] | None = None,
|
||||
ssl_ciphers: str = "TLSv1",
|
||||
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
|
||||
headers: list[tuple[str, str]] | None = None,
|
||||
use_colors: bool | None = None,
|
||||
app_dir: str | None = None,
|
||||
factory: bool = False,
|
||||
h11_max_incomplete_event_size: int | None = None,
|
||||
reset_contextvars: bool = False,
|
||||
) -> None:
|
||||
if app_dir is not None:
|
||||
sys.path.insert(0, app_dir)
|
||||
@ -573,7 +571,6 @@ def run(
|
||||
reload_excludes=reload_excludes,
|
||||
reload_delay=reload_delay,
|
||||
workers=workers,
|
||||
worker_class=worker_class,
|
||||
env_file=env_file,
|
||||
log_config=log_config,
|
||||
log_level=log_level,
|
||||
@ -597,32 +594,32 @@ def run(
|
||||
ssl_cert_reqs=ssl_cert_reqs,
|
||||
ssl_ca_certs=ssl_ca_certs,
|
||||
ssl_ciphers=ssl_ciphers,
|
||||
ssl_context_factory=ssl_context_factory,
|
||||
headers=headers,
|
||||
use_colors=use_colors,
|
||||
factory=factory,
|
||||
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
|
||||
reset_contextvars=reset_contextvars,
|
||||
)
|
||||
server = Server(config=config)
|
||||
|
||||
if (config.reload or config.workers > 1) and not isinstance(app, str):
|
||||
logger = logging.getLogger("uvicorn.error")
|
||||
logger.warning("You must pass the application as an import string to enable 'reload' or 'workers'.")
|
||||
sys.exit(1)
|
||||
|
||||
config.load_app()
|
||||
server = Server(config=config)
|
||||
|
||||
try:
|
||||
if config.should_reload:
|
||||
sock = config.bind_socket()
|
||||
ChangeReload(config, target=server.run, sockets=[sock]).run()
|
||||
elif config.workers > 1:
|
||||
sock = config.bind_socket()
|
||||
if config.worker_class == "process":
|
||||
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
||||
else:
|
||||
Multithread(config, target=server.run, sockets=[sock]).run()
|
||||
Multiprocess(config, target=server.run, sockets=[sock]).run()
|
||||
else:
|
||||
server.run()
|
||||
except KeyboardInterrupt:
|
||||
pass # pragma: full coverage
|
||||
except KeyboardInterrupt: # pragma: full coverage
|
||||
pass
|
||||
finally:
|
||||
if config.uds and os.path.exists(config.uds):
|
||||
os.remove(config.uds) # pragma: py-win32
|
||||
|
||||
@ -45,16 +45,12 @@ class ProxyHeadersMiddleware:
|
||||
|
||||
if b"x-forwarded-for" in headers:
|
||||
x_forwarded_for = headers[b"x-forwarded-for"].decode("latin1")
|
||||
host = self.trusted_hosts.get_trusted_client_host(x_forwarded_for)
|
||||
host, port = self.trusted_hosts.get_trusted_client_address(x_forwarded_for)
|
||||
|
||||
if host:
|
||||
# If the x-forwarded-for header is empty then host is an empty string.
|
||||
# Only set the client if we actually got something usable.
|
||||
# See: https://github.com/Kludex/uvicorn/issues/1068
|
||||
|
||||
# We've lost the connecting client's port information by now,
|
||||
# so only include the host.
|
||||
port = 0
|
||||
scope["client"] = (host, port)
|
||||
|
||||
return await self.app(scope, receive, send)
|
||||
@ -64,6 +60,41 @@ def _parse_raw_hosts(value: str) -> list[str]:
|
||||
return [item.strip() for item in value.split(",")]
|
||||
|
||||
|
||||
def _parse_host_port(value: str) -> tuple[str, int]:
|
||||
"""Parse a forwarded host value into host and optional port.
|
||||
|
||||
Accepts bare IPs, IPv4 `host:port`, and bracketed IPv6 `[host]:port`.
|
||||
Any unrecognized or malformed value is treated conservatively and returned
|
||||
without a port so trust checks do not silently normalize arbitrary input.
|
||||
"""
|
||||
|
||||
if value.startswith("["):
|
||||
bracket_end = value.find("]")
|
||||
if bracket_end == -1:
|
||||
return value, 0
|
||||
|
||||
host = value[1:bracket_end]
|
||||
remainder = value[bracket_end + 1 :]
|
||||
if not remainder:
|
||||
return host, 0
|
||||
if not remainder.startswith(":"):
|
||||
return value, 0
|
||||
|
||||
try:
|
||||
return host, int(remainder[1:])
|
||||
except ValueError:
|
||||
return host, 0
|
||||
|
||||
if value.count(":") == 1:
|
||||
host, port = value.rsplit(":", 1)
|
||||
try:
|
||||
return host, int(port)
|
||||
except ValueError:
|
||||
return value, 0
|
||||
|
||||
return value, 0
|
||||
|
||||
|
||||
class _TrustedHosts:
|
||||
"""Container for trusted hosts and networks"""
|
||||
|
||||
@ -122,21 +153,22 @@ class _TrustedHosts:
|
||||
except ValueError:
|
||||
return host in self.trusted_literals
|
||||
|
||||
def get_trusted_client_host(self, x_forwarded_for: str) -> str:
|
||||
"""Extract the client host from x_forwarded_for header
|
||||
def get_trusted_client_address(self, x_forwarded_for: str) -> tuple[str, int]:
|
||||
"""Extract the client address from x_forwarded_for header.
|
||||
|
||||
In general this is the first "untrusted" host in the forwarded for list.
|
||||
"""
|
||||
x_forwarded_for_hosts = _parse_raw_hosts(x_forwarded_for)
|
||||
|
||||
if self.always_trust:
|
||||
return x_forwarded_for_hosts[0]
|
||||
return _parse_host_port(x_forwarded_for_hosts[0])
|
||||
|
||||
# Note: each proxy appends to the header list so check it in reverse order
|
||||
for host in reversed(x_forwarded_for_hosts):
|
||||
for host_port in reversed(x_forwarded_for_hosts):
|
||||
host, port = _parse_host_port(host_port)
|
||||
if host not in self:
|
||||
return host
|
||||
return host, port
|
||||
|
||||
# All hosts are trusted meaning that the client was also a trusted proxy
|
||||
# See https://github.com/Kludex/uvicorn/issues/1068#issuecomment-855371576
|
||||
return x_forwarded_for_hosts[0]
|
||||
return _parse_host_port(x_forwarded_for_hosts[0])
|
||||
|
||||
@ -250,13 +250,16 @@ class H11Protocol(asyncio.Protocol):
|
||||
message_event=asyncio.Event(),
|
||||
on_response=self.on_response_complete,
|
||||
)
|
||||
# For the asyncio loop, we need to explicitly start with an empty context
|
||||
# as it can be polluted from previous ASGI runs.
|
||||
# See https://github.com/python/cpython/issues/140947 for details.
|
||||
if sys.version_info >= (3, 11): # pragma: py-lt-311
|
||||
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
|
||||
else: # pragma: py-gte-311
|
||||
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
|
||||
if self.config.reset_contextvars:
|
||||
# Opt-in workaround for https://github.com/python/cpython/issues/140947:
|
||||
# asyncio can leak context vars between tasks. Hides context set in the
|
||||
# lifespan or by external instrumentation.
|
||||
if sys.version_info >= (3, 11): # pragma: py-lt-311
|
||||
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
|
||||
else: # pragma: py-gte-311
|
||||
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
|
||||
else:
|
||||
task = self.loop.create_task(self.cycle.run_asgi(app))
|
||||
task.add_done_callback(self.tasks.discard)
|
||||
self.tasks.add(task)
|
||||
|
||||
@ -344,8 +347,6 @@ class H11Protocol(asyncio.Protocol):
|
||||
self.transport.close()
|
||||
else:
|
||||
self.cycle.keep_alive = False
|
||||
self.cycle.shutting_down = True
|
||||
self.cycle.message_event.set()
|
||||
|
||||
def pause_writing(self) -> None:
|
||||
"""
|
||||
@ -399,7 +400,6 @@ class RequestResponseCycle:
|
||||
self.disconnected = False
|
||||
self.keep_alive = True
|
||||
self.waiting_for_100_continue = conn.they_are_waiting_for_100_continue
|
||||
self.shutting_down = False
|
||||
|
||||
# Request state
|
||||
self.body = bytearray()
|
||||
@ -432,9 +432,8 @@ class RequestResponseCycle:
|
||||
self.logger.error(msg)
|
||||
await self.send_500_response()
|
||||
elif not self.response_complete and not self.disconnected:
|
||||
if not self.shutting_down:
|
||||
msg = "ASGI callable returned without completing response."
|
||||
self.logger.error(msg)
|
||||
msg = "ASGI callable returned without completing response."
|
||||
self.logger.error(msg)
|
||||
self.transport.close()
|
||||
finally:
|
||||
self.on_response = lambda: None
|
||||
@ -532,12 +531,12 @@ class RequestResponseCycle:
|
||||
self.transport.write(output)
|
||||
self.waiting_for_100_continue = False
|
||||
|
||||
if not self.disconnected and not self.response_complete and not self.shutting_down:
|
||||
if not self.disconnected and not self.response_complete:
|
||||
self.flow.resume_reading()
|
||||
await self.message_event.wait()
|
||||
self.message_event.clear()
|
||||
|
||||
if self.disconnected or self.response_complete or self.shutting_down:
|
||||
if self.disconnected or self.response_complete:
|
||||
return {"type": "http.disconnect"}
|
||||
|
||||
message: HTTPRequestEvent = {"type": "http.request", "body": bytes(self.body), "more_body": self.more_body}
|
||||
|
||||
@ -289,20 +289,26 @@ class HttpToolsProtocol(asyncio.Protocol):
|
||||
)
|
||||
if existing_cycle is None or existing_cycle.response_complete:
|
||||
# Standard case - start processing the request.
|
||||
# For the asyncio loop, we need to explicitly start with an empty context
|
||||
# as it can be polluted from previous ASGI runs.
|
||||
# See https://github.com/python/cpython/issues/140947 for details.
|
||||
if sys.version_info >= (3, 11): # pragma: py-lt-311
|
||||
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
|
||||
else: # pragma: py-gte-311
|
||||
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
|
||||
task.add_done_callback(self.tasks.discard)
|
||||
self.tasks.add(task)
|
||||
self._start_asgi_task(self.cycle, app)
|
||||
else:
|
||||
# Pipelined HTTP requests need to be queued up.
|
||||
self.flow.pause_reading()
|
||||
self.pipeline.appendleft((self.cycle, app))
|
||||
|
||||
def _start_asgi_task(self, cycle: RequestResponseCycle, app: ASGI3Application) -> None:
|
||||
if self.config.reset_contextvars:
|
||||
# Opt-in workaround for https://github.com/python/cpython/issues/140947:
|
||||
# asyncio can leak context vars between tasks. Hides context set in the
|
||||
# lifespan or by external instrumentation.
|
||||
if sys.version_info >= (3, 11): # pragma: py-lt-311
|
||||
task = self.loop.create_task(cycle.run_asgi(app), context=contextvars.Context())
|
||||
else: # pragma: py-gte-311
|
||||
task = contextvars.Context().run(self.loop.create_task, cycle.run_asgi(app))
|
||||
else:
|
||||
task = self.loop.create_task(cycle.run_asgi(app))
|
||||
task.add_done_callback(self.tasks.discard)
|
||||
self.tasks.add(task)
|
||||
|
||||
def on_body(self, body: bytes) -> None:
|
||||
if (self.parser.should_upgrade() and self._should_upgrade()) or self.cycle.response_complete:
|
||||
return
|
||||
@ -333,9 +339,7 @@ class HttpToolsProtocol(asyncio.Protocol):
|
||||
# Keep-Alive timeout instead.
|
||||
if self.pipeline:
|
||||
cycle, app = self.pipeline.pop()
|
||||
task = self.loop.create_task(cycle.run_asgi(app))
|
||||
task.add_done_callback(self.tasks.discard)
|
||||
self.tasks.add(task)
|
||||
self._start_asgi_task(cycle, app)
|
||||
else:
|
||||
self.timeout_keep_alive_task = self.loop.call_later(
|
||||
self.timeout_keep_alive, self.timeout_keep_alive_handler
|
||||
@ -349,8 +353,6 @@ class HttpToolsProtocol(asyncio.Protocol):
|
||||
self.transport.close()
|
||||
else:
|
||||
self.cycle.keep_alive = False
|
||||
self.cycle.shutting_down = True
|
||||
self.cycle.message_event.set()
|
||||
|
||||
def pause_writing(self) -> None:
|
||||
"""
|
||||
@ -402,7 +404,6 @@ class RequestResponseCycle:
|
||||
self.disconnected = False
|
||||
self.keep_alive = keep_alive
|
||||
self.waiting_for_100_continue = expect_100_continue
|
||||
self.shutting_down = False
|
||||
|
||||
# Request state
|
||||
self.body = bytearray()
|
||||
@ -437,9 +438,8 @@ class RequestResponseCycle:
|
||||
self.logger.error(msg)
|
||||
await self.send_500_response()
|
||||
elif not self.response_complete and not self.disconnected:
|
||||
if not self.shutting_down:
|
||||
msg = "ASGI callable returned without completing response."
|
||||
self.logger.error(msg)
|
||||
msg = "ASGI callable returned without completing response."
|
||||
self.logger.error(msg)
|
||||
self.transport.close()
|
||||
finally:
|
||||
self.on_response = lambda: None
|
||||
@ -564,12 +564,12 @@ class RequestResponseCycle:
|
||||
self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")
|
||||
self.waiting_for_100_continue = False
|
||||
|
||||
if not self.disconnected and not self.response_complete and not self.shutting_down:
|
||||
if not self.disconnected and not self.response_complete:
|
||||
self.flow.resume_reading()
|
||||
await self.message_event.wait()
|
||||
self.message_event.clear()
|
||||
|
||||
if self.disconnected or self.response_complete or self.shutting_down:
|
||||
if self.disconnected or self.response_complete:
|
||||
return {"type": "http.disconnect"}
|
||||
message: HTTPRequestEvent = {"type": "http.request", "body": bytes(self.body), "more_body": self.more_body}
|
||||
self.body = bytearray()
|
||||
|
||||
@ -105,7 +105,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
|
||||
self.last_ping_rtt: float = 0.0
|
||||
|
||||
# Buffers
|
||||
self.bytes = b""
|
||||
self.bytes = bytearray()
|
||||
|
||||
def connection_made(self, transport: BaseTransport) -> None:
|
||||
"""Called when a connection is made."""
|
||||
@ -216,19 +216,19 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
|
||||
task.add_done_callback(self.on_task_complete)
|
||||
self.tasks.add(task)
|
||||
|
||||
def handle_cont(self, event: Frame) -> None: # pragma: no cover
|
||||
self.bytes += event.data
|
||||
def handle_cont(self, event: Frame) -> None:
|
||||
self.bytes.extend(event.data)
|
||||
if event.fin:
|
||||
self.send_receive_event_to_app()
|
||||
|
||||
def handle_text(self, event: Frame) -> None:
|
||||
self.bytes = event.data
|
||||
self.bytes = bytearray(event.data)
|
||||
self.curr_msg_data_type: Literal["text", "bytes"] = "text"
|
||||
if event.fin:
|
||||
self.send_receive_event_to_app()
|
||||
|
||||
def handle_bytes(self, event: Frame) -> None:
|
||||
self.bytes = event.data
|
||||
self.bytes = bytearray(event.data)
|
||||
self.curr_msg_data_type = "bytes"
|
||||
if event.fin:
|
||||
self.send_receive_event_to_app()
|
||||
@ -243,7 +243,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
|
||||
self.handle_parser_exception()
|
||||
return
|
||||
else:
|
||||
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
|
||||
self.queue.put_nowait({"type": "websocket.receive", "bytes": bytes(self.bytes)})
|
||||
if not self.read_paused:
|
||||
self.read_paused = True
|
||||
self.transport.pause_reading()
|
||||
|
||||
@ -2,6 +2,10 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
import struct
|
||||
from asyncio import TimerHandle
|
||||
from io import BytesIO, StringIO
|
||||
from typing import Any, Literal, cast
|
||||
from urllib.parse import unquote
|
||||
|
||||
@ -11,12 +15,7 @@ from wsproto.connection import ConnectionState
|
||||
from wsproto.extensions import Extension, PerMessageDeflate
|
||||
from wsproto.utilities import LocalProtocolError, RemoteProtocolError
|
||||
|
||||
from uvicorn._types import (
|
||||
ASGI3Application,
|
||||
ASGISendEvent,
|
||||
WebSocketEvent,
|
||||
WebSocketScope,
|
||||
)
|
||||
from uvicorn._types import ASGI3Application, ASGISendEvent, WebSocketEvent, WebSocketReceiveEvent, WebSocketScope
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.logging import TRACE_LOG_LEVEL
|
||||
from uvicorn.protocols.utils import (
|
||||
@ -30,6 +29,36 @@ from uvicorn.protocols.utils import (
|
||||
from uvicorn.server import ServerState
|
||||
|
||||
|
||||
class FrameTooLargeError(Exception):
|
||||
"""Raised when accumulated websocket message bytes exceed `ws_max_size`."""
|
||||
|
||||
|
||||
class WebsocketBuffer:
|
||||
def __init__(self, max_length: int) -> None:
|
||||
self.value: BytesIO | StringIO | None = None
|
||||
self.length = 0
|
||||
self.max_length = max_length
|
||||
|
||||
def extend(self, event: events.TextMessage | events.BytesMessage) -> None:
|
||||
if self.value is None:
|
||||
self.value = StringIO() if isinstance(event, events.TextMessage) else BytesIO()
|
||||
self.value.write(event.data) # type: ignore[arg-type]
|
||||
# `ws_max_size` is a byte budget, so count UTF-8 bytes for text.
|
||||
self.length += len(event.data.encode()) if isinstance(event, events.TextMessage) else len(event.data)
|
||||
if self.length > self.max_length:
|
||||
raise FrameTooLargeError
|
||||
|
||||
def clear(self) -> None:
|
||||
self.value = None
|
||||
self.length = 0
|
||||
|
||||
def to_message(self) -> WebSocketReceiveEvent:
|
||||
if isinstance(self.value, StringIO):
|
||||
return {"type": "websocket.receive", "text": self.value.getvalue()}
|
||||
assert isinstance(self.value, BytesIO)
|
||||
return {"type": "websocket.receive", "bytes": self.value.getvalue()}
|
||||
|
||||
|
||||
class WSProtocol(asyncio.Protocol):
|
||||
def __init__(
|
||||
self,
|
||||
@ -73,15 +102,21 @@ class WSProtocol(asyncio.Protocol):
|
||||
self.writable = asyncio.Event()
|
||||
self.writable.set()
|
||||
|
||||
# Buffers
|
||||
self.bytes = b""
|
||||
self.text = ""
|
||||
# Keepalive state
|
||||
self.ping_interval = config.ws_ping_interval
|
||||
self.ping_timeout = config.ws_ping_timeout
|
||||
self.ping_timer: TimerHandle | None = None
|
||||
self.pong_timer: TimerHandle | None = None
|
||||
self.pending_ping_payload: bytes | None = None
|
||||
self.ping_sent_at: float = 0.0
|
||||
self.last_ping_rtt: float = 0.0
|
||||
|
||||
# Buffer
|
||||
self.buffer = WebsocketBuffer(self.config.ws_max_size)
|
||||
|
||||
# Protocol interface
|
||||
|
||||
def connection_made( # type: ignore[override]
|
||||
self, transport: asyncio.Transport
|
||||
) -> None:
|
||||
def connection_made(self, transport: asyncio.Transport) -> None: # type: ignore[override]
|
||||
self.connections.add(self)
|
||||
self.transport = transport
|
||||
self.server = get_local_addr(transport)
|
||||
@ -93,6 +128,7 @@ class WSProtocol(asyncio.Protocol):
|
||||
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix)
|
||||
|
||||
def connection_lost(self, exc: Exception | None) -> None:
|
||||
self.stop_keepalive()
|
||||
code = 1005 if self.handshake_complete else 1006
|
||||
self.queue.put_nowait({"type": "websocket.disconnect", "code": code})
|
||||
self.connections.remove(self)
|
||||
@ -120,16 +156,18 @@ class WSProtocol(asyncio.Protocol):
|
||||
|
||||
def handle_events(self) -> None:
|
||||
for event in self.conn.events():
|
||||
if self.close_sent:
|
||||
return
|
||||
if isinstance(event, events.Request):
|
||||
self.handle_connect(event)
|
||||
elif isinstance(event, events.TextMessage):
|
||||
self.handle_text(event)
|
||||
elif isinstance(event, events.BytesMessage):
|
||||
self.handle_bytes(event)
|
||||
elif isinstance(event, (events.TextMessage, events.BytesMessage)):
|
||||
self.handle_message(event)
|
||||
elif isinstance(event, events.CloseConnection):
|
||||
self.handle_close(event)
|
||||
elif isinstance(event, events.Ping):
|
||||
self.handle_ping(event)
|
||||
elif isinstance(event, events.Pong):
|
||||
self.handle_pong(event)
|
||||
|
||||
def pause_writing(self) -> None:
|
||||
"""
|
||||
@ -144,6 +182,7 @@ class WSProtocol(asyncio.Protocol):
|
||||
self.writable.set() # pragma: full coverage
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self.stop_keepalive()
|
||||
if self.handshake_complete:
|
||||
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012})
|
||||
output = self.conn.send(wsproto.events.CloseConnection(code=1012))
|
||||
@ -185,21 +224,20 @@ class WSProtocol(asyncio.Protocol):
|
||||
task.add_done_callback(self.on_task_complete)
|
||||
self.tasks.add(task)
|
||||
|
||||
def handle_text(self, event: events.TextMessage) -> None:
|
||||
self.text += event.data
|
||||
def handle_message(self, event: events.TextMessage | events.BytesMessage) -> None:
|
||||
try:
|
||||
self.buffer.extend(event)
|
||||
except FrameTooLargeError:
|
||||
self.close_sent = True
|
||||
reason = f"Message exceeds the maximum size ({self.config.ws_max_size} bytes)"
|
||||
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1009, "reason": reason})
|
||||
if not self.transport.is_closing():
|
||||
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1009, reason=reason)))
|
||||
self.transport.close()
|
||||
return
|
||||
if event.message_finished:
|
||||
self.queue.put_nowait({"type": "websocket.receive", "text": self.text})
|
||||
self.text = ""
|
||||
if not self.read_paused:
|
||||
self.read_paused = True
|
||||
self.transport.pause_reading()
|
||||
|
||||
def handle_bytes(self, event: events.BytesMessage) -> None:
|
||||
self.bytes += event.data
|
||||
# todo: we may want to guard the size of self.bytes and self.text
|
||||
if event.message_finished:
|
||||
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
|
||||
self.bytes = b""
|
||||
self.queue.put_nowait(self.buffer.to_message())
|
||||
self.buffer.clear()
|
||||
if not self.read_paused:
|
||||
self.read_paused = True
|
||||
self.transport.pause_reading()
|
||||
@ -213,6 +251,65 @@ class WSProtocol(asyncio.Protocol):
|
||||
def handle_ping(self, event: events.Ping) -> None:
|
||||
self.transport.write(self.conn.send(event.response()))
|
||||
|
||||
def handle_pong(self, event: events.Pong) -> None:
|
||||
# Ignore unsolicited pongs and stale pongs whose payload doesn't match the ping currently in flight.
|
||||
if self.pending_ping_payload is None or bytes(event.payload) != self.pending_ping_payload:
|
||||
return # pragma: no cover
|
||||
|
||||
self.last_ping_rtt = self.loop.time() - self.ping_sent_at
|
||||
self.pending_ping_payload = None
|
||||
# The peer answered in time; cancel the pong deadline and chain the next ping. This `schedule_ping()` call is
|
||||
# what keeps the keepalive loop running when ping_timeout is set. When ping_timeout is None the next ping is
|
||||
# already scheduled by `send_keepalive_ping`, so we must not schedule a duplicate here.
|
||||
if self.pong_timer is not None:
|
||||
self.pong_timer.cancel()
|
||||
self.pong_timer = None
|
||||
self.schedule_ping()
|
||||
|
||||
def start_keepalive(self) -> None:
|
||||
if self.ping_interval is not None and self.ping_interval > 0:
|
||||
self.schedule_ping()
|
||||
|
||||
def stop_keepalive(self) -> None:
|
||||
if self.ping_timer is not None:
|
||||
self.ping_timer.cancel()
|
||||
self.ping_timer = None
|
||||
if self.pong_timer is not None: # pragma: no cover
|
||||
self.pong_timer.cancel()
|
||||
self.pong_timer = None
|
||||
self.pending_ping_payload = None
|
||||
|
||||
def schedule_ping(self) -> None:
|
||||
assert self.ping_interval is not None
|
||||
delay = max(0.0, self.ping_interval - self.last_ping_rtt)
|
||||
self.ping_timer = self.loop.call_later(delay, self.send_keepalive_ping)
|
||||
|
||||
def send_keepalive_ping(self) -> None:
|
||||
self.ping_timer = None
|
||||
if self.close_sent or self.transport.is_closing(): # pragma: no cover
|
||||
return
|
||||
# Random 4-byte payload identifies this ping; `handle_pong` uses it to ignore stale or unsolicited pongs.
|
||||
self.pending_ping_payload = struct.pack("!I", random.getrandbits(32))
|
||||
self.ping_sent_at = self.loop.time()
|
||||
self.transport.write(self.conn.send(wsproto.events.Ping(payload=self.pending_ping_payload)))
|
||||
if self.ping_timeout is not None:
|
||||
self.pong_timer = self.loop.call_later(self.ping_timeout, self.keepalive_timeout)
|
||||
else: # pragma: no cover
|
||||
self.schedule_ping()
|
||||
|
||||
def keepalive_timeout(self) -> None:
|
||||
self.pong_timer = None
|
||||
self.pending_ping_payload = None
|
||||
if self.close_sent or self.transport.is_closing(): # pragma: no cover
|
||||
return
|
||||
if self.logger.level <= TRACE_LOG_LEVEL:
|
||||
prefix = "%s:%d - " % self.client if self.client else ""
|
||||
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket keepalive ping timeout", prefix)
|
||||
reason = "keepalive ping timeout"
|
||||
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1011, reason=reason)))
|
||||
self.close_sent = True
|
||||
self.transport.close()
|
||||
|
||||
def send_500_response(self) -> None:
|
||||
if self.response_started or self.handshake_complete:
|
||||
return # we cannot send responses anymore
|
||||
@ -266,6 +363,7 @@ class WSProtocol(asyncio.Protocol):
|
||||
)
|
||||
)
|
||||
self.transport.write(output)
|
||||
self.start_keepalive()
|
||||
|
||||
elif message["type"] == "websocket.close":
|
||||
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1006})
|
||||
|
||||
@ -241,9 +241,6 @@ class Server:
|
||||
async def on_tick(self, counter: int) -> bool:
|
||||
# Update the default headers, once per second.
|
||||
if counter % 10 == 0:
|
||||
if self.config.callback_progress is not None:
|
||||
self.config.callback_progress()
|
||||
|
||||
current_time = time.time()
|
||||
current_date = formatdate(current_time, usegmt=True).encode()
|
||||
|
||||
|
||||
@ -4,7 +4,6 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from uvicorn.supervisors.basereload import BaseReload
|
||||
from uvicorn.supervisors.multiprocess import Multiprocess
|
||||
from uvicorn.supervisors.multithread import Multithread
|
||||
|
||||
if TYPE_CHECKING:
|
||||
ChangeReload: type[BaseReload]
|
||||
@ -14,4 +13,4 @@ else:
|
||||
except ImportError: # pragma: no cover
|
||||
from uvicorn.supervisors.statreload import StatReload as ChangeReload
|
||||
|
||||
__all__ = ["Multiprocess", "Multithread", "ChangeReload"]
|
||||
__all__ = ["Multiprocess", "ChangeReload"]
|
||||
|
||||
@ -1,221 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from socket import socket
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.server import Server
|
||||
from uvicorn.supervisors.multiprocess import SIGNALS
|
||||
|
||||
logger = logging.getLogger("uvicorn.error")
|
||||
|
||||
|
||||
class Thread:
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
target: Callable[[list[socket] | None], None],
|
||||
sockets: list[socket],
|
||||
) -> None:
|
||||
self.config = copy.copy(config)
|
||||
self.real_target = target
|
||||
self.sockets = sockets
|
||||
self.server: Server | None = None
|
||||
self.last_heartbeat = time.monotonic()
|
||||
self.thread = threading.Thread(target=self.target, daemon=True)
|
||||
|
||||
def _get_target(self) -> Callable[[list[socket] | None], None]:
|
||||
if inspect.ismethod(self.real_target) and isinstance(self.real_target.__self__, Server):
|
||||
self.config.callback_progress = self.record_heartbeat
|
||||
self.server = Server(config=self.config)
|
||||
return self.server.run
|
||||
return self.real_target
|
||||
|
||||
def record_heartbeat(self) -> None:
|
||||
self.last_heartbeat = time.monotonic()
|
||||
|
||||
def target(self, sockets: list[socket] | None = None) -> Any:
|
||||
sockets = [sock.dup() for sock in self.sockets]
|
||||
try:
|
||||
return self._get_target()(sockets)
|
||||
finally:
|
||||
for sock in sockets:
|
||||
if sock.fileno() != -1:
|
||||
sock.close()
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
return self.thread.is_alive()
|
||||
|
||||
def start(self) -> None:
|
||||
self.thread.start()
|
||||
|
||||
def terminate(self) -> None:
|
||||
if self.server is not None:
|
||||
self.server.should_exit = True
|
||||
|
||||
def join(self, timeout: float | None = None) -> bool:
|
||||
self.thread.join(timeout=timeout)
|
||||
return not self.is_alive()
|
||||
|
||||
def is_healthy(self, timeout: float) -> bool:
|
||||
return time.monotonic() - self.last_heartbeat <= timeout
|
||||
|
||||
def is_ready_for_healthcheck(self) -> bool:
|
||||
return self.server is None or self.server.started
|
||||
|
||||
|
||||
class Multithread:
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
target: Callable[[list[socket] | None], None],
|
||||
sockets: list[socket],
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.target = target
|
||||
self.sockets = sockets
|
||||
|
||||
self.threads_num = config.workers
|
||||
self.threads: list[Thread] = []
|
||||
self.stale_threads: list[Thread] = []
|
||||
|
||||
self.should_exit = threading.Event()
|
||||
|
||||
self.signal_queue: list[int] = []
|
||||
for sig in SIGNALS:
|
||||
signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig))
|
||||
|
||||
def init_threads(self) -> None:
|
||||
for _ in range(self.threads_num):
|
||||
self.threads.append(self._start_thread())
|
||||
|
||||
def terminate_all(self) -> None:
|
||||
for thread in self._all_threads():
|
||||
thread.terminate()
|
||||
|
||||
def join_all(self) -> None:
|
||||
timeout = self._thread_shutdown_timeout
|
||||
for thread in self._all_threads():
|
||||
joined = thread.join(timeout=timeout)
|
||||
if not joined:
|
||||
logger.warning("Worker thread did not exit within %.2f seconds.", timeout)
|
||||
|
||||
def restart_all(self) -> None:
|
||||
for idx, thread in enumerate(self.threads):
|
||||
self._replace_thread(idx, thread, reason="Worker thread restarted")
|
||||
|
||||
def run(self) -> None:
|
||||
message = f"Started parent process [{os.getpid()}]"
|
||||
color_message = "Started parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
|
||||
logger.info(message, extra={"color_message": color_message})
|
||||
|
||||
self.init_threads()
|
||||
|
||||
while not self.should_exit.wait(0.5):
|
||||
self.handle_signals()
|
||||
self.keep_subthread_alive()
|
||||
|
||||
self.terminate_all()
|
||||
self.join_all()
|
||||
|
||||
message = f"Stopping parent process [{os.getpid()}]"
|
||||
color_message = "Stopping parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
|
||||
logger.info(message, extra={"color_message": color_message})
|
||||
|
||||
def keep_subthread_alive(self) -> None:
|
||||
if self.should_exit.is_set():
|
||||
return
|
||||
|
||||
for idx, thread in enumerate(self.threads):
|
||||
if self.should_exit.is_set(): # pragma: no cover
|
||||
return
|
||||
|
||||
if not thread.is_alive():
|
||||
self._replace_thread(idx, thread, reason="Child thread died")
|
||||
continue
|
||||
|
||||
if not thread.is_ready_for_healthcheck():
|
||||
continue
|
||||
|
||||
if not thread.is_healthy(timeout=self.config.timeout_worker_healthcheck):
|
||||
self._replace_thread(idx, thread, reason="Worker thread failed healthcheck")
|
||||
|
||||
def handle_signals(self) -> None:
|
||||
for sig in tuple(self.signal_queue):
|
||||
self.signal_queue.remove(sig)
|
||||
sig_name = SIGNALS[sig]
|
||||
sig_handler = getattr(self, f"handle_{sig_name.lower()}", None)
|
||||
if sig_handler is not None:
|
||||
sig_handler()
|
||||
else: # pragma: no cover
|
||||
logger.debug(f"Received signal {sig_name}, but no handler is defined for it.")
|
||||
|
||||
def handle_int(self) -> None:
|
||||
logger.info("Received SIGINT, exiting.")
|
||||
self.should_exit.set()
|
||||
|
||||
def handle_term(self) -> None:
|
||||
logger.info("Received SIGTERM, exiting.")
|
||||
self.should_exit.set()
|
||||
|
||||
def handle_break(self) -> None: # pragma: py-not-win32
|
||||
logger.info("Received SIGBREAK, exiting.")
|
||||
self.should_exit.set()
|
||||
|
||||
def handle_hup(self) -> None: # pragma: py-win32
|
||||
logger.info("Received SIGHUP, restarting threads.")
|
||||
self.restart_all()
|
||||
|
||||
def handle_ttin(self) -> None: # pragma: py-win32
|
||||
logger.info("Received SIGTTIN, increasing the number of threads.")
|
||||
self.threads_num += 1
|
||||
self.threads.append(self._start_thread())
|
||||
|
||||
def handle_ttou(self) -> None: # pragma: py-win32
|
||||
logger.info("Received SIGTTOU, decreasing number of threads.")
|
||||
if self.threads_num <= 1:
|
||||
logger.info("Already reached one thread, cannot decrease the number of threads anymore.")
|
||||
return
|
||||
self.threads_num -= 1
|
||||
thread = self.threads.pop()
|
||||
thread.terminate()
|
||||
if not thread.join(timeout=self._thread_shutdown_timeout):
|
||||
logger.warning("Worker thread did not exit within %.2f seconds.", self._thread_shutdown_timeout)
|
||||
self.stale_threads.append(thread)
|
||||
|
||||
@property
|
||||
def _thread_shutdown_timeout(self) -> float:
|
||||
if self.config.timeout_graceful_shutdown is not None:
|
||||
return float(self.config.timeout_graceful_shutdown)
|
||||
return float(self.config.timeout_worker_healthcheck)
|
||||
|
||||
def _start_thread(self) -> Thread:
|
||||
thread = Thread(self.config, self.target, self.sockets)
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
def _replace_thread(self, idx: int, thread: Thread, *, reason: str) -> None:
|
||||
thread.terminate()
|
||||
if not thread.join(timeout=self._thread_shutdown_timeout):
|
||||
logger.warning("%s; starting a replacement thread while the previous thread is still running.", reason)
|
||||
self.stale_threads.append(thread)
|
||||
else:
|
||||
logger.info(reason)
|
||||
self.threads[idx] = self._start_thread()
|
||||
|
||||
def _all_threads(self) -> list[Thread]:
|
||||
threads = list(self.threads)
|
||||
for thread in self.stale_threads:
|
||||
if thread not in threads:
|
||||
threads.append(thread)
|
||||
return threads
|
||||
Loading…
Reference in New Issue
Block a user