Prep for introducing SyncClient (#713)

* Reorganize method ordering in client

* Move 'request'

* Use 'httpx.Proxy' for proxy configuration
This commit is contained in:
Tom Christie 2020-01-07 10:27:01 +00:00 committed by GitHub
parent e1afbfa7b4
commit f17ab37b2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 339 additions and 234 deletions

View File

@ -3,7 +3,7 @@ from .api import delete, get, head, options, patch, post, put, request, stream
from .auth import BasicAuth, DigestAuth
from .client import AsyncClient, Client
from .config import TimeoutConfig # For 0.8 backwards compat.
from .config import PoolLimits, Timeout
from .config import PoolLimits, Proxy, Timeout
from .dispatch.proxy_http import HTTPProxy, HTTPProxyMode
from .exceptions import (
ConnectionClosed,
@ -49,6 +49,7 @@ __all__ = [
"Client",
"DigestAuth",
"PoolLimits",
"Proxy",
"Timeout",
"TimeoutConfig", # For 0.8 backwards compat.
"HTTPProxy",

View File

@ -14,6 +14,8 @@ from .config import (
UNSET,
CertTypes,
PoolLimits,
ProxiesTypes,
Proxy,
Timeout,
TimeoutTypes,
UnsetType,
@ -37,7 +39,6 @@ from .models import (
CookieTypes,
Headers,
HeaderTypes,
ProxiesTypes,
QueryParams,
QueryParamTypes,
Request,
@ -126,20 +127,6 @@ class AsyncClient:
trust_env: bool = True,
uds: str = None,
):
if app is not None:
dispatch = ASGIDispatch(app=app)
if dispatch is None:
dispatch = ConnectionPool(
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
uds=uds,
)
if base_url is None:
self.base_url = URL("", allow_relative=True)
else:
@ -148,6 +135,9 @@ class AsyncClient:
if params is None:
params = {}
if proxies is None and trust_env:
proxies = typing.cast(ProxiesTypes, get_environment_proxies())
self.auth = auth
self._params = QueryParams(params)
self._headers = Headers(headers)
@ -155,13 +145,24 @@ class AsyncClient:
self.timeout = Timeout(timeout)
self.max_redirects = max_redirects
self.trust_env = trust_env
self.dispatch = dispatch
self.netrc = NetRCInfo()
self.dispatch = self.init_dispatch(
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
dispatch=dispatch,
app=app,
backend=backend,
trust_env=trust_env,
uds=uds,
)
if proxies is None and trust_env:
proxies = typing.cast(ProxiesTypes, get_environment_proxies())
self.proxies: typing.Dict[str, AsyncDispatcher] = _proxies_to_dispatchers(
self.proxies: typing.Dict[str, AsyncDispatcher] = self.proxies_to_dispatchers(
proxies,
verify=verify,
cert=cert,
@ -171,6 +172,114 @@ class AsyncClient:
trust_env=trust_env,
)
def init_dispatch(
self,
verify: VerifyTypes = True,
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: AsyncDispatcher = None,
app: typing.Callable = None,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
uds: str = None,
) -> AsyncDispatcher:
if dispatch is not None:
return dispatch
if app is not None:
return ASGIDispatch(app=app)
return ConnectionPool(
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
uds=uds,
)
def init_proxy_dispatch(
self,
proxy: Proxy,
verify: VerifyTypes = True,
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
) -> AsyncDispatcher:
return HTTPProxy(
proxy_url=proxy.url,
proxy_headers=proxy.headers,
proxy_mode=proxy.mode,
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
)
def proxies_to_dispatchers(
self,
proxies: typing.Optional[ProxiesTypes],
verify: VerifyTypes,
cert: typing.Optional[CertTypes],
http2: bool,
pool_limits: PoolLimits,
backend: typing.Union[str, ConcurrencyBackend],
trust_env: bool,
) -> typing.Dict[str, AsyncDispatcher]:
if proxies is None:
return {}
elif isinstance(proxies, (str, URL, Proxy)):
proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
return {
"all": self.init_proxy_dispatch(
proxy=proxy,
verify=verify,
cert=cert,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
http2=http2,
)
}
elif isinstance(proxies, AsyncDispatcher): # pragma: nocover
return {"all": proxies}
# We're supporting this style for now, but we'll want to deprecate it.
#
# raise RuntimeError(
# "Passing a AsyncDispatcher instance to 'proxies=' is no longer
# supported. Use `httpx.Proxy() instead.`"
# )
else:
new_proxies = {}
for key, value in proxies.items():
if isinstance(value, (str, URL, Proxy)):
proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
new_proxies[str(key)] = self.init_proxy_dispatch(
proxy=proxy,
verify=verify,
cert=cert,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
http2=http2,
)
elif isinstance(value, AsyncDispatcher): # pragma: nocover
new_proxies[str(key)] = value
# We're supporting this style for now, but we'll want to
# deprecate it.
#
# raise RuntimeError(
# "Passing a AsyncDispatcher instance to 'proxies=' is "
# "no longer supported. Use `httpx.Proxy() instead.`"
# )
return new_proxies
@property
def headers(self) -> Headers:
"""
@ -204,71 +313,6 @@ class AsyncClient:
def params(self, params: QueryParamTypes) -> None:
self._params = QueryParams(params)
async def request(
self,
method: str,
url: URLTypes,
*,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
if cert is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'cert' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'cert' arguments to the client itself."
)
if verify is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'verify' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'verify' arguments to the client itself."
)
if trust_env is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'trust_env' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'trust_env' argument to the client itself."
)
if stream: # pragma: nocover
warnings.warn(
"The 'stream=True' argument is due to be deprecated. "
"Use 'async with client.stream(method, url, ...) as response' instead."
)
request = self.build_request(
method=method,
url=url,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
)
response = await self.send(
request,
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
timeout=timeout,
)
return response
def stream(
self,
method: str,
@ -382,35 +426,7 @@ class AsyncClient:
return merged_queryparams
return params
async def send(
self,
request: Request,
*,
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
if request.url.scheme not in ("http", "https"):
raise InvalidURL('URL scheme must be "http" or "https".')
timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
auth = self.setup_auth(request, auth)
response = await self.send_handling_redirects(
request, auth=auth, timeout=timeout, allow_redirects=allow_redirects,
)
if not stream:
try:
await response.aread()
finally:
await response.aclose()
return response
def setup_auth(self, request: Request, auth: AuthTypes = None) -> Auth:
def build_auth(self, request: Request, auth: AuthTypes = None) -> Auth:
auth = self.auth if auth is None else auth
if auth is not None:
@ -433,46 +449,6 @@ class AsyncClient:
return Auth()
async def send_handling_redirects(
self,
request: Request,
auth: Auth,
timeout: Timeout,
allow_redirects: bool = True,
history: typing.List[Response] = None,
) -> Response:
if history is None:
history = []
while True:
if len(history) > self.max_redirects:
raise TooManyRedirects()
if request.url in (response.url for response in history):
raise RedirectLoop()
response = await self.send_handling_auth(
request, history, auth=auth, timeout=timeout,
)
response.history = list(history)
if not response.is_redirect:
return response
await response.aread()
request = self.build_redirect_request(request, response)
history = history + [response]
if not allow_redirects:
response.call_next = functools.partial(
self.send_handling_redirects,
request=request,
auth=auth,
timeout=timeout,
allow_redirects=False,
history=history,
)
return response
def build_redirect_request(self, request: Request, response: Response) -> Request:
"""
Given a request and a redirect response, return a new request that
@ -570,6 +546,164 @@ class AsyncClient:
return request.stream
def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
"""
Returns the AsyncDispatcher instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies:
is_default_port = (url.scheme == "http" and url.port == 80) or (
url.scheme == "https" and url.port == 443
)
hostname = f"{url.host}:{url.port}"
proxy_keys = (
f"{url.scheme}://{hostname}",
f"{url.scheme}://{url.host}" if is_default_port else None,
f"all://{hostname}",
f"all://{url.host}" if is_default_port else None,
url.scheme,
"all",
)
for proxy_key in proxy_keys:
if proxy_key and proxy_key in self.proxies:
dispatcher = self.proxies[proxy_key]
return dispatcher
return self.dispatch
async def request(
self,
method: str,
url: URLTypes,
*,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
if cert is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'cert' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'cert' arguments to the client itself."
)
if verify is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'verify' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'verify' arguments to the client itself."
)
if trust_env is not None: # pragma: nocover
raise RuntimeError(
"Passing a 'trust_env' argument when making a request on a client "
"is not supported anymore. Instantiate a new client instead, "
"passing any 'trust_env' argument to the client itself."
)
if stream: # pragma: nocover
warnings.warn(
"The 'stream=True' argument is due to be deprecated. "
"Use 'async with client.stream(method, url, ...) as response' instead."
)
request = self.build_request(
method=method,
url=url,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
)
response = await self.send(
request,
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
timeout=timeout,
)
return response
async def send(
self,
request: Request,
*,
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
if request.url.scheme not in ("http", "https"):
raise InvalidURL('URL scheme must be "http" or "https".')
timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
auth = self.build_auth(request, auth)
response = await self.send_handling_redirects(
request, auth=auth, timeout=timeout, allow_redirects=allow_redirects,
)
if not stream:
try:
await response.aread()
finally:
await response.aclose()
return response
async def send_handling_redirects(
self,
request: Request,
auth: Auth,
timeout: Timeout,
allow_redirects: bool = True,
history: typing.List[Response] = None,
) -> Response:
if history is None:
history = []
while True:
if len(history) > self.max_redirects:
raise TooManyRedirects()
if request.url in (response.url for response in history):
raise RedirectLoop()
response = await self.send_handling_auth(
request, auth=auth, timeout=timeout, history=history
)
response.history = list(history)
if not response.is_redirect:
return response
await response.aread()
request = self.build_redirect_request(request, response)
history = history + [response]
if not allow_redirects:
response.call_next = functools.partial(
self.send_handling_redirects,
request=request,
auth=auth,
timeout=timeout,
allow_redirects=False,
history=history,
)
return response
async def send_handling_auth(
self,
request: Request,
@ -621,30 +755,6 @@ class AsyncClient:
return response
def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
"""
Returns the AsyncDispatcher instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies:
is_default_port = (url.scheme == "http" and url.port == 80) or (
url.scheme == "https" and url.port == 443
)
hostname = f"{url.host}:{url.port}"
proxy_keys = (
f"{url.scheme}://{hostname}",
f"{url.scheme}://{url.host}" if is_default_port else None,
f"all://{hostname}",
f"all://{url.host}" if is_default_port else None,
url.scheme,
"all",
)
for proxy_key in proxy_keys:
if proxy_key and proxy_key in self.proxies:
return self.proxies[proxy_key]
return self.dispatch
async def get(
self,
url: URLTypes,
@ -888,46 +998,6 @@ class AsyncClient:
await self.aclose()
def _proxies_to_dispatchers(
proxies: typing.Optional[ProxiesTypes],
verify: VerifyTypes,
cert: typing.Optional[CertTypes],
http2: bool,
pool_limits: PoolLimits,
backend: typing.Union[str, ConcurrencyBackend],
trust_env: bool,
) -> typing.Dict[str, AsyncDispatcher]:
def _proxy_from_url(url: URLTypes) -> AsyncDispatcher:
nonlocal verify, cert, http2, pool_limits, backend, trust_env
url = URL(url)
if url.scheme in ("http", "https"):
return HTTPProxy(
url,
verify=verify,
cert=cert,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
http2=http2,
)
raise ValueError(f"Unknown proxy for {url!r}")
if proxies is None:
return {}
elif isinstance(proxies, (str, URL)):
return {"all": _proxy_from_url(proxies)}
elif isinstance(proxies, AsyncDispatcher):
return {"all": proxies}
else:
new_proxies = {}
for key, dispatcher_or_url in proxies.items():
if isinstance(dispatcher_or_url, (str, URL)):
new_proxies[str(key)] = _proxy_from_url(dispatcher_or_url)
else:
new_proxies[str(key)] = dispatcher_or_url
return new_proxies
class StreamContextManager:
def __init__(
self,

View File

@ -5,7 +5,7 @@ from pathlib import Path
import certifi
from .__version__ import __version__
from .models import URL, Headers, HeaderTypes, URLTypes
from .utils import get_ca_bundle_from_env, get_logger
CertTypes = typing.Union[str, typing.Tuple[str, str], typing.Tuple[str, str, str]]
@ -13,10 +13,11 @@ VerifyTypes = typing.Union[str, bool, ssl.SSLContext]
TimeoutTypes = typing.Union[
None, float, typing.Tuple[float, float, float, float], "Timeout"
]
ProxiesTypes = typing.Union[
URLTypes, "Proxy", typing.Dict[URLTypes, typing.Union[URLTypes, "Proxy"]]
]
USER_AGENT = f"python-httpx/{__version__}"
DEFAULT_CIPHERS = ":".join(
[
"ECDHE+AESGCM",
@ -305,6 +306,30 @@ class PoolLimits:
)
class Proxy:
def __init__(
self, url: URLTypes, *, headers: HeaderTypes = None, mode: str = "DEFAULT",
):
url = URL(url)
headers = Headers(headers)
if url.scheme not in ("http", "https"):
raise ValueError(f"Unknown scheme for proxy URL {url!r}")
if mode not in ("DEFAULT", "CONNECT_ONLY", "TUNNEL_ONLY"):
raise ValueError(f"Unknown proxy mode {mode!r}")
self.url = url
self.headers = headers
self.mode = mode
def __repr__(self) -> str:
return (
f"Proxy(url={str(self.url)!r}, "
f"headers={dict(self.headers)!r}, "
f"mode={self.mode!r})"
)
TimeoutConfig = Timeout # Synonym for backwards compat

View File

@ -282,8 +282,3 @@ SUPPORTED_DECODERS = {
if brotli is None:
SUPPORTED_DECODERS.pop("br") # pragma: nocover
ACCEPT_ENCODING = ", ".join(
[key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
)

View File

@ -12,7 +12,7 @@ from urllib.parse import parse_qsl, urlencode
import chardet
import rfc3986
from .config import USER_AGENT
from .__version__ import __version__
from .content_streams import (
ByteStream,
ContentStream,
@ -21,7 +21,6 @@ from .content_streams import (
encode,
)
from .decoders import (
ACCEPT_ENCODING,
SUPPORTED_DECODERS,
Decoder,
IdentityDecoder,
@ -73,12 +72,6 @@ HeaderTypes = typing.Union[
CookieTypes = typing.Union["Cookies", CookieJar, typing.Dict[str, str]]
ProxiesTypes = typing.Union[
URLTypes,
"AsyncDispatcher",
typing.Dict[URLTypes, typing.Union[URLTypes, "AsyncDispatcher"]],
]
class URL:
def __init__(
@ -589,6 +582,12 @@ class Headers(typing.MutableMapping[str, str]):
return f"{class_name}({as_list!r}{encoding_str})"
USER_AGENT = f"python-httpx/{__version__}"
ACCEPT_ENCODING = ", ".join(
[key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
)
class Request:
def __init__(
self,

View File

@ -12,9 +12,9 @@ import httpx
{"http": "http://127.0.0.1", "https": "https://127.0.0.1"},
[("http", "http://127.0.0.1"), ("https", "https://127.0.0.1")],
),
(httpx.HTTPProxy("http://127.0.0.1"), [("all", "http://127.0.0.1")]),
(httpx.Proxy("http://127.0.0.1"), [("all", "http://127.0.0.1")]),
(
{"https": httpx.HTTPProxy("https://127.0.0.1"), "all": "http://127.0.0.1"},
{"https": httpx.Proxy("https://127.0.0.1"), "all": "http://127.0.0.1"},
[("all", "http://127.0.0.1"), ("https", "https://127.0.0.1")],
),
],

View File

@ -206,3 +206,18 @@ def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch): # pragma: noc
ssl_config = SSLConfig(trust_env=False)
assert ssl_config.ssl_context.keylog_filename is None
def test_proxy_from_url():
proxy = httpx.Proxy("https://example.com")
assert repr(proxy) == "Proxy(url='https://example.com', headers={}, mode='DEFAULT')"
def test_invalid_proxy_scheme():
with pytest.raises(ValueError):
httpx.Proxy("invalid://example.com")
def test_invalid_proxy_mode():
with pytest.raises(ValueError):
httpx.Proxy("https://example.com", mode="INVALID")