From d98a7457009ce6c1cfa8949d652fdf2e9c427e1b Mon Sep 17 00:00:00 2001 From: aherlihy Date: Tue, 14 Jun 2016 15:20:19 +0200 Subject: [PATCH] PYTHON-1067 - Implement SDAM Monitoring --- doc/api/pymongo/monitoring.rst | 27 +- pymongo/common.py | 3 + pymongo/monitor.py | 25 +- pymongo/monitoring.py | 494 +++++++++++++++++- pymongo/server.py | 13 +- pymongo/settings.py | 2 + pymongo/topology.py | 108 +++- test/__init__.py | 12 +- .../replica_set_with_no_primary.json | 147 ++++++ .../replica_set_with_primary.json | 147 ++++++ .../replica_set_with_removal.json | 152 ++++++ .../sdam_monitoring/required_replica_set.json | 149 ++++++ test/sdam_monitoring/standalone.json | 104 ++++ test/test_collection.py | 2 +- test/test_command_monitoring_spec.py | 2 +- test/test_cursor.py | 2 +- test/test_discovery_and_monitoring.py | 3 + test/test_heartbeat_monitoring.py | 134 +++++ test/test_monitoring.py | 2 +- test/test_read_concern.py | 2 +- test/test_sdam_monitoring_spec.py | 288 ++++++++++ test/test_server_selection.py | 3 + test/test_topology.py | 3 + test/utils.py | 35 +- 24 files changed, 1812 insertions(+), 47 deletions(-) create mode 100644 test/sdam_monitoring/replica_set_with_no_primary.json create mode 100644 test/sdam_monitoring/replica_set_with_primary.json create mode 100644 test/sdam_monitoring/replica_set_with_removal.json create mode 100644 test/sdam_monitoring/required_replica_set.json create mode 100644 test/sdam_monitoring/standalone.json create mode 100644 test/test_heartbeat_monitoring.py create mode 100644 test/test_sdam_monitoring_spec.py diff --git a/doc/api/pymongo/monitoring.rst b/doc/api/pymongo/monitoring.rst index 26fc7ca72..a230ed134 100644 --- a/doc/api/pymongo/monitoring.rst +++ b/doc/api/pymongo/monitoring.rst @@ -7,12 +7,33 @@ .. autofunction:: register(listener) .. autoclass:: CommandListener :members: + .. autoclass:: ServerListener + :members: + .. autoclass:: ServerHeartbeatListener + :members: + .. autoclass:: TopologyListener + :members: .. autoclass:: CommandStartedEvent :members: - :inherited-members: .. autoclass:: CommandSucceededEvent :members: - :inherited-members: .. autoclass:: CommandFailedEvent :members: - :inherited-members: + .. autoclass:: ServerDescriptionChangedEvent + :members: + .. autoclass:: ServerOpeningEvent + :members: + .. autoclass:: ServerClosedEvent + :members: + .. autoclass:: TopologyDescriptionChangedEvent + :members: + .. autoclass:: TopologyOpenedEvent + :members: + .. autoclass:: TopologyClosedEvent + :members: + .. autoclass:: ServerHeartbeatStartedEvent + :members: + .. autoclass:: ServerHeartbeatSucceededEvent + :members: + .. autoclass:: ServerHeartbeatFailedEvent + :members: diff --git a/pymongo/common.py b/pymongo/common.py index 9481879d4..73d3f3807 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -49,6 +49,9 @@ HEARTBEAT_FREQUENCY = 10 # Frequency to process kill-cursors, in seconds. See MongoClient.close_cursor. KILL_CURSOR_FREQUENCY = 1 +# Frequency to process events queue, in seconds. +EVENTS_QUEUE_FREQUENCY = 1 + # How long to wait, in seconds, for a suitable server to be found before # aborting an operation. For example, if the client attempts an insert # during a replica set election, SERVER_SELECTION_TIMEOUT governs the diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 059df8942..2f2d5172f 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -44,6 +44,9 @@ class Monitor(object): self._pool = pool self._settings = topology_settings self._avg_round_trip_time = MovingAverage() + self._listeners = self._settings._pool_options.event_listeners + pub = self._listeners is not None + self._publish = pub and self._listeners.enabled_for_server_heartbeat # We strongly reference the executor and it weakly references us via # this closure. When the monitor is freed, stop the executor soon. @@ -61,7 +64,7 @@ class Monitor(object): name="pymongo_server_monitor_thread") self._executor = executor - + # Avoid cycles. When self or topology is freed, stop executor soon. self_ref = weakref.ref(self, executor.close) self._topology = weakref.proxy(topology, executor.close) @@ -110,24 +113,34 @@ class Monitor(object): address = self._server_description.address retry = self._server_description.server_type != SERVER_TYPE.Unknown + start = _time() try: return self._check_once() except ReferenceError: raise except Exception as error: + error_time = _time() - start self._topology.reset_pool(address) default = ServerDescription(address, error=error) if not retry: + if self._publish: + self._listeners.publish_server_heartbeat_failed( + address, error_time, error) self._avg_round_trip_time.reset() # Server type defaults to Unknown. return default # Try a second and final time. If it fails return original error. + start = _time() try: return self._check_once() except ReferenceError: raise - except Exception: + except Exception as error: + error_time = _time() - start + if self._publish: + self._listeners.publish_server_heartbeat_failed( + address, error_time, error) self._avg_round_trip_time.reset() return default @@ -136,13 +149,19 @@ class Monitor(object): Returns a ServerDescription, or raises an exception. """ + address = self._server_description.address + if self._publish: + self._listeners.publish_server_heartbeat_started(address) with self._pool.get_socket({}) as sock_info: response, round_trip_time = self._check_with_socket(sock_info) self._avg_round_trip_time.add_sample(round_trip_time) sd = ServerDescription( - address=self._server_description.address, + address=address, ismaster=response, round_trip_time=self._avg_round_trip_time.get()) + if self._publish: + self._listeners.publish_server_heartbeat_succeeded( + address, round_trip_time, response) return sd diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 2173e3a44..75f4440b1 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -15,10 +15,8 @@ """Tools to monitor driver events. Use :func:`register` to register global listeners for specific events. -Currently only command events are published. Listeners must be -a subclass of :class:`CommandListener` and implement -:meth:`~CommandListener.started`, :meth:`~CommandListener.succeeded`, and -:meth:`~CommandListener.failed`. +Listeners must inherit from one of the abstract classes below and implement +the correct functions for that class. For example, a simple command logger might be implemented like this:: @@ -52,9 +50,9 @@ Event listeners can also be registered per instance of client = MongoClient(event_listeners=[CommandLogger()]) -Note that previously registered global listeners are automatically included when -configuring per client event listeners. Registering a new global listener will -not add that listener to existing client instances. +Note that previously registered global listeners are automatically included +when configuring per client event listeners. Registering a new global listener +will not add that listener to existing client instances. .. note:: Events are delivered **synchronously**. Application threads block waiting for event handlers (e.g. :meth:`~CommandListener.started`) to @@ -72,35 +70,133 @@ import traceback from collections import namedtuple, Sequence from pymongo.helpers import _handle_exception -_Listeners = namedtuple('Listeners', ('command_listeners',)) +_Listeners = namedtuple('Listeners', + ('command_listeners', 'server_listeners', + 'server_heartbeat_listeners', 'topology_listeners')) -_LISTENERS = _Listeners([]) +_LISTENERS = _Listeners([], [], [], []) -class CommandListener(object): - """Abstract base class for command listeners.""" +class _EventListener(object): + """Abstract base class for all event listeners. """ + + +class CommandListener(_EventListener): + """Abstract base class for command listeners. + Handles `CommandStartedEvent`, `CommandSucceededEvent`, + and `CommandFailedEvent`""" def started(self, event): - """Abstract method to handle CommandStartedEvent. + """Abstract method to handle a `CommandStartedEvent`. :Parameters: - - `event`: An instance of :class:`CommandStartedEvent` + - `event`: An instance of :class:`CommandStartedEvent`. """ raise NotImplementedError def succeeded(self, event): - """Abstract method to handle CommandSucceededEvent. + """Abstract method to handle a `CommandSucceededEvent`. :Parameters: - - `event`: An instance of :class:`CommandSucceededEvent` + - `event`: An instance of :class:`CommandSucceededEvent`. """ raise NotImplementedError def failed(self, event): - """Abstract method to handle CommandFailedEvent. + """Abstract method to handle a `CommandFailedEvent`. :Parameters: - - `event`: An instance of :class:`CommandFailedEvent` + - `event`: An instance of :class:`CommandFailedEvent`. + """ + raise NotImplementedError + + +class ServerHeartbeatListener(_EventListener): + """Abstract base class for server heartbeat listeners. + Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`, + and `ServerHeartbeatFailedEvent`.""" + + def started(self, event): + """Abstract method to handle a `ServerHeartbeatStartedEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerHeartbeatStartedEvent`. + """ + raise NotImplementedError + + def succeeded(self, event): + """Abstract method to handle a `ServerHeartbeatSucceededEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerHeartbeatSucceededEvent`. + """ + raise NotImplementedError + + def failed(self, event): + """Abstract method to handle a `ServerHeartbeatFailedEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerHeartbeatFailedEvent`. + """ + raise NotImplementedError + + +class TopologyListener(_EventListener): + """Abstract base class for topology monitoring listeners. + Handles `TopologyOpenedEvent`, `TopologyDescriptionChangedEvent`, and + `TopologyClosedEvent`.""" + + def opened(self, event): + """Abstract method to handle a `TopologyOpenedEvent`. + + :Parameters: + - `event`: An instance of :class:`TopologyOpenedEvent`. + """ + raise NotImplementedError + + def changed(self, event): + """Abstract method to handle a `TopologyDescriptionChangedEvent`. + + :Parameters: + - `event`: An instance of :class:`TopologyDescriptionChangedEvent`. + """ + raise NotImplementedError + + def closed(self, event): + """Abstract method to handle a `TopologyClosedEvent`. + + :Parameters: + - `event`: An instance of :class:`TopologyClosedEvent`. + """ + raise NotImplementedError + + +class ServerListener(_EventListener): + """Abstract base class for server listeners. + Handles `ServerOpeningEvent`, `ServerDescriptionChangedEvent`, and + `ServerClosedEvent`.""" + + def opened(self, event): + """Abstract method to handle a `ServerOpeningEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerOpeningEvent`. + """ + raise NotImplementedError + + def changed(self, event): + """Abstract method to handle a `ServerDescriptionChangedEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerDescriptionChangedEvent`. + """ + raise NotImplementedError + + def closed(self, event): + """Abstract method to handle a `ServerClosedEvent`. + + :Parameters: + - `event`: An instance of :class:`ServerClosedEvent`. """ raise NotImplementedError @@ -118,9 +214,10 @@ def _validate_event_listeners(option, listeners): if not isinstance(listeners, Sequence): raise TypeError("%s must be a list or tuple" % (option,)) for listener in listeners: - if not isinstance(listener, CommandListener): - raise TypeError("Only subclasses of " - "pymongo.monitoring.CommandListener are supported") + if not isinstance(listener, _EventListener): + raise TypeError("Listeners for %s must be either a " + "CommandListener, ServerHeartbeatListener, " + "ServerListener, or TopologyListener." % (option,)) return listeners @@ -128,10 +225,22 @@ def register(listener): """Register a global event listener. :Parameters: - - `listener`: A subclass of :class:`CommandListener`. + - `listener`: A subclasses of :class:`CommandListener,` + :class:`ServerHeartbeatListener`, :class:`ServerListener`, or + :class:`TopologyListener`. """ - _validate_event_listeners('listener', [listener]) - _LISTENERS.command_listeners.append(listener) + if not isinstance(listener, _EventListener): + raise TypeError("Listeners for %s must be either a " + "CommandListener, ServerHeartbeatListener, " + "ServerListener, or TopologyListener." % (listener,)) + if isinstance(listener, CommandListener): + _LISTENERS.command_listeners.append(listener) + if isinstance(listener, ServerHeartbeatListener): + _LISTENERS.server_heartbeat_listeners.append(listener) + if isinstance(listener, ServerListener): + _LISTENERS.server_listeners.append(listener) + if isinstance(listener, TopologyListener): + _LISTENERS.topology_listeners.append(listener) # Note - to avoid bugs from forgetting which if these is all lowercase and @@ -275,6 +384,160 @@ class CommandFailedEvent(_CommandEvent): return self.__failure +class _ServerEvent(object): + """Base class for server events.""" + + __slots__ = ("__server_address", "__topology_id") + + def __init__(self, server_address, topology_id): + self.__server_address = server_address + self.__topology_id = topology_id + + @property + def server_address(self): + """The address (host/port pair) of the server""" + return self.__server_address + + @property + def topology_id(self): + """A unique identifier for the topology this server is a part of.""" + return self.__topology_id + + +class ServerDescriptionChangedEvent(_ServerEvent): + """Published when server description changes.""" + + __slots__ = ('__previous_description', '__new_description') + + def __init__(self, previous_description, new_description, *args): + super(ServerDescriptionChangedEvent, self).__init__(*args) + self.__previous_description = previous_description + self.__new_description = new_description + + @property + def previous_description(self): + """The previous server description.""" + return self.__previous_description + + @property + def new_description(self): + """The new server description.""" + return self.__new_description + + +class ServerOpeningEvent(_ServerEvent): + """Published when server is initialized.""" + + +class ServerClosedEvent(_ServerEvent): + """Published when server is closed.""" + + +class TopologyEvent(object): + """Base class for topology description events""" + + __slots__ = ('__topology_id') + + def __init__(self, topology_id): + self.__topology_id = topology_id + + @property + def topology_id(self): + """A unique identifier for the topology this server is a part of.""" + return self.__topology_id + + +class TopologyDescriptionChangedEvent(TopologyEvent): + """Published when the topology description changes.""" + + __slots__ = ('__previous_description', '__new_description') + + def __init__(self, previous_description, new_description, *args): + super(TopologyDescriptionChangedEvent, self).__init__(*args) + self.__previous_description = previous_description + self.__new_description = new_description + + @property + def previous_description(self): + """The old topology description.""" + return self.__previous_description + + @property + def new_description(self): + """The new topology description.""" + return self.__new_description + + +class TopologyOpenedEvent(TopologyEvent): + """Published when the topology is initialized.""" + + +class TopologyClosedEvent(TopologyEvent): + """Published when the topology is closed.""" + + +class _ServerHeartbeatEvent(object): + """Base class for server heartbeat events""" + + __slots__ = ('__connection_id') + + def __init__(self, connection_id): + self.__connection_id = connection_id + + @property + def connection_id(self): + """"The address (host, port) of the server this heartbeat was sent + to.""" + return self.__connection_id + + +class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent): + """"Published when a heartbeat is started.""" + + +class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent): + """Fired when the server heartbeat succeeds.""" + + __slots__ = ('__duration', '__reply') + + def __init__(self, duration, reply, *args): + super(ServerHeartbeatSucceededEvent, self).__init__(*args) + self.__duration = duration + self.__reply = reply + + @property + def duration(self): + """The duration of this heartbeat in microseconds.""" + return self.__duration + + @property + def reply(self): + """The command reply.""" + return self.__reply + + +class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent): + """Fired when the server heartbeat fails, either with an "ok: 0" + or a socket exception.""" + + __slots__ = ('__duration', '__reply') + + def __init__(self, duration, reply, *args): + super(ServerHeartbeatFailedEvent, self).__init__(*args) + self.__duration = duration + self.__reply = reply + + @property + def duration(self): + """The duration of this heartbeat in microseconds.""" + return self.__duration + + @property + def reply(self): + """The command reply.""" + return self.__reply + + class _EventListeners(object): """Configure event listeners for a client instance. @@ -285,9 +548,25 @@ class _EventListeners(object): """ def __init__(self, listeners): self.__command_listeners = _LISTENERS.command_listeners[:] + self.__server_listeners = _LISTENERS.server_listeners[:] + lst = _LISTENERS.server_heartbeat_listeners + self.__server_heartbeat_listeners = lst[:] + self.__topology_listeners = _LISTENERS.topology_listeners[:] if listeners is not None: - self.__command_listeners.extend(listeners) + for lst in listeners: + if isinstance(lst, CommandListener): + self.__command_listeners.append(lst) + if isinstance(lst, ServerListener): + self.__server_listeners.append(lst) + if isinstance(lst, ServerHeartbeatListener): + self.__server_heartbeat_listeners.append(lst) + if isinstance(lst, TopologyListener): + self.__topology_listeners.append(lst) self.__enabled_for_commands = bool(self.__command_listeners) + self.__enabled_for_server = bool(self.__server_listeners) + self.__enabled_for_server_heartbeat = bool( + self.__server_heartbeat_listeners) + self.__enabled_for_topology = bool(self.__topology_listeners) @property def enabled_for_commands(self): @@ -295,9 +574,26 @@ class _EventListeners(object): return self.__enabled_for_commands @property + def enabled_for_server(self): + """Are any ServerListener instances registered?""" + return self.__enabled_for_server + + @property + def enabled_for_server_heartbeat(self): + """Are any ServerHeartbeatListener instances registered?""" + return self.__enabled_for_server_heartbeat + + @property + def enabled_for_topology(self): + """Are any TopologyListener instances registered?""" + return self.__enabled_for_topology + def event_listeners(self): """List of registered event listeners.""" - return self.__command_listeners[:] + return (self.__command_listeners[:], + self.__server_heartbeat_listeners[:], + self.__server_listeners[:], + self.__topology_listeners[:]) def publish_command_start(self, command, database_name, request_id, connection_id, op_id=None): @@ -322,7 +618,6 @@ class _EventListeners(object): except Exception: _handle_exception() - def publish_command_success(self, duration, reply, command_name, request_id, connection_id, op_id=None): """Publish a CommandSucceededEvent to all command listeners. @@ -346,7 +641,6 @@ class _EventListeners(object): except Exception: _handle_exception() - def publish_command_failure(self, duration, failure, command_name, request_id, connection_id, op_id=None): """Publish a CommandFailedEvent to all command listeners. @@ -370,3 +664,149 @@ class _EventListeners(object): subscriber.failed(event) except Exception: _handle_exception() + + def publish_server_heartbeat_started(self, connection_id): + """Publish a ServerHeartbeatStartedEvent to all server heartbeat + listeners. + + :Parameters: + - `connection_id`: The address (host/port pair) of the connection. + """ + event = ServerHeartbeatStartedEvent(connection_id) + for subscriber in self.__server_heartbeat_listeners: + try: + subscriber.started(event) + except Exception: + _handle_exception() + + def publish_server_heartbeat_succeeded(self, connection_id, duration, + reply): + """Publish a ServerHeartbeatSucceededEvent to all server heartbeat + listeners. + + :Parameters: + - `connection_id`: The address (host/port pair) of the connection. + - `duration`: The execution time of the event in the highest possible + resolution for the platform. + - `reply`: The command reply. + """ + event = ServerHeartbeatSucceededEvent(duration, reply, connection_id) + for subscriber in self.__server_heartbeat_listeners: + try: + subscriber.succeeded(event) + except Exception: + _handle_exception() + + def publish_server_heartbeat_failed(self, connection_id, duration, reply): + """Publish a ServerHeartbeatFailedEvent to all server heartbeat + listeners. + + :Parameters: + - `connection_id`: The address (host/port pair) of the connection. + - `duration`: The execution time of the event in the highest possible + resolution for the platform. + - `reply`: The command reply. + """ + event = ServerHeartbeatFailedEvent(duration, reply, connection_id) + for subscriber in self.__server_heartbeat_listeners: + try: + subscriber.failed(event) + except Exception: + _handle_exception() + + def publish_server_opened(self, server_address, topology_id): + """Publish a ServerOpeningEvent to all server listeners. + + :Parameters: + - `server_address`: The address (host/port pair) of the server. + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = ServerOpeningEvent(server_address, topology_id) + for subscriber in self.__server_listeners: + try: + subscriber.opened(event) + except Exception: + _handle_exception() + + def publish_server_closed(self, server_address, topology_id): + """Publish a ServerClosedEvent to all server listeners. + + :Parameters: + - `server_address`: The address (host/port pair) of the server. + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = ServerClosedEvent(server_address, topology_id) + for subscriber in self.__server_listeners: + try: + subscriber.closed(event) + except Exception: + _handle_exception() + + def publish_server_description_changed(self, previous_description, + new_description, server_address, + topology_id): + """Publish a ServerDescriptionChangedEvent to all server listeners. + + :Parameters: + - `previous_description`: The previous server description. + - `server_address`: The address (host/port pair) of the server. + - `new_description`: The new server description. + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = ServerDescriptionChangedEvent(previous_description, + new_description, server_address, + topology_id) + for subscriber in self.__server_listeners: + try: + subscriber.description_changed(event) + except Exception: + _handle_exception() + + def publish_topology_opened(self, topology_id): + """Publish a TopologyOpenedEvent to all topology listeners. + + :Parameters: + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = TopologyOpenedEvent(topology_id) + for subscriber in self.__topology_listeners: + try: + subscriber.opened(event) + except Exception: + _handle_exception() + + def publish_topology_closed(self, topology_id): + """Publish a TopologyClosedEvent to all topology listeners. + + :Parameters: + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = TopologyClosedEvent(topology_id) + for subscriber in self.__topology_listeners: + try: + subscriber.closed(event) + except Exception: + _handle_exception() + + def publish_topology_description_changed(self, previous_description, + new_description, topology_id): + """Publish a TopologyDescriptionChangedEvent to all topology listeners. + + :Parameters: + - `previous_description`: The previous topology description. + - `new_description`: The new topology description. + - `topology_id`: A unique identifier for the topology this server + is a part of. + """ + event = TopologyDescriptionChangedEvent(previous_description, + new_description, topology_id) + for subscriber in self.__topology_listeners: + try: + subscriber.description_changed(event) + except Exception: + _handle_exception() diff --git a/pymongo/server.py b/pymongo/server.py index f1bb181b5..f4f402216 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -25,11 +25,18 @@ from pymongo.server_type import SERVER_TYPE class Server(object): - def __init__(self, server_description, pool, monitor): + def __init__(self, server_description, pool, monitor, topology_id=None, + listeners=None, events=None): """Represent one MongoDB server.""" self._description = server_description self._pool = pool self._monitor = monitor + self._topology_id = topology_id + self._publish = listeners is not None and listeners.enabled_for_server + self._listener = listeners + self._events = None + if self._publish: + self._events = events() def open(self): """Start monitoring, or restart after a fork. @@ -47,6 +54,9 @@ class Server(object): Reconnect with open(). """ + if self._publish: + self._events.put((self._listener.publish_server_closed, + (self._description.address, self._topology_id))) self._monitor.close() self._pool.reset() @@ -82,6 +92,7 @@ class Server(object): - `operation`: A _Query or _GetMore object. - `set_slave_okay`: Pass to operation.get_message. - `all_credentials`: dict, maps auth source to MongoCredential. + - `listeners`: Instance of _EventListeners or None. - `exhaust` (optional): If True, the socket used stays checked out. It is returned along with its Pool in the Response. """ diff --git a/pymongo/settings.py b/pymongo/settings.py index 73881fffd..7e2675f0c 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -16,6 +16,7 @@ import threading +from bson.objectid import ObjectId from pymongo import monitor, pool from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT from pymongo.topology_description import TOPOLOGY_TYPE @@ -46,6 +47,7 @@ class TopologySettings(object): self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout self._direct = (len(self._seeds) == 1 and not replica_set_name) + self._topology_id = ObjectId() @property def seeds(self): diff --git a/pymongo/topology.py b/pymongo/topology.py index bec295131..76e315631 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -18,14 +18,21 @@ import os import random import threading import warnings +import weakref + +from bson.py3compat import itervalues, PY3 +if PY3: + import queue as Queue +else: + import Queue -from bson.py3compat import itervalues from pymongo import common +from pymongo import periodic_executor from pymongo.pool import PoolOptions from pymongo.topology_description import (updated_topology_description, TOPOLOGY_TYPE, TopologyDescription) -from pymongo.errors import ServerSelectionTimeoutError, InvalidOperation +from pymongo.errors import ServerSelectionTimeoutError from pymongo.monotonic import time as _time from pymongo.server import Server from pymongo.server_selectors import (any_server_selector, @@ -35,9 +42,42 @@ from pymongo.server_selectors import (any_server_selector, writable_server_selector) +def process_events_queue(queue_ref): + q = queue_ref() + if not q: + return False # Cancel PeriodicExecutor. + + while True: + try: + event = q.get_nowait() + except Queue.Empty: + break + else: + fn, args = event + fn(*args) + + return True # Continue PeriodicExecutor. + + class Topology(object): """Monitor a topology of one or more servers.""" def __init__(self, topology_settings): + self._topology_id = topology_settings._topology_id + self._listeners = topology_settings._pool_options.event_listeners + pub = self._listeners is not None + self._publish_server = pub and self._listeners.enabled_for_server + self._publish_tp = pub and self._listeners.enabled_for_topology + + # Create events queue if there are publishers. + self._events = None + self._events_thread = None + + if self._publish_server or self._publish_tp: + self._events = Queue.Queue(maxsize=100) + + if self._publish_tp: + self._events.put((self._listeners.publish_topology_opened, + (self._topology_id,))) self._settings = topology_settings topology_description = TopologyDescription( topology_settings.get_topology_type(), @@ -47,6 +87,17 @@ class Topology(object): None) self._description = topology_description + if self._publish_tp: + self._events.put(( + self._listeners.publish_topology_description_changed, + (TopologyDescription( + TOPOLOGY_TYPE.Unknown, {}, None, None, None), + self._description, self._topology_id))) + for seed in topology_settings.seeds: + if self._publish_server: + self._events.put((self._listeners.publish_server_opened, + (seed, self._topology_id))) + # Store the seed list to help diagnose errors in _error_message(). self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False @@ -55,6 +106,23 @@ class Topology(object): self._servers = {} self._pid = None + if self._publish_server or self._publish_tp: + def target(): + return process_events_queue(weak) + + executor = periodic_executor.PeriodicExecutor( + interval=common.EVENTS_QUEUE_FREQUENCY, + min_interval=0.5, + target=target, + name="pymongo_events_thread") + + # We strongly reference the executor and it weakly references + # the queue via this closure. When the topology is freed, stop + # the executor soon. + weak = weakref.ref(self._events) + self.__events_executor = executor + executor.open() + def open(self): """Start monitoring, or restart after a fork. @@ -173,11 +241,25 @@ class Topology(object): # change removed it. E.g., we got a host list from the primary # that didn't include this server. if self._description.has_server(server_description.address): + td_old = self._description + if self._publish_server: + old_server_description = td_old._server_descriptions[ + server_description.address] + self._events.put(( + self._listeners.publish_server_description_changed, + (old_server_description, server_description, + server_description.address, self._topology_id))) + self._description = updated_topology_description( self._description, server_description) self._update_servers() + if self._publish_tp: + self._events.put(( + self._listeners.publish_topology_description_changed, + (td_old, self._description, self._topology_id))) + # Wake waiters in select_servers(). self._condition.notify_all() @@ -219,7 +301,6 @@ class Topology(object): descriptions = selector(self._description.known_servers) return set([d.address for d in descriptions]) - def get_secondaries(self): """Return set of secondary addresses.""" return self._get_replica_set_members(secondary_server_selector) @@ -269,6 +350,12 @@ class Topology(object): # Mark all servers Unknown. self._description = self._description.reset() self._update_servers() + # Publish only after releasing the lock. + if self._publish_tp: + self._events.put((self._listeners.publish_topology_closed, + (self._topology_id,))) + if self._publish_server or self._publish_tp: + self.__events_executor.close() @property def description(self): @@ -282,6 +369,10 @@ class Topology(object): if not self._opened: self._opened = True self._update_servers() + + # Start or restart the events publishing thread. + if self._publish_tp or self._publish_server: + self.__events_executor.open() else: # Restart monitors if we forked since previous call. for server in itervalues(self._servers): @@ -343,10 +434,16 @@ class Topology(object): pool=self._create_pool_for_monitor(address), topology_settings=self._settings) + weak = None + if self._publish_server: + weak = weakref.ref(self._events) server = Server( server_description=sd, pool=self._create_pool_for_server(address), - monitor=monitor) + monitor=monitor, + topology_id=self._topology_id, + listeners=self._listeners, + events=weak) self._servers[address] = server server.open() @@ -372,7 +469,8 @@ class Topology(object): socket_timeout=options.connect_timeout, ssl_context=options.ssl_context, ssl_match_hostname=options.ssl_match_hostname, - socket_keepalive=True) + socket_keepalive=True, + event_listeners=options.event_listeners) return self._settings.pool_class(address, monitor_pool_options, handshake=False) diff --git a/test/__init__.py b/test/__init__.py index 62e6bdf4d..ca2e18de7 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -58,16 +58,20 @@ class client_knobs(object): def __init__( self, heartbeat_frequency=None, - kill_cursor_frequency=None): + kill_cursor_frequency=None, + events_queue_frequency=None): self.heartbeat_frequency = heartbeat_frequency self.kill_cursor_frequency = kill_cursor_frequency + self.events_queue_frequency = events_queue_frequency self.old_heartbeat_frequency = None self.old_kill_cursor_frequency = None + self.old_events_queue_frequency = None def enable(self): self.old_heartbeat_frequency = common.HEARTBEAT_FREQUENCY self.old_kill_cursor_frequency = common.KILL_CURSOR_FREQUENCY + self.old_events_queue_frequency = common.EVENTS_QUEUE_FREQUENCY if self.heartbeat_frequency is not None: common.HEARTBEAT_FREQUENCY = self.heartbeat_frequency @@ -75,12 +79,16 @@ class client_knobs(object): if self.kill_cursor_frequency is not None: common.KILL_CURSOR_FREQUENCY = self.kill_cursor_frequency + if self.events_queue_frequency is not None: + common.EVENTS_QUEUE_FREQUENCY = self.events_queue_frequency + def __enter__(self): self.enable() def disable(self): common.HEARTBEAT_FREQUENCY = self.old_heartbeat_frequency common.KILL_CURSOR_FREQUENCY = self.old_kill_cursor_frequency + common.EVENTS_QUEUE_FREQUENCY = self.old_events_queue_frequency def __exit__(self, exc_type, exc_val, exc_tb): self.disable() @@ -108,7 +116,7 @@ class ClientContext(object): client = pymongo.MongoClient(host, port, serverSelectionTimeoutMS=100) client.admin.command('ismaster') # Can we connect? - + # If so, then reset client to defaults. self.client = pymongo.MongoClient(host, port) diff --git a/test/sdam_monitoring/replica_set_with_no_primary.json b/test/sdam_monitoring/replica_set_with_no_primary.json new file mode 100644 index 000000000..2f398a85f --- /dev/null +++ b/test/sdam_monitoring/replica_set_with_no_primary.json @@ -0,0 +1,147 @@ +{ + "description": "Monitoring a topology that is a replica set with no primary connected", + "uri": "mongodb://a,b", + "phases": [ + { + "responses": [ + [ + "a:27017", + { + "ok": 1, + "ismaster": false, + "secondary": true, + "setName": "rs", + "setVersion": 1, + "primary": "b:27017", + "hosts": [ + "a:27017", "b:27017" + ], + "minWireVersion": 0, + "maxWireVersion": 4 + } + ] + ], + "outcome": { + "events": [ + { + "topology_opening_event": { + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [] + }, + "newDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + } + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "a:27017" + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "b:27017" + } + }, + { + "server_description_changed_event": { + "topologyId": "42", + "address": "a:27017", + "previousDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + "newDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", "b:27017" + ], + "passives": [], + "primary": "b:27017", + "setName": "rs", + "type": "RSSecondary" + } + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + }, + "newDescription": { + "topologyType": "ReplicaSetNoPrimary", + "setName": "rs", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", "b:27017" + ], + "passives": [], + "primary": "b:27017", + "setName": "rs", + "type": "RSSecondary" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + } + } + } + ] + } + } + ] +} diff --git a/test/sdam_monitoring/replica_set_with_primary.json b/test/sdam_monitoring/replica_set_with_primary.json new file mode 100644 index 000000000..6c0d8819d --- /dev/null +++ b/test/sdam_monitoring/replica_set_with_primary.json @@ -0,0 +1,147 @@ +{ + "description": "Monitoring a topology that is a replica set with a primary connected", + "uri": "mongodb://a,b", + "phases": [ + { + "responses": [ + [ + "a:27017", + { + "ok": 1, + "ismaster": true, + "setName": "rs", + "setVersion": 1, + "primary": "a:27017", + "hosts": [ + "a:27017", "b:27017" + ], + "minWireVersion": 0, + "maxWireVersion": 4 + } + ] + ], + "outcome": { + "events": [ + { + "topology_opening_event": { + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [] + }, + "newDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + } + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "a:27017" + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "b:27017" + } + }, + { + "server_description_changed_event": { + "topologyId": "42", + "address": "a:27017", + "previousDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + "newDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", "b:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + } + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + }, + "newDescription": { + "topologyType": "ReplicaSetWithPrimary", + "setName": "rs", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", + "b:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + } + } + } + ] + } + } + ] +} diff --git a/test/sdam_monitoring/replica_set_with_removal.json b/test/sdam_monitoring/replica_set_with_removal.json new file mode 100644 index 000000000..a14456cdb --- /dev/null +++ b/test/sdam_monitoring/replica_set_with_removal.json @@ -0,0 +1,152 @@ +{ + "description": "Monitoring a replica set with non member", + "uri": "mongodb://a,b/", + "phases": [ + { + "responses": [ + [ + "a:27017", + { + "ok": 1, + "ismaster": true, + "setName": "rs", + "setVersion": 1, + "primary": "a:27017", + "hosts": [ + "a:27017" + ], + "minWireVersion": 0, + "maxWireVersion": 4 + } + ], + [ + "b:27017", + { + "ok": 1, + "ismaster": true + } + ] + ], + "outcome": { + "events": [ + { + "topology_opening_event": { + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [] + }, + "newDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + } + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "a:27017" + } + }, + { + "server_opening_event": { + "topologyId": "42", + "address": "b:27017" + } + }, + { + "server_description_changed_event": { + "topologyId": "42", + "address": "a:27017", + "previousDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + "newDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + } + } + }, + { + "server_closed_event": { + "topologyId": "42", + "address": "b:27017" + } + }, + { + "topology_description_changed_event": { + "topologyId": "42", + "previousDescription": { + "topologyType": "Unknown", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ] + }, + "newDescription": { + "topologyType": "ReplicaSetWithPrimary", + "setName": "rs", + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + } + ] + } + } + } + ] + } + } + ] +} diff --git a/test/sdam_monitoring/required_replica_set.json b/test/sdam_monitoring/required_replica_set.json new file mode 100644 index 000000000..0afe0d1a4 --- /dev/null +++ b/test/sdam_monitoring/required_replica_set.json @@ -0,0 +1,149 @@ +{ + "description": "Monitoring a topology that is required to be a replica set", + "phases": [ + { + "outcome": { + "events": [ + { + "topology_opening_event": { + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "newDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ], + "topologyType": "ReplicaSetNoPrimary" + }, + "previousDescription": { + "servers": [], + "topologyType": "Unknown" + }, + "topologyId": "42" + } + }, + { + "server_opening_event": { + "address": "a:27017", + "topologyId": "42" + } + }, + { + "server_opening_event": { + "address": "b:27017", + "topologyId": "42" + } + }, + { + "server_description_changed_event": { + "address": "a:27017", + "newDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", + "b:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + }, + "previousDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "newDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [ + "a:27017", + "b:27017" + ], + "passives": [], + "primary": "a:27017", + "setName": "rs", + "type": "RSPrimary" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ], + "setName": "rs", + "topologyType": "ReplicaSetWithPrimary" + }, + "previousDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + { + "address": "b:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ], + "topologyType": "ReplicaSetNoPrimary" + }, + "topologyId": "42" + } + } + ] + }, + "responses": [ + [ + "a:27017", + { + "hosts": [ + "a:27017", + "b:27017" + ], + "ismaster": true, + "maxWireVersion": 4, + "minWireVersion": 0, + "ok": 1, + "primary": "a:27017", + "setName": "rs", + "setVersion": 1.0 + } + ] + ] + } + ], + "uri": "mongodb://a,b/?replicaSet=rs" +} diff --git a/test/sdam_monitoring/standalone.json b/test/sdam_monitoring/standalone.json new file mode 100644 index 000000000..1ca3c3c24 --- /dev/null +++ b/test/sdam_monitoring/standalone.json @@ -0,0 +1,104 @@ +{ + "description": "Monitoring a standalone connection", + "phases": [ + { + "outcome": { + "events": [ + { + "topology_opening_event": { + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "newDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ], + "topologyType": "Single" + }, + "previousDescription": { + "servers": [], + "topologyType": "Unknown" + }, + "topologyId": "42" + } + }, + { + "server_opening_event": { + "address": "a:27017", + "topologyId": "42" + } + }, + { + "server_description_changed_event": { + "address": "a:27017", + "newDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Standalone" + }, + "previousDescription": { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + }, + "topologyId": "42" + } + }, + { + "topology_description_changed_event": { + "newDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Standalone" + } + ], + "topologyType": "Single" + }, + "previousDescription": { + "servers": [ + { + "address": "a:27017", + "arbiters": [], + "hosts": [], + "passives": [], + "type": "Unknown" + } + ], + "topologyType": "Single" + }, + "topologyId": "42" + } + } + ] + }, + "responses": [ + [ + "a:27017", + { + "ismaster": true, + "maxWireVersion": 4, + "minWireVersion": 0, + "ok": 1 + } + ] + ] + } + ], + "uri": "mongodb://a:27017" +} diff --git a/test/test_collection.py b/test/test_collection.py index 1f7b94f6a..65f3e6f0e 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -2064,7 +2064,7 @@ class TestCollection(IntegrationTest): def test_find_one_and_write_concern(self): listener = EventListener() saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([]) + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) db = single_client(event_listeners=[listener])[self.db.name] # non-default WriteConcern. c_w0 = db.get_collection( diff --git a/test/test_command_monitoring_spec.py b/test/test_command_monitoring_spec.py index 2149ac95a..a45209aa6 100644 --- a/test/test_command_monitoring_spec.py +++ b/test/test_command_monitoring_spec.py @@ -51,7 +51,7 @@ class TestAllScenarios(unittest.TestCase): def setUpClass(cls): cls.listener = EventListener() cls.saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([]) + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = single_client(event_listeners=[cls.listener]) @classmethod diff --git a/test/test_cursor.py b/test/test_cursor.py index 812beab47..adae3317e 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -228,7 +228,7 @@ class TestCursor(IntegrationTest): listener = EventListener() listener.add_command_filter('killCursors') saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([]) + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) coll = single_client( event_listeners=[listener])[self.db.name].pymongo_test results = listener.results diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index bdd3ed5c5..32c4d8d55 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -71,6 +71,9 @@ class MockMonitor(object): def close(self): pass + def remove_stale_sockets(self): + pass + def create_mock_topology(uri, monitor_class=MockMonitor): # Some tests in the spec include URIs like mongodb://A/?connect=direct, diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py new file mode 100644 index 000000000..a37da0d7d --- /dev/null +++ b/test/test_heartbeat_monitoring.py @@ -0,0 +1,134 @@ +# Copyright 2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the monitoring of the server heartbeats.""" + +import sys +import threading + +sys.path[0:0] = [""] + +from pymongo import monitoring +from pymongo.errors import ConnectionFailure +from pymongo.ismaster import IsMaster +from pymongo.monitor import Monitor +from test import unittest, client_knobs +from test.utils import HeartbeatEventListener, single_client, wait_until + +sys.path[0:0] = [""] + + +class MockSocketInfo(object): + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class MockPool(object): + def __init__(self, *args, **kwargs): + self.pool_id = 0 + self._lock = threading.Lock() + + def get_socket(self, all_credentials): + return MockSocketInfo() + + def return_socket(self, _): + pass + + def reset(self): + with self._lock: + self.pool_id += 1 + + def remove_stale_sockets(self): + pass + + +class TestHeartbeatMonitoring(unittest.TestCase): + @classmethod + def setUp(cls): + cls.all_listener = HeartbeatEventListener() + cls.saved_listeners = monitoring._LISTENERS + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) + + @classmethod + def tearDown(cls): + monitoring._LISTENERS = cls.saved_listeners + + def create_mock_monitor(self, responses, uri, expected_results): + with client_knobs(heartbeat_frequency=0.1, events_queue_frequency=0.1): + class MockMonitor(Monitor): + def _check_with_socket(self, sock_info): + if isinstance(responses[1], Exception): + raise responses[1] + return IsMaster(responses[1]), 99 + + m = single_client(h=uri, + event_listeners=(self.all_listener,), + _monitor_class=MockMonitor, + _pool_class=MockPool + ) + + expected_len = len(expected_results) + wait_until(lambda: len(self.all_listener.results) == expected_len, + "publish all events", timeout=15) + + try: + for i in range(len(expected_results)): + result = self.all_listener.results[i] if len( + self.all_listener.results) > i else None + self.assertEqual(expected_results[i], + result.__class__.__name__) + self.assertEqual(result.connection_id, + responses[0]) + if expected_results[i] != 'ServerHeartbeatStartedEvent': + if isinstance(result.reply, IsMaster): + self.assertEqual(result.duration, 99) + self.assertEqual(result.reply._doc, responses[1]) + else: + self.assertEqual(result.reply, responses[1]) + + finally: + m.close() + + def test_standalone(self): + responses = (('a', 27017), + { + "ismaster": True, + "maxWireVersion": 4, + "minWireVersion": 0, + "ok": 1 + }) + uri = "mongodb://a:27017" + expected_results = ['ServerHeartbeatStartedEvent', + 'ServerHeartbeatSucceededEvent'] + + self.create_mock_monitor(responses, uri, expected_results) + + def test_standalone_error(self): + responses = (('a', 27017), + ConnectionFailure("SPECIAL MESSAGE")) + uri = "mongodb://a:27017" + expected_results = ['ServerHeartbeatStartedEvent', + 'ServerHeartbeatFailedEvent'] + + self.create_mock_monitor(responses, uri, expected_results) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 371ec4846..013eb284d 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -39,7 +39,7 @@ class TestCommandMonitoring(unittest.TestCase): cls.listener = EventListener() cls.saved_listeners = monitoring._LISTENERS # Don't use any global subscribers. - monitoring._LISTENERS = monitoring._Listeners([]) + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = single_client(event_listeners=[cls.listener]) @classmethod diff --git a/test/test_read_concern.py b/test/test_read_concern.py index 72b33d686..4bdabab04 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -33,7 +33,7 @@ class TestReadConcern(unittest.TestCase): cls.listener = EventListener() cls.saved_listeners = monitoring._LISTENERS # Don't use any global subscribers. - monitoring._LISTENERS = monitoring._Listeners([]) + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = single_client(event_listeners=[cls.listener]) cls.db = cls.client.pymongo_test diff --git a/test/test_sdam_monitoring_spec.py b/test/test_sdam_monitoring_spec.py new file mode 100644 index 000000000..6225034e5 --- /dev/null +++ b/test/test_sdam_monitoring_spec.py @@ -0,0 +1,288 @@ +# Copyright 2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run the sdam monitoring spec tests.""" + +import json +import os +import sys +import weakref + +sys.path[0:0] = [""] + +from bson.json_util import object_hook +from pymongo import monitoring +from pymongo import periodic_executor +from pymongo.ismaster import IsMaster +from pymongo.monitor import Monitor +from pymongo.read_preferences import MovingAverage +from pymongo.server_description import ServerDescription +from pymongo.server_type import SERVER_TYPE +from pymongo.topology import TOPOLOGY_TYPE +from test import unittest, client_context, client_knobs +from test.utils import (ServerAndTopologyEventListener, + single_client, + wait_until) + +# Location of JSON test specifications. +_TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + 'sdam_monitoring') + + +def compare_server_descriptions(expected, actual): + if ((not expected['address'] == "%s:%s" % actual.address) or + (not SERVER_TYPE.__getattribute__(expected['type']) == + actual.server_type)): + return False + expected_hosts = set( + expected['arbiters'] + expected['passives'] + expected['hosts']) + return expected_hosts == set("%s:%s" % s for s in actual.all_hosts) + + +def compare_topology_descriptions(expected, actual): + if not (TOPOLOGY_TYPE.__getattribute__( + expected['topologyType']) == actual.topology_type): + return False + expected = expected['servers'] + actual = actual.server_descriptions() + if len(expected) != len(actual): + return False + for exp_server in expected: + for address, actual_server in actual.items(): + if compare_server_descriptions(exp_server, actual_server): + break + else: + return False + return True + + +def compare_events(expected_dict, actual): + if not expected_dict: + return False, "Error: Bad expected value in YAML test" + if not actual: + return False, "Error: Event published was None" + + expected_type, expected = list(expected_dict.items())[0] + + if expected_type == "server_opening_event": + if not isinstance(actual, monitoring.ServerOpeningEvent): + return False, "Expected ServerOpeningEvent, got %s" % ( + actual.__class__) + if not expected['address'] == "%s:%s" % actual.server_address: + return (False, + "ServerOpeningEvent published with wrong address (expected" + " %s, got %s" % (expected['address'], + actual.server_address)) + + elif expected_type == "server_description_changed_event": + + if not isinstance(actual, monitoring.ServerDescriptionChangedEvent): + return (False, + "Expected ServerDescriptionChangedEvent, got %s" % ( + actual.__class__)) + if not expected['address'] == "%s:%s" % actual.server_address: + return (False, "ServerDescriptionChangedEvent has wrong address" + " (expected %s, got %s" % (expected['address'], + actual.server_address)) + + if not compare_server_descriptions( + expected['newDescription'], actual.new_description): + return (False, "New ServerDescription incorrect in" + " ServerDescriptionChangedEvent") + if not compare_server_descriptions(expected['previousDescription'], + actual.previous_description): + return (False, "Previous ServerDescription incorrect in" + " ServerDescriptionChangedEvent") + + elif expected_type == "server_closed_event": + if not isinstance(actual, monitoring.ServerClosedEvent): + return False, "Expected ServerClosedEvent, got %s" % ( + actual.__class__) + if not expected['address'] == "%s:%s" % actual.server_address: + return (False, "ServerClosedEvent published with wrong address" + " (expected %s, got %s" % (expected['address'], + actual.server_address)) + + elif expected_type == "topology_opening_event": + if not isinstance(actual, monitoring.TopologyOpenedEvent): + return False, "Expected TopologyOpeningEvent, got %s" % ( + actual.__class__) + + elif expected_type == "topology_description_changed_event": + if not isinstance(actual, monitoring.TopologyDescriptionChangedEvent): + return (False, "Expected TopologyDescriptionChangedEvent," + " got %s" % (actual.__class__)) + if not compare_topology_descriptions(expected['newDescription'], + actual.new_description): + return (False, "New TopologyDescription incorrect in " + "TopologyDescriptionChangedEvent") + if not compare_topology_descriptions( + expected['previousDescription'], + actual.previous_description): + return (False, "Previous TopologyDescription incorrect in" + " TopologyDescriptionChangedEvent") + + elif expected_type == "topology_closed_event": + if not isinstance(actual, monitoring.TopologyClosedEvent): + return False, "Expected TopologyClosedEvent, got %s" % ( + actual.__class__) + + else: + return False, "Incorrect event: expected %s, actual %s" % ( + expected_type, actual) + + return True, "" + + +def compare_multiple_events(i, expected_results, actual_results): + events_in_a_row = [] + j = i + while(j < len(expected_results) and isinstance( + actual_results[j], + actual_results[i].__class__)): + events_in_a_row.append(actual_results[j]) + j += 1 + message = '' + for event in events_in_a_row: + for k in range(i, j): + passed, message = compare_events(expected_results[k], event) + if passed: + expected_results[k] = None + break + else: + return i, False, message + return j, True, '' + + +class TestAllScenarios(unittest.TestCase): + + @classmethod + @client_context.require_connection + def setUp(cls): + cls.all_listener = ServerAndTopologyEventListener() + cls.saved_listeners = monitoring._LISTENERS + monitoring._LISTENERS = monitoring._Listeners([], [], [], []) + + @classmethod + def tearDown(cls): + monitoring._LISTENERS = cls.saved_listeners + + +def create_test(scenario_def): + def run_scenario(self): + responses = (r for r in scenario_def['phases'][0]['responses']) + + with client_knobs(events_queue_frequency=0.1): + class MockMonitor(Monitor): + def __init__(self, server_description, topology, pool, + topology_settings): + """Have to copy entire constructor from Monitor so that we + can override _run and change the periodic executor's + interval.""" + + self._server_description = server_description + self._pool = pool + self._settings = topology_settings + self._avg_round_trip_time = MovingAverage() + options = self._settings._pool_options + self._listeners = options.event_listeners + self._publish = self._listeners is not None + + def target(): + monitor = self_ref() + if monitor is None: + return False + MockMonitor._run(monitor) # Change target to subclass + return True + + # Shorten interval + executor = periodic_executor.PeriodicExecutor( + interval=0.1, + min_interval=0.1, + target=target, + name="pymongo_server_monitor_thread") + self._executor = executor + self_ref = weakref.ref(self, executor.close) + self._topology = weakref.proxy(topology, executor.close) + + def _run(self): + try: + if self._server_description.address != ('a', 27017): + # Because PyMongo doesn't keep information about + # the order of addresses, we might accidentally + # start a MockMonitor on the wrong server first, + # so we need to only mock responses for the server + # the test's response is supposed to come from. + return + response = next(responses)[1] + isMaster = IsMaster(response) + self._server_description = ServerDescription( + address=self._server_description.address, + ismaster=isMaster) + self._topology.on_change(self._server_description) + except (ReferenceError, StopIteration): + # Topology was garbage-collected. + self.close() + + m = single_client(h=scenario_def['uri'], + event_listeners=(self.all_listener,), + _monitor_class=MockMonitor) + + expected_results = scenario_def['phases'][0]['outcome']['events'] + + expected_len = len(expected_results) + wait_until(lambda: len(self.all_listener.results) >= expected_len, + "publish all events", timeout=15) + + try: + i = 0 + while i < expected_len: + result = self.all_listener.results[i] if len( + self.all_listener.results) > i else None + # The order of ServerOpening/ClosedEvents doesn't matter + if (isinstance(result, + monitoring.ServerOpeningEvent) or + isinstance(result, + monitoring.ServerClosedEvent)): + i, passed, message = compare_multiple_events( + i, expected_results, self.all_listener.results) + self.assertTrue(passed, message) + else: + self.assertTrue( + *compare_events(expected_results[i], result)) + i += 1 + + finally: + m.close() + return run_scenario + + +def create_tests(): + for dirpath, _, filenames in os.walk(_TEST_PATH): + for filename in filenames: + with open(os.path.join(dirpath, filename)) as scenario_stream: + scenario_def = json.load( + scenario_stream, object_hook=object_hook) + # Construct test from scenario. + new_test = create_test(scenario_def) + test_name = 'test_%s' % (os.path.splitext(filename)[0],) + new_test.__name__ = test_name + setattr(TestAllScenarios, new_test.__name__, new_test) + + +create_tests() + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_server_selection.py b/test/test_server_selection.py index d0820c9ba..ee926a632 100644 --- a/test/test_server_selection.py +++ b/test/test_server_selection.py @@ -55,6 +55,9 @@ class MockPool(object): def reset(self): pass + def remove_stale_sockets(self): + pass + class MockMonitor(object): def __init__(self, server_description, topology, pool, topology_settings): diff --git a/test/test_topology.py b/test/test_topology.py index 23a30e45e..ccebcb5a6 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -66,6 +66,9 @@ class MockPool(object): with self._lock: self.pool_id += 1 + def remove_stale_sockets(self): + pass + class MockMonitor(object): def __init__(self, server_description, topology, pool, topology_settings): diff --git a/test/utils.py b/test/utils.py index 6dcf70451..192a69e98 100644 --- a/test/utils.py +++ b/test/utils.py @@ -64,6 +64,39 @@ class EventListener(monitoring.CommandListener): self.results['failed'].append(event) +class ServerAndTopologyEventListener(monitoring.ServerListener, + monitoring.TopologyListener): + """Listens to all events.""" + + def __init__(self): + self.results = [] + + def opened(self, event): + self.results.append(event) + + def description_changed(self, event): + self.results.append(event) + + def closed(self, event): + self.results.append(event) + + +class HeartbeatEventListener(monitoring.ServerHeartbeatListener): + """Listens to only server heartbeat events.""" + + def __init__(self): + self.results = [] + + def started(self, event): + self.results.append(event) + + def succeeded(self, event): + self.results.append(event) + + def failed(self, event): + self.results.append(event) + + def _connection_string_noauth(h, p): if h.startswith("mongodb://"): return h @@ -146,7 +179,7 @@ def get_command_line(client): def server_started_with_option(client, cmdline_opt, config_opt): """Check if the server was started with a particular option. - + :Parameters: - `cmdline_opt`: The command line option (i.e. --nojournal) - `config_opt`: The config file option (i.e. nojournal)