Compare commits

...

1 Commits

Author SHA1 Message Date
Tom Christie
54b8d24a6e Use system proxies 2024-11-05 14:55:32 +00:00
8 changed files with 42 additions and 896 deletions

View File

@ -17,7 +17,6 @@ from ._config import (
DEFAULT_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS,
DEFAULT_TIMEOUT_CONFIG, DEFAULT_TIMEOUT_CONFIG,
Limits, Limits,
Proxy,
Timeout, Timeout,
) )
from ._decoders import SUPPORTED_DECODERS from ._decoders import SUPPORTED_DECODERS
@ -47,8 +46,6 @@ from ._types import (
) )
from ._urls import URL, QueryParams from ._urls import URL, QueryParams
from ._utils import ( from ._utils import (
URLPattern,
get_environment_proxies,
is_https_redirect, is_https_redirect,
same_origin, same_origin,
) )
@ -206,20 +203,6 @@ class BaseClient:
return url return url
return url.copy_with(raw_path=url.raw_path + b"/") return url.copy_with(raw_path=url.raw_path + b"/")
def _get_proxy_map(
self, proxy: ProxyTypes | None, allow_env_proxies: bool
) -> dict[str, Proxy | None]:
if proxy is None:
if allow_env_proxies:
return {
key: None if url is None else Proxy(url=url)
for key, url in get_environment_proxies().items()
}
return {}
else:
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
return {"all://": proxy}
@property @property
def timeout(self) -> Timeout: def timeout(self) -> Timeout:
return self._timeout return self._timeout
@ -618,7 +601,6 @@ class Client(BaseClient):
http1: bool = True, http1: bool = True,
http2: bool = False, http2: bool = False,
proxy: ProxyTypes | None = None, proxy: ProxyTypes | None = None,
mounts: None | (typing.Mapping[str, BaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False, follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS, limits: Limits = DEFAULT_LIMITS,
@ -646,106 +628,21 @@ class Client(BaseClient):
default_encoding=default_encoding, default_encoding=default_encoding,
) )
if http2: if transport is not None:
try: self._transport = transport
import h2 # noqa else:
except ImportError: # pragma: no cover self._transport = HTTPTransport(
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
allow_env_proxies = trust_env and transport is None
proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
self._transport = self._init_transport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
transport=transport,
trust_env=trust_env,
# Deprecated in favor of ssl_context...
verify=verify,
cert=cert,
)
self._mounts: dict[URLPattern, BaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
ssl_context=ssl_context, ssl_context=ssl_context,
proxy=proxy,
http1=http1, http1=http1,
http2=http2, http2=http2,
limits=limits, limits=limits,
# Deprecated in favor of ssl_context...
verify=verify, verify=verify,
cert=cert, cert=cert,
) )
for key, proxy in proxy_map.items()
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
)
self._mounts = dict(sorted(self._mounts.items()))
def _init_transport(
self,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: BaseTransport | None = None,
trust_env: bool = True,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> BaseTransport:
if transport is not None:
return transport
return HTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
verify=verify,
cert=cert,
)
def _init_proxy_transport(
self,
proxy: Proxy,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> BaseTransport:
return HTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
proxy=proxy,
verify=verify,
cert=cert,
)
def _transport_for_url(self, url: URL) -> BaseTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._mounts.items():
if pattern.matches(url):
return self._transport if transport is None else transport
@property
def transport(self) -> BaseTransport:
return self._transport return self._transport
def request( def request(
@ -982,7 +879,6 @@ class Client(BaseClient):
""" """
Sends a single request, without handling any redirections. Sends a single request, without handling any redirections.
""" """
transport = self._transport_for_url(request.url)
start = time.perf_counter() start = time.perf_counter()
if not isinstance(request.stream, SyncByteStream): if not isinstance(request.stream, SyncByteStream):
@ -991,7 +887,7 @@ class Client(BaseClient):
) )
with request_context(request=request): with request_context(request=request):
response = transport.handle_request(request) response = self.transport.handle_request(request)
assert isinstance(response.stream, SyncByteStream) assert isinstance(response.stream, SyncByteStream)
@ -1242,15 +1138,11 @@ class Client(BaseClient):
def close(self) -> None: def close(self) -> None:
""" """
Close transport and proxies. Close transport.
""" """
if self._state != ClientState.CLOSED: if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED self._state = ClientState.CLOSED
self.transport.close()
self._transport.close()
for transport in self._mounts.values():
if transport is not None:
transport.close()
def __enter__(self: T) -> T: def __enter__(self: T) -> T:
if self._state != ClientState.UNOPENED: if self._state != ClientState.UNOPENED:
@ -1263,11 +1155,7 @@ class Client(BaseClient):
raise RuntimeError(msg) raise RuntimeError(msg)
self._state = ClientState.OPENED self._state = ClientState.OPENED
self.transport.__enter__()
self._transport.__enter__()
for transport in self._mounts.values():
if transport is not None:
transport.__enter__()
return self return self
def __exit__( def __exit__(
@ -1277,11 +1165,7 @@ class Client(BaseClient):
traceback: TracebackType | None = None, traceback: TracebackType | None = None,
) -> None: ) -> None:
self._state = ClientState.CLOSED self._state = ClientState.CLOSED
self.transport.__exit__(exc_type, exc_value, traceback)
self._transport.__exit__(exc_type, exc_value, traceback)
for transport in self._mounts.values():
if transport is not None:
transport.__exit__(exc_type, exc_value, traceback)
class AsyncClient(BaseClient): class AsyncClient(BaseClient):
@ -1340,7 +1224,6 @@ class AsyncClient(BaseClient):
http1: bool = True, http1: bool = True,
http2: bool = False, http2: bool = False,
proxy: ProxyTypes | None = None, proxy: ProxyTypes | None = None,
mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False, follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS, limits: Limits = DEFAULT_LIMITS,
@ -1367,104 +1250,21 @@ class AsyncClient(BaseClient):
trust_env=trust_env, trust_env=trust_env,
default_encoding=default_encoding, default_encoding=default_encoding,
) )
if transport is not None:
if http2: self._transport = transport
try: else:
import h2 # noqa self._transport = AsyncHTTPTransport(
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
allow_env_proxies = trust_env and transport is None
proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
self._transport = self._init_transport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
transport=transport,
# Deprecated in favor of ssl_context
verify=verify,
cert=cert,
)
self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
ssl_context=ssl_context, ssl_context=ssl_context,
proxy=proxy,
http1=http1, http1=http1,
http2=http2, http2=http2,
limits=limits, limits=limits,
# Deprecated in favor of `ssl_context`...
verify=verify, verify=verify,
cert=cert, cert=cert,
) )
for key, proxy in proxy_map.items()
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
)
self._mounts = dict(sorted(self._mounts.items()))
def _init_transport(
self,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: AsyncBaseTransport | None = None,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> AsyncBaseTransport:
if transport is not None:
return transport
return AsyncHTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
verify=verify,
cert=cert,
)
def _init_proxy_transport(
self,
proxy: Proxy,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> AsyncBaseTransport:
return AsyncHTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
proxy=proxy,
verify=verify,
cert=cert,
)
def _transport_for_url(self, url: URL) -> AsyncBaseTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._mounts.items():
if pattern.matches(url):
return self._transport if transport is None else transport
@property
def transport(self) -> AsyncBaseTransport:
return self._transport return self._transport
async def request( async def request(
@ -1703,7 +1503,6 @@ class AsyncClient(BaseClient):
""" """
Sends a single request, without handling any redirections. Sends a single request, without handling any redirections.
""" """
transport = self._transport_for_url(request.url)
start = time.perf_counter() start = time.perf_counter()
if not isinstance(request.stream, AsyncByteStream): if not isinstance(request.stream, AsyncByteStream):
@ -1712,7 +1511,7 @@ class AsyncClient(BaseClient):
) )
with request_context(request=request): with request_context(request=request):
response = await transport.handle_async_request(request) response = await self.transport.handle_async_request(request)
assert isinstance(response.stream, AsyncByteStream) assert isinstance(response.stream, AsyncByteStream)
response.request = request response.request = request
@ -1962,15 +1761,11 @@ class AsyncClient(BaseClient):
async def aclose(self) -> None: async def aclose(self) -> None:
""" """
Close transport and proxies. Close transport.
""" """
if self._state != ClientState.CLOSED: if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED self._state = ClientState.CLOSED
await self.transport.aclose()
await self._transport.aclose()
for proxy in self._mounts.values():
if proxy is not None:
await proxy.aclose()
async def __aenter__(self: U) -> U: async def __aenter__(self: U) -> U:
if self._state != ClientState.UNOPENED: if self._state != ClientState.UNOPENED:
@ -1983,11 +1778,7 @@ class AsyncClient(BaseClient):
raise RuntimeError(msg) raise RuntimeError(msg)
self._state = ClientState.OPENED self._state = ClientState.OPENED
await self.transport.__aenter__()
await self._transport.__aenter__()
for proxy in self._mounts.values():
if proxy is not None:
await proxy.__aenter__()
return self return self
async def __aexit__( async def __aexit__(
@ -1997,8 +1788,4 @@ class AsyncClient(BaseClient):
traceback: TracebackType | None = None, traceback: TracebackType | None = None,
) -> None: ) -> None:
self._state = ClientState.CLOSED self._state = ClientState.CLOSED
await self.transport.__aexit__(exc_type, exc_value, traceback)
await self._transport.__aexit__(exc_type, exc_value, traceback)
for proxy in self._mounts.values():
if proxy is not None:
await proxy.__aexit__(exc_type, exc_value, traceback)

View File

@ -275,15 +275,6 @@ class Proxy:
self.headers = headers self.headers = headers
self.ssl_context = ssl_context self.ssl_context = ssl_context
@property
def raw_auth(self) -> tuple[bytes, bytes] | None:
# The proxy authentication as raw bytes.
return (
None
if self.auth is None
else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
)
def __repr__(self) -> str: def __repr__(self) -> str:
# The authentication is represented with the password component masked. # The authentication is represented with the password component masked.
auth = (self.auth[0], "********") if self.auth else None auth = (self.auth[0], "********") if self.auth else None

View File

@ -148,6 +148,15 @@ class HTTPTransport(BaseTransport):
verify: typing.Any = None, verify: typing.Any = None,
cert: typing.Any = None, cert: typing.Any = None,
) -> None: ) -> None:
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
import httpcore import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
@ -297,6 +306,15 @@ class AsyncHTTPTransport(AsyncBaseTransport):
verify: typing.Any = None, verify: typing.Any = None,
cert: typing.Any = None, cert: typing.Any = None,
) -> None: ) -> None:
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
import httpcore import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy

View File

@ -2,12 +2,10 @@ from __future__ import annotations
import codecs import codecs
import email.message import email.message
import ipaddress
import mimetypes import mimetypes
import os import os
import re import re
import typing import typing
from urllib.request import getproxies
from ._types import PrimitiveData from ._types import PrimitiveData
@ -178,55 +176,6 @@ def is_https_redirect(url: URL, location: URL) -> bool:
) )
def get_environment_proxies() -> dict[str, str | None]:
"""Gets proxy information from the environment"""
# urllib.request.getproxies() falls back on System
# Registry and Config for proxies on Windows and macOS.
# We don't want to propagate non-HTTP proxies into
# our configuration such as 'TRAVIS_APT_PROXY'.
proxy_info = getproxies()
mounts: dict[str, str | None] = {}
for scheme in ("http", "https", "all"):
if proxy_info.get(scheme):
hostname = proxy_info[scheme]
mounts[f"{scheme}://"] = (
hostname if "://" in hostname else f"http://{hostname}"
)
no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
for hostname in no_proxy_hosts:
# See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
# on how names in `NO_PROXY` are handled.
if hostname == "*":
# If NO_PROXY=* is used or if "*" occurs as any one of the comma
# separated hostnames, then we should just bypass any information
# from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
# proxies.
return {}
elif hostname:
# NO_PROXY=.google.com is marked as "all://*.google.com,
# which disables "www.google.com" but not "google.com"
# NO_PROXY=google.com is marked as "all://*google.com,
# which disables "www.google.com" and "google.com".
# (But not "wwwgoogle.com")
# NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
# NO_PROXY=example.com,::1,localhost,192.168.0.0/16
if "://" in hostname:
mounts[hostname] = None
elif is_ipv4_hostname(hostname):
mounts[f"all://{hostname}"] = None
elif is_ipv6_hostname(hostname):
mounts[f"all://[{hostname}]"] = None
elif hostname.lower() == "localhost":
mounts[f"all://{hostname}"] = None
else:
mounts[f"all://*{hostname}"] = None
return mounts
def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes: def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
return value.encode(encoding) if isinstance(value, str) else value return value.encode(encoding) if isinstance(value, str) else value
@ -272,128 +221,3 @@ def peek_filelike_length(stream: typing.Any) -> int | None:
return None return None
return length return length
class URLPattern:
"""
A utility class currently used for making lookups against proxy keys...
# Wildcard matching...
>>> pattern = URLPattern("all://")
>>> pattern.matches(httpx.URL("http://example.com"))
True
# Witch scheme matching...
>>> pattern = URLPattern("https://")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
False
# With domain matching...
>>> pattern = URLPattern("https://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
False
>>> pattern.matches(httpx.URL("https://other.com"))
False
# Wildcard scheme, with domain matching...
>>> pattern = URLPattern("all://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
True
>>> pattern.matches(httpx.URL("https://other.com"))
False
# With port matching...
>>> pattern = URLPattern("https://example.com:1234")
>>> pattern.matches(httpx.URL("https://example.com:1234"))
True
>>> pattern.matches(httpx.URL("https://example.com"))
False
"""
def __init__(self, pattern: str) -> None:
from ._urls import URL
if pattern and ":" not in pattern:
raise ValueError(
f"Proxy keys should use proper URL forms rather "
f"than plain scheme strings. "
f'Instead of "{pattern}", use "{pattern}://"'
)
url = URL(pattern)
self.pattern = pattern
self.scheme = "" if url.scheme == "all" else url.scheme
self.host = "" if url.host == "*" else url.host
self.port = url.port
if not url.host or url.host == "*":
self.host_regex: typing.Pattern[str] | None = None
elif url.host.startswith("*."):
# *.example.com should match "www.example.com", but not "example.com"
domain = re.escape(url.host[2:])
self.host_regex = re.compile(f"^.+\\.{domain}$")
elif url.host.startswith("*"):
# *example.com should match "www.example.com" and "example.com"
domain = re.escape(url.host[1:])
self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
else:
# example.com should match "example.com" but not "www.example.com"
domain = re.escape(url.host)
self.host_regex = re.compile(f"^{domain}$")
def matches(self, other: URL) -> bool:
if self.scheme and self.scheme != other.scheme:
return False
if (
self.host
and self.host_regex is not None
and not self.host_regex.match(other.host)
):
return False
if self.port is not None and self.port != other.port:
return False
return True
@property
def priority(self) -> tuple[int, int, int]:
"""
The priority allows URLPattern instances to be sortable, so that
we can match from most specific to least specific.
"""
# URLs with a port should take priority over URLs without a port.
port_priority = 0 if self.port is not None else 1
# Longer hostnames should match first.
host_priority = -len(self.host)
# Longer schemes should match first.
scheme_priority = -len(self.scheme)
return (port_priority, host_priority, scheme_priority)
def __hash__(self) -> int:
return hash(self.pattern)
def __lt__(self, other: URLPattern) -> bool:
return self.priority < other.priority
def __eq__(self, other: typing.Any) -> bool:
return isinstance(other, URLPattern) and self.pattern == other.pattern
def is_ipv4_hostname(hostname: str) -> bool:
try:
ipaddress.IPv4Address(hostname.split("/")[0])
except Exception:
return False
return True
def is_ipv6_hostname(hostname: str) -> bool:
try:
ipaddress.IPv6Address(hostname.split("/")[0])
except Exception:
return False
return True

View File

@ -211,47 +211,6 @@ async def test_context_managed_transport():
] ]
@pytest.mark.anyio
async def test_context_managed_transport_and_mount():
class Transport(httpx.AsyncBaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []
async def aclose(self):
# The base implementation of httpx.AsyncBaseTransport just
# calls into `.aclose`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__aenter__`/`__aexit__`.
self.events.append(f"{self.name}.aclose")
async def __aenter__(self):
await super().__aenter__()
self.events.append(f"{self.name}.__aenter__")
async def __aexit__(self, *args):
await super().__aexit__(*args)
self.events.append(f"{self.name}.__aexit__")
transport = Transport(name="transport")
mounted = Transport(name="mounted")
async with httpx.AsyncClient(
transport=transport, mounts={"http://www.example.org": mounted}
):
pass
assert transport.events == [
"transport.__aenter__",
"transport.aclose",
"transport.__aexit__",
]
assert mounted.events == [
"mounted.__aenter__",
"mounted.aclose",
"mounted.__aexit__",
]
def hello_world(request): def hello_world(request):
return httpx.Response(200, text="Hello, world!") return httpx.Response(200, text="Hello, world!")
@ -288,31 +247,6 @@ async def test_client_closed_state_using_with_block():
await client.get("http://example.com") await client.get("http://example.com")
def unmounted(request: httpx.Request) -> httpx.Response:
data = {"app": "unmounted"}
return httpx.Response(200, json=data)
def mounted(request: httpx.Request) -> httpx.Response:
data = {"app": "mounted"}
return httpx.Response(200, json=data)
@pytest.mark.anyio
async def test_mounted_transport():
transport = httpx.MockTransport(unmounted)
mounts = {"custom://": httpx.MockTransport(mounted)}
async with httpx.AsyncClient(transport=transport, mounts=mounts) as client:
response = await client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "unmounted"}
response = await client.get("custom://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
@pytest.mark.anyio @pytest.mark.anyio
async def test_async_mock_transport(): async def test_async_mock_transport():
async def hello_world(request: httpx.Request) -> httpx.Response: async def hello_world(request: httpx.Request) -> httpx.Response:

View File

@ -260,44 +260,6 @@ def test_context_managed_transport():
] ]
def test_context_managed_transport_and_mount():
class Transport(httpx.BaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
# calls into `.close`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__enter__`/`__exit__`.
self.events.append(f"{self.name}.close")
def __enter__(self):
super().__enter__()
self.events.append(f"{self.name}.__enter__")
def __exit__(self, *args):
super().__exit__(*args)
self.events.append(f"{self.name}.__exit__")
transport = Transport(name="transport")
mounted = Transport(name="mounted")
with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}):
pass
assert transport.events == [
"transport.__enter__",
"transport.close",
"transport.__exit__",
]
assert mounted.events == [
"mounted.__enter__",
"mounted.close",
"mounted.__exit__",
]
def hello_world(request): def hello_world(request):
return httpx.Response(200, text="Hello, world!") return httpx.Response(200, text="Hello, world!")
@ -364,41 +326,6 @@ def test_raw_client_header():
] ]
def unmounted(request: httpx.Request) -> httpx.Response:
data = {"app": "unmounted"}
return httpx.Response(200, json=data)
def mounted(request: httpx.Request) -> httpx.Response:
data = {"app": "mounted"}
return httpx.Response(200, json=data)
def test_mounted_transport():
transport = httpx.MockTransport(unmounted)
mounts = {"custom://": httpx.MockTransport(mounted)}
client = httpx.Client(transport=transport, mounts=mounts)
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "unmounted"}
response = client.get("custom://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
def test_all_mounted_transport():
mounts = {"all://": httpx.MockTransport(mounted)}
client = httpx.Client(mounts=mounts)
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
def test_server_extensions(server): def test_server_extensions(server):
url = server.url.copy_with(path="/http_version_2") url = server.url.copy_with(path="/http_version_2")
with httpx.Client(http2=True) as client: with httpx.Client(http2=True) as client:

View File

@ -1,265 +0,0 @@
import httpcore
import pytest
import httpx
def url_to_origin(url: str) -> httpcore.URL:
"""
Given a URL string, return the origin in the raw tuple format that
`httpcore` uses for it's representation.
"""
u = httpx.URL(url)
return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/")
def test_socks_proxy():
url = httpx.URL("http://www.example.com")
for proxy in ("socks5://localhost/", "socks5h://localhost/"):
client = httpx.Client(proxy=proxy)
transport = client._transport_for_url(url)
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.SOCKSProxy)
async_client = httpx.AsyncClient(proxy=proxy)
async_transport = async_client._transport_for_url(url)
assert isinstance(async_transport, httpx.AsyncHTTPTransport)
assert isinstance(async_transport._pool, httpcore.AsyncSOCKSProxy)
PROXY_URL = "http://[::1]"
@pytest.mark.parametrize(
["url", "proxies", "expected"],
[
("http://example.com", {}, None),
("http://example.com", {"https://": PROXY_URL}, None),
("http://example.com", {"http://example.net": PROXY_URL}, None),
# Using "*" should match any domain name.
("http://example.com", {"http://*": PROXY_URL}, PROXY_URL),
("https://example.com", {"http://*": PROXY_URL}, None),
# Using "example.com" should match example.com, but not www.example.com
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://example.com": PROXY_URL}, None),
# Using "*.example.com" should match www.example.com, but not example.com
("http://example.com", {"http://*.example.com": PROXY_URL}, None),
("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL),
# Using "*example.com" should match example.com and www.example.com
("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None),
# ...
("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
"http://example.com": PROXY_URL + ":4",
},
PROXY_URL + ":4",
),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
},
PROXY_URL + ":3",
),
(
"http://example.com",
{"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"},
PROXY_URL + ":2",
),
],
)
def test_transport_for_request(url, proxies, expected):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
client = httpx.Client(mounts=mounts)
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport is client._transport
else:
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.HTTPProxy)
assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.anyio
@pytest.mark.network
async def test_async_proxy_close():
try:
transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
client = httpx.AsyncClient(mounts={"https://": transport})
await client.get("http://example.com")
finally:
await client.aclose()
@pytest.mark.network
def test_sync_proxy_close():
try:
transport = httpx.HTTPTransport(proxy=PROXY_URL)
client = httpx.Client(mounts={"https://": transport})
client.get("http://example.com")
finally:
client.close()
def test_unsupported_proxy_scheme():
with pytest.raises(ValueError):
httpx.Client(proxy="ftp://127.0.0.1")
@pytest.mark.parametrize(
["url", "env", "expected"],
[
("http://google.com", {}, None),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com"},
"http://example.com",
),
# Auto prepend http scheme
("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"},
None,
),
# Everything proxied when NO_PROXY is empty/unset
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"},
None,
),
# Proxied if NO_PROXY scheme does not match URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"},
"http://localhost:123",
),
# Proxied if NO_PROXY scheme does not match host.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches host domain suffix.
(
"http://courses.mit.edu",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
None,
),
# Proxied even though NO_PROXY matches host domain *prefix*.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
"http://localhost:123",
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"},
None,
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
# May include whitespace.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"},
None,
),
# Proxied if no items in NO_PROXY match.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"},
"http://localhost:123",
),
# Proxied if NO_PROXY domain doesn't match.
(
"https://foo.example.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"},
"http://localhost:123",
),
# Not proxied for subdomains matching NO_PROXY, with a leading ".".
(
"https://www.example1.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"},
None,
),
# Proxied, because NO_PROXY subdomains only match if "." separated.
(
"https://www.example2.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"},
"http://localhost:123",
),
# No requests are proxied if NO_PROXY="*" is set.
(
"https://www.example3.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"},
None,
),
],
)
@pytest.mark.parametrize("client_class", [httpx.Client, httpx.AsyncClient])
def test_proxies_environ(monkeypatch, client_class, url, env, expected):
for name, value in env.items():
monkeypatch.setenv(name, value)
client = client_class()
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport == client._transport
else:
assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.parametrize(
["proxies", "is_valid"],
[
({"http": "http://127.0.0.1"}, False),
({"https": "http://127.0.0.1"}, False),
({"all": "http://127.0.0.1"}, False),
({"http://": "http://127.0.0.1"}, True),
({"https://": "http://127.0.0.1"}, True),
({"all://": "http://127.0.0.1"}, True),
],
)
def test_for_deprecated_proxy_params(proxies, is_valid):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
if not is_valid:
with pytest.raises(ValueError):
httpx.Client(mounts=mounts)
else:
httpx.Client(mounts=mounts)
def test_proxy_with_mounts():
proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1")
client = httpx.Client(mounts={"http://": proxy_transport})
transport = client._transport_for_url(httpx.URL("http://example.com"))
assert transport == proxy_transport

View File

@ -1,15 +1,9 @@
import json import json
import logging import logging
import os
import random
import pytest import pytest
import httpx import httpx
from httpx._utils import (
URLPattern,
get_environment_proxies,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -118,32 +112,6 @@ def test_logging_redirect_chain(server, caplog):
] ]
@pytest.mark.parametrize(
["environment", "proxies"],
[
({}, {}),
({"HTTP_PROXY": "http://127.0.0.1"}, {"http://": "http://127.0.0.1"}),
(
{"https_proxy": "http://127.0.0.1", "HTTP_PROXY": "https://127.0.0.1"},
{"https://": "http://127.0.0.1", "http://": "https://127.0.0.1"},
),
({"all_proxy": "http://127.0.0.1"}, {"all://": "http://127.0.0.1"}),
({"TRAVIS_APT_PROXY": "http://127.0.0.1"}, {}),
({"no_proxy": "127.0.0.1"}, {"all://127.0.0.1": None}),
({"no_proxy": "192.168.0.0/16"}, {"all://192.168.0.0/16": None}),
({"no_proxy": "::1"}, {"all://[::1]": None}),
({"no_proxy": "localhost"}, {"all://localhost": None}),
({"no_proxy": "github.com"}, {"all://*github.com": None}),
({"no_proxy": ".github.com"}, {"all://*.github.com": None}),
({"no_proxy": "http://github.com"}, {"http://github.com": None}),
],
)
def test_get_environment_proxies(environment, proxies):
os.environ.update(environment)
assert get_environment_proxies() == proxies
@pytest.mark.parametrize( @pytest.mark.parametrize(
"headers, output", "headers, output",
[ [
@ -212,41 +180,3 @@ def test_is_not_https_redirect_if_not_default_ports():
headers = client._redirect_headers(request, url, "GET") headers = client._redirect_headers(request, url, "GET")
assert "Authorization" not in headers assert "Authorization" not in headers
@pytest.mark.parametrize(
["pattern", "url", "expected"],
[
("http://example.com", "http://example.com", True),
("http://example.com", "https://example.com", False),
("http://example.com", "http://other.com", False),
("http://example.com:123", "http://example.com:123", True),
("http://example.com:123", "http://example.com:456", False),
("http://example.com:123", "http://example.com", False),
("all://example.com", "http://example.com", True),
("all://example.com", "https://example.com", True),
("http://", "http://example.com", True),
("http://", "https://example.com", False),
("all://", "https://example.com:123", True),
("", "https://example.com:123", True),
],
)
def test_url_matches(pattern, url, expected):
pattern = URLPattern(pattern)
assert pattern.matches(httpx.URL(url)) == expected
def test_pattern_priority():
matchers = [
URLPattern("all://"),
URLPattern("http://"),
URLPattern("http://example.com"),
URLPattern("http://example.com:123"),
]
random.shuffle(matchers)
assert sorted(matchers) == [
URLPattern("http://example.com:123"),
URLPattern("http://example.com"),
URLPattern("http://"),
URLPattern("all://"),
]