Server connection handling. (#3672)

This commit is contained in:
Kim Christie 2025-09-19 12:03:13 +01:00 committed by GitHub
parent 4acf5c2c37
commit 68989ae47d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 62 additions and 27 deletions

View File

@ -375,13 +375,13 @@ class HTTPParser:
self.recv_state = State.DONE
return body
async def complete(self):
async def reset(self) -> bool:
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
is_keepalive = self.send_keep_alive and self.recv_keep_alive
if not (is_fully_complete and is_keepalive):
await self.close()
return
return False
if self.mode == Mode.CLIENT:
self.send_state = State.SEND_METHOD_LINE
@ -397,6 +397,7 @@ class HTTPParser:
self.send_keep_alive = True
self.recv_keep_alive = True
self.processing_1xx = False
return True
async def close(self):
if self.send_state != State.CLOSED:
@ -404,6 +405,13 @@ class HTTPParser:
self.recv_state = State.CLOSED
await self.stream.close()
def is_keepalive(self) -> bool:
return (
self.send_keep_alive and
self.recv_keep_alive and
self.send_state != State.CLOSED
)
def is_idle(self) -> bool:
return (
self.send_state == State.SEND_METHOD_LINE or

View File

@ -170,7 +170,7 @@ class Connection(Transport):
await self._send_head(request)
await self._send_body(request)
code, headers = await self._recv_head()
stream = HTTPStream(self._recv_body, self._complete)
stream = HTTPStream(self._recv_body, self._reset)
# TODO...
return Response(code, headers=headers, content=stream)
# finally:
@ -235,9 +235,9 @@ class Connection(Transport):
async def _recv_body(self) -> bytes:
return await self._parser.recv_body()
# Request/response cycle complete...
async def _complete(self) -> None:
await self._parser.complete()
# Request/response cycle reset...
async def _reset(self) -> None:
await self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
async def _close(self) -> None:

View File

@ -33,7 +33,7 @@ class HTTPConnection:
try:
while not self._parser.is_closed():
method, url, headers = await self._recv_head()
stream = HTTPStream(self._recv_body, self._complete)
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
async with Request(method, url, headers=headers, content=stream) as request:
try:
@ -43,12 +43,15 @@ class HTTPConnection:
except Exception:
logger.error("Internal Server Error", exc_info=True)
content = Text("Internal Server Error")
err = Response(code=500, content=content)
err = Response(500, content=content)
await self._send_head(err)
await self._send_body(err)
else:
await self._send_head(response)
await self._send_body(response)
if self._parser.is_keepalive():
await stream.read()
await self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
@ -88,8 +91,8 @@ class HTTPConnection:
await self._parser.send_body(b'')
# Start it all over again...
async def _complete(self):
await self._parser.complete
async def _reset(self):
await self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration

View File

@ -83,6 +83,9 @@ class NetworkStream(Stream):
self._is_closed = True
self._socket.close()
def is_closed(self) -> bool:
return self._is_closed
def __repr__(self):
description = ""
description += " TLS" if self._is_tls else ""
@ -160,7 +163,7 @@ class NetworkServer:
self._max_workers = 5
self._executor = None
self._thread = None
self._streams = list[NetworkStream]
self._streams: list[NetworkStream] = []
@property
def host(self):
@ -177,11 +180,18 @@ class NetworkServer:
def __exit__(self, exc_type, exc_val, exc_tb):
self.listener.close()
for stream in self._streams:
stream.close()
self._executor.shutdown(wait=True)
def _serve(self):
while stream := self.listener.accept():
self._executor.submit(self._handler, stream)
self._streams = [
stream for stream in self._streams
if not stream.is_closed()
]
self._streams.append(stream)
def _handler(self, stream):
try:

View File

@ -375,13 +375,13 @@ class HTTPParser:
self.recv_state = State.DONE
return body
def complete(self):
def reset(self) -> bool:
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
is_keepalive = self.send_keep_alive and self.recv_keep_alive
if not (is_fully_complete and is_keepalive):
self.close()
return
return False
if self.mode == Mode.CLIENT:
self.send_state = State.SEND_METHOD_LINE
@ -397,6 +397,7 @@ class HTTPParser:
self.send_keep_alive = True
self.recv_keep_alive = True
self.processing_1xx = False
return True
def close(self):
if self.send_state != State.CLOSED:
@ -404,6 +405,13 @@ class HTTPParser:
self.recv_state = State.CLOSED
self.stream.close()
def is_keepalive(self) -> bool:
return (
self.send_keep_alive and
self.recv_keep_alive and
self.send_state != State.CLOSED
)
def is_idle(self) -> bool:
return (
self.send_state == State.SEND_METHOD_LINE or

View File

@ -170,7 +170,7 @@ class Connection(Transport):
self._send_head(request)
self._send_body(request)
code, headers = self._recv_head()
stream = HTTPStream(self._recv_body, self._complete)
stream = HTTPStream(self._recv_body, self._reset)
# TODO...
return Response(code, headers=headers, content=stream)
# finally:
@ -235,9 +235,9 @@ class Connection(Transport):
def _recv_body(self) -> bytes:
return self._parser.recv_body()
# Request/response cycle complete...
def _complete(self) -> None:
self._parser.complete()
# Request/response cycle reset...
def _reset(self) -> None:
self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
def _close(self) -> None:

View File

@ -33,7 +33,7 @@ class HTTPConnection:
try:
while not self._parser.is_closed():
method, url, headers = self._recv_head()
stream = HTTPStream(self._recv_body, self._complete)
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
with Request(method, url, headers=headers, content=stream) as request:
try:
@ -43,12 +43,15 @@ class HTTPConnection:
except Exception:
logger.error("Internal Server Error", exc_info=True)
content = Text("Internal Server Error")
err = Response(code=500, content=content)
err = Response(500, content=content)
self._send_head(err)
self._send_body(err)
else:
self._send_head(response)
self._send_body(response)
if self._parser.is_keepalive():
stream.read()
self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
@ -88,8 +91,8 @@ class HTTPConnection:
self._parser.send_body(b'')
# Start it all over again...
def _complete(self):
self._parser.complete
def _reset(self):
self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
@ -99,7 +102,10 @@ class HTTPServer:
def wait(self):
while(True):
sleep(1)
try:
sleep(1)
except KeyboardInterrupt:
break
@contextlib.contextmanager

View File

@ -67,7 +67,7 @@ def test_parser():
assert terminator == b''
assert not p.is_idle()
p.complete()
p.reset()
assert p.is_idle()
@ -113,7 +113,7 @@ def test_parser_server():
)
assert not p.is_idle()
p.complete()
p.reset()
assert p.is_idle()
@ -315,7 +315,7 @@ def test_parser_repr():
p.recv_body()
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
p.complete()
p.reset()
assert repr(p) == "<HTTPParser [client SEND_METHOD_LINE, server WAIT]>"
@ -554,7 +554,7 @@ def test_client_connection_close():
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
p.complete()
p.reset()
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"
assert p.is_closed()
@ -591,7 +591,7 @@ def test_server_connection_close():
assert terminator == b""
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
p.complete()
p.reset()
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"