Add response.text and response.encoding
This commit is contained in:
parent
4255d41846
commit
d8df61b44b
@ -1,6 +1,9 @@
|
||||
import cgi
|
||||
import typing
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
import chardet
|
||||
|
||||
from .config import SSLConfig, TimeoutConfig
|
||||
from .decoders import (
|
||||
ACCEPT_ENCODING,
|
||||
@ -11,7 +14,12 @@ from .decoders import (
|
||||
)
|
||||
from .exceptions import ResponseClosed, ResponseNotRead, StreamConsumed
|
||||
from .status_codes import codes
|
||||
from .utils import get_reason_phrase, normalize_header_key, normalize_header_value
|
||||
from .utils import (
|
||||
get_reason_phrase,
|
||||
is_known_encoding,
|
||||
normalize_header_key,
|
||||
normalize_header_value,
|
||||
)
|
||||
|
||||
URLTypes = typing.Union["URL", str]
|
||||
|
||||
@ -200,15 +208,8 @@ class Headers(typing.MutableMapping[str, str]):
|
||||
def getlist(self, key: str, split_commas: bool = False) -> typing.List[str]:
|
||||
"""
|
||||
Return multiple header values.
|
||||
|
||||
If there are header values that include commas, then we default to
|
||||
spliting them into multiple results, except for Set-Cookie.
|
||||
|
||||
See: https://tools.ietf.org/html/rfc7230#section-3.2.2
|
||||
"""
|
||||
get_header_key = key.lower().encode(self.encoding)
|
||||
if split_commas is None:
|
||||
split_commas = get_header_key != b"set-cookie"
|
||||
|
||||
values = [
|
||||
item_value.decode(self.encoding)
|
||||
@ -424,13 +425,58 @@ class Response:
|
||||
def content(self) -> bytes:
|
||||
if not hasattr(self, "_content"):
|
||||
if hasattr(self, "_raw_content"):
|
||||
self._content = (
|
||||
self.decoder.decode(self._raw_content) + self.decoder.flush()
|
||||
)
|
||||
content = self.decoder.decode(self._raw_content)
|
||||
content += self.decoder.flush()
|
||||
self._content = content
|
||||
else:
|
||||
raise ResponseNotRead()
|
||||
return self._content
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
if not hasattr(self, "_text"):
|
||||
content = self.content
|
||||
if not content:
|
||||
self._text = ""
|
||||
else:
|
||||
encoding = self.encoding
|
||||
self._text = content.decode(encoding, errors="replace")
|
||||
return self._text
|
||||
|
||||
@property
|
||||
def encoding(self) -> str:
|
||||
if not hasattr(self, "_encoding"):
|
||||
encoding = self.charset_encoding
|
||||
if encoding is None or not is_known_encoding(encoding):
|
||||
encoding = self.apparent_encoding
|
||||
if encoding is None or not is_known_encoding(encoding):
|
||||
encoding = "utf-8"
|
||||
self._encoding = encoding
|
||||
return self._encoding
|
||||
|
||||
@encoding.setter
|
||||
def encoding(self, value: str) -> None:
|
||||
self._encoding = value
|
||||
|
||||
@property
|
||||
def charset_encoding(self) -> typing.Optional[str]:
|
||||
"""
|
||||
Return the encoding, as specified by the Content-Type header.
|
||||
"""
|
||||
content_type = self.headers.get("Content-Type")
|
||||
if content_type is None:
|
||||
return None
|
||||
|
||||
parsed = cgi.parse_header(content_type)[-1]
|
||||
return parsed.get("charset")
|
||||
|
||||
@property
|
||||
def apparent_encoding(self) -> typing.Optional[str]:
|
||||
"""
|
||||
Return the encoding, as it appears to autodetection.
|
||||
"""
|
||||
return chardet.detect(self.content)["encoding"]
|
||||
|
||||
@property
|
||||
def decoder(self) -> Decoder:
|
||||
"""
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import codecs
|
||||
import http
|
||||
import typing
|
||||
from urllib.parse import quote
|
||||
@ -80,3 +81,11 @@ def get_reason_phrase(status_code: int) -> str:
|
||||
return http.HTTPStatus(status_code).phrase
|
||||
except ValueError as exc:
|
||||
return ""
|
||||
|
||||
|
||||
def is_known_encoding(encoding: str) -> bool:
|
||||
try:
|
||||
codecs.lookup(encoding)
|
||||
except LookupError:
|
||||
return False
|
||||
return True
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
certifi
|
||||
chardet
|
||||
h11
|
||||
h2
|
||||
|
||||
|
||||
2
setup.py
2
setup.py
@ -47,7 +47,7 @@ setup(
|
||||
author_email="tom@tomchristie.com",
|
||||
packages=get_packages("httpcore"),
|
||||
data_files=[("", ["LICENSE.md"])],
|
||||
install_requires=["h11", "h2", "certifi"],
|
||||
install_requires=["h11", "h2", "certifi", "chardet"],
|
||||
classifiers=[
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Environment :: Web Environment",
|
||||
|
||||
@ -12,8 +12,37 @@ def test_response():
|
||||
response = httpcore.Response(200, content=b"Hello, world!")
|
||||
assert response.status_code == 200
|
||||
assert response.reason_phrase == "OK"
|
||||
assert response.content == b"Hello, world!"
|
||||
assert response.is_closed
|
||||
assert response.text == "Hello, world!"
|
||||
|
||||
|
||||
def test_response_content_type_encoding():
|
||||
headers = {"Content-Type": "text-plain; charset=latin-1"}
|
||||
response = httpcore.Response(
|
||||
200, content="Latin 1: ÿ".encode("latin-1"), headers=headers
|
||||
)
|
||||
assert response.text == "Latin 1: ÿ"
|
||||
assert response.encoding == "latin-1"
|
||||
|
||||
|
||||
def test_response_autodetect_encoding():
|
||||
response = httpcore.Response(200, content="Snowmen: ☃☃☃".encode("utf-8"))
|
||||
assert response.text == "Snowmen: ☃☃☃"
|
||||
assert response.encoding == "utf-8"
|
||||
|
||||
|
||||
def test_response_default_encoding():
|
||||
response = httpcore.Response(200, content=b"")
|
||||
assert response.text == ""
|
||||
assert response.encoding == "utf-8"
|
||||
|
||||
|
||||
def test_response_force_encoding():
|
||||
response = httpcore.Response(200, content="Snowman: ☃".encode("utf-8"))
|
||||
response.encoding = "iso-8859-1"
|
||||
assert response.status_code == 200
|
||||
assert response.reason_phrase == "OK"
|
||||
assert response.text == "Snowman: â\x98\x83"
|
||||
assert response.encoding == "iso-8859-1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -21,7 +50,8 @@ async def test_read_response():
|
||||
response = httpcore.Response(200, content=b"Hello, world!")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.content == b"Hello, world!"
|
||||
assert response.text == "Hello, world!"
|
||||
assert response.encoding == "ascii"
|
||||
assert response.is_closed
|
||||
|
||||
content = await response.read()
|
||||
@ -71,3 +101,4 @@ def test_unknown_status_code():
|
||||
response = httpcore.Response(600)
|
||||
assert response.status_code == 600
|
||||
assert response.reason_phrase == ""
|
||||
assert response.text == ""
|
||||
|
||||
@ -9,7 +9,7 @@ async def test_get(server):
|
||||
async with httpcore.Client() as client:
|
||||
response = await client.get(url)
|
||||
assert response.status_code == 200
|
||||
assert response.content == b"Hello, world!"
|
||||
assert response.text == "Hello, world!"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Loading…
Reference in New Issue
Block a user