Use unasync for tests

This commit is contained in:
Kar Petrosyan 2025-02-27 20:38:08 +04:00
parent 6baba9c1ce
commit a2fc6825a5
23 changed files with 2454 additions and 21 deletions

View File

@ -12,3 +12,4 @@ set -x
${PREFIX}ruff format $SOURCE_FILES --diff
${PREFIX}mypy $SOURCE_FILES
${PREFIX}ruff check $SOURCE_FILES
${PREFIX}python scripts/unasync.py --check

View File

@ -10,3 +10,4 @@ set -x
${PREFIX}ruff check --fix $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES
${PREFIX}python scripts/unasync.py

92
scripts/unasync.py Executable file
View File

@ -0,0 +1,92 @@
#!venv/bin/python
import os
import re
import sys
from pprint import pprint
SUBS = [
# httpx specific
('AsyncByteStream', 'SyncByteStream'),
('async_auth_flow', 'sync_auth_flow'),
('handle_async_request', 'handle_request'),
# general
('AsyncIterator', 'Iterator'),
('from anyio import Lock', 'from threading import Lock'),
('Async([A-Z][A-Za-z0-9_]*)', r'\2'),
('async def', 'def'),
('async with', 'with'),
('async for', 'for'),
('await ', ''),
('aclose', 'close'),
('aread', 'read'),
('__aenter__', '__enter__'),
('__aexit__', '__exit__'),
('__aiter__', '__iter__'),
('@pytest.mark.anyio', ''),
]
COMPILED_SUBS = [
(re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)
for regex, repl in SUBS
]
USED_SUBS = set()
def unasync_line(line):
for index, (regex, repl) in enumerate(COMPILED_SUBS):
old_line = line
line = re.sub(regex, repl, line)
if old_line != line:
USED_SUBS.add(index)
return line
def unasync_file(in_path, out_path):
with open(in_path, "r") as in_file:
with open(out_path, "w", newline="") as out_file:
for line in in_file.readlines():
line = unasync_line(line)
out_file.write(line)
def unasync_file_check(in_path, out_path):
with open(in_path, "r") as in_file:
with open(out_path, "r") as out_file:
for in_line, out_line in zip(in_file.readlines(), out_file.readlines()):
expected = unasync_line(in_line)
if out_line != expected:
print(f'unasync mismatch between {in_path!r} and {out_path!r}')
print(f'Async code: {in_line!r}')
print(f'Expected sync code: {expected!r}')
print(f'Actual sync code: {out_line!r}')
sys.exit(1)
def unasync_dir(in_dir, out_dir, check_only=False):
for dirpath, dirnames, filenames in os.walk(in_dir):
for filename in filenames:
if not filename.endswith('.py'):
continue
rel_dir = os.path.relpath(dirpath, in_dir)
in_path = os.path.normpath(os.path.join(in_dir, rel_dir, filename))
out_path = os.path.normpath(os.path.join(out_dir, rel_dir, filename))
print(in_path, '->', out_path)
if check_only:
unasync_file_check(in_path, out_path)
else:
unasync_file(in_path, out_path)
def main():
check_only = '--check' in sys.argv
unasync_dir("tests/client/async", "tests/client/sync", check_only=check_only)
if len(USED_SUBS) != len(SUBS):
unused_subs = [SUBS[i] for i in range(len(SUBS)) if i not in USED_SUBS]
print("These patterns were not used:")
pprint(unused_subs)
exit(1)
if __name__ == '__main__':
main()

View File

View File

@ -11,12 +11,12 @@ import sys
import typing
from urllib.request import parse_keqv_list
import anyio
import pytest
from anyio import Lock
import httpx
from ..common import FIXTURES_DIR
from ...common import FIXTURES_DIR
class App:
@ -140,13 +140,11 @@ class AsyncAuth(httpx.Auth):
"""
def __init__(self) -> None:
self._async_lock = anyio.Lock()
self._lock = Lock()
async def async_auth_flow(
self, request: httpx.Request
) -> typing.AsyncGenerator[httpx.Request, httpx.Response]:
async with self._async_lock:
request.headers["Authorization"] = "async-auth"
async def async_auth_flow(self, request: httpx.Request) -> typing.Any:
async with self._lock:
request.headers["Authorization"] = "auth"
yield request
@ -694,4 +692,4 @@ async def test_auth() -> None:
response = await client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": "async-auth"}
assert response.json() == {"auth": "auth"}

View File

@ -100,17 +100,6 @@ async def test_stream_request(server):
assert response.status_code == 200
@pytest.mark.anyio
async def test_cannot_stream_sync_request(server):
def hello_world() -> typing.Iterator[bytes]: # pragma: no cover
yield b"Hello, "
yield b"world!"
async with httpx.AsyncClient() as client:
with pytest.raises(RuntimeError):
await client.post(server.url, content=hello_world())
@pytest.mark.anyio
async def test_raise_for_status(server):
async with httpx.AsyncClient() as client:
@ -314,7 +303,7 @@ async def test_mounted_transport():
@pytest.mark.anyio
async def test_async_mock_transport():
async def test_mock_transport():
async def hello_world(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="Hello, world!")

View File

View File

@ -0,0 +1,655 @@
"""
Integration tests for authentication.
Unit tests for auth classes also exist in tests/test_auth.py
"""
import hashlib
import netrc
import os
import sys
import typing
from threading import Lock
from urllib.request import parse_keqv_list
import pytest
import httpx
from ...common import FIXTURES_DIR
class App:
"""
A mock app to test auth credentials.
"""
def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
self.auth_header = auth_header
self.status_code = status_code
def __call__(self, request: httpx.Request) -> httpx.Response:
headers = {"www-authenticate": self.auth_header} if self.auth_header else {}
data = {"auth": request.headers.get("Authorization")}
return httpx.Response(self.status_code, headers=headers, json=data)
class DigestApp:
def __init__(
self,
algorithm: str = "SHA-256",
send_response_after_attempt: int = 1,
qop: str = "auth",
regenerate_nonce: bool = True,
) -> None:
self.algorithm = algorithm
self.send_response_after_attempt = send_response_after_attempt
self.qop = qop
self._regenerate_nonce = regenerate_nonce
self._response_count = 0
def __call__(self, request: httpx.Request) -> httpx.Response:
if self._response_count < self.send_response_after_attempt:
return self.challenge_send(request)
data = {"auth": request.headers.get("Authorization")}
return httpx.Response(200, json=data)
def challenge_send(self, request: httpx.Request) -> httpx.Response:
self._response_count += 1
nonce = (
hashlib.sha256(os.urandom(8)).hexdigest()
if self._regenerate_nonce
else "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3"
)
challenge_data = {
"nonce": nonce,
"qop": self.qop,
"opaque": (
"ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1"
),
"algorithm": self.algorithm,
"stale": "FALSE",
}
challenge_str = ", ".join(
'{}="{}"'.format(key, value)
for key, value in challenge_data.items()
if value
)
headers = {
"www-authenticate": f'Digest realm="httpx@example.org", {challenge_str}',
}
return httpx.Response(401, headers=headers)
class RepeatAuth(httpx.Auth):
"""
A mock authentication scheme that requires clients to send
the request a fixed number of times, and then send a last request containing
an aggregation of nonces that the server sent in 'WWW-Authenticate' headers
of intermediate responses.
"""
requires_request_body = True
def __init__(self, repeat: int) -> None:
self.repeat = repeat
def auth_flow(
self, request: httpx.Request
) -> typing.Generator[httpx.Request, httpx.Response, None]:
nonces = []
for index in range(self.repeat):
request.headers["Authorization"] = f"Repeat {index}"
response = yield request
nonces.append(response.headers["www-authenticate"])
key = ".".join(nonces)
request.headers["Authorization"] = f"Repeat {key}"
yield request
class ResponseBodyAuth(httpx.Auth):
"""
A mock authentication scheme that requires clients to send an 'Authorization'
header, then send back the contents of the response in the 'Authorization'
header.
"""
requires_response_body = True
def __init__(self, token: str) -> None:
self.token = token
def auth_flow(
self, request: httpx.Request
) -> typing.Generator[httpx.Request, httpx.Response, None]:
request.headers["Authorization"] = self.token
response = yield request
data = response.text
request.headers["Authorization"] = data
yield request
class Auth(httpx.Auth):
"""
A mock authentication scheme that uses a different implementation for the
sync and async cases.
"""
def __init__(self) -> None:
self._lock = Lock()
def sync_auth_flow(self, request: httpx.Request) -> typing.Any:
with self._lock:
request.headers["Authorization"] = "auth"
yield request
def test_basic_auth() -> None:
url = "https://example.org/"
auth = ("user", "password123")
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
def test_basic_auth_with_stream() -> None:
"""
See: https://github.com/encode/httpx/pull/1312
"""
url = "https://example.org/"
auth = ("user", "password123")
app = App()
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
with client.stream("GET", url) as response:
response.read()
assert response.status_code == 200
assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
def test_basic_auth_in_url() -> None:
url = "https://user:password123@example.org/"
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
def test_basic_auth_on_session() -> None:
url = "https://example.org/"
auth = ("user", "password123")
app = App()
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
def test_custom_auth() -> None:
url = "https://example.org/"
app = App()
def auth(request: httpx.Request) -> httpx.Request:
request.headers["Authorization"] = "Token 123"
return request
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": "Token 123"}
def test_netrc_auth_credentials_exist() -> None:
"""
When netrc auth is being used and a request is made to a host that is
in the netrc file, then the relevant credentials should be applied.
"""
netrc_file = str(FIXTURES_DIR / ".netrc")
url = "http://netrcexample.org"
app = App()
auth = httpx.NetRCAuth(netrc_file)
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk"
}
def test_netrc_auth_credentials_do_not_exist() -> None:
"""
When netrc auth is being used and a request is made to a host that is
not in the netrc file, then no credentials should be applied.
"""
netrc_file = str(FIXTURES_DIR / ".netrc")
url = "http://example.org"
app = App()
auth = httpx.NetRCAuth(netrc_file)
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"auth": None}
@pytest.mark.skipif(
sys.version_info >= (3, 11),
reason="netrc files without a password are valid from Python >= 3.11",
)
def test_netrc_auth_nopassword_parse_error() -> None: # pragma: no cover
"""
Python has different netrc parsing behaviours with different versions.
For Python < 3.11 a netrc file with no password is invalid. In this case
we want to allow the parse error to be raised.
"""
netrc_file = str(FIXTURES_DIR / ".netrc-nopassword")
with pytest.raises(netrc.NetrcParseError):
httpx.NetRCAuth(netrc_file)
def test_auth_disable_per_request() -> None:
url = "https://example.org/"
auth = ("user", "password123")
app = App()
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url, auth=None)
assert response.status_code == 200
assert response.json() == {"auth": None}
def test_auth_hidden_url() -> None:
url = "http://example-username:example-password@example.org/"
expected = "URL('http://example-username:[secure]@example.org/')"
assert url == httpx.URL(url)
assert expected == repr(httpx.URL(url))
def test_auth_hidden_header() -> None:
url = "https://example.org/"
auth = ("example-username", "example-password")
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert "'authorization': '[secure]'" in str(response.request.headers)
def test_auth_property() -> None:
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
assert client.auth is None
client.auth = ("user", "password123") # type: ignore
assert isinstance(client.auth, httpx.BasicAuth)
url = "https://example.org/"
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
def test_auth_invalid_type() -> None:
app = App()
with pytest.raises(TypeError):
client = httpx.Client(
transport=httpx.MockTransport(app),
auth="not a tuple, not a callable", # type: ignore
)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
with pytest.raises(TypeError):
client.get(auth="not a tuple, not a callable") # type: ignore
with pytest.raises(TypeError):
client.auth = "not a tuple, not a callable" # type: ignore
def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": None}
assert len(response.history) == 0
def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
auth_header = "Token ..."
app = App(auth_header=auth_header, status_code=401)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 401
assert response.json() == {"auth": None}
assert len(response.history) == 0
def test_digest_auth_200_response_including_digest_auth_header() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
auth_header = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"'
app = App(auth_header=auth_header, status_code=200)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": None}
assert len(response.history) == 0
def test_digest_auth_401_response_without_digest_auth_header() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = App(auth_header="", status_code=401)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 401
assert response.json() == {"auth": None}
assert len(response.history) == 0
@pytest.mark.parametrize(
"algorithm,expected_hash_length,expected_response_length",
[
("MD5", 64, 32),
("MD5-SESS", 64, 32),
("SHA", 64, 40),
("SHA-SESS", 64, 40),
("SHA-256", 64, 64),
("SHA-256-SESS", 64, 64),
("SHA-512", 64, 128),
("SHA-512-SESS", 64, 128),
],
)
def test_digest_auth(
algorithm: str, expected_hash_length: int, expected_response_length: int
) -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(algorithm=algorithm)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert len(response.history) == 1
authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"
response_fields = [field.strip() for field in fields.split(",")]
digest_data = dict(field.split("=") for field in response_fields)
assert digest_data["username"] == '"user"'
assert digest_data["realm"] == '"httpx@example.org"'
assert "nonce" in digest_data
assert digest_data["uri"] == '"/"'
assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes
assert len(digest_data["opaque"]) == expected_hash_length + 2
assert digest_data["algorithm"] == algorithm
assert digest_data["qop"] == "auth"
assert digest_data["nc"] == "00000001"
assert len(digest_data["cnonce"]) == 16 + 2
def test_digest_auth_no_specified_qop() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop="")
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert len(response.history) == 1
authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"
response_fields = [field.strip() for field in fields.split(",")]
digest_data = dict(field.split("=") for field in response_fields)
assert "qop" not in digest_data
assert "nc" not in digest_data
assert "cnonce" not in digest_data
assert digest_data["username"] == '"user"'
assert digest_data["realm"] == '"httpx@example.org"'
assert len(digest_data["nonce"]) == 64 + 2 # extra quotes
assert digest_data["uri"] == '"/"'
assert len(digest_data["response"]) == 64 + 2
assert len(digest_data["opaque"]) == 64 + 2
assert digest_data["algorithm"] == "SHA-256"
@pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth"))
def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop=qop)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert len(response.history) == 1
def test_digest_auth_qop_auth_int_not_implemented() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop="auth-int")
with httpx.Client(transport=httpx.MockTransport(app)) as client:
with pytest.raises(NotImplementedError):
client.get(url, auth=auth)
def test_digest_auth_qop_must_be_auth_or_auth_int() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop="not-auth")
with httpx.Client(transport=httpx.MockTransport(app)) as client:
with pytest.raises(httpx.ProtocolError):
client.get(url, auth=auth)
def test_digest_auth_incorrect_credentials() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(send_response_after_attempt=2)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 401
assert len(response.history) == 1
def test_digest_auth_reuses_challenge() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response_1 = client.get(url, auth=auth)
response_2 = client.get(url, auth=auth)
assert response_1.status_code == 200
assert response_2.status_code == 200
assert len(response_1.history) == 1
assert len(response_2.history) == 0
def test_digest_auth_resets_nonce_count_after_401() -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response_1 = client.get(url, auth=auth)
assert response_1.status_code == 200
assert len(response_1.history) == 1
first_nonce = parse_keqv_list(
response_1.request.headers["Authorization"].split(", ")
)["nonce"]
first_nc = parse_keqv_list(
response_1.request.headers["Authorization"].split(", ")
)["nc"]
# with this we now force a 401 on a subsequent (but initial) request
app.send_response_after_attempt = 2
# we expect the client again to try to authenticate,
# i.e. the history length must be 1
response_2 = client.get(url, auth=auth)
assert response_2.status_code == 200
assert len(response_2.history) == 1
second_nonce = parse_keqv_list(
response_2.request.headers["Authorization"].split(", ")
)["nonce"]
second_nc = parse_keqv_list(
response_2.request.headers["Authorization"].split(", ")
)["nc"]
assert first_nonce != second_nonce # ensures that the auth challenge was reset
assert (
first_nc == second_nc
) # ensures the nonce count is reset when the authentication failed
@pytest.mark.parametrize(
"auth_header",
[
'Digest realm="httpx@example.org", qop="auth"', # missing fields
'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list
],
)
def test_digest_auth_raises_protocol_error_on_malformed_header(
auth_header: str,
) -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = App(auth_header=auth_header, status_code=401)
with httpx.Client(transport=httpx.MockTransport(app)) as client:
with pytest.raises(httpx.ProtocolError):
client.get(url, auth=auth)
def test_auth_history() -> None:
"""
Test that intermediate requests sent as part of an authentication flow
are recorded in the response history.
"""
url = "https://example.org/"
auth = RepeatAuth(repeat=2)
app = App(auth_header="abc")
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": "Repeat abc.abc"}
assert len(response.history) == 2
resp1, resp2 = response.history
assert resp1.json() == {"auth": "Repeat 0"}
assert resp2.json() == {"auth": "Repeat 1"}
assert len(resp2.history) == 1
assert resp2.history == [resp1]
assert len(resp1.history) == 0
class ConsumeBodyTransport(httpx.MockTransport):
def handle_request(self, request: httpx.Request) -> httpx.Response:
assert isinstance(request.stream, httpx.SyncByteStream)
for _ in request.stream:
pass
return self.handler(request) # type: ignore[return-value]
def test_digest_auth_unavailable_streaming_body():
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp()
def streaming_body() -> typing.Iterator[bytes]:
yield b"Example request body" # pragma: no cover
with httpx.Client(transport=ConsumeBodyTransport(app)) as client:
with pytest.raises(httpx.StreamConsumed):
client.post(url, content=streaming_body(), auth=auth)
def test_auth_reads_response_body() -> None:
"""
Test that we can read the response body in an auth flow if `requires_response_body`
is set.
"""
url = "https://example.org/"
auth = ResponseBodyAuth("xyz")
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": '{"auth":"xyz"}'}
def test_auth() -> None:
"""
Test that we can use an auth implementation specific to the async case, to
support cases that require performing I/O or using concurrency primitives (such
as checking a disk-based cache or fetching a token from a remote auth server).
"""
url = "https://example.org/"
auth = Auth()
app = App()
with httpx.Client(transport=httpx.MockTransport(app)) as client:
response = client.get(url, auth=auth)
assert response.status_code == 200
assert response.json() == {"auth": "auth"}

View File

@ -0,0 +1,339 @@
from __future__ import annotations
import typing
from datetime import timedelta
import pytest
import httpx
def test_get(server):
url = server.url
with httpx.Client(http2=True) as client:
response = client.get(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
assert response.http_version == "HTTP/1.1"
assert response.headers
assert repr(response) == "<Response [200 OK]>"
assert response.elapsed > timedelta(seconds=0)
@pytest.mark.parametrize(
"url",
[
pytest.param("invalid://example.org", id="scheme-not-http(s)"),
pytest.param("://example.org", id="no-scheme"),
pytest.param("http://", id="no-host"),
],
)
def test_get_invalid_url(server, url):
with httpx.Client() as client:
with pytest.raises((httpx.UnsupportedProtocol, httpx.LocalProtocolError)):
client.get(url)
def test_build_request(server):
url = server.url.copy_with(path="/echo_headers")
headers = {"Custom-header": "value"}
with httpx.Client() as client:
request = client.build_request("GET", url)
request.headers.update(headers)
response = client.send(request)
assert response.status_code == 200
assert response.url == url
assert response.json()["Custom-header"] == "value"
def test_post(server):
url = server.url
with httpx.Client() as client:
response = client.post(url, content=b"Hello, world!")
assert response.status_code == 200
def test_post_json(server):
url = server.url
with httpx.Client() as client:
response = client.post(url, json={"text": "Hello, world!"})
assert response.status_code == 200
def test_stream_response(server):
with httpx.Client() as client:
with client.stream("GET", server.url) as response:
body = response.read()
assert response.status_code == 200
assert body == b"Hello, world!"
assert response.content == b"Hello, world!"
def test_access_content_stream_response(server):
with httpx.Client() as client:
with client.stream("GET", server.url) as response:
pass
assert response.status_code == 200
with pytest.raises(httpx.ResponseNotRead):
response.content # noqa: B018
def test_stream_request(server):
def hello_world() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b"world!"
with httpx.Client() as client:
response = client.post(server.url, content=hello_world())
assert response.status_code == 200
def test_raise_for_status(server):
with httpx.Client() as client:
for status_code in (200, 400, 404, 500, 505):
response = client.request(
"GET", server.url.copy_with(path=f"/status/{status_code}")
)
if 400 <= status_code < 600:
with pytest.raises(httpx.HTTPStatusError) as exc_info:
response.raise_for_status()
assert exc_info.value.response == response
else:
assert response.raise_for_status() is response
def test_options(server):
with httpx.Client() as client:
response = client.options(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"
def test_head(server):
with httpx.Client() as client:
response = client.head(server.url)
assert response.status_code == 200
assert response.text == ""
def test_put(server):
with httpx.Client() as client:
response = client.put(server.url, content=b"Hello, world!")
assert response.status_code == 200
def test_patch(server):
with httpx.Client() as client:
response = client.patch(server.url, content=b"Hello, world!")
assert response.status_code == 200
def test_delete(server):
with httpx.Client() as client:
response = client.delete(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"
def test_100_continue(server):
headers = {"Expect": "100-continue"}
content = b"Echo request body"
with httpx.Client() as client:
response = client.post(
server.url.copy_with(path="/echo_body"), headers=headers, content=content
)
assert response.status_code == 200
assert response.content == content
def test_context_managed_transport():
class Transport(httpx.BaseTransport):
def __init__(self) -> None:
self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
# calls into `.close`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__enter__`/`__exit__`.
self.events.append("transport.close")
def __enter__(self):
super().__enter__()
self.events.append("transport.__enter__")
def __exit__(self, *args):
super().__exit__(*args)
self.events.append("transport.__exit__")
transport = Transport()
with httpx.Client(transport=transport):
pass
assert transport.events == [
"transport.__enter__",
"transport.close",
"transport.__exit__",
]
def test_context_managed_transport_and_mount():
class Transport(httpx.BaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
# calls into `.close`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__enter__`/`__exit__`.
self.events.append(f"{self.name}.close")
def __enter__(self):
super().__enter__()
self.events.append(f"{self.name}.__enter__")
def __exit__(self, *args):
super().__exit__(*args)
self.events.append(f"{self.name}.__exit__")
transport = Transport(name="transport")
mounted = Transport(name="mounted")
with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}):
pass
assert transport.events == [
"transport.__enter__",
"transport.close",
"transport.__exit__",
]
assert mounted.events == [
"mounted.__enter__",
"mounted.close",
"mounted.__exit__",
]
def hello_world(request):
return httpx.Response(200, text="Hello, world!")
def test_client_closed_state_using_implicit_open():
client = httpx.Client(transport=httpx.MockTransport(hello_world))
assert not client.is_closed
client.get("http://example.com")
assert not client.is_closed
client.close()
assert client.is_closed
# Once we're close we cannot make any more requests.
with pytest.raises(RuntimeError):
client.get("http://example.com")
# Once we're closed we cannot reopen the client.
with pytest.raises(RuntimeError):
with client:
pass # pragma: no cover
def test_client_closed_state_using_with_block():
with httpx.Client(transport=httpx.MockTransport(hello_world)) as client:
assert not client.is_closed
client.get("http://example.com")
assert client.is_closed
with pytest.raises(RuntimeError):
client.get("http://example.com")
def unmounted(request: httpx.Request) -> httpx.Response:
data = {"app": "unmounted"}
return httpx.Response(200, json=data)
def mounted(request: httpx.Request) -> httpx.Response:
data = {"app": "mounted"}
return httpx.Response(200, json=data)
def test_mounted_transport():
transport = httpx.MockTransport(unmounted)
mounts = {"custom://": httpx.MockTransport(mounted)}
with httpx.Client(transport=transport, mounts=mounts) as client:
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "unmounted"}
response = client.get("custom://www.example.com")
assert response.status_code == 200
assert response.json() == {"app": "mounted"}
def test_mock_transport():
def hello_world(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="Hello, world!")
transport = httpx.MockTransport(hello_world)
with httpx.Client(transport=transport) as client:
response = client.get("https://www.example.com")
assert response.status_code == 200
assert response.text == "Hello, world!"
def test_cancellation_during_stream():
"""
If any BaseException is raised during streaming the response, then the
stream should be closed.
This includes:
* `asyncio.CancelledError` (A subclass of BaseException from Python 3.8 onwards.)
* `trio.Cancelled`
* `KeyboardInterrupt`
* `SystemExit`
See https://github.com/encode/httpx/issues/2139
"""
stream_was_closed = False
def response_with_cancel_during_stream(request):
class CancelledStream(httpx.SyncByteStream):
def __iter__(self) -> typing.Iterator[bytes]:
yield b"Hello"
raise KeyboardInterrupt()
yield b", world" # pragma: no cover
def close(self) -> None:
nonlocal stream_was_closed
stream_was_closed = True
return httpx.Response(
200, headers={"Content-Length": "12"}, stream=CancelledStream()
)
transport = httpx.MockTransport(response_with_cancel_during_stream)
with httpx.Client(transport=transport) as client:
with pytest.raises(KeyboardInterrupt):
client.get("https://www.example.com")
assert stream_was_closed
def test_server_extensions(server):
url = server.url
with httpx.Client(http2=True) as client:
response = client.get(url)
assert response.status_code == 200
assert response.extensions["http_version"] == b"HTTP/1.1"

View File

@ -0,0 +1,167 @@
from http.cookiejar import Cookie, CookieJar
import pytest
import httpx
def get_and_set_cookies(request: httpx.Request) -> httpx.Response:
if request.url.path == "/echo_cookies":
data = {"cookies": request.headers.get("cookie")}
return httpx.Response(200, json=data)
elif request.url.path == "/set_cookie":
return httpx.Response(200, headers={"set-cookie": "example-name=example-value"})
else:
raise NotImplementedError() # pragma: no cover
def test_set_cookie() -> None:
"""
Send a request including a cookie.
"""
url = "http://example.org/echo_cookies"
cookies = {"example-name": "example-value"}
with httpx.Client(
cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}
def test_set_per_request_cookie_is_deprecated() -> None:
"""
Sending a request including a per-request cookie is deprecated.
"""
url = "http://example.org/echo_cookies"
cookies = {"example-name": "example-value"}
with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
with pytest.warns(DeprecationWarning):
response = client.get(url, cookies=cookies)
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}
def test_set_cookie_with_cookiejar() -> None:
"""
Send a request including a cookie, using a `CookieJar` instance.
"""
url = "http://example.org/echo_cookies"
cookies = CookieJar()
cookie = Cookie(
version=0,
name="example-name",
value="example-value",
port=None,
port_specified=False,
domain="",
domain_specified=False,
domain_initial_dot=False,
path="/",
path_specified=True,
secure=False,
expires=None,
discard=True,
comment=None,
comment_url=None,
rest={"HttpOnly": ""},
rfc2109=False,
)
cookies.set_cookie(cookie)
with httpx.Client(
cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}
def test_setting_client_cookies_to_cookiejar() -> None:
"""
Send a request including a cookie, using a `CookieJar` instance.
"""
url = "http://example.org/echo_cookies"
cookies = CookieJar()
cookie = Cookie(
version=0,
name="example-name",
value="example-value",
port=None,
port_specified=False,
domain="",
domain_specified=False,
domain_initial_dot=False,
path="/",
path_specified=True,
secure=False,
expires=None,
discard=True,
comment=None,
comment_url=None,
rest={"HttpOnly": ""},
rfc2109=False,
)
cookies.set_cookie(cookie)
with httpx.Client(
cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}
def test_set_cookie_with_cookies_model() -> None:
"""
Send a request including a cookie, using a `Cookies` instance.
"""
url = "http://example.org/echo_cookies"
cookies = httpx.Cookies()
cookies["example-name"] = "example-value"
with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
client.cookies = cookies
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}
def test_get_cookie() -> None:
url = "http://example.org/set_cookie"
with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
response = client.get(url)
assert response.status_code == 200
assert response.cookies["example-name"] == "example-value"
assert client.cookies["example-name"] == "example-value"
def test_cookie_persistence() -> None:
"""
Ensure that Client instances persist cookies between requests.
"""
with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
response = client.get("http://example.org/echo_cookies")
assert response.status_code == 200
assert response.json() == {"cookies": None}
response = client.get("http://example.org/set_cookie")
assert response.status_code == 200
assert response.cookies["example-name"] == "example-value"
assert client.cookies["example-name"] == "example-value"
response = client.get("http://example.org/echo_cookies")
assert response.status_code == 200
assert response.json() == {"cookies": "example-name=example-value"}

View File

@ -0,0 +1,117 @@
import httpx
def app(request: httpx.Request) -> httpx.Response:
if request.url.path == "/redirect":
return httpx.Response(303, headers={"server": "testserver", "location": "/"})
elif request.url.path.startswith("/status/"):
status_code = int(request.url.path[-3:])
return httpx.Response(status_code, headers={"server": "testserver"})
return httpx.Response(200, headers={"server": "testserver"})
def test_event_hooks():
events = []
def on_request(request):
events.append({"event": "request", "headers": dict(request.headers)})
def on_response(response):
events.append({"event": "response", "headers": dict(response.headers)})
event_hooks = {"request": [on_request], "response": [on_response]}
with httpx.Client(
event_hooks=event_hooks, transport=httpx.MockTransport(app)
) as http:
http.get("http://127.0.0.1:8000/", auth=("username", "password"))
assert events == [
{
"event": "request",
"headers": {
"host": "127.0.0.1:8000",
"user-agent": f"python-httpx/{httpx.__version__}",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
},
},
{
"event": "response",
"headers": {"server": "testserver"},
},
]
def test_event_hooks_raising_exception():
def raise_on_4xx_5xx(response):
response.raise_for_status()
event_hooks = {"response": [raise_on_4xx_5xx]}
with httpx.Client(
event_hooks=event_hooks, transport=httpx.MockTransport(app)
) as http:
try:
http.get("http://127.0.0.1:8000/status/400")
except httpx.HTTPStatusError as exc:
assert exc.response.is_closed
def test_event_hooks_with_redirect():
"""
A redirect request should trigger additional 'request' and 'response' event hooks.
"""
events = []
def on_request(request):
events.append({"event": "request", "headers": dict(request.headers)})
def on_response(response):
events.append({"event": "response", "headers": dict(response.headers)})
event_hooks = {"request": [on_request], "response": [on_response]}
with httpx.Client(
event_hooks=event_hooks,
transport=httpx.MockTransport(app),
follow_redirects=True,
) as http:
http.get("http://127.0.0.1:8000/redirect", auth=("username", "password"))
assert events == [
{
"event": "request",
"headers": {
"host": "127.0.0.1:8000",
"user-agent": f"python-httpx/{httpx.__version__}",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
},
},
{
"event": "response",
"headers": {"location": "/", "server": "testserver"},
},
{
"event": "request",
"headers": {
"host": "127.0.0.1:8000",
"user-agent": f"python-httpx/{httpx.__version__}",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
},
},
{
"event": "response",
"headers": {"server": "testserver"},
},
]

View File

@ -0,0 +1,297 @@
#!/usr/bin/env python3
import pytest
import httpx
def echo_headers(request: httpx.Request) -> httpx.Response:
data = {"headers": dict(request.headers)}
return httpx.Response(200, json=data)
def echo_repeated_headers_multi_items(request: httpx.Request) -> httpx.Response:
data = {"headers": list(request.headers.multi_items())}
return httpx.Response(200, json=data)
def echo_repeated_headers_items(request: httpx.Request) -> httpx.Response:
data = {"headers": list(request.headers.items())}
return httpx.Response(200, json=data)
def test_client_header():
"""
Set a header in the Client.
"""
url = "http://example.org/echo_headers"
headers = {"Example-Header": "example-value"}
with httpx.Client(
transport=httpx.MockTransport(echo_headers), headers=headers
) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"example-header": "example-value",
"host": "example.org",
"user-agent": f"python-httpx/{httpx.__version__}",
}
}
def test_header_merge():
url = "http://example.org/echo_headers"
client_headers = {"User-Agent": "python-myclient/0.2.1"}
request_headers = {"X-Auth-Token": "FooBarBazToken"}
with httpx.Client(
transport=httpx.MockTransport(echo_headers), headers=client_headers
) as client:
response = client.get(url, headers=request_headers)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org",
"user-agent": "python-myclient/0.2.1",
"x-auth-token": "FooBarBazToken",
}
}
def test_header_merge_conflicting_headers():
url = "http://example.org/echo_headers"
client_headers = {"X-Auth-Token": "FooBar"}
request_headers = {"X-Auth-Token": "BazToken"}
with httpx.Client(
transport=httpx.MockTransport(echo_headers), headers=client_headers
) as client:
response = client.get(url, headers=request_headers)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org",
"user-agent": f"python-httpx/{httpx.__version__}",
"x-auth-token": "BazToken",
}
}
def test_header_update():
url = "http://example.org/echo_headers"
with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
first_response = client.get(url)
client.headers.update(
{"User-Agent": "python-myclient/0.2.1", "Another-Header": "AThing"}
)
second_response = client.get(url)
assert first_response.status_code == 200
assert first_response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org",
"user-agent": f"python-httpx/{httpx.__version__}",
}
}
assert second_response.status_code == 200
assert second_response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"another-header": "AThing",
"connection": "keep-alive",
"host": "example.org",
"user-agent": "python-myclient/0.2.1",
}
}
def test_header_repeated_items():
url = "http://example.org/echo_headers"
with httpx.Client(
transport=httpx.MockTransport(echo_repeated_headers_items)
) as client:
response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")])
assert response.status_code == 200
echoed_headers = response.json()["headers"]
# as per RFC 7230, the whitespace after a comma is insignificant
# so we split and strip here so that we can do a safe comparison
assert ["x-header", ["1", "2", "3"]] in [
[k, [subv.lstrip() for subv in v.split(",")]] for k, v in echoed_headers
]
def test_header_repeated_multi_items():
url = "http://example.org/echo_headers"
with httpx.Client(
transport=httpx.MockTransport(echo_repeated_headers_multi_items)
) as client:
response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")])
assert response.status_code == 200
echoed_headers = response.json()["headers"]
assert ["x-header", "1"] in echoed_headers
assert ["x-header", "2,3"] in echoed_headers
def test_remove_default_header():
"""
Remove a default header from the Client.
"""
url = "http://example.org/echo_headers"
with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
del client.headers["User-Agent"]
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org",
}
}
def test_header_does_not_exist():
headers = httpx.Headers({"foo": "bar"})
with pytest.raises(KeyError):
del headers["baz"]
def test_header_with_incorrect_value():
with pytest.raises(
TypeError,
match=f"Header value must be str or bytes, not {type(None)}",
):
httpx.Headers({"foo": None}) # type: ignore
def test_host_with_auth_and_port_in_url():
"""
The Host header should only include the hostname, or hostname:port
(for non-default ports only). Any userinfo or default port should not
be present.
"""
url = "http://username:password@example.org:80/echo_headers"
with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org",
"user-agent": f"python-httpx/{httpx.__version__}",
"authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
}
}
def test_host_with_non_default_port_in_url():
"""
If the URL includes a non-default port, then it should be included in
the Host header.
"""
url = "http://username:password@example.org:123/echo_headers"
with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"connection": "keep-alive",
"host": "example.org:123",
"user-agent": f"python-httpx/{httpx.__version__}",
"authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
}
}
def test_request_auto_headers():
request = httpx.Request("GET", "https://www.example.org/")
assert "host" in request.headers
def test_same_origin():
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")
with httpx.Client() as client:
headers = client._redirect_headers(request, origin, "GET")
assert headers["Host"] == request.url.netloc.decode("ascii")
def test_not_same_origin():
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTP://EXAMPLE.COM:80")
with httpx.Client() as client:
headers = client._redirect_headers(request, origin, "GET")
assert headers["Host"] == origin.netloc.decode("ascii")
def test_is_https_redirect():
url = httpx.URL("https://example.com")
request = httpx.Request(
"GET", "http://example.com", headers={"Authorization": "empty"}
)
with httpx.Client() as client:
headers = client._redirect_headers(request, url, "GET")
assert "Authorization" in headers
def test_is_not_https_redirect():
url = httpx.URL("https://www.example.com")
request = httpx.Request(
"GET", "http://example.com", headers={"Authorization": "empty"}
)
with httpx.Client() as client:
headers = client._redirect_headers(request, url, "GET")
assert "Authorization" not in headers
def test_is_not_https_redirect_if_not_default_ports():
url = httpx.URL("https://example.com:1337")
request = httpx.Request(
"GET", "http://example.com:9999", headers={"Authorization": "empty"}
)
with httpx.Client() as client:
headers = client._redirect_headers(request, url, "GET")
assert "Authorization" not in headers

View File

@ -0,0 +1,67 @@
import httpx
def test_client_base_url():
with httpx.Client() as client:
client.base_url = "https://www.example.org/" # type: ignore
assert isinstance(client.base_url, httpx.URL)
assert client.base_url == "https://www.example.org/"
def test_client_base_url_without_trailing_slash():
with httpx.Client() as client:
client.base_url = "https://www.example.org/path" # type: ignore
assert isinstance(client.base_url, httpx.URL)
assert client.base_url == "https://www.example.org/path/"
def test_client_base_url_with_trailing_slash():
client = httpx.Client()
client.base_url = "https://www.example.org/path/" # type: ignore
assert isinstance(client.base_url, httpx.URL)
assert client.base_url == "https://www.example.org/path/"
def test_client_headers():
with httpx.Client() as client:
client.headers = {"a": "b"} # type: ignore
assert isinstance(client.headers, httpx.Headers)
assert client.headers["A"] == "b"
def test_client_cookies():
with httpx.Client() as client:
client.cookies = {"a": "b"} # type: ignore
assert isinstance(client.cookies, httpx.Cookies)
mycookies = list(client.cookies.jar)
assert len(mycookies) == 1
assert mycookies[0].name == "a" and mycookies[0].value == "b"
def test_client_timeout():
expected_timeout = 12.0
with httpx.Client() as client:
client.timeout = expected_timeout # type: ignore
assert isinstance(client.timeout, httpx.Timeout)
assert client.timeout.connect == expected_timeout
assert client.timeout.read == expected_timeout
assert client.timeout.write == expected_timeout
assert client.timeout.pool == expected_timeout
def test_client_event_hooks():
def on_request(request):
pass # pragma: no cover
with httpx.Client() as client:
client.event_hooks = {"request": [on_request]}
assert client.event_hooks == {"request": [on_request], "response": []}
def test_client_trust_env():
with httpx.Client() as client:
assert client.trust_env
with httpx.Client(trust_env=False) as client:
assert not client.trust_env

View File

@ -0,0 +1,247 @@
import httpcore
import pytest
import httpx
def url_to_origin(url: str) -> httpcore.URL:
"""
Given a URL string, return the origin in the raw tuple format that
`httpcore` uses for it's representation.
"""
u = httpx.URL(url)
return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/")
def test_socks_proxy():
url = httpx.URL("http://www.example.com")
for proxy in ("socks5://localhost/", "socks5h://localhost/"):
with httpx.Client(proxy=proxy) as client:
transport = client._transport_for_url(url)
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.SOCKSProxy)
PROXY_URL = "http://[::1]"
@pytest.mark.parametrize(
["url", "proxies", "expected"],
[
("http://example.com", {}, None),
("http://example.com", {"https://": PROXY_URL}, None),
("http://example.com", {"http://example.net": PROXY_URL}, None),
# Using "*" should match any domain name.
("http://example.com", {"http://*": PROXY_URL}, PROXY_URL),
("https://example.com", {"http://*": PROXY_URL}, None),
# Using "example.com" should match example.com, but not www.example.com
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://example.com": PROXY_URL}, None),
# Using "*.example.com" should match www.example.com, but not example.com
("http://example.com", {"http://*.example.com": PROXY_URL}, None),
("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL),
# Using "*example.com" should match example.com and www.example.com
("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None),
# ...
("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://": PROXY_URL}, PROXY_URL),
("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL),
("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
"http://example.com": PROXY_URL + ":4",
},
PROXY_URL + ":4",
),
(
"http://example.com",
{
"all://": PROXY_URL + ":1",
"http://": PROXY_URL + ":2",
"all://example.com": PROXY_URL + ":3",
},
PROXY_URL + ":3",
),
(
"http://example.com",
{"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"},
PROXY_URL + ":2",
),
],
)
def test_transport_for_request(url, proxies, expected):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
with httpx.Client(mounts=mounts) as client:
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport is client._transport
else:
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(transport._pool, httpcore.HTTPProxy)
assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.network
def test_proxy_close():
try:
transport = httpx.HTTPTransport(proxy=PROXY_URL)
client = httpx.Client(mounts={"https://": transport})
client.get("http://example.com")
finally:
client.close()
def test_unsupported_proxy_scheme():
with pytest.raises(ValueError):
httpx.Client(proxy="ftp://127.0.0.1")
@pytest.mark.parametrize(
["url", "env", "expected"],
[
("http://google.com", {}, None),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com"},
"http://example.com",
),
# Auto prepend http scheme
("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"),
(
"http://google.com",
{"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"},
None,
),
# Everything proxied when NO_PROXY is empty/unset
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"},
None,
),
# Proxied if NO_PROXY scheme does not match URL.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"},
"http://localhost:123",
),
# Proxied if NO_PROXY scheme does not match host.
(
"http://127.0.0.1",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"},
"http://localhost:123",
),
# Not proxied if NO_PROXY matches host domain suffix.
(
"http://courses.mit.edu",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
None,
),
# Proxied even though NO_PROXY matches host domain *prefix*.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
"http://localhost:123",
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"},
None,
),
# Not proxied if one item in NO_PROXY case matches host domain suffix.
# May include whitespace.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"},
None,
),
# Proxied if no items in NO_PROXY match.
(
"https://mit.edu.info",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"},
"http://localhost:123",
),
# Proxied if NO_PROXY domain doesn't match.
(
"https://foo.example.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"},
"http://localhost:123",
),
# Not proxied for subdomains matching NO_PROXY, with a leading ".".
(
"https://www.example1.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"},
None,
),
# Proxied, because NO_PROXY subdomains only match if "." separated.
(
"https://www.example2.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"},
"http://localhost:123",
),
# No requests are proxied if NO_PROXY="*" is set.
(
"https://www.example3.com",
{"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"},
None,
),
],
)
def test_proxies_environ(monkeypatch, url, env, expected):
for name, value in env.items():
monkeypatch.setenv(name, value)
with httpx.Client() as client:
transport = client._transport_for_url(httpx.URL(url))
if expected is None:
assert transport == client._transport
else:
assert transport._pool._proxy_url == url_to_origin(expected) # type: ignore
@pytest.mark.parametrize(
["proxies", "is_valid"],
[
({"http": "http://127.0.0.1"}, False),
({"https": "http://127.0.0.1"}, False),
({"all": "http://127.0.0.1"}, False),
({"http://": "http://127.0.0.1"}, True),
({"https://": "http://127.0.0.1"}, True),
({"all://": "http://127.0.0.1"}, True),
],
)
def test_for_deprecated_proxy_params(proxies, is_valid):
mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
if not is_valid:
with pytest.raises(ValueError):
httpx.Client(mounts=mounts)
else:
httpx.Client(mounts=mounts)
def test_proxy_with_mounts():
proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1")
with httpx.Client(mounts={"http://": proxy_transport}) as client:
transport = client._transport_for_url(httpx.URL("http://example.com"))
assert transport == proxy_transport

View File

@ -0,0 +1,37 @@
import httpx
def hello_world(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="Hello, world")
def test_client_queryparams():
client = httpx.Client(params={"a": "b"})
assert isinstance(client.params, httpx.QueryParams)
assert client.params["a"] == "b"
def test_client_queryparams_string():
with httpx.Client(params="a=b") as client:
assert isinstance(client.params, httpx.QueryParams)
assert client.params["a"] == "b"
with httpx.Client() as client:
client.params = "a=b" # type: ignore
assert isinstance(client.params, httpx.QueryParams)
assert client.params["a"] == "b"
def test_client_queryparams_echo():
url = "http://example.org/echo_queryparams"
client_queryparams = "first=str"
request_queryparams = {"second": "dict"}
with httpx.Client(
transport=httpx.MockTransport(hello_world), params=client_queryparams
) as client:
response = client.get(url, params=request_queryparams)
assert response.status_code == 200
assert (
response.url == "http://example.org/echo_queryparams?first=str&second=dict"
)

View File

@ -0,0 +1,426 @@
import typing
import pytest
import httpx
def redirects(request: httpx.Request) -> httpx.Response:
if request.url.scheme not in ("http", "https"):
raise httpx.UnsupportedProtocol(f"Scheme {request.url.scheme!r} not supported.")
if request.url.path == "/redirect_301":
status_code = httpx.codes.MOVED_PERMANENTLY
content = b"<a href='https://example.org/'>here</a>"
headers = {"location": "https://example.org/"}
return httpx.Response(status_code, headers=headers, content=content)
elif request.url.path == "/redirect_302":
status_code = httpx.codes.FOUND
headers = {"location": "https://example.org/"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/redirect_303":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "https://example.org/"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/relative_redirect":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "/"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/malformed_redirect":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "https://:443/"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/invalid_redirect":
status_code = httpx.codes.SEE_OTHER
raw_headers = [(b"location", "https://😇/".encode("utf-8"))]
return httpx.Response(status_code, headers=raw_headers)
elif request.url.path == "/no_scheme_redirect":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "//example.org/"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/multiple_redirects":
params = httpx.QueryParams(request.url.query)
count = int(params.get("count", "0"))
redirect_count = count - 1
status_code = httpx.codes.SEE_OTHER if count else httpx.codes.OK
if count:
location = "/multiple_redirects"
if redirect_count:
location += f"?count={redirect_count}"
headers = {"location": location}
else:
headers = {}
return httpx.Response(status_code, headers=headers)
if request.url.path == "/redirect_loop":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "/redirect_loop"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/cross_domain":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "https://example.org/cross_domain_target"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/cross_domain_target":
status_code = httpx.codes.OK
data = {
"body": request.content.decode("ascii"),
"headers": dict(request.headers),
}
return httpx.Response(status_code, json=data)
elif request.url.path == "/redirect_body":
status_code = httpx.codes.PERMANENT_REDIRECT
headers = {"location": "/redirect_body_target"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/redirect_no_body":
status_code = httpx.codes.SEE_OTHER
headers = {"location": "/redirect_body_target"}
return httpx.Response(status_code, headers=headers)
elif request.url.path == "/redirect_body_target":
data = {
"body": request.content.decode("ascii"),
"headers": dict(request.headers),
}
return httpx.Response(200, json=data)
elif request.url.path == "/cross_subdomain":
if request.headers["Host"] != "www.example.org":
status_code = httpx.codes.PERMANENT_REDIRECT
headers = {"location": "https://www.example.org/cross_subdomain"}
return httpx.Response(status_code, headers=headers)
else:
return httpx.Response(200, text="Hello, world!")
elif request.url.path == "/redirect_custom_scheme":
status_code = httpx.codes.MOVED_PERMANENTLY
headers = {"location": "market://details?id=42"}
return httpx.Response(status_code, headers=headers)
if request.method == "HEAD":
return httpx.Response(200)
return httpx.Response(200, html="<html><body>Hello, world!</body></html>")
def test_redirect_301():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.post(
"https://example.org/redirect_301", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert len(response.history) == 1
def test_redirect_302():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.post(
"https://example.org/redirect_302", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert len(response.history) == 1
def test_redirect_303():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get("https://example.org/redirect_303", follow_redirects=True)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert len(response.history) == 1
def test_next_request():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
request = client.build_request("POST", "https://example.org/redirect_303")
response = client.send(request, follow_redirects=False)
assert response.status_code == httpx.codes.SEE_OTHER
assert response.url == "https://example.org/redirect_303"
assert response.next_request is not None
response = client.send(response.next_request, follow_redirects=False)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert response.next_request is None
def test_head_redirect():
"""
Contrary to Requests, redirects remain enabled by default for HEAD requests.
"""
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.head(
"https://example.org/redirect_302", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert response.request.method == "HEAD"
assert len(response.history) == 1
assert response.text == ""
def test_relative_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get(
"https://example.org/relative_redirect", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert len(response.history) == 1
def test_malformed_redirect():
# https://github.com/encode/httpx/issues/771
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get(
"http://example.org/malformed_redirect", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org:443/"
assert len(response.history) == 1
def test_no_scheme_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get(
"https://example.org/no_scheme_redirect", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/"
assert len(response.history) == 1
def test_fragment_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get(
"https://example.org/relative_redirect#fragment", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/#fragment"
assert len(response.history) == 1
def test_multiple_redirects():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
response = client.get(
"https://example.org/multiple_redirects?count=20", follow_redirects=True
)
assert response.status_code == httpx.codes.OK
assert response.url == "https://example.org/multiple_redirects"
assert len(response.history) == 20
assert (
response.history[0].url == "https://example.org/multiple_redirects?count=20"
)
assert (
response.history[1].url == "https://example.org/multiple_redirects?count=19"
)
assert len(response.history[0].history) == 0
assert len(response.history[1].history) == 1
def test_too_many_redirects():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
with pytest.raises(httpx.TooManyRedirects):
client.get(
"https://example.org/multiple_redirects?count=21", follow_redirects=True
)
def test_redirect_loop():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
with pytest.raises(httpx.TooManyRedirects):
client.get("https://example.org/redirect_loop", follow_redirects=True)
def test_cross_domain_redirect_with_auth_header():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = client.get(url, headers=headers, follow_redirects=True)
assert response.url == "https://example.org/cross_domain_target"
assert "authorization" not in response.json()["headers"]
def test_cross_domain_https_redirect_with_auth_header():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "http://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = client.get(url, headers=headers, follow_redirects=True)
assert response.url == "https://example.org/cross_domain_target"
assert "authorization" not in response.json()["headers"]
def test_cross_domain_redirect_with_auth():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.com/cross_domain"
response = client.get(url, auth=("user", "pass"), follow_redirects=True)
assert response.url == "https://example.org/cross_domain_target"
assert "authorization" not in response.json()["headers"]
def test_same_domain_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = client.get(url, headers=headers, follow_redirects=True)
assert response.url == "https://example.org/cross_domain_target"
assert response.json()["headers"]["authorization"] == "abc"
def test_same_domain_https_redirect_with_auth_header():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "http://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = client.get(url, headers=headers, follow_redirects=True)
assert response.url == "https://example.org/cross_domain_target"
assert response.json()["headers"]["authorization"] == "abc"
def test_body_redirect():
"""
A 308 redirect should preserve the request body.
"""
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.org/redirect_body"
content = b"Example request body"
response = client.post(url, content=content, follow_redirects=True)
assert response.url == "https://example.org/redirect_body_target"
assert response.json()["body"] == "Example request body"
assert "content-length" in response.json()["headers"]
def test_no_body_redirect():
"""
A 303 redirect should remove the request body.
"""
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.org/redirect_no_body"
content = b"Example request body"
response = client.post(url, content=content, follow_redirects=True)
assert response.url == "https://example.org/redirect_body_target"
assert response.json()["body"] == ""
assert "content-length" not in response.json()["headers"]
def test_can_stream_if_no_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.org/redirect_301"
with client.stream("GET", url, follow_redirects=False) as response:
pass
assert response.status_code == httpx.codes.MOVED_PERMANENTLY
assert response.headers["location"] == "https://example.org/"
class ConsumeBodyTransport(httpx.MockTransport):
def handle_request(self, request: httpx.Request) -> httpx.Response:
assert isinstance(request.stream, httpx.SyncByteStream)
for _ in request.stream:
pass
return self.handler(request) # type: ignore[return-value]
def test_cannot_redirect_streaming_body():
with httpx.Client(transport=ConsumeBodyTransport(redirects)) as client:
url = "https://example.org/redirect_body"
def streaming_body() -> typing.Iterator[bytes]:
yield b"Example request body" # pragma: no cover
with pytest.raises(httpx.StreamConsumed):
client.post(url, content=streaming_body(), follow_redirects=True)
def test_cross_subdomain_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
url = "https://example.com/cross_subdomain"
response = client.get(url, follow_redirects=True)
assert response.url == "https://www.example.org/cross_subdomain"
def cookie_sessions(request: httpx.Request) -> httpx.Response:
if request.url.path == "/":
cookie = request.headers.get("Cookie")
if cookie is not None:
content = b"Logged in"
else:
content = b"Not logged in"
return httpx.Response(200, content=content)
elif request.url.path == "/login":
status_code = httpx.codes.SEE_OTHER
headers = {
"location": "/",
"set-cookie": (
"session=eyJ1c2VybmFtZSI6ICJ0b21; path=/; Max-Age=1209600; "
"httponly; samesite=lax"
),
}
return httpx.Response(status_code, headers=headers)
else:
assert request.url.path == "/logout"
status_code = httpx.codes.SEE_OTHER
headers = {
"location": "/",
"set-cookie": (
"session=null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT; "
"httponly; samesite=lax"
),
}
return httpx.Response(status_code, headers=headers)
def test_redirect_cookie_behavior():
with httpx.Client(
transport=httpx.MockTransport(cookie_sessions), follow_redirects=True
) as client:
# The client is not logged in.
response = client.get("https://example.com/")
assert response.url == "https://example.com/"
assert response.text == "Not logged in"
# Login redirects to the homepage, setting a session cookie.
response = client.post("https://example.com/login")
assert response.url == "https://example.com/"
assert response.text == "Logged in"
# The client is logged in.
response = client.get("https://example.com/")
assert response.url == "https://example.com/"
assert response.text == "Logged in"
# Logout redirects to the homepage, expiring the session cookie.
response = client.post("https://example.com/logout")
assert response.url == "https://example.com/"
assert response.text == "Not logged in"
# The client is not logged in.
response = client.get("https://example.com/")
assert response.url == "https://example.com/"
assert response.text == "Not logged in"
def test_redirect_custom_scheme():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
with pytest.raises(httpx.UnsupportedProtocol) as e:
client.post(
"https://example.org/redirect_custom_scheme", follow_redirects=True
)
assert str(e.value) == "Scheme 'market' not supported."
def test_invalid_redirect():
with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
with pytest.raises(httpx.RemoteProtocolError):
client.get("http://example.org/invalid_redirect", follow_redirects=True)