PYTHON-3362 Ignore wtimeout when timeoutMS or timeout() is configured (#1013)

Apply client timeoutMS to gridfs operations.
This commit is contained in:
Shane Harvey 2022-07-19 17:46:09 -07:00 committed by GitHub
parent db3f2dca05
commit 935f926bd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 133 additions and 87 deletions

View File

@ -33,7 +33,7 @@ from gridfs.grid_file import (
_clear_entity_type_registry,
_disallow_transactions,
)
from pymongo import ASCENDING, DESCENDING
from pymongo import ASCENDING, DESCENDING, _csot
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.common import validate_string
@ -514,6 +514,7 @@ class GridFSBucket(object):
)
self._chunk_size_bytes = chunk_size_bytes
self._timeout = db.client.options.timeout
def open_upload_stream(
self,
@ -631,6 +632,7 @@ class GridFSBucket(object):
return GridIn(self._collection, session=session, **opts)
@_csot.apply
def upload_from_stream(
self,
filename: str,
@ -679,6 +681,7 @@ class GridFSBucket(object):
return cast(ObjectId, gin._id)
@_csot.apply
def upload_from_stream_with_id(
self,
file_id: Any,
@ -762,6 +765,7 @@ class GridFSBucket(object):
gout._ensure_file()
return gout
@_csot.apply
def download_to_stream(
self, file_id: Any, destination: Any, session: Optional[ClientSession] = None
) -> None:
@ -795,6 +799,7 @@ class GridFSBucket(object):
for chunk in gout:
destination.write(chunk)
@_csot.apply
def delete(self, file_id: Any, session: Optional[ClientSession] = None) -> None:
"""Given an file_id, delete this stored file's files collection document
and associated chunks from a GridFS bucket.
@ -926,6 +931,7 @@ class GridFSBucket(object):
except StopIteration:
raise NoFile("no version %d for filename %r" % (revision, filename))
@_csot.apply
def download_to_stream_by_name(
self,
filename: str,

View File

@ -17,7 +17,9 @@
import functools
import time
from contextvars import ContextVar, Token
from typing import Any, Callable, Optional, Tuple, TypeVar, cast
from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast
from pymongo.write_concern import WriteConcern
TIMEOUT: ContextVar[Optional[float]] = ContextVar("TIMEOUT", default=None)
RTT: ContextVar[float] = ContextVar("RTT", default=0.0)
@ -103,3 +105,14 @@ def apply(func: F) -> F:
return func(self, *args, **kwargs)
return cast(F, csot_wrapper)
def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcern]) -> None:
"""Apply the given write concern to a command."""
if not write_concern or write_concern.is_server_default:
return
wc = write_concern.document
if get_timeout() is not None:
wc.pop("wtimeout", None)
if wc:
cmd["writeConcern"] = wc

View File

@ -23,7 +23,7 @@ from typing import Any, NoReturn
from bson.objectid import ObjectId
from bson.raw_bson import RawBSONDocument
from bson.son import SON
from pymongo import common
from pymongo import _csot, common
from pymongo.client_session import _validate_session_write_concern
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
@ -315,8 +315,7 @@ class _Bulk(object):
cmd = SON([(cmd_name, self.collection.name), ("ordered", self.ordered)])
if self.comment:
cmd["comment"] = self.comment
if not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
_csot.apply_write_concern(cmd, write_concern)
if self.bypass_doc_val:
cmd["bypassDocumentValidation"] = True
if self.let is not None and run.op_type in (_DELETE, _UPDATE):

View File

@ -542,8 +542,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
command = SON([("insert", self.name), ("ordered", ordered), ("documents", [doc])])
if comment is not None:
command["comment"] = comment
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document
def _insert_command(session, sock_info, retryable_write):
if bypass_doc_val:
@ -756,8 +754,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
if let is not None:
common.validate_is_mapping("let", let)
command["let"] = let
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document
if comment is not None:
command["comment"] = comment
@ -1232,8 +1228,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
hint = helpers._index_document(hint)
delete_doc["hint"] = hint
command = SON([("delete", self.name), ("ordered", ordered), ("deletes", [delete_doc])])
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document
if let is not None:
common.validate_is_document_type("let", let)
@ -2820,8 +2814,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
"Must be connected to MongoDB 4.4+ to use hint on unacknowledged find and modify commands."
)
cmd["hint"] = hint
if not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
out = self._command(
sock_info,
cmd,

View File

@ -118,9 +118,8 @@ def command(
# Support CSOT
if client:
sock_info.apply_timeout(client, spec, write_concern)
elif write_concern and not write_concern.is_server_default:
spec["writeConcern"] = write_concern.document
sock_info.apply_timeout(client, spec)
_csot.apply_write_concern(spec, write_concern)
if use_op_msg:
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0

View File

@ -569,16 +569,13 @@ class SocketInfo(object):
self.last_timeout = timeout
self.sock.settimeout(timeout)
def apply_timeout(self, client, cmd, write_concern=None):
def apply_timeout(self, client, cmd):
# CSOT: use remaining timeout when set.
timeout = _csot.remaining()
if timeout is None:
# Reset the socket timeout unless we're performing a streaming monitor check.
if not self.more_to_come:
self.set_socket_timeout(self.opts.socket_timeout)
if cmd and write_concern and not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
return None
# RTT validation.
rtt = _csot.get_rtt()
@ -593,10 +590,6 @@ class SocketInfo(object):
)
if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000)
wc = write_concern.document if write_concern else {}
wc.pop("wtimeout", None)
if wc:
cmd["writeConcern"] = wc
self.set_socket_timeout(timeout)
return timeout

View File

@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
@ -62,13 +62,12 @@
"_id": {
"$oid": "000000000000000000000005"
},
"length": 10,
"length": 8,
"chunkSize": 4,
"uploadDate": {
"$date": "1970-01-01T00:00:00.000Z"
},
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
"filename": "length-10",
"filename": "length-8",
"contentType": "application/octet-stream",
"aliases": [],
"metadata": {}
@ -93,6 +92,21 @@
"subType": "00"
}
}
},
{
"_id": {
"$oid": "000000000000000000000006"
},
"files_id": {
"$oid": "000000000000000000000005"
},
"n": 1,
"data": {
"$binary": {
"base64": "ESIzRA==",
"subType": "00"
}
}
}
]
}
@ -116,7 +130,7 @@
"update"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -129,7 +143,7 @@
"$oid": "000000000000000000000005"
},
"newFilename": "foo",
"timeoutMS": 100
"timeoutMS": 2000
}
}
],
@ -174,7 +188,7 @@
"update"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -234,7 +248,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -243,7 +257,7 @@
"name": "drop",
"object": "bucket",
"arguments": {
"timeoutMS": 100
"timeoutMS": 2000
}
}
]
@ -266,7 +280,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -320,7 +334,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}

View File

@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
@ -62,13 +62,12 @@
"_id": {
"$oid": "000000000000000000000005"
},
"length": 10,
"length": 8,
"chunkSize": 4,
"uploadDate": {
"$date": "1970-01-01T00:00:00.000Z"
},
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
"filename": "length-10",
"filename": "length-8",
"contentType": "application/octet-stream",
"aliases": [],
"metadata": {}
@ -93,6 +92,21 @@
"subType": "00"
}
}
},
{
"_id": {
"$oid": "000000000000000000000006"
},
"files_id": {
"$oid": "000000000000000000000005"
},
"n": 1,
"data": {
"$binary": {
"base64": "ESIzRA==",
"subType": "00"
}
}
}
]
}
@ -116,7 +130,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -128,7 +142,7 @@
"id": {
"$oid": "000000000000000000000005"
},
"timeoutMS": 100
"timeoutMS": 1000
}
}
]
@ -151,7 +165,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -210,7 +224,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -247,7 +261,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 30
"blockTimeMS": 50
}
}
}

View File

@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
@ -62,13 +62,12 @@
"_id": {
"$oid": "000000000000000000000005"
},
"length": 10,
"length": 8,
"chunkSize": 4,
"uploadDate": {
"$date": "1970-01-01T00:00:00.000Z"
},
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
"filename": "length-10",
"filename": "length-8",
"contentType": "application/octet-stream",
"aliases": [],
"metadata": {}
@ -93,6 +92,21 @@
"subType": "00"
}
}
},
{
"_id": {
"$oid": "000000000000000000000006"
},
"files_id": {
"$oid": "000000000000000000000005"
},
"n": 1,
"data": {
"$binary": {
"base64": "ESIzRA==",
"subType": "00"
}
}
}
]
}
@ -116,7 +130,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -128,7 +142,7 @@
"id": {
"$oid": "000000000000000000000005"
},
"timeoutMS": 100
"timeoutMS": 1000
}
}
]
@ -151,7 +165,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -210,7 +224,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -284,7 +298,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 30
"blockTimeMS": 50
}
}
}

View File

@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
@ -84,7 +84,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -94,7 +94,7 @@
"object": "bucket",
"arguments": {
"filter": {},
"timeoutMS": 100
"timeoutMS": 1000
}
}
],
@ -139,7 +139,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}

View File

@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false
}
@ -81,7 +81,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -117,7 +117,7 @@
"find"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -155,7 +155,7 @@
"listIndexes"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -193,7 +193,7 @@
"createIndexes"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -231,7 +231,7 @@
"listIndexes"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -269,7 +269,7 @@
"createIndexes"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -307,7 +307,7 @@
"insert"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -345,7 +345,7 @@
"insert"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
@ -384,7 +384,7 @@
"listIndexes"
],
"blockConnection": true,
"blockTimeMS": 30
"blockTimeMS": 50
}
}
}

View File

@ -52,7 +52,7 @@ from test.utils import (
snake_to_camel,
)
from test.version import Version
from typing import Any
from typing import Any, List
import pymongo
from bson import SON, Code, DBRef, Decimal128, Int64, MaxKey, MinKey, json_util
@ -60,8 +60,8 @@ from bson.binary import Binary
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.objectid import ObjectId
from bson.regex import RE_TYPE, Regex
from gridfs import GridFSBucket
from pymongo import ASCENDING, MongoClient
from gridfs import GridFSBucket, GridOut
from pymongo import ASCENDING, MongoClient, _csot
from pymongo.change_stream import ChangeStream
from pymongo.client_session import ClientSession, TransactionOptions, _TxnState
from pymongo.collection import Collection
@ -460,7 +460,17 @@ class EntityMapUtil(object):
elif entity_type == "bucket":
db = self[spec["database"]]
kwargs = parse_spec_options(spec.get("bucketOptions", {}).copy())
self[spec["id"]] = GridFSBucket(db, **kwargs)
bucket = GridFSBucket(db, **kwargs)
# PyMongo does not support GridFSBucket.drop(), emulate it.
@_csot.apply
def drop(self: GridFSBucket, *args: Any, **kwargs: Any) -> None:
self._files.drop(*args, **kwargs)
self._chunks.drop(*args, **kwargs)
if not hasattr(bucket, "drop"):
bucket.drop = drop.__get__(bucket)
self[spec["id"]] = bucket
return
elif entity_type == "clientEncryption":
opts = camel_to_snake_args(spec["clientEncryptionOpts"].copy())
@ -871,8 +881,11 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
or "Dirty implicit session is discarded" in spec["description"]
):
self.skipTest("MMAPv1 does not support retryWrites=True")
elif "Client side error in command starting transaction" in spec["description"]:
if "Client side error in command starting transaction" in spec["description"]:
self.skipTest("Implement PYTHON-1894")
if "timeoutMS applied to entire download" in spec["description"]:
self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime")
class_name = self.__class__.__name__.lower()
description = spec["description"].lower()
if "csot" in class_name:
@ -914,17 +927,6 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.skipTest("PyMongo does not support modifyCollection")
if "timeoutMode" in op.get("arguments", {}):
self.skipTest("PyMongo does not support timeoutMode")
if "csot" in class_name:
if "bucket" in op["object"]:
self.skipTest("CSOT not implemented for GridFS")
if name == "createEntities":
self.maybe_skip_entity(op.get("arguments", {}).get("entities", []))
def maybe_skip_entity(self, entities):
for entity in entities:
entity_type = next(iter(entity))
if entity_type == "bucket":
self.skipTest("GridFS is not currently supported (PYTHON-2459)")
def process_error(self, exception, spec):
is_error = spec.get("isError")
@ -1145,10 +1147,10 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
kwargs.setdefault("metadata", {})["contentType"] = kwargs.pop("content_type")
return target.upload_from_stream_with_id(*args, **kwargs)
def _bucketOperation_drop(self, target: GridFSBucket, *args: Any, **kwargs: Any) -> None:
# PyMongo does not support GridFSBucket.drop(), emulate it.
target._files.drop(*args, **kwargs)
target._chunks.drop(*args, **kwargs)
def _bucketOperation_find(
self, target: GridFSBucket, *args: Any, **kwargs: Any
) -> List[GridOut]:
return list(target.find(*args, **kwargs))
def run_entity_operation(self, spec):
target = self.entity_map[spec["object"]]