This commit is contained in:
David Sillman 2026-02-23 17:33:45 +04:00 committed by GitHub
commit e4a8a480c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 179 additions and 14 deletions

View File

@ -40,6 +40,18 @@ HTTP digest authentication is a challenge-response authentication scheme. Unlike
[<Response [401 UNAUTHORIZED]>]
```
HTTPX also supports digest authentication using `auth-int` quality-of-protection, which provides message integrity protection. When `qop="auth-int"` is used, the authentication response includes a hash of the request body, protecting against tampering with the message content during transmission. This provides additional security over `qop="auth"` by ensuring that both the authentication credentials and the message body integrity are verified.
```pycon
>>> auth = httpx.DigestAuth(username="olivia", password="secret")
>>> client = httpx.Client(auth=auth)
>>> response = client.get("https://httpbin.org/digest-auth/auth-int/olivia/secret")
>>> response
<Response [200 OK]>
>>> response.history
[<Response [401 UNAUTHORIZED]>]
```
## NetRC authentication
HTTPX can be configured to use [a `.netrc` config file](https://everything.curl.dev/usingcurl/netrc) for authentication.
@ -229,4 +241,4 @@ class MyCustomAuth(httpx.Auth):
async def async_auth_flow(self, request):
raise RuntimeError("Cannot use a sync authentication class with httpx.AsyncClient")
```
```

View File

@ -264,7 +264,8 @@ class DigestAuth(Auth):
path = request.url.raw_path
A2 = b":".join((request.method.encode(), path))
# TODO: implement auth-int
if challenge.qop == b"auth-int":
A2 += b":" + digest(request.content)
HA2 = digest(A2)
nc_value = b"%08x" % self._nonce_count
@ -294,7 +295,7 @@ class DigestAuth(Auth):
if challenge.opaque:
format_args["opaque"] = challenge.opaque
if qop:
format_args["qop"] = b"auth"
format_args["qop"] = qop
format_args["nc"] = nc_value
format_args["cnonce"] = cnonce
@ -330,12 +331,13 @@ class DigestAuth(Auth):
if qop is None:
return None
qops = re.split(b", ?", qop)
# Defer to the strongest supplied qop (auth-int > auth)
if b"auth-int" in qops:
return b"auth-int"
if b"auth" in qops:
return b"auth"
if qops == [b"auth-int"]:
raise NotImplementedError("Digest auth-int support is not yet implemented")
message = f'Unexpected qop value "{qop!r}" in digest auth'
raise ProtocolError(message, request=request)

View File

@ -501,15 +501,50 @@ async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str)
assert len(response.history) == 1
@pytest.mark.parametrize(
"algorithm,expected_hash_length,expected_response_length",
[
("MD5", 64, 32),
("MD5-SESS", 64, 32),
("SHA", 64, 40),
("SHA-SESS", 64, 40),
("SHA-256", 64, 64),
("SHA-256-SESS", 64, 64),
("SHA-512", 64, 128),
("SHA-512-SESS", 64, 128),
],
)
@pytest.mark.anyio
async def test_digest_auth_qop_auth_int_not_implemented() -> None:
async def test_digest_auth_qop_auth_int(
algorithm: str, expected_hash_length: int, expected_response_length: int
) -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop="auth-int")
app = DigestApp(qop="auth-int", algorithm=algorithm)
async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
with pytest.raises(NotImplementedError):
await client.get(url, auth=auth)
response = await client.get(url, auth=auth)
assert response.status_code == 200
assert len(response.history) == 1
authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"
response_fields = [field.strip() for field in fields.split(",")]
digest_data = dict(field.split("=") for field in response_fields)
assert digest_data["username"] == '"user"'
assert digest_data["realm"] == '"httpx@example.org"'
assert "nonce" in digest_data
assert digest_data["uri"] == '"/"'
assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes
assert len(digest_data["opaque"]) == expected_hash_length + 2
assert digest_data["algorithm"] == algorithm
assert digest_data["qop"] == "auth-int"
assert digest_data["nc"] == "00000001"
assert len(digest_data["cnonce"]) == 16 + 2
@pytest.mark.anyio

View File

@ -208,7 +208,7 @@ def test_digest_auth_rfc_7616_md5(monkeypatch):
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"qop=auth, "
"algorithm=MD5, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
@ -232,7 +232,7 @@ def test_digest_auth_rfc_7616_md5(monkeypatch):
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert "qop=auth," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
@ -268,7 +268,7 @@ def test_digest_auth_rfc_7616_sha_256(monkeypatch):
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"qop=auth, "
"algorithm=SHA-256, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
@ -292,7 +292,7 @@ def test_digest_auth_rfc_7616_sha_256(monkeypatch):
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert "qop=auth," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
@ -306,3 +306,119 @@ def test_digest_auth_rfc_7616_sha_256(monkeypatch):
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)
def test_digest_auth_int_rfc_7616_md5(monkeypatch):
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()
auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)
request = httpx.Request("GET", "https://www.example.com/dir/index.html")
# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers
# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
"qop=auth-int, "
"algorithm=MD5, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=MD5" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth-int," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="8804a53d3640a40a4f73cea12c5ba451"'
in request.headers["Authorization"]
)
# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)
def test_digest_auth_int_rfc7616_sha256(monkeypatch):
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()
auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)
request = httpx.Request("GET", "https://www.example.com/dir/index.html")
# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers
# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
"qop=auth-int, "
"algorithm=SHA-256, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=SHA-256" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth-int," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="8bdf6f15638e260831e905028de5450562816d093c9bfc5c13d3a46adcdde940"'
in request.headers["Authorization"]
)
# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)