Add charset_normalizer detection. (#1791)

* Add charset_normalizer detection

* Tweak JSON tests for slightly different charset decoding behaviour

* Add charset-normalizer to docs
This commit is contained in:
Tom Christie 2021-08-13 11:38:53 +01:00 committed by GitHub
parent 77246617ca
commit acb5e6ac50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 82 additions and 91 deletions

View File

@ -119,6 +119,7 @@ The HTTPX project relies on these excellent libraries:
* `h11` - HTTP/1.1 support.
* `h2` - HTTP/2 support. *(Optional)*
* `certifi` - SSL certificates.
* `charset_normalizer` - Charset auto-detection.
* `rfc3986` - URL parsing & normalization.
* `idna` - Internationalized domain name support.
* `sniffio` - Async library autodetection.

View File

@ -111,6 +111,7 @@ The HTTPX project relies on these excellent libraries:
* `h11` - HTTP/1.1 support.
* `h2` - HTTP/2 support. *(Optional)*
* `certifi` - SSL certificates.
* `charset_normalizer` - Charset auto-detection.
* `rfc3986` - URL parsing & normalization.
* `idna` - Internationalized domain name support.
* `sniffio` - Async library autodetection.

View File

@ -241,52 +241,13 @@ class TextDecoder:
Handles incrementally decoding bytes into text
"""
def __init__(self, encoding: typing.Optional[str] = None):
self.decoder: typing.Optional[codecs.IncrementalDecoder] = None
if encoding is not None:
self.decoder = codecs.getincrementaldecoder(encoding)(errors="strict")
def __init__(self, encoding: str = "utf-8"):
self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
def decode(self, data: bytes) -> str:
"""
If an encoding is explicitly specified, then we use that.
Otherwise our strategy is to attempt UTF-8, and fallback to Windows 1252.
Note that UTF-8 is a strict superset of ascii, and Windows 1252 is a
superset of the non-control characters in iso-8859-1, so we essentially
end up supporting any of ascii, utf-8, iso-8859-1, cp1252.
Given that UTF-8 is now by *far* the most widely used encoding, this
should be a pretty robust strategy for cases where a charset has
not been explicitly included.
Useful stats on the prevalence of different charsets in the wild...
* https://w3techs.com/technologies/overview/character_encoding
* https://w3techs.com/technologies/history_overview/character_encoding
The HTML5 spec also has some useful guidelines, suggesting defaults of
either UTF-8 or Windows 1252 in most cases...
* https://dev.w3.org/html5/spec-LC/Overview.html
"""
if self.decoder is None:
# If this is the first decode pass then we need to determine which
# encoding to use by attempting UTF-8 and raising any decode errors.
attempt_utf_8 = codecs.getincrementaldecoder("utf-8")(errors="strict")
try:
attempt_utf_8.decode(data)
except UnicodeDecodeError:
# Could not decode as UTF-8. Use Windows 1252.
self.decoder = codecs.getincrementaldecoder("cp1252")(errors="replace")
else:
# Can decode as UTF-8. Use UTF-8 with lenient error settings.
self.decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
return self.decoder.decode(data)
def flush(self) -> str:
if self.decoder is None:
return ""
return self.decoder.decode(b"", True)

View File

@ -8,6 +8,7 @@ from collections.abc import MutableMapping
from http.cookiejar import Cookie, CookieJar
from urllib.parse import parse_qs, quote, unquote, urlencode
import charset_normalizer
import idna
import rfc3986
import rfc3986.exceptions
@ -1314,22 +1315,26 @@ class Response:
if not content:
self._text = ""
else:
decoder = TextDecoder(encoding=self.encoding)
decoder = TextDecoder(encoding=self.encoding or "utf-8")
self._text = "".join([decoder.decode(self.content), decoder.flush()])
return self._text
@property
def encoding(self) -> typing.Optional[str]:
"""
Return the encoding, which may have been set explicitly, or may have
been specified by the Content-Type header.
Return an encoding to use for decoding the byte content into text.
The priority for determining this is given by...
* `.encoding = <>` has been set explicitly.
* The encoding as specified by the charset parameter in the Content-Type header.
* The encoding as determined by `charset_normalizer`.
* UTF-8.
"""
if not hasattr(self, "_encoding"):
encoding = self.charset_encoding
if encoding is None or not is_known_encoding(encoding):
self._encoding = None
else:
self._encoding = encoding
encoding = self.apparent_encoding
self._encoding = encoding
return self._encoding
@encoding.setter
@ -1351,6 +1356,19 @@ class Response:
return params["charset"].strip("'\"")
@property
def apparent_encoding(self) -> typing.Optional[str]:
"""
Return the encoding, as detemined by `charset_normalizer`.
"""
content = getattr(self, "_content", b"")
if len(content) < 32:
# charset_normalizer will issue warnings if we run it with
# fewer bytes than this cutoff.
return None
match = charset_normalizer.from_bytes(self.content).best()
return None if match is None else match.encoding
def _get_content_decoder(self) -> ContentDecoder:
"""
Returns a decoder instance which can be used to decode the raw byte
@ -1411,10 +1429,7 @@ class Response:
if self.charset_encoding is None and self.content and len(self.content) > 3:
encoding = guess_json_utf(self.content)
if encoding is not None:
try:
return jsonlib.loads(self.content.decode(encoding), **kwargs)
except UnicodeDecodeError:
pass
return jsonlib.loads(self.content.decode(encoding), **kwargs)
return jsonlib.loads(self.text, **kwargs)
@property
@ -1495,7 +1510,7 @@ class Response:
that handles both gzip, deflate, etc but also detects the content's
string encoding.
"""
decoder = TextDecoder(encoding=self.encoding)
decoder = TextDecoder(encoding=self.encoding or "utf-8")
chunker = TextChunker(chunk_size=chunk_size)
with request_context(request=self._request):
for byte_content in self.iter_bytes():
@ -1593,7 +1608,7 @@ class Response:
that handles both gzip, deflate, etc but also detects the content's
string encoding.
"""
decoder = TextDecoder(encoding=self.encoding)
decoder = TextDecoder(encoding=self.encoding or "utf-8")
chunker = TextChunker(chunk_size=chunk_size)
with request_context(request=self._request):
async for byte_content in self.aiter_bytes():

View File

@ -57,6 +57,7 @@ setup(
zip_safe=False,
install_requires=[
"certifi",
"charset_normalizer",
"sniffio",
"rfc3986[idna2008]>=1.3,<2",
"httpcore>=0.13.3,<0.14.0",

View File

@ -1,6 +1,5 @@
import json
import pickle
from unittest import mock
import brotlicffi
import pytest
@ -197,15 +196,16 @@ def test_response_no_charset_with_iso_8859_1_content():
A response with ISO 8859-1 encoded content should decode correctly,
even with no charset specified.
"""
content = "Accented: Österreich".encode("iso-8859-1")
content = "Accented: Österreich abcdefghijklmnopqrstuzwxyz".encode("iso-8859-1")
headers = {"Content-Type": "text/plain"}
response = httpx.Response(
200,
content=content,
headers=headers,
)
assert response.text == "Accented: Österreich"
assert response.encoding is None
assert response.text == "Accented: Österreich abcdefghijklmnopqrstuzwxyz"
assert response.charset_encoding is None
assert response.apparent_encoding is not None
def test_response_no_charset_with_cp_1252_content():
@ -213,15 +213,16 @@ def test_response_no_charset_with_cp_1252_content():
A response with Windows 1252 encoded content should decode correctly,
even with no charset specified.
"""
content = "Euro Currency: €".encode("cp1252")
content = "Euro Currency: € abcdefghijklmnopqrstuzwxyz".encode("cp1252")
headers = {"Content-Type": "text/plain"}
response = httpx.Response(
200,
content=content,
headers=headers,
)
assert response.text == "Euro Currency: €"
assert response.encoding is None
assert response.text == "Euro Currency: € abcdefghijklmnopqrstuzwxyz"
assert response.charset_encoding is None
assert response.apparent_encoding is not None
def test_response_non_text_encoding():
@ -718,9 +719,22 @@ def test_json_with_options():
assert response.json(parse_int=str)["amount"] == "1"
def test_json_without_specified_encoding():
@pytest.mark.parametrize(
"encoding",
[
"utf-8",
"utf-8-sig",
"utf-16",
"utf-16-be",
"utf-16-le",
"utf-32",
"utf-32-be",
"utf-32-le",
],
)
def test_json_without_specified_charset(encoding):
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
content = json.dumps(data).encode(encoding)
headers = {"Content-Type": "application/json"}
response = httpx.Response(
200,
@ -730,30 +744,29 @@ def test_json_without_specified_encoding():
assert response.json() == data
def test_json_without_specified_encoding_decode_error():
@pytest.mark.parametrize(
"encoding",
[
"utf-8",
"utf-8-sig",
"utf-16",
"utf-16-be",
"utf-16-le",
"utf-32",
"utf-32-be",
"utf-32-le",
],
)
def test_json_with_specified_charset(encoding):
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
# force incorrect guess from `guess_json_utf` to trigger error
with mock.patch("httpx._models.guess_json_utf", return_value="utf-32-le"):
response = httpx.Response(
200,
content=content,
headers=headers,
)
with pytest.raises(json.decoder.JSONDecodeError):
response.json()
def test_json_without_specified_encoding_value_error():
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
# force incorrect guess from `guess_json_utf` to trigger error
with mock.patch("httpx._models.guess_json_utf", return_value="utf-32-le"):
response = httpx.Response(200, content=content, headers=headers)
with pytest.raises(json.decoder.JSONDecodeError):
response.json()
content = json.dumps(data).encode(encoding)
headers = {"Content-Type": f"application/json; charset={encoding}"}
response = httpx.Response(
200,
content=content,
headers=headers,
)
assert response.json() == data
@pytest.mark.parametrize(

View File

@ -179,8 +179,8 @@ def test_decoding_errors(header_value):
[
((b"Hello,", b" world!"), "ascii"),
((b"\xe3\x83", b"\x88\xe3\x83\xa9", b"\xe3", b"\x83\x99\xe3\x83\xab"), "utf-8"),
((b"Euro character: \x88!", b""), "cp1252"),
((b"Accented: \xd6sterreich", b""), "iso-8859-1"),
((b"Euro character: \x88! abcdefghijklmnopqrstuvwxyz", b""), "cp1252"),
((b"Accented: \xd6sterreich abcdefghijklmnopqrstuvwxyz", b""), "iso-8859-1"),
],
)
@pytest.mark.asyncio
@ -199,10 +199,9 @@ async def test_text_decoder(data, encoding):
assert response.text == (b"".join(data)).decode(encoding)
# Streaming `.aiter_text` iteratively.
response = httpx.Response(
200,
content=iterator(),
)
# Note that if we streamed the text *without* having read it first, then
# we won't get a `charset_normalizer` guess, and will instead always rely
# on utf-8 if no charset is specified.
text = "".join([part async for part in response.aiter_text()])
assert text == (b"".join(data)).decode(encoding)