parent
23d3c2ee86
commit
726921a034
@ -217,15 +217,14 @@ class PoolOptions(object):
|
||||
'__connect_timeout', '__socket_timeout',
|
||||
'__wait_queue_timeout', '__wait_queue_multiple',
|
||||
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
|
||||
'__event_listeners', '__appname', '__metadata',
|
||||
'__handshake_callback')
|
||||
'__event_listeners', '__appname', '__metadata')
|
||||
|
||||
def __init__(self, max_pool_size=100, min_pool_size=0,
|
||||
max_idle_time_ms=None, connect_timeout=None,
|
||||
socket_timeout=None, wait_queue_timeout=None,
|
||||
wait_queue_multiple=None, ssl_context=None,
|
||||
ssl_match_hostname=True, socket_keepalive=False,
|
||||
event_listeners=None, appname=None, handshake_callback=None):
|
||||
event_listeners=None, appname=None):
|
||||
|
||||
self.__max_pool_size = max_pool_size
|
||||
self.__min_pool_size = min_pool_size
|
||||
@ -243,27 +242,6 @@ class PoolOptions(object):
|
||||
if appname:
|
||||
self.__metadata['application'] = {'name': appname}
|
||||
|
||||
self.__handshake_callback = handshake_callback
|
||||
|
||||
def with_options(self, **kwargs):
|
||||
options = {
|
||||
'max_pool_size': self.max_pool_size,
|
||||
'min_pool_size': self.min_pool_size,
|
||||
'max_idle_time_ms': self.max_idle_time_ms,
|
||||
'connect_timeout': self.connect_timeout,
|
||||
'socket_timeout': self.socket_timeout,
|
||||
'wait_queue_timeout': self.wait_queue_timeout,
|
||||
'wait_queue_multiple': self.wait_queue_multiple,
|
||||
'ssl_context': self.ssl_context,
|
||||
'ssl_match_hostname': self.ssl_match_hostname,
|
||||
'socket_keepalive': self.socket_keepalive,
|
||||
'event_listeners': self.event_listeners,
|
||||
'appname': self.appname,
|
||||
'handshake_callback': self.handshake_callback}
|
||||
|
||||
options.update(kwargs)
|
||||
return PoolOptions(**options)
|
||||
|
||||
@property
|
||||
def max_pool_size(self):
|
||||
"""The maximum allowable number of concurrent connections to each
|
||||
@ -357,11 +335,6 @@ class PoolOptions(object):
|
||||
"""
|
||||
return self.__metadata.copy()
|
||||
|
||||
@property
|
||||
def handshake_callback(self):
|
||||
"""Receives an ismaster reply and updates the topology."""
|
||||
return self.__handshake_callback
|
||||
|
||||
|
||||
class SocketInfo(object):
|
||||
"""Store a socket with some metadata.
|
||||
@ -773,8 +746,6 @@ class Pool:
|
||||
('ismaster', 1),
|
||||
('client', self.opts.metadata)
|
||||
])
|
||||
|
||||
start = _time()
|
||||
ismaster = IsMaster(
|
||||
command(sock,
|
||||
'admin',
|
||||
@ -783,9 +754,6 @@ class Pool:
|
||||
False,
|
||||
ReadPreference.PRIMARY,
|
||||
DEFAULT_CODEC_OPTIONS))
|
||||
|
||||
# Can raise ConnectionFailure.
|
||||
self._handshake_callback(ismaster, _time() - start)
|
||||
else:
|
||||
ismaster = None
|
||||
return SocketInfo(sock, self, ismaster, self.address)
|
||||
@ -793,10 +761,6 @@ class Pool:
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
_raise_connection_failure(self.address, error)
|
||||
except:
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
raise
|
||||
|
||||
@contextlib.contextmanager
|
||||
def get_socket(self, all_credentials, checkout=False):
|
||||
@ -925,14 +889,6 @@ class Pool:
|
||||
else:
|
||||
return self.connect()
|
||||
|
||||
def _handshake_callback(self, ismaster, round_trip_time):
|
||||
callback = self.opts.handshake_callback
|
||||
if callback:
|
||||
kept = callback(self.address, ismaster, round_trip_time)
|
||||
if not kept:
|
||||
_raise_connection_failure(
|
||||
self.address, "server removed from topology")
|
||||
|
||||
def _raise_wait_queue_timeout(self):
|
||||
raise ConnectionFailure(
|
||||
'Timed out waiting for socket from pool with max_size %r and'
|
||||
|
||||
@ -38,8 +38,6 @@ class ServerDescription(object):
|
||||
- `ismaster`: Optional IsMaster instance
|
||||
- `round_trip_time`: Optional float
|
||||
- `error`: Optional, the last error attempting to connect to the server
|
||||
- `from_handshake`: Optional, whether this is from expanding a connection
|
||||
pool, rather than from background monitoring.
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
@ -47,16 +45,14 @@ class ServerDescription(object):
|
||||
'_primary', '_max_bson_size', '_max_message_size',
|
||||
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
|
||||
'_round_trip_time', '_me', '_is_writable', '_is_readable', '_error',
|
||||
'_set_version', '_election_id', '_last_write_date', '_last_update_time',
|
||||
'_from_handshake')
|
||||
'_set_version', '_election_id', '_last_write_date', '_last_update_time')
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
address,
|
||||
ismaster=None,
|
||||
round_trip_time=None,
|
||||
error=None,
|
||||
from_handshake=False):
|
||||
error=None):
|
||||
self._address = address
|
||||
if not ismaster:
|
||||
ismaster = IsMaster({})
|
||||
@ -79,7 +75,6 @@ class ServerDescription(object):
|
||||
self._me = ismaster.me
|
||||
self._last_update_time = _time()
|
||||
self._error = error
|
||||
self._from_handshake = from_handshake # For tests.
|
||||
|
||||
if ismaster.last_write_date:
|
||||
# Convert from datetime to seconds.
|
||||
|
||||
@ -28,13 +28,13 @@ else:
|
||||
|
||||
from pymongo import common
|
||||
from pymongo import periodic_executor
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.topology_description import (updated_topology_description,
|
||||
TOPOLOGY_TYPE,
|
||||
TopologyDescription)
|
||||
from pymongo.errors import ServerSelectionTimeoutError
|
||||
from pymongo.monotonic import time as _time
|
||||
from pymongo.server import Server
|
||||
from pymongo.server_description import ServerDescription
|
||||
from pymongo.server_selectors import (any_server_selector,
|
||||
arbiter_server_selector,
|
||||
secondary_server_selector,
|
||||
@ -237,10 +237,7 @@ class Topology(object):
|
||||
address)
|
||||
|
||||
def on_change(self, server_description):
|
||||
"""Process a new ServerDescription after an ismaster call completes.
|
||||
|
||||
Returns False if the server was removed from the topology.
|
||||
"""
|
||||
"""Process a new ServerDescription after an ismaster call completes."""
|
||||
# We do no I/O holding the lock.
|
||||
with self._lock:
|
||||
# Any monitored server was definitely in the topology description
|
||||
@ -269,9 +266,6 @@ class Topology(object):
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
self._condition.notify_all()
|
||||
return self._description.has_server(server_description.address)
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_server_by_address(self, address):
|
||||
"""Get a Server or None.
|
||||
@ -343,13 +337,9 @@ class Topology(object):
|
||||
|
||||
def update_pool(self):
|
||||
# Remove any stale sockets and add new sockets if pool is too small.
|
||||
# Avoid locking around network I/O, or deadlocking when a new connection
|
||||
# opens and calls Topology.on_change() with the ismaster reply.
|
||||
with self._lock:
|
||||
pools = [server._pool for server in self._servers.values()]
|
||||
|
||||
for pool in pools:
|
||||
pool.remove_stale_sockets()
|
||||
for server in self._servers.values():
|
||||
server._pool.remove_stale_sockets()
|
||||
|
||||
def close(self):
|
||||
"""Clear pools and terminate monitors. Topology reopens on demand."""
|
||||
@ -458,36 +448,22 @@ class Topology(object):
|
||||
self._servers.pop(address)
|
||||
|
||||
def _create_pool_for_server(self, address):
|
||||
# Server Discovery And Monitoring Spec: When a client calls ismaster
|
||||
# to handshake a new connection for application operations, use the
|
||||
# ismaster reply to update the topology.
|
||||
ref = weakref.proxy(self)
|
||||
|
||||
def handshake_callback(address, ismaster, round_trip_time):
|
||||
sd = ServerDescription(address, ismaster, round_trip_time,
|
||||
from_handshake=True)
|
||||
|
||||
try:
|
||||
# Return False if server was removed from topology.
|
||||
return ref.on_change(sd)
|
||||
except ReferenceError:
|
||||
return True
|
||||
|
||||
server_pool_options = self._settings.pool_options.with_options(
|
||||
handshake_callback=handshake_callback)
|
||||
|
||||
return self._settings.pool_class(address, server_pool_options)
|
||||
return self._settings.pool_class(address, self._settings.pool_options)
|
||||
|
||||
def _create_pool_for_monitor(self, address):
|
||||
options = self._settings.pool_options
|
||||
|
||||
# According to the Server Discovery And Monitoring Spec, monitors use
|
||||
# connect_timeout for both connect_timeout and socket_timeout. The
|
||||
# pool only has one socket so maxPoolSize and so on aren't needed.
|
||||
opts = self._settings.pool_options
|
||||
monitor_pool_options = opts.with_options(
|
||||
socket_timeout=opts.connect_timeout,
|
||||
monitor_pool_options = PoolOptions(
|
||||
connect_timeout=options.connect_timeout,
|
||||
socket_timeout=options.connect_timeout,
|
||||
ssl_context=options.ssl_context,
|
||||
ssl_match_hostname=options.ssl_match_hostname,
|
||||
socket_keepalive=True,
|
||||
max_pool_size=None,
|
||||
min_pool_size=None)
|
||||
event_listeners=options.event_listeners,
|
||||
appname=options.appname)
|
||||
|
||||
return self._settings.pool_class(address, monitor_pool_options,
|
||||
handshake=False)
|
||||
|
||||
@ -58,10 +58,6 @@ class MockPool(Pool):
|
||||
sock_info.mock_port = self.mock_port
|
||||
yield sock_info
|
||||
|
||||
def _handshake_callback(self, ismaster, round_trip_time):
|
||||
# Don't mock how PyMongo updates topology from ismaster reply.
|
||||
return True
|
||||
|
||||
|
||||
class MockMonitor(Monitor):
|
||||
def __init__(
|
||||
|
||||
@ -22,7 +22,6 @@ import threading
|
||||
|
||||
from bson.py3compat import imap
|
||||
from pymongo import common
|
||||
from pymongo import monitoring
|
||||
from pymongo.read_preferences import ReadPreference, Secondary
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
from pymongo.topology import Topology
|
||||
@ -38,7 +37,7 @@ from pymongo.server_selectors import (any_server_selector,
|
||||
writable_server_selector)
|
||||
from pymongo.settings import TopologySettings
|
||||
from test import client_knobs, unittest
|
||||
from test.utils import rs_or_single_client, wait_until
|
||||
from test.utils import wait_until
|
||||
|
||||
|
||||
class MockSocketInfo(object):
|
||||
@ -298,23 +297,6 @@ class TestSingleServerTopology(TopologyTest):
|
||||
if tries > 10:
|
||||
self.fail("Didn't ever calculate correct new average")
|
||||
|
||||
def test_update_from_handshake(self):
|
||||
class ServerHandshakes(monitoring.ServerListener, list):
|
||||
def opened(self, e):
|
||||
pass
|
||||
|
||||
def description_changed(self, e):
|
||||
if e.new_description._from_handshake:
|
||||
self.append(e)
|
||||
|
||||
def closed(self, e):
|
||||
pass
|
||||
|
||||
handshakes = ServerHandshakes()
|
||||
client = rs_or_single_client(event_listeners=[handshakes])
|
||||
client.admin.command('ping')
|
||||
wait_until(lambda: handshakes, 'record handshakes')
|
||||
|
||||
|
||||
class TestMultiServerTopology(TopologyTest):
|
||||
def test_readable_writable(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user