Drop Queue from concurrency backends, since it's no longer required (#562)
* Drop Queue from concurrency backends, since it's no longer required * Drop unused import
This commit is contained in:
parent
e045b86d7f
commit
5fccc04da4
@ -11,7 +11,6 @@ from .base import (
|
||||
BaseBackgroundManager,
|
||||
BaseEvent,
|
||||
BasePoolSemaphore,
|
||||
BaseQueue,
|
||||
BaseSocketStream,
|
||||
ConcurrencyBackend,
|
||||
TimeoutFlag,
|
||||
@ -325,9 +324,6 @@ class AsyncioBackend(ConcurrencyBackend):
|
||||
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
|
||||
return PoolSemaphore(limits)
|
||||
|
||||
def create_queue(self, max_size: int) -> BaseQueue:
|
||||
return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
|
||||
|
||||
def create_event(self) -> BaseEvent:
|
||||
return typing.cast(BaseEvent, asyncio.Event())
|
||||
|
||||
|
||||
@ -70,18 +70,6 @@ class BaseSocketStream:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
class BaseQueue:
|
||||
"""
|
||||
A FIFO queue. Abstracts away any asyncio-specific interfaces.
|
||||
"""
|
||||
|
||||
async def get(self) -> typing.Any:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
async def put(self, value: typing.Any) -> None:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
class BaseEvent:
|
||||
"""
|
||||
An event object. Abstracts away any asyncio-specific interfaces.
|
||||
@ -169,9 +157,6 @@ class ConcurrencyBackend:
|
||||
except StopAsyncIteration:
|
||||
break
|
||||
|
||||
def create_queue(self, max_size: int) -> BaseQueue:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
def create_event(self) -> BaseEvent:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import functools
|
||||
import math
|
||||
import ssl
|
||||
import typing
|
||||
from types import TracebackType
|
||||
@ -12,7 +11,6 @@ from .base import (
|
||||
BaseBackgroundManager,
|
||||
BaseEvent,
|
||||
BasePoolSemaphore,
|
||||
BaseQueue,
|
||||
BaseSocketStream,
|
||||
ConcurrencyBackend,
|
||||
TimeoutFlag,
|
||||
@ -230,9 +228,6 @@ class TrioBackend(ConcurrencyBackend):
|
||||
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
|
||||
return PoolSemaphore(limits)
|
||||
|
||||
def create_queue(self, max_size: int) -> BaseQueue:
|
||||
return Queue(max_size=max_size)
|
||||
|
||||
def create_event(self) -> BaseEvent:
|
||||
return Event()
|
||||
|
||||
@ -242,17 +237,6 @@ class TrioBackend(ConcurrencyBackend):
|
||||
return BackgroundManager(coroutine, *args)
|
||||
|
||||
|
||||
class Queue(BaseQueue):
|
||||
def __init__(self, max_size: int) -> None:
|
||||
self.send_channel, self.receive_channel = trio.open_memory_channel(math.inf)
|
||||
|
||||
async def get(self) -> typing.Any:
|
||||
return await self.receive_channel.receive()
|
||||
|
||||
async def put(self, value: typing.Any) -> None:
|
||||
await self.send_channel.send(value)
|
||||
|
||||
|
||||
class Event(BaseEvent):
|
||||
def __init__(self) -> None:
|
||||
self._event = trio.Event()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user