From f7d2deb27d20ae2f2259579e8ff029f1f44a5396 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 10 Jun 2024 12:31:19 -0700 Subject: [PATCH 1/2] PYTHON-4480 Deprecate create=True for Collection (#1659) --- pymongo/asynchronous/collection.py | 6 ++++++ pymongo/synchronous/collection.py | 6 ++++++ test/test_collection.py | 6 +++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index ed396fb9c..372cafe5b 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -15,6 +15,7 @@ """Collection level utilities for Mongo.""" from __future__ import annotations +import warnings from collections import abc from typing import ( TYPE_CHECKING, @@ -248,6 +249,11 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): if create or kwargs: if _IS_SYNC: + warnings.warn( + "The `create` and `kwargs` arguments to Collection are deprecated and will be removed in PyMongo 5.0", + DeprecationWarning, + stacklevel=2, + ) self._create(kwargs, session) # type: ignore[unused-coroutine] else: raise ValueError( diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index 61bd81fd9..c04afbe7c 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -15,6 +15,7 @@ """Collection level utilities for Mongo.""" from __future__ import annotations +import warnings from collections import abc from typing import ( TYPE_CHECKING, @@ -251,6 +252,11 @@ class Collection(common.BaseObject, Generic[_DocumentType]): if create or kwargs: if _IS_SYNC: + warnings.warn( + "The `create` and `kwargs` arguments to Collection are deprecated and will be removed in PyMongo 5.0", + DeprecationWarning, + stacklevel=2, + ) self._create(kwargs, session) # type: ignore[unused-coroutine] else: raise ValueError("Collection does not support the `create` or `kwargs` arguments.") diff --git a/test/test_collection.py b/test/test_collection.py index 54f76336d..4bbe0fb57 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -198,7 +198,11 @@ class TestCollection(IntegrationTest): "create create_test_no_wc collection", ) db.create_test_no_wc.drop() - Collection(db, name="create_test_no_wc", create=True) + with self.assertWarns( + DeprecationWarning, + msg="The `create` and `kwargs` arguments to Collection are deprecated and will be removed in PyMongo 5.0", + ): + Collection(db, name="create_test_no_wc", create=True) wait_until( lambda: "create_test_no_wc" in db.list_collection_names(), "create create_test_no_wc collection", From 8c35d1e481ed9e2990ac9fbc42d2fa2a0d8e8c44 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 11 Jun 2024 12:50:26 -0700 Subject: [PATCH 2/2] PYTHON-4347 Improve performance by making _ServerSessionPool lock-free (#1660) --- pymongo/asynchronous/bulk.py | 2 +- pymongo/asynchronous/client_session.py | 52 +++++++++++++------------- pymongo/asynchronous/message.py | 4 +- pymongo/asynchronous/mongo_client.py | 10 +++-- pymongo/asynchronous/pool.py | 2 +- pymongo/asynchronous/topology.py | 21 +++-------- pymongo/synchronous/client_session.py | 40 ++++++++++---------- pymongo/synchronous/mongo_client.py | 6 ++- pymongo/synchronous/topology.py | 17 ++------- 9 files changed, 70 insertions(+), 84 deletions(-) diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 4205fceac..f6b45e0fa 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -378,7 +378,7 @@ class _Bulk: if retryable and not self.started_retryable_write: session._start_retryable_write() self.started_retryable_write = True - await session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) + session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) conn.send_cluster_time(cmd, session, client) conn.add_server_api(cmd) # CSOT: apply timeout before encoding the command. diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index fcaf26a87..62d5ed29a 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -528,7 +528,7 @@ class ClientSession: # is in the committed state when the session is discarded. await self._unpin() finally: - await self._client._return_server_session(self._server_session, lock) + self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: @@ -557,13 +557,13 @@ class ClientSession: async def session_id(self) -> Mapping[str, Any]: """A BSON document, the opaque server session identifier.""" self._check_ended() - await self._materialize(self._client.topology_description.logical_session_timeout_minutes) + self._materialize(self._client.topology_description.logical_session_timeout_minutes) return self._server_session.session_id @property async def _transaction_id(self) -> Int64: """The current transaction id for the underlying server session.""" - await self._materialize(self._client.topology_description.logical_session_timeout_minutes) + self._materialize(self._client.topology_description.logical_session_timeout_minutes) return self._server_session.transaction_id @property @@ -979,16 +979,16 @@ class ClientSession: return self._transaction.opts.read_preference return None - async def _materialize(self, logical_session_timeout_minutes: Optional[int] = None) -> None: + def _materialize(self, logical_session_timeout_minutes: Optional[int] = None) -> None: if isinstance(self._server_session, _EmptyServerSession): old = self._server_session - self._server_session = await self._client._topology.get_server_session( + self._server_session = self._client._topology.get_server_session( logical_session_timeout_minutes ) if old.started_retryable_write: self._server_session.inc_transaction_id() - async def _apply_to( + def _apply_to( self, command: MutableMapping[str, Any], is_retryable: bool, @@ -1000,7 +1000,7 @@ class ClientSession: raise ConfigurationError("Sessions are not supported by this MongoDB deployment") return self._check_ended() - await self._materialize(conn.logical_session_timeout_minutes) + self._materialize(conn.logical_session_timeout_minutes) if self.options.snapshot: self._update_read_concern(command, conn) @@ -1103,7 +1103,7 @@ class _ServerSession: class _ServerSessionPool(collections.deque): """Pool of _ServerSession objects. - This class is not thread-safe, access it while holding the Topology lock. + This class is thread-safe. """ def __init__(self, *args: Any, **kwargs: Any): @@ -1116,8 +1116,11 @@ class _ServerSessionPool(collections.deque): def pop_all(self) -> list[_ServerSession]: ids = [] - while self: - ids.append(self.pop().session_id) + while True: + try: + ids.append(self.pop().session_id) + except IndexError: + break return ids def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession: @@ -1129,23 +1132,17 @@ class _ServerSessionPool(collections.deque): self._clear_stale(session_timeout_minutes) # The most recently used sessions are on the left. - while self: - s = self.popleft() + while True: + try: + s = self.popleft() + except IndexError: + break if not s.timed_out(session_timeout_minutes): return s return _ServerSession(self.generation) - def return_server_session( - self, server_session: _ServerSession, session_timeout_minutes: Optional[int] - ) -> None: - if session_timeout_minutes is not None: - self._clear_stale(session_timeout_minutes) - if server_session.timed_out(session_timeout_minutes): - return - self.return_server_session_no_lock(server_session) - - def return_server_session_no_lock(self, server_session: _ServerSession) -> None: + def return_server_session(self, server_session: _ServerSession) -> None: # Discard sessions from an old pool to avoid duplicate sessions in the # child process after a fork. if server_session.generation == self.generation and not server_session.dirty: @@ -1153,9 +1150,12 @@ class _ServerSessionPool(collections.deque): def _clear_stale(self, session_timeout_minutes: Optional[int]) -> None: # Clear stale sessions. The least recently used are on the right. - while self: - if self[-1].timed_out(session_timeout_minutes): - self.pop() - else: + while True: + try: + s = self.pop() + except IndexError: + break + if not s.timed_out(session_timeout_minutes): + self.append(s) # The remaining sessions also haven't timed out. break diff --git a/pymongo/asynchronous/message.py b/pymongo/asynchronous/message.py index 0815d3353..d2f048b40 100644 --- a/pymongo/asynchronous/message.py +++ b/pymongo/asynchronous/message.py @@ -394,7 +394,7 @@ class _Query: session = self.session conn.add_server_api(cmd) if session: - await session._apply_to(cmd, False, self.read_preference, conn) + session._apply_to(cmd, False, self.read_preference, conn) # Explain does not support readConcern. if not explain and not session.in_transaction: session._update_read_concern(cmd, conn) @@ -546,7 +546,7 @@ class _GetMore: conn, ) if self.session: - await self.session._apply_to(cmd, False, self.read_preference, conn) + self.session._apply_to(cmd, False, self.read_preference, conn) conn.add_server_api(cmd) conn.send_cluster_time(cmd, self.session, self.client) # Support auto encryption diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 5eedd5ba0..8319755d0 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -902,6 +902,8 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): def _after_fork(self) -> None: """Resets topology in a child after successfully forking.""" self._init_background() + # Reset the session pool to avoid duplicate sessions in the child process. + self._topology._session_pool.reset() def _duplicate(self, **kwargs: Any) -> AsyncMongoClient: args = self._init_kwargs.copy() @@ -1508,7 +1510,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): .. versionchanged:: 3.6 End all server sessions created by this client. """ - session_ids = await self._topology.pop_all_sessions() + session_ids = self._topology.pop_all_sessions() if session_ids: await self._end_sessions(session_ids) # Stop the periodic task thread and then send pending killCursor @@ -2006,13 +2008,13 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): else: helpers._handle_exception() - async def _return_server_session( - self, server_session: Union[_ServerSession, _EmptyServerSession], lock: bool + def _return_server_session( + self, server_session: Union[_ServerSession, _EmptyServerSession] ) -> None: """Internal: return a _ServerSession to the pool.""" if isinstance(server_session, _EmptyServerSession): return None - return await self._topology.return_server_session(server_session, lock) + return self._topology.return_server_session(server_session) @contextlib.asynccontextmanager async def _tmp_session( diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a4d3c5064..df0fb636e 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -982,7 +982,7 @@ class Connection: self.add_server_api(spec) if session: - await session._apply_to(spec, retryable_write, read_preference, self) + session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) listeners = self.listeners if publish_events else None unacknowledged = bool(write_concern and not write_concern.acknowledged) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index df6dd903a..ac578113b 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -671,25 +671,16 @@ class Topology: def description(self) -> TopologyDescription: return self._description - async def pop_all_sessions(self) -> list[_ServerSession]: + def pop_all_sessions(self) -> list[_ServerSession]: """Pop all session ids from the pool.""" - async with self._lock: - return self._session_pool.pop_all() + return self._session_pool.pop_all() - async def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession: + def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession: """Start or resume a server session, or raise ConfigurationError.""" - async with self._lock: - return self._session_pool.get_server_session(session_timeout_minutes) + return self._session_pool.get_server_session(session_timeout_minutes) - async def return_server_session(self, server_session: _ServerSession, lock: bool) -> None: - if lock: - async with self._lock: - self._session_pool.return_server_session( - server_session, self._description.logical_session_timeout_minutes - ) - else: - # Called from a __del__ method, can't use a lock. - self._session_pool.return_server_session_no_lock(server_session) + def return_server_session(self, server_session: _ServerSession) -> None: + self._session_pool.return_server_session(server_session) def _new_selection(self) -> Selection: """A Selection object, initially including all known servers. diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index b4339bd12..71af41aa8 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -528,7 +528,7 @@ class ClientSession: # is in the committed state when the session is discarded. self._unpin() finally: - self._client._return_server_session(self._server_session, lock) + self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: @@ -1099,7 +1099,7 @@ class _ServerSession: class _ServerSessionPool(collections.deque): """Pool of _ServerSession objects. - This class is not thread-safe, access it while holding the Topology lock. + This class is thread-safe. """ def __init__(self, *args: Any, **kwargs: Any): @@ -1112,8 +1112,11 @@ class _ServerSessionPool(collections.deque): def pop_all(self) -> list[_ServerSession]: ids = [] - while self: - ids.append(self.pop().session_id) + while True: + try: + ids.append(self.pop().session_id) + except IndexError: + break return ids def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession: @@ -1125,23 +1128,17 @@ class _ServerSessionPool(collections.deque): self._clear_stale(session_timeout_minutes) # The most recently used sessions are on the left. - while self: - s = self.popleft() + while True: + try: + s = self.popleft() + except IndexError: + break if not s.timed_out(session_timeout_minutes): return s return _ServerSession(self.generation) - def return_server_session( - self, server_session: _ServerSession, session_timeout_minutes: Optional[int] - ) -> None: - if session_timeout_minutes is not None: - self._clear_stale(session_timeout_minutes) - if server_session.timed_out(session_timeout_minutes): - return - self.return_server_session_no_lock(server_session) - - def return_server_session_no_lock(self, server_session: _ServerSession) -> None: + def return_server_session(self, server_session: _ServerSession) -> None: # Discard sessions from an old pool to avoid duplicate sessions in the # child process after a fork. if server_session.generation == self.generation and not server_session.dirty: @@ -1149,9 +1146,12 @@ class _ServerSessionPool(collections.deque): def _clear_stale(self, session_timeout_minutes: Optional[int]) -> None: # Clear stale sessions. The least recently used are on the right. - while self: - if self[-1].timed_out(session_timeout_minutes): - self.pop() - else: + while True: + try: + s = self.pop() + except IndexError: + break + if not s.timed_out(session_timeout_minutes): + self.append(s) # The remaining sessions also haven't timed out. break diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index a44a4e039..69bfa7a9d 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -901,6 +901,8 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): def _after_fork(self) -> None: """Resets topology in a child after successfully forking.""" self._init_background() + # Reset the session pool to avoid duplicate sessions in the child process. + self._topology._session_pool.reset() def _duplicate(self, **kwargs: Any) -> MongoClient: args = self._init_kwargs.copy() @@ -2004,12 +2006,12 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): helpers._handle_exception() def _return_server_session( - self, server_session: Union[_ServerSession, _EmptyServerSession], lock: bool + self, server_session: Union[_ServerSession, _EmptyServerSession] ) -> None: """Internal: return a _ServerSession to the pool.""" if isinstance(server_session, _EmptyServerSession): return None - return self._topology.return_server_session(server_session, lock) + return self._topology.return_server_session(server_session) @contextlib.contextmanager def _tmp_session( diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index d76cef7bf..6c8cd8870 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -671,23 +671,14 @@ class Topology: def pop_all_sessions(self) -> list[_ServerSession]: """Pop all session ids from the pool.""" - with self._lock: - return self._session_pool.pop_all() + return self._session_pool.pop_all() def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession: """Start or resume a server session, or raise ConfigurationError.""" - with self._lock: - return self._session_pool.get_server_session(session_timeout_minutes) + return self._session_pool.get_server_session(session_timeout_minutes) - def return_server_session(self, server_session: _ServerSession, lock: bool) -> None: - if lock: - with self._lock: - self._session_pool.return_server_session( - server_session, self._description.logical_session_timeout_minutes - ) - else: - # Called from a __del__ method, can't use a lock. - self._session_pool.return_server_session_no_lock(server_session) + def return_server_session(self, server_session: _ServerSession) -> None: + self._session_pool.return_server_session(server_session) def _new_selection(self) -> Selection: """A Selection object, initially including all known servers.