Add Client.stream() method. (#600)

* Add Client.stream() method.

* Add top-level stream API

* Documentation
This commit is contained in:
Tom Christie 2019-12-05 17:25:43 +00:00 committed by GitHub
parent 38a9d77342
commit 8d8ea8bbba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 215 additions and 36 deletions

View File

@ -10,11 +10,10 @@ Pretty much any API mentioned in the `requests` QuickStart should be identical
to the API in our own documentation. The following exceptions apply:
* `Response.url` - Returns a `URL` instance, rather than a string. Use `str(response.url)` if you need a string instance.
* `httpx.codes` - In our documentation we prefer the uppercased versions, such as `codes.NOT_FOUND`,
but also provide lower-cased versions for API compatibility with `requests`.
* `stream=True`. - Streaming responses provide the `.stream()` and `.raw()` byte iterator interfaces, rather than the `.iter_content()` method and the `.raw` socket interface.
* `httpx.codes` - In our documentation we prefer the uppercased versions, such as `codes.NOT_FOUND`, but also provide lower-cased versions for API compatibility with `requests`.
* `stream()`. - HTTPX provides a `.stream()` interface rather than using `stream=True`. This ensures that streaming responses are always properly closed outside of the stream block, and makes it visually clearer at which points streaming I/O APIs may be used with a response. Streaming response data is made available with `.stream_bytes()`, `.stream_text()`, `.stream_lines()`, and `.stream_raw()`.
* `.get`, `.delete`, `.head`, `.options` - These methods do not support `files`, `data`, or `json` arguments. Use `.request` if you need to send data using these HTTP methods.
* We don't support `response.is_ok` since the naming is ambiguous there, and might incorrectly imply an equivalence to `response.status_code == codes.OK`. Instead we provide an `.is_error` property.
* We don't support `response.is_ok` since the naming is ambiguous there, and might incorrectly imply an equivalence to `response.status_code == codes.OK`. Instead we provide the `response.is_error` property. Use `if not response.is_error:` instead of `if response.is_ok:`.
## Advanced Usage

View File

@ -292,11 +292,57 @@ The `Headers` data type is case-insensitive, so you can use any capitalization.
'application/json'
```
Multiple values for a single response header are represented as a single comma-separated
value, as per [RFC 7230](https://tools.ietf.org/html/rfc7230#section-3.2):
Multiple values for a single response header are represented as a single comma-separated value, as per [RFC 7230](https://tools.ietf.org/html/rfc7230#section-3.2):
> A recipient MAY combine multiple header fields with the same field name into one “field-name: field-value” pair, without changing the semantics of the message, by appending each subsequent field-value to the combined field value in order, separated by a comma.
## Streaming Responses
For large downloads you may want to use streaming responses that do not load the entire response body into memory at once.
You can stream the binary content of the response...
```
>>> async with httpx.stream("GET", "https://www.example.com") as r:
... async for data in r.stream_bytes():
... print(data)
```
Or the text of the response...
```
>>> async with httpx.stream("GET", "https://www.example.com") as r:
... async for text in r.stream_text():
... print(text)
```
Or stream the text, on a line-by-line basis...
```
>>> async with httpx.stream("GET", "https://www.example.com") as r:
... async for line in r.stream_lines():
... print(line)
```
HTTPX will use universal line endings, normalising all cases to `\n`.
In some cases you might want to access the raw bytes on the response without applying any HTTP content decoding. In this case any content encoding that the web server has applied such as `gzip`, `deflate`, or `brotli` will not be automatically decoded.
```
>>> async with httpx.stream("GET", "https://www.example.com") as r:
... async for chunk in r.stream_raw():
... print(chunk)
```
If you're using streaming responses in any of these ways then the `response.content` and `response.text` attributes will not be available, and will raise errors if accessed. However you can also use the response streaming functionality to conditionally load the response body:
```
>>> async with httpx.stream("GET", "https://www.example.com") as r:
... if int(r.headers['Content-Length']) < TOO_LONG:
... await r.read()
... print(r.text)
```
## Cookies
Any cookies that are set on the response can be easily accessed:

View File

@ -1,5 +1,5 @@
from .__version__ import __description__, __title__, __version__
from .api import delete, get, head, options, patch, post, put, request
from .api import delete, get, head, options, patch, post, put, request, stream
from .auth import BasicAuth, DigestAuth
from .client import Client
from .concurrency.asyncio import AsyncioBackend
@ -75,6 +75,7 @@ __all__ = [
"patch",
"put",
"request",
"stream",
"BasicAuth",
"Client",
"DigestAuth",

View File

@ -1,12 +1,13 @@
import typing
from .client import Client
from .client import Client, StreamContextManager
from .config import DEFAULT_TIMEOUT_CONFIG, CertTypes, TimeoutTypes, VerifyTypes
from .models import (
AuthTypes,
CookieTypes,
HeaderTypes,
QueryParamTypes,
Request,
RequestData,
RequestFiles,
Response,
@ -98,6 +99,44 @@ async def request(
)
def stream(
    method: str,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: AuthTypes = None,
    timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
    allow_redirects: bool = True,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    trust_env: bool = True,
) -> StreamContextManager:
    """
    Prepare a streaming request backed by a dedicated, single-use client.

    The request is not sent by this function; that happens when the returned
    context manager is entered:

        async with httpx.stream("GET", "https://www.example.com") as response:
            async for chunk in response.stream_bytes():
                ...

    * `cert`, `verify` and `trust_env` configure the single-use `Client`.
    * `method`, `url`, `params`, `data`, `files`, `json`, `headers` and
      `cookies` are used to build the `Request`.
    * `auth`, `timeout` and `allow_redirects` are handed to the returned
      `StreamContextManager`, which is created with `close_client=True`
      because the client exists only for this one request.
    """
    return StreamContextManager(
        # Keyword arguments are evaluated left to right, so the client is
        # still constructed before the request.
        client=Client(cert=cert, verify=verify, trust_env=trust_env),
        request=Request(
            method=method,
            url=url,
            params=params,
            data=data,
            files=files,
            json=json,
            headers=headers,
            cookies=cookies,
        ),
        auth=auth,
        timeout=timeout,
        allow_redirects=allow_redirects,
        close_client=True,
    )
async def get(
url: URLTypes,
*,

View File

@ -1,5 +1,6 @@
import functools
import typing
import warnings
from types import TracebackType
import hstspreload
@ -222,6 +223,12 @@ class Client:
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
if stream:
warnings.warn(
"The 'stream=True' argument is due to be deprecated. "
"Use 'async with client.stream(method, url, ...) as response' instead."
)
request = self.build_request(
method=method,
url=url,
@ -244,6 +251,39 @@ class Client:
)
return response
def stream(
    self,
    method: str,
    url: URLTypes,
    *,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: AuthTypes = None,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> "StreamContextManager":
    """
    Prepare a streaming request that will be sent through this client.

    The request is built with `build_request()` and wrapped, together with
    this client, in a `StreamContextManager`. Intended usage:

        async with client.stream("GET", url) as response:
            ...

    `close_client` is left at its default, so the context manager does not
    close this client when the block exits.
    """
    return StreamContextManager(
        client=self,
        request=self.build_request(
            method=method,
            url=url,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
        ),
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def build_request(
self,
method: str,
@ -864,3 +904,42 @@ def _proxies_to_dispatchers(
else:
new_proxies[str(key)] = dispatcher_or_url
return new_proxies
class StreamContextManager:
    """
    Async context manager that sends a request in streaming mode on entry
    and closes the response on exit.

    When `close_client` is true the context manager also owns `client` and
    closes it once it is no longer needed, including on failure paths.
    """

    def __init__(
        self,
        client: Client,
        request: Request,
        *,
        auth: AuthTypes = None,
        allow_redirects: bool = True,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
        close_client: bool = False,
    ) -> None:
        """
        Store everything needed to send `request` later.

        * `client` - the client whose `send()` is used on entry.
        * `request` - the already-built request to send.
        * `auth`, `allow_redirects`, `timeout` - forwarded to `client.send()`.
        * `close_client` - whether `client` is closed by this context manager.
        """
        self.client = client
        self.request = request
        self.auth = auth
        self.allow_redirects = allow_redirects
        self.timeout = timeout
        self.close_client = close_client

    async def __aenter__(self) -> "Response":
        """
        Send the request with `stream=True` and return the response.

        Any exception raised by `client.send()` is propagated unchanged.
        """
        try:
            self.response = await self.client.send(
                request=self.request,
                auth=self.auth,
                allow_redirects=self.allow_redirects,
                timeout=self.timeout,
                stream=True,
            )
        except BaseException:
            # `__aexit__` is never invoked when `__aenter__` raises, so an
            # owned client has to be released here or it would leak.
            if self.close_client:
                await self.client.close()
            raise
        return self.response

    async def __aexit__(
        self,
        exc_type: typing.Type[BaseException] = None,
        exc_value: BaseException = None,
        traceback: TracebackType = None,
    ) -> None:
        """
        Close the response, then the client if this context manager owns it.

        Returns `None`, so an exception raised inside the block is not
        suppressed.
        """
        try:
            await self.response.close()
        finally:
            # Close the owned client even if closing the response failed.
            if self.close_client:
                await self.client.close()

View File

@ -62,16 +62,19 @@ async def test_post_json(server, backend):
async def test_stream_response(server, backend):
async with httpx.Client(backend=backend) as client:
response = await client.request("GET", server.url, stream=True)
async with client.stream("GET", server.url) as response:
body = await response.read()
assert response.status_code == 200
body = await response.read()
assert body == b"Hello, world!"
assert response.content == b"Hello, world!"
async def test_access_content_stream_response(server, backend):
async with httpx.Client(backend=backend) as client:
response = await client.request("GET", server.url, stream=True)
async with client.stream("GET", server.url) as response:
pass
assert response.status_code == 200
with pytest.raises(httpx.ResponseNotRead):
response.content

View File

@ -59,33 +59,36 @@ async def test_post_json(server):
@pytest.mark.asyncio
async def test_stream_response(server):
async with httpx.Client() as client:
response = await client.get(server.url, stream=True)
async with client.stream("GET", server.url) as response:
content = await response.read()
assert response.status_code == 200
content = await response.read()
assert content == b"Hello, world!"
@pytest.mark.asyncio
async def test_stream_iterator(server):
async with httpx.Client() as client:
response = await client.get(server.url, stream=True)
assert response.status_code == 200
body = b""
async for chunk in response.stream_bytes():
body += chunk
async with httpx.Client() as client:
async with client.stream("GET", server.url) as response:
async for chunk in response.stream_bytes():
body += chunk
assert response.status_code == 200
assert body == b"Hello, world!"
@pytest.mark.asyncio
async def test_raw_iterator(server):
async with httpx.Client() as client:
response = await client.get(server.url, stream=True)
assert response.status_code == 200
body = b""
async for chunk in response.stream_raw():
body += chunk
async with httpx.Client() as client:
async with client.stream("GET", server.url) as response:
async for chunk in response.stream_raw():
body += chunk
assert response.status_code == 200
assert body == b"Hello, world!"
await response.close()
@pytest.mark.asyncio
@ -174,17 +177,18 @@ def test_merge_url():
@pytest.mark.asyncio
async def test_elapsed_delay(server):
url = server.url.copy_with(path="/slow_response/100")
async with httpx.Client() as client:
response = await client.get(server.url.copy_with(path="/slow_response/100"))
response = await client.get(url)
assert response.elapsed.total_seconds() == pytest.approx(0.1, rel=0.2)
@pytest.mark.asyncio
async def test_elapsed_delay_ignores_read_time(server):
url = server.url.copy_with(path="/slow_response/100")
async with httpx.Client() as client:
response = await client.get(
server.url.copy_with(path="/slow_response/100"), stream=True
)
sleep(0.2)
await response.read()
async with client.stream("GET", url) as response:
sleep(0.2)
await response.read()
assert response.elapsed.total_seconds() == pytest.approx(0.1, rel=0.2)

View File

@ -66,6 +66,17 @@ async def test_delete(server):
assert response.reason_phrase == "OK"
@pytest.mark.asyncio
async def test_stream(server):
    """The top-level `httpx.stream()` API yields a response that can be read."""
    async with httpx.stream("GET", server.url) as streamed:
        await streamed.read()

    assert streamed.status_code == 200
    assert streamed.reason_phrase == "OK"
    assert streamed.text == "Hello, world!"
    assert streamed.http_version == "HTTP/1.1"
@pytest.mark.asyncio
async def test_get_invalid_url(server):
with pytest.raises(httpx.InvalidURL):

View File

@ -44,9 +44,6 @@ async def test_pool_timeout(server, backend):
async with Client(
pool_limits=pool_limits, timeout=timeout, backend=backend
) as client:
response = await client.get(server.url, stream=True)
with pytest.raises(PoolTimeout):
await client.get("http://localhost:8000/")
await response.read()
async with client.stream("GET", server.url):
with pytest.raises(PoolTimeout):
await client.get("http://localhost:8000/")