Enable ruff UP (pyupgrade) rule and fix all 56 violations

Enable the ruff `UP` rule set for automatic Python modernization checks.

Changes across 17 files:
- UP006: Replace deprecated typing.Tuple/Dict/List with builtin equivalents
- UP012: Remove unnecessary .encode('utf-8') calls
- UP015: Remove redundant open() modes
- UP028: Replace yield-over-for with yield from
- UP031/UP032: Modernize string formatting to f-strings/format specs
- UP035: Import from collections.abc instead of typing

Also fixes import ordering (I001) caused by the import changes.
This commit is contained in:
Zephyr-Blessed 2026-02-18 18:52:05 +00:00 committed by Zephyr-Blessed
parent ae1b9f6623
commit d5e5079558
17 changed files with 56 additions and 67 deletions

View File

@ -150,8 +150,7 @@ class BoundSyncStream(SyncByteStream):
self._start = start
def __iter__(self) -> typing.Iterator[bytes]:
for chunk in self._stream:
yield chunk
yield from self._stream
def close(self) -> None:
elapsed = time.perf_counter() - self._start

View File

@ -2,14 +2,10 @@ from __future__ import annotations
import inspect
import warnings
from collections.abc import AsyncIterable, AsyncIterator, Iterable, Iterator, Mapping
from json import dumps as json_dumps
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Iterable,
Iterator,
Mapping,
)
from urllib.parse import urlencode
@ -60,8 +56,7 @@ class IteratorByteStream(SyncByteStream):
chunk = self._stream.read(self.CHUNK_SIZE)
else:
# Otherwise iterate.
for part in self._stream:
yield part
yield from self._stream
class AsyncIteratorByteStream(AsyncByteStream):

View File

@ -186,9 +186,9 @@ def print_response(response: Response) -> None:
console.print(f"<{len(response.content)} bytes of binary data>")
_PCTRTT = typing.Tuple[typing.Tuple[str, str], ...]
_PCTRTTT = typing.Tuple[_PCTRTT, ...]
_PeerCertRetDictType = typing.Dict[str, typing.Union[str, _PCTRTTT, _PCTRTT]]
_PCTRTT = tuple[tuple[str, str], ...]
_PCTRTTT = tuple[_PCTRTT, ...]
_PeerCertRetDictType = dict[str, typing.Union[str, _PCTRTTT, _PCTRTT]]
def format_certificate(cert: _PeerCertRetDictType) -> str: # pragma: no cover

View File

@ -23,7 +23,7 @@ from ._utils import (
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
{chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
{chr(c): f"%{c:02X}" for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
@ -236,9 +236,9 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
boundary = os.urandom(16).hex().encode("ascii")
self.boundary = boundary
self.content_type = "multipart/form-data; boundary=%s" % boundary.decode(
self.content_type = "multipart/form-data; boundary={}".format(boundary.decode(
"ascii"
)
))
self.fields = list(self._iter_fields(data, files))
def _iter_fields(
@ -292,8 +292,7 @@ class MultipartStream(SyncByteStream, AsyncByteStream):
return {"Content-Length": str(content_length), "Content-Type": content_type}
def __iter__(self) -> typing.Iterator[bytes]:
for chunk in self.iter_chunks():
yield chunk
yield from self.iter_chunks()
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
for chunk in self.iter_chunks():

View File

@ -61,9 +61,9 @@ T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")
SOCKET_OPTION = typing.Union[
typing.Tuple[int, int, int],
typing.Tuple[int, int, typing.Union[bytes, bytearray]],
typing.Tuple[int, int, None, int],
tuple[int, int, int],
tuple[int, int, typing.Union[bytes, bytearray]],
tuple[int, int, None, int],
]
__all__ = ["AsyncHTTPTransport", "HTTPTransport"]
@ -124,8 +124,7 @@ class ResponseStream(SyncByteStream):
def __iter__(self) -> typing.Iterator[bytes]:
with map_httpcore_exceptions():
for part in self._httpcore_stream:
yield part
yield from self._httpcore_stream
def close(self) -> None:
if hasattr(self._httpcore_stream, "close"):

View File

@ -33,8 +33,7 @@ class WSGIByteStream(SyncByteStream):
self._result = _skip_leading_empty_chunks(result)
def __iter__(self) -> typing.Iterator[bytes]:
for part in self._result:
yield part
yield from self._result
def close(self) -> None:
if self._close is not None:

View File

@ -2,22 +2,21 @@
Type definitions for type checking purposes.
"""
from collections.abc import (
AsyncIterable,
AsyncIterator,
Iterable,
Iterator,
Mapping,
Sequence,
)
from http.cookiejar import CookieJar
from typing import (
IO,
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)
@ -35,8 +34,8 @@ URLTypes = Union["URL", str]
QueryParamTypes = Union[
"QueryParams",
Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]],
List[Tuple[str, PrimitiveData]],
Tuple[Tuple[str, PrimitiveData], ...],
list[tuple[str, PrimitiveData]],
tuple[tuple[str, PrimitiveData], ...],
str,
bytes,
]
@ -45,22 +44,22 @@ HeaderTypes = Union[
"Headers",
Mapping[str, str],
Mapping[bytes, bytes],
Sequence[Tuple[str, str]],
Sequence[Tuple[bytes, bytes]],
Sequence[tuple[str, str]],
Sequence[tuple[bytes, bytes]],
]
CookieTypes = Union["Cookies", CookieJar, Dict[str, str], List[Tuple[str, str]]]
CookieTypes = Union["Cookies", CookieJar, dict[str, str], list[tuple[str, str]]]
TimeoutTypes = Union[
Optional[float],
Tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
"Timeout",
]
ProxyTypes = Union["URL", str, "Proxy"]
CertTypes = Union[str, Tuple[str, str], Tuple[str, str, str]]
CertTypes = Union[str, tuple[str, str], tuple[str, str, str]]
AuthTypes = Union[
Tuple[Union[str, bytes], Union[str, bytes]],
tuple[Union[str, bytes], Union[str, bytes]],
Callable[["Request"], "Request"],
"Auth",
]
@ -76,13 +75,13 @@ FileTypes = Union[
# file (or bytes)
FileContent,
# (filename, file (or bytes))
Tuple[Optional[str], FileContent],
tuple[Optional[str], FileContent],
# (filename, file (or bytes), content_type)
Tuple[Optional[str], FileContent, Optional[str]],
tuple[Optional[str], FileContent, Optional[str]],
# (filename, file (or bytes), content_type, headers)
Tuple[Optional[str], FileContent, Optional[str], Mapping[str, str]],
tuple[Optional[str], FileContent, Optional[str], Mapping[str, str]],
]
RequestFiles = Union[Mapping[str, FileTypes], Sequence[Tuple[str, FileTypes]]]
RequestFiles = Union[Mapping[str, FileTypes], Sequence[tuple[str, FileTypes]]]
RequestExtensions = Mapping[str, Any]

View File

@ -96,7 +96,7 @@ pattern = 'src="(docs/img/.*?)"'
replacement = 'src="https://raw.githubusercontent.com/encode/httpx/master/\1"'
[tool.ruff.lint]
select = ["E", "F", "I", "B", "PIE"]
select = ["E", "F", "I", "B", "PIE", "UP"]
ignore = ["B904", "B028"]
[tool.ruff.lint.isort]

View File

@ -73,7 +73,7 @@ class DigestApp:
"stale": "FALSE",
}
challenge_str = ", ".join(
'{}="{}"'.format(key, value)
f'{key}="{value}"'
for key, value in challenge_data.items()
if value
)
@ -437,7 +437,7 @@ async def test_digest_auth(
assert response.status_code == 200
assert len(response.history) == 1
authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
authorization = typing.cast(dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"
@ -468,7 +468,7 @@ async def test_digest_auth_no_specified_qop() -> None:
assert response.status_code == 200
assert len(response.history) == 1
authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
authorization = typing.cast(dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"

View File

@ -37,7 +37,7 @@ def redirects(request: httpx.Request) -> httpx.Response:
elif request.url.path == "/invalid_redirect":
status_code = httpx.codes.SEE_OTHER
raw_headers = [(b"location", "https://😇/".encode("utf-8"))]
raw_headers = [(b"location", "https://😇/".encode())]
return httpx.Response(status_code, headers=raw_headers)
elif request.url.path == "/no_scheme_redirect":

View File

@ -48,12 +48,12 @@ def clean_environ():
os.environ.update(original_environ)
Message = typing.Dict[str, typing.Any]
Message = dict[str, typing.Any]
Receive = typing.Callable[[], typing.Awaitable[Message]]
Send = typing.Callable[
[typing.Dict[str, typing.Any]], typing.Coroutine[None, None, None]
[dict[str, typing.Any]], typing.Coroutine[None, None, None]
]
Scope = typing.Dict[str, typing.Any]
Scope = dict[str, typing.Any]
async def app(scope: Scope, receive: Receive, send: Send) -> None:

View File

@ -99,7 +99,7 @@ def test_headers_encoding_in_repr():
"""
Headers should display an encoding in the repr if required.
"""
headers = httpx.Headers({b"custom": "example ☃".encode("utf-8")})
headers = httpx.Headers({b"custom": "example ☃".encode()})
assert repr(headers) == "Headers({'custom': 'example ☃'}, encoding='utf-8')"
@ -127,7 +127,7 @@ def test_headers_decode_utf_8():
"""
Headers containing non-ascii codepoints should default to decoding as utf-8.
"""
raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
raw_headers = [(b"Custom", "Code point: ☃".encode())]
headers = httpx.Headers(raw_headers)
assert dict(headers) == {"custom": "Code point: ☃"}
assert headers.encoding == "utf-8"
@ -148,7 +148,7 @@ def test_headers_decode_explicit_encoding():
An explicit encoding may be set on headers in order to force a
particular decoding.
"""
raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
raw_headers = [(b"Custom", "Code point: ☃".encode())]
headers = httpx.Headers(raw_headers)
headers.encoding = "iso-8859-1"
assert dict(headers) == {"custom": "Code point: â\x98\x83"}
@ -173,7 +173,7 @@ def test_sensitive_headers(header):
"""
value = "s3kr3t"
h = httpx.Headers({header: value})
assert repr(h) == "Headers({'%s': '[secure]'})" % header
assert repr(h) == f"Headers({{'{header}': '[secure]'}})"
@pytest.mark.parametrize(

View File

@ -173,7 +173,7 @@ def test_response_default_to_utf8_encoding():
"""
Default to utf-8 encoding if there is no Content-Type header.
"""
content = "おはようございます。".encode("utf-8")
content = "おはようございます。".encode()
response = httpx.Response(
200,
content=content,
@ -187,7 +187,7 @@ def test_response_fallback_to_utf8_encoding():
Fallback to utf-8 if we get an invalid charset in the Content-Type header.
"""
headers = {"Content-Type": "text-plain; charset=invalid-codec-name"}
content = "おはようございます。".encode("utf-8")
content = "おはようございます。".encode()
response = httpx.Response(
200,
content=content,
@ -219,7 +219,7 @@ def test_response_no_charset_with_utf8_content():
A response with UTF-8 encoded content should decode correctly,
even with no charset specified.
"""
content = "Unicode Snowman: ☃".encode("utf-8")
content = "Unicode Snowman: ☃".encode()
headers = {"Content-Type": "text/plain"}
response = httpx.Response(
200,
@ -289,7 +289,7 @@ def test_response_set_explicit_encoding():
def test_response_force_encoding():
response = httpx.Response(
200,
content="Snowman: ☃".encode("utf-8"),
content="Snowman: ☃".encode(),
)
response.encoding = "iso-8859-1"
assert response.status_code == 200

View File

@ -10,7 +10,7 @@ from httpx._urlparse import urlparse
# URL test cases from...
# https://github.com/web-platform-tests/wpt/blob/master/url/resources/urltestdata.json
with open("tests/models/whatwg.json", "r", encoding="utf-8") as input:
with open("tests/models/whatwg.json", encoding="utf-8") as input:
test_cases = json.load(input)
test_cases = [
item

View File

@ -192,7 +192,7 @@ def test_digest_auth_rfc_7616_md5(monkeypatch):
# Example from https://datatracker.ietf.org/doc/html/rfc7616#section-3.9.1
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()
return b"f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"
auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)
@ -252,7 +252,7 @@ def test_digest_auth_rfc_7616_sha_256(monkeypatch):
# Example from https://datatracker.ietf.org/doc/html/rfc7616#section-3.9.1
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()
return b"f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"
auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)

View File

@ -174,7 +174,7 @@ def test_download(server):
with runner.isolated_filesystem():
runner.invoke(httpx.main, [url, "--download", "index.txt"])
assert os.path.exists("index.txt")
with open("index.txt", "r") as input_file:
with open("index.txt") as input_file:
assert input_file.read() == "Hello, world!"

View File

@ -24,8 +24,7 @@ def application_factory(output: typing.Iterable[bytes]) -> WSGIApplication:
start_response(status, response_headers)
for item in output:
yield item
yield from output
return wsgiref.validate.validator(application)