diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 841a79f60..b7f79512d 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -236,6 +236,12 @@ class _ServerSessionPool(collections.deque): This class is not thread-safe, access it while holding the Topology lock. """ + def pop_all(self): + ids = [] + while self: + ids.append(self.pop().session_id) + return ids + def get_server_session(self, session_timeout_minutes): # Although the Driver Sessions Spec says we only clear stale sessions # in return_server_session, PyMongo can't take a lock when returning diff --git a/pymongo/common.py b/pymongo/common.py index e2f4241eb..cc5b8898b 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -94,6 +94,10 @@ COMMAND_NOT_FOUND_CODES = (59,) # Error codes to ignore if GridFS calls createIndex on a secondary UNAUTHORIZED_CODES = (13, 16547, 16548) +# Maximum number of sessions to send in a single endSessions command. +# From the driver sessions spec. +_MAX_END_SESSIONS = 10000 + def partition_node(node): """Split a host:port string into (host, int(port)) pair.""" diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index f62d19fdd..75b308393 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -857,13 +857,42 @@ class MongoClient(common.BaseObject): except ConnectionFailure: return False + def _end_sessions(self, session_ids): + """Send endSessions command(s) with the given session ids.""" + try: + # Use SocketInfo.command directly to avoid implicitly creating + # another session. + with self._socket_for_reads( + ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok): + if not sock_info.supports_sessions: + return + + for i in range(0, len(session_ids), common._MAX_END_SESSIONS): + spec = SON([('endSessions', + session_ids[i:i + common._MAX_END_SESSIONS])]) + sock_info.command( + 'admin', spec, slave_ok=slave_ok, client=self) + except PyMongoError: + # Drivers MUST ignore any errors returned by the endSessions + # command. + pass + def close(self): - """Disconnect from MongoDB. + """Cleanup client resources and disconnect from MongoDB. + + On MongoDB >= 3.6, end all server sessions created by this client by + sending one or more endSessions commands. Close all sockets in the connection pools and stop the monitor threads. If this instance is used again it will be automatically re-opened and the threads restarted. + + .. versionchanged:: 3.6 + End all server sessions created by this client. """ + session_ids = self._topology.pop_all_sessions() + if session_ids: + self._end_sessions(session_ids) # Run _process_periodic_tasks to send pending killCursor requests # before closing the topology. self._process_periodic_tasks() @@ -1307,7 +1336,7 @@ class MongoClient(common.BaseObject): except Exception: helpers._handle_exception() - def start_session(self, **kwargs): + def start_session(self, causal_consistency=True): """Start a logical session. This method takes the same parameters as @@ -1318,6 +1347,12 @@ class MongoClient(common.BaseObject): if this client has been authenticated to multiple databases using the deprecated method :meth:`~pymongo.database.Database.authenticate`. + A :class:`~pymongo.client_session.ClientSession` may only be used with + the MongoClient that started it. + + :Returns: + An instance of :class:`~pymongo.client_session.ClientSession`. + .. versionadded:: 3.6 """ # Driver Sessions Spec: "If startSession is called when multiple users @@ -1330,8 +1365,10 @@ class MongoClient(common.BaseObject): # Raises ConfigurationError if sessions are not supported. server_session = self._get_server_session() - opts = client_session.SessionOptions(**kwargs) - return client_session.ClientSession(self, server_session, opts, authset) + opts = client_session.SessionOptions( + causal_consistency=causal_consistency) + return client_session.ClientSession( + self, server_session, opts, authset) def _get_server_session(self): """Internal: start or resume a _ServerSession.""" diff --git a/pymongo/topology.py b/pymongo/topology.py index e49487e18..2ece443df 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -397,6 +397,11 @@ class Topology(object): def description(self): return self._description + def pop_all_sessions(self): + """Pop all session ids from the pool.""" + with self._lock: + return self._session_pool.pop_all() + def get_server_session(self): """Start or resume a server session, or raise ConfigurationError.""" with self._lock: diff --git a/test/test_session.py b/test/test_session.py index 15816b3f2..78718b363 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -21,6 +21,7 @@ from bson import DBRef from bson.py3compat import StringIO from gridfs import GridFS, GridFSBucket from pymongo import ASCENDING, InsertOne, IndexModel, OFF, monitoring +from pymongo.common import _MAX_END_SESSIONS from pymongo.errors import (ConfigurationError, InvalidOperation, OperationFailure) @@ -943,6 +944,29 @@ class TestCausalConsistency(unittest.TestCase): '$clusterTime') self.assertIsNone(after_cluster_time) + def test_end_sessions(self): + listener = SessionTestListener() + client = rs_or_single_client(event_listeners=[listener]) + # Start many sessions. + sessions = [client.start_session() + for _ in range(_MAX_END_SESSIONS + 1)] + for s in sessions: + s.end_session() + + # Closing the client should end all sessions and clear the pool. + self.assertEqual(len(client._topology._session_pool), + _MAX_END_SESSIONS + 1) + client.close() + self.assertEqual(len(client._topology._session_pool), 0) + end_sessions = [e for e in listener.results['started'] + if e.command_name == 'endSessions'] + self.assertEqual(len(end_sessions), 2) + + # Closing again should not send any commands. + listener.results.clear() + client.close() + self.assertEqual(len(listener.results['started']), 0) + class TestSessionsMultiAuth(IntegrationTest): @client_context.require_auth