Add stream ID to HTTP2Connection

This commit is contained in:
Tom Christie 2019-04-24 17:04:15 +01:00
parent 53f3dc4a66
commit 0bdcbaef72
7 changed files with 45 additions and 50 deletions

View File

@ -1,7 +1,6 @@
from .config import PoolLimits, SSLConfig, TimeoutConfig
from .connection import HTTPConnection
from .connectionpool import ConnectionPool
from .datastructures import URL, Origin, Request, Response
from .connection_pool import ConnectionPool
from .exceptions import (
ConnectTimeout,
PoolTimeout,
@ -13,6 +12,7 @@ from .exceptions import (
)
from .http2 import HTTP2Connection
from .http11 import HTTP11Connection
from .models import URL, Origin, Request, Response
from .sync import SyncClient, SyncConnectionPool
__version__ = "0.2.1"

View File

@ -5,10 +5,10 @@ import h2.connection
import h11
from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
from .datastructures import Client, Origin, Request, Response
from .exceptions import ConnectTimeout
from .http2 import HTTP2Connection
from .http11 import HTTP11Connection
from .models import Client, Origin, Request, Response
class HTTPConnection(Client):

View File

@ -11,8 +11,8 @@ from .config import (
TimeoutConfig,
)
from .connection import HTTPConnection
from .datastructures import Client, Origin, Request, Response
from .exceptions import PoolTimeout
from .models import Client, Origin, Request, Response
class ConnectionPool(Client):

View File

@ -4,8 +4,8 @@ import typing
import h11
from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
from .datastructures import Client, Origin, Request, Response
from .exceptions import ConnectTimeout, ReadTimeout
from .models import Client, Origin, Request, Response
H11Event = typing.Union[
h11.Request,

View File

@ -5,8 +5,8 @@ import h2.connection
import h2.events
from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
from .datastructures import Client, Origin, Request, Response
from .exceptions import ConnectTimeout, ReadTimeout
from .models import Client, Origin, Request, Response
class HTTP2Connection(Client):
@ -24,7 +24,8 @@ class HTTP2Connection(Client):
self.timeout = timeout
self.on_release = on_release
self.h2_state = h2.connection.H2Connection()
self.events = [] # type: typing.List[h2.events.Event]
self.events = {} # type: typing.Dict[int, typing.List[h2.events.Event]]
self.initialized = False
@property
def is_closed(self) -> bool:
@ -40,20 +41,24 @@ class HTTP2Connection(Client):
if timeout is None:
timeout = self.timeout
if not self.initialized:
self.initiate_connection()
#  Start sending the request.
await self._initiate_connection()
await self._send_headers(request)
stream_id = self.h2_state.get_next_available_stream_id()
self.events[stream_id] = []
await self.send_headers(stream_id, request)
# Send the request body.
if request.body:
await self._send_data(request.body)
async for data in request.stream():
await self.send_data(stream_id, data)
# Finalize sending the request.
await self._end_stream()
await self.end_stream(stream_id)
# Start getting the response.
while True:
event = await self._receive_event(timeout)
event = await self.receive_event(stream_id, timeout)
if isinstance(event, h2.events.ResponseReceived):
break
@ -65,51 +70,57 @@ class HTTP2Connection(Client):
elif not k.startswith(b":"):
headers.append((k, v))
body = self._body_iter(timeout)
body = self.body_iter(stream_id, timeout)
return Response(
status_code=status_code,
protocol="HTTP/2",
headers=headers,
body=body,
on_close=self._release,
on_close=self.release,
)
async def _initiate_connection(self) -> None:
def initiate_connection(self) -> None:
self.h2_state.initiate_connection()
data_to_send = self.h2_state.data_to_send()
self.writer.write(data_to_send)
self.initialized = True
async def _send_headers(self, request: Request) -> None:
async def send_headers(self, stream_id: int, request: Request) -> None:
headers = [
(b":method", request.method.encode()),
(b":authority", request.url.hostname.encode()),
(b":scheme", request.url.scheme.encode()),
(b":path", request.url.full_path.encode()),
] + request.headers
self.h2_state.send_headers(1, headers)
self.h2_state.send_headers(stream_id, headers)
data_to_send = self.h2_state.data_to_send()
self.writer.write(data_to_send)
async def _send_data(self, data: bytes) -> None:
self.h2_state.send_data(1, data)
async def send_data(self, stream_id: int, data: bytes) -> None:
self.h2_state.send_data(stream_id, data)
data_to_send = self.h2_state.data_to_send()
self.writer.write(data_to_send)
async def _end_stream(self) -> None:
self.h2_state.end_stream(1)
async def end_stream(self, stream_id: int) -> None:
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
self.writer.write(data_to_send)
async def _body_iter(self, timeout: TimeoutConfig) -> typing.AsyncIterator[bytes]:
async def body_iter(
self, stream_id: int, timeout: TimeoutConfig
) -> typing.AsyncIterator[bytes]:
while True:
event = await self._receive_event(timeout)
event = await self.receive_event(stream_id, timeout)
if isinstance(event, h2.events.DataReceived):
yield event.data
elif isinstance(event, h2.events.StreamEnded):
del self.events[stream_id]
break
async def _receive_event(self, timeout: TimeoutConfig) -> h2.events.Event:
while not self.events:
async def receive_event(
self, stream_id: int, timeout: TimeoutConfig
) -> h2.events.Event:
while not self.events[stream_id]:
try:
data = await asyncio.wait_for(
self.reader.read(2048), timeout.read_timeout
@ -118,35 +129,19 @@ class HTTP2Connection(Client):
raise ReadTimeout()
events = self.h2_state.receive_data(data)
self.events.extend(events)
for event in events:
if getattr(event, "stream_id", 0):
self.events[event.stream_id].append(event)
data_to_send = self.h2_state.data_to_send()
if data_to_send:
self.writer.write(data_to_send)
return self.events.pop(0)
async def _release(self) -> None:
# if (
# self.h11_state.our_state is h11.DONE
# and self.h11_state.their_state is h11.DONE
# ):
# self.h11_state.start_next_cycle()
# else:
# await self.close()
return self.events[stream_id].pop(0)
async def release(self) -> None:
if self.on_release is not None:
await self.on_release(self)
async def close(self) -> None:
# event = h11.ConnectionClosed()
# try:
# # If we're in h11.MUST_CLOSE then we'll end up in h11.CLOSED.
# self.h11_state.send(event)
# except h11.ProtocolError:
# # If we're in some other state then it's a premature close,
# # and we'll end up in h11.ERROR.
# pass
if self.writer is not None:
self.writer.close()
self.writer.close()

View File

@ -3,8 +3,8 @@ import typing
from types import TracebackType
from .config import SSLConfig, TimeoutConfig
from .connectionpool import ConnectionPool
from .datastructures import URL, Client, Response
from .connection_pool import ConnectionPool
from .models import URL, Client, Response
class SyncResponse: