Add Auth.requires_response_body attribute (#803)

* Add Auth.requires_response_body attribute

If set then responses are read by the client before being sent back into the auth flow

* Update tests and docs

* PR fixes

* Change example methods
This commit is contained in:
George Kettleborough 2020-02-10 12:10:11 +00:00 committed by GitHub
parent cfe2278096
commit b3db9ff0b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 0 deletions

View File

@ -421,6 +421,41 @@ class MyCustomAuth(httpx.Auth):
...
```
Similarly, if you are implementing a scheme that requires access to the response body, then use the `requires_response_body` property. You will then be able to access response body properties and methods such as `response.content`, `response.text`, `response.json()`, etc.
```python
class MyCustomAuth(httpx.Auth):
requires_response_body = True
def __init__(self, access_token, refresh_token, refresh_url):
self.access_token = access_token
self.refresh_token = refresh_token
self.refresh_url = refresh_url
def auth_flow(self, request):
request.headers["X-Authentication"] = self.access_token
response = yield request
if response.status_code == 401:
# If the server issues a 401 response, then issue a request to
# refresh tokens, and resend the request.
refresh_response = yield self.build_refresh_request()
self.update_tokens(refresh_response)
request.headers["X-Authentication"] = self.access_token
yield request
def build_refresh_request(self):
# Return an `httpx.Request` for refreshing tokens.
...
def update_tokens(self, response):
# Update the `.access_token` and `.refresh_token` tokens
# based on a refresh response.
data = response.json()
...
```
## SSL certificates
When making a request over HTTPS, HTTPX needs to verify the identity of the requested host. To do this, it uses a bundle of SSL certificates (a.k.a. CA bundle) delivered by a trusted certificate authority (CA).

View File

@ -23,6 +23,7 @@ class Auth:
"""
requires_request_body = False
requires_response_body = False
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
"""

View File

@ -657,6 +657,8 @@ class Client(BaseClient):
request = next(auth_flow)
while True:
response = self.send_single_request(request, timeout)
if auth.requires_response_body:
response.read()
try:
next_request = auth_flow.send(response)
except StopIteration:
@ -1182,6 +1184,8 @@ class AsyncClient(BaseClient):
request = next(auth_flow)
while True:
response = await self.send_single_request(request, timeout)
if auth.requires_response_body:
await response.aread()
try:
next_request = auth_flow.send(response)
except StopIteration:

View File

@ -467,3 +467,40 @@ async def test_digest_auth_unavailable_streaming_body():
with pytest.raises(RequestBodyUnavailable):
await client.post(url, data=streaming_body(), auth=auth)
@pytest.mark.asyncio
async def test_auth_reads_response_body() -> None:
"""
Test that we can read the response body in an auth flow if `requires_response_body`
is set.
"""
class ResponseBodyAuth(Auth):
"""
A mock authentication scheme that requires clients to send an 'Authorization'
header, then send back the contents of the response in the 'Authorization'
header.
"""
requires_response_body = True
def __init__(self, token):
self.token = token
def auth_flow(
self, request: Request
) -> typing.Generator[Request, Response, None]:
request.headers["Authorization"] = self.token
response = yield request
data = response.text
request.headers["Authorization"] = data
yield request
url = "https://example.org/"
auth = ResponseBodyAuth("xyz")
client = AsyncClient(dispatch=MockDispatch())
response = await client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": '{"auth": "xyz"}'}