""" Integration tests for authentication. Unit tests for auth classes also exist in tests/test_auth.py """ import hashlib import netrc import os import sys import threading import typing from urllib.request import parse_keqv_list import anyio import pytest import httpx from ..common import FIXTURES_DIR class App: """ A mock app to test auth credentials. """ def __init__(self, auth_header: str = "", status_code: int = 200) -> None: self.auth_header = auth_header self.status_code = status_code def __call__(self, request: httpx.Request) -> httpx.Response: headers = {"www-authenticate": self.auth_header} if self.auth_header else {} data = {"auth": request.headers.get("Authorization")} return httpx.Response(self.status_code, headers=headers, json=data) class DigestApp: def __init__( self, algorithm: str = "SHA-256", send_response_after_attempt: int = 1, qop: str = "auth", regenerate_nonce: bool = True, ) -> None: self.algorithm = algorithm self.send_response_after_attempt = send_response_after_attempt self.qop = qop self._regenerate_nonce = regenerate_nonce self._response_count = 0 def __call__(self, request: httpx.Request) -> httpx.Response: if self._response_count < self.send_response_after_attempt: return self.challenge_send(request) data = {"auth": request.headers.get("Authorization")} return httpx.Response(200, json=data) def challenge_send(self, request: httpx.Request) -> httpx.Response: self._response_count += 1 nonce = ( hashlib.sha256(os.urandom(8)).hexdigest() if self._regenerate_nonce else "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3" ) challenge_data = { "nonce": nonce, "qop": self.qop, "opaque": ( "ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1" ), "algorithm": self.algorithm, "stale": "FALSE", } challenge_str = ", ".join( '{}="{}"'.format(key, value) for key, value in challenge_data.items() if value ) headers = { "www-authenticate": f'Digest realm="httpx@example.org", {challenge_str}', } return httpx.Response(401, headers=headers) class RepeatAuth(httpx.Auth): """ A mock authentication scheme that requires clients to send the request a fixed number of times, and then send a last request containing an aggregation of nonces that the server sent in 'WWW-Authenticate' headers of intermediate responses. """ requires_request_body = True def __init__(self, repeat: int) -> None: self.repeat = repeat def auth_flow( self, request: httpx.Request ) -> typing.Generator[httpx.Request, httpx.Response, None]: nonces = [] for index in range(self.repeat): request.headers["Authorization"] = f"Repeat {index}" response = yield request nonces.append(response.headers["www-authenticate"]) key = ".".join(nonces) request.headers["Authorization"] = f"Repeat {key}" yield request class ResponseBodyAuth(httpx.Auth): """ A mock authentication scheme that requires clients to send an 'Authorization' header, then send back the contents of the response in the 'Authorization' header. """ requires_response_body = True def __init__(self, token: str) -> None: self.token = token def auth_flow( self, request: httpx.Request ) -> typing.Generator[httpx.Request, httpx.Response, None]: request.headers["Authorization"] = self.token response = yield request data = response.text request.headers["Authorization"] = data yield request class SyncOrAsyncAuth(httpx.Auth): """ A mock authentication scheme that uses a different implementation for the sync and async cases. """ def __init__(self) -> None: self._lock = threading.Lock() self._async_lock = anyio.Lock() def sync_auth_flow( self, request: httpx.Request ) -> typing.Generator[httpx.Request, httpx.Response, None]: with self._lock: request.headers["Authorization"] = "sync-auth" yield request async def async_auth_flow( self, request: httpx.Request ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]: async with self._async_lock: request.headers["Authorization"] = "async-auth" yield request @pytest.mark.anyio async def test_basic_auth() -> None: url = "https://example.org/" auth = ("user", "password123") app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} @pytest.mark.anyio async def test_basic_auth_with_stream() -> None: """ See: https://github.com/encode/httpx/pull/1312 """ url = "https://example.org/" auth = ("user", "password123") app = App() async with httpx.AsyncClient( transport=httpx.MockTransport(app), auth=auth ) as client: async with client.stream("GET", url) as response: await response.aread() assert response.status_code == 200 assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} @pytest.mark.anyio async def test_basic_auth_in_url() -> None: url = "https://user:password123@example.org/" app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url) assert response.status_code == 200 assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} @pytest.mark.anyio async def test_basic_auth_on_session() -> None: url = "https://example.org/" auth = ("user", "password123") app = App() async with httpx.AsyncClient( transport=httpx.MockTransport(app), auth=auth ) as client: response = await client.get(url) assert response.status_code == 200 assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} @pytest.mark.anyio async def test_custom_auth() -> None: url = "https://example.org/" app = App() def auth(request: httpx.Request) -> httpx.Request: request.headers["Authorization"] = "Token 123" return request async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "Token 123"} def test_netrc_auth_credentials_exist() -> None: """ When netrc auth is being used and a request is made to a host that is in the netrc file, then the relevant credentials should be applied. """ netrc_file = str(FIXTURES_DIR / ".netrc") url = "http://netrcexample.org" app = App() auth = httpx.NetRCAuth(netrc_file) with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: response = client.get(url) assert response.status_code == 200 assert response.json() == { "auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk" } def test_netrc_auth_credentials_do_not_exist() -> None: """ When netrc auth is being used and a request is made to a host that is not in the netrc file, then no credentials should be applied. """ netrc_file = str(FIXTURES_DIR / ".netrc") url = "http://example.org" app = App() auth = httpx.NetRCAuth(netrc_file) with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: response = client.get(url) assert response.status_code == 200 assert response.json() == {"auth": None} @pytest.mark.skipif( sys.version_info < (3, 11), reason="netrc files without a password are invalid with Python < 3.11", ) def test_netrc_auth_nopassword() -> None: # pragma: no cover """ Python has different netrc parsing behaviours with different versions. For Python 3.11+ a netrc file with no password is valid. In this case we want to check that we allow the netrc auth, and simply don't provide any credentials in the request. """ netrc_file = str(FIXTURES_DIR / ".netrc-nopassword") url = "http://example.org" app = App() auth = httpx.NetRCAuth(netrc_file) with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: response = client.get(url) assert response.status_code == 200 assert response.json() == {"auth": None} @pytest.mark.skipif( sys.version_info >= (3, 11), reason="netrc files without a password are valid from Python >= 3.11", ) def test_netrc_auth_nopassword_parse_error() -> None: # pragma: no cover """ Python has different netrc parsing behaviours with different versions. For Python < 3.11 a netrc file with no password is invalid. In this case we want to allow the parse error to be raised. """ netrc_file = str(FIXTURES_DIR / ".netrc-nopassword") with pytest.raises(netrc.NetrcParseError): httpx.NetRCAuth(netrc_file) @pytest.mark.anyio async def test_auth_disable_per_request() -> None: url = "https://example.org/" auth = ("user", "password123") app = App() async with httpx.AsyncClient( transport=httpx.MockTransport(app), auth=auth ) as client: response = await client.get(url, auth=None) assert response.status_code == 200 assert response.json() == {"auth": None} def test_auth_hidden_url() -> None: url = "http://example-username:example-password@example.org/" expected = "URL('http://example-username:[secure]@example.org/')" assert url == httpx.URL(url) assert expected == repr(httpx.URL(url)) @pytest.mark.anyio async def test_auth_hidden_header() -> None: url = "https://example.org/" auth = ("example-username", "example-password") app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert "'authorization': '[secure]'" in str(response.request.headers) @pytest.mark.anyio async def test_auth_property() -> None: app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: assert client.auth is None client.auth = ("user", "password123") # type: ignore assert isinstance(client.auth, httpx.BasicAuth) url = "https://example.org/" response = await client.get(url) assert response.status_code == 200 assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} @pytest.mark.anyio async def test_auth_invalid_type() -> None: app = App() with pytest.raises(TypeError): client = httpx.AsyncClient( transport=httpx.MockTransport(app), auth="not a tuple, not a callable", # type: ignore ) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: with pytest.raises(TypeError): await client.get(auth="not a tuple, not a callable") # type: ignore with pytest.raises(TypeError): client.auth = "not a tuple, not a callable" # type: ignore @pytest.mark.anyio async def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": None} assert len(response.history) == 0 def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") auth_header = "Token ..." app = App(auth_header=auth_header, status_code=401) client = httpx.Client(transport=httpx.MockTransport(app)) response = client.get(url, auth=auth) assert response.status_code == 401 assert response.json() == {"auth": None} assert len(response.history) == 0 @pytest.mark.anyio async def test_digest_auth_200_response_including_digest_auth_header() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") auth_header = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"' app = App(auth_header=auth_header, status_code=200) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": None} assert len(response.history) == 0 @pytest.mark.anyio async def test_digest_auth_401_response_without_digest_auth_header() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = App(auth_header="", status_code=401) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 401 assert response.json() == {"auth": None} assert len(response.history) == 0 @pytest.mark.parametrize( "algorithm,expected_hash_length,expected_response_length", [ ("MD5", 64, 32), ("MD5-SESS", 64, 32), ("SHA", 64, 40), ("SHA-SESS", 64, 40), ("SHA-256", 64, 64), ("SHA-256-SESS", 64, 64), ("SHA-512", 64, 128), ("SHA-512-SESS", 64, 128), ], ) @pytest.mark.anyio async def test_digest_auth( algorithm: str, expected_hash_length: int, expected_response_length: int ) -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(algorithm=algorithm) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert len(response.history) == 1 authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"] scheme, _, fields = authorization.partition(" ") assert scheme == "Digest" response_fields = [field.strip() for field in fields.split(",")] digest_data = dict(field.split("=") for field in response_fields) assert digest_data["username"] == '"user"' assert digest_data["realm"] == '"httpx@example.org"' assert "nonce" in digest_data assert digest_data["uri"] == '"/"' assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes assert len(digest_data["opaque"]) == expected_hash_length + 2 assert digest_data["algorithm"] == algorithm assert digest_data["qop"] == "auth" assert digest_data["nc"] == "00000001" assert len(digest_data["cnonce"]) == 16 + 2 @pytest.mark.anyio async def test_digest_auth_no_specified_qop() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(qop="") async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert len(response.history) == 1 authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"] scheme, _, fields = authorization.partition(" ") assert scheme == "Digest" response_fields = [field.strip() for field in fields.split(",")] digest_data = dict(field.split("=") for field in response_fields) assert "qop" not in digest_data assert "nc" not in digest_data assert "cnonce" not in digest_data assert digest_data["username"] == '"user"' assert digest_data["realm"] == '"httpx@example.org"' assert len(digest_data["nonce"]) == 64 + 2 # extra quotes assert digest_data["uri"] == '"/"' assert len(digest_data["response"]) == 64 + 2 assert len(digest_data["opaque"]) == 64 + 2 assert digest_data["algorithm"] == "SHA-256" @pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth")) @pytest.mark.anyio async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(qop=qop) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert len(response.history) == 1 @pytest.mark.anyio async def test_digest_auth_qop_auth_int_not_implemented() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(qop="auth-int") async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: with pytest.raises(NotImplementedError): await client.get(url, auth=auth) @pytest.mark.anyio async def test_digest_auth_qop_must_be_auth_or_auth_int() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(qop="not-auth") async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: with pytest.raises(httpx.ProtocolError): await client.get(url, auth=auth) @pytest.mark.anyio async def test_digest_auth_incorrect_credentials() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp(send_response_after_attempt=2) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 401 assert len(response.history) == 1 @pytest.mark.anyio async def test_digest_auth_reuses_challenge() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response_1 = await client.get(url, auth=auth) response_2 = await client.get(url, auth=auth) assert response_1.status_code == 200 assert response_2.status_code == 200 assert len(response_1.history) == 1 assert len(response_2.history) == 0 @pytest.mark.anyio async def test_digest_auth_resets_nonce_count_after_401() -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response_1 = await client.get(url, auth=auth) assert response_1.status_code == 200 assert len(response_1.history) == 1 first_nonce = parse_keqv_list( response_1.request.headers["Authorization"].split(", ") )["nonce"] first_nc = parse_keqv_list( response_1.request.headers["Authorization"].split(", ") )["nc"] # with this we now force a 401 on a subsequent (but initial) request app.send_response_after_attempt = 2 # we expect the client again to try to authenticate, # i.e. the history length must be 1 response_2 = await client.get(url, auth=auth) assert response_2.status_code == 200 assert len(response_2.history) == 1 second_nonce = parse_keqv_list( response_2.request.headers["Authorization"].split(", ") )["nonce"] second_nc = parse_keqv_list( response_2.request.headers["Authorization"].split(", ") )["nc"] assert first_nonce != second_nonce # ensures that the auth challenge was reset assert ( first_nc == second_nc ) # ensures the nonce count is reset when the authentication failed @pytest.mark.parametrize( "auth_header", [ 'Digest realm="httpx@example.org", qop="auth"', # missing fields 'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list ], ) @pytest.mark.anyio async def test_async_digest_auth_raises_protocol_error_on_malformed_header( auth_header: str, ) -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = App(auth_header=auth_header, status_code=401) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: with pytest.raises(httpx.ProtocolError): await client.get(url, auth=auth) @pytest.mark.parametrize( "auth_header", [ 'Digest realm="httpx@example.org", qop="auth"', # missing fields 'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list ], ) def test_sync_digest_auth_raises_protocol_error_on_malformed_header( auth_header: str, ) -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = App(auth_header=auth_header, status_code=401) with httpx.Client(transport=httpx.MockTransport(app)) as client: with pytest.raises(httpx.ProtocolError): client.get(url, auth=auth) @pytest.mark.anyio async def test_async_auth_history() -> None: """ Test that intermediate requests sent as part of an authentication flow are recorded in the response history. """ url = "https://example.org/" auth = RepeatAuth(repeat=2) app = App(auth_header="abc") async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "Repeat abc.abc"} assert len(response.history) == 2 resp1, resp2 = response.history assert resp1.json() == {"auth": "Repeat 0"} assert resp2.json() == {"auth": "Repeat 1"} assert len(resp2.history) == 1 assert resp2.history == [resp1] assert len(resp1.history) == 0 def test_sync_auth_history() -> None: """ Test that intermediate requests sent as part of an authentication flow are recorded in the response history. """ url = "https://example.org/" auth = RepeatAuth(repeat=2) app = App(auth_header="abc") with httpx.Client(transport=httpx.MockTransport(app)) as client: response = client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "Repeat abc.abc"} assert len(response.history) == 2 resp1, resp2 = response.history assert resp1.json() == {"auth": "Repeat 0"} assert resp2.json() == {"auth": "Repeat 1"} assert len(resp2.history) == 1 assert resp2.history == [resp1] assert len(resp1.history) == 0 class ConsumeBodyTransport(httpx.MockTransport): async def handle_async_request(self, request: httpx.Request) -> httpx.Response: assert isinstance(request.stream, httpx.AsyncByteStream) [_ async for _ in request.stream] return self.handler(request) # type: ignore[return-value] @pytest.mark.anyio async def test_digest_auth_unavailable_streaming_body(): url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") app = DigestApp() async def streaming_body() -> typing.AsyncIterator[bytes]: yield b"Example request body" # pragma: no cover async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client: with pytest.raises(httpx.StreamConsumed): await client.post(url, content=streaming_body(), auth=auth) @pytest.mark.anyio async def test_async_auth_reads_response_body() -> None: """ Test that we can read the response body in an auth flow if `requires_response_body` is set. """ url = "https://example.org/" auth = ResponseBodyAuth("xyz") app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": '{"auth": "xyz"}'} def test_sync_auth_reads_response_body() -> None: """ Test that we can read the response body in an auth flow if `requires_response_body` is set. """ url = "https://example.org/" auth = ResponseBodyAuth("xyz") app = App() with httpx.Client(transport=httpx.MockTransport(app)) as client: response = client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": '{"auth": "xyz"}'} @pytest.mark.anyio async def test_async_auth() -> None: """ Test that we can use an auth implementation specific to the async case, to support cases that require performing I/O or using concurrency primitives (such as checking a disk-based cache or fetching a token from a remote auth server). """ url = "https://example.org/" auth = SyncOrAsyncAuth() app = App() async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: response = await client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "async-auth"} def test_sync_auth() -> None: """ Test that we can use an auth implementation specific to the sync case. """ url = "https://example.org/" auth = SyncOrAsyncAuth() app = App() with httpx.Client(transport=httpx.MockTransport(app)) as client: response = client.get(url, auth=auth) assert response.status_code == 200 assert response.json() == {"auth": "sync-auth"}