Add idna support. More tests.

This commit is contained in:
Tom Christie 2019-05-01 10:58:58 +01:00
parent 302fe93df9
commit b948798414
8 changed files with 71 additions and 20 deletions

View File

@ -98,11 +98,10 @@ class RedirectAdapter(Adapter):
url = URL(location, allow_relative=True)
# Facilitate relative 'location' headers, as allowed by RFC 7231.
# Facilitate relative 'Location' headers, as allowed by RFC 7231.
# (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
# Compliant with RFC3986, we percent encode the url.
if not url.is_absolute:
url = url.resolve_with(request.url.copy_with(fragment=None))
if url.is_relative_url:
url = url.resolve_with(request.url)
# Attach previous fragment if needed (RFC 7231 7.1.2)
if request.url.fragment and not url.fragment:

View File

@ -2,6 +2,7 @@ import cgi
import typing
import chardet
import idna
import rfc3986
from .config import SSLConfig, TimeoutConfig
@ -34,20 +35,30 @@ ByteOrByteStream = typing.Union[bytes, typing.AsyncIterator[bytes]]
class URL:
def __init__(self, url: URLTypes, allow_relative: bool = False) -> None:
if isinstance(url, str):
self.components = rfc3986.api.uri_reference(url).normalize()
elif isinstance(url, rfc3986.uri.URIReference):
if isinstance(url, rfc3986.uri.URIReference):
self.components = url
elif isinstance(url, str):
self.components = rfc3986.api.uri_reference(url)
else:
self.components = url.components
# Handle IDNA domain names.
if self.components.authority:
idna_authority = self.components.authority.encode("idna").decode("ascii")
if idna_authority != self.components.authority:
self.components = self.components.copy_with(authority=idna_authority)
# Normalize schema and domain name.
self.components = self.components.normalize()
# Enforce absolute URLs by default.
if not allow_relative:
if not self.scheme:
raise InvalidURL("No scheme included in URL.")
if self.scheme not in ("http", "https"):
raise InvalidURL('URL scheme must be "http" or "https".')
if not self.host:
raise InvalidURL("No hostname included in URL.")
raise InvalidURL("No host included in URL.")
@property
def scheme(self) -> str:
@ -93,8 +104,20 @@ class URL:
return self.components.scheme == "https"
@property
def is_absolute(self) -> bool:
return self.components.is_absolute()
def is_absolute_url(self) -> bool:
"""
Return `True` for absolute URLs such as 'http://example.com/path',
and `False` for relative URLs such as '/path'.
"""
# We don't use rfc3986's `is_absolute` because it treats
# URLs with a fragment portion as not absolute.
# What we actually care about is if the URL provides
# a scheme and hostname to which connections should be made.
return self.components.scheme and self.components.host
@property
def is_relative_url(self) -> bool:
return not self.is_absolute_url
@property
def origin(self) -> "Origin":
@ -104,9 +127,14 @@ class URL:
return URL(self.components.copy_with(**kwargs))
def resolve_with(self, base_url: URLTypes) -> "URL":
if isinstance(base_url, URL):
base_url = base_url.components
return URL(self.components.resolve_with(base_url))
"""
Return an absolute URL, using base_url as the base.
"""
# We drop any fragment portion, because RFC 3986 strictly
# treats URLs with a fragment portion as not being absolute URLs,
# but we want to treat them as such for the purposes of
base_url = URL(base_url).copy_with(fragment=None)
return URL(self.components.resolve_with(base_url.components))
def __hash__(self) -> int:
return hash(str(self))

View File

@ -29,6 +29,10 @@ class SyncResponse:
def content(self) -> bytes:
return self._response.content
@property
def text(self) -> str:
return self._response.text
def read(self) -> bytes:
return self._loop.run_until_complete(self._response.read())

View File

@ -1,11 +1,12 @@
certifi
chardet
h11
h2
rfc3986
chardet==3.*
h11==0.8.*
h2==3.*
idna==2.*
rfc3986==1.*
# Optional
brotlipy
brotlipy==0.7.*
# Testing

View File

@ -47,7 +47,14 @@ setup(
author_email="tom@tomchristie.com",
packages=get_packages("httpcore"),
data_files=[("", ["LICENSE.md"])],
install_requires=["h11", "h2", "certifi", "chardet", "rfc3986"],
install_requires=[
"certifi",
"chardet==3.*",
"h11==0.8.*",
"h2==3.*",
"idna==2.*",
"rfc3986==1.*"
],
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Web Environment",

View File

@ -70,7 +70,10 @@ class MockServer(httpcore.BaseReader, httpcore.BaseWriter):
}
).encode()
response_headers = ((b":status", b"200"),)
response_headers = (
(b":status", b"200"),
(b"content-length", str(len(response_body)).encode()),
)
self.conn.send_headers(stream_id, response_headers)
self.conn.send_data(stream_id, response_body, end_stream=True)
self.buffer += self.conn.data_to_send()

7
tests/models/test_url.py Normal file
View File

@ -0,0 +1,7 @@
from httpcore import URL
def test_idna_url():
url = URL("http://中国.icom.museum:80/")
assert url == URL("http://xn--fiqs8s.icom.museum:80/")
assert url.host == "xn--fiqs8s.icom.museum"

View File

@ -29,6 +29,7 @@ def test_get(server):
response = http.request("GET", "http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.content == b"Hello, world!"
assert response.text == "Hello, world!"
@threadpool
@ -36,6 +37,7 @@ def test_post(server):
with httpcore.SyncConnectionPool() as http:
response = http.request("POST", "http://127.0.0.1:8000/", body=b"Hello, world!")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool