Compare commits
2 Commits
main
...
forwarded-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fec7b4cc4a | ||
|
|
7c2e669a66 |
2
.github/workflows/zizmor.yml
vendored
2
.github/workflows/zizmor.yml
vendored
@ -14,6 +14,8 @@ jobs:
|
||||
|
||||
permissions:
|
||||
security-events: write # Required for upload-sarif (used by zizmor-action) to upload SARIF files.
|
||||
contents: read # Only needed for private repos. Needed to clone the repo.
|
||||
actions: read # Only needed for private repos. Needed for upload-sarif to read workflow run info.
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
|
||||
@ -82,7 +82,7 @@ The default process manager monitors the status of child processes and automatic
|
||||
|
||||
You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.)
|
||||
|
||||
- `SIGHUP`: Work processes are graceful restarted one after another. If you update the code, the new worker process will use the new code.
|
||||
- `SIGHUP`: Work processeses are graceful restarted one after another. If you update the code, the new worker process will use the new code.
|
||||
- `SIGTTIN`: Increase the number of worker processes by one.
|
||||
- `SIGTTOU`: Decrease the number of worker processes by one.
|
||||
|
||||
@ -260,13 +260,21 @@ The factory is called inside each worker process, so it works with `--reload` an
|
||||
When running an application behind one or more proxies, certain information about the request is lost.
|
||||
To avoid this most proxies will add headers containing this information for downstream servers to read.
|
||||
|
||||
Uvicorn currently supports the following headers:
|
||||
Uvicorn supports two header families, selected via `--proxy-headers <x-forwarded|forwarded>`:
|
||||
|
||||
- `X-Forwarded-For` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For))
|
||||
- `X-Forwarded-Proto`([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Proto))
|
||||
`x-forwarded` (default):
|
||||
|
||||
Uvicorn can use these headers to correctly set the client and protocol in the request.
|
||||
However as anyone can set these headers you must configure which "clients" you will trust to have set them correctly.
|
||||
- `X-Forwarded-For` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For)) - sets `scope["client"]`.
|
||||
- `X-Forwarded-Proto` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Proto)) - sets `scope["scheme"]`.
|
||||
- `X-Forwarded-Host` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Host)) - rewrites the `Host` request header.
|
||||
|
||||
`forwarded`:
|
||||
|
||||
- `Forwarded` ([RFC 7239](https://datatracker.ietf.org/doc/html/rfc7239)) - the standardized header that combines all of the above into a single value with `for=`, `proto=`, and `host=` parameters.
|
||||
|
||||
The two modes are mutually exclusive: when one is selected, the other family is ignored entirely. This is intentional - silent fallback would let a client set whichever family the proxy is not configured for. Choose the mode that matches what your upstream proxy emits.
|
||||
|
||||
However, as anyone can set these headers you must configure which "clients" you will trust to have set them correctly.
|
||||
|
||||
Uvicorn can be configured to trust IP Addresses (e.g. `127.0.0.1`), IP Networks (e.g. `10.100.0.0/16`),
|
||||
or Literals (e.g. `/path/to/socket.sock`). When running from CLI these are configured using `--forwarded-allow-ips`.
|
||||
|
||||
@ -2,20 +2,6 @@
|
||||
toc_depth: 2
|
||||
---
|
||||
|
||||
## 0.47.0 (May 14, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
* Add `ssl_context_factory` for custom `SSLContext` configuration (#2920)
|
||||
|
||||
### Changed
|
||||
|
||||
* Eagerly import the ASGI app in the parent process (#2919)
|
||||
|
||||
### Fixed
|
||||
|
||||
* Treat `fd=0` as a valid file descriptor with reload/workers (#2927)
|
||||
|
||||
## 0.46.0 (April 23, 2026)
|
||||
|
||||
### Added
|
||||
|
||||
@ -115,7 +115,7 @@ Note that WSGI mode always disables WebSocket support, as it is not supported by
|
||||
## HTTP
|
||||
|
||||
* `--root-path <str>` - Set the ASGI `root_path` for applications submounted below a given URL path. **Default:** *""*.
|
||||
* `--proxy-headers / --no-proxy-headers` - Enable/Disable X-Forwarded-Proto, X-Forwarded-For to populate remote address info. Defaults to enabled, but is restricted to only trusting connecting IPs in the `forwarded-allow-ips` configuration.
|
||||
* `--proxy-headers <x-forwarded|forwarded>` - Which forwarding-header family to read to populate remote address info, scheme, and host. `x-forwarded` (default) reads `X-Forwarded-Proto`, `X-Forwarded-For`, and `X-Forwarded-Host`. `forwarded` reads the standardized `Forwarded` header from [RFC 7239](https://datatracker.ietf.org/doc/html/rfc7239). Modes are mutually exclusive: when one is selected, the other family is ignored entirely. Bare `--proxy-headers` means `x-forwarded` (matching the prior boolean default). Use `--no-proxy-headers` to disable. Header parsing is restricted to only trusting connecting IPs in the `forwarded-allow-ips` configuration.
|
||||
* `--forwarded-allow-ips <comma-separated-list>` - Comma separated list of IP Addresses, IP Networks, or literals (e.g. UNIX Socket path) to trust with proxy headers. Defaults to the `$FORWARDED_ALLOW_IPS` environment variable if available, or '127.0.0.1'. The literal `'*'` means trust everything.
|
||||
* `--server-header / --no-server-header` - Enable/Disable default `Server` header. **Default:** *True*.
|
||||
* `--date-header / --no-date-header` - Enable/Disable default `Date` header. **Default:** *True*.
|
||||
|
||||
@ -82,8 +82,7 @@ docs = [
|
||||
|
||||
[tool.uv]
|
||||
default-groups = ["dev", "docs"]
|
||||
required-version = ">=0.9.17"
|
||||
exclude-newer = "7 days"
|
||||
required-version = ">=0.8.6"
|
||||
|
||||
[project.scripts]
|
||||
uvicorn = "uvicorn.main:main"
|
||||
|
||||
@ -23,6 +23,7 @@ if TYPE_CHECKING:
|
||||
|
||||
X_FORWARDED_FOR = "X-Forwarded-For"
|
||||
X_FORWARDED_PROTO = "X-Forwarded-Proto"
|
||||
FORWARDED = "Forwarded"
|
||||
|
||||
|
||||
async def default_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
@ -554,3 +555,330 @@ async def test_proxy_headers_empty_x_forwarded_for() -> None:
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert response.text == "https://127.0.0.1:123"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RFC 7239 `Forwarded` header (mode="forwarded")
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def forwarded_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
"""Echoes scheme, client, and host header as space-separated `key=value` pairs."""
|
||||
headers = dict(scope["headers"]) # type: ignore
|
||||
host_header = headers.get(b"host", b"").decode("latin1")
|
||||
client = scope["client"] # type: ignore
|
||||
scheme = scope["scheme"] # type: ignore
|
||||
assert client is not None
|
||||
body = f"scheme={scheme} client={client[0]}:{client[1]} host={host_header}"
|
||||
await Response(body, media_type="text/plain")(scope, receive, send)
|
||||
|
||||
|
||||
def parse_echo(text: str) -> dict[str, str]:
|
||||
"""Parse `forwarded_app`'s `key=value key=value ...` echo into a dict."""
|
||||
return dict(part.split("=", 1) for part in text.split(" "))
|
||||
|
||||
|
||||
def make_forwarded_client(
|
||||
trusted_hosts: str | list[str],
|
||||
client: tuple[str, int] = ("127.0.0.1", 123),
|
||||
) -> httpx.AsyncClient:
|
||||
"""httpx client wired to a `mode='forwarded'` middleware over `forwarded_app`."""
|
||||
app = ProxyHeadersMiddleware(forwarded_app, trusted_hosts, mode="forwarded")
|
||||
transport = httpx.ASGITransport(app=app, client=client) # type: ignore
|
||||
return httpx.AsyncClient(transport=transport, base_url="http://testserver")
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_multiple_hops_picks_rightmost() -> None:
|
||||
"""Walks from the right; first hop whose `for=` is not trusted is the client."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4, for=10.0.0.5;proto=http, for=10.0.0.6"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_falls_back_to_leftmost_when_all_trusted() -> None:
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=10.0.0.1, for=10.0.0.2"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "10.0.0.1:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_quoted_string_escape_is_unescaped() -> None:
|
||||
"""RFC 7230 quoted-string: `\\"` inside quotes decodes to a literal `"`."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: r'for="1.2.3.4";host="weird\"name"'}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_case_insensitive_param_keys() -> None:
|
||||
"""RFC 7239 §4: parameter names are case-insensitive."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "For=1.2.3.4;PROTO=https;Host=public.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_unknown_params_ignored() -> None:
|
||||
"""Unrecognized parameters (here, `secret`) must not affect scope mutation."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;secret=hunter2;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_empty_header_does_not_mutate_scope() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: ""}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Untouched scope: peer client is the httpx transport peer, scheme stays http.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "127.0.0.1:123"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_malformed_pair_in_entry_is_dropped_but_entry_survives() -> None:
|
||||
"""Pairs without `=` are skipped, other params on the same hop survive."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;novalue;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_duplicate_param_drops_entry() -> None:
|
||||
"""RFC 7239 §4: each parameter appears at most once per element. Smuggling defense -
|
||||
a single hop with two `for=` values must be rejected so an attacker cannot prepend
|
||||
a fake value that some downstream parser might pick over the real one."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;for=1.2.3.4"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Entire entry is dropped; scope falls through to the peer client.
|
||||
assert parse_echo(response.text)["client"] == "127.0.0.1:123"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_duplicate_param_only_drops_offending_entry() -> None:
|
||||
"""A poisoned entry must not contaminate a later, well-formed entry in the same header."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;for=1.2.3.4, for=5.6.7.8"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "5.6.7.8:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_missing_for_is_unanchorable() -> None:
|
||||
"""An entry without `for=` cannot identify a client and must be skipped entirely."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "host=attacker.example;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# `host=`/`proto=` from an unanchorable entry must NOT apply to the request.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["host"] != "attacker.example"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_placeholder_for_filtered_under_always_trust() -> None:
|
||||
"""With `always_trust=*`, an unanchorable leftmost entry must not be the fallback."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=unknown;proto=https;host=attacker.example, for=1.2.3.4"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("placeholder", ["unknown", "Unknown", "_hidden"])
|
||||
async def test_forwarded_mode_placeholder_for_variants_are_unanchorable(placeholder: str) -> None:
|
||||
"""`unknown` (case-insensitive) and `_*` obfuscated identifiers cannot anchor a hop."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: f"for={placeholder};host=attacker.example;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["host"] != "attacker.example"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_basic() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https;host=public.example:9000"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example:9000"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_untrusted_peer_ignored() -> None:
|
||||
async with make_forwarded_client("192.168.0.1") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;proto=https;host=attacker.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] != "attacker.example:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_ipv6_quoted() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: 'for="[2001:db8::1]:443";proto=https;host="public.example"'}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "2001:db8::1:443"
|
||||
assert echo["host"] == "public.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_chained_proxy_picks_rightmost_untrusted() -> None:
|
||||
"""The connecting peer (10.0.0.5) and the next-to-last hop are both trusted; client = 1.2.3.4."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https, for=10.0.0.5;proto=http;host=internal.lan"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# The rightmost untrusted hop is `for=1.2.3.4`, which carries `proto=https` only.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["host"] != "internal.lan"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_placeholder_for_falls_through() -> None:
|
||||
"""`for=unknown` is unanchorable; the next anchorable hop wins."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=unknown;proto=https;host=attacker.example, for=1.2.3.4;proto=http"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_proto_case_insensitive() -> None:
|
||||
"""RFC 3986 - URI schemes are case-insensitive. `proto=HTTPS` must work."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=HTTPS;host=public.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_ignores_x_forwarded_headers() -> None:
|
||||
"""When mode='forwarded', the X-Forwarded-* family must be completely ignored."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {
|
||||
FORWARDED: "for=1.2.3.4;proto=https",
|
||||
X_FORWARDED_FOR: "9.9.9.9",
|
||||
X_FORWARDED_PROTO: "http",
|
||||
"X-Forwarded-Host": "attacker.example",
|
||||
}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_x_forwarded_mode_ignores_forwarded_header() -> None:
|
||||
"""And vice versa: when mode='x-forwarded' (default), `Forwarded` is ignored."""
|
||||
app = ProxyHeadersMiddleware(forwarded_app, trusted_hosts="*", mode="x-forwarded")
|
||||
transport = httpx.ASGITransport(app=app, client=("127.0.0.1", 123)) # type: ignore
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
headers = {
|
||||
FORWARDED: "for=attacker.example;proto=https;host=attacker.example",
|
||||
X_FORWARDED_FOR: "1.2.3.4",
|
||||
X_FORWARDED_PROTO: "https",
|
||||
}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_via_config(unused_tcp_port: int) -> None:
|
||||
"""Programmatic API: `Config(proxy_headers_mode='forwarded')` wires the right mode."""
|
||||
config = Config(
|
||||
app=forwarded_app,
|
||||
loop="asyncio",
|
||||
limit_max_requests=1,
|
||||
proxy_headers_mode="forwarded",
|
||||
forwarded_allow_ips="*",
|
||||
port=unused_tcp_port,
|
||||
)
|
||||
async with run_server(config):
|
||||
async with httpx.AsyncClient() as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https;host=public.example:9000"}
|
||||
response = await client.get(f"http://127.0.0.1:{unused_tcp_port}", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example:9000"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_x_forwarded_proto_last_wins_on_duplicates() -> None:
|
||||
"""X-Forwarded-Proto is single-valued; if duplicated, take the rightmost (the value
|
||||
appended by the trusted upstream proxy, not a client-supplied earlier copy)."""
|
||||
async with make_httpx_client("*") as client:
|
||||
# httpx.Headers preserves multiple values for the same name in order.
|
||||
headers = httpx.Headers(
|
||||
[(X_FORWARDED_PROTO, "http"), (X_FORWARDED_PROTO, "https"), (X_FORWARDED_FOR, "1.2.3.4")]
|
||||
)
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# The default_app echoes scheme://client; last X-Forwarded-Proto value (https) wins.
|
||||
assert response.text == "https://1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_joins_multiple_forwarded_headers() -> None:
|
||||
"""Multiple `Forwarded` headers are list-equivalent per RFC 7230 §3.2.2."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = httpx.Headers([(FORWARDED, "for=1.2.3.4;proto=https"), (FORWARDED, "for=10.0.0.5")])
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Right-most untrusted in the joined list is `for=1.2.3.4` (10.0.0.5 is trusted).
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
@ -553,37 +553,6 @@ def test_bind_fd_works_with_reload_or_workers(reload: bool, workers: int): # pr
|
||||
fdsock.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stdin_socket() -> Iterator[socket.socket]: # pragma: py-win32
|
||||
with closing(socket.socket(socket.AF_INET)) as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
saved_stdin = os.dup(0)
|
||||
os.dup2(sock.fileno(), 0)
|
||||
try:
|
||||
yield sock
|
||||
finally:
|
||||
os.dup2(saved_stdin, 0)
|
||||
os.close(saved_stdin)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"reload, workers",
|
||||
[
|
||||
(True, 1),
|
||||
(False, 2),
|
||||
],
|
||||
ids=["--reload=True --workers=1", "--reload=False --workers=2"],
|
||||
)
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
|
||||
def test_bind_stdin_works_with_reload_or_workers(
|
||||
reload: bool, workers: int, stdin_socket: socket.socket
|
||||
): # pragma: py-win32
|
||||
config = Config(app=asgi_app, fd=0, reload=reload, workers=workers)
|
||||
config.load()
|
||||
with closing(config.bind_socket()) as sock:
|
||||
assert sock.getsockname() == stdin_socket.getsockname()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"reload, workers, expected",
|
||||
[
|
||||
|
||||
@ -142,13 +142,17 @@ async def test_limit_max_requests_jitter(
|
||||
config = Config(
|
||||
app=app, limit_max_requests=1, limit_max_requests_jitter=2, port=unused_tcp_port, http=http_protocol_cls
|
||||
)
|
||||
async with run_server(config) as server:
|
||||
limit = server.limit_max_requests
|
||||
assert limit is not None
|
||||
assert 1 <= limit <= 3
|
||||
async with httpx.AsyncClient() as client:
|
||||
tasks = [client.get(f"http://127.0.0.1:{unused_tcp_port}") for _ in range(limit + 1)]
|
||||
await asyncio.gather(*tasks)
|
||||
server = Server(config=config)
|
||||
limit = server.limit_max_requests
|
||||
assert limit is not None
|
||||
assert 1 <= limit <= 3
|
||||
task = asyncio.create_task(server.serve())
|
||||
while not server.started:
|
||||
await asyncio.sleep(0.01)
|
||||
async with httpx.AsyncClient() as client:
|
||||
for _ in range(limit + 1):
|
||||
await client.get(f"http://127.0.0.1:{unused_tcp_port}")
|
||||
await task
|
||||
assert f"Maximum request limit of {limit} exceeded. Terminating process." in caplog.text
|
||||
|
||||
|
||||
|
||||
6
uv.lock
generated
6
uv.lock
generated
@ -1757,11 +1757,11 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "2.7.0"
|
||||
version = "2.6.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
from uvicorn.config import Config
|
||||
from uvicorn.main import Server, main, run
|
||||
|
||||
__version__ = "0.47.0"
|
||||
__version__ = "0.46.0"
|
||||
__all__ = ["main", "run", "Config", "Server"]
|
||||
|
||||
@ -205,6 +205,7 @@ class Config:
|
||||
reload_excludes: list[str] | str | None = None,
|
||||
workers: int | None = None,
|
||||
proxy_headers: bool = True,
|
||||
proxy_headers_mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
server_header: bool = True,
|
||||
date_header: bool = True,
|
||||
forwarded_allow_ips: list[str] | str | None = None,
|
||||
@ -254,6 +255,7 @@ class Config:
|
||||
self.reload_delay = reload_delay
|
||||
self.workers = workers or 1
|
||||
self.proxy_headers = proxy_headers
|
||||
self.proxy_headers_mode = proxy_headers_mode
|
||||
self.server_header = server_header
|
||||
self.date_header = date_header
|
||||
self.root_path = root_path
|
||||
@ -511,7 +513,11 @@ class Config:
|
||||
if logger.getEffectiveLevel() <= TRACE_LOG_LEVEL:
|
||||
self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
|
||||
if self.proxy_headers:
|
||||
self.loaded_app = ProxyHeadersMiddleware(self.loaded_app, trusted_hosts=self.forwarded_allow_ips)
|
||||
self.loaded_app = ProxyHeadersMiddleware(
|
||||
self.loaded_app,
|
||||
trusted_hosts=self.forwarded_allow_ips,
|
||||
mode=self.proxy_headers_mode,
|
||||
)
|
||||
|
||||
self.loaded = True
|
||||
|
||||
@ -537,7 +543,7 @@ class Config:
|
||||
|
||||
def bind_socket(self) -> socket.socket:
|
||||
logger_args: list[str | int]
|
||||
if self.uds is not None: # pragma: py-win32
|
||||
if self.uds: # pragma: py-win32
|
||||
path = self.uds
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
try:
|
||||
@ -552,7 +558,7 @@ class Config:
|
||||
sock_name_format = "%s"
|
||||
color_message = "Uvicorn running on " + click.style(sock_name_format, bold=True) + " (Press CTRL+C to quit)"
|
||||
logger_args = [self.uds]
|
||||
elif self.fd is not None: # pragma: py-win32
|
||||
elif self.fd: # pragma: py-win32
|
||||
sock = socket.fromfd(self.fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
message = "Uvicorn running on socket %s (Press CTRL+C to quit)"
|
||||
fd_name_format = "%s"
|
||||
|
||||
@ -9,7 +9,7 @@ import sys
|
||||
import warnings
|
||||
from collections.abc import Callable
|
||||
from configparser import RawConfigParser
|
||||
from typing import IO, Any, get_args
|
||||
from typing import IO, Any, Literal, get_args
|
||||
|
||||
import click
|
||||
|
||||
@ -223,10 +223,23 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
||||
help="Enable/Disable colorized logging.",
|
||||
)
|
||||
@click.option(
|
||||
"--proxy-headers/--no-proxy-headers",
|
||||
"--proxy-headers",
|
||||
type=click.Choice(["x-forwarded", "forwarded"]),
|
||||
is_flag=False,
|
||||
flag_value="x-forwarded",
|
||||
default="x-forwarded",
|
||||
show_default=True,
|
||||
help="Which forwarding-header family to read. `x-forwarded` (default) reads "
|
||||
"X-Forwarded-Proto / -For / -Host. `forwarded` reads the RFC 7239 `Forwarded` header. "
|
||||
"Bare `--proxy-headers` means `x-forwarded` (matches the prior boolean default). "
|
||||
"Use `--no-proxy-headers` to disable.",
|
||||
)
|
||||
@click.option(
|
||||
"--no-proxy-headers",
|
||||
"no_proxy_headers",
|
||||
is_flag=True,
|
||||
default=True,
|
||||
help="Enable/Disable X-Forwarded-Proto, X-Forwarded-For to populate url scheme and remote address info.",
|
||||
default=False,
|
||||
help="Disable proxy header parsing.",
|
||||
)
|
||||
@click.option(
|
||||
"--server-header/--no-server-header",
|
||||
@ -412,7 +425,8 @@ def main(
|
||||
log_config: str,
|
||||
log_level: str,
|
||||
access_log: bool,
|
||||
proxy_headers: bool,
|
||||
proxy_headers: Literal["x-forwarded", "forwarded"],
|
||||
no_proxy_headers: bool,
|
||||
server_header: bool,
|
||||
date_header: bool,
|
||||
forwarded_allow_ips: str,
|
||||
@ -438,6 +452,8 @@ def main(
|
||||
reset_contextvars: bool,
|
||||
factory: bool,
|
||||
) -> None:
|
||||
proxy_headers_enabled = not no_proxy_headers
|
||||
proxy_headers_mode = proxy_headers
|
||||
run(
|
||||
app,
|
||||
host=host,
|
||||
@ -464,7 +480,8 @@ def main(
|
||||
reload_excludes=reload_excludes or None,
|
||||
reload_delay=reload_delay,
|
||||
workers=workers,
|
||||
proxy_headers=proxy_headers,
|
||||
proxy_headers=proxy_headers_enabled,
|
||||
proxy_headers_mode=proxy_headers_mode,
|
||||
server_header=server_header,
|
||||
date_header=date_header,
|
||||
forwarded_allow_ips=forwarded_allow_ips,
|
||||
@ -520,6 +537,7 @@ def run(
|
||||
log_level: str | int | None = None,
|
||||
access_log: bool = True,
|
||||
proxy_headers: bool = True,
|
||||
proxy_headers_mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
server_header: bool = True,
|
||||
date_header: bool = True,
|
||||
forwarded_allow_ips: list[str] | str | None = None,
|
||||
@ -576,6 +594,7 @@ def run(
|
||||
log_level=log_level,
|
||||
access_log=access_log,
|
||||
proxy_headers=proxy_headers,
|
||||
proxy_headers_mode=proxy_headers_mode,
|
||||
server_header=server_header,
|
||||
date_header=date_header,
|
||||
forwarded_allow_ips=forwarded_allow_ips,
|
||||
|
||||
@ -1,60 +1,155 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from typing import Literal
|
||||
|
||||
from uvicorn._types import ASGI3Application, ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn._types import ASGI3Application, ASGIReceiveCallable, ASGISendCallable, Scope, WWWScope
|
||||
|
||||
|
||||
class ProxyHeadersMiddleware:
|
||||
"""Middleware for handling known proxy headers
|
||||
"""Maps proxy-supplied forwarding headers onto the ASGI scope.
|
||||
|
||||
This middleware can be used when a known proxy is fronting the application,
|
||||
and is trusted to be properly setting the `X-Forwarded-Proto` and
|
||||
`X-Forwarded-For` headers with the connecting client information.
|
||||
Selects a parser at construction based on `mode`:
|
||||
|
||||
Modifies the `client` and `scheme` information so that they reference
|
||||
the connecting client, rather that the connecting proxy.
|
||||
* `"x-forwarded"` (default) - reads `X-Forwarded-Proto`, `X-Forwarded-For`,
|
||||
and `X-Forwarded-Host`.
|
||||
* `"forwarded"` - reads the standardized `Forwarded` header (RFC 7239).
|
||||
|
||||
References:
|
||||
- <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers#Proxies>
|
||||
- <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For>
|
||||
Modes are mutually exclusive: when one is selected, the other family is
|
||||
ignored entirely. Silent fallback would let an attacker set whichever
|
||||
family the proxy is not configured for.
|
||||
"""
|
||||
|
||||
def __init__(self, app: ASGI3Application, trusted_hosts: list[str] | str = "127.0.0.1") -> None:
|
||||
self.app = app
|
||||
self.trusted_hosts = _TrustedHosts(trusted_hosts)
|
||||
def __init__(
|
||||
self,
|
||||
app: ASGI3Application,
|
||||
trusted_hosts: list[str] | str = "127.0.0.1",
|
||||
mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
) -> None:
|
||||
trusted = _TrustedHosts(trusted_hosts)
|
||||
self._inner = (_XForwardedParser if mode == "x-forwarded" else _RFC7239Parser)(app, trusted)
|
||||
|
||||
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
if scope["type"] == "lifespan":
|
||||
return await self.app(scope, receive, send)
|
||||
return await self._inner(scope, receive, send)
|
||||
|
||||
client_addr = scope.get("client")
|
||||
client_host = client_addr[0] if client_addr else None
|
||||
|
||||
if client_host in self.trusted_hosts:
|
||||
headers = dict(scope["headers"])
|
||||
class _BaseForwardedParser:
|
||||
"""Base parser: handles trust gating and shared scope rewriting.
|
||||
|
||||
if b"x-forwarded-proto" in headers:
|
||||
x_forwarded_proto = headers[b"x-forwarded-proto"].decode("latin1").strip()
|
||||
Subclasses implement `apply`, which is invoked only when the connecting peer is in the trust set.
|
||||
"""
|
||||
|
||||
if x_forwarded_proto in {"http", "https", "ws", "wss"}:
|
||||
if scope["type"] == "websocket":
|
||||
scope["scheme"] = x_forwarded_proto.replace("http", "ws")
|
||||
else:
|
||||
scope["scheme"] = x_forwarded_proto
|
||||
|
||||
if b"x-forwarded-for" in headers:
|
||||
x_forwarded_for = headers[b"x-forwarded-for"].decode("latin1")
|
||||
host, port = self.trusted_hosts.get_trusted_client_address(x_forwarded_for)
|
||||
|
||||
if host:
|
||||
# If the x-forwarded-for header is empty then host is an empty string.
|
||||
# Only set the client if we actually got something usable.
|
||||
# See: https://github.com/Kludex/uvicorn/issues/1068
|
||||
scope["client"] = (host, port)
|
||||
def __init__(self, app: ASGI3Application, trusted: _TrustedHosts) -> None:
|
||||
self.app = app
|
||||
self.trusted = trusted
|
||||
|
||||
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
if scope["type"] != "lifespan":
|
||||
client_addr = scope.get("client")
|
||||
client_host = client_addr[0] if client_addr else None
|
||||
if client_host in self.trusted:
|
||||
self.apply(scope)
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def set_scheme(scope: WWWScope, proto: str) -> None:
|
||||
if proto not in {"http", "https", "ws", "wss"}:
|
||||
return
|
||||
if scope["type"] == "websocket":
|
||||
scope["scheme"] = proto.replace("http", "ws")
|
||||
else:
|
||||
scope["scheme"] = proto
|
||||
|
||||
@staticmethod
|
||||
def set_host(scope: WWWScope, host_value: str) -> None:
|
||||
# Rewrite only the `Host` header. Per ASGI, `scope["server"]` is the local bind
|
||||
# address, not the client-perceived host, so we leave it untouched.
|
||||
scope["headers"] = [(name, value) for name, value in scope["headers"] if name != b"host"] + [
|
||||
(b"host", host_value.encode("latin1"))
|
||||
]
|
||||
|
||||
|
||||
class _XForwardedParser(_BaseForwardedParser):
|
||||
"""Reads the `X-Forwarded-Proto`, `X-Forwarded-For`, `X-Forwarded-Host` family."""
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
if (proto := _last_field(scope, b"x-forwarded-proto")) is not None:
|
||||
self.set_scheme(scope, proto.strip())
|
||||
|
||||
if (forwarded_for := _join_field(scope, b"x-forwarded-for")) is not None:
|
||||
host, port = self.trusted.get_trusted_client_address(forwarded_for)
|
||||
if host:
|
||||
# Empty x-forwarded-for yields an empty host - skip in that case.
|
||||
# See: https://github.com/Kludex/uvicorn/issues/1068
|
||||
scope["client"] = (host, port)
|
||||
|
||||
if (forwarded_host := _last_field(scope, b"x-forwarded-host")) is not None:
|
||||
if (value := forwarded_host.strip()) != "":
|
||||
self.set_host(scope, value)
|
||||
|
||||
|
||||
class _RFC7239Parser(_BaseForwardedParser):
|
||||
"""Reads the standardized `Forwarded` header (RFC 7239)."""
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
if (forwarded := _join_field(scope, b"forwarded")) is None:
|
||||
return
|
||||
|
||||
entry = self.trusted.get_trusted_forwarded_entry(forwarded)
|
||||
if entry is None:
|
||||
return
|
||||
|
||||
if (proto := entry.get("proto")) is not None:
|
||||
self.set_scheme(scope, proto.lower())
|
||||
|
||||
# `for=` is guaranteed present and non-placeholder by `get_trusted_forwarded_entry`.
|
||||
host, port = _parse_host_port(entry["for"])
|
||||
if host:
|
||||
scope["client"] = (host, port)
|
||||
|
||||
if (forwarded_host := entry.get("host")) is not None and forwarded_host != "":
|
||||
self.set_host(scope, forwarded_host)
|
||||
|
||||
|
||||
def _join_field(scope: WWWScope, name: bytes) -> str | None:
|
||||
"""Concatenate all occurrences of a list-valued header field in wire order.
|
||||
|
||||
Per RFC 7230 §3.2.2 and RFC 7239 §7.1, list-valued header fields may appear
|
||||
multiple times in the message and are equivalent to a single field with a
|
||||
comma-joined value. `dict(scope["headers"])` would keep only one
|
||||
occurrence and is unsafe for these fields.
|
||||
"""
|
||||
values = [value.decode("latin1") for n, value in scope["headers"] if n == name]
|
||||
if not values:
|
||||
return None
|
||||
return ", ".join(values)
|
||||
|
||||
|
||||
def _last_field(scope: WWWScope, name: bytes) -> str | None:
|
||||
"""Return the last occurrence of a single-valued header field.
|
||||
|
||||
`X-Forwarded-Proto` and `X-Forwarded-Host` are de-facto single-valued
|
||||
headers; if the message contains repeats, the rightmost is the value
|
||||
appended by the trusted upstream proxy (nginx, Apache, ALB and friends
|
||||
all append rather than prepend). Picking the leftmost would let an
|
||||
attacker preserve a client-supplied value past a proxy. This also
|
||||
matches the prior uvicorn behavior, which used `dict(scope["headers"])`
|
||||
and effectively kept the last value.
|
||||
"""
|
||||
result: str | None = None
|
||||
for n, value in scope["headers"]:
|
||||
if n == name:
|
||||
result = value.decode("latin1")
|
||||
return result
|
||||
|
||||
|
||||
def _is_placeholder_for(value: str) -> bool:
|
||||
"""RFC 7239 placeholder identifiers: `unknown` (case-insensitive) or obfuscated `_*`."""
|
||||
return value.lower() == "unknown" or value.startswith("_")
|
||||
|
||||
|
||||
def _parse_raw_hosts(value: str) -> list[str]:
|
||||
return [item.strip() for item in value.split(",")]
|
||||
@ -64,8 +159,8 @@ def _parse_host_port(value: str) -> tuple[str, int]:
|
||||
"""Parse a forwarded host value into host and optional port.
|
||||
|
||||
Accepts bare IPs, IPv4 `host:port`, and bracketed IPv6 `[host]:port`.
|
||||
Any unrecognized or malformed value is treated conservatively and returned
|
||||
without a port so trust checks do not silently normalize arbitrary input.
|
||||
Any unrecognized or malformed value is returned without a port so trust
|
||||
checks do not silently normalize arbitrary input.
|
||||
"""
|
||||
|
||||
if value.startswith("["):
|
||||
@ -95,6 +190,77 @@ def _parse_host_port(value: str) -> tuple[str, int]:
|
||||
return value, 0
|
||||
|
||||
|
||||
def _parse_forwarded(value: str) -> list[dict[str, str]]:
|
||||
"""Parse an RFC 7239 `Forwarded` header into a list of hop dicts.
|
||||
|
||||
Each entry maps lowercase parameter names (`for`, `host`, `proto`,
|
||||
`by`) to their (unquoted) values. Unrecognized parameters are dropped;
|
||||
malformed pairs are skipped without raising. Per RFC 7239 §4 each
|
||||
parameter must occur at most once per forwarded-element; entries that
|
||||
violate this are discarded entirely to avoid header smuggling.
|
||||
"""
|
||||
|
||||
entries: list[dict[str, str]] = []
|
||||
for raw_entry in _split_outside_quotes(value, ","):
|
||||
params: dict[str, str] = {}
|
||||
duplicate = False
|
||||
for raw_pair in _split_outside_quotes(raw_entry, ";"):
|
||||
name, sep, val = raw_pair.strip().partition("=")
|
||||
if not sep:
|
||||
continue
|
||||
key = name.strip().lower()
|
||||
if key in {"for", "host", "proto", "by"}:
|
||||
if key in params:
|
||||
duplicate = True
|
||||
break
|
||||
params[key] = _unquote(val.strip())
|
||||
if params and not duplicate:
|
||||
entries.append(params)
|
||||
return entries
|
||||
|
||||
|
||||
def _split_outside_quotes(value: str, separator: str) -> list[str]:
|
||||
"""Split `value` on `separator` while respecting RFC 7230 quoted-strings."""
|
||||
parts: list[str] = []
|
||||
current: list[str] = []
|
||||
in_quotes = False
|
||||
i = 0
|
||||
while i < len(value):
|
||||
ch = value[i]
|
||||
if ch == "\\" and in_quotes and i + 1 < len(value):
|
||||
current.append(value[i : i + 2])
|
||||
i += 2
|
||||
continue
|
||||
if ch == '"':
|
||||
in_quotes = not in_quotes
|
||||
current.append(ch)
|
||||
elif ch == separator and not in_quotes:
|
||||
parts.append("".join(current))
|
||||
current = []
|
||||
else:
|
||||
current.append(ch)
|
||||
i += 1
|
||||
parts.append("".join(current))
|
||||
return parts
|
||||
|
||||
|
||||
def _unquote(value: str) -> str:
|
||||
"""Strip surrounding quotes and unescape backslash sequences (RFC 7230)."""
|
||||
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
|
||||
inner = value[1:-1]
|
||||
result: list[str] = []
|
||||
i = 0
|
||||
while i < len(inner):
|
||||
if inner[i] == "\\" and i + 1 < len(inner):
|
||||
result.append(inner[i + 1])
|
||||
i += 2
|
||||
else:
|
||||
result.append(inner[i])
|
||||
i += 1
|
||||
return "".join(result)
|
||||
return value
|
||||
|
||||
|
||||
class _TrustedHosts:
|
||||
"""Container for trusted hosts and networks"""
|
||||
|
||||
@ -172,3 +338,35 @@ class _TrustedHosts:
|
||||
# All hosts are trusted meaning that the client was also a trusted proxy
|
||||
# See https://github.com/Kludex/uvicorn/issues/1068#issuecomment-855371576
|
||||
return _parse_host_port(x_forwarded_for_hosts[0])
|
||||
|
||||
def get_trusted_forwarded_entry(self, forwarded: str) -> dict[str, str] | None:
|
||||
"""Extract the trusted hop entry from an RFC 7239 `Forwarded` header.
|
||||
|
||||
Mirrors `get_trusted_client_address`: walk hops from right and
|
||||
return the first one whose `for=` is set, anchorable, and not in
|
||||
the trust set. Entries with no `for=` or with placeholder values
|
||||
(`unknown`, `_obfuscated`) cannot establish the client hop and
|
||||
are filtered out before selection - returning one would attach a
|
||||
stale `host=`/`proto=` to the request without a verifiable client.
|
||||
Returns `None` when no anchorable hop exists.
|
||||
"""
|
||||
|
||||
anchorable = [
|
||||
entry
|
||||
for entry in _parse_forwarded(forwarded)
|
||||
if (for_ := entry.get("for")) is not None and not _is_placeholder_for(for_)
|
||||
]
|
||||
if not anchorable:
|
||||
return None
|
||||
|
||||
if self.always_trust:
|
||||
return anchorable[0]
|
||||
|
||||
for entry in reversed(anchorable):
|
||||
host, _ = _parse_host_port(entry["for"])
|
||||
if host not in self:
|
||||
return entry
|
||||
|
||||
# All anchorable hops were trusted - fall back to the leftmost
|
||||
# (original client, mirroring `get_trusted_client_address`).
|
||||
return anchorable[0]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user