Compare commits

..

19 Commits

Author SHA1 Message Date
Marcelo Trylesinski
479a2c0c89
Version 0.47.0 (#2937) 2026-05-14 06:20:53 -07:00
Marcelo Trylesinski
89347fd166
Add 7-day cooldown for dependency resolution via uv exclude-newer (#2936) 2026-05-12 15:48:51 +00:00
Marcelo Trylesinski
767315b38a
Drop unused contents/actions permissions from zizmor workflow (#2935) 2026-05-12 15:08:08 +02:00
dependabot[bot]
f25ee43e68
chore(deps): bump urllib3 from 2.6.3 to 2.7.0 (#2933) 2026-05-12 07:58:53 +02:00
Stefan Wójcik
8782666189
Fix typo in docs/deployment/index.md. (#2932) 2026-05-09 19:12:56 +02:00
Eugene Toder
ad5ff87c86
Treat fd=0 as a valid file descriptor with reload/workers (#2927)
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
2026-04-30 20:26:13 +02:00
Marcelo Trylesinski
6761b2c8f9
Remove Hugging Face sponsor block from docs (#2923) 2026-04-28 17:24:46 +02:00
Marcelo Trylesinski
438f64834d
Surface sponsors on welcome page and sidebar (#2921) 2026-04-28 10:14:03 +02:00
Marcelo Trylesinski
10ddc6dd29
Add ssl_context_factory for custom SSLContext configuration (#2920) 2026-04-28 06:24:24 +00:00
Marcelo Trylesinski
b499bc4510
Eagerly import the ASGI app in the parent process (#2919) 2026-04-27 23:56:45 +02:00
Marcelo Trylesinski
b224045f59
Version 0.46.0 (#2918) 2026-04-23 06:33:22 +00:00
Marcelo Trylesinski
7375b5bf66
Use bytearray for incoming WebSocket message buffer in websockets-sansio (#2917) 2026-04-22 20:11:28 +00:00
Marcelo Trylesinski
d438fb16fe
Support ws_ping_interval and ws_ping_timeout in wsproto implementation (#2916) 2026-04-22 18:33:16 +00:00
Marcelo Trylesinski
3e6b964466
Support ws_max_size in wsproto implementation (#2915) 2026-04-22 19:17:00 +02:00
Marcelo Trylesinski
2c423bd82b
Version 0.45.0 (#2914) 2026-04-21 11:42:06 +01:00
Marcelo Trylesinski
7f027f8e25
Revert "Emit http.disconnect on server shutdown for streaming responses" (#2829) (#2913) 2026-04-21 10:22:03 +00:00
Marcelo Trylesinski
73a80c3cc8
Add --reset-contextvars flag to isolate ASGI request context (#2912) 2026-04-21 10:46:10 +01:00
Marcelo Trylesinski
45c0b568d3
Revert empty context for ASGI runs (#2911) 2026-04-21 09:51:18 +01:00
Marcelo Trylesinski
850d92656d
Raise helpful ImportError when PyYAML is missing for YAML log config (#2906)
Co-authored-by: Nuno André <mail@nunoand.re>
2026-04-19 10:06:44 +00:00
31 changed files with 888 additions and 3796 deletions

View File

@ -14,8 +14,6 @@ jobs:
permissions:
security-events: write # Required for upload-sarif (used by zizmor-action) to upload SARIF files.
contents: read # Only needed for private repos. Needed to clone the repo.
actions: read # Only needed for private repos. Needed for upload-sarif to read workflow run info.
steps:
- name: Checkout repository

View File

@ -1,241 +0,0 @@
**Uvicorn** supports HTTP/2, the major revision of the HTTP protocol that provides significant
performance improvements over HTTP/1.1.
!!! warning "Experimental Feature"
HTTP/2 support is currently **experimental** and is **not enabled by default**.
## Overview
HTTP/2 introduces several key features:
- **Multiplexing**: Multiple requests and responses can be sent simultaneously over a single TCP connection
- **Header compression**: HTTP headers are compressed using HPACK, reducing overhead
- **Binary protocol**: More efficient parsing compared to HTTP/1.1's text-based format
- **Stream prioritization**: Clients can indicate which resources are more important
## Enabling HTTP/2
To enable HTTP/2 support in Uvicorn, use the `--http2` flag:
=== "Command Line"
```bash
uvicorn main:app --http2
```
=== "Programmatic"
```python
import uvicorn
uvicorn.run("main:app", http2=True)
```
!!! note
HTTP/2 support requires the `h2` package. Install it with:
```bash
pip install h2
```
## Connection Methods
HTTP/2 can be established through two different mechanisms: **h2** (over TLS) and **h2c** (cleartext).
### h2: HTTP/2 over TLS (Recommended)
When using HTTPS, HTTP/2 is negotiated via **ALPN** (Application-Layer Protocol Negotiation)
during the TLS handshake. This is the most common and recommended way to use HTTP/2.
```mermaid
sequenceDiagram
participant Client
participant Server
Note over Client,Server: TLS Handshake with ALPN
Client->>Server: ClientHello
Note right of Client: ALPN: h2, http/1.1
Server->>Client: ServerHello
Note right of Server: ALPN: h2
Note over Client,Server: TLS Handshake Complete
Client->>Server: HTTP/2 Connection Preface
Server->>Client: HTTP/2 SETTINGS Frame
Note over Client,Server: HTTP/2 Connection Established
Client->>Server: HEADERS (Stream 1)
Server->>Client: HEADERS + DATA (Stream 1)
```
For testing it locally, you can generate a self-signed certificate and use it to test the HTTP/2 connection.
```bash
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes -subj "/CN=localhost"
```
Then create a simple ASGI application to test the connection.
```python title="main.py"
async def app(scope, receive, send):
await send({"type": "http.response.start", "status": 200, "headers": []})
await send({"type": "http.response.body", "body": b"ok"})
```
Run Uvicorn with the `--http2` flag and the SSL certificate files.
=== "Command Line"
```bash
uvicorn app:app --http2 --ssl-keyfile key.pem --ssl-certfile cert.pem
```
=== "Programmatic"
```python
import uvicorn
uvicorn.run(
"app:app",
http2=True,
ssl_keyfile="key.pem",
ssl_certfile="cert.pem",
)
```
You can test the connection using curl with the `--http2` flag.
```bash
# Use -k to skip certificate verification for self-signed certs
curl -v --http2 -k https://localhost:8000/
```
### h2c: HTTP/2 Cleartext
HTTP/2 can also be used without TLS through an **upgrade mechanism**. The client sends an
HTTP/1.1 request with upgrade headers, and if the server supports HTTP/2, it responds with
`101 Switching Protocols`.
```mermaid
sequenceDiagram
participant Client
participant Server
Note over Client,Server: h2c Upgrade Process
Client->>Server: HTTP/1.1 GET /
Note right of Client: Headers:<br/>Upgrade: h2c<br/>HTTP2-Settings: [base64]<br/>Connection: Upgrade, HTTP2-Settings
Server->>Client: HTTP/1.1 101 Switching Protocols
Note right of Server: Headers:<br/>Upgrade: h2c<br/>Connection: Upgrade
Note over Client,Server: Connection Upgraded to HTTP/2
Server->>Client: HTTP/2 SETTINGS Frame
Client->>Server: HTTP/2 SETTINGS ACK
Server->>Client: HEADERS + DATA (Stream 1)
Note right of Server: Response to original request
```
Using the same `main.py` from the h2 section above, run Uvicorn with the `--http2` flag.
=== "Command Line"
```bash
uvicorn main:app --http2
```
=== "Programmatic"
```python
import uvicorn
uvicorn.run("main:app", http2=True)
```
You can test the connection using curl with the `--http2` flag.
```bash
curl -v --http2 http://localhost:8000/
```
!!! warning
h2c is not supported by web browsers. Browsers only support HTTP/2 over TLS (h2).
h2c is primarily useful for internal services, proxies, or testing.
## ASGI Scope
When a request comes in over HTTP/2, the ASGI scope will have `http_version` set to `"2"`:
```python
async def app(scope, receive, send):
assert scope["type"] == "http"
print(f"HTTP Version: {scope['http_version']}") # "2" for HTTP/2
# ... handle request
```
## Using with Reverse Proxies
In production, Uvicorn is typically deployed behind a reverse proxy like Nginx, Caddy, or HAProxy.
**Benefits of using a reverse proxy:**
- **TLS termination**: The proxy handles SSL/TLS encryption, offloading this work from your application
- **Load balancing**: Distribute requests across multiple Uvicorn instances
- **Static file serving**: Serve static assets directly without hitting your Python application
- **Request buffering**: Buffer slow clients to free up Uvicorn workers
- **Security**: Hide your application server details, add rate limiting, and filter malicious requests
- **HTTP/2 to clients**: Provide HTTP/2 benefits to clients even if using HTTP/1.1 internally
```mermaid
flowchart LR
Client <-->|HTTP/2 over TLS| Proxy
Proxy <-->|HTTP/1.1 or HTTP/2| Uvicorn
style Client fill:#e1f5fe
style Proxy fill:#fff3e0
style Uvicorn fill:#e8f5e9
```
### Proxy HTTP/2 Upstream Support
**HTTP/2 Upstream** refers to the protocol used between the proxy and the backend server (Uvicorn).
While all modern proxies support HTTP/2 for client connections, support for HTTP/2 to backend
servers varies.
**Multiplexing** is HTTP/2's ability to send multiple requests simultaneously over a single TCP
connection. Without multiplexing, each request requires its own connection, negating a key
benefit of HTTP/2. Some proxies support HTTP/2 upstream but open a new connection per request,
which means they don't truly multiplex.
Here's the current state of proxy support (as of 2026-02-02):
| Proxy | HTTP/2 Upstream | Multiplexing | Documentation |
|-------|-----------------|--------------|---------------|
| **Envoy** | Yes | Yes | [Connection Pooling Docs](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/connection_pooling) |
| **Caddy** | Yes | Yes | [reverse_proxy Docs](https://caddyserver.com/docs/caddyfile/directives/reverse_proxy) |
| **HAProxy** | Yes | Yes | [HTTP/2 Docs](https://www.haproxy.com/documentation/hapee/latest/load-balancing/protocols/http-2/) |
| **Traefik** | Yes | Yes | [ServersTransport Docs](https://doc.traefik.io/traefik/routing/services/) |
| **Apache** | Partial | No | [mod_proxy_http2 Docs](https://httpd.apache.org/docs/trunk/mod/mod_proxy_http2.html) |
| **Nginx** | Limited | No | [Trac Ticket #923](https://trac.nginx.org/nginx/ticket/923) |
### Recommended Proxy Configuration
For most production deployments, using **HTTP/1.1 with keepalive** connections between the proxy
and Uvicorn is recommended. This provides excellent performance while being simple to configure
and debug.
!!! note "h2c Prior Knowledge Not Supported"
Uvicorn's h2c implementation uses the HTTP/1.1 upgrade mechanism. It does **not** support
"prior knowledge" h2c where clients send the HTTP/2 connection preface directly. This means
proxy configurations using `h2c://` URLs will not work.
For HTTP/2 between proxy and Uvicorn, use **h2 over TLS** (ALPN negotiation).
## Performance Considerations
HTTP/2 provides the most benefit when:
- **High latency connections**: Multiplexing reduces round-trip overhead
- **Many concurrent requests**: Multiple streams share a single connection
- **Large headers**: HPACK compression reduces header overhead
For internal, low-latency connections (like proxy to backend), HTTP/1.1 with keepalive
often performs comparably to HTTP/2, which is why nginx's approach is still effective.

57
docs/css/extra.css Normal file
View File

@ -0,0 +1,57 @@
.md-nav__sponsors {
display: flex;
flex-direction: column;
align-items: center;
gap: 0.5rem;
margin: 1.2rem 0.4rem 0.6rem;
padding: 0.9rem 0.6rem 0.8rem;
background-color: color-mix(in srgb, var(--md-primary-fg-color) 8%, transparent);
border-radius: 0.4rem;
}
.md-nav__sponsors-title {
margin: 0 0 0.1rem;
font-size: 0.6rem;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.05em;
color: var(--md-default-fg-color--light);
}
.md-nav__sponsor {
display: flex;
align-items: center;
justify-content: center;
width: 100%;
padding: 0.25rem;
border-radius: 0.2rem;
transition: opacity 0.15s;
}
.md-nav__sponsor:hover {
opacity: 0.75;
}
.md-nav__sponsor img {
max-width: 100%;
max-height: 1.6rem;
object-fit: contain;
}
.md-nav__sponsor-cta {
display: inline-block;
margin-top: 0.15rem;
padding: 0.25rem 0.6rem;
font-size: 0.65rem;
font-weight: 600;
color: var(--md-primary-bg-color);
background-color: var(--md-primary-fg-color);
border-radius: 0.2rem;
text-decoration: none;
transition: opacity 0.15s;
}
.md-nav__sponsor-cta:hover {
opacity: 0.85;
color: var(--md-primary-bg-color);
}

View File

@ -82,7 +82,7 @@ The default process manager monitors the status of child processes and automatic
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
- `SIGHUP`: Work processeses are graceful restarted one after another. If you update the code, the new worker process will use the new code.
- `SIGHUP`: Work processes are graceful restarted one after another. If you update the code, the new worker process will use the new code.
- `SIGTTIN`: Increase the number of worker processes by one.
- `SIGTTOU`: Decrease the number of worker processes by one.
@ -225,6 +225,36 @@ It's also possible to use certificates with uvicorn's worker for gunicorn.
$ gunicorn --keyfile=./key.pem --certfile=./cert.pem -k uvicorn.workers.UvicornWorker main:app
```
### Customizing the SSL context
For TLS scenarios that the `--ssl-*` flags don't cover (e.g., mutual TLS, custom `SSLContext.options`, bumping `minimum_version`, loading certificates from memory), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`.
The factory receives the `Config` instance and a `default_ssl_context_factory` callable that builds the standard context from the `ssl_*` settings on `Config`. Use it to start from uvicorn's default and mutate it, or ignore it and build your own context from scratch - the `ssl_*` settings are only consumed by the default factory, so if you don't call it they're effectively unused.
```python
import ssl
from collections.abc import Callable
import uvicorn
from uvicorn.config import Config
def ssl_context_factory(config: Config, default_ssl_context_factory: Callable[[], ssl.SSLContext]) -> ssl.SSLContext:
context = default_ssl_context_factory()
context.minimum_version = ssl.TLSVersion.TLSv1_3
return context
uvicorn.run(
"main:app",
ssl_keyfile="key.pem",
ssl_certfile="cert.pem",
ssl_context_factory=ssl_context_factory,
)
```
The factory is called inside each worker process, so it works with `--reload` and `--workers > 1`. The factory itself must be picklable in those modes (a top-level function is fine; lambdas and local closures are not). The `ssl_*` settings on `Config` are only consumed by `default_ssl_context_factory()`; if you build the context yourself without calling it, those settings are ignored.
## Proxies and Forwarded Headers
When running an application behind one or more proxies, certain information about the request is lost.

BIN
docs/img/fastapi-logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

View File

@ -42,7 +42,19 @@ Until recently Python has lacked a minimal low-level server/application interfac
async frameworks. The [ASGI specification](https://asgi.readthedocs.io/en/latest/) fills this gap,
and means we're now able to start building a common set of tooling usable across all async frameworks.
Uvicorn currently supports **HTTP/1.1**, **HTTP/2**, and **WebSockets**.
Uvicorn currently supports **HTTP/1.1** and **WebSockets**.
## Sponsorship
Help us keep Uvicorn maintained and sustainable by [becoming a sponsor](https://github.com/sponsors/Kludex).
**Current sponsors:**
<div style="display: flex; flex-wrap: wrap; gap: 2rem; align-items: center; margin: 1rem 0;">
<a href="https://fastapi.tiangolo.com">
<img src="img/fastapi-logo.png" alt="FastAPI" style="height: 80px;">
</a>
</div>
## Quickstart

View File

@ -44,4 +44,15 @@
{{ item.render(nav_item, path, 1) }}
{% endfor %}
</ul>
<!-- Sponsors -->
<div class="md-nav__sponsors">
<p class="md-nav__sponsors-title">Sponsors</p>
<a href="https://fastapi.tiangolo.com" title="FastAPI" class="md-nav__sponsor">
<img src="{{ 'img/fastapi-logo.png' | url }}" alt="FastAPI">
</a>
<a href="https://github.com/sponsors/Kludex" class="md-nav__sponsor-cta">
Become a sponsor! ❤️
</a>
</div>
</nav>

View File

@ -2,6 +2,49 @@
toc_depth: 2
---
## 0.47.0 (May 14, 2026)
### Added
* Add `ssl_context_factory` for custom `SSLContext` configuration (#2920)
### Changed
* Eagerly import the ASGI app in the parent process (#2919)
### Fixed
* Treat `fd=0` as a valid file descriptor with reload/workers (#2927)
## 0.46.0 (April 23, 2026)
### Added
* Support `ws_max_size` in `wsproto` implementation (#2915)
* Support `ws_ping_interval` and `ws_ping_timeout` in `wsproto` implementation (#2916)
### Changed
* Use `bytearray` for incoming WebSocket message buffer in `websockets-sansio` (#2917)
## 0.45.0 (April 21, 2026)
### Added
* Add `--reset-contextvars` flag to isolate ASGI request context (#2912)
* Accept `os.PathLike` for `log_config` (#2905)
* Accept `log_level` strings case-insensitively (#2907)
### Changed
* Revert "Emit `http.disconnect` on server shutdown for streaming responses" (#2913)
* Revert "Explicitly start ASGI run with empty context" (#2911)
### Fixed
* Preserve forwarded client ports in proxy headers middleware (#2903)
* Raise helpful `ImportError` when PyYAML is missing for YAML log config (#2906)
## 0.44.0 (April 6, 2026)
### Added

View File

@ -39,6 +39,7 @@ uvicorn itself.
* `APP` - The ASGI application to run, in the format `"<module>:<attribute>"`.
* `--factory` - Treat `APP` as an application factory, i.e. a `() -> <ASGI app>` callable.
* `--app-dir <path>` - Look for APP in the specified directory by adding it to the PYTHONPATH. **Default:** *Current working directory*.
* `--reset-contextvars` - Run each ASGI request in a fresh `contextvars.Context`. Workaround for a [context leak in asyncio](https://github.com/python/cpython/issues/140947); only relevant when using the `asyncio` event loop (uvloop is not affected). Enabling this hides any context set in the lifespan or by external instrumentation from ASGI handlers. **Default:** *False*.
## Socket Binding
@ -92,12 +93,11 @@ Using Uvicorn with watchfiles will enable the following options (which are other
* `--loop <str>` - Set the event loop implementation. The uvloop implementation provides greater performance, but is not compatible with Windows or PyPy. **Options:** *'auto', 'asyncio', 'uvloop'.* **Default:** *'auto'*.
* `--http <str>` - Set the HTTP protocol implementation. The httptools implementation provides greater performance, but it not compatible with PyPy. **Options:** *'auto', 'h11', 'httptools'.* **Default:** *'auto'*.
* `--http2` - Enable HTTP/2 support. Requires the `h2` package (`pip install h2`). When enabled, HTTP/2 is available via ALPN negotiation over TLS (h2) and via cleartext upgrade (h2c). See the [HTTP/2 documentation](concepts/http2.md) for details. **Default:** *False*.
* `--ws <str>` - Set the WebSockets protocol implementation. Either of the `websockets` and `wsproto` packages are supported. There are two versions of `websockets` supported: `websockets` and `websockets-sansio`. Use `'none'` to ignore all websocket requests. **Options:** *'auto', 'none', 'websockets', 'websockets-sansio', 'wsproto'.* **Default:** *'auto'*.
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. Only available with the `websockets` protocol. **Default:** *16777216* (16 MB).
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. **Default:** *16777216* (16 MB).
* `--ws-max-queue <int>` - Set the maximum length of the WebSocket incoming message queue. Only available with the `websockets` protocol. **Default:** *32*.
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. **Default:** *20.0*.
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. **Default:** *20.0*.
* `--ws-per-message-deflate <bool>` - Enable/disable WebSocket per-message-deflate compression. Only available with the `websockets` protocol. **Default:** *True*.
* `--lifespan <str>` - Set the Lifespan protocol implementation. **Options:** *'auto', 'on', 'off'.* **Default:** *'auto'*.
* `--h11-max-incomplete-event-size <int>` - Set the maximum number of bytes to buffer of an incomplete event. Only available for `h11` HTTP protocol implementation. **Default:** *16384* (16 KB).
@ -138,6 +138,8 @@ The [SSL context](https://docs.python.org/3/library/ssl.html#ssl.SSLContext) can
To understand more about the SSL context options, please refer to the [Python documentation](https://docs.python.org/3/library/ssl.html).
For advanced TLS scenarios that the flags above don't cover (e.g., mutual TLS, certificate pinning, custom `SSLContext.options`), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`. See [Running with HTTPS](deployment/index.md#customizing-the-ssl-context) for details.
## Resource Limits
* `--limit-concurrency <int>` - Maximum number of concurrent connections or tasks to allow, before issuing HTTP 503 responses. Useful for ensuring known memory usage patterns even under over-resourced loads.

View File

@ -1,184 +0,0 @@
# ✨ Sponsor Starlette & Uvicorn ✨
Thank you for your interest in sponsoring Starlette and Uvicorn! ❤️
Your support *directly* contributes to the ongoing development, maintenance, and long-term sustainability of both projects.
<div style="display: flex; justify-content: center; gap: 4rem; margin: 2rem 0; text-align: center;">
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">67M+</h3>
<p>Starlette Downloads/Month</p>
</div>
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">57M+</h3>
<p>Uvicorn Downloads/Month</p>
</div>
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">19K+</h3>
<p>Combined GitHub Stars</p>
</div>
</div>
## Why Sponsor?
While Starlette and Uvicorn are part of the [Encode](https://github.com/encode) organization,
they have been primarily maintained by [**Marcelo Trylesinski (Kludex)**](https://github.com/Kludex)
for the past several years. His dedication and consistent work have been instrumental in keeping
these projects robust, secure, and up-to-date.
This sponsorship page was created to give the community an opportunity to support Marcelo's continued
efforts in maintaining and improving both projects. Your sponsorship directly enables him to
dedicate more time and resources to maintaining and improving these essential tools:
- [x] **Active Development:** Developing new features, enhancing existing ones, and
keeping both projects aligned with the latest developments in the Python and ASGI ecosystems. 💻
- [x] **Community Support:** Providing better support, addressing user issues,
and cultivating a welcoming environment for contributors. 🤝
- [x] **Long-Term Stability:** Ensuring the long-term viability of both projects through strategic
planning and addressing technical debt. 🌳
- [x] **Bug Fixes & Maintenance:** Providing prompt attention to bug reports and
general maintenance to keep the projects reliable. 🔨
- [x] **Security:** Ensuring robust security practices, conducting regular security audits, and
promptly addressing vulnerabilities to protect millions of production deployments. 🔒
- [x] **Documentation:** Creating comprehensive guides, tutorials, and examples to help users of all skill levels. 📖
## How Sponsorship Works
We currently manage sponsorships *exclusively* through **GitHub Sponsors**. This platform integrates seamlessly with the GitHub ecosystem, making it easy for organizations to contribute.
<div style="text-align: center; padding: 2rem; margin: 2rem 0; background: linear-gradient(135deg, #6e5494, #24292e); border-radius: 10px; color: white;">
<h2 style="color: white; margin-bottom: 1rem;">🌟 Become a Sponsor Today! 🌟</h2>
<p style="margin-bottom: 1.5rem; font-size: 1.1em;">Your support helps keep Starlette and Uvicorn growing stronger!</p>
<a href="https://github.com/sponsors/Kludex"
style="display: inline-block; padding: 1rem 2rem; background-color: #238636; color: white; text-decoration: none; border-radius: 6px; font-size: 1.2em; font-weight: bold; transition: all 0.3s ease-in-out;"
onmouseover="this.style.backgroundColor='#2ea043';this.style.transform='translateY(-2px)'"
onmouseout="this.style.backgroundColor='#238636';this.style.transform='translateY(0)'">
❤️ Sponsor on GitHub
</a>
</div>
## Sponsorship Tiers 🎁
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 1.5rem; margin: 2rem 0;">
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
<h3 style="color: #cd7f32;">🥉 Bronze Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$100<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ Company name on Sponsors page</li>
<li>✓ Small logo with link</li>
<li>✓ Our eternal gratitude</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #cd7f32; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Bronze Sponsor
</a>
</div>
</div>
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
<h3 style="color: #c0c0c0;">🥈 Silver Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$250<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ All Bronze benefits</li>
<li>✓ Medium-sized logo</li>
<li>✓ Release notes mention</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #c0c0c0; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Silver Sponsor
</a>
</div>
</div>
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; position: relative; overflow: hidden; display: flex; flex-direction: column;">
<div style="position: absolute; top: 10px; right: -25px; background: #238636; color: white; padding: 5px 30px; transform: rotate(45deg);">
Popular
</div>
<h3 style="color: #ffd700;">🥇 Gold Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$500<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ All Silver benefits</li>
<li>✓ Large logo on main pages</li>
<li>✓ Priority support</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #ffd700; color: black; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Gold Sponsor
</a>
</div>
</div>
</div>
<div style="text-align: center; margin: 2rem 0;">
<h3>🤝 Custom Sponsor</h3>
<p>Looking for something different? <a href="mailto:marcelotryle@gmail.com">Contact us</a> to discuss custom sponsorship options!</p>
</div>
## Current Sponsors
**Thank you to our generous sponsors!** 🙏
<div style="display: flex; flex-direction: column; gap: 3rem; margin: 2rem 0;">
<div>
<h3 style="text-align: center; color: #ffd700; margin-bottom: 1.5rem;">🏆 Gold Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<a href="https://fastapi.tiangolo.com" style="text-decoration: none;">
<div style="width: 200px; background: #f6f8fa; border-radius: 8px; padding: 1rem; text-align: center;">
<div style="height: 100px; display: flex; align-items: center; justify-content: center; margin-bottom: 0.75rem;">
<img src="https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png" alt="FastAPI" style="max-width: 100%; max-height: 100%; object-fit: contain;">
</div>
<p style="margin: 0; color: #57606a; font-size: 0.9em;">Modern, fast web framework for building APIs with Python 3.8+</p>
</div>
</a>
</div>
</div>
<div>
<h3 style="text-align: center; color: #c0c0c0; margin-bottom: 1.5rem;">🥈 Silver Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<!-- Add Silver Sponsors here -->
</div>
</div>
<div>
<h3 style="text-align: center; color: #cd7f32; margin-bottom: 1.5rem;">🥉 Bronze Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<!-- Add Bronze Sponsors here -->
</div>
</div>
</div>
## Alternative Sponsorship Platforms
<div style="background: #f6f8fa; padding: 1.5rem; border-radius: 8px; margin: 2rem 0;">
<h3>📢 We Want Your Input!</h3>
<p>We are currently evaluating whether to expand our sponsorship options beyond GitHub Sponsors. If your company would be interested in sponsoring Starlette and Uvicorn but prefers to use a different platform (e.g., Open Collective, direct invoicing), please let us know!</p>
<p>Your feedback is invaluable in helping us make sponsorship as accessible as possible. Share your thoughts by:</p>
<ul>
<li>Opening a discussion on our <a href="https://github.com/Kludex/starlette/discussions">GitHub repository</a></li>
<li>Contacting us directly at <a href="mailto:marcelotryle@gmail.com">marcelotryle@gmail.com</a></li>
</ul>
</div>
<a id="acknowledgments"></a>
## Community & Future Plans 🌟
We want to express our deepest gratitude to all the contributors who have helped shape Starlette and
Uvicorn over the years. These projects wouldn't be what they are today without the incredible work of
every single contributor.
Special thanks to some of our most impactful contributors:
- **Tom Christie** ([@tomchristie](https://github.com/tomchristie)) - The original creator of Starlette and Uvicorn.
- **Adrian Garcia Badaracco** ([@adriangb](https://github.com/adriangb)) - Major contributor to Starlette.
- **Thomas Grainger** ([@graingert](https://github.com/graingert)) - Major contributor to AnyIO, and significant contributions to Starlette and Uvicorn.
- **Alex Grönholm** ([@agronholm](https://github.com/agronholm)) - Creator of AnyIO.
- **Florimond Manca** ([@florimondmanca](https://github.com/florimondmanca)) - Important contributions to Starlette and Uvicorn.
If you want your name removed from the list above, or if I forgot a significant contributor, please let me know.
You can view all contributors on GitHub:
[Starlette Contributors](https://github.com/Kludex/starlette/graphs/contributors) / [Uvicorn Contributors](https://github.com/Kludex/uvicorn/graphs/contributors).
While the current sponsorship program directly supports Marcelo's maintenance work, we are exploring ways
to distribute funding to other key contributors in the future. This initiative is still in early planning
stages, as we want to ensure a fair and sustainable model that recognizes the valuable contributions of
our community members.

View File

@ -54,7 +54,6 @@ nav:
- Concepts:
- ASGI: concepts/asgi.md
- Lifespan: concepts/lifespan.md
- HTTP/2: concepts/http2.md
- Logging: concepts/logging.md
- WebSockets: concepts/websockets.md
- Event Loop: concepts/event-loop.md
@ -63,7 +62,6 @@ nav:
- Docker: deployment/docker.md
- Release Notes: release-notes.md
- Contributing: contributing.md
- Sponsorship: sponsorship.md
extra:
analytics:
@ -81,6 +79,9 @@ extra:
- icon: fontawesome/solid/globe
link: https://fastapiexpert.com
extra_css:
- css/extra.css
markdown_extensions:
- attr_list
- admonition

View File

@ -40,7 +40,6 @@ dependencies = [
[project.optional-dependencies]
standard = [
"colorama>=0.4; sys_platform == 'win32'",
"h2>=4.2.0",
"httptools>=0.6.3",
"python-dotenv>=0.13",
"PyYAML>=5.1",
@ -83,7 +82,8 @@ docs = [
[tool.uv]
default-groups = ["dev", "docs"]
required-version = ">=0.8.6"
required-version = ">=0.9.17"
exclude-newer = "7 days"
[project.scripts]
uvicorn = "uvicorn.main:main"

View File

@ -131,7 +131,7 @@ class MockLoop:
self._tasks: list[asyncio.Task[Any]] = []
self._later: list[MockTimerHandle] = []
def create_task(self, coroutine: Any, **kwargs: Any) -> Any:
def create_task(self, coroutine: Any) -> Any:
self._tasks.insert(0, coroutine)
return MockTask()

View File

@ -1,165 +0,0 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
from typing import Any
class MockSSLObject:
def __init__(self, alpn_protocol: str | None = None):
self._alpn_protocol = alpn_protocol
def selected_alpn_protocol(self) -> str | None:
return self._alpn_protocol
class MockTransport:
def __init__(
self,
sockname: tuple[str, int] | None = None,
peername: tuple[str, int] | None = None,
sslcontext: bool = False,
ssl_object: Any = None,
):
self.sockname = ("127.0.0.1", 8000) if sockname is None else sockname
self.peername = ("127.0.0.1", 8001) if peername is None else peername
self.sslcontext = sslcontext
self._ssl_object = ssl_object
self.closed = False
self.buffer = b""
self.read_paused = False
self._protocol: asyncio.Protocol | None = None
def get_extra_info(self, key: Any):
return {
"sockname": self.sockname,
"peername": self.peername,
"sslcontext": self.sslcontext,
"ssl_object": self._ssl_object,
}.get(key)
def write(self, data: bytes):
assert not self.closed
self.buffer += data
def close(self):
assert not self.closed
self.closed = True
def pause_reading(self):
self.read_paused = True
def resume_reading(self):
self.read_paused = False
def is_closing(self):
return self.closed
def clear_buffer(self):
self.buffer = b""
def set_protocol(self, protocol: asyncio.Protocol):
self._protocol = protocol
def get_protocol(self) -> asyncio.Protocol | None:
return self._protocol
class MockTimerHandle:
def __init__(
self,
loop_later_list: list[MockTimerHandle],
delay: float,
callback: Callable[[], None],
args: tuple[Any, ...],
):
self.loop_later_list = loop_later_list
self.delay = delay
self.callback = callback
self.args = args
self.cancelled = False
def cancel(self):
if not self.cancelled:
self.cancelled = True
self.loop_later_list.remove(self)
class MockTask:
def add_done_callback(self, callback: Callable[[], None]):
pass
class MockLoop:
def __init__(self) -> None:
self._tasks: list[asyncio.Task[Any]] = []
self._later: list[MockTimerHandle] = []
def create_task(self, coroutine: Any, **kwargs: Any) -> Any:
self._tasks.insert(0, coroutine)
return MockTask()
def call_later(self, delay: float, callback: Callable[[], None], *args: Any) -> MockTimerHandle:
handle = MockTimerHandle(self._later, delay, callback, args)
self._later.insert(0, handle)
return handle
async def run_one(self) -> Any:
return await self._tasks.pop()
def run_later(self, with_delay: float) -> None:
later: list[MockTimerHandle] = []
for timer_handle in self._later:
if with_delay >= timer_handle.delay:
timer_handle.callback(*timer_handle.args)
else:
later.append(timer_handle)
self._later = later
H2C_UPGRADE_REQUEST = b"\r\n".join(
[
b"GET / HTTP/1.1",
b"Host: example.org",
b"Connection: Upgrade, HTTP2-Settings",
b"Upgrade: h2c",
b"HTTP2-Settings: AAMAAABkAAQBAAAAAAIAAAAA",
b"",
b"",
]
)
def h2c_upgrade_request(
*,
method: bytes = b"GET",
connection: bytes | None = b"Upgrade, HTTP2-Settings",
upgrade: bytes | None = b"h2c",
settings: bytes | None = b"AAMAAABkAAQBAAAAAAIAAAAA",
extra_settings: bytes | None = None,
content_length: bytes | None = None,
transfer_encoding: bytes | None = None,
body: bytes = b"",
) -> bytes:
"""Build an h2c upgrade request, optionally with malformed pieces.
Set any keyword to None to omit that header. `extra_settings` adds a second
HTTP2-Settings header (used to assert duplicate-header rejection).
`content_length` / `transfer_encoding` / `body` build a request that
carries a body so tests can assert the upgrade is refused.
"""
lines = [method + b" / HTTP/1.1", b"Host: example.org"]
if connection is not None:
lines.append(b"Connection: " + connection)
if upgrade is not None:
lines.append(b"Upgrade: " + upgrade)
if settings is not None:
lines.append(b"HTTP2-Settings: " + settings)
if extra_settings is not None:
lines.append(b"HTTP2-Settings: " + extra_settings)
if content_length is not None:
lines.append(b"Content-Length: " + content_length)
if transfer_encoding is not None:
lines.append(b"Transfer-Encoding: " + transfer_encoding)
lines.extend([b"", body])
return b"\r\n".join(lines)

View File

@ -5,17 +5,11 @@ import logging
import socket
import threading
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, TypeAlias
import pytest
from tests.protocols.http_utils import (
H2C_UPGRADE_REQUEST,
MockLoop,
MockSSLObject,
MockTransport,
h2c_upgrade_request,
)
from tests.response import Response
from uvicorn import Server
from uvicorn._types import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope
@ -32,15 +26,7 @@ try:
except ModuleNotFoundError: # pragma: no cover
skip_if_no_httptools = pytest.mark.skipif(True, reason="httptools is not installed")
try:
from uvicorn.protocols.http.h2_impl import H2Protocol
skip_if_no_h2 = pytest.mark.skipif(False, reason="h2 is installed")
except ModuleNotFoundError: # pragma: no cover
skip_if_no_h2 = pytest.mark.skipif(True, reason="h2 is not installed")
if TYPE_CHECKING:
from uvicorn.protocols.http.h2_impl import H2Protocol
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
from uvicorn.protocols.websockets.websockets_impl import WebSocketProtocol
from uvicorn.protocols.websockets.wsproto_impl import WSProtocol as _WSProtocol
@ -48,7 +34,6 @@ if TYPE_CHECKING:
WSProtocol: TypeAlias = WebSocketProtocol | _WSProtocol
HTTPProtocol: TypeAlias = H11Protocol | HttpToolsProtocol
pytestmark = pytest.mark.anyio
@ -137,6 +122,18 @@ UPGRADE_REQUEST = b"\r\n".join(
]
)
UPGRADE_HTTP2_REQUEST = b"\r\n".join(
[
b"GET / HTTP/1.1",
b"Host: example.org",
b"Connection: upgrade",
b"Upgrade: h2c",
b"Sec-WebSocket-Version: 11",
b"",
b"",
]
)
INVALID_REQUEST_TEMPLATE = b"\r\n".join(
[
b"%s",
@ -170,6 +167,92 @@ UPGRADE_REQUEST_ERROR_FIELD = b"\r\n".join(
)
class MockTransport:
def __init__(
self, sockname: tuple[str, int] | None = None, peername: tuple[str, int] | None = None, sslcontext: bool = False
):
self.sockname = ("127.0.0.1", 8000) if sockname is None else sockname
self.peername = ("127.0.0.1", 8001) if peername is None else peername
self.sslcontext = sslcontext
self.closed = False
self.buffer = b""
self.read_paused = False
def get_extra_info(self, key: Any):
return {"sockname": self.sockname, "peername": self.peername, "sslcontext": self.sslcontext}.get(key)
def write(self, data: bytes):
assert not self.closed
self.buffer += data
def close(self):
assert not self.closed
self.closed = True
def pause_reading(self):
self.read_paused = True
def resume_reading(self):
self.read_paused = False
def is_closing(self):
return self.closed
def clear_buffer(self):
self.buffer = b""
def set_protocol(self, protocol: asyncio.Protocol):
pass
class MockTimerHandle:
def __init__(
self, loop_later_list: list[MockTimerHandle], delay: float, callback: Callable[[], None], args: tuple[Any, ...]
):
self.loop_later_list = loop_later_list
self.delay = delay
self.callback = callback
self.args = args
self.cancelled = False
def cancel(self):
if not self.cancelled:
self.cancelled = True
self.loop_later_list.remove(self)
class MockLoop:
def __init__(self):
self._tasks: list[asyncio.Task[Any]] = []
self._later: list[MockTimerHandle] = []
def create_task(self, coroutine: Any) -> Any:
self._tasks.insert(0, coroutine)
return MockTask()
def call_later(self, delay: float, callback: Callable[[], None], *args: Any) -> MockTimerHandle:
handle = MockTimerHandle(self._later, delay, callback, args)
self._later.insert(0, handle)
return handle
async def run_one(self):
return await self._tasks.pop()
def run_later(self, with_delay: float) -> None:
later: list[MockTimerHandle] = []
for timer_handle in self._later:
if with_delay >= timer_handle.delay:
timer_handle.callback(*timer_handle.args)
else:
later.append(timer_handle)
self._later = later
class MockTask:
def add_done_callback(self, callback: Callable[[], None]):
pass
class MockProtocol(asyncio.Protocol):
loop: MockLoop
transport: MockTransport
@ -182,15 +265,10 @@ def get_connected_protocol(
app: ASGIApplication,
http_protocol_cls: type[HTTPProtocol],
lifespan: LifespanOff | LifespanOn | None = None,
alpn_protocol: str | None = None,
**kwargs: Any,
) -> MockProtocol:
loop = MockLoop()
if alpn_protocol is not None:
ssl_object = MockSSLObject(alpn_protocol=alpn_protocol)
transport = MockTransport(sslcontext=True, ssl_object=ssl_object)
else:
transport = MockTransport()
transport = MockTransport()
config = Config(app=app, **kwargs)
lifespan = lifespan or LifespanOff(config)
server_state = ServerState()
@ -697,76 +775,6 @@ async def test_shutdown_during_idle(http_protocol_cls: type[HTTPProtocol]):
assert protocol.transport.is_closing()
async def test_shutdown_during_streaming_sends_disconnect(http_protocol_cls: type[HTTPProtocol]):
"""When the server shuts down during an SSE/streaming response,
receive() should return http.disconnect so the ASGI app can stop."""
got_disconnect_event = False
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
nonlocal got_disconnect_event
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"text/event-stream")],
}
)
await send({"type": "http.response.body", "body": b"data: hello\n\n", "more_body": True})
# This simulates an SSE app waiting for disconnect
message = await receive()
if message["type"] == "http.disconnect":
got_disconnect_event = True
protocol = get_connected_protocol(app, http_protocol_cls)
protocol.data_received(SIMPLE_GET_REQUEST)
# Trigger server shutdown while the app is streaming
protocol.shutdown() # type: ignore[attr-defined]
await protocol.loop.run_one()
assert got_disconnect_event
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"data: hello" in protocol.transport.buffer
assert protocol.transport.is_closing()
async def test_shutdown_during_streaming_allows_send_before_exit(http_protocol_cls: type[HTTPProtocol]):
"""During server shutdown, the app should still be able to send() data
(e.g., a farewell SSE event) before returning."""
farewell_sent = False
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
nonlocal farewell_sent
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"text/event-stream"),
(b"transfer-encoding", b"chunked"),
],
}
)
await send({"type": "http.response.body", "body": b"data: hello\n\n", "more_body": True})
# Wait for disconnect
message = await receive()
assert message["type"] == "http.disconnect"
# Send a farewell event — this should still work since the transport is open
await send({"type": "http.response.body", "body": b"data: goodbye\n\n", "more_body": True})
farewell_sent = True
protocol = get_connected_protocol(app, http_protocol_cls)
protocol.data_received(SIMPLE_GET_REQUEST)
protocol.shutdown() # type: ignore[attr-defined]
await protocol.loop.run_one()
assert farewell_sent
assert b"data: hello" in protocol.transport.buffer
assert b"data: goodbye" in protocol.transport.buffer
async def test_100_continue_sent_when_body_consumed(http_protocol_cls: type[HTTPProtocol]):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
body = b""
@ -858,6 +866,16 @@ async def test_unsupported_ws_upgrade_request_warn_on_auto(
assert msg in warnings
async def test_http2_upgrade_request(http_protocol_cls: type[HTTPProtocol], ws_protocol_cls: type[WSProtocol]):
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, ws=ws_protocol_cls)
protocol.data_received(UPGRADE_HTTP2_REQUEST)
await protocol.loop.run_one()
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"Hello, world" in protocol.transport.buffer
async def asgi3app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
pass
@ -1122,129 +1140,6 @@ async def test_header_upgrade_is_not_websocket_depend_installed(
assert b"Hello, world" in protocol.transport.buffer
@skip_if_no_h2
async def test_alpn_h2_upgrade(http_protocol_cls: type[HTTPProtocol]):
"""Test that ALPN h2 negotiation switches to H2Protocol."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, alpn_protocol="h2", http2=True)
assert isinstance(protocol.transport.get_protocol(), H2Protocol)
@skip_if_no_h2
async def test_alpn_h2_upgrade_not_triggered_without_h2_negotiation(http_protocol_cls: type[HTTPProtocol]):
"""Test that ALPN upgrade doesn't happen when http/1.1 was negotiated."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, alpn_protocol="http/1.1", http2=True)
assert protocol.transport.get_protocol() is None
@skip_if_no_h2
async def test_alpn_h2_upgrade_disabled_with_http2_false(http_protocol_cls: type[HTTPProtocol]):
"""Test that ALPN h2 upgrade is disabled when http2=False."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, alpn_protocol="h2", http2=False)
assert protocol.transport.get_protocol() is None
@skip_if_no_h2
async def test_h2c_upgrade(http_protocol_cls: type[HTTPProtocol]):
"""Test HTTP/2 cleartext (h2c) upgrade via Upgrade header."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, http2=True)
protocol.data_received(H2C_UPGRADE_REQUEST)
assert b"HTTP/1.1 101 Switching Protocols" in protocol.transport.buffer
assert b"Upgrade: h2c" in protocol.transport.buffer
assert isinstance(protocol.transport.get_protocol(), H2Protocol)
# Consume the H2 request task to avoid unawaited coroutine warning
h2_protocol = protocol.transport.get_protocol()
await h2_protocol.loop.run_one() # type: ignore[union-attr]
@skip_if_no_h2
async def test_h2c_upgrade_disabled_with_http2_false(http_protocol_cls: type[HTTPProtocol]):
"""Test that h2c upgrade is disabled when http2=False."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, http2=False)
protocol.data_received(H2C_UPGRADE_REQUEST)
await protocol.loop.run_one()
assert b"HTTP/1.1 101 Switching Protocols" not in protocol.transport.buffer
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
@skip_if_no_h2
@pytest.mark.parametrize(
"request_bytes",
[
pytest.param(h2c_upgrade_request(settings=None), id="missing_http2_settings_header"),
pytest.param(h2c_upgrade_request(connection=b"Upgrade"), id="missing_connection_token"),
pytest.param(h2c_upgrade_request(settings=b""), id="empty_http2_settings_value"),
pytest.param(
h2c_upgrade_request(extra_settings=b"AAMAAABkAAQBAAAAAAIAAAAA"),
id="duplicate_http2_settings_header",
),
# Requests with a body cannot be safely upgraded - the HTTP/1.1 body would
# need to feed stream 1 in HTTP/2, and we don't carry it across the switch.
pytest.param(
h2c_upgrade_request(method=b"POST", content_length=b"5", body=b"hello"),
id="content_length_body",
),
pytest.param(
h2c_upgrade_request(method=b"POST", transfer_encoding=b"chunked"),
id="transfer_encoding_chunked",
),
],
)
async def test_h2c_upgrade_request_falls_back_to_http1(
http_protocol_cls: type[HTTPProtocol],
request_bytes: bytes,
caplog: pytest.LogCaptureFixture,
):
"""When the upgrade request is structurally malformed (missing required
headers, has a body), the server falls back to plain HTTP/1.1 instead of
sending `101 Switching Protocols`."""
caplog.set_level(logging.WARNING, logger="uvicorn.error")
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, http2=True)
protocol.data_received(request_bytes)
await protocol.loop.run_one()
assert b"HTTP/1.1 101 Switching Protocols" not in protocol.transport.buffer
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert protocol.transport.get_protocol() is None
assert any("Ignoring h2c upgrade" in record.getMessage() for record in caplog.records)
@skip_if_no_h2
@pytest.mark.parametrize(
"request_bytes",
[
pytest.param(h2c_upgrade_request(settings=b"!!not-base64!!"), id="non_base64_http2_settings"),
# `AAAA` decodes to 3 bytes, which is shorter than one SETTINGS entry (6 bytes).
pytest.param(h2c_upgrade_request(settings=b"AAAA"), id="invalid_settings_frame_body"),
# `AAIAAAAC` encodes ENABLE_PUSH=2, which is a valid SETTINGS body shape but
# an invalid value (must be 0 or 1).
pytest.param(h2c_upgrade_request(settings=b"AAIAAAAC"), id="invalid_settings_value"),
],
)
async def test_h2c_upgrade_with_invalid_settings_returns_400(
http_protocol_cls: type[HTTPProtocol],
request_bytes: bytes,
):
"""When the HTTP2-Settings payload is well-framed enough to attempt the
upgrade but hyper-h2 rejects the SETTINGS contents, the server replies
with 400 instead of leaving the wire in a half-broken state."""
app = Response("Hello, world", media_type="text/plain")
protocol = get_connected_protocol(app, http_protocol_cls, http2=True)
protocol.data_received(request_bytes)
assert b"HTTP/1.1 400" in protocol.transport.buffer
assert b"HTTP/1.1 101 Switching Protocols" not in protocol.transport.buffer
assert protocol.transport.get_protocol() is None
async def test_header_upgrade_is_websocket_depend_not_installed(
caplog: pytest.LogCaptureFixture, http_protocol_cls: type[HTTPProtocol]
):

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,7 @@ import websockets
import websockets.client
import websockets.exceptions
from websockets.extensions.permessage_deflate import ClientPerMessageDeflateFactory
from websockets.frames import Opcode
from websockets.typing import Subprotocol
from tests.response import Response
@ -43,6 +44,7 @@ if TYPE_CHECKING:
HTTPProtocol: TypeAlias = "type[H11Protocol | HttpToolsProtocol]"
WSProtocol: TypeAlias = "type[_WSProtocol | WebSocketProtocol]"
KeepaliveWSProtocol: TypeAlias = "type[_WSProtocol | WebSocketsSansIOProtocol]"
pytestmark = pytest.mark.anyio
@ -751,6 +753,61 @@ async def test_send_binary_data_to_server_bigger_than_default_on_websockets(
assert ws.close_code == expected_result
async def test_fragmented_message_exceeding_max_size(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
"""Stream non-FIN fragments past `ws_max_size` - the server must close with 1009."""
class App(WebSocketResponse):
async def websocket_connect(self, message: WebSocketConnectEvent):
await self.send({"type": "websocket.accept"})
config = Config(
app=App, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", ws_max_size=2048, port=unused_tcp_port
)
async with run_server(config):
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
payload = b"A" * 1024
with pytest.raises(websockets.exceptions.ConnectionClosed) as exc_info:
await ws.write_frame(False, Opcode.BINARY, payload)
for _ in range(63): # 64 KiB total, well past 2 KiB budget
await ws.write_frame(False, Opcode.CONT, payload)
await ws.recv()
assert exc_info.value.rcvd is not None
assert exc_info.value.rcvd.code == 1009
async def test_fragmented_message_reassembly(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
"""Server reassembles a fragmented message and delivers it to the app intact."""
received: list[bytes] = []
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
assert scope["type"] == "websocket"
connect = await receive()
assert connect["type"] == "websocket.connect"
await send({"type": "websocket.accept"})
message = await receive()
assert message["type"] == "websocket.receive"
payload = message.get("bytes")
assert payload is not None
received.append(payload)
await send({"type": "websocket.close"})
config = Config(app=app, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", port=unused_tcp_port)
async with run_server(config):
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
payload = b"A" * 512
await ws.write_frame(False, Opcode.BINARY, payload)
for _ in range(4):
await ws.write_frame(False, Opcode.CONT, payload)
await ws.write_frame(True, Opcode.CONT, payload)
assert received == [b"A" * 512 * 6]
async def test_server_reject_connection(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
@ -1205,7 +1262,27 @@ async def test_lifespan_state(ws_protocol_cls: WSProtocol, http_protocol_cls: HT
assert expected_states == actual_states
async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
@pytest.fixture(
params=[
pytest.param(
"uvicorn.protocols.websockets.wsproto_impl:WSProtocol",
marks=skip_if_no_wsproto,
id="wsproto",
),
pytest.param(
"uvicorn.protocols.websockets.websockets_sansio_impl:WebSocketsSansIOProtocol", id="websockets-sansio"
),
]
)
def keepalive_ws_protocol_cls(request: pytest.FixtureRequest):
from uvicorn.importer import import_from_string
return import_from_string(request.param)
async def test_server_keepalive_ping_pong(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1216,7 +1293,7 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=0.1,
@ -1227,7 +1304,7 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
# The websockets client auto-responds to ping frames, keeping the connection alive.
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
protocol = list(server.server_state.connections)[0]
assert isinstance(protocol, WebSocketsSansIOProtocol)
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
# Wait until the server sends at least one keepalive ping, then
# sleep past the timeout window and ensure the connection stays open.
@ -1242,7 +1319,9 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
assert not protocol.transport.is_closing()
async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
async def test_server_keepalive_ping_timeout(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1253,7 +1332,7 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=0.1,
@ -1272,7 +1351,9 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
assert exc_info.value.rcvd.reason == "keepalive ping timeout"
async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
async def test_server_keepalive_disabled(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1283,7 +1364,7 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=None,
@ -1292,5 +1373,5 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
async with run_server(config) as server:
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
protocol = list(server.server_state.connections)[0]
assert isinstance(protocol, WebSocketsSansIOProtocol)
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
assert protocol.ping_timer is None

View File

@ -6,7 +6,6 @@ import json
import logging
import os
import socket
import ssl
import sys
from collections.abc import Callable, Iterator
from contextlib import closing
@ -367,6 +366,15 @@ def test_log_config_yaml(
mocked_logging_config_module.dictConfig.assert_called_once_with(logging_config)
def test_log_config_yaml_missing_pyyaml(mocked_logging_config_module: MagicMock, mocker: MockerFixture) -> None:
"""
Test that a helpful error is raised when PyYAML is not installed.
"""
mocker.patch.dict(sys.modules, {"yaml": None})
with pytest.raises(ImportError, match=r"Install the PyYAML package or uvicorn\[standard\]"):
Config(app=asgi_app, log_config="log_config.yaml")
def test_log_config_pathlike(
mocked_logging_config_module: MagicMock,
logging_config: dict[str, Any],
@ -545,6 +553,37 @@ def test_bind_fd_works_with_reload_or_workers(reload: bool, workers: int): # pr
fdsock.close()
@pytest.fixture
def stdin_socket() -> Iterator[socket.socket]: # pragma: py-win32
with closing(socket.socket(socket.AF_INET)) as sock:
sock.bind(("127.0.0.1", 0))
saved_stdin = os.dup(0)
os.dup2(sock.fileno(), 0)
try:
yield sock
finally:
os.dup2(saved_stdin, 0)
os.close(saved_stdin)
@pytest.mark.parametrize(
"reload, workers",
[
(True, 1),
(False, 2),
],
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
)
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
def test_bind_stdin_works_with_reload_or_workers(
reload: bool, workers: int, stdin_socket: socket.socket
): # pragma: py-win32
config = Config(app=asgi_app, fd=0, reload=reload, workers=workers)
config.load()
with closing(config.bind_socket()) as sock:
assert sock.getsockname() == stdin_socket.getsockname()
@pytest.mark.parametrize(
"reload, workers, expected",
[
@ -621,58 +660,3 @@ def test_setup_event_loop_is_removed(caplog: pytest.LogCaptureFixture) -> None:
AttributeError, match="The `setup_event_loop` method was replaced by `get_loop_factory` in uvicorn 0.36.0."
):
config.setup_event_loop()
def test_http2_with_ssl_sets_alpn(
tls_ca_certificate_pem_path: str,
tls_ca_certificate_private_key_path: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test that http2=True with SSL configures ALPN protocols."""
recorded: list[list[str]] = []
original = ssl.SSLContext.set_alpn_protocols
def spy(self: ssl.SSLContext, protocols: list[str]) -> None:
recorded.append(protocols)
original(self, protocols)
monkeypatch.setattr(ssl.SSLContext, "set_alpn_protocols", spy)
config = Config(
app=asgi_app,
http2=True,
ssl_certfile=tls_ca_certificate_pem_path,
ssl_keyfile=tls_ca_certificate_private_key_path,
)
config.load()
assert config.is_ssl is True
assert config.ssl is not None
assert config.h2_protocol_class is not None
assert ["h2", "http/1.1"] in recorded
def test_http2_as_string_path() -> None:
"""Test that http2 can be specified as a string import path."""
config = Config(
app=asgi_app,
http2="uvicorn.protocols.http.h2_impl:H2Protocol",
)
config.load()
from uvicorn.protocols.http.h2_impl import H2Protocol
assert config.h2_protocol_class is H2Protocol
def test_http2_as_class() -> None:
"""Test that http2 can be specified as a protocol class directly."""
from uvicorn.protocols.http.h2_impl import H2Protocol
config = Config(
app=asgi_app,
http2=H2Protocol,
)
config.load()
assert config.h2_protocol_class is H2Protocol

View File

@ -1,7 +1,9 @@
import importlib
import inspect
import socket
import sys
from logging import WARNING
from pathlib import Path
import httpx
import pytest
@ -12,6 +14,7 @@ from uvicorn import Server
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.config import Config
from uvicorn.main import run
from uvicorn.supervisors import Multiprocess
pytestmark = pytest.mark.anyio
@ -85,6 +88,61 @@ def test_run_invalid_app_config_combination(caplog: pytest.LogCaptureFixture) ->
)
def test_run_fails_fast_in_parent_on_bad_app_path(
caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Bad app path with `--workers > 1` exits in the parent.
Regression for https://github.com/encode/uvicorn/discussions/2440: without
parent-side validation the supervisor restarts dying workers forever.
"""
def fail(*args: object, **kwargs: object) -> None: # pragma: no cover
pytest.fail("parent reached supervisor; should have exited on bad app path")
monkeypatch.setattr(Config, "bind_socket", fail)
monkeypatch.setattr(Multiprocess, "run", fail)
with pytest.raises(SystemExit) as exit_exception:
run("tests.test_main:nonexistent_attr", workers=2)
assert exit_exception.value.code == 1
assert any("Error loading ASGI app" in record.message for record in caplog.records)
def test_run_imports_app_before_starting_event_loop(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""`uvicorn.run()` imports the app before `Server.run` opens the event loop.
Regression for https://github.com/encode/uvicorn/issues/941: an app whose
module body calls `asyncio.run(...)` crashes with "loop already running"
if Uvicorn imports it inside the server's event loop. The parent must
import the app synchronously, before `Server.run` enters `asyncio.run`.
"""
module = tmp_path / "eager_async_app.py"
module.write_text(
"import asyncio\n"
"async def _build():\n"
" async def app(scope, receive, send):\n"
" pass\n"
" return app\n"
"app = asyncio.run(_build())\n"
)
monkeypatch.syspath_prepend(str(tmp_path))
imported_before_server_run: list[bool] = []
def tracking_run(self: Server, sockets: object = None) -> None:
imported_before_server_run.append("eager_async_app" in sys.modules)
self.started = True
monkeypatch.setattr(Server, "run", tracking_run)
# The import side effect (`eager_async_app` lands in `sys.modules`) must
# happen before `Server.run`, which is where the event loop opens.
run("eager_async_app:app")
assert imported_before_server_run == [True]
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
async def app(scope, receive, send):
assert scope["type"] == "lifespan"

View File

@ -15,12 +15,12 @@ import pytest
from tests.protocols.test_http import SIMPLE_GET_REQUEST
from tests.utils import run_server
from uvicorn import Server
from uvicorn._types import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.config import Config
from uvicorn.protocols.http.flow_control import HIGH_WATER_LIMIT
from uvicorn.protocols.http.h11_impl import H11Protocol
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
from uvicorn.server import Server
pytestmark = pytest.mark.anyio
@ -142,23 +142,25 @@ async def test_limit_max_requests_jitter(
config = Config(
app=app, limit_max_requests=1, limit_max_requests_jitter=2, port=unused_tcp_port, http=http_protocol_cls
)
server = Server(config=config)
limit = server.limit_max_requests
assert limit is not None
assert 1 <= limit <= 3
task = asyncio.create_task(server.serve())
while not server.started:
await asyncio.sleep(0.01)
async with httpx.AsyncClient() as client:
for _ in range(limit + 1):
await client.get(f"http://127.0.0.1:{unused_tcp_port}")
await task
async with run_server(config) as server:
limit = server.limit_max_requests
assert limit is not None
assert 1 <= limit <= 3
async with httpx.AsyncClient() as client:
tasks = [client.get(f"http://127.0.0.1:{unused_tcp_port}") for _ in range(limit + 1)]
await asyncio.gather(*tasks)
assert f"Maximum request limit of {limit} exceeded. Terminating process." in caplog.text
@contextlib.asynccontextmanager
async def server(*, app: ASGIApplication, port: int, http_protocol_cls: type[H11Protocol | HttpToolsProtocol]):
config = Config(app=app, port=port, loop="asyncio", http=http_protocol_cls)
async def _raw_server(
*,
app: ASGIApplication,
port: int,
http_protocol_cls: type[H11Protocol | HttpToolsProtocol],
reset_contextvars: bool = False,
):
config = Config(app=app, port=port, loop="asyncio", http=http_protocol_cls, reset_contextvars=reset_contextvars)
server = Server(config=config)
task = asyncio.create_task(server.serve())
@ -186,10 +188,36 @@ async def server(*, app: ASGIApplication, port: int, http_protocol_cls: type[H11
await task
async def test_no_contextvars_pollution_asyncio(
async def test_contextvars_preserved_by_default(
http_protocol_cls: type[H11Protocol | HttpToolsProtocol], unused_tcp_port: int
):
"""Non-regression test for https://github.com/encode/uvicorn/issues/2167."""
"""By default, context set outside the ASGI task is visible inside it."""
ctx: contextvars.ContextVar[str] = contextvars.ContextVar("ctx")
ctx.set("outer-value")
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
assert scope["type"] == "http"
while True:
message = await receive()
assert message["type"] == "http.request"
if not message["more_body"]:
break
body = json.dumps({"ctx": ctx.get("MISSING")}).encode("utf-8")
headers = [(b"content-type", b"application/json"), (b"content-length", str(len(body)).encode("utf-8"))]
await send({"type": "http.response.start", "status": 200, "headers": headers})
await send({"type": "http.response.body", "body": body})
async with _raw_server(app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port) as extract_json_body:
assert await extract_json_body(SIMPLE_GET_REQUEST) == {"ctx": "outer-value"}
async def test_reset_contextvars_asyncio(
http_protocol_cls: type[H11Protocol | HttpToolsProtocol], unused_tcp_port: int
):
"""With reset_contextvars=True, each ASGI run starts with a fresh context.
Non-regression test for https://github.com/encode/uvicorn/issues/2167.
"""
default_contextvars = {c.name for c in contextvars.copy_context().keys()}
ctx: contextvars.ContextVar[str] = contextvars.ContextVar("ctx")
@ -209,14 +237,13 @@ async def test_no_contextvars_pollution_asyncio(
if not message["more_body"]:
break
# return the initial context for empty assertion
body = json.dumps(initial_context).encode("utf-8")
headers = [(b"content-type", b"application/json"), (b"content-length", str(len(body)).encode("utf-8"))]
await send({"type": "http.response.start", "status": 200, "headers": headers})
await send({"type": "http.response.body", "body": body})
# body has to be larger than HIGH_WATER_LIMIT to trigger a reading pause on the main thread
# and a resumption inside the ASGI task
# body larger than HIGH_WATER_LIMIT forces a reading pause on the main thread
# and a resumption inside the ASGI task, which is where the original pollution showed up.
large_body = b"a" * (HIGH_WATER_LIMIT + 1)
large_request = b"\r\n".join(
[
@ -229,6 +256,8 @@ async def test_no_contextvars_pollution_asyncio(
]
)
async with server(app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port) as extract_json_body:
async with _raw_server(
app=app, http_protocol_cls=http_protocol_cls, port=unused_tcp_port, reset_contextvars=True
) as extract_json_body:
assert await extract_json_body(large_request) == {}
assert await extract_json_body(SIMPLE_GET_REQUEST) == {}

View File

@ -1,9 +1,17 @@
from __future__ import annotations
import ssl
from collections.abc import Callable
from typing import TypeAlias
import httpx
import pytest
from tests.utils import run_server
from uvicorn.config import Config
DefaultFactory: TypeAlias = Callable[[], ssl.SSLContext]
async def app(scope, receive, send):
assert scope["type"] == "http"
@ -92,3 +100,108 @@ async def test_run_password(
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
@pytest.mark.anyio
async def test_run_ssl_context_factory_default(
tls_ca_ssl_context: ssl.SSLContext,
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
unused_tcp_port: int,
) -> None:
"""A factory that just delegates to the default factory should produce a working server."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return default_ssl_context_factory()
config = Config(
app=app,
loop="asyncio",
limit_max_requests=1,
ssl_keyfile=tls_certificate_private_key_path,
ssl_certfile=tls_certificate_server_cert_path,
ssl_context_factory=ssl_context_factory,
port=unused_tcp_port,
)
async with run_server(config):
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
@pytest.mark.anyio
async def test_run_ssl_context_factory_custom(
tls_ca_ssl_context: ssl.SSLContext,
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
unused_tcp_port: int,
) -> None:
"""A factory that builds its own SSLContext from scratch should work without ssl_keyfile/ssl_certfile."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(tls_certificate_server_cert_path, tls_certificate_private_key_path)
return ctx
config = Config(
app=app,
loop="asyncio",
limit_max_requests=1,
ssl_context_factory=ssl_context_factory,
port=unused_tcp_port,
)
async with run_server(config):
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
def test_ssl_context_factory_mutates_default(
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
) -> None:
"""The factory can call the default and mutate the result (e.g., bump TLS minimum version)."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
ctx = default_ssl_context_factory()
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
return ctx
config = Config(
app=app,
ssl_keyfile=tls_certificate_private_key_path,
ssl_certfile=tls_certificate_server_cert_path,
ssl_context_factory=ssl_context_factory,
)
config.load()
assert config.is_ssl
assert isinstance(config.ssl, ssl.SSLContext)
assert config.ssl.minimum_version == ssl.TLSVersion.TLSv1_3
def test_default_ssl_context_factory_requires_ssl_certfile() -> None:
"""Calling `default_ssl_context_factory()` without `ssl_certfile` raises a clear error."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return default_ssl_context_factory()
config = Config(app=app, ssl_context_factory=ssl_context_factory)
with pytest.raises(RuntimeError, match="requires `ssl_certfile`"):
config.load()
def test_ssl_context_factory_must_return_ssl_context() -> None:
def bad_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> object:
return "not an SSLContext"
config = Config(app=app, ssl_context_factory=bad_factory) # type: ignore[arg-type]
with pytest.raises(TypeError, match="must return an `ssl.SSLContext`"):
config.load()
def test_is_ssl_true_when_only_factory_set() -> None:
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # pragma: no cover
config = Config(app=app, ssl_context_factory=ssl_context_factory)
assert config.is_ssl is True

39
uv.lock generated
View File

@ -574,28 +574,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "h2"
version = "4.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "hpack" },
{ name = "hyperframe" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1d/17/afa56379f94ad0fe8defd37d6eb3f89a25404ffc71d4d848893d270325fc/h2-4.3.0.tar.gz", hash = "sha256:6c59efe4323fa18b47a632221a1888bd7fde6249819beda254aeca909f221bf1", size = 2152026, upload-time = "2025-08-23T18:12:19.778Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/69/b2/119f6e6dcbd96f9069ce9a2665e0146588dc9f88f29549711853645e736a/h2-4.3.0-py3-none-any.whl", hash = "sha256:c438f029a25f7945c69e0ccf0fb951dc3f73a5f6412981daee861431b70e2bdd", size = 61779, upload-time = "2025-08-23T18:12:17.779Z" },
]
[[package]]
name = "hpack"
version = "4.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2c/48/71de9ed269fdae9c8057e5a4c0aa7402e8bb16f2c6e90b3aa53327b113f8/hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca", size = 51276, upload-time = "2025-01-22T21:44:58.347Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/c6/80c95b1b2b94682a72cbdbfb85b81ae2daffa4291fbfa1b1464502ede10d/hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496", size = 34357, upload-time = "2025-01-22T21:44:56.92Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
@ -667,15 +645,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "hyperframe"
version = "6.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/02/e7/94f8232d4a74cc99514c13a9f995811485a6903d48e5d952771ef6322e30/hyperframe-6.1.0.tar.gz", hash = "sha256:f630908a00854a7adeabd6382b43923a4c4cd4b821fcb527e6ab9e15382a3b08", size = 26566, upload-time = "2025-01-22T21:41:49.302Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/48/30/47d0bf6072f7252e6521f3447ccfa40b421b6824517f82854703d0f5a98b/hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5", size = 13007, upload-time = "2025-01-22T21:41:47.295Z" },
]
[[package]]
name = "id"
version = "1.6.1"
@ -1788,11 +1757,11 @@ wheels = [
[[package]]
name = "urllib3"
version = "2.6.3"
version = "2.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
{ url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" },
]
[[package]]
@ -1807,7 +1776,6 @@ dependencies = [
[package.optional-dependencies]
standard = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "h2" },
{ name = "httptools" },
{ name = "python-dotenv" },
{ name = "pyyaml" },
@ -1850,7 +1818,6 @@ requires-dist = [
{ name = "click", specifier = ">=7.0" },
{ name = "colorama", marker = "sys_platform == 'win32' and extra == 'standard'", specifier = ">=0.4" },
{ name = "h11", specifier = ">=0.8" },
{ name = "h2", marker = "extra == 'standard'", specifier = ">=4.2.0" },
{ name = "httptools", marker = "extra == 'standard'", specifier = ">=0.6.3" },
{ name = "python-dotenv", marker = "extra == 'standard'", specifier = ">=0.13" },
{ name = "pyyaml", marker = "extra == 'standard'", specifier = ">=5.1" },

View File

@ -1,5 +1,5 @@
from uvicorn.config import Config
from uvicorn.main import Server, main, run
__version__ = "0.44.0"
__version__ = "0.47.0"
__all__ = ["main", "run", "Config", "Server"]

View File

@ -12,7 +12,7 @@ import sys
from collections.abc import Awaitable, Callable
from configparser import RawConfigParser
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Literal
from typing import IO, Any, Literal
import click
@ -25,9 +25,6 @@ from uvicorn.middleware.message_logger import MessageLoggerMiddleware
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from uvicorn.middleware.wsgi import WSGIMiddleware
if TYPE_CHECKING:
from uvicorn.protocols.http.h2_impl import HTTP2Protocol
HTTPProtocolType = Literal["auto", "h11", "httptools"]
WSProtocolType = Literal["auto", "none", "websockets", "websockets-sansio", "wsproto"]
LifespanType = Literal["auto", "on", "off"]
@ -113,7 +110,6 @@ def create_ssl_context(
cert_reqs: int,
ca_certs: str | os.PathLike[str] | None,
ciphers: str | None,
alpn_protocols: list[str] | None = None,
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl_version)
get_password = (lambda: password) if password else None
@ -123,8 +119,6 @@ def create_ssl_context(
ctx.load_verify_locations(ca_certs)
if ciphers:
ctx.set_ciphers(ciphers)
if alpn_protocols:
ctx.set_alpn_protocols(alpn_protocols)
return ctx
@ -160,7 +154,7 @@ def resolve_reload_patterns(patterns_list: list[str], directories_list: list[str
directories = list(map(lambda x: x.resolve(), directories))
directories = list({reload_path for reload_path in directories if is_dir(reload_path)})
children: list[Path] = []
children = []
for j in range(len(directories)):
for k in range(j + 1, len(directories)): # pragma: full coverage
if directories[j] in directories[k].parents:
@ -191,7 +185,6 @@ class Config:
fd: int | None = None,
loop: LoopFactoryType | str = "auto",
http: type[asyncio.Protocol] | HTTPProtocolType | str = "auto",
http2: bool | type[HTTP2Protocol] | str = False,
ws: type[asyncio.Protocol] | WSProtocolType | str = "auto",
ws_max_size: int = 16 * 1024 * 1024,
ws_max_queue: int = 32,
@ -232,9 +225,11 @@ class Config:
ssl_cert_reqs: int = ssl.CERT_NONE,
ssl_ca_certs: str | os.PathLike[str] | None = None,
ssl_ciphers: str = "TLSv1",
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
headers: list[tuple[str, str]] | None = None,
factory: bool = False,
h11_max_incomplete_event_size: int | None = None,
reset_contextvars: bool = False,
):
self.app = app
self.host = host
@ -243,7 +238,6 @@ class Config:
self.fd = fd
self.loop = loop
self.http = http
self.http2 = http2
self.ws = ws
self.ws_max_size = ws_max_size
self.ws_max_queue = ws_max_queue
@ -279,10 +273,12 @@ class Config:
self.ssl_cert_reqs = ssl_cert_reqs
self.ssl_ca_certs = ssl_ca_certs
self.ssl_ciphers = ssl_ciphers
self.ssl_context_factory = ssl_context_factory
self.headers: list[tuple[str, str]] = headers or []
self.encoded_headers: list[tuple[bytes, bytes]] = []
self.factory = factory
self.h11_max_incomplete_event_size = h11_max_incomplete_event_size
self.reset_contextvars = reset_contextvars
self.loaded = False
self.configure_logging()
@ -363,7 +359,7 @@ class Config:
@property
def is_ssl(self) -> bool:
return bool(self.ssl_keyfile or self.ssl_certfile)
return bool(self.ssl_keyfile or self.ssl_certfile or self.ssl_context_factory)
@property
def use_subprocess(self) -> bool:
@ -386,9 +382,12 @@ class Config:
loaded_config = json.load(file)
logging.config.dictConfig(loaded_config)
elif isinstance(self.log_config, str) and self.log_config.endswith((".yaml", ".yml")):
# Install the PyYAML package or the uvicorn[standard] optional
# dependencies to enable this functionality.
import yaml
try:
import yaml
except ImportError as e:
raise ImportError(
"Install the PyYAML package or uvicorn[standard] to use `--log-config` with YAML files."
) from e
with open(self.log_config) as file:
loaded_config = yaml.safe_load(file)
@ -410,15 +409,43 @@ class Config:
logging.getLogger("uvicorn.access").handlers = []
logging.getLogger("uvicorn.access").propagate = False
def load_app(self) -> Any:
"""Import the app and return it. Exits on failure."""
try:
return import_from_string(self.app)
except ImportFromStringError as exc:
logger.error("Error loading ASGI app. %s" % exc)
sys.exit(1)
def load(self) -> None:
assert not self.loaded
if self.is_ssl:
if self.ssl_context_factory is not None:
def default_factory() -> ssl.SSLContext:
if not self.ssl_certfile:
raise RuntimeError(
"`default_ssl_context_factory()` requires `ssl_certfile` to be set on `Config`. "
"Either pass `ssl_certfile` (and optionally `ssl_keyfile`) or build the `SSLContext` "
"directly inside `ssl_context_factory` without calling the default factory."
)
return create_ssl_context(
keyfile=self.ssl_keyfile,
certfile=self.ssl_certfile,
password=self.ssl_keyfile_password,
ssl_version=self.ssl_version,
cert_reqs=self.ssl_cert_reqs,
ca_certs=self.ssl_ca_certs,
ciphers=self.ssl_ciphers,
)
context = self.ssl_context_factory(self, default_factory)
if not isinstance(context, ssl.SSLContext):
raise TypeError(f"`ssl_context_factory` must return an `ssl.SSLContext`, got {type(context).__name__}")
self.ssl: ssl.SSLContext | None = context
elif self.is_ssl:
assert self.ssl_certfile
alpn_protocols: list[str] | None = None
if self.http2:
alpn_protocols = ["h2", "http/1.1"]
self.ssl: ssl.SSLContext | None = create_ssl_context(
self.ssl = create_ssl_context(
keyfile=self.ssl_keyfile,
certfile=self.ssl_certfile,
password=self.ssl_keyfile_password,
@ -426,7 +453,6 @@ class Config:
cert_reqs=self.ssl_cert_reqs,
ca_certs=self.ssl_ca_certs,
ciphers=self.ssl_ciphers,
alpn_protocols=alpn_protocols,
)
else:
self.ssl = None
@ -444,15 +470,6 @@ class Config:
else:
self.http_protocol_class = self.http
if self.http2 is False:
self.h2_protocol_class: type[HTTP2Protocol] | None = None
elif self.http2 is True:
self.h2_protocol_class = import_from_string("uvicorn.protocols.http.h2_impl:H2Protocol")
elif isinstance(self.http2, str):
self.h2_protocol_class = import_from_string(self.http2)
else:
self.h2_protocol_class = self.http2
if isinstance(self.ws, str):
ws_protocol_class = import_from_string(WS_PROTOCOLS.get(self.ws, self.ws))
self.ws_protocol_class: type[asyncio.Protocol] | None = ws_protocol_class
@ -461,11 +478,7 @@ class Config:
self.lifespan_class = import_from_string(LIFESPAN[self.lifespan])
try:
self.loaded_app = import_from_string(self.app)
except ImportFromStringError as exc:
logger.error("Error loading ASGI app. %s" % exc)
sys.exit(1)
self.loaded_app = self.load_app()
try:
self.loaded_app = self.loaded_app()
@ -524,7 +537,7 @@ class Config:
def bind_socket(self) -> socket.socket:
logger_args: list[str | int]
if self.uds: # pragma: py-win32
if self.uds is not None: # pragma: py-win32
path = self.uds
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
@ -539,7 +552,7 @@ class Config:
sock_name_format = "%s"
color_message = "Uvicorn running on " + click.style(sock_name_format, bold=True) + " (Press CTRL+C to quit)"
logger_args = [self.uds]
elif self.fd: # pragma: py-win32
elif self.fd is not None: # pragma: py-win32
sock = socket.fromfd(self.fd, socket.AF_UNIX, socket.SOCK_STREAM)
message = "Uvicorn running on socket %s (Press CTRL+C to quit)"
fd_name_format = "%s"

View File

@ -9,7 +9,7 @@ import sys
import warnings
from collections.abc import Callable
from configparser import RawConfigParser
from typing import IO, TYPE_CHECKING, Any, get_args
from typing import IO, Any, get_args
import click
@ -31,9 +31,6 @@ from uvicorn.config import (
from uvicorn.server import Server
from uvicorn.supervisors import ChangeReload, Multiprocess
if TYPE_CHECKING:
from uvicorn.protocols.http.h2_impl import HTTP2Protocol
LEVEL_CHOICES = click.Choice(list(LOG_LEVELS.keys()))
LIFESPAN_CHOICES = click.Choice(list(LIFESPAN.keys()))
INTERFACE_CHOICES = click.Choice(INTERFACES)
@ -135,12 +132,6 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
help="HTTP protocol implementation.",
show_default=True,
)
@click.option(
"--http2",
is_flag=True,
default=False,
help="Enable HTTP/2 support.",
)
@click.option(
"--ws",
type=str,
@ -381,6 +372,13 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
default=None,
help="For h11, the maximum number of bytes to buffer of an incomplete event.",
)
@click.option(
"--reset-contextvars",
is_flag=True,
default=False,
help="Run each ASGI request in a fresh contextvars.Context. Hides context set in the lifespan.",
show_default=True,
)
@click.option(
"--factory",
is_flag=True,
@ -396,7 +394,6 @@ def main(
fd: int,
loop: LoopFactoryType | str,
http: HTTPProtocolType | str,
http2: bool,
ws: WSProtocolType | str,
ws_max_size: int,
ws_max_queue: int,
@ -438,6 +435,7 @@ def main(
use_colors: bool,
app_dir: str,
h11_max_incomplete_event_size: int | None,
reset_contextvars: bool,
factory: bool,
) -> None:
run(
@ -448,7 +446,6 @@ def main(
fd=fd,
loop=loop,
http=http,
http2=http2,
ws=ws,
ws_max_size=ws_max_size,
ws_max_queue=ws_max_queue,
@ -491,6 +488,7 @@ def main(
factory=factory,
app_dir=app_dir,
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
reset_contextvars=reset_contextvars,
)
@ -503,7 +501,6 @@ def run(
fd: int | None = None,
loop: LoopFactoryType | str = "auto",
http: type[asyncio.Protocol] | HTTPProtocolType | str = "auto",
http2: bool | type[HTTP2Protocol] | str = False,
ws: type[asyncio.Protocol] | WSProtocolType | str = "auto",
ws_max_size: int = 16777216,
ws_max_queue: int = 32,
@ -541,11 +538,13 @@ def run(
ssl_cert_reqs: int = ssl.CERT_NONE,
ssl_ca_certs: str | os.PathLike[str] | None = None,
ssl_ciphers: str = "TLSv1",
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
headers: list[tuple[str, str]] | None = None,
use_colors: bool | None = None,
app_dir: str | None = None,
factory: bool = False,
h11_max_incomplete_event_size: int | None = None,
reset_contextvars: bool = False,
) -> None:
if app_dir is not None:
sys.path.insert(0, app_dir)
@ -558,7 +557,6 @@ def run(
fd=fd,
loop=loop,
http=http,
http2=http2,
ws=ws,
ws_max_size=ws_max_size,
ws_max_queue=ws_max_queue,
@ -596,18 +594,21 @@ def run(
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
ssl_context_factory=ssl_context_factory,
headers=headers,
use_colors=use_colors,
factory=factory,
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
reset_contextvars=reset_contextvars,
)
server = Server(config=config)
if (config.reload or config.workers > 1) and not isinstance(app, str):
logger = logging.getLogger("uvicorn.error")
logger.warning("You must pass the application as an import string to enable 'reload' or 'workers'.")
sys.exit(1)
config.load_app()
server = Server(config=config)
try:
if config.should_reload:
sock = config.bind_socket()
@ -617,8 +618,8 @@ def run(
Multiprocess(config, target=server.run, sockets=[sock]).run()
else:
server.run()
except KeyboardInterrupt:
pass # pragma: full coverage
except KeyboardInterrupt: # pragma: full coverage
pass
finally:
if config.uds and os.path.exists(config.uds):
os.remove(config.uds) # pragma: py-win32

View File

@ -6,7 +6,6 @@ import http
import logging
import sys
from collections.abc import Callable
from ssl import SSLObject
from typing import Any, Literal
from urllib.parse import unquote
@ -63,7 +62,6 @@ class H11Protocol(asyncio.Protocol):
else DEFAULT_MAX_INCOMPLETE_EVENT_SIZE,
)
self.ws_protocol_class = config.ws_protocol_class
self.h2_protocol_class = config.h2_protocol_class
self.root_path = config.root_path
self.limit_concurrency = config.limit_concurrency
self.app_state = app_state
@ -88,8 +86,6 @@ class H11Protocol(asyncio.Protocol):
self.scope: HTTPScope = None # type: ignore[assignment]
self.headers: list[tuple[bytes, bytes]] = None # type: ignore[assignment]
self.cycle: RequestResponseCycle = None # type: ignore[assignment]
# Cached HTTP2-Settings header value when an h2c upgrade has been validated.
self._h2c_settings: bytes | None = None
# Protocol interface
def connection_made( # type: ignore[override]
@ -103,37 +99,10 @@ class H11Protocol(asyncio.Protocol):
self.client = get_remote_addr(transport)
self.scheme = "https" if is_ssl(transport) else "http"
# Check for ALPN negotiation - if h2 was negotiated, switch to HTTP/2 protocol
if self._should_upgrade_to_h2(transport):
self._upgrade_to_h2(transport)
return
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix)
def _should_upgrade_to_h2(self, transport: asyncio.Transport) -> bool:
"""Check if the connection should be upgraded to HTTP/2 based on ALPN."""
if self.h2_protocol_class is None:
return False
ssl_object: SSLObject | None = transport.get_extra_info("ssl_object")
selected_protocol = ssl_object and ssl_object.selected_alpn_protocol()
return selected_protocol == "h2"
def _upgrade_to_h2(self, transport: asyncio.Transport) -> None:
"""Upgrade the connection to HTTP/2 protocol."""
assert self.h2_protocol_class is not None
self.connections.discard(self)
h2_protocol = self.h2_protocol_class(
config=self.config,
server_state=self.server_state,
app_state=self.app_state,
_loop=self.loop,
)
transport.set_protocol(h2_protocol)
h2_protocol.connection_made(transport)
def connection_lost(self, exc: Exception | None) -> None:
self.connections.discard(self)
@ -167,34 +136,23 @@ class H11Protocol(asyncio.Protocol):
self.timeout_keep_alive_task.cancel()
self.timeout_keep_alive_task = None
def _get_upgrade(self) -> tuple[bytes | None, list[bytes]]:
connection: list[bytes] = []
def _get_upgrade(self) -> bytes | None:
connection = []
upgrade = None
for name, value in self.headers:
if name == b"connection":
connection.extend(token.lower().strip() for token in value.split(b","))
connection = [token.lower().strip() for token in value.split(b",")]
if name == b"upgrade":
upgrade = value.lower()
if b"upgrade" in connection:
return upgrade, connection
return None, connection
return upgrade
return None
def _should_upgrade_to_ws(self) -> bool:
if self.ws_protocol_class is None:
return False
return True
def _should_upgrade_to_h2c(self) -> bool:
"""Check if h2 support is enabled for h2c upgrade.
h2c is HTTP/2 cleartext only; refuse it on TLS connections so a client
cannot bypass ALPN by sending `Upgrade: h2c` over a session that
negotiated `http/1.1`. HTTP/2 over TLS must come through ALPN.
"""
if self.h2_protocol_class is None:
return False
return not is_ssl(self.transport)
def _unsupported_upgrade_warning(self) -> None:
msg = "Unsupported upgrade request."
self.logger.warning(msg)
@ -202,42 +160,13 @@ class H11Protocol(asyncio.Protocol):
msg = "No supported WebSocket library detected. Please use \"pip install 'uvicorn[standard]'\", or install 'websockets' or 'wsproto' manually." # noqa: E501
self.logger.warning(msg)
def _get_h2c_settings(self, connection_tokens: list[bytes]) -> bytes | None:
# Per RFC 7540 section 3.2, an h2c upgrade requires the `HTTP2-Settings`
# connection-option and exactly one `HTTP2-Settings` header field. Requests
# that carry a body cannot be upgraded because we'd lose the body bytes
# when we hand stream 1 to h2. The actual SETTINGS payload is validated
# later by `H2Protocol.initiate_h2c_upgrade`, which only commits the
# protocol switch if hyper-h2 accepts the payload.
if b"http2-settings" not in connection_tokens:
return None
seen: bytes | None = None
for name, value in self.headers:
if name == b"http2-settings":
if seen is not None or not value:
return None
seen = value
elif name == b"content-length" and value not in (b"", b"0"):
return None
elif name == b"transfer-encoding":
return None
return seen
def _get_upgrade_type(self) -> str | None:
"""Determine the type of upgrade request: 'websocket', 'h2c', or None."""
upgrade, connection_tokens = self._get_upgrade()
def _should_upgrade(self) -> bool:
upgrade = self._get_upgrade()
if upgrade == b"websocket" and self._should_upgrade_to_ws():
return "websocket"
if upgrade == b"h2c" and self._should_upgrade_to_h2c():
settings = self._get_h2c_settings(connection_tokens)
if settings is not None:
self._h2c_settings = settings
return "h2c"
self.logger.warning("Ignoring h2c upgrade with missing or invalid HTTP2-Settings header.")
return None
return True
if upgrade is not None:
self._unsupported_upgrade_warning()
return None
return False
def data_received(self, data: bytes) -> None:
self._unset_keepalive_if_required()
@ -287,13 +216,9 @@ class H11Protocol(asyncio.Protocol):
"headers": self.headers,
"state": self.app_state.copy(),
}
upgrade_type = self._get_upgrade_type()
if upgrade_type == "websocket":
if self._should_upgrade():
self.handle_websocket_upgrade(event)
return
elif upgrade_type == "h2c":
self.handle_h2c_upgrade(event)
return
# Handle 503 responses when 'limit_concurrency' is exceeded.
if self.limit_concurrency is not None and (
@ -325,13 +250,16 @@ class H11Protocol(asyncio.Protocol):
message_event=asyncio.Event(),
on_response=self.on_response_complete,
)
# For the asyncio loop, we need to explicitly start with an empty context
# as it can be polluted from previous ASGI runs.
# See https://github.com/python/cpython/issues/140947 for details.
if sys.version_info >= (3, 11): # pragma: py-lt-311
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
else: # pragma: py-gte-311
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
if self.config.reset_contextvars:
# Opt-in workaround for https://github.com/python/cpython/issues/140947:
# asyncio can leak context vars between tasks. Hides context set in the
# lifespan or by external instrumentation.
if sys.version_info >= (3, 11): # pragma: py-lt-311
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
else: # pragma: py-gte-311
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
else:
task = self.loop.create_task(self.cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
@ -372,47 +300,6 @@ class H11Protocol(asyncio.Protocol):
protocol.data_received(b"".join(output))
self.transport.set_protocol(protocol)
def handle_h2c_upgrade(self, event: h11.Request) -> None:
"""Handle HTTP/2 cleartext (h2c) upgrade request."""
assert self.h2_protocol_class is not None
assert self._h2c_settings is not None
if self.logger.level <= TRACE_LOG_LEVEL: # pragma: full coverage
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sUpgrading to HTTP/2 (h2c)", prefix)
# Bytes the peer already sent after the upgrade request - typically
# the HTTP/2 client preface (`PRI * HTTP/2.0...`) and the first
# frames - need to be forwarded to the new protocol after the switch
# so the connection doesn't stall on lost initial frames.
trailing = self.conn.trailing_data[0]
h2_protocol = self.h2_protocol_class(
config=self.config,
server_state=self.server_state,
app_state=self.app_state,
_loop=self.loop,
)
# Try the upgrade first; only commit `101 Switching Protocols` if h2
# accepts the SETTINGS payload. Otherwise the wire would be left in
# a half-broken state with the peer expecting HTTP/2 and the server
# unable to speak it.
if not h2_protocol.initiate_h2c_upgrade(
self.transport,
event.method.decode("ascii"),
event.target.decode("ascii"),
self.headers,
self._h2c_settings,
):
self.send_400_response("Invalid HTTP2-Settings header for h2c upgrade")
return
self.connections.discard(self)
self.transport.write(b"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: h2c\r\n\r\n")
self.transport.set_protocol(h2_protocol)
if trailing:
h2_protocol.data_received(trailing)
def send_400_response(self, msg: str) -> None:
reason = STATUS_PHRASES[400]
headers: list[tuple[bytes, bytes]] = [
@ -460,8 +347,6 @@ class H11Protocol(asyncio.Protocol):
self.transport.close()
else:
self.cycle.keep_alive = False
self.cycle.shutting_down = True
self.cycle.message_event.set()
def pause_writing(self) -> None:
"""
@ -515,7 +400,6 @@ class RequestResponseCycle:
self.disconnected = False
self.keep_alive = True
self.waiting_for_100_continue = conn.they_are_waiting_for_100_continue
self.shutting_down = False
# Request state
self.body = bytearray()
@ -548,9 +432,8 @@ class RequestResponseCycle:
self.logger.error(msg)
await self.send_500_response()
elif not self.response_complete and not self.disconnected:
if not self.shutting_down:
msg = "ASGI callable returned without completing response."
self.logger.error(msg)
msg = "ASGI callable returned without completing response."
self.logger.error(msg)
self.transport.close()
finally:
self.on_response = lambda: None
@ -648,12 +531,12 @@ class RequestResponseCycle:
self.transport.write(output)
self.waiting_for_100_continue = False
if not self.disconnected and not self.response_complete and not self.shutting_down:
if not self.disconnected and not self.response_complete:
self.flow.resume_reading()
await self.message_event.wait()
self.message_event.clear()
if self.disconnected or self.response_complete or self.shutting_down:
if self.disconnected or self.response_complete:
return {"type": "http.disconnect"}
message: HTTPRequestEvent = {"type": "http.request", "body": bytes(self.body), "more_body": self.more_body}

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,6 @@ import urllib
from asyncio.events import TimerHandle
from collections import deque
from collections.abc import Callable
from ssl import SSLObject
from typing import Any, Literal
import httptools
@ -70,7 +69,6 @@ class HttpToolsProtocol(asyncio.Protocol):
pass
self.ws_protocol_class = config.ws_protocol_class
self.h2_protocol_class = config.h2_protocol_class
self.root_path = config.root_path
self.limit_concurrency = config.limit_concurrency
self.app_state = app_state
@ -97,8 +95,6 @@ class HttpToolsProtocol(asyncio.Protocol):
self.headers: list[tuple[bytes, bytes]] = None # type: ignore[assignment]
self.expect_100_continue = False
self.cycle: RequestResponseCycle = None # type: ignore[assignment]
# Cached HTTP2-Settings header value when an h2c upgrade has been validated.
self._h2c_settings: bytes | None = None
# Protocol interface
def connection_made( # type: ignore[override]
@ -112,37 +108,10 @@ class HttpToolsProtocol(asyncio.Protocol):
self.client = get_remote_addr(transport)
self.scheme = "https" if is_ssl(transport) else "http"
# Check for ALPN negotiation - if h2 was negotiated, switch to HTTP/2 protocol
if self._should_upgrade_to_h2(transport):
self._upgrade_to_h2(transport)
return
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix)
def _should_upgrade_to_h2(self, transport: asyncio.Transport) -> bool:
"""Check if the connection should be upgraded to HTTP/2 based on ALPN."""
if self.h2_protocol_class is None:
return False
ssl_object: SSLObject | None = transport.get_extra_info("ssl_object")
selected_protocol = ssl_object and ssl_object.selected_alpn_protocol()
return selected_protocol == "h2"
def _upgrade_to_h2(self, transport: asyncio.Transport) -> None:
"""Upgrade the connection to HTTP/2 protocol."""
assert self.h2_protocol_class is not None
self.connections.discard(self)
h2_protocol = self.h2_protocol_class(
config=self.config,
server_state=self.server_state,
app_state=self.app_state,
_loop=self.loop,
)
transport.set_protocol(h2_protocol)
h2_protocol.connection_made(transport)
def connection_lost(self, exc: Exception | None) -> None:
self.connections.discard(self)
@ -170,79 +139,32 @@ class HttpToolsProtocol(asyncio.Protocol):
self.timeout_keep_alive_task.cancel()
self.timeout_keep_alive_task = None
def _get_upgrade(self) -> tuple[bytes | None, list[bytes]]:
connection: list[bytes] = []
def _get_upgrade(self) -> bytes | None:
connection = []
upgrade = None
for name, value in self.headers:
if name == b"connection":
connection.extend(token.lower().strip() for token in value.split(b","))
connection = [token.lower().strip() for token in value.split(b",")]
if name == b"upgrade":
upgrade = value.lower()
if b"upgrade" in connection:
return upgrade, connection
return None, connection # pragma: full coverage
return upgrade
return None # pragma: full coverage
def _should_upgrade_to_ws(self) -> bool:
if self.ws_protocol_class is None:
return False
return True
def _should_upgrade_to_h2c(self) -> bool:
"""Check if HTTP/2 protocol is available for h2c upgrade.
h2c is HTTP/2 cleartext only; refuse it on TLS connections so a client
cannot bypass ALPN by sending `Upgrade: h2c` over a session that
negotiated `http/1.1`. HTTP/2 over TLS must come through ALPN.
"""
if self.h2_protocol_class is None:
return False
return not is_ssl(self.transport)
def _unsupported_upgrade_warning(self) -> None:
self.logger.warning("Unsupported upgrade request.")
if not self._should_upgrade_to_ws():
msg = "No supported WebSocket library detected. Please use \"pip install 'uvicorn[standard]'\", or install 'websockets' or 'wsproto' manually." # noqa: E501
self.logger.warning(msg)
def _get_h2c_settings(self, connection_tokens: list[bytes]) -> bytes | None:
# Per RFC 7540 section 3.2, an h2c upgrade requires the `HTTP2-Settings`
# connection-option and exactly one `HTTP2-Settings` header field. Requests
# that carry a body cannot be upgraded because we'd lose the body bytes
# when we hand stream 1 to h2. The actual SETTINGS payload is validated
# later by `H2Protocol.initiate_h2c_upgrade`, which only commits the
# protocol switch if hyper-h2 accepts the payload.
if b"http2-settings" not in connection_tokens:
return None
seen: bytes | None = None
for name, value in self.headers:
if name == b"http2-settings":
if seen is not None or not value:
return None
seen = value
elif name == b"content-length" and value not in (b"", b"0"):
return None
elif name == b"transfer-encoding":
return None
return seen
def _get_upgrade_type(self) -> str | None:
"""Determine the type of upgrade request: 'websocket', 'h2c', or None."""
upgrade, connection_tokens = self._get_upgrade()
if upgrade == b"websocket" and self._should_upgrade_to_ws():
return "websocket"
if upgrade == b"h2c" and self._should_upgrade_to_h2c():
settings = self._get_h2c_settings(connection_tokens)
if settings is not None:
self._h2c_settings = settings
return "h2c"
self.logger.warning("Ignoring h2c upgrade with missing or invalid HTTP2-Settings header.")
return None
if upgrade is not None:
self._unsupported_upgrade_warning()
return None
def _should_upgrade(self) -> bool:
return self._get_upgrade_type() is not None
upgrade = self._get_upgrade()
return upgrade == b"websocket" and self._should_upgrade_to_ws()
def data_received(self, data: bytes) -> None:
self._unset_keepalive_if_required()
@ -254,18 +176,11 @@ class HttpToolsProtocol(asyncio.Protocol):
self.logger.warning(msg)
self.send_400_response(msg)
return
except httptools.HttpParserUpgrade as exc:
upgrade_type = self._get_upgrade_type()
# Anything past the offset reported by httptools is data the peer
# has already shipped after the upgrade headers (e.g. the HTTP/2
# client preface arriving in the same TCP packet as the Upgrade
# request). Hand it to the upgraded protocol once we switch.
offset = exc.args[0] if exc.args else len(data)
trailing = data[offset:]
if upgrade_type == "websocket":
except httptools.HttpParserUpgrade:
if self._should_upgrade():
self.handle_websocket_upgrade()
elif upgrade_type == "h2c":
self.handle_h2c_upgrade(trailing)
else:
self._unsupported_upgrade_warning()
def handle_websocket_upgrade(self) -> None:
if self.logger.level <= TRACE_LOG_LEVEL:
@ -287,47 +202,6 @@ class HttpToolsProtocol(asyncio.Protocol):
protocol.data_received(b"".join(output))
self.transport.set_protocol(protocol)
def handle_h2c_upgrade(self, trailing_data: bytes = b"") -> None:
"""Handle HTTP/2 cleartext (h2c) upgrade request.
`trailing_data` carries any bytes the peer sent after the upgrade
request in the same TCP read; they are forwarded to the upgraded
protocol's `data_received` after the switch so HTTP/2 frames sent
immediately after the Upgrade headers are not lost.
"""
assert self.h2_protocol_class is not None
assert self._h2c_settings is not None
if self.logger.level <= TRACE_LOG_LEVEL: # pragma: full coverage
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sUpgrading to HTTP/2 (h2c)", prefix)
h2_protocol = self.h2_protocol_class(
config=self.config,
server_state=self.server_state,
app_state=self.app_state,
_loop=self.loop,
)
# Try the upgrade first; only commit `101 Switching Protocols` if h2
# accepts the SETTINGS payload. Otherwise the wire would be left in
# a half-broken state with the peer expecting HTTP/2 and the server
# unable to speak it.
if not h2_protocol.initiate_h2c_upgrade(
self.transport,
self.scope["method"],
self.url.decode("ascii"),
list(self.scope["headers"]),
self._h2c_settings,
):
self.send_400_response("Invalid HTTP2-Settings header for h2c upgrade")
return
self.connections.discard(self)
self.transport.write(b"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: h2c\r\n\r\n")
self.transport.set_protocol(h2_protocol)
if trailing_data:
h2_protocol.data_received(trailing_data)
def send_400_response(self, msg: str) -> None:
content = [STATUS_LINE[400]]
for name, value in self.server_state.default_headers:
@ -415,20 +289,26 @@ class HttpToolsProtocol(asyncio.Protocol):
)
if existing_cycle is None or existing_cycle.response_complete:
# Standard case - start processing the request.
# For the asyncio loop, we need to explicitly start with an empty context
# as it can be polluted from previous ASGI runs.
# See https://github.com/python/cpython/issues/140947 for details.
if sys.version_info >= (3, 11): # pragma: py-lt-311
task = self.loop.create_task(self.cycle.run_asgi(app), context=contextvars.Context())
else: # pragma: py-gte-311
task = contextvars.Context().run(self.loop.create_task, self.cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
self._start_asgi_task(self.cycle, app)
else:
# Pipelined HTTP requests need to be queued up.
self.flow.pause_reading()
self.pipeline.appendleft((self.cycle, app))
def _start_asgi_task(self, cycle: RequestResponseCycle, app: ASGI3Application) -> None:
if self.config.reset_contextvars:
# Opt-in workaround for https://github.com/python/cpython/issues/140947:
# asyncio can leak context vars between tasks. Hides context set in the
# lifespan or by external instrumentation.
if sys.version_info >= (3, 11): # pragma: py-lt-311
task = self.loop.create_task(cycle.run_asgi(app), context=contextvars.Context())
else: # pragma: py-gte-311
task = contextvars.Context().run(self.loop.create_task, cycle.run_asgi(app))
else:
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
def on_body(self, body: bytes) -> None:
if (self.parser.should_upgrade() and self._should_upgrade()) or self.cycle.response_complete:
return
@ -459,9 +339,7 @@ class HttpToolsProtocol(asyncio.Protocol):
# Keep-Alive timeout instead.
if self.pipeline:
cycle, app = self.pipeline.pop()
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
self._start_asgi_task(cycle, app)
else:
self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
@ -475,8 +353,6 @@ class HttpToolsProtocol(asyncio.Protocol):
self.transport.close()
else:
self.cycle.keep_alive = False
self.cycle.shutting_down = True
self.cycle.message_event.set()
def pause_writing(self) -> None:
"""
@ -528,7 +404,6 @@ class RequestResponseCycle:
self.disconnected = False
self.keep_alive = keep_alive
self.waiting_for_100_continue = expect_100_continue
self.shutting_down = False
# Request state
self.body = bytearray()
@ -563,9 +438,8 @@ class RequestResponseCycle:
self.logger.error(msg)
await self.send_500_response()
elif not self.response_complete and not self.disconnected:
if not self.shutting_down:
msg = "ASGI callable returned without completing response."
self.logger.error(msg)
msg = "ASGI callable returned without completing response."
self.logger.error(msg)
self.transport.close()
finally:
self.on_response = lambda: None
@ -690,12 +564,12 @@ class RequestResponseCycle:
self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")
self.waiting_for_100_continue = False
if not self.disconnected and not self.response_complete and not self.shutting_down:
if not self.disconnected and not self.response_complete:
self.flow.resume_reading()
await self.message_event.wait()
self.message_event.clear()
if self.disconnected or self.response_complete or self.shutting_down:
if self.disconnected or self.response_complete:
return {"type": "http.disconnect"}
message: HTTPRequestEvent = {"type": "http.request", "body": bytes(self.body), "more_body": self.more_body}
self.body = bytearray()

View File

@ -105,7 +105,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
self.last_ping_rtt: float = 0.0
# Buffers
self.bytes = b""
self.bytes = bytearray()
def connection_made(self, transport: BaseTransport) -> None:
"""Called when a connection is made."""
@ -216,19 +216,19 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
task.add_done_callback(self.on_task_complete)
self.tasks.add(task)
def handle_cont(self, event: Frame) -> None: # pragma: no cover
self.bytes += event.data
def handle_cont(self, event: Frame) -> None:
self.bytes.extend(event.data)
if event.fin:
self.send_receive_event_to_app()
def handle_text(self, event: Frame) -> None:
self.bytes = event.data
self.bytes = bytearray(event.data)
self.curr_msg_data_type: Literal["text", "bytes"] = "text"
if event.fin:
self.send_receive_event_to_app()
def handle_bytes(self, event: Frame) -> None:
self.bytes = event.data
self.bytes = bytearray(event.data)
self.curr_msg_data_type = "bytes"
if event.fin:
self.send_receive_event_to_app()
@ -243,7 +243,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
self.handle_parser_exception()
return
else:
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
self.queue.put_nowait({"type": "websocket.receive", "bytes": bytes(self.bytes)})
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()

View File

@ -2,6 +2,10 @@ from __future__ import annotations
import asyncio
import logging
import random
import struct
from asyncio import TimerHandle
from io import BytesIO, StringIO
from typing import Any, Literal, cast
from urllib.parse import unquote
@ -11,12 +15,7 @@ from wsproto.connection import ConnectionState
from wsproto.extensions import Extension, PerMessageDeflate
from wsproto.utilities import LocalProtocolError, RemoteProtocolError
from uvicorn._types import (
ASGI3Application,
ASGISendEvent,
WebSocketEvent,
WebSocketScope,
)
from uvicorn._types import ASGI3Application, ASGISendEvent, WebSocketEvent, WebSocketReceiveEvent, WebSocketScope
from uvicorn.config import Config
from uvicorn.logging import TRACE_LOG_LEVEL
from uvicorn.protocols.utils import (
@ -30,6 +29,36 @@ from uvicorn.protocols.utils import (
from uvicorn.server import ServerState
class FrameTooLargeError(Exception):
"""Raised when accumulated websocket message bytes exceed `ws_max_size`."""
class WebsocketBuffer:
def __init__(self, max_length: int) -> None:
self.value: BytesIO | StringIO | None = None
self.length = 0
self.max_length = max_length
def extend(self, event: events.TextMessage | events.BytesMessage) -> None:
if self.value is None:
self.value = StringIO() if isinstance(event, events.TextMessage) else BytesIO()
self.value.write(event.data) # type: ignore[arg-type]
# `ws_max_size` is a byte budget, so count UTF-8 bytes for text.
self.length += len(event.data.encode()) if isinstance(event, events.TextMessage) else len(event.data)
if self.length > self.max_length:
raise FrameTooLargeError
def clear(self) -> None:
self.value = None
self.length = 0
def to_message(self) -> WebSocketReceiveEvent:
if isinstance(self.value, StringIO):
return {"type": "websocket.receive", "text": self.value.getvalue()}
assert isinstance(self.value, BytesIO)
return {"type": "websocket.receive", "bytes": self.value.getvalue()}
class WSProtocol(asyncio.Protocol):
def __init__(
self,
@ -73,15 +102,21 @@ class WSProtocol(asyncio.Protocol):
self.writable = asyncio.Event()
self.writable.set()
# Buffers
self.bytes = b""
self.text = ""
# Keepalive state
self.ping_interval = config.ws_ping_interval
self.ping_timeout = config.ws_ping_timeout
self.ping_timer: TimerHandle | None = None
self.pong_timer: TimerHandle | None = None
self.pending_ping_payload: bytes | None = None
self.ping_sent_at: float = 0.0
self.last_ping_rtt: float = 0.0
# Buffer
self.buffer = WebsocketBuffer(self.config.ws_max_size)
# Protocol interface
def connection_made( # type: ignore[override]
self, transport: asyncio.Transport
) -> None:
def connection_made(self, transport: asyncio.Transport) -> None: # type: ignore[override]
self.connections.add(self)
self.transport = transport
self.server = get_local_addr(transport)
@ -93,6 +128,7 @@ class WSProtocol(asyncio.Protocol):
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix)
def connection_lost(self, exc: Exception | None) -> None:
self.stop_keepalive()
code = 1005 if self.handshake_complete else 1006
self.queue.put_nowait({"type": "websocket.disconnect", "code": code})
self.connections.remove(self)
@ -120,16 +156,18 @@ class WSProtocol(asyncio.Protocol):
def handle_events(self) -> None:
for event in self.conn.events():
if self.close_sent:
return
if isinstance(event, events.Request):
self.handle_connect(event)
elif isinstance(event, events.TextMessage):
self.handle_text(event)
elif isinstance(event, events.BytesMessage):
self.handle_bytes(event)
elif isinstance(event, (events.TextMessage, events.BytesMessage)):
self.handle_message(event)
elif isinstance(event, events.CloseConnection):
self.handle_close(event)
elif isinstance(event, events.Ping):
self.handle_ping(event)
elif isinstance(event, events.Pong):
self.handle_pong(event)
def pause_writing(self) -> None:
"""
@ -144,6 +182,7 @@ class WSProtocol(asyncio.Protocol):
self.writable.set() # pragma: full coverage
def shutdown(self) -> None:
self.stop_keepalive()
if self.handshake_complete:
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012})
output = self.conn.send(wsproto.events.CloseConnection(code=1012))
@ -185,21 +224,20 @@ class WSProtocol(asyncio.Protocol):
task.add_done_callback(self.on_task_complete)
self.tasks.add(task)
def handle_text(self, event: events.TextMessage) -> None:
self.text += event.data
def handle_message(self, event: events.TextMessage | events.BytesMessage) -> None:
try:
self.buffer.extend(event)
except FrameTooLargeError:
self.close_sent = True
reason = f"Message exceeds the maximum size ({self.config.ws_max_size} bytes)"
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1009, "reason": reason})
if not self.transport.is_closing():
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1009, reason=reason)))
self.transport.close()
return
if event.message_finished:
self.queue.put_nowait({"type": "websocket.receive", "text": self.text})
self.text = ""
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()
def handle_bytes(self, event: events.BytesMessage) -> None:
self.bytes += event.data
# todo: we may want to guard the size of self.bytes and self.text
if event.message_finished:
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
self.bytes = b""
self.queue.put_nowait(self.buffer.to_message())
self.buffer.clear()
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()
@ -213,6 +251,65 @@ class WSProtocol(asyncio.Protocol):
def handle_ping(self, event: events.Ping) -> None:
self.transport.write(self.conn.send(event.response()))
def handle_pong(self, event: events.Pong) -> None:
# Ignore unsolicited pongs and stale pongs whose payload doesn't match the ping currently in flight.
if self.pending_ping_payload is None or bytes(event.payload) != self.pending_ping_payload:
return # pragma: no cover
self.last_ping_rtt = self.loop.time() - self.ping_sent_at
self.pending_ping_payload = None
# The peer answered in time; cancel the pong deadline and chain the next ping. This `schedule_ping()` call is
# what keeps the keepalive loop running when ping_timeout is set. When ping_timeout is None the next ping is
# already scheduled by `send_keepalive_ping`, so we must not schedule a duplicate here.
if self.pong_timer is not None:
self.pong_timer.cancel()
self.pong_timer = None
self.schedule_ping()
def start_keepalive(self) -> None:
if self.ping_interval is not None and self.ping_interval > 0:
self.schedule_ping()
def stop_keepalive(self) -> None:
if self.ping_timer is not None:
self.ping_timer.cancel()
self.ping_timer = None
if self.pong_timer is not None: # pragma: no cover
self.pong_timer.cancel()
self.pong_timer = None
self.pending_ping_payload = None
def schedule_ping(self) -> None:
assert self.ping_interval is not None
delay = max(0.0, self.ping_interval - self.last_ping_rtt)
self.ping_timer = self.loop.call_later(delay, self.send_keepalive_ping)
def send_keepalive_ping(self) -> None:
self.ping_timer = None
if self.close_sent or self.transport.is_closing(): # pragma: no cover
return
# Random 4-byte payload identifies this ping; `handle_pong` uses it to ignore stale or unsolicited pongs.
self.pending_ping_payload = struct.pack("!I", random.getrandbits(32))
self.ping_sent_at = self.loop.time()
self.transport.write(self.conn.send(wsproto.events.Ping(payload=self.pending_ping_payload)))
if self.ping_timeout is not None:
self.pong_timer = self.loop.call_later(self.ping_timeout, self.keepalive_timeout)
else: # pragma: no cover
self.schedule_ping()
def keepalive_timeout(self) -> None:
self.pong_timer = None
self.pending_ping_payload = None
if self.close_sent or self.transport.is_closing(): # pragma: no cover
return
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket keepalive ping timeout", prefix)
reason = "keepalive ping timeout"
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1011, reason=reason)))
self.close_sent = True
self.transport.close()
def send_500_response(self) -> None:
if self.response_started or self.handshake_complete:
return # we cannot send responses anymore
@ -266,6 +363,7 @@ class WSProtocol(asyncio.Protocol):
)
)
self.transport.write(output)
self.start_keepalive()
elif message["type"] == "websocket.close":
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1006})

View File

@ -23,16 +23,13 @@ from uvicorn._compat import asyncio_run
from uvicorn.config import Config
if TYPE_CHECKING:
from uvicorn.protocols.http.h2_impl import H2Protocol
from uvicorn.protocols.http.h11_impl import H11Protocol
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
from uvicorn.protocols.websockets.websockets_impl import WebSocketProtocol
from uvicorn.protocols.websockets.websockets_sansio_impl import WebSocketsSansIOProtocol
from uvicorn.protocols.websockets.wsproto_impl import WSProtocol
Protocols: TypeAlias = (
H11Protocol | H2Protocol | HttpToolsProtocol | WSProtocol | WebSocketProtocol | WebSocketsSansIOProtocol
)
Protocols: TypeAlias = H11Protocol | HttpToolsProtocol | WSProtocol | WebSocketProtocol | WebSocketsSansIOProtocol
HANDLED_SIGNALS = (
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.