PYTHON-5369 - Re-raise socket.timeout errors if the deadline has alre… (#2326)
This commit is contained in:
parent
2655bb4d86
commit
aa41e70523
@ -131,6 +131,7 @@ class AsyncConnection:
|
||||
:param pool: a Pool instance
|
||||
:param address: the server's (host, port)
|
||||
:param id: the id of this socket in it's pool
|
||||
:param is_sdam: SDAM connections do not call hello on creation
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -139,11 +140,13 @@ class AsyncConnection:
|
||||
pool: Pool,
|
||||
address: tuple[str, int],
|
||||
id: int,
|
||||
is_sdam: bool,
|
||||
):
|
||||
self.pool_ref = weakref.ref(pool)
|
||||
self.conn = conn
|
||||
self.address = address
|
||||
self.id = id
|
||||
self.is_sdam = is_sdam
|
||||
self.closed = False
|
||||
self.last_checkin_time = time.monotonic()
|
||||
self.performed_handshake = False
|
||||
@ -711,13 +714,13 @@ class Pool:
|
||||
self,
|
||||
address: _Address,
|
||||
options: PoolOptions,
|
||||
handshake: bool = True,
|
||||
is_sdam: bool = False,
|
||||
client_id: Optional[ObjectId] = None,
|
||||
):
|
||||
"""
|
||||
:param address: a (hostname, port) tuple
|
||||
:param options: a PoolOptions instance
|
||||
:param handshake: whether to call hello for each new AsyncConnection
|
||||
:param is_sdam: whether to call hello for each new AsyncConnection
|
||||
"""
|
||||
if options.pause_enabled:
|
||||
self.state = PoolState.PAUSED
|
||||
@ -746,14 +749,14 @@ class Pool:
|
||||
self.pid = os.getpid()
|
||||
self.address = address
|
||||
self.opts = options
|
||||
self.handshake = handshake
|
||||
self.is_sdam = is_sdam
|
||||
# Don't publish events or logs in Monitor pools.
|
||||
self.enabled_for_cmap = (
|
||||
self.handshake
|
||||
not self.is_sdam
|
||||
and self.opts._event_listeners is not None
|
||||
and self.opts._event_listeners.enabled_for_cmap
|
||||
)
|
||||
self.enabled_for_logging = self.handshake
|
||||
self.enabled_for_logging = not self.is_sdam
|
||||
|
||||
# The first portion of the wait queue.
|
||||
# Enforces: maxPoolSize
|
||||
@ -1058,14 +1061,14 @@ class Pool:
|
||||
|
||||
raise
|
||||
|
||||
conn = AsyncConnection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
|
||||
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
|
||||
async with self.lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if tmp_context.cancelled:
|
||||
conn.cancel_context.cancel()
|
||||
try:
|
||||
if self.handshake:
|
||||
if not self.is_sdam:
|
||||
await conn.hello()
|
||||
self.is_writable = conn.is_writable
|
||||
if handler:
|
||||
|
||||
@ -985,7 +985,7 @@ class Topology:
|
||||
)
|
||||
|
||||
return self._settings.pool_class(
|
||||
address, monitor_pool_options, handshake=False, client_id=self._topology_id
|
||||
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
|
||||
)
|
||||
|
||||
def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
|
||||
|
||||
@ -357,7 +357,12 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
|
||||
except socket.timeout:
|
||||
if conn.cancel_context.cancelled:
|
||||
raise _OperationCancelled("operation cancelled") from None
|
||||
if _PYPY:
|
||||
if (
|
||||
_PYPY
|
||||
or not conn.is_sdam
|
||||
and deadline is not None
|
||||
and deadline - time.monotonic() < 0
|
||||
):
|
||||
# We reached the true deadline.
|
||||
raise
|
||||
continue
|
||||
|
||||
@ -131,6 +131,7 @@ class Connection:
|
||||
:param pool: a Pool instance
|
||||
:param address: the server's (host, port)
|
||||
:param id: the id of this socket in it's pool
|
||||
:param is_sdam: SDAM connections do not call hello on creation
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -139,11 +140,13 @@ class Connection:
|
||||
pool: Pool,
|
||||
address: tuple[str, int],
|
||||
id: int,
|
||||
is_sdam: bool,
|
||||
):
|
||||
self.pool_ref = weakref.ref(pool)
|
||||
self.conn = conn
|
||||
self.address = address
|
||||
self.id = id
|
||||
self.is_sdam = is_sdam
|
||||
self.closed = False
|
||||
self.last_checkin_time = time.monotonic()
|
||||
self.performed_handshake = False
|
||||
@ -709,13 +712,13 @@ class Pool:
|
||||
self,
|
||||
address: _Address,
|
||||
options: PoolOptions,
|
||||
handshake: bool = True,
|
||||
is_sdam: bool = False,
|
||||
client_id: Optional[ObjectId] = None,
|
||||
):
|
||||
"""
|
||||
:param address: a (hostname, port) tuple
|
||||
:param options: a PoolOptions instance
|
||||
:param handshake: whether to call hello for each new Connection
|
||||
:param is_sdam: whether to call hello for each new Connection
|
||||
"""
|
||||
if options.pause_enabled:
|
||||
self.state = PoolState.PAUSED
|
||||
@ -744,14 +747,14 @@ class Pool:
|
||||
self.pid = os.getpid()
|
||||
self.address = address
|
||||
self.opts = options
|
||||
self.handshake = handshake
|
||||
self.is_sdam = is_sdam
|
||||
# Don't publish events or logs in Monitor pools.
|
||||
self.enabled_for_cmap = (
|
||||
self.handshake
|
||||
not self.is_sdam
|
||||
and self.opts._event_listeners is not None
|
||||
and self.opts._event_listeners.enabled_for_cmap
|
||||
)
|
||||
self.enabled_for_logging = self.handshake
|
||||
self.enabled_for_logging = not self.is_sdam
|
||||
|
||||
# The first portion of the wait queue.
|
||||
# Enforces: maxPoolSize
|
||||
@ -1054,14 +1057,14 @@ class Pool:
|
||||
|
||||
raise
|
||||
|
||||
conn = Connection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
|
||||
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
|
||||
with self.lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if tmp_context.cancelled:
|
||||
conn.cancel_context.cancel()
|
||||
try:
|
||||
if self.handshake:
|
||||
if not self.is_sdam:
|
||||
conn.hello()
|
||||
self.is_writable = conn.is_writable
|
||||
if handler:
|
||||
|
||||
@ -983,7 +983,7 @@ class Topology:
|
||||
)
|
||||
|
||||
return self._settings.pool_class(
|
||||
address, monitor_pool_options, handshake=False, client_id=self._topology_id
|
||||
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
|
||||
)
|
||||
|
||||
def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
|
||||
|
||||
@ -159,6 +159,7 @@ class AsyncMockConnection:
|
||||
self.cancel_context = _CancellationContext()
|
||||
self.more_to_come = False
|
||||
self.id = random.randint(0, 100)
|
||||
self.is_sdam = False
|
||||
self.server_connection_id = random.randint(0, 100)
|
||||
|
||||
def close_conn(self, reason):
|
||||
@ -172,7 +173,7 @@ class AsyncMockConnection:
|
||||
|
||||
|
||||
class AsyncMockPool:
|
||||
def __init__(self, address, options, handshake=True, client_id=None):
|
||||
def __init__(self, address, options, is_sdam=False, client_id=None):
|
||||
self.gen = _PoolGeneration()
|
||||
self._lock = _async_create_lock()
|
||||
self.opts = options
|
||||
|
||||
@ -121,7 +121,7 @@ class TestTopologyConfiguration(TopologyTest):
|
||||
self.assertEqual(1, monitor._pool.opts.socket_timeout)
|
||||
|
||||
# The monitor, not its pool, is responsible for calling hello.
|
||||
self.assertFalse(monitor._pool.handshake)
|
||||
self.assertTrue(monitor._pool.is_sdam)
|
||||
|
||||
|
||||
class TestSingleServerTopology(TopologyTest):
|
||||
|
||||
@ -157,6 +157,7 @@ class MockConnection:
|
||||
self.cancel_context = _CancellationContext()
|
||||
self.more_to_come = False
|
||||
self.id = random.randint(0, 100)
|
||||
self.is_sdam = False
|
||||
self.server_connection_id = random.randint(0, 100)
|
||||
|
||||
def close_conn(self, reason):
|
||||
@ -170,7 +171,7 @@ class MockConnection:
|
||||
|
||||
|
||||
class MockPool:
|
||||
def __init__(self, address, options, handshake=True, client_id=None):
|
||||
def __init__(self, address, options, is_sdam=False, client_id=None):
|
||||
self.gen = _PoolGeneration()
|
||||
self._lock = _create_lock()
|
||||
self.opts = options
|
||||
|
||||
Loading…
Reference in New Issue
Block a user