PYTHON-1419 Call endSessions on MongoClient.close.

This commit is contained in:
Shane Harvey 2017-11-20 17:14:52 -08:00
parent 8416c73ca7
commit e554d6116c
5 changed files with 80 additions and 4 deletions

View File

@ -236,6 +236,12 @@ class _ServerSessionPool(collections.deque):
This class is not thread-safe, access it while holding the Topology lock.
"""
def pop_all(self):
ids = []
while self:
ids.append(self.pop().session_id)
return ids
def get_server_session(self, session_timeout_minutes):
# Although the Driver Sessions Spec says we only clear stale sessions
# in return_server_session, PyMongo can't take a lock when returning

View File

@ -94,6 +94,10 @@ COMMAND_NOT_FOUND_CODES = (59,)
# Error codes to ignore if GridFS calls createIndex on a secondary
UNAUTHORIZED_CODES = (13, 16547, 16548)
# Maximum number of sessions to send in a single endSessions command.
# From the driver sessions spec.
_MAX_END_SESSIONS = 10000
def partition_node(node):
"""Split a host:port string into (host, int(port)) pair."""

View File

@ -857,13 +857,42 @@ class MongoClient(common.BaseObject):
except ConnectionFailure:
return False
def _end_sessions(self, session_ids):
"""Send endSessions command(s) with the given session ids."""
try:
# Use SocketInfo.command directly to avoid implicitly creating
# another session.
with self._socket_for_reads(
ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok):
if not sock_info.supports_sessions:
return
for i in range(0, len(session_ids), common._MAX_END_SESSIONS):
spec = SON([('endSessions',
session_ids[i:i + common._MAX_END_SESSIONS])])
sock_info.command(
'admin', spec, slave_ok=slave_ok, client=self)
except PyMongoError:
# Drivers MUST ignore any errors returned by the endSessions
# command.
pass
def close(self):
"""Disconnect from MongoDB.
"""Cleanup client resources and disconnect from MongoDB.
On MongoDB >= 3.6, end all server sessions created by this client by
sending one or more endSessions commands.
Close all sockets in the connection pools and stop the monitor threads.
If this instance is used again it will be automatically re-opened and
the threads restarted.
.. versionchanged:: 3.6
End all server sessions created by this client.
"""
session_ids = self._topology.pop_all_sessions()
if session_ids:
self._end_sessions(session_ids)
# Run _process_periodic_tasks to send pending killCursor requests
# before closing the topology.
self._process_periodic_tasks()
@ -1307,7 +1336,7 @@ class MongoClient(common.BaseObject):
except Exception:
helpers._handle_exception()
def start_session(self, **kwargs):
def start_session(self, causal_consistency=True):
"""Start a logical session.
This method takes the same parameters as
@ -1318,6 +1347,12 @@ class MongoClient(common.BaseObject):
if this client has been authenticated to multiple databases using the
deprecated method :meth:`~pymongo.database.Database.authenticate`.
A :class:`~pymongo.client_session.ClientSession` may only be used with
the MongoClient that started it.
:Returns:
An instance of :class:`~pymongo.client_session.ClientSession`.
.. versionadded:: 3.6
"""
# Driver Sessions Spec: "If startSession is called when multiple users
@ -1330,8 +1365,10 @@ class MongoClient(common.BaseObject):
# Raises ConfigurationError if sessions are not supported.
server_session = self._get_server_session()
opts = client_session.SessionOptions(**kwargs)
return client_session.ClientSession(self, server_session, opts, authset)
opts = client_session.SessionOptions(
causal_consistency=causal_consistency)
return client_session.ClientSession(
self, server_session, opts, authset)
def _get_server_session(self):
"""Internal: start or resume a _ServerSession."""

View File

@ -397,6 +397,11 @@ class Topology(object):
def description(self):
return self._description
def pop_all_sessions(self):
"""Pop all session ids from the pool."""
with self._lock:
return self._session_pool.pop_all()
def get_server_session(self):
"""Start or resume a server session, or raise ConfigurationError."""
with self._lock:

View File

@ -21,6 +21,7 @@ from bson import DBRef
from bson.py3compat import StringIO
from gridfs import GridFS, GridFSBucket
from pymongo import ASCENDING, InsertOne, IndexModel, OFF, monitoring
from pymongo.common import _MAX_END_SESSIONS
from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
@ -943,6 +944,29 @@ class TestCausalConsistency(unittest.TestCase):
'$clusterTime')
self.assertIsNone(after_cluster_time)
def test_end_sessions(self):
listener = SessionTestListener()
client = rs_or_single_client(event_listeners=[listener])
# Start many sessions.
sessions = [client.start_session()
for _ in range(_MAX_END_SESSIONS + 1)]
for s in sessions:
s.end_session()
# Closing the client should end all sessions and clear the pool.
self.assertEqual(len(client._topology._session_pool),
_MAX_END_SESSIONS + 1)
client.close()
self.assertEqual(len(client._topology._session_pool), 0)
end_sessions = [e for e in listener.results['started']
if e.command_name == 'endSessions']
self.assertEqual(len(end_sessions), 2)
# Closing again should not send any commands.
listener.results.clear()
client.close()
self.assertEqual(len(listener.results['started']), 0)
class TestSessionsMultiAuth(IntegrationTest):
@client_context.require_auth