Add .wait_ready to parser for clean server disconnects (#3690)

This commit is contained in:
Kim Christie 2025-10-13 15:00:49 +01:00 committed by GitHub
parent 20380490fd
commit ae25e86f5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 51 additions and 6 deletions

View File

@ -224,6 +224,13 @@ class HTTPParser:
# Handle body close
self.send_state = State.DONE
async def wait_ready(self) -> bool:
"""
Wait until read data starts arriving, and return `True`.
Return `False` if the stream closes.
"""
return await self.parser.wait_ready()
async def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
"""
Receive the initial request method line:
@ -453,6 +460,15 @@ class ReadAheadParser:
assert self._buffer == b''
self._buffer = buffer
async def wait_ready(self) -> bool:
"""
Attempt a read, and return True if read succeeds or False if the
stream is closed. The data remains in the read buffer.
"""
data = await self._read_some()
self._push_back(data)
return data != b''
async def read(self, size: int) -> bytes:
"""
Read and return up to 'size' bytes from the stream, with I/O buffering provided.

View File

@ -32,9 +32,14 @@ class HTTPConnection:
async def handle_requests(self):
try:
while not self._parser.is_closed():
if not await self._parser.wait_ready():
# Wait until we have read data, or return
# if the stream closes.
return
# Read the initial part of the request,
# and setup a stream for reading the body.
method, url, headers = await self._recv_head()
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
async with Request(method, url, headers=headers, content=stream) as request:
try:
response = await self._endpoint(request)
@ -50,7 +55,10 @@ class HTTPConnection:
await self._send_head(response)
await self._send_body(response)
if self._parser.is_keepalive():
# If the client hasn't read the request body to
# completion, then do that here.
await stream.read()
# Either revert to idle, or close the connection.
await self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)

View File

@ -224,6 +224,13 @@ class HTTPParser:
# Handle body close
self.send_state = State.DONE
def wait_ready(self) -> bool:
"""
Wait until read data starts arriving, and return `True`.
Return `False` if the stream closes.
"""
return self.parser.wait_ready()
def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
"""
Receive the initial request method line:
@ -453,6 +460,15 @@ class ReadAheadParser:
assert self._buffer == b''
self._buffer = buffer
def wait_ready(self) -> bool:
"""
Attempt a read, and return True if read succeeds or False if the
stream is closed. The data remains in the read buffer.
"""
data = self._read_some()
self._push_back(data)
return data != b''
def read(self, size: int) -> bytes:
"""
Read and return up to 'size' bytes from the stream, with I/O buffering provided.

View File

@ -32,9 +32,14 @@ class HTTPConnection:
def handle_requests(self):
try:
while not self._parser.is_closed():
if not self._parser.wait_ready():
# Wait until we have read data, or return
# if the stream closes.
return
# Read the initial part of the request,
# and setup a stream for reading the body.
method, url, headers = self._recv_head()
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
with Request(method, url, headers=headers, content=stream) as request:
try:
response = self._endpoint(request)
@ -50,7 +55,10 @@ class HTTPConnection:
self._send_head(response)
self._send_body(response)
if self._parser.is_keepalive():
# If the client hasn't read the request body to
# completion, then do that here.
stream.read()
# Either revert to idle, or close the connection.
self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
@ -102,10 +110,7 @@ class HTTPServer:
def wait(self):
while(True):
try:
sleep(1)
except KeyboardInterrupt:
break
sleep(1)
@contextlib.contextmanager