Compare commits

...

14 Commits
0.45.0 ... main

Author SHA1 Message Date
Marcelo Trylesinski
479a2c0c89
Version 0.47.0 (#2937) 2026-05-14 06:20:53 -07:00
Marcelo Trylesinski
89347fd166
Add 7-day cooldown for dependency resolution via uv exclude-newer (#2936) 2026-05-12 15:48:51 +00:00
Marcelo Trylesinski
767315b38a
Drop unused contents/actions permissions from zizmor workflow (#2935) 2026-05-12 15:08:08 +02:00
dependabot[bot]
f25ee43e68
chore(deps): bump urllib3 from 2.6.3 to 2.7.0 (#2933) 2026-05-12 07:58:53 +02:00
Stefan Wójcik
8782666189
Fix typo in docs/deployment/index.md. (#2932) 2026-05-09 19:12:56 +02:00
Eugene Toder
ad5ff87c86
Treat fd=0 as a valid file descriptor with reload/workers (#2927)
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
2026-04-30 20:26:13 +02:00
Marcelo Trylesinski
6761b2c8f9
Remove Hugging Face sponsor block from docs (#2923) 2026-04-28 17:24:46 +02:00
Marcelo Trylesinski
438f64834d
Surface sponsors on welcome page and sidebar (#2921) 2026-04-28 10:14:03 +02:00
Marcelo Trylesinski
10ddc6dd29
Add ssl_context_factory for custom SSLContext configuration (#2920) 2026-04-28 06:24:24 +00:00
Marcelo Trylesinski
b499bc4510
Eagerly import the ASGI app in the parent process (#2919) 2026-04-27 23:56:45 +02:00
Marcelo Trylesinski
b224045f59
Version 0.46.0 (#2918) 2026-04-23 06:33:22 +00:00
Marcelo Trylesinski
7375b5bf66
Use bytearray for incoming WebSocket message buffer in websockets-sansio (#2917) 2026-04-22 20:11:28 +00:00
Marcelo Trylesinski
d438fb16fe
Support ws_ping_interval and ws_ping_timeout in wsproto implementation (#2916) 2026-04-22 18:33:16 +00:00
Marcelo Trylesinski
3e6b964466
Support ws_max_size in wsproto implementation (#2915) 2026-04-22 19:17:00 +02:00
22 changed files with 628 additions and 265 deletions

View File

@ -14,8 +14,6 @@ jobs:
permissions:
security-events: write # Required for upload-sarif (used by zizmor-action) to upload SARIF files.
contents: read # Only needed for private repos. Needed to clone the repo.
actions: read # Only needed for private repos. Needed for upload-sarif to read workflow run info.
steps:
- name: Checkout repository

57
docs/css/extra.css Normal file
View File

@ -0,0 +1,57 @@
.md-nav__sponsors {
display: flex;
flex-direction: column;
align-items: center;
gap: 0.5rem;
margin: 1.2rem 0.4rem 0.6rem;
padding: 0.9rem 0.6rem 0.8rem;
background-color: color-mix(in srgb, var(--md-primary-fg-color) 8%, transparent);
border-radius: 0.4rem;
}
.md-nav__sponsors-title {
margin: 0 0 0.1rem;
font-size: 0.6rem;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.05em;
color: var(--md-default-fg-color--light);
}
.md-nav__sponsor {
display: flex;
align-items: center;
justify-content: center;
width: 100%;
padding: 0.25rem;
border-radius: 0.2rem;
transition: opacity 0.15s;
}
.md-nav__sponsor:hover {
opacity: 0.75;
}
.md-nav__sponsor img {
max-width: 100%;
max-height: 1.6rem;
object-fit: contain;
}
.md-nav__sponsor-cta {
display: inline-block;
margin-top: 0.15rem;
padding: 0.25rem 0.6rem;
font-size: 0.65rem;
font-weight: 600;
color: var(--md-primary-bg-color);
background-color: var(--md-primary-fg-color);
border-radius: 0.2rem;
text-decoration: none;
transition: opacity 0.15s;
}
.md-nav__sponsor-cta:hover {
opacity: 0.85;
color: var(--md-primary-bg-color);
}

View File

@ -82,7 +82,7 @@ The default process manager monitors the status of child processes and automatic
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
- `SIGHUP`: Work processeses are graceful restarted one after another. If you update the code, the new worker process will use the new code.
- `SIGHUP`: Work processes are graceful restarted one after another. If you update the code, the new worker process will use the new code.
- `SIGTTIN`: Increase the number of worker processes by one.
- `SIGTTOU`: Decrease the number of worker processes by one.
@ -225,6 +225,36 @@ It's also possible to use certificates with uvicorn's worker for gunicorn.
$ gunicorn --keyfile=./key.pem --certfile=./cert.pem -k uvicorn.workers.UvicornWorker main:app
```
### Customizing the SSL context
For TLS scenarios that the `--ssl-*` flags don't cover (e.g., mutual TLS, custom `SSLContext.options`, bumping `minimum_version`, loading certificates from memory), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`.
The factory receives the `Config` instance and a `default_ssl_context_factory` callable that builds the standard context from the `ssl_*` settings on `Config`. Use it to start from uvicorn's default and mutate it, or ignore it and build your own context from scratch - the `ssl_*` settings are only consumed by the default factory, so if you don't call it they're effectively unused.
```python
import ssl
from collections.abc import Callable
import uvicorn
from uvicorn.config import Config
def ssl_context_factory(config: Config, default_ssl_context_factory: Callable[[], ssl.SSLContext]) -> ssl.SSLContext:
context = default_ssl_context_factory()
context.minimum_version = ssl.TLSVersion.TLSv1_3
return context
uvicorn.run(
"main:app",
ssl_keyfile="key.pem",
ssl_certfile="cert.pem",
ssl_context_factory=ssl_context_factory,
)
```
The factory is called inside each worker process, so it works with `--reload` and `--workers > 1`. The factory itself must be picklable in those modes (a top-level function is fine; lambdas and local closures are not). The `ssl_*` settings on `Config` are only consumed by `default_ssl_context_factory()`; if you build the context yourself without calling it, those settings are ignored.
## Proxies and Forwarded Headers
When running an application behind one or more proxies, certain information about the request is lost.

BIN
docs/img/fastapi-logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

View File

@ -44,6 +44,18 @@ and means we're now able to start building a common set of tooling usable across
Uvicorn currently supports **HTTP/1.1** and **WebSockets**.
## Sponsorship
Help us keep Uvicorn maintained and sustainable by [becoming a sponsor](https://github.com/sponsors/Kludex).
**Current sponsors:**
<div style="display: flex; flex-wrap: wrap; gap: 2rem; align-items: center; margin: 1rem 0;">
<a href="https://fastapi.tiangolo.com">
<img src="img/fastapi-logo.png" alt="FastAPI" style="height: 80px;">
</a>
</div>
## Quickstart
**Uvicorn** is available on [PyPI](https://pypi.org/project/uvicorn/) so installation is as simple as:

View File

@ -44,4 +44,15 @@
{{ item.render(nav_item, path, 1) }}
{% endfor %}
</ul>
<!-- Sponsors -->
<div class="md-nav__sponsors">
<p class="md-nav__sponsors-title">Sponsors</p>
<a href="https://fastapi.tiangolo.com" title="FastAPI" class="md-nav__sponsor">
<img src="{{ 'img/fastapi-logo.png' | url }}" alt="FastAPI">
</a>
<a href="https://github.com/sponsors/Kludex" class="md-nav__sponsor-cta">
Become a sponsor! ❤️
</a>
</div>
</nav>

View File

@ -2,6 +2,31 @@
toc_depth: 2
---
## 0.47.0 (May 14, 2026)
### Added
* Add `ssl_context_factory` for custom `SSLContext` configuration (#2920)
### Changed
* Eagerly import the ASGI app in the parent process (#2919)
### Fixed
* Treat `fd=0` as a valid file descriptor with reload/workers (#2927)
## 0.46.0 (April 23, 2026)
### Added
* Support `ws_max_size` in `wsproto` implementation (#2915)
* Support `ws_ping_interval` and `ws_ping_timeout` in `wsproto` implementation (#2916)
### Changed
* Use `bytearray` for incoming WebSocket message buffer in `websockets-sansio` (#2917)
## 0.45.0 (April 21, 2026)
### Added

View File

@ -94,10 +94,10 @@ Using Uvicorn with watchfiles will enable the following options (which are other
* `--loop <str>` - Set the event loop implementation. The uvloop implementation provides greater performance, but is not compatible with Windows or PyPy. **Options:** *'auto', 'asyncio', 'uvloop'.* **Default:** *'auto'*.
* `--http <str>` - Set the HTTP protocol implementation. The httptools implementation provides greater performance, but it not compatible with PyPy. **Options:** *'auto', 'h11', 'httptools'.* **Default:** *'auto'*.
* `--ws <str>` - Set the WebSockets protocol implementation. Either of the `websockets` and `wsproto` packages are supported. There are two versions of `websockets` supported: `websockets` and `websockets-sansio`. Use `'none'` to ignore all websocket requests. **Options:** *'auto', 'none', 'websockets', 'websockets-sansio', 'wsproto'.* **Default:** *'auto'*.
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. Only available with the `websockets` protocol. **Default:** *16777216* (16 MB).
* `--ws-max-size <int>` - Set the WebSockets max message size, in bytes. **Default:** *16777216* (16 MB).
* `--ws-max-queue <int>` - Set the maximum length of the WebSocket incoming message queue. Only available with the `websockets` protocol. **Default:** *32*.
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. Available with the `websockets` and `websockets-sansio` protocols. **Default:** *20.0*.
* `--ws-ping-interval <float>` - Set the WebSockets ping interval, in seconds. **Default:** *20.0*.
* `--ws-ping-timeout <float>` - Set the WebSockets ping timeout, in seconds. **Default:** *20.0*.
* `--ws-per-message-deflate <bool>` - Enable/disable WebSocket per-message-deflate compression. Only available with the `websockets` protocol. **Default:** *True*.
* `--lifespan <str>` - Set the Lifespan protocol implementation. **Options:** *'auto', 'on', 'off'.* **Default:** *'auto'*.
* `--h11-max-incomplete-event-size <int>` - Set the maximum number of bytes to buffer of an incomplete event. Only available for `h11` HTTP protocol implementation. **Default:** *16384* (16 KB).
@ -138,6 +138,8 @@ The [SSL context](https://docs.python.org/3/library/ssl.html#ssl.SSLContext) can
To understand more about the SSL context options, please refer to the [Python documentation](https://docs.python.org/3/library/ssl.html).
For advanced TLS scenarios that the flags above don't cover (e.g., mutual TLS, certificate pinning, custom `SSLContext.options`), pass an `ssl_context_factory` to `uvicorn.run()` or `Config`. See [Running with HTTPS](deployment/index.md#customizing-the-ssl-context) for details.
## Resource Limits
* `--limit-concurrency <int>` - Maximum number of concurrent connections or tasks to allow, before issuing HTTP 503 responses. Useful for ensuring known memory usage patterns even under over-resourced loads.

View File

@ -1,184 +0,0 @@
# ✨ Sponsor Starlette & Uvicorn ✨
Thank you for your interest in sponsoring Starlette and Uvicorn! ❤️
Your support *directly* contributes to the ongoing development, maintenance, and long-term sustainability of both projects.
<div style="display: flex; justify-content: center; gap: 4rem; margin: 2rem 0; text-align: center;">
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">67M+</h3>
<p>Starlette Downloads/Month</p>
</div>
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">57M+</h3>
<p>Uvicorn Downloads/Month</p>
</div>
<div style="padding: 1rem;">
<h3 style="color: #6e5494; font-size: 2em; margin-bottom: 0.5rem;">19K+</h3>
<p>Combined GitHub Stars</p>
</div>
</div>
## Why Sponsor?
While Starlette and Uvicorn are part of the [Encode](https://github.com/encode) organization,
they have been primarily maintained by [**Marcelo Trylesinski (Kludex)**](https://github.com/Kludex)
for the past several years. His dedication and consistent work have been instrumental in keeping
these projects robust, secure, and up-to-date.
This sponsorship page was created to give the community an opportunity to support Marcelo's continued
efforts in maintaining and improving both projects. Your sponsorship directly enables him to
dedicate more time and resources to maintaining and improving these essential tools:
- [x] **Active Development:** Developing new features, enhancing existing ones, and
keeping both projects aligned with the latest developments in the Python and ASGI ecosystems. 💻
- [x] **Community Support:** Providing better support, addressing user issues,
and cultivating a welcoming environment for contributors. 🤝
- [x] **Long-Term Stability:** Ensuring the long-term viability of both projects through strategic
planning and addressing technical debt. 🌳
- [x] **Bug Fixes & Maintenance:** Providing prompt attention to bug reports and
general maintenance to keep the projects reliable. 🔨
- [x] **Security:** Ensuring robust security practices, conducting regular security audits, and
promptly addressing vulnerabilities to protect millions of production deployments. 🔒
- [x] **Documentation:** Creating comprehensive guides, tutorials, and examples to help users of all skill levels. 📖
## How Sponsorship Works
We currently manage sponsorships *exclusively* through **GitHub Sponsors**. This platform integrates seamlessly with the GitHub ecosystem, making it easy for organizations to contribute.
<div style="text-align: center; padding: 2rem; margin: 2rem 0; background: linear-gradient(135deg, #6e5494, #24292e); border-radius: 10px; color: white;">
<h2 style="color: white; margin-bottom: 1rem;">🌟 Become a Sponsor Today! 🌟</h2>
<p style="margin-bottom: 1.5rem; font-size: 1.1em;">Your support helps keep Starlette and Uvicorn growing stronger!</p>
<a href="https://github.com/sponsors/Kludex"
style="display: inline-block; padding: 1rem 2rem; background-color: #238636; color: white; text-decoration: none; border-radius: 6px; font-size: 1.2em; font-weight: bold; transition: all 0.3s ease-in-out;"
onmouseover="this.style.backgroundColor='#2ea043';this.style.transform='translateY(-2px)'"
onmouseout="this.style.backgroundColor='#238636';this.style.transform='translateY(0)'">
❤️ Sponsor on GitHub
</a>
</div>
## Sponsorship Tiers 🎁
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 1.5rem; margin: 2rem 0;">
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
<h3 style="color: #cd7f32;">🥉 Bronze Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$100<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ Company name on Sponsors page</li>
<li>✓ Small logo with link</li>
<li>✓ Our eternal gratitude</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #cd7f32; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Bronze Sponsor
</a>
</div>
</div>
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; display: flex; flex-direction: column;">
<h3 style="color: #c0c0c0;">🥈 Silver Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$250<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ All Bronze benefits</li>
<li>✓ Medium-sized logo</li>
<li>✓ Release notes mention</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #c0c0c0; color: white; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Silver Sponsor
</a>
</div>
</div>
<div style="padding: 1.5rem; border: 1px solid #e1e4e8; border-radius: 6px; background: #fff; position: relative; overflow: hidden; display: flex; flex-direction: column;">
<div style="position: absolute; top: 10px; right: -25px; background: #238636; color: white; padding: 5px 30px; transform: rotate(45deg);">
Popular
</div>
<h3 style="color: #ffd700;">🥇 Gold Sponsor</h3>
<div style="font-size: 1.5em; margin: 1rem 0;">$500<span style="font-size: 0.6em;">/month</span></div>
<ul style="list-style: none; padding: 0; margin-bottom: 1rem; min-height: 90px;">
<li>✓ All Silver benefits</li>
<li>✓ Large logo on main pages</li>
<li>✓ Priority support</li>
</ul>
<div style="text-align: center; margin-top: auto;">
<a href="https://github.com/sponsors/Kludex" style="display: inline-block; padding: 0.5rem 1rem; background-color: #ffd700; color: black; text-decoration: none; border-radius: 6px; font-weight: bold; transition: opacity 0.2s;" onmouseover="this.style.opacity='0.8'" onmouseout="this.style.opacity='1'">
Become a Gold Sponsor
</a>
</div>
</div>
</div>
<div style="text-align: center; margin: 2rem 0;">
<h3>🤝 Custom Sponsor</h3>
<p>Looking for something different? <a href="mailto:marcelotryle@gmail.com">Contact us</a> to discuss custom sponsorship options!</p>
</div>
## Current Sponsors
**Thank you to our generous sponsors!** 🙏
<div style="display: flex; flex-direction: column; gap: 3rem; margin: 2rem 0;">
<div>
<h3 style="text-align: center; color: #ffd700; margin-bottom: 1.5rem;">🏆 Gold Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<a href="https://fastapi.tiangolo.com" style="text-decoration: none;">
<div style="width: 200px; background: #f6f8fa; border-radius: 8px; padding: 1rem; text-align: center;">
<div style="height: 100px; display: flex; align-items: center; justify-content: center; margin-bottom: 0.75rem;">
<img src="https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png" alt="FastAPI" style="max-width: 100%; max-height: 100%; object-fit: contain;">
</div>
<p style="margin: 0; color: #57606a; font-size: 0.9em;">Modern, fast web framework for building APIs with Python 3.8+</p>
</div>
</a>
</div>
</div>
<div>
<h3 style="text-align: center; color: #c0c0c0; margin-bottom: 1.5rem;">🥈 Silver Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<!-- Add Silver Sponsors here -->
</div>
</div>
<div>
<h3 style="text-align: center; color: #cd7f32; margin-bottom: 1.5rem;">🥉 Bronze Sponsors</h3>
<div style="display: flex; flex-wrap: wrap; justify-content: center; gap: 2rem; align-items: center;">
<!-- Add Bronze Sponsors here -->
</div>
</div>
</div>
## Alternative Sponsorship Platforms
<div style="background: #f6f8fa; padding: 1.5rem; border-radius: 8px; margin: 2rem 0;">
<h3>📢 We Want Your Input!</h3>
<p>We are currently evaluating whether to expand our sponsorship options beyond GitHub Sponsors. If your company would be interested in sponsoring Starlette and Uvicorn but prefers to use a different platform (e.g., Open Collective, direct invoicing), please let us know!</p>
<p>Your feedback is invaluable in helping us make sponsorship as accessible as possible. Share your thoughts by:</p>
<ul>
<li>Opening a discussion on our <a href="https://github.com/Kludex/starlette/discussions">GitHub repository</a></li>
<li>Contacting us directly at <a href="mailto:marcelotryle@gmail.com">marcelotryle@gmail.com</a></li>
</ul>
</div>
<a id="acknowledgments"></a>
## Community & Future Plans 🌟
We want to express our deepest gratitude to all the contributors who have helped shape Starlette and
Uvicorn over the years. These projects wouldn't be what they are today without the incredible work of
every single contributor.
Special thanks to some of our most impactful contributors:
- **Tom Christie** ([@tomchristie](https://github.com/tomchristie)) - The original creator of Starlette and Uvicorn.
- **Adrian Garcia Badaracco** ([@adriangb](https://github.com/adriangb)) - Major contributor to Starlette.
- **Thomas Grainger** ([@graingert](https://github.com/graingert)) - Major contributor to AnyIO, and significant contributions to Starlette and Uvicorn.
- **Alex Grönholm** ([@agronholm](https://github.com/agronholm)) - Creator of AnyIO.
- **Florimond Manca** ([@florimondmanca](https://github.com/florimondmanca)) - Important contributions to Starlette and Uvicorn.
If you want your name removed from the list above, or if I forgot a significant contributor, please let me know.
You can view all contributors on GitHub:
[Starlette Contributors](https://github.com/Kludex/starlette/graphs/contributors) / [Uvicorn Contributors](https://github.com/Kludex/uvicorn/graphs/contributors).
While the current sponsorship program directly supports Marcelo's maintenance work, we are exploring ways
to distribute funding to other key contributors in the future. This initiative is still in early planning
stages, as we want to ensure a fair and sustainable model that recognizes the valuable contributions of
our community members.

View File

@ -62,7 +62,6 @@ nav:
- Docker: deployment/docker.md
- Release Notes: release-notes.md
- Contributing: contributing.md
- Sponsorship: sponsorship.md
extra:
analytics:
@ -80,6 +79,9 @@ extra:
- icon: fontawesome/solid/globe
link: https://fastapiexpert.com
extra_css:
- css/extra.css
markdown_extensions:
- attr_list
- admonition

View File

@ -82,7 +82,8 @@ docs = [
[tool.uv]
default-groups = ["dev", "docs"]
required-version = ">=0.8.6"
required-version = ">=0.9.17"
exclude-newer = "7 days"
[project.scripts]
uvicorn = "uvicorn.main:main"

View File

@ -10,6 +10,7 @@ import websockets
import websockets.client
import websockets.exceptions
from websockets.extensions.permessage_deflate import ClientPerMessageDeflateFactory
from websockets.frames import Opcode
from websockets.typing import Subprotocol
from tests.response import Response
@ -43,6 +44,7 @@ if TYPE_CHECKING:
HTTPProtocol: TypeAlias = "type[H11Protocol | HttpToolsProtocol]"
WSProtocol: TypeAlias = "type[_WSProtocol | WebSocketProtocol]"
KeepaliveWSProtocol: TypeAlias = "type[_WSProtocol | WebSocketsSansIOProtocol]"
pytestmark = pytest.mark.anyio
@ -751,6 +753,61 @@ async def test_send_binary_data_to_server_bigger_than_default_on_websockets(
assert ws.close_code == expected_result
async def test_fragmented_message_exceeding_max_size(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
"""Stream non-FIN fragments past `ws_max_size` - the server must close with 1009."""
class App(WebSocketResponse):
async def websocket_connect(self, message: WebSocketConnectEvent):
await self.send({"type": "websocket.accept"})
config = Config(
app=App, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", ws_max_size=2048, port=unused_tcp_port
)
async with run_server(config):
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
payload = b"A" * 1024
with pytest.raises(websockets.exceptions.ConnectionClosed) as exc_info:
await ws.write_frame(False, Opcode.BINARY, payload)
for _ in range(63): # 64 KiB total, well past 2 KiB budget
await ws.write_frame(False, Opcode.CONT, payload)
await ws.recv()
assert exc_info.value.rcvd is not None
assert exc_info.value.rcvd.code == 1009
async def test_fragmented_message_reassembly(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
"""Server reassembles a fragmented message and delivers it to the app intact."""
received: list[bytes] = []
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
assert scope["type"] == "websocket"
connect = await receive()
assert connect["type"] == "websocket.connect"
await send({"type": "websocket.accept"})
message = await receive()
assert message["type"] == "websocket.receive"
payload = message.get("bytes")
assert payload is not None
received.append(payload)
await send({"type": "websocket.close"})
config = Config(app=app, ws=ws_protocol_cls, http=http_protocol_cls, lifespan="off", port=unused_tcp_port)
async with run_server(config):
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}") as ws:
payload = b"A" * 512
await ws.write_frame(False, Opcode.BINARY, payload)
for _ in range(4):
await ws.write_frame(False, Opcode.CONT, payload)
await ws.write_frame(True, Opcode.CONT, payload)
assert received == [b"A" * 512 * 6]
async def test_server_reject_connection(
ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
@ -1205,7 +1262,27 @@ async def test_lifespan_state(ws_protocol_cls: WSProtocol, http_protocol_cls: HT
assert expected_states == actual_states
async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
@pytest.fixture(
params=[
pytest.param(
"uvicorn.protocols.websockets.wsproto_impl:WSProtocol",
marks=skip_if_no_wsproto,
id="wsproto",
),
pytest.param(
"uvicorn.protocols.websockets.websockets_sansio_impl:WebSocketsSansIOProtocol", id="websockets-sansio"
),
]
)
def keepalive_ws_protocol_cls(request: pytest.FixtureRequest):
from uvicorn.importer import import_from_string
return import_from_string(request.param)
async def test_server_keepalive_ping_pong(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1216,7 +1293,7 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=0.1,
@ -1227,7 +1304,7 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
# The websockets client auto-responds to ping frames, keeping the connection alive.
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
protocol = list(server.server_state.connections)[0]
assert isinstance(protocol, WebSocketsSansIOProtocol)
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
# Wait until the server sends at least one keepalive ping, then
# sleep past the timeout window and ensure the connection stays open.
@ -1242,7 +1319,9 @@ async def test_server_keepalive_ping_pong(http_protocol_cls: HTTPProtocol, unuse
assert not protocol.transport.is_closing()
async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
async def test_server_keepalive_ping_timeout(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1253,7 +1332,7 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=0.1,
@ -1272,7 +1351,9 @@ async def test_server_keepalive_ping_timeout(http_protocol_cls: HTTPProtocol, un
assert exc_info.value.rcvd.reason == "keepalive ping timeout"
async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused_tcp_port: int):
async def test_server_keepalive_disabled(
keepalive_ws_protocol_cls: KeepaliveWSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int
):
async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
while True:
message = await receive()
@ -1283,7 +1364,7 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
config = Config(
app=app,
ws=WebSocketsSansIOProtocol,
ws=keepalive_ws_protocol_cls,
http=http_protocol_cls,
lifespan="off",
ws_ping_interval=None,
@ -1292,5 +1373,5 @@ async def test_server_keepalive_disabled(http_protocol_cls: HTTPProtocol, unused
async with run_server(config) as server:
async with websockets.connect(f"ws://127.0.0.1:{unused_tcp_port}", ping_interval=None):
protocol = list(server.server_state.connections)[0]
assert isinstance(protocol, WebSocketsSansIOProtocol)
assert isinstance(protocol, (_WSProtocol, WebSocketsSansIOProtocol))
assert protocol.ping_timer is None

View File

@ -553,6 +553,37 @@ def test_bind_fd_works_with_reload_or_workers(reload: bool, workers: int): # pr
fdsock.close()
@pytest.fixture
def stdin_socket() -> Iterator[socket.socket]: # pragma: py-win32
with closing(socket.socket(socket.AF_INET)) as sock:
sock.bind(("127.0.0.1", 0))
saved_stdin = os.dup(0)
os.dup2(sock.fileno(), 0)
try:
yield sock
finally:
os.dup2(saved_stdin, 0)
os.close(saved_stdin)
@pytest.mark.parametrize(
"reload, workers",
[
(True, 1),
(False, 2),
],
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
)
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
def test_bind_stdin_works_with_reload_or_workers(
reload: bool, workers: int, stdin_socket: socket.socket
): # pragma: py-win32
config = Config(app=asgi_app, fd=0, reload=reload, workers=workers)
config.load()
with closing(config.bind_socket()) as sock:
assert sock.getsockname() == stdin_socket.getsockname()
@pytest.mark.parametrize(
"reload, workers, expected",
[

View File

@ -1,7 +1,9 @@
import importlib
import inspect
import socket
import sys
from logging import WARNING
from pathlib import Path
import httpx
import pytest
@ -12,6 +14,7 @@ from uvicorn import Server
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.config import Config
from uvicorn.main import run
from uvicorn.supervisors import Multiprocess
pytestmark = pytest.mark.anyio
@ -85,6 +88,61 @@ def test_run_invalid_app_config_combination(caplog: pytest.LogCaptureFixture) ->
)
def test_run_fails_fast_in_parent_on_bad_app_path(
caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Bad app path with `--workers > 1` exits in the parent.
Regression for https://github.com/encode/uvicorn/discussions/2440: without
parent-side validation the supervisor restarts dying workers forever.
"""
def fail(*args: object, **kwargs: object) -> None: # pragma: no cover
pytest.fail("parent reached supervisor; should have exited on bad app path")
monkeypatch.setattr(Config, "bind_socket", fail)
monkeypatch.setattr(Multiprocess, "run", fail)
with pytest.raises(SystemExit) as exit_exception:
run("tests.test_main:nonexistent_attr", workers=2)
assert exit_exception.value.code == 1
assert any("Error loading ASGI app" in record.message for record in caplog.records)
def test_run_imports_app_before_starting_event_loop(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""`uvicorn.run()` imports the app before `Server.run` opens the event loop.
Regression for https://github.com/encode/uvicorn/issues/941: an app whose
module body calls `asyncio.run(...)` crashes with "loop already running"
if Uvicorn imports it inside the server's event loop. The parent must
import the app synchronously, before `Server.run` enters `asyncio.run`.
"""
module = tmp_path / "eager_async_app.py"
module.write_text(
"import asyncio\n"
"async def _build():\n"
" async def app(scope, receive, send):\n"
" pass\n"
" return app\n"
"app = asyncio.run(_build())\n"
)
monkeypatch.syspath_prepend(str(tmp_path))
imported_before_server_run: list[bool] = []
def tracking_run(self: Server, sockets: object = None) -> None:
imported_before_server_run.append("eager_async_app" in sys.modules)
self.started = True
monkeypatch.setattr(Server, "run", tracking_run)
# The import side effect (`eager_async_app` lands in `sys.modules`) must
# happen before `Server.run`, which is where the event loop opens.
run("eager_async_app:app")
assert imported_before_server_run == [True]
def test_run_startup_failure(caplog: pytest.LogCaptureFixture) -> None:
async def app(scope, receive, send):
assert scope["type"] == "lifespan"

View File

@ -142,17 +142,13 @@ async def test_limit_max_requests_jitter(
config = Config(
app=app, limit_max_requests=1, limit_max_requests_jitter=2, port=unused_tcp_port, http=http_protocol_cls
)
server = Server(config=config)
limit = server.limit_max_requests
assert limit is not None
assert 1 <= limit <= 3
task = asyncio.create_task(server.serve())
while not server.started:
await asyncio.sleep(0.01)
async with httpx.AsyncClient() as client:
for _ in range(limit + 1):
await client.get(f"http://127.0.0.1:{unused_tcp_port}")
await task
async with run_server(config) as server:
limit = server.limit_max_requests
assert limit is not None
assert 1 <= limit <= 3
async with httpx.AsyncClient() as client:
tasks = [client.get(f"http://127.0.0.1:{unused_tcp_port}") for _ in range(limit + 1)]
await asyncio.gather(*tasks)
assert f"Maximum request limit of {limit} exceeded. Terminating process." in caplog.text

View File

@ -1,9 +1,17 @@
from __future__ import annotations
import ssl
from collections.abc import Callable
from typing import TypeAlias
import httpx
import pytest
from tests.utils import run_server
from uvicorn.config import Config
DefaultFactory: TypeAlias = Callable[[], ssl.SSLContext]
async def app(scope, receive, send):
assert scope["type"] == "http"
@ -92,3 +100,108 @@ async def test_run_password(
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
@pytest.mark.anyio
async def test_run_ssl_context_factory_default(
tls_ca_ssl_context: ssl.SSLContext,
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
unused_tcp_port: int,
) -> None:
"""A factory that just delegates to the default factory should produce a working server."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return default_ssl_context_factory()
config = Config(
app=app,
loop="asyncio",
limit_max_requests=1,
ssl_keyfile=tls_certificate_private_key_path,
ssl_certfile=tls_certificate_server_cert_path,
ssl_context_factory=ssl_context_factory,
port=unused_tcp_port,
)
async with run_server(config):
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
@pytest.mark.anyio
async def test_run_ssl_context_factory_custom(
tls_ca_ssl_context: ssl.SSLContext,
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
unused_tcp_port: int,
) -> None:
"""A factory that builds its own SSLContext from scratch should work without ssl_keyfile/ssl_certfile."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(tls_certificate_server_cert_path, tls_certificate_private_key_path)
return ctx
config = Config(
app=app,
loop="asyncio",
limit_max_requests=1,
ssl_context_factory=ssl_context_factory,
port=unused_tcp_port,
)
async with run_server(config):
async with httpx.AsyncClient(verify=tls_ca_ssl_context) as client:
response = await client.get(f"https://127.0.0.1:{unused_tcp_port}")
assert response.status_code == 204
def test_ssl_context_factory_mutates_default(
tls_certificate_server_cert_path: str,
tls_certificate_private_key_path: str,
) -> None:
"""The factory can call the default and mutate the result (e.g., bump TLS minimum version)."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
ctx = default_ssl_context_factory()
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
return ctx
config = Config(
app=app,
ssl_keyfile=tls_certificate_private_key_path,
ssl_certfile=tls_certificate_server_cert_path,
ssl_context_factory=ssl_context_factory,
)
config.load()
assert config.is_ssl
assert isinstance(config.ssl, ssl.SSLContext)
assert config.ssl.minimum_version == ssl.TLSVersion.TLSv1_3
def test_default_ssl_context_factory_requires_ssl_certfile() -> None:
"""Calling `default_ssl_context_factory()` without `ssl_certfile` raises a clear error."""
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return default_ssl_context_factory()
config = Config(app=app, ssl_context_factory=ssl_context_factory)
with pytest.raises(RuntimeError, match="requires `ssl_certfile`"):
config.load()
def test_ssl_context_factory_must_return_ssl_context() -> None:
def bad_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> object:
return "not an SSLContext"
config = Config(app=app, ssl_context_factory=bad_factory) # type: ignore[arg-type]
with pytest.raises(TypeError, match="must return an `ssl.SSLContext`"):
config.load()
def test_is_ssl_true_when_only_factory_set() -> None:
def ssl_context_factory(config: Config, default_ssl_context_factory: DefaultFactory) -> ssl.SSLContext:
return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # pragma: no cover
config = Config(app=app, ssl_context_factory=ssl_context_factory)
assert config.is_ssl is True

6
uv.lock generated
View File

@ -1757,11 +1757,11 @@ wheels = [
[[package]]
name = "urllib3"
version = "2.6.3"
version = "2.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
{ url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" },
]
[[package]]

View File

@ -1,5 +1,5 @@
from uvicorn.config import Config
from uvicorn.main import Server, main, run
__version__ = "0.45.0"
__version__ = "0.47.0"
__all__ = ["main", "run", "Config", "Server"]

View File

@ -225,6 +225,7 @@ class Config:
ssl_cert_reqs: int = ssl.CERT_NONE,
ssl_ca_certs: str | os.PathLike[str] | None = None,
ssl_ciphers: str = "TLSv1",
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
headers: list[tuple[str, str]] | None = None,
factory: bool = False,
h11_max_incomplete_event_size: int | None = None,
@ -272,6 +273,7 @@ class Config:
self.ssl_cert_reqs = ssl_cert_reqs
self.ssl_ca_certs = ssl_ca_certs
self.ssl_ciphers = ssl_ciphers
self.ssl_context_factory = ssl_context_factory
self.headers: list[tuple[str, str]] = headers or []
self.encoded_headers: list[tuple[bytes, bytes]] = []
self.factory = factory
@ -357,7 +359,7 @@ class Config:
@property
def is_ssl(self) -> bool:
return bool(self.ssl_keyfile or self.ssl_certfile)
return bool(self.ssl_keyfile or self.ssl_certfile or self.ssl_context_factory)
@property
def use_subprocess(self) -> bool:
@ -407,12 +409,43 @@ class Config:
logging.getLogger("uvicorn.access").handlers = []
logging.getLogger("uvicorn.access").propagate = False
def load_app(self) -> Any:
"""Import the app and return it. Exits on failure."""
try:
return import_from_string(self.app)
except ImportFromStringError as exc:
logger.error("Error loading ASGI app. %s" % exc)
sys.exit(1)
def load(self) -> None:
assert not self.loaded
if self.is_ssl:
if self.ssl_context_factory is not None:
def default_factory() -> ssl.SSLContext:
if not self.ssl_certfile:
raise RuntimeError(
"`default_ssl_context_factory()` requires `ssl_certfile` to be set on `Config`. "
"Either pass `ssl_certfile` (and optionally `ssl_keyfile`) or build the `SSLContext` "
"directly inside `ssl_context_factory` without calling the default factory."
)
return create_ssl_context(
keyfile=self.ssl_keyfile,
certfile=self.ssl_certfile,
password=self.ssl_keyfile_password,
ssl_version=self.ssl_version,
cert_reqs=self.ssl_cert_reqs,
ca_certs=self.ssl_ca_certs,
ciphers=self.ssl_ciphers,
)
context = self.ssl_context_factory(self, default_factory)
if not isinstance(context, ssl.SSLContext):
raise TypeError(f"`ssl_context_factory` must return an `ssl.SSLContext`, got {type(context).__name__}")
self.ssl: ssl.SSLContext | None = context
elif self.is_ssl:
assert self.ssl_certfile
self.ssl: ssl.SSLContext | None = create_ssl_context(
self.ssl = create_ssl_context(
keyfile=self.ssl_keyfile,
certfile=self.ssl_certfile,
password=self.ssl_keyfile_password,
@ -445,11 +478,7 @@ class Config:
self.lifespan_class = import_from_string(LIFESPAN[self.lifespan])
try:
self.loaded_app = import_from_string(self.app)
except ImportFromStringError as exc:
logger.error("Error loading ASGI app. %s" % exc)
sys.exit(1)
self.loaded_app = self.load_app()
try:
self.loaded_app = self.loaded_app()
@ -508,7 +537,7 @@ class Config:
def bind_socket(self) -> socket.socket:
logger_args: list[str | int]
if self.uds: # pragma: py-win32
if self.uds is not None: # pragma: py-win32
path = self.uds
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
@ -523,7 +552,7 @@ class Config:
sock_name_format = "%s"
color_message = "Uvicorn running on " + click.style(sock_name_format, bold=True) + " (Press CTRL+C to quit)"
logger_args = [self.uds]
elif self.fd: # pragma: py-win32
elif self.fd is not None: # pragma: py-win32
sock = socket.fromfd(self.fd, socket.AF_UNIX, socket.SOCK_STREAM)
message = "Uvicorn running on socket %s (Press CTRL+C to quit)"
fd_name_format = "%s"

View File

@ -538,6 +538,7 @@ def run(
ssl_cert_reqs: int = ssl.CERT_NONE,
ssl_ca_certs: str | os.PathLike[str] | None = None,
ssl_ciphers: str = "TLSv1",
ssl_context_factory: Callable[[Config, Callable[[], ssl.SSLContext]], ssl.SSLContext] | None = None,
headers: list[tuple[str, str]] | None = None,
use_colors: bool | None = None,
app_dir: str | None = None,
@ -593,19 +594,21 @@ def run(
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
ssl_context_factory=ssl_context_factory,
headers=headers,
use_colors=use_colors,
factory=factory,
h11_max_incomplete_event_size=h11_max_incomplete_event_size,
reset_contextvars=reset_contextvars,
)
server = Server(config=config)
if (config.reload or config.workers > 1) and not isinstance(app, str):
logger = logging.getLogger("uvicorn.error")
logger.warning("You must pass the application as an import string to enable 'reload' or 'workers'.")
sys.exit(1)
config.load_app()
server = Server(config=config)
try:
if config.should_reload:
sock = config.bind_socket()
@ -615,8 +618,8 @@ def run(
Multiprocess(config, target=server.run, sockets=[sock]).run()
else:
server.run()
except KeyboardInterrupt:
pass # pragma: full coverage
except KeyboardInterrupt: # pragma: full coverage
pass
finally:
if config.uds and os.path.exists(config.uds):
os.remove(config.uds) # pragma: py-win32

View File

@ -105,7 +105,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
self.last_ping_rtt: float = 0.0
# Buffers
self.bytes = b""
self.bytes = bytearray()
def connection_made(self, transport: BaseTransport) -> None:
"""Called when a connection is made."""
@ -216,19 +216,19 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
task.add_done_callback(self.on_task_complete)
self.tasks.add(task)
def handle_cont(self, event: Frame) -> None: # pragma: no cover
self.bytes += event.data
def handle_cont(self, event: Frame) -> None:
self.bytes.extend(event.data)
if event.fin:
self.send_receive_event_to_app()
def handle_text(self, event: Frame) -> None:
self.bytes = event.data
self.bytes = bytearray(event.data)
self.curr_msg_data_type: Literal["text", "bytes"] = "text"
if event.fin:
self.send_receive_event_to_app()
def handle_bytes(self, event: Frame) -> None:
self.bytes = event.data
self.bytes = bytearray(event.data)
self.curr_msg_data_type = "bytes"
if event.fin:
self.send_receive_event_to_app()
@ -243,7 +243,7 @@ class WebSocketsSansIOProtocol(asyncio.Protocol):
self.handle_parser_exception()
return
else:
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
self.queue.put_nowait({"type": "websocket.receive", "bytes": bytes(self.bytes)})
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()

View File

@ -2,6 +2,10 @@ from __future__ import annotations
import asyncio
import logging
import random
import struct
from asyncio import TimerHandle
from io import BytesIO, StringIO
from typing import Any, Literal, cast
from urllib.parse import unquote
@ -11,12 +15,7 @@ from wsproto.connection import ConnectionState
from wsproto.extensions import Extension, PerMessageDeflate
from wsproto.utilities import LocalProtocolError, RemoteProtocolError
from uvicorn._types import (
ASGI3Application,
ASGISendEvent,
WebSocketEvent,
WebSocketScope,
)
from uvicorn._types import ASGI3Application, ASGISendEvent, WebSocketEvent, WebSocketReceiveEvent, WebSocketScope
from uvicorn.config import Config
from uvicorn.logging import TRACE_LOG_LEVEL
from uvicorn.protocols.utils import (
@ -30,6 +29,36 @@ from uvicorn.protocols.utils import (
from uvicorn.server import ServerState
class FrameTooLargeError(Exception):
"""Raised when accumulated websocket message bytes exceed `ws_max_size`."""
class WebsocketBuffer:
def __init__(self, max_length: int) -> None:
self.value: BytesIO | StringIO | None = None
self.length = 0
self.max_length = max_length
def extend(self, event: events.TextMessage | events.BytesMessage) -> None:
if self.value is None:
self.value = StringIO() if isinstance(event, events.TextMessage) else BytesIO()
self.value.write(event.data) # type: ignore[arg-type]
# `ws_max_size` is a byte budget, so count UTF-8 bytes for text.
self.length += len(event.data.encode()) if isinstance(event, events.TextMessage) else len(event.data)
if self.length > self.max_length:
raise FrameTooLargeError
def clear(self) -> None:
self.value = None
self.length = 0
def to_message(self) -> WebSocketReceiveEvent:
if isinstance(self.value, StringIO):
return {"type": "websocket.receive", "text": self.value.getvalue()}
assert isinstance(self.value, BytesIO)
return {"type": "websocket.receive", "bytes": self.value.getvalue()}
class WSProtocol(asyncio.Protocol):
def __init__(
self,
@ -73,15 +102,21 @@ class WSProtocol(asyncio.Protocol):
self.writable = asyncio.Event()
self.writable.set()
# Buffers
self.bytes = b""
self.text = ""
# Keepalive state
self.ping_interval = config.ws_ping_interval
self.ping_timeout = config.ws_ping_timeout
self.ping_timer: TimerHandle | None = None
self.pong_timer: TimerHandle | None = None
self.pending_ping_payload: bytes | None = None
self.ping_sent_at: float = 0.0
self.last_ping_rtt: float = 0.0
# Buffer
self.buffer = WebsocketBuffer(self.config.ws_max_size)
# Protocol interface
def connection_made( # type: ignore[override]
self, transport: asyncio.Transport
) -> None:
def connection_made(self, transport: asyncio.Transport) -> None: # type: ignore[override]
self.connections.add(self)
self.transport = transport
self.server = get_local_addr(transport)
@ -93,6 +128,7 @@ class WSProtocol(asyncio.Protocol):
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix)
def connection_lost(self, exc: Exception | None) -> None:
self.stop_keepalive()
code = 1005 if self.handshake_complete else 1006
self.queue.put_nowait({"type": "websocket.disconnect", "code": code})
self.connections.remove(self)
@ -120,16 +156,18 @@ class WSProtocol(asyncio.Protocol):
def handle_events(self) -> None:
for event in self.conn.events():
if self.close_sent:
return
if isinstance(event, events.Request):
self.handle_connect(event)
elif isinstance(event, events.TextMessage):
self.handle_text(event)
elif isinstance(event, events.BytesMessage):
self.handle_bytes(event)
elif isinstance(event, (events.TextMessage, events.BytesMessage)):
self.handle_message(event)
elif isinstance(event, events.CloseConnection):
self.handle_close(event)
elif isinstance(event, events.Ping):
self.handle_ping(event)
elif isinstance(event, events.Pong):
self.handle_pong(event)
def pause_writing(self) -> None:
"""
@ -144,6 +182,7 @@ class WSProtocol(asyncio.Protocol):
self.writable.set() # pragma: full coverage
def shutdown(self) -> None:
self.stop_keepalive()
if self.handshake_complete:
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012})
output = self.conn.send(wsproto.events.CloseConnection(code=1012))
@ -185,21 +224,20 @@ class WSProtocol(asyncio.Protocol):
task.add_done_callback(self.on_task_complete)
self.tasks.add(task)
def handle_text(self, event: events.TextMessage) -> None:
self.text += event.data
def handle_message(self, event: events.TextMessage | events.BytesMessage) -> None:
try:
self.buffer.extend(event)
except FrameTooLargeError:
self.close_sent = True
reason = f"Message exceeds the maximum size ({self.config.ws_max_size} bytes)"
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1009, "reason": reason})
if not self.transport.is_closing():
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1009, reason=reason)))
self.transport.close()
return
if event.message_finished:
self.queue.put_nowait({"type": "websocket.receive", "text": self.text})
self.text = ""
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()
def handle_bytes(self, event: events.BytesMessage) -> None:
self.bytes += event.data
# todo: we may want to guard the size of self.bytes and self.text
if event.message_finished:
self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes})
self.bytes = b""
self.queue.put_nowait(self.buffer.to_message())
self.buffer.clear()
if not self.read_paused:
self.read_paused = True
self.transport.pause_reading()
@ -213,6 +251,65 @@ class WSProtocol(asyncio.Protocol):
def handle_ping(self, event: events.Ping) -> None:
self.transport.write(self.conn.send(event.response()))
def handle_pong(self, event: events.Pong) -> None:
# Ignore unsolicited pongs and stale pongs whose payload doesn't match the ping currently in flight.
if self.pending_ping_payload is None or bytes(event.payload) != self.pending_ping_payload:
return # pragma: no cover
self.last_ping_rtt = self.loop.time() - self.ping_sent_at
self.pending_ping_payload = None
# The peer answered in time; cancel the pong deadline and chain the next ping. This `schedule_ping()` call is
# what keeps the keepalive loop running when ping_timeout is set. When ping_timeout is None the next ping is
# already scheduled by `send_keepalive_ping`, so we must not schedule a duplicate here.
if self.pong_timer is not None:
self.pong_timer.cancel()
self.pong_timer = None
self.schedule_ping()
def start_keepalive(self) -> None:
if self.ping_interval is not None and self.ping_interval > 0:
self.schedule_ping()
def stop_keepalive(self) -> None:
if self.ping_timer is not None:
self.ping_timer.cancel()
self.ping_timer = None
if self.pong_timer is not None: # pragma: no cover
self.pong_timer.cancel()
self.pong_timer = None
self.pending_ping_payload = None
def schedule_ping(self) -> None:
assert self.ping_interval is not None
delay = max(0.0, self.ping_interval - self.last_ping_rtt)
self.ping_timer = self.loop.call_later(delay, self.send_keepalive_ping)
def send_keepalive_ping(self) -> None:
self.ping_timer = None
if self.close_sent or self.transport.is_closing(): # pragma: no cover
return
# Random 4-byte payload identifies this ping; `handle_pong` uses it to ignore stale or unsolicited pongs.
self.pending_ping_payload = struct.pack("!I", random.getrandbits(32))
self.ping_sent_at = self.loop.time()
self.transport.write(self.conn.send(wsproto.events.Ping(payload=self.pending_ping_payload)))
if self.ping_timeout is not None:
self.pong_timer = self.loop.call_later(self.ping_timeout, self.keepalive_timeout)
else: # pragma: no cover
self.schedule_ping()
def keepalive_timeout(self) -> None:
self.pong_timer = None
self.pending_ping_payload = None
if self.close_sent or self.transport.is_closing(): # pragma: no cover
return
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % self.client if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket keepalive ping timeout", prefix)
reason = "keepalive ping timeout"
self.transport.write(self.conn.send(wsproto.events.CloseConnection(code=1011, reason=reason)))
self.close_sent = True
self.transport.close()
def send_500_response(self) -> None:
if self.response_started or self.handshake_complete:
return # we cannot send responses anymore
@ -266,6 +363,7 @@ class WSProtocol(asyncio.Protocol):
)
)
self.transport.write(output)
self.start_keepalive()
elif message["type"] == "websocket.close":
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1006})