httpx/tests/test_decoders.py
Michiel W. Beijen 392dbe45f0
Add support for zstd decoding (#3139)
This adds support for zstd decoding using the python package zstandard.
This is similar to how it is implemented in urllib3. I also chose the
optional installation option httpx[zstd] to mimic the same option in
urllib3.

zstd decoding is similar to brotli, but in benchmarks it is supposed to
be even faster. The zstd compression is described in RFC 8878.

See https://github.com/encode/httpx/discussions/1986

Co-authored-by: Kamil Monicz <kamil@monicz.dev>
2024-03-21 10:17:15 +00:00

337 lines
9.6 KiB
Python

from __future__ import annotations
import io
import typing
import zlib
import chardet
import pytest
import zstandard as zstd
import httpx
def test_deflate():
    """
    A raw deflate stream served as `Content-Encoding: deflate` is decoded.

    The compressor is created with negative window bits, which produces a
    bare deflate stream. Deflate encoding may use either 'zlib' or 'deflate'
    framing in the wild:
    https://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib#answer-22311297
    """
    expected = b"test 123"
    raw_deflate = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    payload = b"".join((raw_deflate.compress(expected), raw_deflate.flush()))

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"deflate")],
        content=payload,
    )

    assert response.content == expected
def test_zlib():
    """
    Output of `zlib.compress()` served as `Content-Encoding: deflate` is decoded.

    Deflate encoding may use either 'zlib' or 'deflate' framing in the wild:
    https://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib#answer-22311297
    """
    expected = b"test 123"
    payload = zlib.compress(expected)

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"deflate")],
        content=payload,
    )

    assert response.content == expected
def test_gzip():
    """A gzip-framed body served as `Content-Encoding: gzip` is decoded."""
    expected = b"test 123"
    # Window bits of `MAX_WBITS | 16` make zlib emit gzip framing.
    gzipper = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    payload = b"".join((gzipper.compress(expected), gzipper.flush()))

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"gzip")],
        content=payload,
    )

    assert response.content == expected
def test_brotli():
    """A pre-encoded body served as `Content-Encoding: br` is decoded."""
    expected = b"test 123"
    # Fixed payload that is expected to decode to `expected` under "br".
    payload = b"\x8b\x03\x80test 123\x03"

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"br")],
        content=payload,
    )

    assert response.content == expected
def test_zstd():
    """A zstd-compressed body served as `Content-Encoding: zstd` is decoded."""
    expected = b"test 123"
    payload = zstd.compress(expected)

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"zstd")],
        content=payload,
    )

    assert response.content == expected
def test_zstd_decoding_error():
    """
    A body that is not valid zstd data raises `httpx.DecodingError` when the
    response declares `Content-Encoding: zstd`.
    """
    # Use a bytes literal, as the other decoding-error tests do, so the
    # decoder is fed raw bytes directly instead of an implicitly encoded str.
    compressed_body = b"this_is_not_zstd_compressed_data"
    headers = [(b"Content-Encoding", b"zstd")]
    with pytest.raises(httpx.DecodingError):
        httpx.Response(
            200,
            headers=headers,
            content=compressed_body,
        )
def test_zstd_multiframe():
    """
    Two zstd frames separated by a skippable frame decode to the
    concatenation of both payloads.
    """
    # test inspired by urllib3 test suite
    skippable_frame = bytes.fromhex(
        "50 2A 4D 18"  # Magic_Number (little-endian)
        "07 00 00 00"  # Frame_Size (little-endian)
        "00 00 00 00 00 00 00"  # User_Data
    )
    frames = (
        zstd.compress(b"foo"),  # Zstandard frame
        skippable_frame,  # skippable frame (must be ignored)
        zstd.compress(b"bar"),  # Zstandard frame
    )
    stream = io.BytesIO(b"".join(frames))

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"zstd")],
        content=stream,
    )
    response.read()

    assert response.content == b"foobar"
def test_multi():
    """
    A body compressed with deflate and then gzip is fully decoded when the
    header lists both encodings as `deflate, gzip`.
    """
    expected = b"test 123"

    # First layer: raw deflate.
    inner = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    deflated = inner.compress(expected) + inner.flush()

    # Second layer: gzip applied on top of the deflated bytes.
    outer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    payload = outer.compress(deflated) + outer.flush()

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"deflate, gzip")],
        content=payload,
    )

    assert response.content == expected
def test_multi_with_identity():
    """
    Listing `identity` alongside `br`, in either order, still yields the
    decoded body.
    """
    expected = b"test 123"
    payload = b"\x8b\x03\x80test 123\x03"

    for encodings in (b"br, identity", b"identity, br"):
        response = httpx.Response(
            200,
            headers=[(b"Content-Encoding", encodings)],
            content=payload,
        )
        assert response.content == expected
@pytest.mark.anyio
async def test_streaming():
    """
    A gzip body supplied as an async iterator of chunks is decoded when the
    response is read with `aread()`.
    """
    expected = b"test 123"
    gzipper = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)

    async def gzip_chunks(raw: bytes) -> typing.AsyncIterator[bytes]:
        # Emit the compressed data first, then whatever the flush produces.
        yield gzipper.compress(raw)
        yield gzipper.flush()

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"gzip")],
        content=gzip_chunks(expected),
    )

    assert not hasattr(response, "body")
    decoded = await response.aread()
    assert decoded == expected
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br", b"identity"))
def test_empty_content(header_value):
    """An empty body stays empty for each parametrized content encoding."""
    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", header_value)],
        content=b"",
    )

    assert response.content == b""
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br", b"identity"))
def test_decoders_empty_cases(header_value):
    """`read()` on an empty body returns empty bytes for each encoding."""
    response = httpx.Response(
        status_code=200,
        headers=[(b"Content-Encoding", header_value)],
        content=b"",
    )

    decoded = response.read()
    assert decoded == b""
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br"))
def test_decoding_errors(header_value):
    """
    A body that is invalid for the declared `Content-Encoding` raises
    `httpx.DecodingError`, both with and without a request attached.
    """
    headers = [(b"Content-Encoding", header_value)]
    compressed_body = b"invalid"

    # Build the request outside the `pytest.raises` block so that only the
    # `Response` construction is expected to raise.
    request = httpx.Request("GET", "https://example.org")
    with pytest.raises(httpx.DecodingError):
        httpx.Response(200, headers=headers, content=compressed_body, request=request)

    with pytest.raises(httpx.DecodingError):
        httpx.Response(200, headers=headers, content=compressed_body)
@pytest.mark.parametrize(
    ["data", "encoding"],
    [
        ((b"Hello,", b" world!"), "ascii"),
        ((b"\xe3\x83", b"\x88\xe3\x83\xa9", b"\xe3", b"\x83\x99\xe3\x83\xab"), "utf-8"),
        ((b"Euro character: \x88! abcdefghijklmnopqrstuvwxyz", b""), "cp1252"),
        ((b"Accented: \xd6sterreich abcdefghijklmnopqrstuvwxyz", b""), "iso-8859-1"),
    ],
)
@pytest.mark.anyio
async def test_text_decoder_with_autodetect(data, encoding):
    """
    With a `default_encoding` callable backed by `chardet`, text decodes as
    if the chunks were joined and decoded with the expected encoding, both
    via `.text` and via `.aiter_text()` after the body has been read.
    """
    expected = b"".join(data).decode(encoding)

    async def iterator() -> typing.AsyncIterator[bytes]:
        # `data` is only read here, so no `nonlocal` declaration is needed.
        for chunk in data:
            yield chunk

    def autodetect(content):
        return chardet.detect(content).get("encoding")

    # Accessing `.text` on a read response.
    response = httpx.Response(200, content=iterator(), default_encoding=autodetect)
    await response.aread()
    assert response.text == expected

    # Streaming `.aiter_text` iteratively.
    # Note that if we streamed the text *without* having read it first, then
    # we won't get a guess from the `autodetect` callable, and will instead
    # always rely on utf-8 if no charset is specified.
    text = "".join([part async for part in response.aiter_text()])
    assert text == expected
@pytest.mark.anyio
async def test_text_decoder_known_encoding():
    """
    When `Content-Type` declares `charset=shift-jis`, the body decodes with
    that charset even though chunk boundaries split multi-byte characters.
    """

    async def iterator() -> typing.AsyncIterator[bytes]:
        yield b"\x83g"
        yield b"\x83"
        yield b"\x89\x83x\x83\x8b"

    response = httpx.Response(
        200,
        headers=[(b"Content-Type", b"text/html; charset=shift-jis")],
        content=iterator(),
    )

    await response.aread()
    # Compare the string directly: joining a `str` with "" is a no-op and
    # would only hide a non-`str` return value.
    assert response.text == "トラベル"
def test_text_decoder_empty_cases():
    """Empty content yields empty text, for both bytes and a list of chunks."""
    from_bytes = httpx.Response(200, content=b"")
    assert from_bytes.text == ""

    # Content given as a list of chunks has to be read before `.text` is used.
    from_chunks = httpx.Response(200, content=[b""])
    from_chunks.read()
    assert from_chunks.text == ""
@pytest.mark.parametrize(
    ["data", "expected"],
    [((b"Hello,", b" world!"), ["Hello,", " world!"])],
)
def test_streaming_text_decoder(
    data: typing.Iterable[bytes], expected: list[str]
) -> None:
    """`iter_text()` yields one decoded string per parametrized byte chunk."""
    chunk_iterator = iter(data)
    response = httpx.Response(200, content=chunk_iterator)

    decoded_parts = list(response.iter_text())
    assert decoded_parts == expected
def test_line_decoder_nl():
    """`iter_lines()` splits `\\n`-terminated content into lines."""
    cases = [
        ([b""], []),
        ([b"", b"a\n\nb\nc"], ["a", "", "b", "c"]),
        # Issue #1033
        ([b"", b"12345\n", b"foo ", b"bar ", b"baz\n"], ["12345", "foo bar baz"]),
    ]

    for chunks, expected_lines in cases:
        response = httpx.Response(200, content=chunks)
        assert list(response.iter_lines()) == expected_lines
def test_line_decoder_cr():
    """`iter_lines()` splits `\\r`-terminated content into lines."""
    cases = [
        ([b"", b"a\r\rb\rc"], ["a", "", "b", "c"]),
        ([b"", b"a\r\rb\rc\r"], ["a", "", "b", "c"]),
        # Issue #1033
        ([b"", b"12345\r", b"foo ", b"bar ", b"baz\r"], ["12345", "foo bar baz"]),
    ]

    for chunks, expected_lines in cases:
        response = httpx.Response(200, content=chunks)
        assert list(response.iter_lines()) == expected_lines
def test_line_decoder_crnl():
    """
    `iter_lines()` splits `\\r\\n`-terminated content into lines, including
    when a `\\r\\n` pair is split across two chunks.
    """
    cases = [
        ([b"", b"a\r\n\r\nb\r\nc"], ["a", "", "b", "c"]),
        ([b"", b"a\r\n\r\nb\r\nc\r\n"], ["a", "", "b", "c"]),
        ([b"", b"a\r", b"\n\r\nb\r\nc"], ["a", "", "b", "c"]),
        # Issue #1033
        ([b"", b"12345\r\n", b"foo bar baz\r\n"], ["12345", "foo bar baz"]),
    ]

    for chunks, expected_lines in cases:
        response = httpx.Response(200, content=chunks)
        assert list(response.iter_lines()) == expected_lines
def test_invalid_content_encoding_header():
    """A body sent with an unrecognised `Content-Encoding` is returned as-is."""
    payload = b"test 123"

    response = httpx.Response(
        200,
        headers=[(b"Content-Encoding", b"invalid-header")],
        content=payload,
    )

    assert response.content == payload