diff --git a/.evergreen/resync-specs.sh b/.evergreen/resync-specs.sh index 1177ebb04..4f5366098 100755 --- a/.evergreen/resync-specs.sh +++ b/.evergreen/resync-specs.sh @@ -105,6 +105,9 @@ do crud|CRUD) cpjson crud/tests/ crud ;; + csot|CSOT|client-side-operations-timeout) + cpjson client-side-operations-timeout/tests csot + ;; load-balancers|load_balancer) cpjson load-balancers/tests load_balancer ;; @@ -150,6 +153,7 @@ do ;; uri|uri-options|uri_options) cpjson uri-options/tests uri_options + cp "$SPECS"/source/uri-options/tests/*.pem $PYMONGO/test/uri_options ;; stable-api|versioned-api) cpjson versioned-api/tests versioned-api diff --git a/doc/api/pymongo/index.rst b/doc/api/pymongo/index.rst index 6e6e33795..a4e15b987 100644 --- a/doc/api/pymongo/index.rst +++ b/doc/api/pymongo/index.rst @@ -22,6 +22,8 @@ The maximum wire protocol version PyMongo supports. + .. autofunction:: timeout + Sub-modules: .. toctree:: diff --git a/doc/changelog.rst b/doc/changelog.rst index f1085c4bf..5497b4f3e 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -6,6 +6,13 @@ Changes in Version 4.2 .. warning:: PyMongo 4.2 drops support for Python 3.6: Python 3.7+ is now required. +PyMongo 4.2 brings a number of improvements including: + +- Support for MongoDB 6.0. +- Provisional (beta) support for :func:`pymongo.timeout` to apply a single timeout + to an entire block of pymongo operations. +- Beta support for Queryable Encryption with MongoDB 6.0. + Bug fixes ......... diff --git a/pymongo/__init__.py b/pymongo/__init__.py index 17c640b1f..bdb1ec97c 100644 --- a/pymongo/__init__.py +++ b/pymongo/__init__.py @@ -14,7 +14,7 @@ """Python driver for MongoDB.""" -from typing import Tuple, Union +from typing import ContextManager, Optional, Tuple, Union ASCENDING = 1 """Ascending sort order.""" @@ -69,6 +69,7 @@ version = __version__ """Current version of PyMongo.""" +from pymongo import _csot from pymongo.collection import ReturnDocument # noqa: F401 from pymongo.common import ( # noqa: F401 MAX_SUPPORTED_WIRE_VERSION, @@ -97,3 +98,47 @@ def has_c() -> bool: return True except ImportError: return False + + +def timeout(seconds: Optional[float]) -> ContextManager: + """**(Provisional)** Apply the given timeout for a block of operations. + + .. note:: :func:`~pymongo.timeout` is currently provisional. Backwards + incompatible changes may occur before becoming officially supported. + + Use :func:`~pymongo.timeout` in a with-statement:: + + with pymongo.timeout(5): + client.db.coll.insert_one({}) + client.db.coll2.insert_one({}) + + When the with-statement is entered, a deadline is set for the entire + block. When that deadline is exceeded, any blocking pymongo operation + will raise a timeout exception. For example:: + + try: + with pymongo.timeout(5): + client.db.coll.insert_one({}) + time.sleep(5) + # The deadline has now expired, the next operation will raise + # a timeout exception. + client.db.coll2.insert_one({}) + except (ServerSelectionTimeoutError, ExecutionTimeout, WTimeoutError, + NetworkTimeout) as exc: + print(f"block timed out: {exc!r}") + + :Parameters: + - `seconds`: A non-negative floating point number expressing seconds, or None. + + :Raises: + - :py:class:`ValueError`: When `seconds` is negative. + + .. versionadded:: 4.2 + """ + if not isinstance(seconds, (int, float, type(None))): + raise TypeError("timeout must be None, an int, or a float") + if seconds and seconds < 0: + raise ValueError("timeout cannot be negative") + if seconds is not None: + seconds = float(seconds) + return _csot._TimeoutContext(seconds) diff --git a/pymongo/_csot.py b/pymongo/_csot.py new file mode 100644 index 000000000..4085562ca --- /dev/null +++ b/pymongo/_csot.py @@ -0,0 +1,80 @@ +# Copyright 2022-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Internal helpers for CSOT.""" + +import time +from contextvars import ContextVar +from typing import Optional + +TIMEOUT: ContextVar[Optional[float]] = ContextVar("TIMEOUT", default=None) +RTT: ContextVar[float] = ContextVar("RTT", default=0.0) +DEADLINE: ContextVar[float] = ContextVar("DEADLINE", default=float("inf")) + + +def get_timeout() -> Optional[float]: + return TIMEOUT.get(None) + + +def get_rtt() -> float: + return RTT.get() + + +def get_deadline() -> float: + return DEADLINE.get() + + +def set_rtt(rtt: float) -> None: + RTT.set(rtt) + + +def set_timeout(timeout: Optional[float]) -> None: + TIMEOUT.set(timeout) + DEADLINE.set(time.monotonic() + timeout if timeout else float("inf")) + + +def remaining() -> Optional[float]: + if not get_timeout(): + return None + return DEADLINE.get() - time.monotonic() + + +def clamp_remaining(max_timeout: float) -> float: + """Return the remaining timeout clamped to a max value.""" + timeout = remaining() + if timeout is None: + return max_timeout + return min(timeout, max_timeout) + + +class _TimeoutContext(object): + """Internal timeout context manager. + + Use :func:`pymongo.timeout` instead:: + + with client.timeout(0.5): + client.test.test.insert_one({}) + """ + + __slots__ = ("_timeout",) + + def __init__(self, timeout: Optional[float]): + self._timeout = timeout + + def __enter__(self): + set_timeout(self._timeout) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + set_timeout(None) diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 44923f73d..7992383f6 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -330,6 +330,8 @@ class _Bulk(object): session._apply_to(cmd, retryable, ReadPreference.PRIMARY, sock_info) sock_info.send_cluster_time(cmd, session, client) sock_info.add_server_api(cmd) + # CSOT: apply timeout before encoding the command. + sock_info.apply_timeout(client, cmd) ops = islice(run.ops, run.idx_offset, None) # Run as many ops as possible in one command. diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 4987601d5..6784e3284 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -14,6 +14,8 @@ """Tools to parse mongo client options.""" +from typing import Optional + from bson.codec_options import _parse_codec_options from pymongo import common from pymongo.auth import _build_credentials_tuple @@ -195,6 +197,7 @@ class ClientOptions(object): self.__server_selector = options.get("server_selector", any_server_selector) self.__auto_encryption_opts = options.get("auto_encryption_opts") self.__load_balanced = options.get("loadbalanced") + self.__timeout = options.get("timeoutms") @property def _options(self): @@ -260,6 +263,14 @@ class ClientOptions(object): """A :class:`~pymongo.read_concern.ReadConcern` instance.""" return self.__read_concern + @property + def timeout(self) -> Optional[float]: + """The timeout. + + ..versionadded: 4.2 + """ + return self.__timeout + @property def retry_writes(self): """If this instance should retry supported write operations.""" diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 7d70eb8f1..3ff98a579 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -150,6 +150,7 @@ from bson.binary import Binary from bson.int64 import Int64 from bson.son import SON from bson.timestamp import Timestamp +from pymongo import _csot from pymongo.cursor import _SocketManager from pymongo.errors import ( ConfigurationError, @@ -826,7 +827,7 @@ class ClientSession: wc = opts.write_concern cmd = SON([(command_name, 1)]) if command_name == "commitTransaction": - if opts.max_commit_time_ms: + if opts.max_commit_time_ms and _csot.get_timeout() is None: cmd["maxTimeMS"] = opts.max_commit_time_ms # Transaction spec says that after the initial commit attempt, diff --git a/pymongo/collection.py b/pymongo/collection.py index ffd883e93..9f3f73198 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -116,6 +116,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): write_concern: Optional[WriteConcern] = None, read_concern: Optional["ReadConcern"] = None, session: Optional["ClientSession"] = None, + timeout: Optional[float] = None, encrypted_fields: Optional[Mapping[str, Any]] = None, **kwargs: Any, ) -> None: @@ -198,6 +199,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): read_preference or database.read_preference, write_concern or database.write_concern, read_concern or database.read_concern, + timeout if timeout is not None else database.timeout, ) if not isinstance(name, str): raise TypeError("name must be an instance of str") @@ -390,6 +392,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): read_preference: Optional[_ServerMode] = None, write_concern: Optional[WriteConcern] = None, read_concern: Optional["ReadConcern"] = None, + timeout: Optional[float] = None, ) -> "Collection[_DocumentType]": """Get a clone of this collection changing the specified settings. @@ -428,6 +431,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): read_preference or self.read_preference, write_concern or self.write_concern, read_concern or self.read_concern, + timeout=timeout if timeout is not None else self.timeout, ) def bulk_write( diff --git a/pymongo/common.py b/pymongo/common.py index 437665440..858684bf0 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -339,6 +339,15 @@ def validate_timeout_or_none_or_zero(option: Any, value: Any) -> Optional[float] return validate_positive_float(option, value) / 1000.0 +def validate_timeoutms(option: Any, value: Any) -> Optional[float]: + """Validates a timeout specified in milliseconds returning + a value in floating point seconds. + """ + if value is None: + return None + return validate_positive_float_or_zero(option, value) / 1000.0 + + def validate_max_staleness(option: str, value: Any) -> int: """Validates maxStalenessSeconds according to the Max Staleness Spec.""" if value == -1 or value == "-1": @@ -658,6 +667,7 @@ URI_OPTIONS_VALIDATOR_MAP: Dict[str, Callable[[Any, Any], Any]] = { "zlibcompressionlevel": validate_zlib_compression_level, "srvservicename": validate_string, "srvmaxhosts": validate_non_negative_integer, + "timeoutms": validate_timeoutms, } # Dictionary where keys are the names of URI options specific to pymongo, @@ -821,8 +831,8 @@ class BaseObject(object): read_preference: _ServerMode, write_concern: WriteConcern, read_concern: ReadConcern, + timeout: Optional[float], ) -> None: - if not isinstance(codec_options, CodecOptions): raise TypeError("codec_options must be an instance of bson.codec_options.CodecOptions") self.__codec_options = codec_options @@ -845,6 +855,12 @@ class BaseObject(object): raise TypeError("read_concern must be an instance of pymongo.read_concern.ReadConcern") self.__read_concern = read_concern + if not isinstance(timeout, (int, float, type(None))): + raise TypeError("timeout must be None, an int, or a float") + if timeout and timeout < 0: + raise TypeError("timeout cannot be negative") + self.__timeout = float(timeout) if timeout else None + @property def codec_options(self) -> CodecOptions: """Read only access to the :class:`~bson.codec_options.CodecOptions` @@ -894,6 +910,14 @@ class BaseObject(object): """ return self.__read_concern + @property + def timeout(self) -> Optional[float]: + """Read only access to the timeout of this instance. + + .. versionadded:: 4.2 + """ + return self.__timeout + class _CaseInsensitiveDictionary(abc.MutableMapping): def __init__(self, *args, **kwargs): diff --git a/pymongo/database.py b/pymongo/database.py index bb91196f2..393f63c8c 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -75,6 +75,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): read_preference: Optional[_ServerMode] = None, write_concern: Optional["WriteConcern"] = None, read_concern: Optional["ReadConcern"] = None, + timeout: Optional[float] = None, ) -> None: """Get a database by client and name. @@ -127,6 +128,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): read_preference or client.read_preference, write_concern or client.write_concern, read_concern or client.read_concern, + timeout if timeout is not None else client.timeout, ) if not isinstance(name, str): @@ -154,6 +156,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): read_preference: Optional[_ServerMode] = None, write_concern: Optional["WriteConcern"] = None, read_concern: Optional["ReadConcern"] = None, + timeout: Optional[float] = None, ) -> "Database[_DocumentType]": """Get a clone of this database changing the specified settings. @@ -193,6 +196,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): read_preference or self.read_preference, write_concern or self.write_concern, read_concern or self.read_concern, + timeout if timeout is not None else self.timeout, ) def __eq__(self, other: Any) -> bool: @@ -241,6 +245,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): read_preference: Optional[_ServerMode] = None, write_concern: Optional["WriteConcern"] = None, read_concern: Optional["ReadConcern"] = None, + timeout: Optional[float] = None, ) -> Collection[_DocumentType]: """Get a :class:`~pymongo.collection.Collection` with the given name and options. @@ -280,7 +285,14 @@ class Database(common.BaseObject, Generic[_DocumentType]): used. """ return Collection( - self, name, False, codec_options, read_preference, write_concern, read_concern + self, + name, + False, + codec_options, + read_preference, + write_concern, + read_concern, + timeout=timeout, ) def create_collection( @@ -291,6 +303,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): write_concern: Optional["WriteConcern"] = None, read_concern: Optional["ReadConcern"] = None, session: Optional["ClientSession"] = None, + timeout: Optional[float] = None, encrypted_fields: Optional[Mapping[str, Any]] = None, **kwargs: Any, ) -> Collection[_DocumentType]: @@ -421,6 +434,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): write_concern, read_concern, session=s, + timeout=timeout, encrypted_fields=encrypted_fields, **kwargs, ) diff --git a/pymongo/encryption.py b/pymongo/encryption.py index a7a69dbe3..a088bd2da 100644 --- a/pymongo/encryption.py +++ b/pymongo/encryption.py @@ -16,6 +16,7 @@ import contextlib import enum +import socket import uuid import weakref from typing import Any, Mapping, Optional, Sequence @@ -38,6 +39,7 @@ from bson.codec_options import CodecOptions from bson.errors import BSONError from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument, _inflate_bson from bson.son import SON +from pymongo import _csot from pymongo.daemon import _spawn_daemon from pymongo.encryption_options import AutoEncryptionOpts from pymongo.errors import ( @@ -47,6 +49,7 @@ from pymongo.errors import ( ServerSelectionTimeoutError, ) from pymongo.mongo_client import MongoClient +from pymongo.network import BLOCKING_IO_ERRORS from pymongo.pool import PoolOptions, _configured_socket from pymongo.read_concern import ReadConcern from pymongo.ssl_support import get_ssl_context @@ -119,9 +122,11 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore False, # allow_invalid_hostnames False, ) # disable_ocsp_endpoint_check + # CSOT: set timeout for socket creation. + connect_timeout = max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0.001) opts = PoolOptions( - connect_timeout=_KMS_CONNECT_TIMEOUT, - socket_timeout=_KMS_CONNECT_TIMEOUT, + connect_timeout=connect_timeout, + socket_timeout=connect_timeout, ssl_context=ctx, ) host, port = parse_host(endpoint, _HTTPS_PORT) @@ -129,10 +134,14 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore try: conn.sendall(message) while kms_context.bytes_needed > 0: + # CSOT: update timeout. + conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0)) data = conn.recv(kms_context.bytes_needed) if not data: raise OSError("KMS connection closed") kms_context.feed(data) + except BLOCKING_IO_ERRORS: + raise socket.timeout("timed out") finally: conn.close() diff --git a/pymongo/message.py b/pymongo/message.py index de43d20c9..bcdedd7b4 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -300,6 +300,9 @@ class _Query(object): self._as_command = None self.exhaust = exhaust + def reset(self): + self._as_command = None + def namespace(self): return "%s.%s" % (self.db, self.coll) @@ -320,7 +323,7 @@ class _Query(object): sock_info.validate_session(self.client, self.session) return use_find_cmd - def as_command(self, sock_info): + def as_command(self, sock_info, apply_timeout=False): """Return a find command document for this query.""" # We use the command twice: on the wire and for command monitoring. # Generate it once, for speed and to avoid repeating side-effects. @@ -356,6 +359,9 @@ class _Query(object): client = self.client if client._encrypter and not client._encrypter._bypass_auto_encryption: cmd = client._encrypter.encrypt(self.db, cmd, self.codec_options) + # Support CSOT + if apply_timeout: + sock_info.apply_timeout(client, cmd) self._as_command = cmd, self.db return self._as_command @@ -371,7 +377,7 @@ class _Query(object): spec = self.spec if use_cmd: - spec = self.as_command(sock_info)[0] + spec = self.as_command(sock_info, apply_timeout=True)[0] request_id, msg, size, _ = _op_msg( 0, spec, @@ -457,6 +463,9 @@ class _GetMore(object): self.exhaust = exhaust self.comment = comment + def reset(self): + self._as_command = None + def namespace(self): return "%s.%s" % (self.db, self.coll) @@ -471,7 +480,7 @@ class _GetMore(object): sock_info.validate_session(self.client, self.session) return use_cmd - def as_command(self, sock_info): + def as_command(self, sock_info, apply_timeout=False): """Return a getMore command document for this query.""" # See _Query.as_command for an explanation of this caching. if self._as_command is not None: @@ -493,6 +502,9 @@ class _GetMore(object): client = self.client if client._encrypter and not client._encrypter._bypass_auto_encryption: cmd = client._encrypter.encrypt(self.db, cmd, self.codec_options) + # Support CSOT + if apply_timeout: + sock_info.apply_timeout(client, cmd=None) self._as_command = cmd, self.db return self._as_command @@ -503,7 +515,7 @@ class _GetMore(object): ctx = sock_info.compression_context if use_cmd: - spec = self.as_command(sock_info)[0] + spec = self.as_command(sock_info, apply_timeout=True)[0] if self.sock_mgr: flags = _OpMsg.EXHAUST_ALLOWED else: diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index e1aa80e2f..7af4b167f 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -57,6 +57,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry from bson.son import SON from bson.timestamp import Timestamp from pymongo import ( + _csot, client_session, common, database, @@ -260,6 +261,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): replaced. Defaults to `None` (no limit). - `maxConnecting` (optional): The maximum number of connections that each pool can establish concurrently. Defaults to `2`. + - `timeoutMS`: (integer or None) Controls how long (in + milliseconds) the driver will wait when executing an operation + (including retry attempts) before raising a timeout error. + ``0`` or ``None`` means no timeout. - `socketTimeoutMS`: (integer or None) Controls how long (in milliseconds) the driver will wait for a response after sending an ordinary (non-monitoring) database operation before concluding that @@ -540,6 +545,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): .. seealso:: The MongoDB documentation on `connections `_. + .. versionchanged:: 4.2 + Added the ``timeoutMS`` keyword argument. + .. versionchanged:: 4.0 - Removed the fsync, unlock, is_locked, database_names, and @@ -780,6 +788,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): options.read_preference, options.write_concern, options.read_concern, + options.timeout, ) self._topology_settings = TopologySettings( @@ -1273,6 +1282,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): ) def _cmd(session, server, sock_info, read_preference): + operation.reset() # Reset op in case of retry. return server.run_operation( sock_info, operation, read_preference, self._event_listeners, unpack_res ) @@ -1303,6 +1313,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): max_wire_version = 0 last_error: Optional[Exception] = None retrying = False + multiple_retries = _csot.get_timeout() is not None def is_retrying(): return bulk.retrying if bulk else retrying @@ -1350,7 +1361,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): retryable_error = exc.has_error_label("RetryableWriteError") if retryable_error: session._unpin() - if is_retrying() or not retryable_error: + if not retryable_error or (is_retrying() and not multiple_retries): raise if bulk: bulk.retrying = True @@ -1371,6 +1382,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): ) last_error: Optional[Exception] = None retrying = False + multiple_retries = _csot.get_timeout() is not None while True: try: @@ -1394,12 +1406,12 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): # most likely be a waste of time. raise except ConnectionFailure as exc: - if not retryable or retrying: + if not retryable or (retrying and not multiple_retries): raise retrying = True last_error = exc except OperationFailure as exc: - if not retryable or retrying: + if not retryable or (retrying and not multiple_retries): raise if exc.code not in helpers._RETRYABLE_ERROR_CODES: raise @@ -1922,6 +1934,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): read_preference: Optional[_ServerMode] = None, write_concern: Optional[WriteConcern] = None, read_concern: Optional["ReadConcern"] = None, + timeout: Optional[float] = None, ) -> database.Database[_DocumentType]: """Get a :class:`~pymongo.database.Database` with the given name and options. @@ -1972,7 +1985,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): name = self.__default_database_name return database.Database( - self, name, codec_options, read_preference, write_concern, read_concern + self, name, codec_options, read_preference, write_concern, read_concern, timeout ) def _database_default_options(self, name): diff --git a/pymongo/network.py b/pymongo/network.py index df08158b2..3eac0d02d 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -21,7 +21,7 @@ import struct import time from bson import _decode_all_selective -from pymongo import helpers, message +from pymongo import _csot, helpers, message, ssl_support from pymongo.common import MAX_MESSAGE_SIZE from pymongo.compression_support import _NO_COMPRESSION, decompress from pymongo.errors import ( @@ -59,6 +59,7 @@ def command( unacknowledged=False, user_fields=None, exhaust_allowed=False, + write_concern=None, ): """Execute a command over the socket, or raise socket.error. @@ -115,6 +116,12 @@ def command( if client and client._encrypter and not client._encrypter._bypass_auto_encryption: spec = orig = client._encrypter.encrypt(dbname, spec, codec_options) + # Support CSOT + if client: + sock_info.apply_timeout(client, spec, write_concern) + elif write_concern and not write_concern.is_server_default: + spec["writeConcern"] = write_concern.document + if use_op_msg: flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 @@ -198,11 +205,14 @@ _UNPACK_COMPRESSION_HEADER = struct.Struct("