Add NetRCAuth() class. (#2535)

* NetRCAuth class

* Add docs for httpx.NetRCAuth()

* Drop failing cross-domain test for NetRCAuth()

* Update tests

* Update httpx/_auth.py

* Add tests for netrc file with no password
This commit is contained in:
Tom Christie 2023-01-12 11:27:46 +00:00 committed by GitHub
parent 7947b56076
commit 59914c7690
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 134 additions and 134 deletions

View File

@ -428,43 +428,48 @@ with tempfile.NamedTemporaryFile() as download_file:
## .netrc Support
HTTPX supports .netrc file. In `trust_env=True` cases, if auth parameter is
not defined, HTTPX tries to add auth into request's header from .netrc file.
HTTPX can be configured to use [a `.netrc` config file](https://everything.curl.dev/usingcurl/netrc) for authentication.
!!! note
The NETRC file is cached across requests made by a client.
If you need to refresh the cache (e.g. because the NETRC file has changed),
you should create a new client or restart the interpreter.
The `.netrc` config file allows authentication credentials to be associated with specified hosts. When a request is made to a host that is found in the netrc file, the username and password will be included using HTTP basic auth.
Example `.netrc` file:
As default `trust_env` is true. To set false:
```pycon
>>> httpx.get('https://example.org/', trust_env=False)
```
If `NETRC` environment is empty, HTTPX tries to use default files.
(`~/.netrc`, `~/_netrc`)
To change `NETRC` environment:
```pycon
>>> import os
>>> os.environ["NETRC"] = "my_default_folder/.my_netrc"
```
.netrc file content example:
```
machine netrcexample.org
machine example.org
login example-username
password example-password
...
machine python-httpx.org
login other-username
password other-password
```
When using `Client` instances, `trust_env` should be set on the client itself, rather than on the request methods:
Some examples of configuring `.netrc` authentication with `httpx`.
```python
client = httpx.Client(trust_env=False)
Use the default `.netrc` file in the users home directory:
```pycon
>>> auth = httpx.NetRCAuth()
>>> client = httpx.Client(auth=auth)
```
Use an explicit path to a `.netrc` file:
```pycon
>>> auth = httpx.NetRCAuth(file="/path/to/.netrc")
>>> client = httpx.Client(auth=auth)
```
Use the `NETRC` environment variable to configure a path to the `.netrc` file,
or fallback to the default.
```pycon
>>> auth = httpx.NetRCAuth(file=os.environ.get("NETRC"))
>>> client = httpx.Client(auth=auth)
```
The `NetRCAuth()` class uses [the `netrc.netrc()` function from the Python standard library](https://docs.python.org/3/library/netrc.html). See the documentation there for more details on exceptions that may be raised if the netrc file is not found, or cannot be parsed.
## HTTP Proxying
HTTPX supports setting up [HTTP proxies](https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers) via the `proxies` parameter to be passed on client initialization or top-level API functions like `httpx.get(..., proxies=...)`.
@ -828,7 +833,7 @@ For instance this request sends 2 files, `foo.png` and `bar.png` in one request
When issuing requests or instantiating a client, the `auth` argument can be used to pass an authentication scheme to use. The `auth` argument may be one of the following...
* A two-tuple of `username`/`password`, to be used with basic authentication.
* An instance of `httpx.BasicAuth()` or `httpx.DigestAuth()`.
* An instance of `httpx.BasicAuth()`, `httpx.DigestAuth()`, or `httpx.NetRCAuth()`.
* A callable, accepting a request and returning an authenticated request instance.
* An instance of subclasses of `httpx.Auth`.

View File

@ -132,18 +132,6 @@ Example:
SSL_CERT_DIR=/path/to/ca-certs/ python -c "import httpx; httpx.get('https://example.com')"
```
## `NETRC`
Valid values: a filename
If this environment variable is set but auth parameter is not defined, HTTPX will add auth information stored in the .netrc file into the request's header. If you do not provide NETRC environment either, HTTPX will use default files. (~/.netrc, ~/_netrc)
Example:
```console
NETRC=/path/to/netrcfile/.my_netrc python -c "import httpx; httpx.get('https://example.com')"
```
## Proxies
The environment variables documented below are used as a convention by various HTTP tooling, including:
@ -187,4 +175,3 @@ python -c "import httpx; httpx.get('http://example.com')"
python -c "import httpx; httpx.get('http://127.0.0.1:5000/my-api')"
python -c "import httpx; httpx.get('https://www.python-httpx.org')"
```

View File

@ -1,6 +1,6 @@
from .__version__ import __description__, __title__, __version__
from ._api import delete, get, head, options, patch, post, put, request, stream
from ._auth import Auth, BasicAuth, DigestAuth
from ._auth import Auth, BasicAuth, DigestAuth, NetRCAuth
from ._client import USE_CLIENT_DEFAULT, AsyncClient, Client
from ._config import Limits, Proxy, Timeout, create_ssl_context
from ._content import ByteStream
@ -94,6 +94,7 @@ __all__ = [
"LocalProtocolError",
"main",
"MockTransport",
"NetRCAuth",
"NetworkError",
"options",
"patch",

View File

@ -1,4 +1,5 @@
import hashlib
import netrc
import os
import re
import time
@ -141,6 +142,34 @@ class BasicAuth(Auth):
return f"Basic {token}"
class NetRCAuth(Auth):
"""
Use a 'netrc' file to lookup basic auth credentials based on the url host.
"""
def __init__(self, file: typing.Optional[str]):
self._netrc_info = netrc.netrc(file)
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
auth_info = self._netrc_info.authenticators(request.url.host)
if auth_info is None or not auth_info[2]:
# The netrc file did not have authentication credentials for this host.
yield request
else:
# Build a basic auth header with credentials from the netrc file.
request.headers["Authorization"] = self._build_auth_header(
username=auth_info[0], password=auth_info[2]
)
yield request
def _build_auth_header(
self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
class DigestAuth(Auth):
_ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
"MD5": hashlib.md5,

View File

@ -47,7 +47,6 @@ from ._types import (
)
from ._urls import URL, QueryParams
from ._utils import (
NetRCInfo,
Timer,
URLPattern,
get_environment_proxies,
@ -191,7 +190,6 @@ class BaseClient:
}
self._trust_env = trust_env
self._default_encoding = default_encoding
self._netrc = NetRCInfo()
self._state = ClientState.UNOPENED
@property
@ -456,11 +454,6 @@ class BaseClient:
if username or password:
return BasicAuth(username=username, password=password)
if self.trust_env and "Authorization" not in request.headers:
credentials = self._netrc.get_credentials(request.url.host)
if credentials is not None:
return BasicAuth(username=credentials[0], password=credentials[1])
return Auth()
def _build_redirect_request(self, request: Request, response: Response) -> Request:

View File

@ -2,7 +2,6 @@ import codecs
import email.message
import logging
import mimetypes
import netrc
import os
import re
import sys
@ -128,37 +127,6 @@ def guess_json_utf(data: bytes) -> typing.Optional[str]:
return None
class NetRCInfo:
def __init__(self, files: typing.Optional[typing.List[str]] = None) -> None:
if files is None:
files = [os.getenv("NETRC", ""), "~/.netrc", "~/_netrc"]
self.netrc_files = files
@property
def netrc_info(self) -> typing.Optional[netrc.netrc]:
if not hasattr(self, "_netrc_info"):
self._netrc_info = None
for file_path in self.netrc_files:
expanded_path = Path(file_path).expanduser()
try:
if expanded_path.is_file():
self._netrc_info = netrc.netrc(str(expanded_path))
break
except (netrc.NetrcParseError, IOError): # pragma: no cover
# Issue while reading the netrc file, ignore...
pass
return self._netrc_info
def get_credentials(self, host: str) -> typing.Optional[typing.Tuple[str, str]]:
if self.netrc_info is None:
return None
auth_info = self.netrc_info.authenticators(host)
if auth_info is None or auth_info[2] is None:
return None
return (auth_info[0], auth_info[2])
def get_ca_bundle_from_env() -> typing.Optional[str]:
if "SSL_CERT_FILE" in os.environ:
ssl_file = Path(os.environ["SSL_CERT_FILE"])

View File

@ -4,7 +4,9 @@ Integration tests for authentication.
Unit tests for auth classes also exist in tests/test_auth.py
"""
import hashlib
import netrc
import os
import sys
import threading
import typing
from urllib.request import parse_keqv_list
@ -18,6 +20,10 @@ from ..common import FIXTURES_DIR
class App:
"""
A mock app to test auth credentials.
"""
def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
self.auth_header = auth_header
self.status_code = status_code
@ -227,14 +233,18 @@ async def test_custom_auth() -> None:
assert response.json() == {"auth": "Token 123"}
@pytest.mark.anyio
async def test_netrc_auth() -> None:
os.environ["NETRC"] = str(FIXTURES_DIR / ".netrc")
def test_netrc_auth_credentials_exist() -> None:
"""
When netrc auth is being used and a request is made to a host that is
in the netrc file, then the relevant credentials should be applied.
"""
netrc_file = str(FIXTURES_DIR / ".netrc")
url = "http://netrcexample.org"
app = App()
auth = httpx.NetRCAuth(netrc_file)
async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
response = await client.get(url)
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
@ -242,42 +252,59 @@ async def test_netrc_auth() -> None:
}
@pytest.mark.anyio
async def test_auth_header_has_priority_over_netrc() -> None:
os.environ["NETRC"] = str(FIXTURES_DIR / ".netrc")
url = "http://netrcexample.org"
def test_netrc_auth_credentials_do_not_exist() -> None:
"""
When netrc auth is being used and a request is made to a host that is
not in the netrc file, then no credentials should be applied.
"""
netrc_file = str(FIXTURES_DIR / ".netrc")
url = "http://example.org"
app = App()
auth = httpx.NetRCAuth(netrc_file)
async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
response = await client.get(url, headers={"Authorization": "Override"})
assert response.status_code == 200
assert response.json() == {"auth": "Override"}
@pytest.mark.anyio
async def test_trust_env_auth() -> None:
os.environ["NETRC"] = str(FIXTURES_DIR / ".netrc")
url = "http://netrcexample.org"
app = App()
async with httpx.AsyncClient(
transport=httpx.MockTransport(app), trust_env=False
) as client:
response = await client.get(url)
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {"auth": None}
async with httpx.AsyncClient(
transport=httpx.MockTransport(app), trust_env=True
) as client:
response = await client.get(url)
@pytest.mark.skipif(
sys.version_info < (3, 11),
reason="netrc files without a password are invalid with Python < 3.11",
)
def test_netrc_auth_nopassword() -> None: # pragma: no cover
"""
Python has different netrc parsing behaviours with different versions.
For Python 3.11+ a netrc file with no password is valid. In this case
we want to check that we allow the netrc auth, and simply don't provide
any credentials in the request.
"""
netrc_file = str(FIXTURES_DIR / ".netrc-nopassword")
url = "http://example.org"
app = App()
auth = httpx.NetRCAuth(netrc_file)
with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
response = client.get(url)
assert response.status_code == 200
assert response.json() == {
"auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk"
}
assert response.json() == {"auth": None}
@pytest.mark.skipif(
sys.version_info >= (3, 11),
reason="netrc files without a password are valid from Python >= 3.11",
)
def test_netrc_auth_nopassword_parse_error() -> None: # pragma: no cover
"""
Python has different netrc parsing behaviours with different versions.
For Python < 3.11 a netrc file with no password is invalid. In this case
we want to allow the parse error to be raised.
"""
netrc_file = str(FIXTURES_DIR / ".netrc-nopassword")
with pytest.raises(netrc.NetrcParseError):
httpx.NetRCAuth(netrc_file)
@pytest.mark.anyio

View File

@ -58,3 +58,11 @@ def test_client_event_hooks():
client = httpx.Client()
client.event_hooks = {"request": [on_request]}
assert client.event_hooks == {"request": [on_request], "response": []}
def test_client_trust_env():
client = httpx.Client()
assert client.trust_env
client = httpx.Client(trust_env=False)
assert not client.trust_env

2
tests/fixtures/.netrc-nopassword vendored Normal file
View File

@ -0,0 +1,2 @@
machine netrcexample.org
login example-username

View File

@ -5,7 +5,6 @@ import pytest
import httpx
from httpx._utils import (
NetRCInfo,
URLPattern,
get_ca_bundle_from_env,
get_environment_proxies,
@ -17,7 +16,7 @@ from httpx._utils import (
)
from tests.utils import override_log_level
from .common import FIXTURES_DIR, TESTS_DIR
from .common import TESTS_DIR
@pytest.mark.parametrize(
@ -56,25 +55,6 @@ def test_guess_by_bom(encoding, expected):
assert guess_json_utf(data) == expected
def test_bad_get_netrc_login():
netrc_info = NetRCInfo([str(FIXTURES_DIR / "does-not-exist")])
assert netrc_info.get_credentials("netrcexample.org") is None
def test_get_netrc_login():
netrc_info = NetRCInfo([str(FIXTURES_DIR / ".netrc")])
expected_credentials = (
"example-username",
"example-password",
)
assert netrc_info.get_credentials("netrcexample.org") == expected_credentials
def test_get_netrc_unknown():
netrc_info = NetRCInfo([str(FIXTURES_DIR / ".netrc")])
assert netrc_info.get_credentials("nonexistent.org") is None
@pytest.mark.parametrize(
"value, expected",
(