use # pragma: no cover instead of # pragma: nocover (#2471)

Co-authored-by: Tom Christie <tom@tomchristie.com>
This commit is contained in:
Adrian Garcia Badaracco 2022-11-29 10:23:18 -06:00 committed by GitHub
parent deb904dd15
commit 16e2830624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 70 additions and 70 deletions

View File

@ -46,7 +46,7 @@ from ._urls import URL, QueryParams
try:
from ._main import main
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
def main() -> None: # type: ignore
import sys

View File

@ -667,7 +667,7 @@ class Client(BaseClient):
if http2:
try:
import h2 # noqa
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
@ -1388,7 +1388,7 @@ class AsyncClient(BaseClient):
if http2:
try:
import h2 # noqa
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."

View File

@ -10,7 +10,7 @@ import sys
# The CFFI bindings in `brotlicffi` are recommended for PyPy and everything else.
try:
import brotlicffi as brotli
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
try:
import brotli
except ImportError:

View File

@ -34,7 +34,7 @@ logger = get_logger(__name__)
class UnsetType:
pass # pragma: nocover
pass # pragma: no cover
UNSET = UnsetType()
@ -127,14 +127,14 @@ class SSLConfig:
# AttributeError if only read-only access is implemented.
try:
context.post_handshake_auth = True # type: ignore
except AttributeError: # pragma: nocover
except AttributeError: # pragma: no cover
pass
# Disable using 'commonName' for SSLContext.check_hostname
# when the 'subjectAltName' extension isn't available.
try:
context.hostname_checks_common_name = False # type: ignore
except AttributeError: # pragma: nocover
except AttributeError: # pragma: no cover
pass
if ca_bundle_path.is_file():
@ -162,7 +162,7 @@ class SSLConfig:
alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
context.set_alpn_protocols(alpn_idents)
if hasattr(context, "keylog_filename"): # pragma: nocover (Available in 3.8+)
if hasattr(context, "keylog_filename"): # pragma: no cover (Available in 3.8+)
keylogfile = os.environ.get("SSLKEYLOGFILE")
if keylogfile and self.trust_env:
context.keylog_filename = keylogfile # type: ignore

View File

@ -101,7 +101,7 @@ class UnattachedStream(AsyncByteStream, SyncByteStream):
async def __aiter__(self) -> AsyncIterator[bytes]:
raise StreamClosed()
yield b"" # pragma: nocover
yield b"" # pragma: no cover
def encode_content(

View File

@ -14,10 +14,10 @@ from ._exceptions import DecodingError
class ContentDecoder:
def decode(self, data: bytes) -> bytes:
raise NotImplementedError() # pragma: nocover
raise NotImplementedError() # pragma: no cover
def flush(self) -> bytes:
raise NotImplementedError() # pragma: nocover
raise NotImplementedError() # pragma: no cover
class IdentityDecoder(ContentDecoder):
@ -57,7 +57,7 @@ class DeflateDecoder(ContentDecoder):
def flush(self) -> bytes:
try:
return self.decompressor.flush()
except zlib.error as exc: # pragma: nocover
except zlib.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
@ -80,7 +80,7 @@ class GZipDecoder(ContentDecoder):
def flush(self) -> bytes:
try:
return self.decompressor.flush()
except zlib.error as exc: # pragma: nocover
except zlib.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
@ -95,7 +95,7 @@ class BrotliDecoder(ContentDecoder):
"""
def __init__(self) -> None:
if brotli is None: # pragma: nocover
if brotli is None: # pragma: no cover
raise ImportError(
"Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
"packages have been installed. "
@ -106,10 +106,10 @@ class BrotliDecoder(ContentDecoder):
self.seen_data = False
if hasattr(self.decompressor, "decompress"):
# The 'brotlicffi' package.
self._decompress = self.decompressor.decompress # pragma: nocover
self._decompress = self.decompressor.decompress # pragma: no cover
else:
# The 'brotli' package.
self._decompress = self.decompressor.process # pragma: nocover
self._decompress = self.decompressor.process # pragma: no cover
def decode(self, data: bytes) -> bytes:
if not data:
@ -130,9 +130,9 @@ class BrotliDecoder(ContentDecoder):
# As the decompressor decompresses eagerly, this
# will never actually emit any data. However, it will potentially throw
# errors if a truncated or damaged data stream has been used.
self.decompressor.finish() # pragma: nocover
self.decompressor.finish() # pragma: no cover
return b""
except brotli.error as exc: # pragma: nocover
except brotli.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
@ -330,4 +330,4 @@ SUPPORTED_DECODERS = {
if brotli is None:
SUPPORTED_DECODERS.pop("br") # pragma: nocover
SUPPORTED_DECODERS.pop("br") # pragma: no cover

View File

@ -34,7 +34,7 @@ import contextlib
import typing
if typing.TYPE_CHECKING:
from ._models import Request, Response # pragma: nocover
from ._models import Request, Response # pragma: no cover
class HTTPError(Exception):

View File

@ -101,9 +101,9 @@ def get_lexer_for_response(response: Response) -> str:
mime_type, _, _ = content_type.partition(";")
try:
return pygments.lexers.get_lexer_for_mimetype(mime_type.strip()).name
except pygments.util.ClassNotFound: # pragma: nocover
except pygments.util.ClassNotFound: # pragma: no cover
pass
return "" # pragma: nocover
return "" # pragma: no cover
def format_request_headers(request: httpcore.Request, http2: bool = False) -> str:
@ -168,7 +168,7 @@ def print_response(response: Response) -> None:
try:
data = response.json()
text = json.dumps(data, indent=4)
except ValueError: # pragma: nocover
except ValueError: # pragma: no cover
text = response.text
else:
text = response.text
@ -179,7 +179,7 @@ def print_response(response: Response) -> None:
console.print(f"<{len(response.content)} bytes of binary data>")
def format_certificate(cert: dict) -> str: # pragma: nocover
def format_certificate(cert: dict) -> str: # pragma: no cover
lines = []
for key, value in cert.items():
if isinstance(value, (list, tuple)):
@ -208,7 +208,7 @@ def trace(
stream = info["return_value"]
server_addr = stream.get_extra_info("server_addr")
console.print(f"* Connected to {server_addr[0]!r} on port {server_addr[1]}")
elif name == "connection.start_tls.complete" and verbose: # pragma: nocover
elif name == "connection.start_tls.complete" and verbose: # pragma: no cover
stream = info["return_value"]
ssl_object = stream.get_extra_info("ssl_object")
version = ssl_object.version()
@ -223,13 +223,13 @@ def trace(
elif name == "http11.send_request_headers.started" and verbose:
request = info["request"]
print_request_headers(request, http2=False)
elif name == "http2.send_request_headers.started" and verbose: # pragma: nocover
elif name == "http2.send_request_headers.started" and verbose: # pragma: no cover
request = info["request"]
print_request_headers(request, http2=True)
elif name == "http11.receive_response_headers.complete":
http_version, status, reason_phrase, headers = info["return_value"]
print_response_headers(http_version, status, reason_phrase, headers)
elif name == "http2.receive_response_headers.complete": # pragma: nocover
elif name == "http2.receive_response_headers.complete": # pragma: no cover
status, headers = info["return_value"]
http_version = b"HTTP/2"
reason_phrase = None
@ -268,7 +268,7 @@ def validate_json(
try:
return json.loads(value)
except json.JSONDecodeError: # pragma: nocover
except json.JSONDecodeError: # pragma: no cover
raise click.BadParameter("Not valid JSON")
@ -281,7 +281,7 @@ def validate_auth(
return None
username, password = value
if password == "-": # pragma: nocover
if password == "-": # pragma: no cover
password = click.prompt("Password", hide_input=True)
return (username, password)

View File

@ -822,7 +822,7 @@ class Response:
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
yield chunk # pragma: nocover
yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk
@ -926,7 +926,7 @@ class Response:
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
yield chunk # pragma: nocover
yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk

View File

@ -52,7 +52,7 @@ class BaseTransport:
"""
raise NotImplementedError(
"The 'handle_request' method must be implemented."
) # pragma: nocover
) # pragma: no cover
def close(self) -> None:
pass
@ -76,7 +76,7 @@ class AsyncBaseTransport:
) -> Response:
raise NotImplementedError(
"The 'handle_async_request' method must be implemented."
) # pragma: nocover
) # pragma: no cover
async def aclose(self) -> None:
pass

View File

@ -70,7 +70,7 @@ def map_httpcore_exceptions() -> typing.Iterator[None]:
if mapped_exc is None or issubclass(to_exc, mapped_exc):
mapped_exc = to_exc
if mapped_exc is None: # pragma: nocover
if mapped_exc is None: # pragma: no cover
raise
message = str(exc)
@ -157,7 +157,7 @@ class HTTPTransport(BaseTransport):
elif proxy.url.scheme == "socks5":
try:
import socksio # noqa
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
raise ImportError(
"Using SOCKS proxy, but the 'socksio' package is not installed. "
"Make sure to install httpx using `pip install httpx[socks]`."
@ -178,7 +178,7 @@ class HTTPTransport(BaseTransport):
http1=http1,
http2=http2,
)
else: # pragma: nocover
else: # pragma: no cover
raise ValueError(
f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
)
@ -292,7 +292,7 @@ class AsyncHTTPTransport(AsyncBaseTransport):
elif proxy.url.scheme == "socks5":
try:
import socksio # noqa
except ImportError: # pragma: nocover
except ImportError: # pragma: no cover
raise ImportError(
"Using SOCKS proxy, but the 'socksio' package is not installed. "
"Make sure to install httpx using `pip install httpx[socks]`."
@ -313,7 +313,7 @@ class AsyncHTTPTransport(AsyncBaseTransport):
http1=http1,
http2=http2,
)
else: # pragma: nocover
else: # pragma: no cover
raise ValueError(
f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
)

View File

@ -100,8 +100,8 @@ class SyncByteStream:
def __iter__(self) -> Iterator[bytes]:
raise NotImplementedError(
"The '__iter__' method must be implemented."
) # pragma: nocover
yield b"" # pragma: nocover
) # pragma: no cover
yield b"" # pragma: no cover
def close(self) -> None:
"""
@ -114,8 +114,8 @@ class AsyncByteStream:
async def __aiter__(self) -> AsyncIterator[bytes]:
raise NotImplementedError(
"The '__aiter__' method must be implemented."
) # pragma: nocover
yield b"" # pragma: nocover
) # pragma: no cover
yield b"" # pragma: no cover
async def aclose(self) -> None:
pass

View File

@ -146,7 +146,7 @@ class NetRCInfo:
if expanded_path.is_file():
self._netrc_info = netrc.netrc(str(expanded_path))
break
except (netrc.NetrcParseError, IOError): # pragma: nocover
except (netrc.NetrcParseError, IOError): # pragma: no cover
# Issue while reading the netrc file, ignore...
pass
return self._netrc_info
@ -237,7 +237,7 @@ TRACE_LOG_LEVEL = 5
class Logger(logging.Logger):
# Stub for type checkers.
def trace(self, message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
... # pragma: nocover
... # pragma: no cover
def get_logger(name: str) -> Logger:
@ -398,7 +398,7 @@ class Timer:
import trio
return trio.current_time()
elif library == "curio": # pragma: nocover
elif library == "curio": # pragma: no cover
import curio
return await curio.clock()

View File

@ -100,7 +100,7 @@ async def test_stream_request(server):
@pytest.mark.usefixtures("async_environment")
async def test_cannot_stream_sync_request(server):
def hello_world(): # pragma: nocover
def hello_world(): # pragma: no cover
yield b"Hello, "
yield b"world!"
@ -272,7 +272,7 @@ async def test_client_closed_state_using_implicit_open():
# Once we're closed we cannot reopen the client.
with pytest.raises(RuntimeError):
async with client:
pass # pragma: nocover
pass # pragma: no cover
@pytest.mark.usefixtures("async_environment")
@ -346,7 +346,7 @@ async def test_cancellation_during_stream():
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
yield b"Hello"
raise KeyboardInterrupt()
yield b", world" # pragma: nocover
yield b", world" # pragma: no cover
async def aclose(self) -> None:
nonlocal stream_was_closed

View File

@ -633,7 +633,7 @@ async def test_digest_auth_unavailable_streaming_body():
app = DigestApp()
async def streaming_body():
yield b"Example request body" # pragma: nocover
yield b"Example request body" # pragma: no cover
async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
with pytest.raises(httpx.StreamConsumed):

View File

@ -120,7 +120,7 @@ def test_raw_iterator(server):
def test_cannot_stream_async_request(server):
async def hello_world(): # pragma: nocover
async def hello_world(): # pragma: no cover
yield b"Hello, "
yield b"world!"
@ -318,7 +318,7 @@ def test_client_closed_state_using_implicit_open():
# Once we're closed we cannot reopen the client.
with pytest.raises(RuntimeError):
with client:
pass # pragma: nocover
pass # pragma: no cover
def test_client_closed_state_using_with_block():

View File

@ -53,7 +53,7 @@ def test_client_timeout():
def test_client_event_hooks():
def on_request(request):
pass # pragma: nocover
pass # pragma: no cover
client = httpx.Client()
client.event_hooks = {"request": [on_request]}

View File

@ -352,7 +352,7 @@ def test_cannot_redirect_streaming_body():
url = "https://example.org/redirect_body"
def streaming_body():
yield b"Example request body" # pragma: nocover
yield b"Example request body" # pragma: no cover
with pytest.raises(httpx.StreamConsumed):
client.post(url, content=streaming_body(), follow_redirects=True)

View File

@ -10,6 +10,6 @@ import trio
async def sleep(seconds: float) -> None:
if sniffio.current_async_library() == "trio":
await trio.sleep(seconds) # pragma: nocover
await trio.sleep(seconds) # pragma: no cover
else:
await asyncio.sleep(seconds)

View File

@ -264,7 +264,7 @@ class TestServer(Server):
}
await asyncio.wait(tasks)
async def restart(self) -> None: # pragma: nocover
async def restart(self) -> None: # pragma: no cover
# This coroutine may be called from a different thread than the one the
# server is running on, and from an async environment that's not asyncio.
# For this reason, we use an event to coordinate with the server
@ -275,7 +275,7 @@ class TestServer(Server):
while not self.started:
await sleep(0.2)
async def watch_restarts(self): # pragma: nocover
async def watch_restarts(self): # pragma: no cover
while True:
if self.should_exit:
return

View File

@ -24,7 +24,7 @@ def test_content_length_header():
def test_iterable_content():
class Content:
def __iter__(self):
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=Content())
assert request.headers == {"Host": "example.org", "Transfer-Encoding": "chunked"}
@ -32,7 +32,7 @@ def test_iterable_content():
def test_generator_with_transfer_encoding_header():
def content():
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
assert request.headers == {"Host": "example.org", "Transfer-Encoding": "chunked"}
@ -40,7 +40,7 @@ def test_generator_with_transfer_encoding_header():
def test_generator_with_content_length_header():
def content():
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
request = httpx.Request(
@ -100,7 +100,7 @@ async def test_aread_and_stream_data():
def test_cannot_access_streaming_content_without_read():
# Ensure that streaming requests
def streaming_body(): # pragma: nocover
def streaming_body(): # pragma: no cover
yield ""
request = httpx.Request("POST", "http://example.org", content=streaming_body())
@ -110,7 +110,7 @@ def test_cannot_access_streaming_content_without_read():
def test_transfer_encoding_header():
async def streaming_body(data):
yield data # pragma: nocover
yield data # pragma: no cover
data = streaming_body(b"test 123")
@ -126,7 +126,7 @@ def test_ignore_transfer_encoding_header_if_content_length_exists():
"""
def streaming_body(data):
yield data # pragma: nocover
yield data # pragma: no cover
data = streaming_body(b"abcd")
@ -152,7 +152,7 @@ def test_override_accept_encoding_header():
def test_override_content_length_header():
async def streaming_body(data):
yield data # pragma: nocover
yield data # pragma: no cover
data = streaming_body(b"test 123")
headers = {"Content-Length": "8"}
@ -213,7 +213,7 @@ async def test_request_async_streaming_content_picklable():
def test_request_generator_content_picklable():
def content():
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
pickle_request = pickle.loads(pickle.dumps(request))

View File

@ -916,7 +916,7 @@ def test_cannot_access_unset_request():
def test_generator_with_transfer_encoding_header():
def content():
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
response = httpx.Response(200, content=content())
assert response.headers == {"Transfer-Encoding": "chunked"}
@ -924,7 +924,7 @@ def test_generator_with_transfer_encoding_header():
def test_generator_with_content_length_header():
def content():
yield b"test 123" # pragma: nocover
yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
response = httpx.Response(200, content=content(), headers=headers)

View File

@ -177,7 +177,7 @@ def test_timeout_repr():
reason="requires OpenSSL 1.1.1 or higher",
)
@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch): # pragma: nocover
def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch): # pragma: no cover
with monkeypatch.context() as m:
m.delenv("SSLKEYLOGFILE", raising=False)

View File

@ -73,7 +73,7 @@ async def test_async_bytesio_content():
return chunk
async def __aiter__(self):
yield self._content # pragma: nocover
yield self._content # pragma: no cover
headers, stream = encode_request(content=AsyncBytesIO(b"Hello, world!"))
assert not isinstance(stream, typing.Iterable)

View File

@ -21,7 +21,7 @@ def test_httpcore_all_exceptions_mapped() -> None:
and value is not httpcore.ConnectionNotAvailable
]
if not_mapped: # pragma: nocover
if not_mapped: # pragma: no cover
pytest.fail(f"Unmapped httpcore exceptions: {not_mapped}")
@ -70,7 +70,7 @@ def test_httpx_exceptions_exposed() -> None:
and not hasattr(httpx, name)
]
if not_exposed: # pragma: nocover
if not_exposed: # pragma: no cover
pytest.fail(f"Unexposed HTTPX exceptions: {not_exposed}")