PYTHON-5492 Fix handling of MaxTimeMS message (#2484)
This commit is contained in:
parent
2a1523fa85
commit
b32da4b409
@ -75,12 +75,12 @@ from pymongo.errors import (
|
||||
NetworkTimeout,
|
||||
ServerSelectionTimeoutError,
|
||||
)
|
||||
from pymongo.helpers_shared import _get_timeout_details
|
||||
from pymongo.network_layer import async_socket_sendall
|
||||
from pymongo.operations import UpdateOne
|
||||
from pymongo.pool_options import PoolOptions
|
||||
from pymongo.pool_shared import (
|
||||
_async_configured_socket,
|
||||
_get_timeout_details,
|
||||
_raise_connection_failure,
|
||||
)
|
||||
from pymongo.read_concern import ReadConcern
|
||||
|
||||
@ -58,6 +58,7 @@ from pymongo.errors import ( # type:ignore[attr-defined]
|
||||
WaitQueueTimeoutError,
|
||||
)
|
||||
from pymongo.hello import Hello, HelloCompat
|
||||
from pymongo.helpers_shared import _get_timeout_details, format_timeout_details
|
||||
from pymongo.lock import (
|
||||
_async_cond_wait,
|
||||
_async_create_condition,
|
||||
@ -79,9 +80,7 @@ from pymongo.pool_shared import (
|
||||
SSLErrors,
|
||||
_CancellationContext,
|
||||
_configured_protocol_interface,
|
||||
_get_timeout_details,
|
||||
_raise_connection_failure,
|
||||
format_timeout_details,
|
||||
)
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_api import _add_to_command
|
||||
|
||||
@ -38,7 +38,6 @@ from pymongo.logger import (
|
||||
_SDAMStatusMessage,
|
||||
)
|
||||
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
|
||||
from pymongo.pool_shared import _get_timeout_details, format_timeout_details
|
||||
from pymongo.response import PinnedResponse, Response
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -225,11 +224,7 @@ class Server:
|
||||
if use_cmd:
|
||||
first = docs[0]
|
||||
await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
|
||||
# Append timeout details to MaxTimeMSExpired responses.
|
||||
if first.get("code") == 50:
|
||||
timeout_details = _get_timeout_details(conn.opts) # type:ignore[has-type]
|
||||
first["errmsg"] += format_timeout_details(timeout_details) # type:ignore[index]
|
||||
_check_command_response(first, conn.max_wire_version)
|
||||
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
|
||||
except Exception as exc:
|
||||
duration = datetime.now() - start
|
||||
if isinstance(exc, (NotPrimaryError, OperationFailure)):
|
||||
|
||||
@ -47,6 +47,7 @@ from pymongo.hello import HelloCompat
|
||||
if TYPE_CHECKING:
|
||||
from pymongo.cursor_shared import _Hint
|
||||
from pymongo.operations import _IndexList
|
||||
from pymongo.pool_options import PoolOptions
|
||||
from pymongo.typings import _DocumentOut
|
||||
|
||||
|
||||
@ -108,6 +109,34 @@ _SENSITIVE_COMMANDS: set[str] = {
|
||||
}
|
||||
|
||||
|
||||
def _get_timeout_details(options: PoolOptions) -> dict[str, float]:
|
||||
from pymongo import _csot
|
||||
|
||||
details = {}
|
||||
timeout = _csot.get_timeout()
|
||||
socket_timeout = options.socket_timeout
|
||||
connect_timeout = options.connect_timeout
|
||||
if timeout:
|
||||
details["timeoutMS"] = timeout * 1000
|
||||
if socket_timeout and not timeout:
|
||||
details["socketTimeoutMS"] = socket_timeout * 1000
|
||||
if connect_timeout:
|
||||
details["connectTimeoutMS"] = connect_timeout * 1000
|
||||
return details
|
||||
|
||||
|
||||
def format_timeout_details(details: Optional[dict[str, float]]) -> str:
|
||||
result = ""
|
||||
if details:
|
||||
result += " (configured timeouts:"
|
||||
for timeout in ["socketTimeoutMS", "timeoutMS", "connectTimeoutMS"]:
|
||||
if timeout in details:
|
||||
result += f" {timeout}: {details[timeout]}ms,"
|
||||
result = result[:-1]
|
||||
result += ")"
|
||||
return result
|
||||
|
||||
|
||||
def _gen_index_name(keys: _IndexList) -> str:
|
||||
"""Generate an index name from the set of fields it is over."""
|
||||
return "_".join(["{}_{}".format(*item) for item in keys])
|
||||
@ -188,6 +217,7 @@ def _check_command_response(
|
||||
max_wire_version: Optional[int],
|
||||
allowable_errors: Optional[Container[Union[int, str]]] = None,
|
||||
parse_write_concern_error: bool = False,
|
||||
pool_opts: Optional[PoolOptions] = None,
|
||||
) -> None:
|
||||
"""Check the response to a command for errors."""
|
||||
if "ok" not in response:
|
||||
@ -243,6 +273,10 @@ def _check_command_response(
|
||||
if code in (11000, 11001, 12582):
|
||||
raise DuplicateKeyError(errmsg, code, response, max_wire_version)
|
||||
elif code == 50:
|
||||
# Append timeout details to MaxTimeMSExpired responses.
|
||||
if pool_opts:
|
||||
timeout_details = _get_timeout_details(pool_opts)
|
||||
errmsg += format_timeout_details(timeout_details)
|
||||
raise ExecutionTimeout(errmsg, code, response, max_wire_version)
|
||||
elif code == 43:
|
||||
raise CursorNotFound(errmsg, code, response, max_wire_version)
|
||||
|
||||
@ -36,6 +36,7 @@ from pymongo.errors import ( # type:ignore[attr-defined]
|
||||
NetworkTimeout,
|
||||
_CertificateError,
|
||||
)
|
||||
from pymongo.helpers_shared import _get_timeout_details, format_timeout_details
|
||||
from pymongo.network_layer import AsyncNetworkingInterface, NetworkingInterface, PyMongoProtocol
|
||||
from pymongo.pool_options import PoolOptions
|
||||
from pymongo.ssl_support import PYSSLError, SSLError, _has_sni
|
||||
@ -149,32 +150,6 @@ def _raise_connection_failure(
|
||||
raise AutoReconnect(msg) from error
|
||||
|
||||
|
||||
def _get_timeout_details(options: PoolOptions) -> dict[str, float]:
|
||||
details = {}
|
||||
timeout = _csot.get_timeout()
|
||||
socket_timeout = options.socket_timeout
|
||||
connect_timeout = options.connect_timeout
|
||||
if timeout:
|
||||
details["timeoutMS"] = timeout * 1000
|
||||
if socket_timeout and not timeout:
|
||||
details["socketTimeoutMS"] = socket_timeout * 1000
|
||||
if connect_timeout:
|
||||
details["connectTimeoutMS"] = connect_timeout * 1000
|
||||
return details
|
||||
|
||||
|
||||
def format_timeout_details(details: Optional[dict[str, float]]) -> str:
|
||||
result = ""
|
||||
if details:
|
||||
result += " (configured timeouts:"
|
||||
for timeout in ["socketTimeoutMS", "timeoutMS", "connectTimeoutMS"]:
|
||||
if timeout in details:
|
||||
result += f" {timeout}: {details[timeout]}ms,"
|
||||
result = result[:-1]
|
||||
result += ")"
|
||||
return result
|
||||
|
||||
|
||||
class _CancellationContext:
|
||||
def __init__(self) -> None:
|
||||
self._cancelled = False
|
||||
|
||||
@ -70,12 +70,12 @@ from pymongo.errors import (
|
||||
NetworkTimeout,
|
||||
ServerSelectionTimeoutError,
|
||||
)
|
||||
from pymongo.helpers_shared import _get_timeout_details
|
||||
from pymongo.network_layer import sendall
|
||||
from pymongo.operations import UpdateOne
|
||||
from pymongo.pool_options import PoolOptions
|
||||
from pymongo.pool_shared import (
|
||||
_configured_socket,
|
||||
_get_timeout_details,
|
||||
_raise_connection_failure,
|
||||
)
|
||||
from pymongo.read_concern import ReadConcern
|
||||
|
||||
@ -55,6 +55,7 @@ from pymongo.errors import ( # type:ignore[attr-defined]
|
||||
WaitQueueTimeoutError,
|
||||
)
|
||||
from pymongo.hello import Hello, HelloCompat
|
||||
from pymongo.helpers_shared import _get_timeout_details, format_timeout_details
|
||||
from pymongo.lock import (
|
||||
_cond_wait,
|
||||
_create_condition,
|
||||
@ -76,9 +77,7 @@ from pymongo.pool_shared import (
|
||||
SSLErrors,
|
||||
_CancellationContext,
|
||||
_configured_socket_interface,
|
||||
_get_timeout_details,
|
||||
_raise_connection_failure,
|
||||
format_timeout_details,
|
||||
)
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_api import _add_to_command
|
||||
|
||||
@ -37,7 +37,6 @@ from pymongo.logger import (
|
||||
_SDAMStatusMessage,
|
||||
)
|
||||
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
|
||||
from pymongo.pool_shared import _get_timeout_details, format_timeout_details
|
||||
from pymongo.response import PinnedResponse, Response
|
||||
from pymongo.synchronous.helpers import _handle_reauth
|
||||
|
||||
@ -225,11 +224,7 @@ class Server:
|
||||
if use_cmd:
|
||||
first = docs[0]
|
||||
operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
|
||||
# Append timeout details to MaxTimeMSExpired responses.
|
||||
if first.get("code") == 50:
|
||||
timeout_details = _get_timeout_details(conn.opts) # type:ignore[has-type]
|
||||
first["errmsg"] += format_timeout_details(timeout_details) # type:ignore[index]
|
||||
_check_command_response(first, conn.max_wire_version)
|
||||
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
|
||||
except Exception as exc:
|
||||
duration = datetime.now() - start
|
||||
if isinstance(exc, (NotPrimaryError, OperationFailure)):
|
||||
|
||||
@ -43,6 +43,7 @@ from test.utils_shared import (
|
||||
|
||||
from bson import decode_all
|
||||
from bson.code import Code
|
||||
from bson.raw_bson import RawBSONDocument
|
||||
from pymongo import ASCENDING, DESCENDING
|
||||
from pymongo.asynchronous.cursor import AsyncCursor, CursorType
|
||||
from pymongo.asynchronous.helpers import anext
|
||||
@ -199,6 +200,21 @@ class TestCursor(AsyncIntegrationTest):
|
||||
finally:
|
||||
await client.admin.command("configureFailPoint", "maxTimeAlwaysTimeOut", mode="off")
|
||||
|
||||
async def test_maxtime_ms_message(self):
|
||||
db = self.db
|
||||
await db.t.insert_one({"x": 1})
|
||||
with self.assertRaises(Exception) as error:
|
||||
await db.t.find_one({"$where": delay(2)}, max_time_ms=1)
|
||||
|
||||
self.assertIn("(configured timeouts: connectTimeoutMS: 20000.0ms", str(error.exception))
|
||||
|
||||
client = await self.async_rs_client(document_class=RawBSONDocument)
|
||||
await client.db.t.insert_one({"x": 1})
|
||||
with self.assertRaises(Exception) as error:
|
||||
await client.db.t.find_one({"$where": delay(2)}, max_time_ms=1)
|
||||
|
||||
self.assertIn("(configured timeouts: connectTimeoutMS: 20000.0ms", str(error.exception))
|
||||
|
||||
async def test_max_await_time_ms(self):
|
||||
db = self.db
|
||||
await db.pymongo_test.drop()
|
||||
|
||||
@ -43,6 +43,7 @@ from test.utils_shared import (
|
||||
|
||||
from bson import decode_all
|
||||
from bson.code import Code
|
||||
from bson.raw_bson import RawBSONDocument
|
||||
from pymongo import ASCENDING, DESCENDING
|
||||
from pymongo.collation import Collation
|
||||
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure, PyMongoError
|
||||
@ -197,6 +198,21 @@ class TestCursor(IntegrationTest):
|
||||
finally:
|
||||
client.admin.command("configureFailPoint", "maxTimeAlwaysTimeOut", mode="off")
|
||||
|
||||
def test_maxtime_ms_message(self):
|
||||
db = self.db
|
||||
db.t.insert_one({"x": 1})
|
||||
with self.assertRaises(Exception) as error:
|
||||
db.t.find_one({"$where": delay(2)}, max_time_ms=1)
|
||||
|
||||
self.assertIn("(configured timeouts: connectTimeoutMS: 20000.0ms", str(error.exception))
|
||||
|
||||
client = self.rs_client(document_class=RawBSONDocument)
|
||||
client.db.t.insert_one({"x": 1})
|
||||
with self.assertRaises(Exception) as error:
|
||||
client.db.t.find_one({"$where": delay(2)}, max_time_ms=1)
|
||||
|
||||
self.assertIn("(configured timeouts: connectTimeoutMS: 20000.0ms", str(error.exception))
|
||||
|
||||
def test_max_await_time_ms(self):
|
||||
db = self.db
|
||||
db.pymongo_test.drop()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user