Add support for gzip, deflate, and brotli decoding

This commit is contained in:
Tom Christie 2019-04-16 11:26:28 +01:00
parent 9d59a5a1d6
commit 39b57c939d
5 changed files with 213 additions and 34 deletions

4
httpcore/compat.py Normal file
View File

@ -0,0 +1,4 @@
try:
import brotli
except ImportError:
brotli = None

View File

@ -1,7 +1,7 @@
import typing
from urllib.parse import urlsplit
from .decoders import IdentityDecoder
from .decoders import SUPPORTED_DECODERS, Decoder, IdentityDecoder, MultiDecoder
from .exceptions import ResponseClosed, StreamConsumed
@ -95,10 +95,25 @@ class Response:
self.on_close = on_close
self.is_closed = False
self.is_streamed = False
self.decoder = IdentityDecoder()
decoders = [] # type: typing.List[Decoder]
for header, value in self.headers:
if header.strip().lower() == b"content-encoding":
for part in value.split(b","):
part = part.strip().lower()
decoder_cls = SUPPORTED_DECODERS[part]
decoders.append(decoder_cls())
if len(decoders) == 0:
self.decoder = IdentityDecoder() # type: Decoder
elif len(decoders) == 1:
self.decoder = decoders[0]
else:
self.decoder = MultiDecoder(decoders)
if isinstance(body, bytes):
self.is_closed = True
self.body = body
self.body = self.decoder.decode(body) + self.decoder.flush()
else:
self.body_aiter = body

View File

@ -1,41 +1,118 @@
"""
Handlers for Content-Encoding.
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
import typing
import zlib
from .compat import brotli
class IdentityDecoder:
    """Pass-through decoder: chunks are handed back exactly as received."""

    def decode(self, chunk: bytes) -> bytes:
        """Return `chunk` unchanged."""
        unchanged = chunk
        return unchanged
class Decoder:
    """
    Base interface for Content-Encoding decoders.

    Subclasses override both methods; the versions here always raise
    NotImplementedError.
    """

    def decode(self, data: bytes) -> bytes:
        """Decode a chunk of encoded bytes and return the decoded output."""
        raise NotImplementedError  # pragma: nocover

    def flush(self) -> bytes:
        """Return any decoded output still pending after the final chunk."""
        raise NotImplementedError  # pragma: nocover
class IdentityDecoder(Decoder):
    """
    Handle 'identity' decoding, i.e. apply no transformation at all.
    """

    def decode(self, data: bytes) -> bytes:
        # Nothing is buffered: each chunk passes straight through.
        passthrough = data
        return passthrough

    def flush(self) -> bytes:
        # No internal state is kept, so there is never trailing output.
        return bytes()
# class DeflateDecoder:
# pass
#
#
# class GZipDecoder:
# pass
#
#
# class BrotliDecoder:
# pass
#
#
# class MultiDecoder:
# def __init__(self, children):
# self.children = children
#
# def decode(self, chunk: bytes) -> bytes:
# data = chunk
# for child in children:
# data = child.decode(data)
# return data
#
# def flush(self) -> bytes:
# data = b''
# for child in children:
# data = child.decode(data)
# data = child.flush()
# return data
class DeflateDecoder(Decoder):
    """
    Handle 'deflate' decoding.

    HTTP defines the 'deflate' coding as a zlib-wrapped stream (RFC 1950),
    but some servers send a raw DEFLATE stream (RFC 1951) instead. The zlib
    container is tried first; if zlib rejects the input before any output
    has been produced, the bytes received so far are replayed through a
    raw-DEFLATE decompressor, which is then used for the rest of the stream.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj()
        # Input retained while the container format is still undecided.
        # Set to None once the format has been settled either way.
        self._pending = b""  # type: typing.Optional[bytes]

    def decode(self, data: bytes) -> bytes:
        """
        Decode one chunk and return whatever output is available so far.

        Raises zlib.error if the data is valid in neither format, or if the
        stream turns out to be corrupt after the format has been settled.
        """
        if self._pending is None or not data:
            return self.decompressor.decompress(data)

        self._pending += data
        try:
            decoded = self.decompressor.decompress(data)
        except zlib.error:
            # Not a zlib container: switch to raw DEFLATE and replay
            # everything seen so far through the new decompressor.
            replay = self._pending
            self._pending = None
            self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
            return self.decompressor.decompress(replay)
        if decoded:
            # Output was produced, so the zlib container is confirmed and
            # the replay buffer is no longer needed.
            self._pending = None
        return decoded

    def flush(self) -> bytes:
        """Return any output still buffered inside the decompressor."""
        return self.decompressor.flush()
class GZipDecoder(Decoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        # Setting the 16 bit on the window size selects zlib's gzip
        # header/trailer handling.
        gzip_wbits = zlib.MAX_WBITS | 16
        self.decompressor = zlib.decompressobj(gzip_wbits)

    def decode(self, data: bytes) -> bytes:
        inflated = self.decompressor.decompress(data)
        return inflated

    def flush(self) -> bytes:
        remainder = self.decompressor.flush()
        return remainder
class BrotliDecoder(Decoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy`.
    See: https://brotlipy.readthedocs.io/

    Raises:
        ImportError: on construction, if the optional 'brotlipy' dependency
            could not be imported.
    """

    def __init__(self) -> None:
        # An explicit raise (rather than `assert`) keeps this check active
        # when Python runs with -O, which strips assert statements.
        if brotli is None:
            raise ImportError(
                "The 'brotlipy' library must be installed to use 'BrotliDecoder'"
            )
        self.decompressor = brotli.Decompressor()

    def decode(self, data: bytes) -> bytes:
        """Decode one chunk of brotli-compressed bytes."""
        return self.decompressor.decompress(data)

    def flush(self) -> bytes:
        """Finalise the stream; this decoder never reports trailing output."""
        # finish() is called for its effect only and its result is discarded.
        # NOTE(review): presumably it validates that the stream is complete;
        # confirm against the brotlipy documentation.
        self.decompressor.finish()
        return b""
class MultiDecoder(Decoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[Decoder]) -> None:
        """
        `children` is a sequence of decoders, listed in the order in which
        the corresponding encodings were applied.
        """
        # Decoding has to undo the most recently applied encoding first,
        # so the decoders are stored in reverse order.
        decode_order = list(children)
        decode_order.reverse()
        self.children = decode_order

    def decode(self, data: bytes) -> bytes:
        # Feed the chunk through every stage in turn.
        result = data
        for decoder in self.children:
            result = decoder.decode(result)
        return result

    def flush(self) -> bytes:
        carried = b""
        for decoder in self.children:
            # Push the previous stages' trailing output through this stage,
            # then drain whatever this stage itself still holds.
            head = decoder.decode(carried)
            carried = head + decoder.flush()
        return carried
# Maps each Content-Encoding token (lower-case bytes) to its decoder class.
SUPPORTED_DECODERS = {
    b"gzip": GZipDecoder,
    b"deflate": DeflateDecoder,
    b"identity": IdentityDecoder,
    b"br": BrotliDecoder,
}

# Brotli support is optional: drop the entry when the library is missing.
if brotli is None:
    del SUPPORTED_DECODERS[b"br"]  # pragma: nocover

View File

@ -1,6 +1,10 @@
certifi
h11
# Optional
brotlipy
# Testing
autoflake
black

79
tests/test_decoding.py Normal file
View File

@ -0,0 +1,79 @@
import zlib
import brotli
import pytest
import httpcore
def test_deflate():
    """A raw-deflate body is decoded when Content-Encoding is 'deflate'."""
    expected = b"test 123"
    raw_deflate = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    payload = raw_deflate.compress(expected)
    payload += raw_deflate.flush()

    response = httpcore.Response(
        200, headers=[(b"Content-Encoding", b"deflate")], body=payload
    )
    assert response.body == expected
def test_gzip():
    """A gzip body is decoded when Content-Encoding is 'gzip'."""
    expected = b"test 123"
    gzip_stream = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    payload = gzip_stream.compress(expected)
    payload += gzip_stream.flush()

    response = httpcore.Response(
        200, headers=[(b"Content-Encoding", b"gzip")], body=payload
    )
    assert response.body == expected
def test_brotli():
    """A brotli body is decoded when Content-Encoding is 'br'."""
    expected = b"test 123"
    payload = brotli.compress(expected)

    response = httpcore.Response(
        200, headers=[(b"Content-Encoding", b"br")], body=payload
    )
    assert response.body == expected
def test_multi():
    """Stacked encodings (deflate first, then gzip) are undone in reverse."""
    expected = b"test 123"

    # Inner layer: raw deflate.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    inner = deflater.compress(expected) + deflater.flush()

    # Outer layer: gzip applied on top of the deflated bytes.
    gzipper = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    outer = gzipper.compress(inner) + gzipper.flush()

    response = httpcore.Response(
        200, headers=[(b"Content-Encoding", b"deflate, gzip")], body=outer
    )
    assert response.body == expected
def test_multi_with_identity():
    """'identity' is a no-op whether listed before or after 'br'."""
    expected = b"test 123"
    payload = brotli.compress(expected)

    for encoding in (b"br, identity", b"identity, br"):
        response = httpcore.Response(
            200, headers=[(b"Content-Encoding", encoding)], body=payload
        )
        assert response.body == expected
@pytest.mark.asyncio
async def test_streaming():
    """A gzip body supplied as an async iterator is decoded by read()."""
    expected = b"test 123"
    gzip_stream = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)

    async def gzip_chunks(raw):
        # Two chunks: the compressed data, then the compressor's trailer.
        yield gzip_stream.compress(raw)
        yield gzip_stream.flush()

    response = httpcore.Response(
        200,
        headers=[(b"Content-Encoding", b"gzip")],
        body=gzip_chunks(expected),
    )
    # A streamed response has no `body` attribute at construction time.
    assert not hasattr(response, "body")
    decoded = await response.read()
    assert decoded == expected