httpx/tests/client/test_auth.py
T-256 0006ed0547
format (#3131)
Co-authored-by: T-256 <Tester@test.com>
2024-03-01 19:49:23 +00:00

796 lines
26 KiB
Python

"""
Integration tests for authentication.
Unit tests for auth classes also exist in tests/test_auth.py
"""
import hashlib
import netrc
import os
import sys
import threading
import typing
from urllib.request import parse_keqv_list
import anyio
import pytest
import httpx
from ..common import FIXTURES_DIR
class App:
    """
    Mock request handler used to inspect the credentials a client sends.

    Every response carries a JSON body of the form ``{"auth": <value>}``
    where ``<value>`` is the request's ``Authorization`` header (``None`` if
    the header is absent). When `auth_header` is non-empty it is returned as
    the ``www-authenticate`` response header.
    """

    def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
        self.auth_header = auth_header
        self.status_code = status_code

    def __call__(self, request: httpx.Request) -> httpx.Response:
        # Only advertise a challenge header when one was configured.
        response_headers: typing.Dict[str, str] = {}
        if self.auth_header:
            response_headers["www-authenticate"] = self.auth_header

        echoed = {"auth": request.headers.get("Authorization")}
        return httpx.Response(
            self.status_code, headers=response_headers, json=echoed
        )
class DigestApp:
    """
    Mock request handler that issues HTTP Digest challenges.

    While the number of challenges issued so far is below
    `send_response_after_attempt`, each call is answered with a 401 carrying
    a ``www-authenticate: Digest ...`` header. Afterwards calls are answered
    with a 200 whose JSON body echoes the request's ``Authorization`` header.
    """

    def __init__(
        self,
        algorithm: str = "SHA-256",
        send_response_after_attempt: int = 1,
        qop: str = "auth",
        regenerate_nonce: bool = True,
    ) -> None:
        self.algorithm = algorithm
        self.send_response_after_attempt = send_response_after_attempt
        self.qop = qop
        self._regenerate_nonce = regenerate_nonce
        self._response_count = 0

    def __call__(self, request: httpx.Request) -> httpx.Response:
        if self._response_count < self.send_response_after_attempt:
            return self.challenge_send(request)

        echoed = {"auth": request.headers.get("Authorization")}
        return httpx.Response(200, json=echoed)

    def challenge_send(self, request: httpx.Request) -> httpx.Response:
        """Build a 401 Digest challenge and count it as an issued challenge."""
        self._response_count += 1

        # Either a fresh random nonce per challenge, or a fixed one.
        if self._regenerate_nonce:
            nonce = hashlib.sha256(os.urandom(8)).hexdigest()
        else:
            nonce = "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3"

        opaque = "ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1"
        challenge_fields = [
            ("nonce", nonce),
            ("qop", self.qop),
            ("opaque", opaque),
            ("algorithm", self.algorithm),
            ("stale", "FALSE"),
        ]
        # Fields with an empty value (e.g. qop="") are left out of the header.
        challenge_str = ", ".join(
            f'{name}="{value}"' for name, value in challenge_fields if value
        )
        return httpx.Response(
            401,
            headers={
                "www-authenticate": (
                    f'Digest realm="httpx@example.org", {challenge_str}'
                ),
            },
        )
class RepeatAuth(httpx.Auth):
    """
    Mock auth scheme that sends the request `repeat` times, then once more.

    Each intermediate request carries ``Authorization: Repeat <index>``. The
    ``www-authenticate`` header of every intermediate response is collected,
    and the final request carries those values joined with ``"."``.
    """

    requires_request_body = True

    def __init__(self, repeat: int) -> None:
        self.repeat = repeat

    def auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        collected = []
        for attempt in range(self.repeat):
            request.headers["Authorization"] = f"Repeat {attempt}"
            intermediate = yield request
            collected.append(intermediate.headers["www-authenticate"])

        # Final request: aggregate every nonce seen along the way.
        request.headers["Authorization"] = f"Repeat {'.'.join(collected)}"
        yield request
class ResponseBodyAuth(httpx.Auth):
    """
    Mock auth scheme that replays the first response body as credentials.

    The first request is sent with ``Authorization`` set to `token`; the text
    of the response to it is then used as the ``Authorization`` header of a
    second request.
    """

    requires_response_body = True

    def __init__(self, token: str) -> None:
        self.token = token

    def auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        request.headers["Authorization"] = self.token
        first_response = yield request

        # Feed the body of the first response back as the new credentials.
        request.headers["Authorization"] = first_response.text
        yield request
class SyncOrAsyncAuth(httpx.Auth):
    """
    Mock auth scheme with separate sync and async implementations.

    The sync flow takes a `threading.Lock` and tags the request with
    ``sync-auth``; the async flow takes an `anyio.Lock` and tags the request
    with ``async-auth``.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._async_lock = anyio.Lock()

    def sync_auth_flow(
        self, request: httpx.Request
    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
        # The request is yielded while the thread lock is held.
        with self._lock:
            request.headers["Authorization"] = "sync-auth"
            yield request

    async def async_auth_flow(
        self, request: httpx.Request
    ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]:
        # The request is yielded while the async lock is held.
        async with self._async_lock:
            request.headers["Authorization"] = "async-auth"
            yield request
@pytest.mark.anyio
async def test_basic_auth() -> None:
    """A per-request ``(user, password)`` tuple is sent as a Basic header."""
    transport = httpx.MockTransport(App())
    credentials = ("user", "password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=credentials)

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_with_stream() -> None:
    """
    Client-level basic auth is also applied to requests made via `stream()`.

    See: https://github.com/encode/httpx/pull/1312
    """
    transport = httpx.MockTransport(App())
    credentials = ("user", "password123")

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        async with client.stream("GET", "https://example.org/") as response:
            await response.aread()

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_in_url() -> None:
    """Credentials embedded in the URL are sent as a Basic header."""
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://user:password123@example.org/")

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_basic_auth_on_session() -> None:
    """Credentials configured on the client are applied to its requests."""
    transport = httpx.MockTransport(App())
    credentials = ("user", "password123")

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        response = await client.get("https://example.org/")

    assert response.status_code == 200
    assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_custom_auth() -> None:
    """A plain callable can be used as `auth` to modify the request."""

    def add_token(request: httpx.Request) -> httpx.Request:
        request.headers["Authorization"] = "Token 123"
        return request

    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=add_token)

    assert response.status_code == 200
    assert response.json() == {"auth": "Token 123"}
def test_netrc_auth_credentials_exist() -> None:
    """
    With netrc auth enabled, a request to a host listed in the netrc file
    must carry that host's credentials.
    """
    netrc_auth = httpx.NetRCAuth(str(FIXTURES_DIR / ".netrc"))
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport, auth=netrc_auth) as client:
        response = client.get("http://netrcexample.org")

    assert response.status_code == 200
    expected = {"auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk"}
    assert response.json() == expected
def test_netrc_auth_credentials_do_not_exist() -> None:
    """
    With netrc auth enabled, a request to a host that is absent from the
    netrc file must not carry any credentials.
    """
    netrc_auth = httpx.NetRCAuth(str(FIXTURES_DIR / ".netrc"))
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport, auth=netrc_auth) as client:
        response = client.get("http://example.org")

    assert response.status_code == 200
    assert response.json() == {"auth": None}
@pytest.mark.skipif(
    sys.version_info < (3, 11),
    reason="netrc files without a password are invalid with Python < 3.11",
)
def test_netrc_auth_nopassword() -> None:  # pragma: no cover
    """
    netrc parsing differs between Python versions. From Python 3.11 onwards
    an entry without a password is valid; the netrc auth must then be
    accepted and the request sent without any credentials.
    """
    netrc_auth = httpx.NetRCAuth(str(FIXTURES_DIR / ".netrc-nopassword"))
    transport = httpx.MockTransport(App())

    with httpx.Client(transport=transport, auth=netrc_auth) as client:
        response = client.get("http://example.org")

    assert response.status_code == 200
    assert response.json() == {"auth": None}
@pytest.mark.skipif(
    sys.version_info >= (3, 11),
    reason="netrc files without a password are valid from Python >= 3.11",
)
def test_netrc_auth_nopassword_parse_error() -> None:  # pragma: no cover
    """
    netrc parsing differs between Python versions. Before Python 3.11 an
    entry without a password is invalid, and the resulting parse error is
    expected to propagate out of `httpx.NetRCAuth`.
    """
    nopassword_path = str(FIXTURES_DIR / ".netrc-nopassword")

    with pytest.raises(netrc.NetrcParseError):
        httpx.NetRCAuth(nopassword_path)
@pytest.mark.anyio
async def test_auth_disable_per_request() -> None:
    """Passing ``auth=None`` on a request overrides the client-level auth."""
    transport = httpx.MockTransport(App())
    credentials = ("user", "password123")

    async with httpx.AsyncClient(transport=transport, auth=credentials) as client:
        response = await client.get("https://example.org/", auth=None)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
def test_auth_hidden_url() -> None:
    """The password in a URL is masked as ``[secure]`` in the URL's repr."""
    url = "http://example-username:example-password@example.org/"
    parsed = httpx.URL(url)

    # Equality with the original string still sees the real password.
    assert url == parsed
    assert "URL('http://example-username:[secure]@example.org/')" == repr(parsed)
@pytest.mark.anyio
async def test_auth_hidden_header() -> None:
    """The authorization header value is masked when headers are stringified."""
    transport = httpx.MockTransport(App())
    credentials = ("example-username", "example-password")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=credentials)

    rendered_headers = str(response.request.headers)
    assert "'authorization': '[secure]'" in rendered_headers
@pytest.mark.anyio
async def test_auth_property() -> None:
    """
    `client.auth` defaults to None, and assigning a tuple to it yields an
    `httpx.BasicAuth` instance that is applied to later requests.
    """
    transport = httpx.MockTransport(App())

    async with httpx.AsyncClient(transport=transport) as client:
        assert client.auth is None

        client.auth = ("user", "password123")  # type: ignore
        assert isinstance(client.auth, httpx.BasicAuth)

        response = await client.get("https://example.org/")
        assert response.status_code == 200
        assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
@pytest.mark.anyio
async def test_auth_invalid_type() -> None:
    """
    A plain string that is neither a tuple nor a callable is rejected with
    `TypeError` wherever `auth` can be supplied: the client constructor, a
    single request, and the `client.auth` setter.
    """
    app = App()
    invalid_auth = "not a tuple, not a callable"

    with pytest.raises(TypeError):
        httpx.AsyncClient(
            transport=httpx.MockTransport(app),
            auth=invalid_auth,  # type: ignore
        )

    async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
        with pytest.raises(TypeError):
            # A URL is passed so that the TypeError can only originate from
            # the invalid auth value, not from a missing required argument.
            await client.get(
                "https://example.org/",
                auth=invalid_auth,  # type: ignore
            )

        with pytest.raises(TypeError):
            client.auth = invalid_auth  # type: ignore
@pytest.mark.anyio
async def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None:
    """Without a Digest challenge, no credentials are sent and nothing is retried."""
    transport = httpx.MockTransport(App())
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
    """
    A 401 whose ``www-authenticate`` header names a non-Digest scheme is
    returned as-is: no credentials are sent and no retry is attempted.
    """
    url = "https://example.org/"
    auth = httpx.DigestAuth(username="user", password="password123")
    auth_header = "Token ..."
    app = App(auth_header=auth_header, status_code=401)

    # Use the client as a context manager so that it is always closed.
    with httpx.Client(transport=httpx.MockTransport(app)) as client:
        response = client.get(url, auth=auth)

    assert response.status_code == 401
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.anyio
async def test_digest_auth_200_response_including_digest_auth_header() -> None:
    """A Digest challenge header on a 200 response does not trigger a retry."""
    challenge = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"'
    transport = httpx.MockTransport(App(auth_header=challenge, status_code=200))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.anyio
async def test_digest_auth_401_response_without_digest_auth_header() -> None:
    """A 401 without any challenge header is returned without a retry."""
    transport = httpx.MockTransport(App(auth_header="", status_code=401))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 401
    assert response.json() == {"auth": None}
    assert len(response.history) == 0
@pytest.mark.parametrize(
    "algorithm,expected_hash_length,expected_response_length",
    [
        ("MD5", 64, 32),
        ("MD5-SESS", 64, 32),
        ("SHA", 64, 40),
        ("SHA-SESS", 64, 40),
        ("SHA-256", 64, 64),
        ("SHA-256-SESS", 64, 64),
        ("SHA-512", 64, 128),
        ("SHA-512-SESS", 64, 128),
    ],
)
@pytest.mark.anyio
async def test_digest_auth(
    algorithm: str, expected_hash_length: int, expected_response_length: int
) -> None:
    """
    After one Digest challenge the client retries with a well-formed
    ``Authorization: Digest ...`` header for each supported algorithm.
    """
    transport = httpx.MockTransport(DigestApp(algorithm=algorithm))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1

    payload = typing.cast(typing.Dict[str, typing.Any], response.json())
    scheme, _, raw_fields = payload["auth"].partition(" ")
    assert scheme == "Digest"

    # Split 'key=value, key=value, ...' into a mapping; quoted values keep
    # their surrounding quotes, hence the "+ 2" in the length checks below.
    pairs = (item.strip().split("=") for item in raw_fields.split(","))
    digest_data = {name: value for name, value in pairs}

    assert digest_data["username"] == '"user"'
    assert digest_data["realm"] == '"httpx@example.org"'
    assert "nonce" in digest_data
    assert digest_data["uri"] == '"/"'
    assert len(digest_data["response"]) == expected_response_length + 2
    assert len(digest_data["opaque"]) == expected_hash_length + 2
    assert digest_data["algorithm"] == algorithm
    assert digest_data["qop"] == "auth"
    assert digest_data["nc"] == "00000001"
    assert len(digest_data["cnonce"]) == 16 + 2
@pytest.mark.anyio
async def test_digest_auth_no_specified_qop() -> None:
    """
    When the challenge carries no ``qop``, the Authorization header omits
    the ``qop``, ``nc`` and ``cnonce`` fields.
    """
    transport = httpx.MockTransport(DigestApp(qop=""))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1

    payload = typing.cast(typing.Dict[str, typing.Any], response.json())
    scheme, _, raw_fields = payload["auth"].partition(" ")
    assert scheme == "Digest"

    pairs = (item.strip().split("=") for item in raw_fields.split(","))
    digest_data = {name: value for name, value in pairs}

    for absent_field in ("qop", "nc", "cnonce"):
        assert absent_field not in digest_data

    # Quoted values keep their surrounding quotes, hence the "+ 2".
    assert digest_data["username"] == '"user"'
    assert digest_data["realm"] == '"httpx@example.org"'
    assert len(digest_data["nonce"]) == 64 + 2
    assert digest_data["uri"] == '"/"'
    assert len(digest_data["response"]) == 64 + 2
    assert len(digest_data["opaque"]) == 64 + 2
    assert digest_data["algorithm"] == "SHA-256"
@pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth"))
@pytest.mark.anyio
async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None:
    """A qop list that contains ``auth`` is accepted, with or without spaces."""
    transport = httpx.MockTransport(DigestApp(qop=qop))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 200
    assert len(response.history) == 1
@pytest.mark.anyio
async def test_digest_auth_qop_auth_int_not_implemented() -> None:
    """A challenge offering only ``auth-int`` raises `NotImplementedError`."""
    transport = httpx.MockTransport(DigestApp(qop="auth-int"))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(NotImplementedError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_digest_auth_qop_must_be_auth_or_auth_int() -> None:
    """An unrecognised qop value in the challenge raises `ProtocolError`."""
    transport = httpx.MockTransport(DigestApp(qop="not-auth"))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_digest_auth_incorrect_credentials() -> None:
    """
    If the server challenges again after the authenticated retry, the second
    401 is returned to the caller with one response in its history.
    """
    transport = httpx.MockTransport(DigestApp(send_response_after_attempt=2))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=digest)

    assert response.status_code == 401
    assert len(response.history) == 1
@pytest.mark.anyio
async def test_digest_auth_reuses_challenge() -> None:
    """
    A second request with the same `DigestAuth` instance succeeds without a
    new challenge round-trip (its history is empty).
    """
    target = "https://example.org/"
    digest = httpx.DigestAuth(username="user", password="password123")
    transport = httpx.MockTransport(DigestApp())

    async with httpx.AsyncClient(transport=transport) as client:
        first = await client.get(target, auth=digest)
        second = await client.get(target, auth=digest)

        assert first.status_code == 200
        assert second.status_code == 200

        assert len(first.history) == 1
        assert len(second.history) == 0
@pytest.mark.anyio
async def test_digest_auth_resets_nonce_count_after_401() -> None:
    """
    When a previously authenticated client is challenged again, it must adopt
    the new nonce and restart the nonce count.
    """
    target = "https://example.org/"
    digest = httpx.DigestAuth(username="user", password="password123")
    app = DigestApp()

    async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
        response_1 = await client.get(target, auth=digest)
        assert response_1.status_code == 200
        assert len(response_1.history) == 1

        first_fields = parse_keqv_list(
            response_1.request.headers["Authorization"].split(", ")
        )
        first_nonce = first_fields["nonce"]
        first_nc = first_fields["nc"]

        # with this we now force a 401 on a subsequent (but initial) request
        app.send_response_after_attempt = 2

        # we expect the client again to try to authenticate,
        # i.e. the history length must be 1
        response_2 = await client.get(target, auth=digest)
        assert response_2.status_code == 200
        assert len(response_2.history) == 1

        second_fields = parse_keqv_list(
            response_2.request.headers["Authorization"].split(", ")
        )
        second_nonce = second_fields["nonce"]
        second_nc = second_fields["nc"]

        # ensures that the auth challenge was reset
        assert first_nonce != second_nonce
        # ensures the nonce count is reset when the authentication failed
        assert first_nc == second_nc
@pytest.mark.parametrize(
    "auth_header",
    [
        'Digest realm="httpx@example.org", qop="auth"',  # missing fields
        'Digest realm="httpx@example.org", qop="auth,au',  # malformed fields list
    ],
)
@pytest.mark.anyio
async def test_async_digest_auth_raises_protocol_error_on_malformed_header(
    auth_header: str,
) -> None:
    """An incomplete or malformed Digest challenge raises `ProtocolError` (async)."""
    transport = httpx.MockTransport(App(auth_header=auth_header, status_code=401))
    digest = httpx.DigestAuth(username="user", password="password123")

    async with httpx.AsyncClient(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            await client.get("https://example.org/", auth=digest)
@pytest.mark.parametrize(
    "auth_header",
    [
        'Digest realm="httpx@example.org", qop="auth"',  # missing fields
        'Digest realm="httpx@example.org", qop="auth,au',  # malformed fields list
    ],
)
def test_sync_digest_auth_raises_protocol_error_on_malformed_header(
    auth_header: str,
) -> None:
    """An incomplete or malformed Digest challenge raises `ProtocolError` (sync)."""
    transport = httpx.MockTransport(App(auth_header=auth_header, status_code=401))
    digest = httpx.DigestAuth(username="user", password="password123")

    with httpx.Client(transport=transport) as client:
        with pytest.raises(httpx.ProtocolError):
            client.get("https://example.org/", auth=digest)
@pytest.mark.anyio
async def test_async_auth_history() -> None:
    """
    Intermediate responses produced during an authentication flow must be
    recorded in the final response's history (async client).
    """
    transport = httpx.MockTransport(App(auth_header="abc"))
    repeat_auth = RepeatAuth(repeat=2)

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=repeat_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "Repeat abc.abc"}

    assert len(response.history) == 2
    first, second = response.history
    assert first.json() == {"auth": "Repeat 0"}
    assert second.json() == {"auth": "Repeat 1"}

    # Each intermediate response only knows about the ones before it.
    assert len(second.history) == 1
    assert second.history == [first]

    assert len(first.history) == 0
def test_sync_auth_history() -> None:
    """
    Intermediate responses produced during an authentication flow must be
    recorded in the final response's history (sync client).
    """
    transport = httpx.MockTransport(App(auth_header="abc"))
    repeat_auth = RepeatAuth(repeat=2)

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=repeat_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "Repeat abc.abc"}

    assert len(response.history) == 2
    first, second = response.history
    assert first.json() == {"auth": "Repeat 0"}
    assert second.json() == {"auth": "Repeat 1"}

    # Each intermediate response only knows about the ones before it.
    assert len(second.history) == 1
    assert second.history == [first]

    assert len(first.history) == 0
class ConsumeBodyTransport(httpx.MockTransport):
    """Mock transport that iterates over the request body before responding."""

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        assert isinstance(request.stream, httpx.AsyncByteStream)
        # Exhaust the request stream; the chunks themselves are discarded.
        _ = [chunk async for chunk in request.stream]
        return self.handler(request)  # type: ignore[return-value]
@pytest.mark.anyio
async def test_digest_auth_unavailable_streaming_body() -> None:
    """
    Digest auth combined with a streaming request body that cannot be
    replayed is expected to raise `httpx.StreamConsumed`.
    """
    url = "https://example.org/"
    auth = httpx.DigestAuth(username="user", password="password123")
    app = DigestApp()

    async def streaming_body() -> typing.AsyncIterator[bytes]:
        yield b"Example request body"  # pragma: no cover

    async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
        with pytest.raises(httpx.StreamConsumed):
            await client.post(url, content=streaming_body(), auth=auth)
@pytest.mark.anyio
async def test_async_auth_reads_response_body() -> None:
    """
    An auth flow that sets `requires_response_body` can read the response
    body of an intermediate request (async client).
    """
    transport = httpx.MockTransport(App())
    body_auth = ResponseBodyAuth("xyz")

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=body_auth)

    assert response.status_code == 200
    # The second request carried the first response's JSON text as its token.
    assert response.json() == {"auth": '{"auth": "xyz"}'}
def test_sync_auth_reads_response_body() -> None:
    """
    An auth flow that sets `requires_response_body` can read the response
    body of an intermediate request (sync client).
    """
    transport = httpx.MockTransport(App())
    body_auth = ResponseBodyAuth("xyz")

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=body_auth)

    assert response.status_code == 200
    # The second request carried the first response's JSON text as its token.
    assert response.json() == {"auth": '{"auth": "xyz"}'}
@pytest.mark.anyio
async def test_async_auth() -> None:
    """
    An auth class may provide an implementation specific to the async case,
    to support flows that need to perform I/O or use concurrency primitives
    (such as checking a disk-based cache or fetching a token from a remote
    auth server).
    """
    transport = httpx.MockTransport(App())
    dual_auth = SyncOrAsyncAuth()

    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get("https://example.org/", auth=dual_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "async-auth"}
def test_sync_auth() -> None:
    """
    An auth class may provide an implementation specific to the sync case.
    """
    transport = httpx.MockTransport(App())
    dual_auth = SyncOrAsyncAuth()

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/", auth=dual_auth)

    assert response.status_code == 200
    assert response.json() == {"auth": "sync-auth"}