Simplify ASGI concurrency (#248)
* Drop keyword argument * Improve docstrings for WSGIDispatch and ASGIDispatch * Add docs for fine grained WSGI/ASGI control * Simplify concurrency handling in ASGIDispatch * Variable renaming * Fix missing nonlocal declaration * Split nonlocal onto multiple lines
This commit is contained in:
parent
79425b28d0
commit
2ed70ebc2a
@ -78,7 +78,7 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
app_exc = None
|
||||
status_code = None
|
||||
headers = None
|
||||
response_started = asyncio.Event()
|
||||
response_started_or_failed = asyncio.Event()
|
||||
response_body = BodyIterator()
|
||||
request_stream = request.stream()
|
||||
|
||||
@ -92,19 +92,20 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
return {"type": "http.request", "body": body, "more_body": True}
|
||||
|
||||
async def send(message: dict) -> None:
|
||||
nonlocal status_code, headers, response_started, response_body, request
|
||||
nonlocal status_code, headers, response_started_or_failed
|
||||
nonlocal response_body, request
|
||||
|
||||
if message["type"] == "http.response.start":
|
||||
status_code = message["status"]
|
||||
headers = message.get("headers", [])
|
||||
response_started.set()
|
||||
response_started_or_failed.set()
|
||||
elif message["type"] == "http.response.body":
|
||||
body = message.get("body", b"")
|
||||
more_body = message.get("more_body", False)
|
||||
if body and request.method != "HEAD":
|
||||
await response_body.put(body)
|
||||
if not more_body:
|
||||
await response_body.done()
|
||||
await response_body.mark_as_done()
|
||||
|
||||
async def run_app() -> None:
|
||||
nonlocal app, scope, receive, send, app_exc, response_body
|
||||
@ -113,7 +114,8 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
except Exception as exc:
|
||||
app_exc = exc
|
||||
finally:
|
||||
await response_body.done()
|
||||
await response_body.mark_as_done()
|
||||
response_started_or_failed.set()
|
||||
|
||||
# Really we'd like to push all `asyncio` logic into concurrency.py,
|
||||
# with a standardized interface, so that we can support other event
|
||||
@ -122,17 +124,13 @@ class ASGIDispatch(AsyncDispatcher):
|
||||
# `ConcurrencyBackend` with the `Client(app=asgi_app)` case.
|
||||
loop = asyncio.get_event_loop()
|
||||
app_task = loop.create_task(run_app())
|
||||
response_task = loop.create_task(response_started.wait())
|
||||
|
||||
tasks = {app_task, response_task} # type: typing.Set[asyncio.Task]
|
||||
|
||||
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
await response_started_or_failed.wait()
|
||||
|
||||
if app_exc is not None and self.raise_app_exceptions:
|
||||
raise app_exc
|
||||
|
||||
assert response_started.is_set(), "application did not return a response."
|
||||
assert status_code is not None
|
||||
assert status_code is not None, "application did not return a response."
|
||||
assert headers is not None
|
||||
|
||||
async def on_close() -> None:
|
||||
@ -189,7 +187,7 @@ class BodyIterator:
|
||||
"""
|
||||
await self._queue.put(data)
|
||||
|
||||
async def done(self) -> None:
|
||||
async def mark_as_done(self) -> None:
|
||||
"""
|
||||
Used by the server to signal the end of the response body.
|
||||
"""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user