Use system proxies

This commit is contained in:
Tom Christie 2024-11-05 14:55:32 +00:00
parent 6622553979
commit 54b8d24a6e
8 changed files with 42 additions and 896 deletions

View File

@ -17,7 +17,6 @@ from ._config import (
DEFAULT_MAX_REDIRECTS,
DEFAULT_TIMEOUT_CONFIG,
Limits,
Proxy,
Timeout,
)
from ._decoders import SUPPORTED_DECODERS
@ -47,8 +46,6 @@ from ._types import (
)
from ._urls import URL, QueryParams
from ._utils import (
URLPattern,
get_environment_proxies,
is_https_redirect,
same_origin,
)
@ -206,20 +203,6 @@ class BaseClient:
return url
return url.copy_with(raw_path=url.raw_path + b"/")
def _get_proxy_map(
self, proxy: ProxyTypes | None, allow_env_proxies: bool
) -> dict[str, Proxy | None]:
if proxy is None:
if allow_env_proxies:
return {
key: None if url is None else Proxy(url=url)
for key, url in get_environment_proxies().items()
}
return {}
else:
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
return {"all://": proxy}
@property
def timeout(self) -> Timeout:
return self._timeout
@ -618,7 +601,6 @@ class Client(BaseClient):
http1: bool = True,
http2: bool = False,
proxy: ProxyTypes | None = None,
mounts: None | (typing.Mapping[str, BaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
@ -646,106 +628,21 @@ class Client(BaseClient):
default_encoding=default_encoding,
)
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
allow_env_proxies = trust_env and transport is None
proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
self._transport = self._init_transport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
transport=transport,
trust_env=trust_env,
# Deprecated in favor of ssl_context...
verify=verify,
cert=cert,
)
self._mounts: dict[URLPattern, BaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
if transport is not None:
self._transport = transport
else:
self._transport = HTTPTransport(
ssl_context=ssl_context,
proxy=proxy,
http1=http1,
http2=http2,
limits=limits,
# Deprecated in favor of ssl_context...
verify=verify,
cert=cert,
)
for key, proxy in proxy_map.items()
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
)
self._mounts = dict(sorted(self._mounts.items()))
def _init_transport(
self,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: BaseTransport | None = None,
trust_env: bool = True,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> BaseTransport:
if transport is not None:
return transport
return HTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
verify=verify,
cert=cert,
)
def _init_proxy_transport(
self,
proxy: Proxy,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> BaseTransport:
return HTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
proxy=proxy,
verify=verify,
cert=cert,
)
def _transport_for_url(self, url: URL) -> BaseTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._mounts.items():
if pattern.matches(url):
return self._transport if transport is None else transport
@property
def transport(self) -> BaseTransport:
return self._transport
def request(
@ -982,7 +879,6 @@ class Client(BaseClient):
"""
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
start = time.perf_counter()
if not isinstance(request.stream, SyncByteStream):
@ -991,7 +887,7 @@ class Client(BaseClient):
)
with request_context(request=request):
response = transport.handle_request(request)
response = self.transport.handle_request(request)
assert isinstance(response.stream, SyncByteStream)
@ -1242,15 +1138,11 @@ class Client(BaseClient):
def close(self) -> None:
"""
Close transport and proxies.
Close transport.
"""
if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED
self._transport.close()
for transport in self._mounts.values():
if transport is not None:
transport.close()
self.transport.close()
def __enter__(self: T) -> T:
if self._state != ClientState.UNOPENED:
@ -1263,11 +1155,7 @@ class Client(BaseClient):
raise RuntimeError(msg)
self._state = ClientState.OPENED
self._transport.__enter__()
for transport in self._mounts.values():
if transport is not None:
transport.__enter__()
self.transport.__enter__()
return self
def __exit__(
@ -1277,11 +1165,7 @@ class Client(BaseClient):
traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
self._transport.__exit__(exc_type, exc_value, traceback)
for transport in self._mounts.values():
if transport is not None:
transport.__exit__(exc_type, exc_value, traceback)
self.transport.__exit__(exc_type, exc_value, traceback)
class AsyncClient(BaseClient):
@ -1340,7 +1224,6 @@ class AsyncClient(BaseClient):
http1: bool = True,
http2: bool = False,
proxy: ProxyTypes | None = None,
mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
@ -1367,104 +1250,21 @@ class AsyncClient(BaseClient):
trust_env=trust_env,
default_encoding=default_encoding,
)
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
allow_env_proxies = trust_env and transport is None
proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
self._transport = self._init_transport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
transport=transport,
# Deprecated in favor of ssl_context
verify=verify,
cert=cert,
)
self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
proxy,
if transport is not None:
self._transport = transport
else:
self._transport = AsyncHTTPTransport(
ssl_context=ssl_context,
proxy=proxy,
http1=http1,
http2=http2,
limits=limits,
# Deprecated in favor of `ssl_context`...
verify=verify,
cert=cert,
)
for key, proxy in proxy_map.items()
}
if mounts is not None:
self._mounts.update(
{URLPattern(key): transport for key, transport in mounts.items()}
)
self._mounts = dict(sorted(self._mounts.items()))
def _init_transport(
self,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: AsyncBaseTransport | None = None,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> AsyncBaseTransport:
if transport is not None:
return transport
return AsyncHTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
verify=verify,
cert=cert,
)
def _init_proxy_transport(
self,
proxy: Proxy,
ssl_context: ssl.SSLContext | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
# Deprecated in favor of `ssl_context`...
verify: typing.Any = None,
cert: typing.Any = None,
) -> AsyncBaseTransport:
return AsyncHTTPTransport(
ssl_context=ssl_context,
http1=http1,
http2=http2,
limits=limits,
proxy=proxy,
verify=verify,
cert=cert,
)
def _transport_for_url(self, url: URL) -> AsyncBaseTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._mounts.items():
if pattern.matches(url):
return self._transport if transport is None else transport
@property
def transport(self) -> AsyncBaseTransport:
return self._transport
async def request(
@ -1703,7 +1503,6 @@ class AsyncClient(BaseClient):
"""
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
start = time.perf_counter()
if not isinstance(request.stream, AsyncByteStream):
@ -1712,7 +1511,7 @@ class AsyncClient(BaseClient):
)
with request_context(request=request):
response = await transport.handle_async_request(request)
response = await self.transport.handle_async_request(request)
assert isinstance(response.stream, AsyncByteStream)
response.request = request
@ -1962,15 +1761,11 @@ class AsyncClient(BaseClient):
async def aclose(self) -> None:
"""
Close transport and proxies.
Close transport.
"""
if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED
await self._transport.aclose()
for proxy in self._mounts.values():
if proxy is not None:
await proxy.aclose()
await self.transport.aclose()
async def __aenter__(self: U) -> U:
if self._state != ClientState.UNOPENED:
@ -1983,11 +1778,7 @@ class AsyncClient(BaseClient):
raise RuntimeError(msg)
self._state = ClientState.OPENED
await self._transport.__aenter__()
for proxy in self._mounts.values():
if proxy is not None:
await proxy.__aenter__()
await self.transport.__aenter__()
return self
async def __aexit__(
@ -1997,8 +1788,4 @@ class AsyncClient(BaseClient):
traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
await self._transport.__aexit__(exc_type, exc_value, traceback)
for proxy in self._mounts.values():
if proxy is not None:
await proxy.__aexit__(exc_type, exc_value, traceback)
await self.transport.__aexit__(exc_type, exc_value, traceback)

View File

@ -275,15 +275,6 @@ class Proxy:
self.headers = headers
self.ssl_context = ssl_context
@property
def raw_auth(self) -> tuple[bytes, bytes] | None:
# The proxy authentication as raw bytes.
return (
None
if self.auth is None
else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
)
def __repr__(self) -> str:
# The authentication is represented with the password component masked.
auth = (self.auth[0], "********") if self.auth else None

View File

@ -148,6 +148,15 @@ class HTTPTransport(BaseTransport):
verify: typing.Any = None,
cert: typing.Any = None,
) -> None:
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
@ -297,6 +306,15 @@ class AsyncHTTPTransport(AsyncBaseTransport):
verify: typing.Any = None,
cert: typing.Any = None,
) -> None:
if http2:
try:
import h2 # noqa
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
) from None
import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy

View File

@ -2,12 +2,10 @@ from __future__ import annotations
import codecs
import email.message
import ipaddress
import mimetypes
import os
import re
import typing
from urllib.request import getproxies
from ._types import PrimitiveData
@ -178,55 +176,6 @@ def is_https_redirect(url: URL, location: URL) -> bool:
)
def get_environment_proxies() -> dict[str, str | None]:
"""Gets proxy information from the environment"""
# urllib.request.getproxies() falls back on System
# Registry and Config for proxies on Windows and macOS.
# We don't want to propagate non-HTTP proxies into
# our configuration such as 'TRAVIS_APT_PROXY'.
proxy_info = getproxies()
mounts: dict[str, str | None] = {}
for scheme in ("http", "https", "all"):
if proxy_info.get(scheme):
hostname = proxy_info[scheme]
mounts[f"{scheme}://"] = (
hostname if "://" in hostname else f"http://{hostname}"
)
no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
for hostname in no_proxy_hosts:
# See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
# on how names in `NO_PROXY` are handled.
if hostname == "*":
# If NO_PROXY=* is used or if "*" occurs as any one of the comma
# separated hostnames, then we should just bypass any information
# from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
# proxies.
return {}
elif hostname:
# NO_PROXY=.google.com is marked as "all://*.google.com,
# which disables "www.google.com" but not "google.com"
# NO_PROXY=google.com is marked as "all://*google.com,
# which disables "www.google.com" and "google.com".
# (But not "wwwgoogle.com")
# NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
# NO_PROXY=example.com,::1,localhost,192.168.0.0/16
if "://" in hostname:
mounts[hostname] = None
elif is_ipv4_hostname(hostname):
mounts[f"all://{hostname}"] = None
elif is_ipv6_hostname(hostname):
mounts[f"all://[{hostname}]"] = None
elif hostname.lower() == "localhost":
mounts[f"all://{hostname}"] = None
else:
mounts[f"all://*{hostname}"] = None
return mounts
def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
return value.encode(encoding) if isinstance(value, str) else value
@ -272,128 +221,3 @@ def peek_filelike_length(stream: typing.Any) -> int | None:
return None
return length
class URLPattern:
"""
A utility class currently used for making lookups against proxy keys...
# Wildcard matching...
>>> pattern = URLPattern("all://")
>>> pattern.matches(httpx.URL("http://example.com"))
True
# Witch scheme matching...
>>> pattern = URLPattern("https://")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
False
# With domain matching...
>>> pattern = URLPattern("https://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
False
>>> pattern.matches(httpx.URL("https://other.com"))
False
# Wildcard scheme, with domain matching...
>>> pattern = URLPattern("all://example.com")
>>> pattern.matches(httpx.URL("https://example.com"))
True
>>> pattern.matches(httpx.URL("http://example.com"))
True
>>> pattern.matches(httpx.URL("https://other.com"))
False
# With port matching...
>>> pattern = URLPattern("https://example.com:1234")
>>> pattern.matches(httpx.URL("https://example.com:1234"))
True
>>> pattern.matches(httpx.URL("https://example.com"))
False
"""
def __init__(self, pattern: str) -> None:
from ._urls import URL
if pattern and ":" not in pattern:
raise ValueError(
f"Proxy keys should use proper URL forms rather "
f"than plain scheme strings. "
f'Instead of "{pattern}", use "{pattern}://"'
)
url = URL(pattern)
self.pattern = pattern
self.scheme = "" if url.scheme == "all" else url.scheme
self.host = "" if url.host == "*" else url.host
self.port = url.port
if not url.host or url.host == "*":
self.host_regex: typing.Pattern[str] | None = None
elif url.host.startswith("*."):
# *.example.com should match "www.example.com", but not "example.com"
domain = re.escape(url.host[2:])
self.host_regex = re.compile(f"^.+\\.{domain}$")
elif url.host.startswith("*"):
# *example.com should match "www.example.com" and "example.com"
domain = re.escape(url.host[1:])
self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
else:
# example.com should match "example.com" but not "www.example.com"
domain = re.escape(url.host)
self.host_regex = re.compile(f"^{domain}$")
def matches(self, other: URL) -> bool:
if self.scheme and self.scheme != other.scheme:
return False
if (
self.host
and self.host_regex is not None
and not self.host_regex.match(other.host)
):
return False
if self.port is not None and self.port != other.port:
return False
return True
@property
def priority(self) -> tuple[int, int, int]:
"""
The priority allows URLPattern instances to be sortable, so that
we can match from most specific to least specific.
"""
# URLs with a port should take priority over URLs without a port.
port_priority = 0 if self.port is not None else 1
# Longer hostnames should match first.
host_priority = -len(self.host)
# Longer schemes should match first.
scheme_priority = -len(self.scheme)
return (port_priority, host_priority, scheme_priority)
def __hash__(self) -> int:
return hash(self.pattern)
def __lt__(self, other: URLPattern) -> bool:
return self.priority < other.priority
def __eq__(self, other: typing.Any) -> bool:
return isinstance(other, URLPattern) and self.pattern == other.pattern
def is_ipv4_hostname(hostname: str) -> bool:
try:
ipaddress.IPv4Address(hostname.split("/")[0])
except Exception:
return False
return True
def is_ipv6_hostname(hostname: str) -> bool:
try:
ipaddress.IPv6Address(hostname.split("/")[0])
except Exception:
return False
return True

View File

@ -211,47 +211,6 @@ async def test_context_managed_transport():
]
@pytest.mark.anyio
async def test_context_managed_transport_and_mount():
class Transport(httpx.AsyncBaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []
async def aclose(self):
# The base implementation of httpx.AsyncBaseTransport just
# calls into `.aclose`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__aenter__`/`__aexit__`.
self.events.append(f"{self.name}.aclose")
async def __aenter__(self):
await super().__aenter__()
self.events.append(f"{self.name}.__aenter__")
async def __aexit__(self, *args):
await super().__aexit__(*args)
self.events.append(f"{self.name}.__aexit__")
transport = Transport(name="transport")
mounted = Transport(name="mounted")
async with httpx.AsyncClient(
transport=transport, mounts={"http://www.example.org": mounted}
):
pass
assert transport.events == [
"transport.__aenter__",
"transport.aclose",
"transport.__aexit__",
]
assert mounted.events == [
"mounted.__aenter__",
"mounted.aclose",
"mounted.__aexit__",
]
def hello_world(request):
return httpx.Response(200, text="Hello, world!")
@ -288,31 +247,6 @@ async def test_client_closed_state_using_with_block():
await client.get("http://example.com")
def unmounted(request: httpx.Request) -> httpx.Response:
data = {"app": "unmounted"}
return httpx.Response(200, json=data)
def mounted(request: httpx.Request) -> httpx.Response:
data = {"app": "mounted"}
return httpx.Response(200, json=data)
@pytest.mark.anyio
async def test_mounted_transport():
transport = httpx.MockTransport(unmounted)
mounts = {"custom://": httpx.MockTransport(mounted)}
async with httpx.AsyncClient(transport=transport, mounts=mounts) as client:
response = await client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "unmounted"}
response = await client.get("custom://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
@pytest.mark.anyio
async def test_async_mock_transport():
async def hello_world(request: httpx.Request) -> httpx.Response:

View File

@ -260,44 +260,6 @@ def test_context_managed_transport():
]
def test_context_managed_transport_and_mount():
class Transport(httpx.BaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
# calls into `.close`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__enter__`/`__exit__`.
self.events.append(f"{self.name}.close")
def __enter__(self):
super().__enter__()
self.events.append(f"{self.name}.__enter__")
def __exit__(self, *args):
super().__exit__(*args)
self.events.append(f"{self.name}.__exit__")
transport = Transport(name="transport")
mounted = Transport(name="mounted")
with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}):
pass
assert transport.events == [
"transport.__enter__",
"transport.close",
"transport.__exit__",
]
assert mounted.events == [
"mounted.__enter__",
"mounted.close",
"mounted.__exit__",
]
def hello_world(request):
return httpx.Response(200, text="Hello, world!")
@ -364,41 +326,6 @@ def test_raw_client_header():
]
def unmounted(request: httpx.Request) -> httpx.Response:
data = {"app": "unmounted"}
return httpx.Response(200, json=data)
def mounted(request: httpx.Request) -> httpx.Response:
data = {"app": "mounted"}
return httpx.Response(200, json=data)
def test_mounted_transport():
transport = httpx.MockTransport(unmounted)
mounts = {"custom://": httpx.MockTransport(mounted)}
client = httpx.Client(transport=transport, mounts=mounts)
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "unmounted"}
response = client.get("custom://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
def test_all_mounted_transport():
mounts = {"all://": httpx.MockTransport(mounted)}
client = httpx.Client(mounts=mounts)
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
def test_server_extensions(server):
url = server.url.copy_with(path="/http_version_2")
with httpx.Client(http2=True) as client:

View File

@ -1,265 +0,0 @@
import httpcore
import pytest
import httpx
def url_to_origin(url: str) -> httpcore.URL:
"""
Given a URL string, return the origin in the raw tuple format that
`httpcore` uses for it's representation.
"""
u = httpx.URL(url)
return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/")
def test_socks_proxy():
url = httpx.URL("http://www.example.com")
for proxy in ("socks5://localhost/", "socks5h://localhost/"):
client = httpx.Client(proxy=proxy)
transport = client._transport_for_url(url)
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.SOCKSProxy)
async_client = httpx.AsyncClient(proxy=proxy)
async_transport = async_client._transport_for_url(url)
assert isinstance(async_transport, httpx.AsyncHTTPTransport)
assert isinstance(async_transport._pool, httpcore.AsyncSOCKSProxy)
PROXY_URL = "http://[::1]"
@pytest.mark.parametrize(
["url", "proxies", "expected"],
[
("http://example.com", {}, None),
("http://example.com", {"https://": PROXY_URL}, None),
("http://example.com", {"http://example.net": PROXY_URL}, None),
# Using "*" should match any domain name.
("http://example.com", {"http://*": PROXY_URL}, PROXY_URL),
("https://example.com", {"http://*": PROXY_URL}, None),
# Using "example.com" should match example.com, but not www.example.com
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://example.com": PROXY_URL}, None),
# Using "*.example.com" should match www.example.com, but not example.com
("http://example.com", {"http://*.example.com": PROXY_URL}, None),
("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL),
# Using "*example.com" should match example.com and www.example.com
("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None),
# ...
("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
"http://example.com": PROXY_URL + ":4",
},
PROXY_URL + ":4",
),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
},
PROXY_URL + ":3",
),
(
"http://example.com",
{"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"},
PROXY_URL + ":2",
),
],
)
def test_transport_for_request(url, proxies, expected):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
client = httpx.Client(mounts=mounts)
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport is client._transport
else:
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.HTTPProxy)
assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.anyio
@pytest.mark.network
async def test_async_proxy_close():
try:
transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
client = httpx.AsyncClient(mounts={"https://": transport})
await client.get("http://example.com")
finally:
await client.aclose()
@pytest.mark.network
def test_sync_proxy_close():
try:
transport = httpx.HTTPTransport(proxy=PROXY_URL)
client = httpx.Client(mounts={"https://": transport})
client.get("http://example.com")
finally:
client.close()
def test_unsupported_proxy_scheme():
with pytest.raises(ValueError):
httpx.Client(proxy="ftp://127.0.0.1")
@pytest.mark.parametrize(
["url", "env", "expected"],
[
("http://google.com", {}, None),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com"},
"http://example.com",
),
# Auto prepend http scheme
("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"},
None,
),
# Everything proxied when NO_PROXY is empty/unset
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"},
None,
),
# Proxied if NO_PROXY scheme does not match URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"},
"http://localhost:123",
),
# Proxied if NO_PROXY scheme does not match host.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches host domain suffix.
(
"http://courses.mit.edu",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
None,
),
# Proxied even though NO_PROXY matches host domain *prefix*.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
"http://localhost:123",
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"},
None,
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
# May include whitespace.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"},
None,
),
# Proxied if no items in NO_PROXY match.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"},
"http://localhost:123",
),
# Proxied if NO_PROXY domain doesn't match.
(
"https://foo.example.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"},
"http://localhost:123",
),
# Not proxied for subdomains matching NO_PROXY, with a leading ".".
(
"https://www.example1.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"},
None,
),
# Proxied, because NO_PROXY subdomains only match if "." separated.
(
"https://www.example2.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"},
"http://localhost:123",
),
# No requests are proxied if NO_PROXY="*" is set.
(
"https://www.example3.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"},
None,
),
],
)
@pytest.mark.parametrize("client_class", [httpx.Client, httpx.AsyncClient])
def test_proxies_environ(monkeypatch, client_class, url, env, expected):
for name, value in env.items():
monkeypatch.setenv(name, value)
client = client_class()
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport == client._transport
else:
assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.parametrize(
["proxies", "is_valid"],
[
({"http": "http://127.0.0.1"}, False),
({"https": "http://127.0.0.1"}, False),
({"all": "http://127.0.0.1"}, False),
({"http://": "http://127.0.0.1"}, True),
({"https://": "http://127.0.0.1"}, True),
({"all://": "http://127.0.0.1"}, True),
],
)
def test_for_deprecated_proxy_params(proxies, is_valid):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
if not is_valid:
with pytest.raises(ValueError):
httpx.Client(mounts=mounts)
else:
httpx.Client(mounts=mounts)
def test_proxy_with_mounts():
proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1")
client = httpx.Client(mounts={"http://": proxy_transport})
transport = client._transport_for_url(httpx.URL("http://example.com"))
assert transport == proxy_transport

View File

@ -1,15 +1,9 @@
import json
import logging
import os
import random
import pytest
import httpx
from httpx._utils import (
URLPattern,
get_environment_proxies,
)
@pytest.mark.parametrize(
@ -118,32 +112,6 @@ def test_logging_redirect_chain(server, caplog):
]
@pytest.mark.parametrize(
["environment", "proxies"],
[
({}, {}),
({"HTTP_PROXY": "http://127.0.0.1"}, {"http://": "http://127.0.0.1"}),
(
{"https_proxy": "http://127.0.0.1", "HTTP_PROXY": "https://127.0.0.1"},
{"https://": "http://127.0.0.1", "http://": "https://127.0.0.1"},
),
({"all_proxy": "http://127.0.0.1"}, {"all://": "http://127.0.0.1"}),
({"TRAVIS_APT_PROXY": "http://127.0.0.1"}, {}),
({"no_proxy": "127.0.0.1"}, {"all://127.0.0.1": None}),
({"no_proxy": "192.168.0.0/16"}, {"all://192.168.0.0/16": None}),
({"no_proxy": "::1"}, {"all://[::1]": None}),
({"no_proxy": "localhost"}, {"all://localhost": None}),
({"no_proxy": "github.com"}, {"all://*github.com": None}),
({"no_proxy": ".github.com"}, {"all://*.github.com": None}),
({"no_proxy": "http://github.com"}, {"http://github.com": None}),
],
)
def test_get_environment_proxies(environment, proxies):
os.environ.update(environment)
assert get_environment_proxies() == proxies
@pytest.mark.parametrize(
"headers, output",
[
@ -212,41 +180,3 @@ def test_is_not_https_redirect_if_not_default_ports():
headers = client._redirect_headers(request, url, "GET")
assert "Authorization" not in headers
@pytest.mark.parametrize(
["pattern", "url", "expected"],
[
("http://example.com", "http://example.com", True),
("http://example.com", "https://example.com", False),
("http://example.com", "http://other.com", False),
("http://example.com:123", "http://example.com:123", True),
("http://example.com:123", "http://example.com:456", False),
("http://example.com:123", "http://example.com", False),
("all://example.com", "http://example.com", True),
("all://example.com", "https://example.com", True),
("http://", "http://example.com", True),
("http://", "https://example.com", False),
("all://", "https://example.com:123", True),
("", "https://example.com:123", True),
],
)
def test_url_matches(pattern, url, expected):
pattern = URLPattern(pattern)
assert pattern.matches(httpx.URL(url)) == expected
def test_pattern_priority():
matchers = [
URLPattern("all://"),
URLPattern("http://"),
URLPattern("http://example.com"),
URLPattern("http://example.com:123"),
]
random.shuffle(matchers)
assert sorted(matchers) == [
URLPattern("http://example.com:123"),
URLPattern("http://example.com"),
URLPattern("http://"),
URLPattern("all://"),
]