Compare commits
2 Commits
main
...
forwarded-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fec7b4cc4a | ||
|
|
7c2e669a66 |
@ -260,13 +260,21 @@ The factory is called inside each worker process, so it works with `--reload` an
|
||||
When running an application behind one or more proxies, certain information about the request is lost.
|
||||
To avoid this most proxies will add headers containing this information for downstream servers to read.
|
||||
|
||||
Uvicorn currently supports the following headers:
|
||||
Uvicorn supports two header families, selected via `--proxy-headers <x-forwarded|forwarded>`:
|
||||
|
||||
- `X-Forwarded-For` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For))
|
||||
- `X-Forwarded-Proto`([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Proto))
|
||||
`x-forwarded` (default):
|
||||
|
||||
Uvicorn can use these headers to correctly set the client and protocol in the request.
|
||||
However as anyone can set these headers you must configure which "clients" you will trust to have set them correctly.
|
||||
- `X-Forwarded-For` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For)) - sets `scope["client"]`.
|
||||
- `X-Forwarded-Proto` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Proto)) - sets `scope["scheme"]`.
|
||||
- `X-Forwarded-Host` ([MDN Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-Host)) - rewrites the `Host` request header.
|
||||
|
||||
`forwarded`:
|
||||
|
||||
- `Forwarded` ([RFC 7239](https://datatracker.ietf.org/doc/html/rfc7239)) - the standardized header that combines all of the above into a single value with `for=`, `proto=`, and `host=` parameters.
|
||||
|
||||
The two modes are mutually exclusive: when one is selected, the other family is ignored entirely. This is intentional - silent fallback would let a client set whichever family the proxy is not configured for. Choose the mode that matches what your upstream proxy emits.
|
||||
|
||||
However, as anyone can set these headers you must configure which "clients" you will trust to have set them correctly.
|
||||
|
||||
Uvicorn can be configured to trust IP Addresses (e.g. `127.0.0.1`), IP Networks (e.g. `10.100.0.0/16`),
|
||||
or Literals (e.g. `/path/to/socket.sock`). When running from CLI these are configured using `--forwarded-allow-ips`.
|
||||
|
||||
@ -115,7 +115,7 @@ Note that WSGI mode always disables WebSocket support, as it is not supported by
|
||||
## HTTP
|
||||
|
||||
* `--root-path <str>` - Set the ASGI `root_path` for applications submounted below a given URL path. **Default:** *""*.
|
||||
* `--proxy-headers / --no-proxy-headers` - Enable/Disable X-Forwarded-Proto, X-Forwarded-For to populate remote address info. Defaults to enabled, but is restricted to only trusting connecting IPs in the `forwarded-allow-ips` configuration.
|
||||
* `--proxy-headers <x-forwarded|forwarded>` - Which forwarding-header family to read to populate remote address info, scheme, and host. `x-forwarded` (default) reads `X-Forwarded-Proto`, `X-Forwarded-For`, and `X-Forwarded-Host`. `forwarded` reads the standardized `Forwarded` header from [RFC 7239](https://datatracker.ietf.org/doc/html/rfc7239). Modes are mutually exclusive: when one is selected, the other family is ignored entirely. Bare `--proxy-headers` means `x-forwarded` (matching the prior boolean default). Use `--no-proxy-headers` to disable. Header parsing is restricted to only trusting connecting IPs in the `forwarded-allow-ips` configuration.
|
||||
* `--forwarded-allow-ips <comma-separated-list>` - Comma separated list of IP Addresses, IP Networks, or literals (e.g. UNIX Socket path) to trust with proxy headers. Defaults to the `$FORWARDED_ALLOW_IPS` environment variable if available, or '127.0.0.1'. The literal `'*'` means trust everything.
|
||||
* `--server-header / --no-server-header` - Enable/Disable default `Server` header. **Default:** *True*.
|
||||
* `--date-header / --no-date-header` - Enable/Disable default `Date` header. **Default:** *True*.
|
||||
|
||||
@ -23,6 +23,7 @@ if TYPE_CHECKING:
|
||||
|
||||
X_FORWARDED_FOR = "X-Forwarded-For"
|
||||
X_FORWARDED_PROTO = "X-Forwarded-Proto"
|
||||
FORWARDED = "Forwarded"
|
||||
|
||||
|
||||
async def default_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
@ -554,3 +555,330 @@ async def test_proxy_headers_empty_x_forwarded_for() -> None:
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert response.text == "https://127.0.0.1:123"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RFC 7239 `Forwarded` header (mode="forwarded")
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def forwarded_app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
"""Echoes scheme, client, and host header as space-separated `key=value` pairs."""
|
||||
headers = dict(scope["headers"]) # type: ignore
|
||||
host_header = headers.get(b"host", b"").decode("latin1")
|
||||
client = scope["client"] # type: ignore
|
||||
scheme = scope["scheme"] # type: ignore
|
||||
assert client is not None
|
||||
body = f"scheme={scheme} client={client[0]}:{client[1]} host={host_header}"
|
||||
await Response(body, media_type="text/plain")(scope, receive, send)
|
||||
|
||||
|
||||
def parse_echo(text: str) -> dict[str, str]:
|
||||
"""Parse `forwarded_app`'s `key=value key=value ...` echo into a dict."""
|
||||
return dict(part.split("=", 1) for part in text.split(" "))
|
||||
|
||||
|
||||
def make_forwarded_client(
|
||||
trusted_hosts: str | list[str],
|
||||
client: tuple[str, int] = ("127.0.0.1", 123),
|
||||
) -> httpx.AsyncClient:
|
||||
"""httpx client wired to a `mode='forwarded'` middleware over `forwarded_app`."""
|
||||
app = ProxyHeadersMiddleware(forwarded_app, trusted_hosts, mode="forwarded")
|
||||
transport = httpx.ASGITransport(app=app, client=client) # type: ignore
|
||||
return httpx.AsyncClient(transport=transport, base_url="http://testserver")
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_multiple_hops_picks_rightmost() -> None:
|
||||
"""Walks from the right; first hop whose `for=` is not trusted is the client."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4, for=10.0.0.5;proto=http, for=10.0.0.6"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_falls_back_to_leftmost_when_all_trusted() -> None:
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=10.0.0.1, for=10.0.0.2"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "10.0.0.1:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_quoted_string_escape_is_unescaped() -> None:
|
||||
"""RFC 7230 quoted-string: `\\"` inside quotes decodes to a literal `"`."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: r'for="1.2.3.4";host="weird\"name"'}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_case_insensitive_param_keys() -> None:
|
||||
"""RFC 7239 §4: parameter names are case-insensitive."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "For=1.2.3.4;PROTO=https;Host=public.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_unknown_params_ignored() -> None:
|
||||
"""Unrecognized parameters (here, `secret`) must not affect scope mutation."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;secret=hunter2;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_empty_header_does_not_mutate_scope() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: ""}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Untouched scope: peer client is the httpx transport peer, scheme stays http.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "127.0.0.1:123"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_malformed_pair_in_entry_is_dropped_but_entry_survives() -> None:
|
||||
"""Pairs without `=` are skipped, other params on the same hop survive."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;novalue;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_duplicate_param_drops_entry() -> None:
|
||||
"""RFC 7239 §4: each parameter appears at most once per element. Smuggling defense -
|
||||
a single hop with two `for=` values must be rejected so an attacker cannot prepend
|
||||
a fake value that some downstream parser might pick over the real one."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;for=1.2.3.4"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Entire entry is dropped; scope falls through to the peer client.
|
||||
assert parse_echo(response.text)["client"] == "127.0.0.1:123"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_duplicate_param_only_drops_offending_entry() -> None:
|
||||
"""A poisoned entry must not contaminate a later, well-formed entry in the same header."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;for=1.2.3.4, for=5.6.7.8"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["client"] == "5.6.7.8:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_missing_for_is_unanchorable() -> None:
|
||||
"""An entry without `for=` cannot identify a client and must be skipped entirely."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "host=attacker.example;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# `host=`/`proto=` from an unanchorable entry must NOT apply to the request.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["host"] != "attacker.example"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_placeholder_for_filtered_under_always_trust() -> None:
|
||||
"""With `always_trust=*`, an unanchorable leftmost entry must not be the fallback."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=unknown;proto=https;host=attacker.example, for=1.2.3.4"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@pytest.mark.parametrize("placeholder", ["unknown", "Unknown", "_hidden"])
|
||||
async def test_forwarded_mode_placeholder_for_variants_are_unanchorable(placeholder: str) -> None:
|
||||
"""`unknown` (case-insensitive) and `_*` obfuscated identifiers cannot anchor a hop."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: f"for={placeholder};host=attacker.example;proto=https"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["host"] != "attacker.example"
|
||||
assert echo["scheme"] == "http"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_basic() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https;host=public.example:9000"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example:9000"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_untrusted_peer_ignored() -> None:
|
||||
async with make_forwarded_client("192.168.0.1") as client:
|
||||
headers = {FORWARDED: "for=attacker.example;proto=https;host=attacker.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] != "attacker.example:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_ipv6_quoted() -> None:
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: 'for="[2001:db8::1]:443";proto=https;host="public.example"'}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "2001:db8::1:443"
|
||||
assert echo["host"] == "public.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_chained_proxy_picks_rightmost_untrusted() -> None:
|
||||
"""The connecting peer (10.0.0.5) and the next-to-last hop are both trusted; client = 1.2.3.4."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https, for=10.0.0.5;proto=http;host=internal.lan"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# The rightmost untrusted hop is `for=1.2.3.4`, which carries `proto=https` only.
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["host"] != "internal.lan"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_placeholder_for_falls_through() -> None:
|
||||
"""`for=unknown` is unanchorable; the next anchorable hop wins."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = {FORWARDED: "for=unknown;proto=https;host=attacker.example, for=1.2.3.4;proto=http"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_proto_case_insensitive() -> None:
|
||||
"""RFC 3986 - URI schemes are case-insensitive. `proto=HTTPS` must work."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=HTTPS;host=public.example"}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
assert parse_echo(response.text)["scheme"] == "https"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_ignores_x_forwarded_headers() -> None:
|
||||
"""When mode='forwarded', the X-Forwarded-* family must be completely ignored."""
|
||||
async with make_forwarded_client("*") as client:
|
||||
headers = {
|
||||
FORWARDED: "for=1.2.3.4;proto=https",
|
||||
X_FORWARDED_FOR: "9.9.9.9",
|
||||
X_FORWARDED_PROTO: "http",
|
||||
"X-Forwarded-Host": "attacker.example",
|
||||
}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_x_forwarded_mode_ignores_forwarded_header() -> None:
|
||||
"""And vice versa: when mode='x-forwarded' (default), `Forwarded` is ignored."""
|
||||
app = ProxyHeadersMiddleware(forwarded_app, trusted_hosts="*", mode="x-forwarded")
|
||||
transport = httpx.ASGITransport(app=app, client=("127.0.0.1", 123)) # type: ignore
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
headers = {
|
||||
FORWARDED: "for=attacker.example;proto=https;host=attacker.example",
|
||||
X_FORWARDED_FOR: "1.2.3.4",
|
||||
X_FORWARDED_PROTO: "https",
|
||||
}
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] != "attacker.example"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_via_config(unused_tcp_port: int) -> None:
|
||||
"""Programmatic API: `Config(proxy_headers_mode='forwarded')` wires the right mode."""
|
||||
config = Config(
|
||||
app=forwarded_app,
|
||||
loop="asyncio",
|
||||
limit_max_requests=1,
|
||||
proxy_headers_mode="forwarded",
|
||||
forwarded_allow_ips="*",
|
||||
port=unused_tcp_port,
|
||||
)
|
||||
async with run_server(config):
|
||||
async with httpx.AsyncClient() as client:
|
||||
headers = {FORWARDED: "for=1.2.3.4;proto=https;host=public.example:9000"}
|
||||
response = await client.get(f"http://127.0.0.1:{unused_tcp_port}", headers=headers)
|
||||
assert response.status_code == 200
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["scheme"] == "https"
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["host"] == "public.example:9000"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_x_forwarded_proto_last_wins_on_duplicates() -> None:
|
||||
"""X-Forwarded-Proto is single-valued; if duplicated, take the rightmost (the value
|
||||
appended by the trusted upstream proxy, not a client-supplied earlier copy)."""
|
||||
async with make_httpx_client("*") as client:
|
||||
# httpx.Headers preserves multiple values for the same name in order.
|
||||
headers = httpx.Headers(
|
||||
[(X_FORWARDED_PROTO, "http"), (X_FORWARDED_PROTO, "https"), (X_FORWARDED_FOR, "1.2.3.4")]
|
||||
)
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# The default_app echoes scheme://client; last X-Forwarded-Proto value (https) wins.
|
||||
assert response.text == "https://1.2.3.4:0"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_forwarded_mode_joins_multiple_forwarded_headers() -> None:
|
||||
"""Multiple `Forwarded` headers are list-equivalent per RFC 7230 §3.2.2."""
|
||||
async with make_forwarded_client("10.0.0.0/8", client=("10.0.0.5", 1234)) as client:
|
||||
headers = httpx.Headers([(FORWARDED, "for=1.2.3.4;proto=https"), (FORWARDED, "for=10.0.0.5")])
|
||||
response = await client.get("/", headers=headers)
|
||||
assert response.status_code == 200
|
||||
# Right-most untrusted in the joined list is `for=1.2.3.4` (10.0.0.5 is trusted).
|
||||
echo = parse_echo(response.text)
|
||||
assert echo["client"] == "1.2.3.4:0"
|
||||
assert echo["scheme"] == "https"
|
||||
|
||||
@ -205,6 +205,7 @@ class Config:
|
||||
reload_excludes: list[str] | str | None = None,
|
||||
workers: int | None = None,
|
||||
proxy_headers: bool = True,
|
||||
proxy_headers_mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
server_header: bool = True,
|
||||
date_header: bool = True,
|
||||
forwarded_allow_ips: list[str] | str | None = None,
|
||||
@ -254,6 +255,7 @@ class Config:
|
||||
self.reload_delay = reload_delay
|
||||
self.workers = workers or 1
|
||||
self.proxy_headers = proxy_headers
|
||||
self.proxy_headers_mode = proxy_headers_mode
|
||||
self.server_header = server_header
|
||||
self.date_header = date_header
|
||||
self.root_path = root_path
|
||||
@ -511,7 +513,11 @@ class Config:
|
||||
if logger.getEffectiveLevel() <= TRACE_LOG_LEVEL:
|
||||
self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
|
||||
if self.proxy_headers:
|
||||
self.loaded_app = ProxyHeadersMiddleware(self.loaded_app, trusted_hosts=self.forwarded_allow_ips)
|
||||
self.loaded_app = ProxyHeadersMiddleware(
|
||||
self.loaded_app,
|
||||
trusted_hosts=self.forwarded_allow_ips,
|
||||
mode=self.proxy_headers_mode,
|
||||
)
|
||||
|
||||
self.loaded = True
|
||||
|
||||
|
||||
@ -9,7 +9,7 @@ import sys
|
||||
import warnings
|
||||
from collections.abc import Callable
|
||||
from configparser import RawConfigParser
|
||||
from typing import IO, Any, get_args
|
||||
from typing import IO, Any, Literal, get_args
|
||||
|
||||
import click
|
||||
|
||||
@ -223,10 +223,23 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No
|
||||
help="Enable/Disable colorized logging.",
|
||||
)
|
||||
@click.option(
|
||||
"--proxy-headers/--no-proxy-headers",
|
||||
"--proxy-headers",
|
||||
type=click.Choice(["x-forwarded", "forwarded"]),
|
||||
is_flag=False,
|
||||
flag_value="x-forwarded",
|
||||
default="x-forwarded",
|
||||
show_default=True,
|
||||
help="Which forwarding-header family to read. `x-forwarded` (default) reads "
|
||||
"X-Forwarded-Proto / -For / -Host. `forwarded` reads the RFC 7239 `Forwarded` header. "
|
||||
"Bare `--proxy-headers` means `x-forwarded` (matches the prior boolean default). "
|
||||
"Use `--no-proxy-headers` to disable.",
|
||||
)
|
||||
@click.option(
|
||||
"--no-proxy-headers",
|
||||
"no_proxy_headers",
|
||||
is_flag=True,
|
||||
default=True,
|
||||
help="Enable/Disable X-Forwarded-Proto, X-Forwarded-For to populate url scheme and remote address info.",
|
||||
default=False,
|
||||
help="Disable proxy header parsing.",
|
||||
)
|
||||
@click.option(
|
||||
"--server-header/--no-server-header",
|
||||
@ -412,7 +425,8 @@ def main(
|
||||
log_config: str,
|
||||
log_level: str,
|
||||
access_log: bool,
|
||||
proxy_headers: bool,
|
||||
proxy_headers: Literal["x-forwarded", "forwarded"],
|
||||
no_proxy_headers: bool,
|
||||
server_header: bool,
|
||||
date_header: bool,
|
||||
forwarded_allow_ips: str,
|
||||
@ -438,6 +452,8 @@ def main(
|
||||
reset_contextvars: bool,
|
||||
factory: bool,
|
||||
) -> None:
|
||||
proxy_headers_enabled = not no_proxy_headers
|
||||
proxy_headers_mode = proxy_headers
|
||||
run(
|
||||
app,
|
||||
host=host,
|
||||
@ -464,7 +480,8 @@ def main(
|
||||
reload_excludes=reload_excludes or None,
|
||||
reload_delay=reload_delay,
|
||||
workers=workers,
|
||||
proxy_headers=proxy_headers,
|
||||
proxy_headers=proxy_headers_enabled,
|
||||
proxy_headers_mode=proxy_headers_mode,
|
||||
server_header=server_header,
|
||||
date_header=date_header,
|
||||
forwarded_allow_ips=forwarded_allow_ips,
|
||||
@ -520,6 +537,7 @@ def run(
|
||||
log_level: str | int | None = None,
|
||||
access_log: bool = True,
|
||||
proxy_headers: bool = True,
|
||||
proxy_headers_mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
server_header: bool = True,
|
||||
date_header: bool = True,
|
||||
forwarded_allow_ips: list[str] | str | None = None,
|
||||
@ -576,6 +594,7 @@ def run(
|
||||
log_level=log_level,
|
||||
access_log=access_log,
|
||||
proxy_headers=proxy_headers,
|
||||
proxy_headers_mode=proxy_headers_mode,
|
||||
server_header=server_header,
|
||||
date_header=date_header,
|
||||
forwarded_allow_ips=forwarded_allow_ips,
|
||||
|
||||
@ -1,60 +1,155 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from typing import Literal
|
||||
|
||||
from uvicorn._types import ASGI3Application, ASGIReceiveCallable, ASGISendCallable, Scope
|
||||
from uvicorn._types import ASGI3Application, ASGIReceiveCallable, ASGISendCallable, Scope, WWWScope
|
||||
|
||||
|
||||
class ProxyHeadersMiddleware:
|
||||
"""Middleware for handling known proxy headers
|
||||
"""Maps proxy-supplied forwarding headers onto the ASGI scope.
|
||||
|
||||
This middleware can be used when a known proxy is fronting the application,
|
||||
and is trusted to be properly setting the `X-Forwarded-Proto` and
|
||||
`X-Forwarded-For` headers with the connecting client information.
|
||||
Selects a parser at construction based on `mode`:
|
||||
|
||||
Modifies the `client` and `scheme` information so that they reference
|
||||
the connecting client, rather that the connecting proxy.
|
||||
* `"x-forwarded"` (default) - reads `X-Forwarded-Proto`, `X-Forwarded-For`,
|
||||
and `X-Forwarded-Host`.
|
||||
* `"forwarded"` - reads the standardized `Forwarded` header (RFC 7239).
|
||||
|
||||
References:
|
||||
- <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers#Proxies>
|
||||
- <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For>
|
||||
Modes are mutually exclusive: when one is selected, the other family is
|
||||
ignored entirely. Silent fallback would let an attacker set whichever
|
||||
family the proxy is not configured for.
|
||||
"""
|
||||
|
||||
def __init__(self, app: ASGI3Application, trusted_hosts: list[str] | str = "127.0.0.1") -> None:
|
||||
self.app = app
|
||||
self.trusted_hosts = _TrustedHosts(trusted_hosts)
|
||||
def __init__(
|
||||
self,
|
||||
app: ASGI3Application,
|
||||
trusted_hosts: list[str] | str = "127.0.0.1",
|
||||
mode: Literal["x-forwarded", "forwarded"] = "x-forwarded",
|
||||
) -> None:
|
||||
trusted = _TrustedHosts(trusted_hosts)
|
||||
self._inner = (_XForwardedParser if mode == "x-forwarded" else _RFC7239Parser)(app, trusted)
|
||||
|
||||
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
if scope["type"] == "lifespan":
|
||||
return await self.app(scope, receive, send)
|
||||
return await self._inner(scope, receive, send)
|
||||
|
||||
client_addr = scope.get("client")
|
||||
client_host = client_addr[0] if client_addr else None
|
||||
|
||||
if client_host in self.trusted_hosts:
|
||||
headers = dict(scope["headers"])
|
||||
class _BaseForwardedParser:
|
||||
"""Base parser: handles trust gating and shared scope rewriting.
|
||||
|
||||
if b"x-forwarded-proto" in headers:
|
||||
x_forwarded_proto = headers[b"x-forwarded-proto"].decode("latin1").strip()
|
||||
Subclasses implement `apply`, which is invoked only when the connecting peer is in the trust set.
|
||||
"""
|
||||
|
||||
if x_forwarded_proto in {"http", "https", "ws", "wss"}:
|
||||
if scope["type"] == "websocket":
|
||||
scope["scheme"] = x_forwarded_proto.replace("http", "ws")
|
||||
else:
|
||||
scope["scheme"] = x_forwarded_proto
|
||||
|
||||
if b"x-forwarded-for" in headers:
|
||||
x_forwarded_for = headers[b"x-forwarded-for"].decode("latin1")
|
||||
host, port = self.trusted_hosts.get_trusted_client_address(x_forwarded_for)
|
||||
|
||||
if host:
|
||||
# If the x-forwarded-for header is empty then host is an empty string.
|
||||
# Only set the client if we actually got something usable.
|
||||
# See: https://github.com/Kludex/uvicorn/issues/1068
|
||||
scope["client"] = (host, port)
|
||||
def __init__(self, app: ASGI3Application, trusted: _TrustedHosts) -> None:
|
||||
self.app = app
|
||||
self.trusted = trusted
|
||||
|
||||
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
|
||||
if scope["type"] != "lifespan":
|
||||
client_addr = scope.get("client")
|
||||
client_host = client_addr[0] if client_addr else None
|
||||
if client_host in self.trusted:
|
||||
self.apply(scope)
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def set_scheme(scope: WWWScope, proto: str) -> None:
|
||||
if proto not in {"http", "https", "ws", "wss"}:
|
||||
return
|
||||
if scope["type"] == "websocket":
|
||||
scope["scheme"] = proto.replace("http", "ws")
|
||||
else:
|
||||
scope["scheme"] = proto
|
||||
|
||||
@staticmethod
|
||||
def set_host(scope: WWWScope, host_value: str) -> None:
|
||||
# Rewrite only the `Host` header. Per ASGI, `scope["server"]` is the local bind
|
||||
# address, not the client-perceived host, so we leave it untouched.
|
||||
scope["headers"] = [(name, value) for name, value in scope["headers"] if name != b"host"] + [
|
||||
(b"host", host_value.encode("latin1"))
|
||||
]
|
||||
|
||||
|
||||
class _XForwardedParser(_BaseForwardedParser):
|
||||
"""Reads the `X-Forwarded-Proto`, `X-Forwarded-For`, `X-Forwarded-Host` family."""
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
if (proto := _last_field(scope, b"x-forwarded-proto")) is not None:
|
||||
self.set_scheme(scope, proto.strip())
|
||||
|
||||
if (forwarded_for := _join_field(scope, b"x-forwarded-for")) is not None:
|
||||
host, port = self.trusted.get_trusted_client_address(forwarded_for)
|
||||
if host:
|
||||
# Empty x-forwarded-for yields an empty host - skip in that case.
|
||||
# See: https://github.com/Kludex/uvicorn/issues/1068
|
||||
scope["client"] = (host, port)
|
||||
|
||||
if (forwarded_host := _last_field(scope, b"x-forwarded-host")) is not None:
|
||||
if (value := forwarded_host.strip()) != "":
|
||||
self.set_host(scope, value)
|
||||
|
||||
|
||||
class _RFC7239Parser(_BaseForwardedParser):
|
||||
"""Reads the standardized `Forwarded` header (RFC 7239)."""
|
||||
|
||||
def apply(self, scope: WWWScope) -> None:
|
||||
if (forwarded := _join_field(scope, b"forwarded")) is None:
|
||||
return
|
||||
|
||||
entry = self.trusted.get_trusted_forwarded_entry(forwarded)
|
||||
if entry is None:
|
||||
return
|
||||
|
||||
if (proto := entry.get("proto")) is not None:
|
||||
self.set_scheme(scope, proto.lower())
|
||||
|
||||
# `for=` is guaranteed present and non-placeholder by `get_trusted_forwarded_entry`.
|
||||
host, port = _parse_host_port(entry["for"])
|
||||
if host:
|
||||
scope["client"] = (host, port)
|
||||
|
||||
if (forwarded_host := entry.get("host")) is not None and forwarded_host != "":
|
||||
self.set_host(scope, forwarded_host)
|
||||
|
||||
|
||||
def _join_field(scope: WWWScope, name: bytes) -> str | None:
|
||||
"""Concatenate all occurrences of a list-valued header field in wire order.
|
||||
|
||||
Per RFC 7230 §3.2.2 and RFC 7239 §7.1, list-valued header fields may appear
|
||||
multiple times in the message and are equivalent to a single field with a
|
||||
comma-joined value. `dict(scope["headers"])` would keep only one
|
||||
occurrence and is unsafe for these fields.
|
||||
"""
|
||||
values = [value.decode("latin1") for n, value in scope["headers"] if n == name]
|
||||
if not values:
|
||||
return None
|
||||
return ", ".join(values)
|
||||
|
||||
|
||||
def _last_field(scope: WWWScope, name: bytes) -> str | None:
|
||||
"""Return the last occurrence of a single-valued header field.
|
||||
|
||||
`X-Forwarded-Proto` and `X-Forwarded-Host` are de-facto single-valued
|
||||
headers; if the message contains repeats, the rightmost is the value
|
||||
appended by the trusted upstream proxy (nginx, Apache, ALB and friends
|
||||
all append rather than prepend). Picking the leftmost would let an
|
||||
attacker preserve a client-supplied value past a proxy. This also
|
||||
matches the prior uvicorn behavior, which used `dict(scope["headers"])`
|
||||
and effectively kept the last value.
|
||||
"""
|
||||
result: str | None = None
|
||||
for n, value in scope["headers"]:
|
||||
if n == name:
|
||||
result = value.decode("latin1")
|
||||
return result
|
||||
|
||||
|
||||
def _is_placeholder_for(value: str) -> bool:
|
||||
"""RFC 7239 placeholder identifiers: `unknown` (case-insensitive) or obfuscated `_*`."""
|
||||
return value.lower() == "unknown" or value.startswith("_")
|
||||
|
||||
|
||||
def _parse_raw_hosts(value: str) -> list[str]:
|
||||
return [item.strip() for item in value.split(",")]
|
||||
@ -64,8 +159,8 @@ def _parse_host_port(value: str) -> tuple[str, int]:
|
||||
"""Parse a forwarded host value into host and optional port.
|
||||
|
||||
Accepts bare IPs, IPv4 `host:port`, and bracketed IPv6 `[host]:port`.
|
||||
Any unrecognized or malformed value is treated conservatively and returned
|
||||
without a port so trust checks do not silently normalize arbitrary input.
|
||||
Any unrecognized or malformed value is returned without a port so trust
|
||||
checks do not silently normalize arbitrary input.
|
||||
"""
|
||||
|
||||
if value.startswith("["):
|
||||
@ -95,6 +190,77 @@ def _parse_host_port(value: str) -> tuple[str, int]:
|
||||
return value, 0
|
||||
|
||||
|
||||
def _parse_forwarded(value: str) -> list[dict[str, str]]:
|
||||
"""Parse an RFC 7239 `Forwarded` header into a list of hop dicts.
|
||||
|
||||
Each entry maps lowercase parameter names (`for`, `host`, `proto`,
|
||||
`by`) to their (unquoted) values. Unrecognized parameters are dropped;
|
||||
malformed pairs are skipped without raising. Per RFC 7239 §4 each
|
||||
parameter must occur at most once per forwarded-element; entries that
|
||||
violate this are discarded entirely to avoid header smuggling.
|
||||
"""
|
||||
|
||||
entries: list[dict[str, str]] = []
|
||||
for raw_entry in _split_outside_quotes(value, ","):
|
||||
params: dict[str, str] = {}
|
||||
duplicate = False
|
||||
for raw_pair in _split_outside_quotes(raw_entry, ";"):
|
||||
name, sep, val = raw_pair.strip().partition("=")
|
||||
if not sep:
|
||||
continue
|
||||
key = name.strip().lower()
|
||||
if key in {"for", "host", "proto", "by"}:
|
||||
if key in params:
|
||||
duplicate = True
|
||||
break
|
||||
params[key] = _unquote(val.strip())
|
||||
if params and not duplicate:
|
||||
entries.append(params)
|
||||
return entries
|
||||
|
||||
|
||||
def _split_outside_quotes(value: str, separator: str) -> list[str]:
|
||||
"""Split `value` on `separator` while respecting RFC 7230 quoted-strings."""
|
||||
parts: list[str] = []
|
||||
current: list[str] = []
|
||||
in_quotes = False
|
||||
i = 0
|
||||
while i < len(value):
|
||||
ch = value[i]
|
||||
if ch == "\\" and in_quotes and i + 1 < len(value):
|
||||
current.append(value[i : i + 2])
|
||||
i += 2
|
||||
continue
|
||||
if ch == '"':
|
||||
in_quotes = not in_quotes
|
||||
current.append(ch)
|
||||
elif ch == separator and not in_quotes:
|
||||
parts.append("".join(current))
|
||||
current = []
|
||||
else:
|
||||
current.append(ch)
|
||||
i += 1
|
||||
parts.append("".join(current))
|
||||
return parts
|
||||
|
||||
|
||||
def _unquote(value: str) -> str:
|
||||
"""Strip surrounding quotes and unescape backslash sequences (RFC 7230)."""
|
||||
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
|
||||
inner = value[1:-1]
|
||||
result: list[str] = []
|
||||
i = 0
|
||||
while i < len(inner):
|
||||
if inner[i] == "\\" and i + 1 < len(inner):
|
||||
result.append(inner[i + 1])
|
||||
i += 2
|
||||
else:
|
||||
result.append(inner[i])
|
||||
i += 1
|
||||
return "".join(result)
|
||||
return value
|
||||
|
||||
|
||||
class _TrustedHosts:
|
||||
"""Container for trusted hosts and networks"""
|
||||
|
||||
@ -172,3 +338,35 @@ class _TrustedHosts:
|
||||
# All hosts are trusted meaning that the client was also a trusted proxy
|
||||
# See https://github.com/Kludex/uvicorn/issues/1068#issuecomment-855371576
|
||||
return _parse_host_port(x_forwarded_for_hosts[0])
|
||||
|
||||
def get_trusted_forwarded_entry(self, forwarded: str) -> dict[str, str] | None:
|
||||
"""Extract the trusted hop entry from an RFC 7239 `Forwarded` header.
|
||||
|
||||
Mirrors `get_trusted_client_address`: walk hops from right and
|
||||
return the first one whose `for=` is set, anchorable, and not in
|
||||
the trust set. Entries with no `for=` or with placeholder values
|
||||
(`unknown`, `_obfuscated`) cannot establish the client hop and
|
||||
are filtered out before selection - returning one would attach a
|
||||
stale `host=`/`proto=` to the request without a verifiable client.
|
||||
Returns `None` when no anchorable hop exists.
|
||||
"""
|
||||
|
||||
anchorable = [
|
||||
entry
|
||||
for entry in _parse_forwarded(forwarded)
|
||||
if (for_ := entry.get("for")) is not None and not _is_placeholder_for(for_)
|
||||
]
|
||||
if not anchorable:
|
||||
return None
|
||||
|
||||
if self.always_trust:
|
||||
return anchorable[0]
|
||||
|
||||
for entry in reversed(anchorable):
|
||||
host, _ = _parse_host_port(entry["for"])
|
||||
if host not in self:
|
||||
return entry
|
||||
|
||||
# All anchorable hops were trusted - fall back to the leftmost
|
||||
# (original client, mirroring `get_trusted_client_address`).
|
||||
return anchorable[0]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user