Add Response.stream_lines (#575)
This commit is contained in:
parent
fdaa01275a
commit
248aa580a1
@ -211,6 +211,67 @@ class TextDecoder:
|
||||
return result
|
||||
|
||||
|
||||
class LineDecoder:
|
||||
"""
|
||||
Handles incrementally reading lines from text.
|
||||
|
||||
Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n`
|
||||
as line endings, normalizing to `\n`.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.buffer = ""
|
||||
|
||||
def decode(self, text: str) -> typing.List[str]:
|
||||
lines = []
|
||||
|
||||
if text.startswith("\n") and self.buffer and self.buffer[-1] == "\r":
|
||||
# Handle the case where we have an "\r\n" split across
|
||||
# our previous input, and our new chunk.
|
||||
lines.append(self.buffer[:-1] + "\n")
|
||||
self.buffer = ""
|
||||
text = text[1:]
|
||||
|
||||
while text:
|
||||
num_chars = len(text)
|
||||
for idx in range(num_chars):
|
||||
char = text[idx]
|
||||
next_char = None if idx + 1 == num_chars else text[idx + 1]
|
||||
if char == "\n":
|
||||
lines.append(self.buffer + text[: idx + 1])
|
||||
self.buffer = ""
|
||||
text = text[idx + 1 :]
|
||||
break
|
||||
elif char == "\r" and next_char == "\n":
|
||||
lines.append(self.buffer + text[:idx] + "\n")
|
||||
self.buffer = ""
|
||||
text = text[idx + 2 :]
|
||||
break
|
||||
elif char == "\r" and next_char is not None:
|
||||
lines.append(self.buffer + text[:idx] + "\n")
|
||||
self.buffer = ""
|
||||
text = text[idx + 1 :]
|
||||
break
|
||||
elif next_char is None:
|
||||
self.buffer = text
|
||||
text = ""
|
||||
break
|
||||
|
||||
return lines
|
||||
|
||||
def flush(self) -> typing.List[str]:
|
||||
if self.buffer.endswith("\r"):
|
||||
# Handle the case where we had a trailing '\r', which could have
|
||||
# been a '\r\n' pair.
|
||||
lines = [self.buffer[:-1] + "\n"]
|
||||
elif self.buffer:
|
||||
lines = [self.buffer]
|
||||
else:
|
||||
lines = []
|
||||
self.buffer = ""
|
||||
return lines
|
||||
|
||||
|
||||
SUPPORTED_DECODERS = {
|
||||
"identity": IdentityDecoder,
|
||||
"gzip": GZipDecoder,
|
||||
|
||||
@ -17,6 +17,7 @@ from .decoders import (
|
||||
SUPPORTED_DECODERS,
|
||||
Decoder,
|
||||
IdentityDecoder,
|
||||
LineDecoder,
|
||||
MultiDecoder,
|
||||
TextDecoder,
|
||||
)
|
||||
@ -936,6 +937,14 @@ class Response:
|
||||
yield decoder.decode(chunk)
|
||||
yield decoder.flush()
|
||||
|
||||
async def stream_lines(self) -> typing.AsyncIterator[str]:
|
||||
decoder = LineDecoder()
|
||||
async for text in self.stream_text():
|
||||
for line in decoder.decode(text):
|
||||
yield line
|
||||
for line in decoder.flush():
|
||||
yield line
|
||||
|
||||
async def raw(self) -> typing.AsyncIterator[bytes]:
|
||||
"""
|
||||
A byte-iterator over the raw response content.
|
||||
|
||||
@ -164,6 +164,18 @@ async def test_stream_text():
|
||||
assert content == "Hello, world!"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_lines():
|
||||
response = httpx.Response(200, content=b"Hello,\nworld!")
|
||||
|
||||
await response.read()
|
||||
|
||||
content = []
|
||||
async for line in response.stream_lines():
|
||||
content.append(line)
|
||||
assert content == ["Hello,\n", "world!"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_interface_after_read():
|
||||
response = httpx.Response(200, content=b"Hello, world!")
|
||||
|
||||
@ -9,6 +9,7 @@ from httpx.decoders import (
|
||||
DeflateDecoder,
|
||||
GZipDecoder,
|
||||
IdentityDecoder,
|
||||
LineDecoder,
|
||||
TextDecoder,
|
||||
)
|
||||
|
||||
@ -167,6 +168,48 @@ def test_text_decoder_empty_cases():
|
||||
assert decoder.flush() == ""
|
||||
|
||||
|
||||
def test_line_decoder_nl():
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\n\nb\nc") == ["a\n", "\n", "b\n"]
|
||||
assert decoder.flush() == ["c"]
|
||||
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\n\nb\nc\n") == ["a\n", "\n", "b\n", "c\n"]
|
||||
assert decoder.flush() == []
|
||||
|
||||
|
||||
def test_line_decoder_cr():
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\r\rb\rc") == ["a\n", "\n", "b\n"]
|
||||
assert decoder.flush() == ["c"]
|
||||
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\r\rb\rc\r") == ["a\n", "\n", "b\n"]
|
||||
assert decoder.flush() == ["c\n"]
|
||||
|
||||
|
||||
def test_line_decoder_crnl():
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\r\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
|
||||
assert decoder.flush() == ["c"]
|
||||
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\r\n\r\nb\r\nc\r\n") == ["a\n", "\n", "b\n", "c\n"]
|
||||
assert decoder.flush() == []
|
||||
|
||||
decoder = LineDecoder()
|
||||
assert decoder.decode("") == []
|
||||
assert decoder.decode("a\r") == []
|
||||
assert decoder.decode("\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
|
||||
assert decoder.flush() == ["c"]
|
||||
|
||||
|
||||
def test_invalid_content_encoding_header():
|
||||
headers = [(b"Content-Encoding", b"invalid-header")]
|
||||
body = b"test 123"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user