Move remaining utility functions from _utils.py to _models.py (#3387)

This commit is contained in:
RafaelWO 2024-11-01 20:20:18 +01:00 committed by GitHub
parent 6212e8fa3b
commit 41597adffa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 118 additions and 124 deletions

View File

@ -1,8 +1,10 @@
from __future__ import annotations from __future__ import annotations
import codecs
import datetime import datetime
import email.message import email.message
import json as jsonlib import json as jsonlib
import re
import typing import typing
import urllib.request import urllib.request
from collections.abc import Mapping from collections.abc import Mapping
@ -44,15 +46,23 @@ from ._types import (
SyncByteStream, SyncByteStream,
) )
from ._urls import URL from ._urls import URL
from ._utils import ( from ._utils import to_bytes_or_str, to_str
is_known_encoding,
obfuscate_sensitive_headers,
parse_content_type_charset,
parse_header_links,
)
__all__ = ["Cookies", "Headers", "Request", "Response"] __all__ = ["Cookies", "Headers", "Request", "Response"]
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
def _is_known_encoding(encoding: str) -> bool:
"""
Return `True` if `encoding` is a known codec.
"""
try:
codecs.lookup(encoding)
except LookupError:
return False
return True
def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes: def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes:
""" """
@ -72,6 +82,60 @@ def _normalize_header_value(value: str | bytes, encoding: str | None = None) ->
return value.encode(encoding or "ascii") return value.encode(encoding or "ascii")
def _parse_content_type_charset(content_type: str) -> str | None:
# We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
# See: https://peps.python.org/pep-0594/#cgi
msg = email.message.Message()
msg["content-type"] = content_type
return msg.get_content_charset(failobj=None)
def _parse_header_links(value: str) -> list[dict[str, str]]:
"""
Returns a list of parsed link headers, for more info see:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
The generic syntax of those is:
Link: < uri-reference >; param1=value1; param2="value2"
So for instance:
Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
would return
[
{"url": "http:/.../front.jpeg", "type": "image/jpeg"},
{"url": "http://.../back.jpeg"},
]
:param value: HTTP Link entity-header field
:return: list of parsed link headers
"""
links: list[dict[str, str]] = []
replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
return links
for val in re.split(", *<", value):
try:
url, params = val.split(";", 1)
except ValueError:
url, params = val, ""
link = {"url": url.strip("<> '\"")}
for param in params.split(";"):
try:
key, value = param.split("=")
except ValueError:
break
link[key.strip(replace_chars)] = value.strip(replace_chars)
links.append(link)
return links
def _obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield each `(key, value)` pair from `items`, swapping the value for
    "[secure]" when the lowercased key is listed in `SENSITIVE_HEADERS`.

    The placeholder is built with `to_bytes_or_str(..., match_type_of=value)`.
    """
    for name, original in items:
        if to_str(name.lower()) not in SENSITIVE_HEADERS:
            yield name, original
        else:
            yield name, to_bytes_or_str("[secure]", match_type_of=original)
class Headers(typing.MutableMapping[str, str]): class Headers(typing.MutableMapping[str, str]):
""" """
HTTP headers, as a case-insensitive multi-dict. HTTP headers, as a case-insensitive multi-dict.
@ -306,7 +370,7 @@ class Headers(typing.MutableMapping[str, str]):
if self.encoding != "ascii": if self.encoding != "ascii":
encoding_str = f", encoding={self.encoding!r}" encoding_str = f", encoding={self.encoding!r}"
as_list = list(obfuscate_sensitive_headers(self.multi_items())) as_list = list(_obfuscate_sensitive_headers(self.multi_items()))
as_dict = dict(as_list) as_dict = dict(as_list)
no_duplicate_keys = len(as_dict) == len(as_list) no_duplicate_keys = len(as_dict) == len(as_list)
@ -599,7 +663,7 @@ class Response:
""" """
if not hasattr(self, "_encoding"): if not hasattr(self, "_encoding"):
encoding = self.charset_encoding encoding = self.charset_encoding
if encoding is None or not is_known_encoding(encoding): if encoding is None or not _is_known_encoding(encoding):
if isinstance(self.default_encoding, str): if isinstance(self.default_encoding, str):
encoding = self.default_encoding encoding = self.default_encoding
elif hasattr(self, "_content"): elif hasattr(self, "_content"):
@ -630,7 +694,7 @@ class Response:
if content_type is None: if content_type is None:
return None return None
return parse_content_type_charset(content_type) return _parse_content_type_charset(content_type)
def _get_content_decoder(self) -> ContentDecoder: def _get_content_decoder(self) -> ContentDecoder:
""" """
@ -785,7 +849,7 @@ class Response:
return { return {
(link.get("rel") or link.get("url")): link (link.get("rel") or link.get("url")): link
for link in parse_header_links(header) for link in _parse_header_links(header)
} }
@property @property

View File

@ -1,7 +1,5 @@
from __future__ import annotations from __future__ import annotations
import codecs
import email.message
import ipaddress import ipaddress
import os import os
import re import re
@ -29,74 +27,6 @@ def primitive_value_to_str(value: PrimitiveData) -> str:
return str(value) return str(value)
def is_known_encoding(encoding: str) -> bool:
    """
    Tell whether `encoding` is a codec name that `codecs.lookup()` accepts.

    A `LookupError` from the lookup means the codec is unknown, giving `False`.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        recognised = False
    else:
        recognised = True
    return recognised
def parse_header_links(value: str) -> list[dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
        [
            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
            {"url": "http://.../back.jpeg"},
        ]
    Each parameter is split on its first "=" only, so a value may itself
    contain "=". Parameters without any "=" (including the empty segment
    left by a trailing ";") are skipped without discarding later parameters.
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: list[dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        # Everything before the first ";" is the URL; the rest are parameters.
        url, _, params = val.partition(";")
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            # Use a dedicated name so the `value` argument is not rebound.
            key, sep, param_value = param.partition("=")
            if not sep:
                # Valueless or empty segment: ignore it, keep parsing the rest.
                continue
            link[key.strip(replace_chars)] = param_value.strip(replace_chars)
        links.append(link)
    return links
def parse_content_type_charset(content_type: str) -> str | None:
    """
    Return the `charset` parameter of a Content-Type header value, or `None`
    when the header does not declare one.
    """
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    parsed = email.message.Message()
    parsed["content-type"] = content_type
    declared_charset = parsed.get_content_charset(failobj=None)
    return declared_charset
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
def obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Iterate over `items`, yielding every `(key, value)` pair; values whose
    lowercased key appears in `SENSITIVE_HEADERS` are replaced by "[secure]".

    The replacement comes from `to_bytes_or_str(..., match_type_of=value)`.
    """
    for header_name, header_value in items:
        sensitive = to_str(header_name.lower()) in SENSITIVE_HEADERS
        if sensitive:
            header_value = to_bytes_or_str("[secure]", match_type_of=header_value)
        yield header_name, header_value
def port_or_default(url: URL) -> int | None: def port_or_default(url: URL) -> int | None:
if url.port is not None: if url.port is not None:
return url.port return url.port

View File

@ -8,5 +8,5 @@ export SOURCE_FILES="httpx tests"
set -x set -x
${PREFIX}ruff --fix $SOURCE_FILES ${PREFIX}ruff check --fix $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES ${PREFIX}ruff format $SOURCE_FILES

View File

@ -174,3 +174,46 @@ def test_sensitive_headers(header):
value = "s3kr3t" value = "s3kr3t"
h = httpx.Headers({header: value}) h = httpx.Headers({header: value})
assert repr(h) == "Headers({'%s': '[secure]'})" % header assert repr(h) == "Headers({'%s': '[secure]'})" % header
@pytest.mark.parametrize(
    "headers, output",
    [
        ([("content-type", "text/html")], [("content-type", "text/html")]),
        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
    ],
)
def test_obfuscate_sensitive_headers(headers, output):
    """The `Headers` repr must show `output`, not the raw `headers` values."""
    expected_repr = f"Headers({dict(output)!r})"
    rendered = repr(httpx.Headers(dict(headers)))
    assert rendered == expected_repr
@pytest.mark.parametrize(
    "value, expected",
    (
        (
            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
        ),
        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
        (
            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
            [
                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
                {"url": "http://.../back.jpeg"},
            ],
        ),
        ("", []),
    ),
)
def test_parse_header_links(value, expected):
    """Every expected link dict must appear among `Response.links` values."""
    response = httpx.Response(200, headers={"link": value})
    parsed_links = response.links.values()
    for wanted in expected:
        assert wanted in parsed_links
def test_parse_header_links_no_link():
    """A response created without a `link` header exposes an empty mapping."""
    response = httpx.Response(200)
    assert response.links == {}

View File

@ -53,35 +53,6 @@ def test_guess_by_bom(encoding, expected):
assert response.json() == {"abc": 123} assert response.json() == {"abc": 123}
@pytest.mark.parametrize(
    "value, expected",
    (
        (
            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
        ),
        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
        (
            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
            [
                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
                {"url": "http://.../back.jpeg"},
            ],
        ),
        ("", []),
    ),
)
def test_parse_header_links(value, expected):
    """Check that each expected link dict is present in `Response.links`."""
    found = httpx.Response(200, headers={"link": value}).links.values()
    missing = [link for link in expected if link not in found]
    assert not missing
def test_parse_header_links_no_link():
    """`Response.links` is an empty dict when no `link` header was supplied."""
    assert httpx.Response(200).links == {}
def test_logging_request(server, caplog): def test_logging_request(server, caplog):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
with httpx.Client() as client: with httpx.Client() as client:
@ -144,20 +115,6 @@ def test_get_environment_proxies(environment, proxies):
assert get_environment_proxies() == proxies assert get_environment_proxies() == proxies
@pytest.mark.parametrize(
    "headers, output",
    [
        ([("content-type", "text/html")], [("content-type", "text/html")]),
        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
    ],
)
def test_obfuscate_sensitive_headers(headers, output):
    """Compare the `Headers` repr against the repr built from `output`."""
    shown = dict(output)
    header_obj = httpx.Headers(dict(headers))
    assert repr(header_obj) == f"Headers({shown!r})"
def test_same_origin(): def test_same_origin():
origin = httpx.URL("https://example.com") origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443") request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")