Connections

This commit is contained in:
Tom Christie 2019-04-05 17:49:59 +01:00
parent fcc1fe1233
commit 6a4376b202
5 changed files with 251 additions and 170 deletions

View File

@ -9,8 +9,7 @@ from .config import (
SSLConfig,
TimeoutConfig,
)
from .decoders import IdentityDecoder
from .exceptions import ResponseClosed, StreamConsumed
from .models import Response
async def request(
@ -22,7 +21,7 @@ async def request(
stream: bool = False,
ssl: SSLConfig = DEFAULT_SSL_CONFIG,
timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
) -> "Response":
) -> Response:
async with PoolManager(ssl=ssl, timeout=timeout) as pool:
return await pool.request(
method=method, url=url, headers=headers, body=body, stream=stream
@ -50,13 +49,8 @@ class PoolManager:
headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
stream: bool = False,
) -> "Response":
if stream:
async def streaming_body():
yield b"Hello, "
yield b"world!"
return Response(200, body=streaming_body)
return Response(200, body=b"Hello, world!")
) -> Response:
raise NotImplementedError()
async def close(self) -> None:
self.is_closed = True
@ -71,67 +65,3 @@ class PoolManager:
traceback: TracebackType = None,
) -> None:
await self.close()
class Response:
def __init__(
self,
status_code: int,
*,
headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
on_close: typing.Callable = None,
):
self.status_code = status_code
self.headers = list(headers)
self.on_close = on_close
self.is_closed = False
self.is_streamed = False
self.decoder = IdentityDecoder()
if isinstance(body, bytes):
self.is_closed = True
self.body = body
else:
self.body_aiter = body
async def read(self) -> bytes:
"""
Read and return the response content.
"""
if not hasattr(self, "body"):
body = b""
async for part in self.stream():
body += part
self.body = body
return self.body
async def stream(self):
"""
A byte-iterator over the decoded response content.
This will allow us to handle gzip, deflate, and brotli encoded responses.
"""
if hasattr(self, "body"):
yield self.body
else:
async for chunk in self.raw():
yield self.decoder.decode(chunk)
yield self.decoder.flush()
async def raw(self) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the raw response content.
"""
if self.is_streamed:
raise StreamConsumed()
if self.is_closed:
raise ResponseClosed()
self.is_streamed = True
async for part in self.body_aiter():
yield part
await self.close()
async def close(self) -> None:
if not self.is_closed:
self.is_closed = True
if self.on_close is not None:
await self.on_close()

70
httpcore/connections.py Normal file
View File

@ -0,0 +1,70 @@
from config import TimeoutConfig
import asyncio
import h11
import ssl
class Connection:
def __init__(self):
self.reader = None
self.writer = None
self.state = h11.Connection(our_role=h11.CLIENT)
async def open(self, host: str, port: int, ssl: ssl.SSLContext):
try:
self.reader, self.writer = await asyncio.wait_for(
asyncio.open_connection(host, port, ssl=ssl), timeout
)
except asyncio.TimeoutError:
raise ConnectTimeout()
async def send(self, request: Request) -> Response:
method = request.method
target = request.url.path
if request.url.query:
target += "?" + request.url.query
headers = [
("host", request.url.netloc)
] += request.headers
# Send the request method, path/query, and headers.
event = h11.Request(method=method, target=target, headers=headers)
await self._send_event(event)
# Send the request body.
if request.is_streaming:
async for data in request.raw():
event = h11.Data(data=data)
await self._send_event(event)
else:
event = h11.Data(data=request.body)
await self._send_event(event)
# Finalize sending the request.
event = h11.EndOfMessage()
await connection.send_event(event)
async def _send_event(self, message):
data = self.state.send(message)
self.writer.write(data)
async def _receive_event(self, timeout):
event = self.state.next_event()
while type(event) is h11.NEED_DATA:
try:
data = await asyncio.wait_for(self.reader.read(2048), timeout)
except asyncio.TimeoutError:
raise ReadTimeout()
self.state.receive_data(data)
event = self.state.next_event()
return event
async def close(self):
self.writer.close()
if hasattr(self.writer, "wait_closed"):
await self.writer.wait_closed()

68
httpcore/models.py Normal file
View File

@ -0,0 +1,68 @@
import typing
from .decoders import IdentityDecoder
from .exceptions import ResponseClosed, StreamConsumed
class Response:
def __init__(
self,
status_code: int,
*,
headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
on_close: typing.Callable = None,
):
self.status_code = status_code
self.headers = list(headers)
self.on_close = on_close
self.is_closed = False
self.is_streamed = False
self.decoder = IdentityDecoder()
if isinstance(body, bytes):
self.is_closed = True
self.body = body
else:
self.body_aiter = body
async def read(self) -> bytes:
"""
Read and return the response content.
"""
if not hasattr(self, "body"):
body = b""
async for part in self.stream():
body += part
self.body = body
return self.body
async def stream(self):
"""
A byte-iterator over the decoded response content.
This will allow us to handle gzip, deflate, and brotli encoded responses.
"""
if hasattr(self, "body"):
yield self.body
else:
async for chunk in self.raw():
yield self.decoder.decode(chunk)
yield self.decoder.flush()
async def raw(self) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the raw response content.
"""
if self.is_streamed:
raise StreamConsumed()
if self.is_closed:
raise ResponseClosed()
self.is_streamed = True
async for part in self.body_aiter():
yield part
await self.close()
async def close(self) -> None:
if not self.is_closed:
self.is_closed = True
if self.on_close is not None:
await self.on_close()

View File

@ -1,96 +0,0 @@
import pytest
import httpcore
@pytest.mark.asyncio
async def test_request():
response = await httpcore.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_read_response():
response = await httpcore.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
body = await response.read()
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_stream_response():
response = await httpcore.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
body = b''
async for part in response.stream():
body += part
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_read_streaming_response():
response = await httpcore.request("GET", "http://example.com", stream=True)
assert response.status_code == 200
assert not hasattr(response, 'body')
assert not response.is_closed
body = await response.read()
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_stream_streaming_response():
response = await httpcore.request("GET", "http://example.com", stream=True)
assert response.status_code == 200
assert not hasattr(response, 'body')
assert not response.is_closed
body = b''
async for part in response.stream():
body += part
assert body == b"Hello, world!"
assert not hasattr(response, 'body')
assert response.is_closed
@pytest.mark.asyncio
async def test_cannot_read_after_stream_consumed():
response = await httpcore.request("GET", "http://example.com", stream=True)
body = b''
async for part in response.stream():
body += part
with pytest.raises(httpcore.StreamConsumed):
await response.read()
@pytest.mark.asyncio
async def test_cannot_read_after_response_closed():
response = await httpcore.request("GET", "http://example.com", stream=True)
await response.close()
with pytest.raises(httpcore.ResponseClosed):
await response.read()

109
tests/test_responses.py Normal file
View File

@ -0,0 +1,109 @@
import pytest
import httpcore
class MockRequests(httpcore.PoolManager):
async def request(self, method, url, *, headers = (), body = b'', stream = False) -> httpcore.Response:
if stream:
async def streaming_body():
yield b"Hello, "
yield b"world!"
return httpcore.Response(200, body=streaming_body)
return httpcore.Response(200, body=b"Hello, world!")
http = MockRequests()
@pytest.mark.asyncio
async def test_request():
response = await http.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_read_response():
response = await http.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
body = await response.read()
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_stream_response():
response = await http.request("GET", "http://example.com")
assert response.status_code == 200
assert response.body == b"Hello, world!"
assert response.is_closed
body = b''
async for part in response.stream():
body += part
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_read_streaming_response():
response = await http.request("GET", "http://example.com", stream=True)
assert response.status_code == 200
assert not hasattr(response, 'body')
assert not response.is_closed
body = await response.read()
assert body == b"Hello, world!"
assert response.body == b"Hello, world!"
assert response.is_closed
@pytest.mark.asyncio
async def test_stream_streaming_response():
response = await http.request("GET", "http://example.com", stream=True)
assert response.status_code == 200
assert not hasattr(response, 'body')
assert not response.is_closed
body = b''
async for part in response.stream():
body += part
assert body == b"Hello, world!"
assert not hasattr(response, 'body')
assert response.is_closed
@pytest.mark.asyncio
async def test_cannot_read_after_stream_consumed():
response = await http.request("GET", "http://example.com", stream=True)
body = b''
async for part in response.stream():
body += part
with pytest.raises(httpcore.StreamConsumed):
await response.read()
@pytest.mark.asyncio
async def test_cannot_read_after_response_closed():
response = await http.request("GET", "http://example.com", stream=True)
await response.close()
with pytest.raises(httpcore.ResponseClosed):
await response.read()