PYTHON-4740 Convert asyncio.TimeoutError to socket.timeout for compat (#1864)
This commit is contained in:
parent
c136684047
commit
9a71be1615
@ -313,8 +313,6 @@ class _AsyncBulk:
|
||||
if isinstance(exc, (NotPrimaryError, OperationFailure)):
|
||||
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
|
||||
raise
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return reply # type: ignore[return-value]
|
||||
|
||||
async def unack_write(
|
||||
@ -403,8 +401,6 @@ class _AsyncBulk:
|
||||
assert bwc.start_time is not None
|
||||
bwc._fail(request_id, failure, duration)
|
||||
raise
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return result # type: ignore[return-value]
|
||||
|
||||
async def _execute_batch_unack(
|
||||
|
||||
@ -319,8 +319,6 @@ class _AsyncClientBulk:
|
||||
await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
|
||||
else:
|
||||
await self.client._process_response({}, bwc.session) # type: ignore[arg-type]
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return reply # type: ignore[return-value]
|
||||
|
||||
async def unack_write(
|
||||
@ -410,9 +408,7 @@ class _AsyncClientBulk:
|
||||
bwc._fail(request_id, failure, duration)
|
||||
# Top-level error will be embedded in ClientBulkWriteException.
|
||||
reply = {"error": exc}
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return result # type: ignore[return-value]
|
||||
return reply
|
||||
|
||||
async def _execute_batch_unack(
|
||||
self,
|
||||
|
||||
@ -64,65 +64,69 @@ async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> Non
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
|
||||
if sys.platform == "win32":
|
||||
await asyncio.wait_for(_async_sendall_ssl_windows(sock, buf), timeout=timeout)
|
||||
else:
|
||||
await asyncio.wait_for(_async_sendall_ssl(sock, buf, loop), timeout=timeout)
|
||||
await asyncio.wait_for(_async_sendall_ssl(sock, buf, loop), timeout=timeout)
|
||||
else:
|
||||
await asyncio.wait_for(loop.sock_sendall(sock, buf), timeout=timeout) # type: ignore[arg-type]
|
||||
except asyncio.TimeoutError as exc:
|
||||
# Convert the asyncio.wait_for timeout error to socket.timeout which pool.py understands.
|
||||
raise socket.timeout("timed out") from exc
|
||||
finally:
|
||||
sock.settimeout(timeout)
|
||||
|
||||
|
||||
async def _async_sendall_ssl(
|
||||
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
|
||||
) -> None:
|
||||
view = memoryview(buf)
|
||||
fd = sock.fileno()
|
||||
sent = 0
|
||||
if sys.platform != "win32":
|
||||
|
||||
def _is_ready(fut: Future) -> None:
|
||||
loop.remove_writer(fd)
|
||||
loop.remove_reader(fd)
|
||||
if fut.done():
|
||||
return
|
||||
fut.set_result(None)
|
||||
async def _async_sendall_ssl(
|
||||
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
|
||||
) -> None:
|
||||
view = memoryview(buf)
|
||||
fd = sock.fileno()
|
||||
sent = 0
|
||||
|
||||
while sent < len(buf):
|
||||
try:
|
||||
sent += sock.send(view[sent:])
|
||||
except BLOCKING_IO_ERRORS as exc:
|
||||
fd = sock.fileno()
|
||||
# Check for closed socket.
|
||||
if fd == -1:
|
||||
raise SSLError("Underlying socket has been closed") from None
|
||||
if isinstance(exc, BLOCKING_IO_READ_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_reader(fd, _is_ready, fut)
|
||||
await fut
|
||||
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_writer(fd, _is_ready, fut)
|
||||
await fut
|
||||
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_reader(fd, _is_ready, fut)
|
||||
loop.add_writer(fd, _is_ready, fut)
|
||||
await fut
|
||||
def _is_ready(fut: Future) -> None:
|
||||
loop.remove_writer(fd)
|
||||
loop.remove_reader(fd)
|
||||
if fut.done():
|
||||
return
|
||||
fut.set_result(None)
|
||||
|
||||
|
||||
# The default Windows asyncio event loop does not support loop.add_reader/add_writer: https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
|
||||
async def _async_sendall_ssl_windows(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
|
||||
view = memoryview(buf)
|
||||
total_length = len(buf)
|
||||
total_sent = 0
|
||||
while total_sent < total_length:
|
||||
try:
|
||||
sent = sock.send(view[total_sent:])
|
||||
except BLOCKING_IO_ERRORS:
|
||||
await asyncio.sleep(0.5)
|
||||
sent = 0
|
||||
total_sent += sent
|
||||
while sent < len(buf):
|
||||
try:
|
||||
sent += sock.send(view[sent:])
|
||||
except BLOCKING_IO_ERRORS as exc:
|
||||
fd = sock.fileno()
|
||||
# Check for closed socket.
|
||||
if fd == -1:
|
||||
raise SSLError("Underlying socket has been closed") from None
|
||||
if isinstance(exc, BLOCKING_IO_READ_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_reader(fd, _is_ready, fut)
|
||||
await fut
|
||||
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_writer(fd, _is_ready, fut)
|
||||
await fut
|
||||
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
|
||||
fut = loop.create_future()
|
||||
loop.add_reader(fd, _is_ready, fut)
|
||||
loop.add_writer(fd, _is_ready, fut)
|
||||
await fut
|
||||
else:
|
||||
# The default Windows asyncio event loop does not support loop.add_reader/add_writer:
|
||||
# https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
|
||||
async def _async_sendall_ssl(
|
||||
sock: Union[socket.socket, _sslConn], buf: bytes, dummy: AbstractEventLoop
|
||||
) -> None:
|
||||
view = memoryview(buf)
|
||||
total_length = len(buf)
|
||||
total_sent = 0
|
||||
while total_sent < total_length:
|
||||
try:
|
||||
sent = sock.send(view[total_sent:])
|
||||
except BLOCKING_IO_ERRORS:
|
||||
await asyncio.sleep(0.5)
|
||||
sent = 0
|
||||
total_sent += sent
|
||||
|
||||
|
||||
def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
|
||||
|
||||
@ -313,8 +313,6 @@ class _Bulk:
|
||||
if isinstance(exc, (NotPrimaryError, OperationFailure)):
|
||||
client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
|
||||
raise
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return reply # type: ignore[return-value]
|
||||
|
||||
def unack_write(
|
||||
@ -403,8 +401,6 @@ class _Bulk:
|
||||
assert bwc.start_time is not None
|
||||
bwc._fail(request_id, failure, duration)
|
||||
raise
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return result # type: ignore[return-value]
|
||||
|
||||
def _execute_batch_unack(
|
||||
|
||||
@ -319,8 +319,6 @@ class _ClientBulk:
|
||||
self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
|
||||
else:
|
||||
self.client._process_response({}, bwc.session) # type: ignore[arg-type]
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return reply # type: ignore[return-value]
|
||||
|
||||
def unack_write(
|
||||
@ -410,9 +408,7 @@ class _ClientBulk:
|
||||
bwc._fail(request_id, failure, duration)
|
||||
# Top-level error will be embedded in ClientBulkWriteException.
|
||||
reply = {"error": exc}
|
||||
finally:
|
||||
bwc.start_time = datetime.datetime.now()
|
||||
return result # type: ignore[return-value]
|
||||
return reply
|
||||
|
||||
def _execute_batch_unack(
|
||||
self,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user