Typing: enable disallow_untyped_calls (#2479)
* Typing: enable disallow_untyped_calls Only the test suite needed adjusting to add type hints. * Update setup.cfg Co-authored-by: Tom Christie <tom@tomchristie.com>
This commit is contained in:
parent
884a69a902
commit
933551c519
16
setup.cfg
16
setup.cfg
@ -3,22 +3,8 @@ ignore = W503, E203, B305, PIE801
|
||||
max-line-length = 120
|
||||
|
||||
[mypy]
|
||||
disallow_untyped_defs = True
|
||||
disallow_any_generics = True
|
||||
ignore_missing_imports = True
|
||||
no_implicit_optional = True
|
||||
show_error_codes = True
|
||||
warn_unused_ignores = True
|
||||
warn_unused_configs = True
|
||||
disallow_subclassing_any = True
|
||||
check_untyped_defs = True
|
||||
disallow_untyped_decorators = True
|
||||
warn_redundant_casts = True
|
||||
strict_concatenate = True
|
||||
disallow_incomplete_defs = True
|
||||
no_implicit_reexport = True
|
||||
warn_return_any = True
|
||||
strict_equality = True
|
||||
strict = True
|
||||
|
||||
[mypy-tests.*]
|
||||
disallow_untyped_defs = False
|
||||
|
||||
@ -89,7 +89,7 @@ async def test_access_content_stream_response(server):
|
||||
|
||||
@pytest.mark.usefixtures("async_environment")
|
||||
async def test_stream_request(server):
|
||||
async def hello_world():
|
||||
async def hello_world() -> typing.AsyncIterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -100,7 +100,7 @@ async def test_stream_request(server):
|
||||
|
||||
@pytest.mark.usefixtures("async_environment")
|
||||
async def test_cannot_stream_sync_request(server):
|
||||
def hello_world(): # pragma: no cover
|
||||
def hello_world() -> typing.Iterator[bytes]: # pragma: no cover
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -180,8 +180,8 @@ async def test_100_continue(server):
|
||||
@pytest.mark.usefixtures("async_environment")
|
||||
async def test_context_managed_transport():
|
||||
class Transport(httpx.AsyncBaseTransport):
|
||||
def __init__(self):
|
||||
self.events = []
|
||||
def __init__(self) -> None:
|
||||
self.events: typing.List[str] = []
|
||||
|
||||
async def aclose(self):
|
||||
# The base implementation of httpx.AsyncBaseTransport just
|
||||
|
||||
@ -113,7 +113,7 @@ class ResponseBodyAuth(Auth):
|
||||
|
||||
requires_response_body = True
|
||||
|
||||
def __init__(self, token):
|
||||
def __init__(self, token: str) -> None:
|
||||
self.token = token
|
||||
|
||||
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
|
||||
@ -689,7 +689,7 @@ async def test_digest_auth_unavailable_streaming_body():
|
||||
auth = DigestAuth(username="user", password="password123")
|
||||
app = DigestApp()
|
||||
|
||||
async def streaming_body():
|
||||
async def streaming_body() -> typing.AsyncIterator[bytes]:
|
||||
yield b"Example request body" # pragma: no cover
|
||||
|
||||
async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
|
||||
|
||||
@ -120,7 +120,7 @@ def test_raw_iterator(server):
|
||||
|
||||
|
||||
def test_cannot_stream_async_request(server):
|
||||
async def hello_world(): # pragma: no cover
|
||||
async def hello_world() -> typing.AsyncIterator[bytes]: # pragma: no cover
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -229,8 +229,8 @@ def test_merge_relative_url_with_encoded_slashes():
|
||||
|
||||
def test_context_managed_transport():
|
||||
class Transport(httpx.BaseTransport):
|
||||
def __init__(self):
|
||||
self.events = []
|
||||
def __init__(self) -> None:
|
||||
self.events: typing.List[str] = []
|
||||
|
||||
def close(self):
|
||||
# The base implementation of httpx.BaseTransport just
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import typing
|
||||
|
||||
import pytest
|
||||
|
||||
import httpx
|
||||
@ -351,7 +353,7 @@ def test_cannot_redirect_streaming_body():
|
||||
client = httpx.Client(transport=ConsumeBodyTransport(redirects))
|
||||
url = "https://example.org/redirect_body"
|
||||
|
||||
def streaming_body():
|
||||
def streaming_body() -> typing.Iterator[bytes]:
|
||||
yield b"Example request body" # pragma: no cover
|
||||
|
||||
with pytest.raises(httpx.StreamConsumed):
|
||||
|
||||
@ -20,6 +20,9 @@ from uvicorn.server import Server
|
||||
from httpx import URL
|
||||
from tests.concurrency import sleep
|
||||
|
||||
if typing.TYPE_CHECKING: # pragma: no cover
|
||||
from httpx._transports.asgi import _Receive, _Send
|
||||
|
||||
ENVIRONMENT_VARIABLES = {
|
||||
"SSL_CERT_FILE",
|
||||
"SSL_CERT_DIR",
|
||||
@ -72,7 +75,10 @@ def clean_environ():
|
||||
os.environ.update(original_environ)
|
||||
|
||||
|
||||
async def app(scope, receive, send):
|
||||
_Scope = typing.Dict[str, typing.Any]
|
||||
|
||||
|
||||
async def app(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
assert scope["type"] == "http"
|
||||
if scope["path"].startswith("/slow_response"):
|
||||
await slow_response(scope, receive, send)
|
||||
@ -92,7 +98,7 @@ async def app(scope, receive, send):
|
||||
await hello_world(scope, receive, send)
|
||||
|
||||
|
||||
async def hello_world(scope, receive, send):
|
||||
async def hello_world(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
@ -103,7 +109,7 @@ async def hello_world(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": b"Hello, world!"})
|
||||
|
||||
|
||||
async def hello_world_json(scope, receive, send):
|
||||
async def hello_world_json(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
@ -114,7 +120,7 @@ async def hello_world_json(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": b'{"Hello": "world!"}'})
|
||||
|
||||
|
||||
async def slow_response(scope, receive, send):
|
||||
async def slow_response(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
await sleep(1.0)
|
||||
await send(
|
||||
{
|
||||
@ -126,7 +132,7 @@ async def slow_response(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": b"Hello, world!"})
|
||||
|
||||
|
||||
async def status_code(scope, receive, send):
|
||||
async def status_code(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
status_code = int(scope["path"].replace("/status/", ""))
|
||||
await send(
|
||||
{
|
||||
@ -138,7 +144,7 @@ async def status_code(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": b"Hello, world!"})
|
||||
|
||||
|
||||
async def echo_body(scope, receive, send):
|
||||
async def echo_body(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
body = b""
|
||||
more_body = True
|
||||
|
||||
@ -157,7 +163,7 @@ async def echo_body(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": body})
|
||||
|
||||
|
||||
async def echo_binary(scope, receive, send):
|
||||
async def echo_binary(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
body = b""
|
||||
more_body = True
|
||||
|
||||
@ -176,7 +182,7 @@ async def echo_binary(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": body})
|
||||
|
||||
|
||||
async def echo_headers(scope, receive, send):
|
||||
async def echo_headers(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
body = {
|
||||
name.capitalize().decode(): value.decode()
|
||||
for name, value in scope.get("headers", [])
|
||||
@ -191,7 +197,7 @@ async def echo_headers(scope, receive, send):
|
||||
await send({"type": "http.response.body", "body": json.dumps(body).encode()})
|
||||
|
||||
|
||||
async def redirect_301(scope, receive, send):
|
||||
async def redirect_301(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
|
||||
await send(
|
||||
{"type": "http.response.start", "status": 301, "headers": [[b"location", b"/"]]}
|
||||
)
|
||||
@ -275,7 +281,7 @@ class TestServer(Server):
|
||||
while not self.started:
|
||||
await sleep(0.2)
|
||||
|
||||
async def watch_restarts(self): # pragma: no cover
|
||||
async def watch_restarts(self) -> None: # pragma: no cover
|
||||
while True:
|
||||
if self.should_exit:
|
||||
return
|
||||
|
||||
@ -31,7 +31,7 @@ def test_iterable_content():
|
||||
|
||||
|
||||
def test_generator_with_transfer_encoding_header():
|
||||
def content():
|
||||
def content() -> typing.Iterator[bytes]:
|
||||
yield b"test 123" # pragma: no cover
|
||||
|
||||
request = httpx.Request("POST", "http://example.org", content=content())
|
||||
@ -39,7 +39,7 @@ def test_generator_with_transfer_encoding_header():
|
||||
|
||||
|
||||
def test_generator_with_content_length_header():
|
||||
def content():
|
||||
def content() -> typing.Iterator[bytes]:
|
||||
yield b"test 123" # pragma: no cover
|
||||
|
||||
headers = {"Content-Length": "8"}
|
||||
@ -100,8 +100,8 @@ async def test_aread_and_stream_data():
|
||||
|
||||
def test_cannot_access_streaming_content_without_read():
|
||||
# Ensure that streaming requests
|
||||
def streaming_body(): # pragma: no cover
|
||||
yield ""
|
||||
def streaming_body() -> typing.Iterator[bytes]: # pragma: no cover
|
||||
yield b""
|
||||
|
||||
request = httpx.Request("POST", "http://example.org", content=streaming_body())
|
||||
with pytest.raises(httpx.RequestNotRead):
|
||||
@ -109,7 +109,7 @@ def test_cannot_access_streaming_content_without_read():
|
||||
|
||||
|
||||
def test_transfer_encoding_header():
|
||||
async def streaming_body(data):
|
||||
async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
|
||||
yield data # pragma: no cover
|
||||
|
||||
data = streaming_body(b"test 123")
|
||||
@ -125,7 +125,7 @@ def test_ignore_transfer_encoding_header_if_content_length_exists():
|
||||
See https://github.com/encode/httpx/issues/1168
|
||||
"""
|
||||
|
||||
def streaming_body(data):
|
||||
def streaming_body(data: bytes) -> typing.Iterator[bytes]:
|
||||
yield data # pragma: no cover
|
||||
|
||||
data = streaming_body(b"abcd")
|
||||
@ -151,7 +151,7 @@ def test_override_accept_encoding_header():
|
||||
|
||||
|
||||
def test_override_content_length_header():
|
||||
async def streaming_body(data):
|
||||
async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
|
||||
yield data # pragma: no cover
|
||||
|
||||
data = streaming_body(b"test 123")
|
||||
@ -194,7 +194,7 @@ def test_request_picklable():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_async_streaming_content_picklable():
|
||||
async def streaming_body(data):
|
||||
async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
|
||||
yield data
|
||||
|
||||
data = streaming_body(b"test 123")
|
||||
@ -212,7 +212,7 @@ async def test_request_async_streaming_content_picklable():
|
||||
|
||||
|
||||
def test_request_generator_content_picklable():
|
||||
def content():
|
||||
def content() -> typing.Iterator[bytes]:
|
||||
yield b"test 123" # pragma: no cover
|
||||
|
||||
request = httpx.Request("POST", "http://example.org", content=content())
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import json
|
||||
import pickle
|
||||
import typing
|
||||
|
||||
import chardet
|
||||
import pytest
|
||||
@ -14,12 +15,12 @@ class StreamingBody:
|
||||
yield b"world!"
|
||||
|
||||
|
||||
def streaming_body():
|
||||
def streaming_body() -> typing.Iterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
|
||||
async def async_streaming_body():
|
||||
async def async_streaming_body() -> typing.AsyncIterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -396,7 +397,7 @@ def test_iter_raw_with_chunksize():
|
||||
|
||||
|
||||
def test_iter_raw_doesnt_return_empty_chunks():
|
||||
def streaming_body_with_empty_chunks():
|
||||
def streaming_body_with_empty_chunks() -> typing.Iterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b""
|
||||
yield b"world!"
|
||||
@ -539,7 +540,7 @@ def test_iter_bytes_with_empty_response():
|
||||
|
||||
|
||||
def test_iter_bytes_doesnt_return_empty_chunks():
|
||||
def streaming_body_with_empty_chunks():
|
||||
def streaming_body_with_empty_chunks() -> typing.Iterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b""
|
||||
yield b"world!"
|
||||
@ -915,7 +916,7 @@ def test_cannot_access_unset_request():
|
||||
|
||||
|
||||
def test_generator_with_transfer_encoding_header():
|
||||
def content():
|
||||
def content() -> typing.Iterator[bytes]:
|
||||
yield b"test 123" # pragma: no cover
|
||||
|
||||
response = httpx.Response(200, content=content())
|
||||
@ -923,7 +924,7 @@ def test_generator_with_transfer_encoding_header():
|
||||
|
||||
|
||||
def test_generator_with_content_length_header():
|
||||
def content():
|
||||
def content() -> typing.Iterator[bytes]:
|
||||
yield b"test 123" # pragma: no cover
|
||||
|
||||
headers = {"Content-Length": "8"}
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import typing
|
||||
|
||||
import pytest
|
||||
|
||||
import httpx
|
||||
@ -18,7 +20,7 @@ def test_post(server):
|
||||
|
||||
|
||||
def test_post_byte_iterator(server):
|
||||
def data():
|
||||
def data() -> typing.Iterator[bytes]:
|
||||
yield b"Hello"
|
||||
yield b", "
|
||||
yield b"world!"
|
||||
|
||||
@ -63,7 +63,7 @@ async def test_bytesio_content():
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_bytesio_content():
|
||||
class AsyncBytesIO:
|
||||
def __init__(self, content: bytes):
|
||||
def __init__(self, content: bytes) -> None:
|
||||
self._idx = 0
|
||||
self._content = content
|
||||
|
||||
@ -87,7 +87,7 @@ async def test_async_bytesio_content():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iterator_content():
|
||||
def hello_world():
|
||||
def hello_world() -> typing.Iterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -105,7 +105,7 @@ async def test_iterator_content():
|
||||
|
||||
# Support 'data' for compat with requests.
|
||||
with pytest.warns(DeprecationWarning):
|
||||
headers, stream = encode_request(data=hello_world())
|
||||
headers, stream = encode_request(data=hello_world()) # type: ignore
|
||||
assert isinstance(stream, typing.Iterable)
|
||||
assert not isinstance(stream, typing.AsyncIterable)
|
||||
|
||||
@ -117,7 +117,7 @@ async def test_iterator_content():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_aiterator_content():
|
||||
async def hello_world():
|
||||
async def hello_world() -> typing.AsyncIterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -135,7 +135,7 @@ async def test_aiterator_content():
|
||||
|
||||
# Support 'data' for compat with requests.
|
||||
with pytest.warns(DeprecationWarning):
|
||||
headers, stream = encode_request(data=hello_world())
|
||||
headers, stream = encode_request(data=hello_world()) # type: ignore
|
||||
assert not isinstance(stream, typing.Iterable)
|
||||
assert isinstance(stream, typing.AsyncIterable)
|
||||
|
||||
@ -409,7 +409,7 @@ async def test_response_bytes_content():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_response_iterator_content():
|
||||
def hello_world():
|
||||
def hello_world() -> typing.Iterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
@ -428,7 +428,7 @@ async def test_response_iterator_content():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_response_aiterator_content():
|
||||
async def hello_world():
|
||||
async def hello_world() -> typing.AsyncIterator[bytes]:
|
||||
yield b"Hello, "
|
||||
yield b"world!"
|
||||
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import typing
|
||||
import zlib
|
||||
|
||||
import chardet
|
||||
@ -127,7 +128,7 @@ async def test_streaming():
|
||||
body = b"test 123"
|
||||
compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
|
||||
|
||||
async def compress(body):
|
||||
async def compress(body: bytes) -> typing.AsyncIterator[bytes]:
|
||||
yield compressor.compress(body)
|
||||
yield compressor.flush()
|
||||
|
||||
@ -186,7 +187,7 @@ def test_decoding_errors(header_value):
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_text_decoder_with_autodetect(data, encoding):
|
||||
async def iterator():
|
||||
async def iterator() -> typing.AsyncIterator[bytes]:
|
||||
nonlocal data
|
||||
for chunk in data:
|
||||
yield chunk
|
||||
@ -209,7 +210,7 @@ async def test_text_decoder_with_autodetect(data, encoding):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_text_decoder_known_encoding():
|
||||
async def iterator():
|
||||
async def iterator() -> typing.AsyncIterator[bytes]:
|
||||
yield b"\x83g"
|
||||
yield b"\x83"
|
||||
yield b"\x89\x83x\x83\x8b"
|
||||
|
||||
@ -1,21 +1,27 @@
|
||||
import os
|
||||
import typing
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
import httpx
|
||||
from httpx import main
|
||||
|
||||
if typing.TYPE_CHECKING: # pragma: no cover
|
||||
# don't let mypy be misled by the fallback defined in httpx/__init__.py
|
||||
from httpx._main import main # noqa: F811
|
||||
|
||||
|
||||
def splitlines(output):
|
||||
def splitlines(output: str) -> typing.Iterable[str]:
|
||||
return [line.strip() for line in output.splitlines()]
|
||||
|
||||
|
||||
def remove_date_header(lines):
|
||||
def remove_date_header(lines: typing.Iterable[str]) -> typing.Iterable[str]:
|
||||
return [line for line in lines if not line.startswith("date:")]
|
||||
|
||||
|
||||
def test_help():
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, ["--help"])
|
||||
result = runner.invoke(main, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "A next generation HTTP client." in result.output
|
||||
|
||||
@ -23,7 +29,7 @@ def test_help():
|
||||
def test_get(server):
|
||||
url = str(server.url)
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url])
|
||||
result = runner.invoke(main, [url])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 200 OK",
|
||||
@ -38,7 +44,7 @@ def test_get(server):
|
||||
def test_json(server):
|
||||
url = str(server.url.copy_with(path="/json"))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url])
|
||||
result = runner.invoke(main, [url])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 200 OK",
|
||||
@ -56,7 +62,7 @@ def test_binary(server):
|
||||
url = str(server.url.copy_with(path="/echo_binary"))
|
||||
runner = CliRunner()
|
||||
content = "Hello, world!"
|
||||
result = runner.invoke(httpx.main, [url, "-c", content])
|
||||
result = runner.invoke(main, [url, "-c", content])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 200 OK",
|
||||
@ -71,7 +77,7 @@ def test_binary(server):
|
||||
def test_redirects(server):
|
||||
url = str(server.url.copy_with(path="/redirect_301"))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url])
|
||||
result = runner.invoke(main, [url])
|
||||
assert result.exit_code == 1
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 301 Moved Permanently",
|
||||
@ -85,7 +91,7 @@ def test_redirects(server):
|
||||
def test_follow_redirects(server):
|
||||
url = str(server.url.copy_with(path="/redirect_301"))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url, "--follow-redirects"])
|
||||
result = runner.invoke(main, [url, "--follow-redirects"])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 301 Moved Permanently",
|
||||
@ -105,7 +111,7 @@ def test_follow_redirects(server):
|
||||
def test_post(server):
|
||||
url = str(server.url.copy_with(path="/echo_body"))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url, "-m", "POST", "-j", '{"hello": "world"}'])
|
||||
result = runner.invoke(main, [url, "-m", "POST", "-j", '{"hello": "world"}'])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"HTTP/1.1 200 OK",
|
||||
@ -120,7 +126,7 @@ def test_post(server):
|
||||
def test_verbose(server):
|
||||
url = str(server.url)
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url, "-v"])
|
||||
result = runner.invoke(main, [url, "-v"])
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
"* Connecting to '127.0.0.1'",
|
||||
@ -144,7 +150,7 @@ def test_verbose(server):
|
||||
def test_auth(server):
|
||||
url = str(server.url)
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, [url, "-v", "--auth", "username", "password"])
|
||||
result = runner.invoke(main, [url, "-v", "--auth", "username", "password"])
|
||||
print(result.output)
|
||||
assert result.exit_code == 0
|
||||
assert remove_date_header(splitlines(result.output)) == [
|
||||
@ -171,7 +177,7 @@ def test_download(server):
|
||||
url = str(server.url)
|
||||
runner = CliRunner()
|
||||
with runner.isolated_filesystem():
|
||||
runner.invoke(httpx.main, [url, "--download", "index.txt"])
|
||||
runner.invoke(main, [url, "--download", "index.txt"])
|
||||
assert os.path.exists("index.txt")
|
||||
with open("index.txt", "r") as input_file:
|
||||
assert input_file.read() == "Hello, world!"
|
||||
@ -179,7 +185,7 @@ def test_download(server):
|
||||
|
||||
def test_errors():
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(httpx.main, ["invalid://example.org"])
|
||||
result = runner.invoke(main, ["invalid://example.org"])
|
||||
assert result.exit_code == 1
|
||||
assert splitlines(result.output) == [
|
||||
"UnsupportedProtocol: Request URL has an unsupported protocol 'invalid://'.",
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import sys
|
||||
import typing
|
||||
import wsgiref.validate
|
||||
from functools import partial
|
||||
from io import StringIO
|
||||
@ -7,8 +8,11 @@ import pytest
|
||||
|
||||
import httpx
|
||||
|
||||
if typing.TYPE_CHECKING: # pragma: no cover
|
||||
from _typeshed.wsgi import StartResponse, WSGIApplication, WSGIEnvironment
|
||||
|
||||
def application_factory(output):
|
||||
|
||||
def application_factory(output: typing.Iterable[bytes]) -> "WSGIApplication":
|
||||
def application(environ, start_response):
|
||||
status = "200 OK"
|
||||
|
||||
@ -24,7 +28,9 @@ def application_factory(output):
|
||||
return wsgiref.validate.validator(application)
|
||||
|
||||
|
||||
def echo_body(environ, start_response):
|
||||
def echo_body(
|
||||
environ: "WSGIEnvironment", start_response: "StartResponse"
|
||||
) -> typing.Iterable[bytes]:
|
||||
status = "200 OK"
|
||||
output = environ["wsgi.input"].read()
|
||||
|
||||
@ -37,14 +43,16 @@ def echo_body(environ, start_response):
|
||||
return [output]
|
||||
|
||||
|
||||
def echo_body_with_response_stream(environ, start_response):
|
||||
def echo_body_with_response_stream(
|
||||
environ: "WSGIEnvironment", start_response: "StartResponse"
|
||||
) -> typing.Iterable[bytes]:
|
||||
status = "200 OK"
|
||||
|
||||
response_headers = [("Content-Type", "text/plain")]
|
||||
|
||||
start_response(status, response_headers)
|
||||
|
||||
def output_generator(f):
|
||||
def output_generator(f: typing.IO[bytes]) -> typing.Iterator[bytes]:
|
||||
while True:
|
||||
output = f.read(2)
|
||||
if not output:
|
||||
@ -54,7 +62,11 @@ def echo_body_with_response_stream(environ, start_response):
|
||||
return output_generator(f=environ["wsgi.input"])
|
||||
|
||||
|
||||
def raise_exc(environ, start_response, exc=ValueError):
|
||||
def raise_exc(
|
||||
environ: "WSGIEnvironment",
|
||||
start_response: "StartResponse",
|
||||
exc: typing.Type[Exception] = ValueError,
|
||||
) -> typing.Iterable[bytes]:
|
||||
status = "500 Server Error"
|
||||
output = b"Nope!"
|
||||
|
||||
@ -66,7 +78,7 @@ def raise_exc(environ, start_response, exc=ValueError):
|
||||
raise exc()
|
||||
except exc:
|
||||
exc_info = sys.exc_info()
|
||||
start_response(status, response_headers, exc_info=exc_info)
|
||||
start_response(status, response_headers, exc_info)
|
||||
|
||||
return [output]
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user