Merge branch 'master' into limit-supported-codecs
This commit is contained in:
commit
2b2c1b41c7
@ -8,7 +8,7 @@ As HTTPX usage grows, there is an expanding community of developers building too
|
||||
|
||||
### Hishel
|
||||
|
||||
[GitHub](https://github.com/karosis88/hishel) - [Documentation](https://karosis88.github.io/hishel/)
|
||||
[GitHub](https://github.com/karpetrosyan/hishel) - [Documentation](https://hishel.com/)
|
||||
|
||||
An elegant HTTP Cache implementation for HTTPX and HTTP Core.
|
||||
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import hashlib
|
||||
import netrc
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
@ -148,6 +147,10 @@ class NetRCAuth(Auth):
|
||||
"""
|
||||
|
||||
def __init__(self, file: typing.Optional[str] = None):
|
||||
# Lazily import 'netrc'.
|
||||
# There's no need for us to load this module unless 'NetRCAuth' is being used.
|
||||
import netrc
|
||||
|
||||
self._netrc_info = netrc.netrc(file)
|
||||
|
||||
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
|
||||
|
||||
@ -43,7 +43,6 @@ from ._types import (
|
||||
)
|
||||
from ._urls import URL
|
||||
from ._utils import (
|
||||
guess_json_utf,
|
||||
is_known_encoding,
|
||||
normalize_header_key,
|
||||
normalize_header_value,
|
||||
@ -759,11 +758,7 @@ class Response:
|
||||
raise HTTPStatusError(message, request=request, response=self)
|
||||
|
||||
def json(self, **kwargs: typing.Any) -> typing.Any:
|
||||
if self.charset_encoding is None and self.content and len(self.content) > 3:
|
||||
encoding = guess_json_utf(self.content)
|
||||
if encoding is not None:
|
||||
return jsonlib.loads(self.content.decode(encoding), **kwargs)
|
||||
return jsonlib.loads(self.text, **kwargs)
|
||||
return jsonlib.loads(self.content, **kwargs)
|
||||
|
||||
@property
|
||||
def cookies(self) -> "Cookies":
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import binascii
|
||||
import io
|
||||
import os
|
||||
import typing
|
||||
@ -200,7 +199,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
|
||||
boundary: typing.Optional[bytes] = None,
|
||||
) -> None:
|
||||
if boundary is None:
|
||||
boundary = binascii.hexlify(os.urandom(16))
|
||||
boundary = os.urandom(16).hex().encode("ascii")
|
||||
|
||||
self.boundary = boundary
|
||||
self.content_type = "multipart/form-data; boundary=%s" % boundary.decode(
|
||||
|
||||
@ -145,41 +145,6 @@ def format_form_param(name: str, value: str) -> bytes:
|
||||
return f'{name}="{value}"'.encode()
|
||||
|
||||
|
||||
# Null bytes; no need to recreate these on each call to guess_json_utf
|
||||
_null = b"\x00"
|
||||
_null2 = _null * 2
|
||||
_null3 = _null * 3
|
||||
|
||||
|
||||
def guess_json_utf(data: bytes) -> typing.Optional[str]:
|
||||
# JSON always starts with two ASCII characters, so detection is as
|
||||
# easy as counting the nulls and from their location and count
|
||||
# determine the encoding. Also detect a BOM, if present.
|
||||
sample = data[:4]
|
||||
if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
|
||||
return "utf-32" # BOM included
|
||||
if sample[:3] == codecs.BOM_UTF8:
|
||||
return "utf-8-sig" # BOM included, MS style (discouraged)
|
||||
if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
|
||||
return "utf-16" # BOM included
|
||||
nullcount = sample.count(_null)
|
||||
if nullcount == 0:
|
||||
return "utf-8"
|
||||
if nullcount == 2:
|
||||
if sample[::2] == _null2: # 1st and 3rd are null
|
||||
return "utf-16-be"
|
||||
if sample[1::2] == _null2: # 2nd and 4th are null
|
||||
return "utf-16-le"
|
||||
# Did not detect 2 valid UTF-16 ascii-range characters
|
||||
if nullcount == 3:
|
||||
if sample[:3] == _null3:
|
||||
return "utf-32-be"
|
||||
if sample[1:] == _null3:
|
||||
return "utf-32-le"
|
||||
# Did not detect a valid UTF-32 ascii-range character
|
||||
return None
|
||||
|
||||
|
||||
def get_ca_bundle_from_env() -> typing.Optional[str]:
|
||||
if "SSL_CERT_FILE" in os.environ:
|
||||
ssl_file = Path(os.environ["SSL_CERT_FILE"])
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
@ -10,7 +11,6 @@ from httpx._utils import (
|
||||
URLPattern,
|
||||
get_ca_bundle_from_env,
|
||||
get_environment_proxies,
|
||||
guess_json_utf,
|
||||
is_https_redirect,
|
||||
obfuscate_sensitive_headers,
|
||||
parse_header_links,
|
||||
@ -34,12 +34,16 @@ from .common import TESTS_DIR
|
||||
),
|
||||
)
|
||||
def test_encoded(encoding):
|
||||
data = "{}".encode(encoding)
|
||||
assert guess_json_utf(data) == encoding
|
||||
content = '{"abc": 123}'.encode(encoding)
|
||||
response = httpx.Response(200, content=content)
|
||||
assert response.json() == {"abc": 123}
|
||||
|
||||
|
||||
def test_bad_utf_like_encoding():
|
||||
assert guess_json_utf(b"\x00\x00\x00\x00") is None
|
||||
content = b"\x00\x00\x00\x00"
|
||||
response = httpx.Response(200, content=content)
|
||||
with pytest.raises(json.decoder.JSONDecodeError):
|
||||
response.json()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -52,8 +56,9 @@ def test_bad_utf_like_encoding():
|
||||
),
|
||||
)
|
||||
def test_guess_by_bom(encoding, expected):
|
||||
data = "\ufeff{}".encode(encoding)
|
||||
assert guess_json_utf(data) == expected
|
||||
content = '\ufeff{"abc": 123}'.encode(encoding)
|
||||
response = httpx.Response(200, content=content)
|
||||
assert response.json() == {"abc": 123}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user