Fixed missing coverage

This commit is contained in:
Alex Grönholm 2025-07-02 13:39:08 +03:00
parent 7b2b429cf1
commit 1d5173cd95

View File

@ -65,10 +65,20 @@ async def test_bytesio_content():
@pytest.mark.anyio
async def test_async_bytesio_content():
async def fixed_stream(content: bytes) -> AsyncGenerator[bytes]:
yield content
class AsyncBytesIO:
def __init__(self, content: bytes) -> None:
self._idx = 0
self._content = content
request = httpx.Request(method, url, content=fixed_stream(b"Hello, world!"))
async def aread(self, chunk_size: int) -> bytes:
chunk = self._content[self._idx : self._idx + chunk_size]
self._idx = self._idx + chunk_size
return chunk
async def __aiter__(self):
yield self._content # pragma: no cover
request = httpx.Request(method, url, content=AsyncBytesIO(b"Hello, world!"))
assert not isinstance(request.stream, typing.Iterable)
assert isinstance(request.stream, typing.AsyncIterable)