More coverage improvements (#711)
* More coverage improvements * Drop redundant 'sleep' usage in test utils
This commit is contained in:
parent
11e7604d1a
commit
f5eaec7ab3
@ -18,7 +18,7 @@ class AutoBackend(ConcurrencyBackend):
|
||||
def backend(self) -> ConcurrencyBackend:
|
||||
if not hasattr(self, "_backend_implementation"):
|
||||
backend = sniffio.current_async_library()
|
||||
if backend not in ("asyncio", "trio"):
|
||||
if backend not in ("asyncio", "trio"): # pragma: nocover
|
||||
raise RuntimeError(f"Unsupported concurrency backend {backend!r}")
|
||||
self._backend_implementation = lookup_backend(backend)
|
||||
return self._backend_implementation
|
||||
|
||||
@ -253,15 +253,6 @@ def get_logger(name: str) -> Logger:
|
||||
return typing.cast(Logger, logger)
|
||||
|
||||
|
||||
def kv_format(**kwargs: typing.Any) -> str:
|
||||
"""Format arguments into a key=value line.
|
||||
|
||||
>>> formatkv(x=1, name="Bob")
|
||||
"x=1 name='Bob'"
|
||||
"""
|
||||
return " ".join(f"{key}={value!r}" for key, value in kwargs.items())
|
||||
|
||||
|
||||
def should_not_be_proxied(url: "URL") -> bool:
|
||||
""" Return True if url should not be proxied,
|
||||
return False otherwise.
|
||||
|
||||
@ -14,26 +14,6 @@ from httpx.backends.auto import AutoBackend
|
||||
from httpx.backends.trio import TrioBackend
|
||||
|
||||
|
||||
@functools.singledispatch
|
||||
async def sleep(backend, seconds: int):
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
|
||||
@sleep.register(AutoBackend)
|
||||
async def _sleep_auto(backend, seconds: int):
|
||||
return await sleep(backend.backend, seconds=seconds)
|
||||
|
||||
|
||||
@sleep.register(AsyncioBackend)
|
||||
async def _sleep_asyncio(backend, seconds: int):
|
||||
await asyncio.sleep(seconds)
|
||||
|
||||
|
||||
@sleep.register(TrioBackend)
|
||||
async def _sleep_trio(backend, seconds: int):
|
||||
await trio.sleep(seconds)
|
||||
|
||||
|
||||
@functools.singledispatch
|
||||
async def run_concurrently(backend, *coroutines: typing.Callable[[], typing.Awaitable]):
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
@ -7,7 +7,6 @@ import h2.events
|
||||
|
||||
from httpx import Request, Timeout
|
||||
from httpx.backends.base import BaseSocketStream, lookup_backend
|
||||
from tests.concurrency import sleep
|
||||
|
||||
|
||||
class MockHTTP2Backend:
|
||||
@ -49,7 +48,6 @@ class MockHTTP2Server(BaseSocketStream):
|
||||
return "HTTP/2"
|
||||
|
||||
async def read(self, n, timeout, flag=None) -> bytes:
|
||||
await sleep(self.backend, 0)
|
||||
send, self.buffer = self.buffer[:n], self.buffer[n:]
|
||||
return send
|
||||
|
||||
@ -195,7 +193,6 @@ class MockRawSocketStream(BaseSocketStream):
|
||||
self.backend.received_data.append(data)
|
||||
|
||||
async def read(self, n, timeout, flag=None) -> bytes:
|
||||
await sleep(self.backend.backend, 0)
|
||||
if not self.backend.data_to_send:
|
||||
return b""
|
||||
return self.backend.data_to_send.pop(0)
|
||||
|
||||
@ -125,10 +125,7 @@ def test_decoding_errors(header_value):
|
||||
body = b"test 123"
|
||||
compressed_body = brotli.compress(body)[3:]
|
||||
with pytest.raises(httpx.DecodingError):
|
||||
response = httpx.Response(
|
||||
200, headers=headers, content=compressed_body, request=REQUEST
|
||||
)
|
||||
response.content
|
||||
httpx.Response(200, headers=headers, content=compressed_body, request=REQUEST)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user