httpx/tests/test_api.py
Tom Christie 0cbf3c7581
Sync or Async dispatch (#83)
* Support thread-pooled dispatch

* Add ConcurrencyBackend.run

* Initial work towards support byte-iterators on sync request data

* Test case for byte iterator content

* byte iterator support for RequestData

* Add BaseResponse

* Bridge sync/async data in SyncResponse

* Add BaseClient

* SyncResponse -> Response

* Tweaking type annotation

* Distinct classes for Request, AsyncRequest

* Tweak is_streaming, content in BaseRequest

* Stream handling moves to client

* Handle mediating to AsyncResponse from a standard sync Dispatcher class

* Working on thread-pooled dispatcher

* Support threaded dispatch, inc. streaming requests/responses

* Increase test coverage

* Coverage and tweaks

* Include Accept and User-Agent headers by default
2019-06-10 12:26:03 +01:00

86 lines
2.0 KiB
Python

import asyncio
import functools
import pytest
import httpcore
def threadpool(func):
"""
Our sync tests should run in seperate thread to the uvicorn server.
"""
@functools.wraps(func)
async def wrapped(*args, **kwargs):
nonlocal func
loop = asyncio.get_event_loop()
if kwargs:
func = functools.partial(func, **kwargs)
await loop.run_in_executor(None, func, *args)
return pytest.mark.asyncio(wrapped)
@threadpool
def test_get(server):
response = httpcore.get("http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.reason_phrase == "OK"
assert response.text == "Hello, world!"
@threadpool
def test_post(server):
response = httpcore.post("http://127.0.0.1:8000/", data=b"Hello, world!")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_post_byte_iterator(server):
def data():
yield b"Hello"
yield b", "
yield b"world!"
response = httpcore.post("http://127.0.0.1:8000/", data=data())
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_options(server):
response = httpcore.options("http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_head(server):
response = httpcore.head("http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_put(server):
response = httpcore.put("http://127.0.0.1:8000/", data=b"Hello, world!")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_patch(server):
response = httpcore.patch("http://127.0.0.1:8000/", data=b"Hello, world!")
assert response.status_code == 200
assert response.reason_phrase == "OK"
@threadpool
def test_delete(server):
response = httpcore.delete("http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.reason_phrase == "OK"