Add BaseQueue interface (#257)
This commit is contained in:
parent
e0b523dd1b
commit
fc261e8149
@ -19,6 +19,7 @@ from ..exceptions import ConnectTimeout, PoolTimeout, ReadTimeout, WriteTimeout
|
||||
from ..interfaces import (
|
||||
BaseBackgroundManager,
|
||||
BasePoolSemaphore,
|
||||
BaseQueue,
|
||||
BaseReader,
|
||||
BaseWriter,
|
||||
ConcurrencyBackend,
|
||||
@ -246,6 +247,9 @@ class AsyncioBackend(ConcurrencyBackend):
|
||||
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
|
||||
return PoolSemaphore(limits)
|
||||
|
||||
def create_queue(self, max_size: int) -> BaseQueue:
|
||||
return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
|
||||
|
||||
def background_manager(
|
||||
self, coroutine: typing.Callable, args: typing.Any
|
||||
) -> "BackgroundManager":
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
import asyncio
|
||||
import typing
|
||||
|
||||
from ..concurrency.asyncio import AsyncioBackend
|
||||
from ..config import CertTypes, TimeoutTypes, VerifyTypes
|
||||
from ..interfaces import AsyncDispatcher
|
||||
from ..interfaces import AsyncDispatcher, ConcurrencyBackend
|
||||
from ..models import AsyncRequest, AsyncResponse
|
||||
|
||||
|
||||
@ -52,6 +53,8 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
self.raise_app_exceptions = raise_app_exceptions
|
||||
self.root_path = root_path
|
||||
self.client = client
|
||||
# This will need to be turned into a parameter on this class at some point.
|
||||
self.backend: ConcurrencyBackend = AsyncioBackend()
|
||||
|
||||
async def send(
|
||||
self,
|
||||
@ -79,7 +82,7 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
status_code = None
|
||||
headers = None
|
||||
response_started_or_failed = asyncio.Event()
|
||||
response_body = BodyIterator()
|
||||
response_body = BodyIterator(self.backend)
|
||||
request_stream = request.stream()
|
||||
|
||||
async def receive() -> dict:
|
||||
@ -156,10 +159,8 @@ class BodyIterator:
|
||||
ingest the response content from.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._queue = asyncio.Queue(
|
||||
maxsize=1
|
||||
) # type: asyncio.Queue[typing.Union[bytes, object]]
|
||||
def __init__(self, backend: ConcurrencyBackend) -> None:
|
||||
self._queue = backend.create_queue(max_size=1)
|
||||
self._done = object()
|
||||
|
||||
async def iterate(self) -> typing.AsyncIterator[bytes]:
|
||||
|
||||
@ -145,6 +145,18 @@ class BaseWriter:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
class BaseQueue:
|
||||
"""
|
||||
A FIFO queue. Abstracts away any asyncio-specific interfaces.
|
||||
"""
|
||||
|
||||
async def get(self) -> typing.Any:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
async def put(self, value: typing.Any) -> None:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
class BasePoolSemaphore:
|
||||
"""
|
||||
A semaphore for use with connection pooling.
|
||||
@ -205,6 +217,9 @@ class ConcurrencyBackend:
|
||||
except StopAsyncIteration:
|
||||
break
|
||||
|
||||
def create_queue(self, max_size: int) -> BaseQueue:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
def background_manager(
|
||||
self, coroutine: typing.Callable, args: typing.Any
|
||||
) -> "BaseBackgroundManager":
|
||||
|
||||
Loading…
Reference in New Issue
Block a user