Use pathlib.Path instead of os.path functions (#188)
This commit is contained in:
parent
de91fdfa16
commit
fde30b8d14
@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import ssl
|
||||
import typing
|
||||
from pathlib import Path
|
||||
|
||||
import certifi
|
||||
|
||||
@ -94,8 +94,8 @@ class SSLConfig:
|
||||
"""
|
||||
if isinstance(self.verify, bool):
|
||||
ca_bundle_path = DEFAULT_CA_BUNDLE_PATH
|
||||
elif os.path.exists(self.verify):
|
||||
ca_bundle_path = self.verify
|
||||
elif Path(self.verify).exists():
|
||||
ca_bundle_path = Path(self.verify)
|
||||
else:
|
||||
raise IOError(
|
||||
"Could not find a suitable TLS CA certificate bundle, "
|
||||
@ -120,10 +120,10 @@ class SSLConfig:
|
||||
except AttributeError: # pragma: nocover
|
||||
pass
|
||||
|
||||
if os.path.isfile(ca_bundle_path):
|
||||
context.load_verify_locations(cafile=ca_bundle_path)
|
||||
elif os.path.isdir(ca_bundle_path):
|
||||
context.load_verify_locations(capath=ca_bundle_path)
|
||||
if ca_bundle_path.is_file():
|
||||
context.load_verify_locations(cafile=str(ca_bundle_path))
|
||||
elif ca_bundle_path.is_dir():
|
||||
context.load_verify_locations(capath=str(ca_bundle_path))
|
||||
|
||||
if self.cert is not None:
|
||||
if isinstance(self.cert, str):
|
||||
@ -248,5 +248,5 @@ class PoolLimits:
|
||||
DEFAULT_SSL_CONFIG = SSLConfig(cert=None, verify=True)
|
||||
DEFAULT_TIMEOUT_CONFIG = TimeoutConfig(timeout=5.0)
|
||||
DEFAULT_POOL_LIMITS = PoolLimits(soft_limit=10, hard_limit=100, pool_timeout=5.0)
|
||||
DEFAULT_CA_BUNDLE_PATH = certifi.where()
|
||||
DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
|
||||
DEFAULT_MAX_REDIRECTS = 20
|
||||
|
||||
@ -4,6 +4,7 @@ import os
|
||||
import re
|
||||
import typing
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
|
||||
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
|
||||
@ -47,7 +48,7 @@ class FileField(Field):
|
||||
) -> None:
|
||||
self.name = name
|
||||
if not isinstance(value, tuple):
|
||||
self.filename = os.path.basename(getattr(value, "name", "upload"))
|
||||
self.filename = Path(getattr(value, "name", "upload")).name
|
||||
self.file = value # type: typing.Union[typing.IO[str], typing.IO[bytes]]
|
||||
self.content_type = self.guess_content_type()
|
||||
else:
|
||||
@ -113,7 +114,7 @@ def multipart_encode(data: dict, files: dict) -> typing.Tuple[bytes, str]:
|
||||
def _format_param(name: str, value: typing.Union[str, bytes]) -> bytes:
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode()
|
||||
|
||||
|
||||
def replacer(match: typing.Match[str]) -> str:
|
||||
return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ import codecs
|
||||
import netrc
|
||||
import os
|
||||
import typing
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def normalize_header_key(value: typing.AnyStr, encoding: str = None) -> bytes:
|
||||
@ -83,20 +84,17 @@ def guess_json_utf(data: bytes) -> typing.Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
NETRC_STATIC_FILES = tuple(
|
||||
os.path.expanduser(path) for path in ("~/.netrc", "~/_netrc")
|
||||
)
|
||||
NETRC_STATIC_FILES = (Path("~/.netrc"), Path("~/_netrc"))
|
||||
|
||||
|
||||
def get_netrc_login(host: str) -> typing.Optional[typing.Tuple[str, str, str]]:
|
||||
NETRC_FILES = (
|
||||
(os.environ["NETRC"],) if "NETRC" in os.environ else NETRC_STATIC_FILES
|
||||
)
|
||||
NETRC_FILES = (Path(os.getenv("NETRC", "")),) + NETRC_STATIC_FILES
|
||||
netrc_path = None
|
||||
|
||||
for file_path in NETRC_FILES:
|
||||
if os.path.isfile(file_path):
|
||||
netrc_path = file_path
|
||||
expanded_path = file_path.expanduser()
|
||||
if expanded_path.is_file():
|
||||
netrc_path = expanded_path
|
||||
break
|
||||
|
||||
if netrc_path is None:
|
||||
|
||||
12
setup.py
12
setup.py
@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from setuptools import setup
|
||||
|
||||
@ -11,8 +11,8 @@ def get_version(package):
|
||||
"""
|
||||
Return package version as listed in `__version__` in `init.py`.
|
||||
"""
|
||||
with open(os.path.join(package, "__version__.py")) as f:
|
||||
return re.search("__version__ = ['\"]([^'\"]+)['\"]", f.read()).group(1)
|
||||
version = Path(package, "__version__.py").read_text()
|
||||
return re.search("__version__ = ['\"]([^'\"]+)['\"]", version).group(1)
|
||||
|
||||
|
||||
def get_long_description():
|
||||
@ -27,11 +27,7 @@ def get_packages(package):
|
||||
"""
|
||||
Return root package and all sub-packages.
|
||||
"""
|
||||
return [
|
||||
dirpath
|
||||
for dirpath, dirnames, filenames in os.walk(package)
|
||||
if os.path.exists(os.path.join(dirpath, "__init__.py"))
|
||||
]
|
||||
return [str(path.parent) for path in Path(package).glob("**/__init__.py")]
|
||||
|
||||
|
||||
setup(
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import os
|
||||
import ssl
|
||||
|
||||
import pytest
|
||||
@ -31,7 +30,7 @@ async def test_load_ssl_config_verify_existing_file():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_ssl_config_verify_directory():
|
||||
path = os.path.dirname(httpx.config.DEFAULT_CA_BUNDLE_PATH)
|
||||
path = httpx.config.DEFAULT_CA_BUNDLE_PATH.parent
|
||||
ssl_config = httpx.SSLConfig(verify=path)
|
||||
context = await ssl_config.load_ssl_context()
|
||||
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
|
||||
|
||||
Loading…
Reference in New Issue
Block a user