Rejig test and tighten up models API

This commit is contained in:
Tom Christie 2019-04-30 13:52:37 +01:00
parent 4d908d024f
commit dea4d42c6d
11 changed files with 272 additions and 31 deletions

View File

@ -1,3 +1,6 @@
# Timeout exceptions...
class Timeout(Exception):
"""
A base class for all timeouts.
@ -28,6 +31,18 @@ class PoolTimeout(Timeout):
"""
# HTTP exceptions...
class ProtocolError(Exception):
"""
Malformed HTTP.
"""
# Redirect exceptions...
class RedirectError(Exception):
"""
Base class for HTTP redirect errors.
@ -53,10 +68,7 @@ class RedirectLoop(RedirectError):
"""
class ProtocolError(Exception):
"""
Malformed HTTP.
"""
# Response exceptions...
class StreamConsumed(Exception):

View File

@ -125,34 +125,69 @@ class Headers(typing.MutableMapping[str, str]):
A case-insensitive multidict.
"""
def __init__(self, headers: HeaderTypes = None) -> None:
def __init__(self, headers: HeaderTypes = None, encoding: str = None) -> None:
if headers is None:
self._list = [] # type: typing.List[typing.Tuple[bytes, bytes]]
elif isinstance(headers, Headers):
self._list = list(headers.raw)
elif isinstance(headers, dict):
self._list = [
(normalize_header_key(k), normalize_header_value(v))
(normalize_header_key(k, encoding), normalize_header_value(v, encoding))
for k, v in headers.items()
]
else:
self._list = [
(normalize_header_key(k), normalize_header_value(v)) for k, v in headers
(normalize_header_key(k, encoding), normalize_header_value(v, encoding))
for k, v in headers
]
self._encoding = encoding
@property
def encoding(self) -> str:
"""
Header encoding is mandated as ascii, but utf-8 or iso-8859-1 may be
seen in the wild.
"""
if self._encoding is None:
for encoding in ["ascii", "utf-8"]:
for key, value in self.raw:
try:
key.decode(encoding)
value.decode(encoding)
except UnicodeDecodeError:
break
else:
# The else block runs if 'break' did not occur, meaning
# all values fitted the encoding.
self._encoding = encoding
break
else:
# The ISO-8859-1 encoding covers all 256 code points in a byte,
# so will never raise decode errors.
self._encoding = "iso-8859-1"
return self._encoding
@encoding.setter
def encoding(self, value: str) -> None:
self._encoding = value
@property
def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
"""
Returns a list of the raw header items, as byte pairs.
May be mutated in-place.
"""
return self._list
def keys(self) -> typing.List[str]: # type: ignore
return [key.decode("latin-1") for key, value in self._list]
return [key.decode(self.encoding) for key, value in self._list]
def values(self) -> typing.List[str]: # type: ignore
return [value.decode("latin-1") for key, value in self._list]
return [value.decode(self.encoding) for key, value in self._list]
def items(self) -> typing.List[typing.Tuple[str, str]]: # type: ignore
return [
(key.decode("latin-1"), value.decode("latin-1"))
(key.decode(self.encoding), value.decode(self.encoding))
for key, value in self._list
]
@ -162,19 +197,53 @@ class Headers(typing.MutableMapping[str, str]):
except KeyError:
return default
def getlist(self, key: str) -> typing.List[str]:
get_header_key = key.lower().encode("latin-1")
return [
item_value.decode("latin-1")
def getlist(self, key: str, default: typing.Any = None, split_commas = None) -> typing.List[str]:
"""
Return multiple header values.
If there are header values that include commas, then we default to
spliting them into multiple results, except for Set-Cookie.
See: https://tools.ietf.org/html/rfc7230#section-3.2.2
"""
get_header_key = key.lower().encode(self.encoding)
if split_commas is None:
split_commas = get_header_key != b'set-cookie'
values = [
item_value.decode(self.encoding)
for item_key, item_value in self._list
if item_key == get_header_key
]
if not values:
return [] if default is None else default
if not split_commas:
return values
split_values = []
for value in values:
split_values.extend([item.strip() for item in value.split(",")])
return split_values
def __getitem__(self, key: str) -> str:
get_header_key = key.lower().encode("latin-1")
"""
Return a single header value.
If there are multiple headers with the same key, then we concatenate
them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2
"""
normalized_key = key.lower().encode(self.encoding)
items = []
for header_key, header_value in self._list:
if header_key == get_header_key:
return header_value.decode("latin-1")
if header_key == normalized_key:
items.append(header_value.decode(self.encoding))
if items:
return ", ".join(items)
raise KeyError(key)
def __setitem__(self, key: str, value: str) -> None:
@ -182,8 +251,8 @@ class Headers(typing.MutableMapping[str, str]):
Set the header `key` to `value`, removing any duplicate entries.
Retains insertion order.
"""
set_key = key.lower().encode("latin-1")
set_value = value.encode("latin-1")
set_key = key.lower().encode(self.encoding)
set_value = value.encode(self.encoding)
found_indexes = []
for idx, (item_key, item_value) in enumerate(self._list):
@ -203,7 +272,7 @@ class Headers(typing.MutableMapping[str, str]):
"""
Remove the header `key`.
"""
del_key = key.lower().encode("latin-1")
del_key = key.lower().encode(self.encoding)
pop_indexes = []
for idx, (item_key, item_value) in enumerate(self._list):
@ -214,7 +283,7 @@ class Headers(typing.MutableMapping[str, str]):
del self._list[idx]
def __contains__(self, key: typing.Any) -> bool:
get_header_key = key.lower().encode("latin-1")
get_header_key = key.lower().encode(self.encoding)
for header_key, header_value in self._list:
if header_key == get_header_key:
return True
@ -233,10 +302,16 @@ class Headers(typing.MutableMapping[str, str]):
def __repr__(self) -> str:
class_name = self.__class__.__name__
encoding_str = ""
if self.encoding != "ascii":
encoding_str = f", encoding={self.encoding!r}"
as_dict = dict(self.items())
if len(as_dict) == len(self):
return f"{class_name}({as_dict!r})"
return f"{class_name}(raw={self.raw!r})"
return f"{class_name}({as_dict!r}{encoding_str})"
as_list = self.items()
return f"{class_name}({as_list!r}{encoding_str})"
class Request:
@ -351,10 +426,10 @@ class Response:
"""
if not hasattr(self, "_decoder"):
decoders = [] # type: typing.List[Decoder]
value = self.headers.get("content-encoding", "identity")
for part in value.split(","):
part = part.strip().lower()
decoder_cls = SUPPORTED_DECODERS[part]
values = self.headers.getlist("content-encoding", ["identity"])
for value in values:
value = value.strip().lower()
decoder_cls = SUPPORTED_DECODERS[value]
decoders.append(decoder_cls())
if len(decoders) == 1:

View File

@ -54,22 +54,22 @@ def requote_uri(uri: str) -> str:
return quote(uri, safe=safe_without_percent)
def normalize_header_key(value: typing.AnyStr) -> bytes:
def normalize_header_key(value: typing.AnyStr, encoding: str = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header key.
"""
if isinstance(value, bytes):
return value.lower()
return value.encode("latin-1").lower()
return value.encode(encoding or "ascii").lower()
def normalize_header_value(value: typing.AnyStr) -> bytes:
def normalize_header_value(value: typing.AnyStr, encoding: str = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header value.
"""
if isinstance(value, bytes):
return value
return value.encode("latin-1")
return value.encode(encoding or "ascii")
def get_reason_phrase(status_code: int) -> str:

View File

@ -0,0 +1,154 @@
import httpcore
def test_headers():
h = httpcore.Headers([("a", "123"), ("a", "456"), ("b", "789")])
assert "a" in h
assert "A" in h
assert "b" in h
assert "B" in h
assert "c" not in h
assert h["a"] == "123, 456"
assert h.get("a") == "123, 456"
assert h.get("nope", default=None) is None
assert h.getlist("a") == ["123", "456"]
assert h.keys() == ["a", "a", "b"]
assert h.values() == ["123", "456", "789"]
assert h.items() == [("a", "123"), ("a", "456"), ("b", "789")]
assert list(h) == ["a", "a", "b"]
assert dict(h) == {"a": "123, 456", "b": "789"}
assert repr(h) == "Headers([('a', '123'), ('a', '456'), ('b', '789')])"
assert h == httpcore.Headers([("a", "123"), ("b", "789"), ("a", "456")])
assert h != [("a", "123"), ("A", "456"), ("b", "789")]
h = httpcore.Headers({"a": "123", "b": "789"})
assert h["A"] == "123"
assert h["B"] == "789"
assert h.raw == [(b"a", b"123"), (b"b", b"789")]
assert repr(h) == "Headers({'a': '123', 'b': '789'})"
def test_header_mutations():
h = httpcore.Headers()
assert dict(h) == {}
h["a"] = "1"
assert dict(h) == {"a": "1"}
h["a"] = "2"
assert dict(h) == {"a": "2"}
h.setdefault("a", "3")
assert dict(h) == {"a": "2"}
h.setdefault("b", "4")
assert dict(h) == {"a": "2", "b": "4"}
del h["a"]
assert dict(h) == {"b": "4"}
assert h.raw == [(b"b", b"4")]
def test_copy_headers():
headers = httpcore.Headers({"custom": "example"})
headers_copy = httpcore.Headers(headers)
assert headers == headers_copy
def test_headers_insert_retains_ordering():
headers = httpcore.Headers({"a": "a", "b": "b", "c": "c"})
headers["b"] = "123"
assert list(headers.values()) == ["a", "123", "c"]
def test_headers_insert_appends_if_new():
headers = httpcore.Headers({"a": "a", "b": "b", "c": "c"})
headers["d"] = "123"
assert list(headers.values()) == ["a", "b", "c", "123"]
def test_headers_insert_removes_all_existing():
headers = httpcore.Headers([("a", "123"), ("a", "456")])
headers["a"] = "789"
assert dict(headers) == {"a": "789"}
def test_headers_delete_removes_all_existing():
headers = httpcore.Headers([("a", "123"), ("a", "456")])
del headers["a"]
assert dict(headers) == {}
def test_headers_dict_repr():
"""
Headers should display with a dict repr by default.
"""
headers = httpcore.Headers({"custom": "example"})
assert repr(headers) == "Headers({'custom': 'example'})"
def test_headers_encoding_in_repr():
"""
Headers should display an encoding in the repr if required.
"""
headers = httpcore.Headers({b"custom": "example ☃".encode("utf-8")})
assert repr(headers) == "Headers({'custom': 'example ☃'}, encoding='utf-8')"
def test_headers_list_repr():
"""
Headers should display with a list repr if they include multiple identical keys.
"""
headers = httpcore.Headers([("custom", "example 1"), ("custom", "example 2")])
assert (
repr(headers) == "Headers([('custom', 'example 1'), ('custom', 'example 2')])"
)
def test_headers_decode_ascii():
"""
Headers should decode as ascii by default.
"""
raw_headers = [(b"Custom", b"Example")]
headers = httpcore.Headers(raw_headers)
assert dict(headers) == {"custom": "Example"}
assert headers.encoding == "ascii"
def test_headers_decode_utf_8():
"""
Headers containing non-ascii codepoints should default to decoding as utf-8.
"""
raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
headers = httpcore.Headers(raw_headers)
assert dict(headers) == {"custom": "Code point: ☃"}
assert headers.encoding == "utf-8"
def test_headers_decode_iso_8859_1():
"""
Headers containing non-UTF-8 codepoints should default to decoding as iso-8859-1.
"""
raw_headers = [(b"Custom", "Code point: ÿ".encode("iso-8859-1"))]
headers = httpcore.Headers(raw_headers)
assert dict(headers) == {"custom": "Code point: ÿ"}
assert headers.encoding == "iso-8859-1"
def test_headers_decode_explicit_encoding():
"""
An explicit encoding may be set on headers in order to force a
particular decoding.
"""
raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
headers = httpcore.Headers(raw_headers)
headers.encoding = "iso-8859-1"
print(headers)
assert dict(headers) == {"custom": "Code point: â\x98\x83"}
assert headers.encoding == "iso-8859-1"
def test_multiple_headers():
"""
Most headers should split by commas for `getlist`, except 'Set-Cookie'.
"""
h = httpcore.Headers([('set-cookie', 'a, b'), ('set-cookie', 'c')])
h.getlist('Set-Cookie') == ['a, b', 'b']
h = httpcore.Headers([('vary', 'a, b'), ('vary', 'c')])
h.getlist('Vary') == ['a', 'b', 'c']