PYTHON-1067 - Implement SDAM Monitoring

This commit is contained in:
aherlihy 2016-06-14 15:20:19 +02:00
parent 10608144d6
commit d98a745700
24 changed files with 1812 additions and 47 deletions

View File

@ -7,12 +7,33 @@
.. autofunction:: register(listener)
.. autoclass:: CommandListener
:members:
.. autoclass:: ServerListener
:members:
.. autoclass:: ServerHeartbeatListener
:members:
.. autoclass:: TopologyListener
:members:
.. autoclass:: CommandStartedEvent
:members:
:inherited-members:
.. autoclass:: CommandSucceededEvent
:members:
:inherited-members:
.. autoclass:: CommandFailedEvent
:members:
:inherited-members:
.. autoclass:: ServerDescriptionChangedEvent
:members:
.. autoclass:: ServerOpeningEvent
:members:
.. autoclass:: ServerClosedEvent
:members:
.. autoclass:: TopologyDescriptionChangedEvent
:members:
.. autoclass:: TopologyOpenedEvent
:members:
.. autoclass:: TopologyClosedEvent
:members:
.. autoclass:: ServerHeartbeatStartedEvent
:members:
.. autoclass:: ServerHeartbeatSucceededEvent
:members:
.. autoclass:: ServerHeartbeatFailedEvent
:members:

View File

@ -49,6 +49,9 @@ HEARTBEAT_FREQUENCY = 10
# Frequency to process kill-cursors, in seconds. See MongoClient.close_cursor.
KILL_CURSOR_FREQUENCY = 1
# Frequency to process events queue, in seconds.
EVENTS_QUEUE_FREQUENCY = 1
# How long to wait, in seconds, for a suitable server to be found before
# aborting an operation. For example, if the client attempts an insert
# during a replica set election, SERVER_SELECTION_TIMEOUT governs the

View File

@ -44,6 +44,9 @@ class Monitor(object):
self._pool = pool
self._settings = topology_settings
self._avg_round_trip_time = MovingAverage()
self._listeners = self._settings._pool_options.event_listeners
pub = self._listeners is not None
self._publish = pub and self._listeners.enabled_for_server_heartbeat
# We strongly reference the executor and it weakly references us via
# this closure. When the monitor is freed, stop the executor soon.
@ -61,7 +64,7 @@ class Monitor(object):
name="pymongo_server_monitor_thread")
self._executor = executor
# Avoid cycles. When self or topology is freed, stop executor soon.
self_ref = weakref.ref(self, executor.close)
self._topology = weakref.proxy(topology, executor.close)
@ -110,24 +113,34 @@ class Monitor(object):
address = self._server_description.address
retry = self._server_description.server_type != SERVER_TYPE.Unknown
start = _time()
try:
return self._check_once()
except ReferenceError:
raise
except Exception as error:
error_time = _time() - start
self._topology.reset_pool(address)
default = ServerDescription(address, error=error)
if not retry:
if self._publish:
self._listeners.publish_server_heartbeat_failed(
address, error_time, error)
self._avg_round_trip_time.reset()
# Server type defaults to Unknown.
return default
# Try a second and final time. If it fails return original error.
start = _time()
try:
return self._check_once()
except ReferenceError:
raise
except Exception:
except Exception as error:
error_time = _time() - start
if self._publish:
self._listeners.publish_server_heartbeat_failed(
address, error_time, error)
self._avg_round_trip_time.reset()
return default
@ -136,13 +149,19 @@ class Monitor(object):
Returns a ServerDescription, or raises an exception.
"""
address = self._server_description.address
if self._publish:
self._listeners.publish_server_heartbeat_started(address)
with self._pool.get_socket({}) as sock_info:
response, round_trip_time = self._check_with_socket(sock_info)
self._avg_round_trip_time.add_sample(round_trip_time)
sd = ServerDescription(
address=self._server_description.address,
address=address,
ismaster=response,
round_trip_time=self._avg_round_trip_time.get())
if self._publish:
self._listeners.publish_server_heartbeat_succeeded(
address, round_trip_time, response)
return sd

View File

@ -15,10 +15,8 @@
"""Tools to monitor driver events.
Use :func:`register` to register global listeners for specific events.
Currently only command events are published. Listeners must be
a subclass of :class:`CommandListener` and implement
:meth:`~CommandListener.started`, :meth:`~CommandListener.succeeded`, and
:meth:`~CommandListener.failed`.
Listeners must inherit from one of the abstract classes below and implement
the correct functions for that class.
For example, a simple command logger might be implemented like this::
@ -52,9 +50,9 @@ Event listeners can also be registered per instance of
client = MongoClient(event_listeners=[CommandLogger()])
Note that previously registered global listeners are automatically included when
configuring per client event listeners. Registering a new global listener will
not add that listener to existing client instances.
Note that previously registered global listeners are automatically included
when configuring per client event listeners. Registering a new global listener
will not add that listener to existing client instances.
.. note:: Events are delivered **synchronously**. Application threads block
waiting for event handlers (e.g. :meth:`~CommandListener.started`) to
@ -72,35 +70,133 @@ import traceback
from collections import namedtuple, Sequence
from pymongo.helpers import _handle_exception
_Listeners = namedtuple('Listeners', ('command_listeners',))
_Listeners = namedtuple('Listeners',
('command_listeners', 'server_listeners',
'server_heartbeat_listeners', 'topology_listeners'))
_LISTENERS = _Listeners([])
_LISTENERS = _Listeners([], [], [], [])
class CommandListener(object):
"""Abstract base class for command listeners."""
class _EventListener(object):
"""Abstract base class for all event listeners. """
class CommandListener(_EventListener):
"""Abstract base class for command listeners.
Handles `CommandStartedEvent`, `CommandSucceededEvent`,
and `CommandFailedEvent`"""
def started(self, event):
"""Abstract method to handle CommandStartedEvent.
"""Abstract method to handle a `CommandStartedEvent`.
:Parameters:
- `event`: An instance of :class:`CommandStartedEvent`
- `event`: An instance of :class:`CommandStartedEvent`.
"""
raise NotImplementedError
def succeeded(self, event):
"""Abstract method to handle CommandSucceededEvent.
"""Abstract method to handle a `CommandSucceededEvent`.
:Parameters:
- `event`: An instance of :class:`CommandSucceededEvent`
- `event`: An instance of :class:`CommandSucceededEvent`.
"""
raise NotImplementedError
def failed(self, event):
"""Abstract method to handle CommandFailedEvent.
"""Abstract method to handle a `CommandFailedEvent`.
:Parameters:
- `event`: An instance of :class:`CommandFailedEvent`
- `event`: An instance of :class:`CommandFailedEvent`.
"""
raise NotImplementedError
class ServerHeartbeatListener(_EventListener):
"""Abstract base class for server heartbeat listeners.
Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`,
and `ServerHeartbeatFailedEvent`."""
def started(self, event):
"""Abstract method to handle a `ServerHeartbeatStartedEvent`.
:Parameters:
- `event`: An instance of :class:`ServerHeartbeatStartedEvent`.
"""
raise NotImplementedError
def succeeded(self, event):
"""Abstract method to handle a `ServerHeartbeatSucceededEvent`.
:Parameters:
- `event`: An instance of :class:`ServerHeartbeatSucceededEvent`.
"""
raise NotImplementedError
def failed(self, event):
"""Abstract method to handle a `ServerHeartbeatFailedEvent`.
:Parameters:
- `event`: An instance of :class:`ServerHeartbeatFailedEvent`.
"""
raise NotImplementedError
class TopologyListener(_EventListener):
"""Abstract base class for topology monitoring listeners.
Handles `TopologyOpenedEvent`, `TopologyDescriptionChangedEvent`, and
`TopologyClosedEvent`."""
def opened(self, event):
"""Abstract method to handle a `TopologyOpenedEvent`.
:Parameters:
- `event`: An instance of :class:`TopologyOpenedEvent`.
"""
raise NotImplementedError
def changed(self, event):
"""Abstract method to handle a `TopologyDescriptionChangedEvent`.
:Parameters:
- `event`: An instance of :class:`TopologyDescriptionChangedEvent`.
"""
raise NotImplementedError
def closed(self, event):
"""Abstract method to handle a `TopologyClosedEvent`.
:Parameters:
- `event`: An instance of :class:`TopologyClosedEvent`.
"""
raise NotImplementedError
class ServerListener(_EventListener):
"""Abstract base class for server listeners.
Handles `ServerOpeningEvent`, `ServerDescriptionChangedEvent`, and
`ServerClosedEvent`."""
def opened(self, event):
"""Abstract method to handle a `ServerOpeningEvent`.
:Parameters:
- `event`: An instance of :class:`ServerOpeningEvent`.
"""
raise NotImplementedError
def changed(self, event):
"""Abstract method to handle a `ServerDescriptionChangedEvent`.
:Parameters:
- `event`: An instance of :class:`ServerDescriptionChangedEvent`.
"""
raise NotImplementedError
def closed(self, event):
"""Abstract method to handle a `ServerClosedEvent`.
:Parameters:
- `event`: An instance of :class:`ServerClosedEvent`.
"""
raise NotImplementedError
@ -118,9 +214,10 @@ def _validate_event_listeners(option, listeners):
if not isinstance(listeners, Sequence):
raise TypeError("%s must be a list or tuple" % (option,))
for listener in listeners:
if not isinstance(listener, CommandListener):
raise TypeError("Only subclasses of "
"pymongo.monitoring.CommandListener are supported")
if not isinstance(listener, _EventListener):
raise TypeError("Listeners for %s must be either a "
"CommandListener, ServerHeartbeatListener, "
"ServerListener, or TopologyListener." % (option,))
return listeners
@ -128,10 +225,22 @@ def register(listener):
"""Register a global event listener.
:Parameters:
- `listener`: A subclass of :class:`CommandListener`.
- `listener`: A subclasses of :class:`CommandListener,`
:class:`ServerHeartbeatListener`, :class:`ServerListener`, or
:class:`TopologyListener`.
"""
_validate_event_listeners('listener', [listener])
_LISTENERS.command_listeners.append(listener)
if not isinstance(listener, _EventListener):
raise TypeError("Listeners for %s must be either a "
"CommandListener, ServerHeartbeatListener, "
"ServerListener, or TopologyListener." % (listener,))
if isinstance(listener, CommandListener):
_LISTENERS.command_listeners.append(listener)
if isinstance(listener, ServerHeartbeatListener):
_LISTENERS.server_heartbeat_listeners.append(listener)
if isinstance(listener, ServerListener):
_LISTENERS.server_listeners.append(listener)
if isinstance(listener, TopologyListener):
_LISTENERS.topology_listeners.append(listener)
# Note - to avoid bugs from forgetting which if these is all lowercase and
@ -275,6 +384,160 @@ class CommandFailedEvent(_CommandEvent):
return self.__failure
class _ServerEvent(object):
"""Base class for server events."""
__slots__ = ("__server_address", "__topology_id")
def __init__(self, server_address, topology_id):
self.__server_address = server_address
self.__topology_id = topology_id
@property
def server_address(self):
"""The address (host/port pair) of the server"""
return self.__server_address
@property
def topology_id(self):
"""A unique identifier for the topology this server is a part of."""
return self.__topology_id
class ServerDescriptionChangedEvent(_ServerEvent):
"""Published when server description changes."""
__slots__ = ('__previous_description', '__new_description')
def __init__(self, previous_description, new_description, *args):
super(ServerDescriptionChangedEvent, self).__init__(*args)
self.__previous_description = previous_description
self.__new_description = new_description
@property
def previous_description(self):
"""The previous server description."""
return self.__previous_description
@property
def new_description(self):
"""The new server description."""
return self.__new_description
class ServerOpeningEvent(_ServerEvent):
"""Published when server is initialized."""
class ServerClosedEvent(_ServerEvent):
"""Published when server is closed."""
class TopologyEvent(object):
"""Base class for topology description events"""
__slots__ = ('__topology_id')
def __init__(self, topology_id):
self.__topology_id = topology_id
@property
def topology_id(self):
"""A unique identifier for the topology this server is a part of."""
return self.__topology_id
class TopologyDescriptionChangedEvent(TopologyEvent):
"""Published when the topology description changes."""
__slots__ = ('__previous_description', '__new_description')
def __init__(self, previous_description, new_description, *args):
super(TopologyDescriptionChangedEvent, self).__init__(*args)
self.__previous_description = previous_description
self.__new_description = new_description
@property
def previous_description(self):
"""The old topology description."""
return self.__previous_description
@property
def new_description(self):
"""The new topology description."""
return self.__new_description
class TopologyOpenedEvent(TopologyEvent):
"""Published when the topology is initialized."""
class TopologyClosedEvent(TopologyEvent):
"""Published when the topology is closed."""
class _ServerHeartbeatEvent(object):
"""Base class for server heartbeat events"""
__slots__ = ('__connection_id')
def __init__(self, connection_id):
self.__connection_id = connection_id
@property
def connection_id(self):
""""The address (host, port) of the server this heartbeat was sent
to."""
return self.__connection_id
class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent):
""""Published when a heartbeat is started."""
class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
"""Fired when the server heartbeat succeeds."""
__slots__ = ('__duration', '__reply')
def __init__(self, duration, reply, *args):
super(ServerHeartbeatSucceededEvent, self).__init__(*args)
self.__duration = duration
self.__reply = reply
@property
def duration(self):
"""The duration of this heartbeat in microseconds."""
return self.__duration
@property
def reply(self):
"""The command reply."""
return self.__reply
class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
"""Fired when the server heartbeat fails, either with an "ok: 0"
or a socket exception."""
__slots__ = ('__duration', '__reply')
def __init__(self, duration, reply, *args):
super(ServerHeartbeatFailedEvent, self).__init__(*args)
self.__duration = duration
self.__reply = reply
@property
def duration(self):
"""The duration of this heartbeat in microseconds."""
return self.__duration
@property
def reply(self):
"""The command reply."""
return self.__reply
class _EventListeners(object):
"""Configure event listeners for a client instance.
@ -285,9 +548,25 @@ class _EventListeners(object):
"""
def __init__(self, listeners):
self.__command_listeners = _LISTENERS.command_listeners[:]
self.__server_listeners = _LISTENERS.server_listeners[:]
lst = _LISTENERS.server_heartbeat_listeners
self.__server_heartbeat_listeners = lst[:]
self.__topology_listeners = _LISTENERS.topology_listeners[:]
if listeners is not None:
self.__command_listeners.extend(listeners)
for lst in listeners:
if isinstance(lst, CommandListener):
self.__command_listeners.append(lst)
if isinstance(lst, ServerListener):
self.__server_listeners.append(lst)
if isinstance(lst, ServerHeartbeatListener):
self.__server_heartbeat_listeners.append(lst)
if isinstance(lst, TopologyListener):
self.__topology_listeners.append(lst)
self.__enabled_for_commands = bool(self.__command_listeners)
self.__enabled_for_server = bool(self.__server_listeners)
self.__enabled_for_server_heartbeat = bool(
self.__server_heartbeat_listeners)
self.__enabled_for_topology = bool(self.__topology_listeners)
@property
def enabled_for_commands(self):
@ -295,9 +574,26 @@ class _EventListeners(object):
return self.__enabled_for_commands
@property
def enabled_for_server(self):
"""Are any ServerListener instances registered?"""
return self.__enabled_for_server
@property
def enabled_for_server_heartbeat(self):
"""Are any ServerHeartbeatListener instances registered?"""
return self.__enabled_for_server_heartbeat
@property
def enabled_for_topology(self):
"""Are any TopologyListener instances registered?"""
return self.__enabled_for_topology
def event_listeners(self):
"""List of registered event listeners."""
return self.__command_listeners[:]
return (self.__command_listeners[:],
self.__server_heartbeat_listeners[:],
self.__server_listeners[:],
self.__topology_listeners[:])
def publish_command_start(self, command, database_name,
request_id, connection_id, op_id=None):
@ -322,7 +618,6 @@ class _EventListeners(object):
except Exception:
_handle_exception()
def publish_command_success(self, duration, reply, command_name,
request_id, connection_id, op_id=None):
"""Publish a CommandSucceededEvent to all command listeners.
@ -346,7 +641,6 @@ class _EventListeners(object):
except Exception:
_handle_exception()
def publish_command_failure(self, duration, failure, command_name,
request_id, connection_id, op_id=None):
"""Publish a CommandFailedEvent to all command listeners.
@ -370,3 +664,149 @@ class _EventListeners(object):
subscriber.failed(event)
except Exception:
_handle_exception()
def publish_server_heartbeat_started(self, connection_id):
"""Publish a ServerHeartbeatStartedEvent to all server heartbeat
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
"""
event = ServerHeartbeatStartedEvent(connection_id)
for subscriber in self.__server_heartbeat_listeners:
try:
subscriber.started(event)
except Exception:
_handle_exception()
def publish_server_heartbeat_succeeded(self, connection_id, duration,
reply):
"""Publish a ServerHeartbeatSucceededEvent to all server heartbeat
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
- `duration`: The execution time of the event in the highest possible
resolution for the platform.
- `reply`: The command reply.
"""
event = ServerHeartbeatSucceededEvent(duration, reply, connection_id)
for subscriber in self.__server_heartbeat_listeners:
try:
subscriber.succeeded(event)
except Exception:
_handle_exception()
def publish_server_heartbeat_failed(self, connection_id, duration, reply):
"""Publish a ServerHeartbeatFailedEvent to all server heartbeat
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
- `duration`: The execution time of the event in the highest possible
resolution for the platform.
- `reply`: The command reply.
"""
event = ServerHeartbeatFailedEvent(duration, reply, connection_id)
for subscriber in self.__server_heartbeat_listeners:
try:
subscriber.failed(event)
except Exception:
_handle_exception()
def publish_server_opened(self, server_address, topology_id):
"""Publish a ServerOpeningEvent to all server listeners.
:Parameters:
- `server_address`: The address (host/port pair) of the server.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = ServerOpeningEvent(server_address, topology_id)
for subscriber in self.__server_listeners:
try:
subscriber.opened(event)
except Exception:
_handle_exception()
def publish_server_closed(self, server_address, topology_id):
"""Publish a ServerClosedEvent to all server listeners.
:Parameters:
- `server_address`: The address (host/port pair) of the server.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = ServerClosedEvent(server_address, topology_id)
for subscriber in self.__server_listeners:
try:
subscriber.closed(event)
except Exception:
_handle_exception()
def publish_server_description_changed(self, previous_description,
new_description, server_address,
topology_id):
"""Publish a ServerDescriptionChangedEvent to all server listeners.
:Parameters:
- `previous_description`: The previous server description.
- `server_address`: The address (host/port pair) of the server.
- `new_description`: The new server description.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = ServerDescriptionChangedEvent(previous_description,
new_description, server_address,
topology_id)
for subscriber in self.__server_listeners:
try:
subscriber.description_changed(event)
except Exception:
_handle_exception()
def publish_topology_opened(self, topology_id):
"""Publish a TopologyOpenedEvent to all topology listeners.
:Parameters:
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = TopologyOpenedEvent(topology_id)
for subscriber in self.__topology_listeners:
try:
subscriber.opened(event)
except Exception:
_handle_exception()
def publish_topology_closed(self, topology_id):
"""Publish a TopologyClosedEvent to all topology listeners.
:Parameters:
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = TopologyClosedEvent(topology_id)
for subscriber in self.__topology_listeners:
try:
subscriber.closed(event)
except Exception:
_handle_exception()
def publish_topology_description_changed(self, previous_description,
new_description, topology_id):
"""Publish a TopologyDescriptionChangedEvent to all topology listeners.
:Parameters:
- `previous_description`: The previous topology description.
- `new_description`: The new topology description.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
event = TopologyDescriptionChangedEvent(previous_description,
new_description, topology_id)
for subscriber in self.__topology_listeners:
try:
subscriber.description_changed(event)
except Exception:
_handle_exception()

View File

@ -25,11 +25,18 @@ from pymongo.server_type import SERVER_TYPE
class Server(object):
def __init__(self, server_description, pool, monitor):
def __init__(self, server_description, pool, monitor, topology_id=None,
listeners=None, events=None):
"""Represent one MongoDB server."""
self._description = server_description
self._pool = pool
self._monitor = monitor
self._topology_id = topology_id
self._publish = listeners is not None and listeners.enabled_for_server
self._listener = listeners
self._events = None
if self._publish:
self._events = events()
def open(self):
"""Start monitoring, or restart after a fork.
@ -47,6 +54,9 @@ class Server(object):
Reconnect with open().
"""
if self._publish:
self._events.put((self._listener.publish_server_closed,
(self._description.address, self._topology_id)))
self._monitor.close()
self._pool.reset()
@ -82,6 +92,7 @@ class Server(object):
- `operation`: A _Query or _GetMore object.
- `set_slave_okay`: Pass to operation.get_message.
- `all_credentials`: dict, maps auth source to MongoCredential.
- `listeners`: Instance of _EventListeners or None.
- `exhaust` (optional): If True, the socket used stays checked out.
It is returned along with its Pool in the Response.
"""

View File

@ -16,6 +16,7 @@
import threading
from bson.objectid import ObjectId
from pymongo import monitor, pool
from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT
from pymongo.topology_description import TOPOLOGY_TYPE
@ -46,6 +47,7 @@ class TopologySettings(object):
self._local_threshold_ms = local_threshold_ms
self._server_selection_timeout = server_selection_timeout
self._direct = (len(self._seeds) == 1 and not replica_set_name)
self._topology_id = ObjectId()
@property
def seeds(self):

View File

@ -18,14 +18,21 @@ import os
import random
import threading
import warnings
import weakref
from bson.py3compat import itervalues, PY3
if PY3:
import queue as Queue
else:
import Queue
from bson.py3compat import itervalues
from pymongo import common
from pymongo import periodic_executor
from pymongo.pool import PoolOptions
from pymongo.topology_description import (updated_topology_description,
TOPOLOGY_TYPE,
TopologyDescription)
from pymongo.errors import ServerSelectionTimeoutError, InvalidOperation
from pymongo.errors import ServerSelectionTimeoutError
from pymongo.monotonic import time as _time
from pymongo.server import Server
from pymongo.server_selectors import (any_server_selector,
@ -35,9 +42,42 @@ from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
def process_events_queue(queue_ref):
q = queue_ref()
if not q:
return False # Cancel PeriodicExecutor.
while True:
try:
event = q.get_nowait()
except Queue.Empty:
break
else:
fn, args = event
fn(*args)
return True # Continue PeriodicExecutor.
class Topology(object):
"""Monitor a topology of one or more servers."""
def __init__(self, topology_settings):
self._topology_id = topology_settings._topology_id
self._listeners = topology_settings._pool_options.event_listeners
pub = self._listeners is not None
self._publish_server = pub and self._listeners.enabled_for_server
self._publish_tp = pub and self._listeners.enabled_for_topology
# Create events queue if there are publishers.
self._events = None
self._events_thread = None
if self._publish_server or self._publish_tp:
self._events = Queue.Queue(maxsize=100)
if self._publish_tp:
self._events.put((self._listeners.publish_topology_opened,
(self._topology_id,)))
self._settings = topology_settings
topology_description = TopologyDescription(
topology_settings.get_topology_type(),
@ -47,6 +87,17 @@ class Topology(object):
None)
self._description = topology_description
if self._publish_tp:
self._events.put((
self._listeners.publish_topology_description_changed,
(TopologyDescription(
TOPOLOGY_TYPE.Unknown, {}, None, None, None),
self._description, self._topology_id)))
for seed in topology_settings.seeds:
if self._publish_server:
self._events.put((self._listeners.publish_server_opened,
(seed, self._topology_id)))
# Store the seed list to help diagnose errors in _error_message().
self._seed_addresses = list(topology_description.server_descriptions())
self._opened = False
@ -55,6 +106,23 @@ class Topology(object):
self._servers = {}
self._pid = None
if self._publish_server or self._publish_tp:
def target():
return process_events_queue(weak)
executor = periodic_executor.PeriodicExecutor(
interval=common.EVENTS_QUEUE_FREQUENCY,
min_interval=0.5,
target=target,
name="pymongo_events_thread")
# We strongly reference the executor and it weakly references
# the queue via this closure. When the topology is freed, stop
# the executor soon.
weak = weakref.ref(self._events)
self.__events_executor = executor
executor.open()
def open(self):
"""Start monitoring, or restart after a fork.
@ -173,11 +241,25 @@ class Topology(object):
# change removed it. E.g., we got a host list from the primary
# that didn't include this server.
if self._description.has_server(server_description.address):
td_old = self._description
if self._publish_server:
old_server_description = td_old._server_descriptions[
server_description.address]
self._events.put((
self._listeners.publish_server_description_changed,
(old_server_description, server_description,
server_description.address, self._topology_id)))
self._description = updated_topology_description(
self._description, server_description)
self._update_servers()
if self._publish_tp:
self._events.put((
self._listeners.publish_topology_description_changed,
(td_old, self._description, self._topology_id)))
# Wake waiters in select_servers().
self._condition.notify_all()
@ -219,7 +301,6 @@ class Topology(object):
descriptions = selector(self._description.known_servers)
return set([d.address for d in descriptions])
def get_secondaries(self):
"""Return set of secondary addresses."""
return self._get_replica_set_members(secondary_server_selector)
@ -269,6 +350,12 @@ class Topology(object):
# Mark all servers Unknown.
self._description = self._description.reset()
self._update_servers()
# Publish only after releasing the lock.
if self._publish_tp:
self._events.put((self._listeners.publish_topology_closed,
(self._topology_id,)))
if self._publish_server or self._publish_tp:
self.__events_executor.close()
@property
def description(self):
@ -282,6 +369,10 @@ class Topology(object):
if not self._opened:
self._opened = True
self._update_servers()
# Start or restart the events publishing thread.
if self._publish_tp or self._publish_server:
self.__events_executor.open()
else:
# Restart monitors if we forked since previous call.
for server in itervalues(self._servers):
@ -343,10 +434,16 @@ class Topology(object):
pool=self._create_pool_for_monitor(address),
topology_settings=self._settings)
weak = None
if self._publish_server:
weak = weakref.ref(self._events)
server = Server(
server_description=sd,
pool=self._create_pool_for_server(address),
monitor=monitor)
monitor=monitor,
topology_id=self._topology_id,
listeners=self._listeners,
events=weak)
self._servers[address] = server
server.open()
@ -372,7 +469,8 @@ class Topology(object):
socket_timeout=options.connect_timeout,
ssl_context=options.ssl_context,
ssl_match_hostname=options.ssl_match_hostname,
socket_keepalive=True)
socket_keepalive=True,
event_listeners=options.event_listeners)
return self._settings.pool_class(address, monitor_pool_options,
handshake=False)

View File

@ -58,16 +58,20 @@ class client_knobs(object):
def __init__(
self,
heartbeat_frequency=None,
kill_cursor_frequency=None):
kill_cursor_frequency=None,
events_queue_frequency=None):
self.heartbeat_frequency = heartbeat_frequency
self.kill_cursor_frequency = kill_cursor_frequency
self.events_queue_frequency = events_queue_frequency
self.old_heartbeat_frequency = None
self.old_kill_cursor_frequency = None
self.old_events_queue_frequency = None
def enable(self):
self.old_heartbeat_frequency = common.HEARTBEAT_FREQUENCY
self.old_kill_cursor_frequency = common.KILL_CURSOR_FREQUENCY
self.old_events_queue_frequency = common.EVENTS_QUEUE_FREQUENCY
if self.heartbeat_frequency is not None:
common.HEARTBEAT_FREQUENCY = self.heartbeat_frequency
@ -75,12 +79,16 @@ class client_knobs(object):
if self.kill_cursor_frequency is not None:
common.KILL_CURSOR_FREQUENCY = self.kill_cursor_frequency
if self.events_queue_frequency is not None:
common.EVENTS_QUEUE_FREQUENCY = self.events_queue_frequency
def __enter__(self):
self.enable()
def disable(self):
common.HEARTBEAT_FREQUENCY = self.old_heartbeat_frequency
common.KILL_CURSOR_FREQUENCY = self.old_kill_cursor_frequency
common.EVENTS_QUEUE_FREQUENCY = self.old_events_queue_frequency
def __exit__(self, exc_type, exc_val, exc_tb):
self.disable()
@ -108,7 +116,7 @@ class ClientContext(object):
client = pymongo.MongoClient(host, port,
serverSelectionTimeoutMS=100)
client.admin.command('ismaster') # Can we connect?
# If so, then reset client to defaults.
self.client = pymongo.MongoClient(host, port)

View File

@ -0,0 +1,147 @@
{
"description": "Monitoring a topology that is a replica set with no primary connected",
"uri": "mongodb://a,b",
"phases": [
{
"responses": [
[
"a:27017",
{
"ok": 1,
"ismaster": false,
"secondary": true,
"setName": "rs",
"setVersion": 1,
"primary": "b:27017",
"hosts": [
"a:27017", "b:27017"
],
"minWireVersion": 0,
"maxWireVersion": 4
}
]
],
"outcome": {
"events": [
{
"topology_opening_event": {
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": []
},
"newDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
}
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "a:27017"
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "b:27017"
}
},
{
"server_description_changed_event": {
"topologyId": "42",
"address": "a:27017",
"previousDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
"newDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017", "b:27017"
],
"passives": [],
"primary": "b:27017",
"setName": "rs",
"type": "RSSecondary"
}
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
},
"newDescription": {
"topologyType": "ReplicaSetNoPrimary",
"setName": "rs",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017", "b:27017"
],
"passives": [],
"primary": "b:27017",
"setName": "rs",
"type": "RSSecondary"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
}
}
}
]
}
}
]
}

View File

@ -0,0 +1,147 @@
{
"description": "Monitoring a topology that is a replica set with a primary connected",
"uri": "mongodb://a,b",
"phases": [
{
"responses": [
[
"a:27017",
{
"ok": 1,
"ismaster": true,
"setName": "rs",
"setVersion": 1,
"primary": "a:27017",
"hosts": [
"a:27017", "b:27017"
],
"minWireVersion": 0,
"maxWireVersion": 4
}
]
],
"outcome": {
"events": [
{
"topology_opening_event": {
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": []
},
"newDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
}
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "a:27017"
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "b:27017"
}
},
{
"server_description_changed_event": {
"topologyId": "42",
"address": "a:27017",
"previousDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
"newDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017", "b:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
}
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
},
"newDescription": {
"topologyType": "ReplicaSetWithPrimary",
"setName": "rs",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017",
"b:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
}
}
}
]
}
}
]
}

View File

@ -0,0 +1,152 @@
{
"description": "Monitoring a replica set with non member",
"uri": "mongodb://a,b/",
"phases": [
{
"responses": [
[
"a:27017",
{
"ok": 1,
"ismaster": true,
"setName": "rs",
"setVersion": 1,
"primary": "a:27017",
"hosts": [
"a:27017"
],
"minWireVersion": 0,
"maxWireVersion": 4
}
],
[
"b:27017",
{
"ok": 1,
"ismaster": true
}
]
],
"outcome": {
"events": [
{
"topology_opening_event": {
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": []
},
"newDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
}
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "a:27017"
}
},
{
"server_opening_event": {
"topologyId": "42",
"address": "b:27017"
}
},
{
"server_description_changed_event": {
"topologyId": "42",
"address": "a:27017",
"previousDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
"newDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
}
}
},
{
"server_closed_event": {
"topologyId": "42",
"address": "b:27017"
}
},
{
"topology_description_changed_event": {
"topologyId": "42",
"previousDescription": {
"topologyType": "Unknown",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
]
},
"newDescription": {
"topologyType": "ReplicaSetWithPrimary",
"setName": "rs",
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
}
]
}
}
}
]
}
}
]
}

View File

@ -0,0 +1,149 @@
{
"description": "Monitoring a topology that is required to be a replica set",
"phases": [
{
"outcome": {
"events": [
{
"topology_opening_event": {
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"newDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
],
"topologyType": "ReplicaSetNoPrimary"
},
"previousDescription": {
"servers": [],
"topologyType": "Unknown"
},
"topologyId": "42"
}
},
{
"server_opening_event": {
"address": "a:27017",
"topologyId": "42"
}
},
{
"server_opening_event": {
"address": "b:27017",
"topologyId": "42"
}
},
{
"server_description_changed_event": {
"address": "a:27017",
"newDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017",
"b:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
},
"previousDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"newDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [
"a:27017",
"b:27017"
],
"passives": [],
"primary": "a:27017",
"setName": "rs",
"type": "RSPrimary"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
],
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"previousDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
{
"address": "b:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
],
"topologyType": "ReplicaSetNoPrimary"
},
"topologyId": "42"
}
}
]
},
"responses": [
[
"a:27017",
{
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"maxWireVersion": 4,
"minWireVersion": 0,
"ok": 1,
"primary": "a:27017",
"setName": "rs",
"setVersion": 1.0
}
]
]
}
],
"uri": "mongodb://a,b/?replicaSet=rs"
}

View File

@ -0,0 +1,104 @@
{
"description": "Monitoring a standalone connection",
"phases": [
{
"outcome": {
"events": [
{
"topology_opening_event": {
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"newDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
],
"topologyType": "Single"
},
"previousDescription": {
"servers": [],
"topologyType": "Unknown"
},
"topologyId": "42"
}
},
{
"server_opening_event": {
"address": "a:27017",
"topologyId": "42"
}
},
{
"server_description_changed_event": {
"address": "a:27017",
"newDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Standalone"
},
"previousDescription": {
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
},
"topologyId": "42"
}
},
{
"topology_description_changed_event": {
"newDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Standalone"
}
],
"topologyType": "Single"
},
"previousDescription": {
"servers": [
{
"address": "a:27017",
"arbiters": [],
"hosts": [],
"passives": [],
"type": "Unknown"
}
],
"topologyType": "Single"
},
"topologyId": "42"
}
}
]
},
"responses": [
[
"a:27017",
{
"ismaster": true,
"maxWireVersion": 4,
"minWireVersion": 0,
"ok": 1
}
]
]
}
],
"uri": "mongodb://a:27017"
}

View File

@ -2064,7 +2064,7 @@ class TestCollection(IntegrationTest):
def test_find_one_and_write_concern(self):
listener = EventListener()
saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([])
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
db = single_client(event_listeners=[listener])[self.db.name]
# non-default WriteConcern.
c_w0 = db.get_collection(

View File

@ -51,7 +51,7 @@ class TestAllScenarios(unittest.TestCase):
def setUpClass(cls):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([])
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = single_client(event_listeners=[cls.listener])
@classmethod

View File

@ -228,7 +228,7 @@ class TestCursor(IntegrationTest):
listener = EventListener()
listener.add_command_filter('killCursors')
saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([])
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
coll = single_client(
event_listeners=[listener])[self.db.name].pymongo_test
results = listener.results

View File

@ -71,6 +71,9 @@ class MockMonitor(object):
def close(self):
pass
def remove_stale_sockets(self):
pass
def create_mock_topology(uri, monitor_class=MockMonitor):
# Some tests in the spec include URIs like mongodb://A/?connect=direct,

View File

@ -0,0 +1,134 @@
# Copyright 2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the monitoring of the server heartbeats."""
import sys
import threading
sys.path[0:0] = [""]
from pymongo import monitoring
from pymongo.errors import ConnectionFailure
from pymongo.ismaster import IsMaster
from pymongo.monitor import Monitor
from test import unittest, client_knobs
from test.utils import HeartbeatEventListener, single_client, wait_until
sys.path[0:0] = [""]
class MockSocketInfo(object):
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
class MockPool(object):
def __init__(self, *args, **kwargs):
self.pool_id = 0
self._lock = threading.Lock()
def get_socket(self, all_credentials):
return MockSocketInfo()
def return_socket(self, _):
pass
def reset(self):
with self._lock:
self.pool_id += 1
def remove_stale_sockets(self):
pass
class TestHeartbeatMonitoring(unittest.TestCase):
@classmethod
def setUp(cls):
cls.all_listener = HeartbeatEventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
@classmethod
def tearDown(cls):
monitoring._LISTENERS = cls.saved_listeners
def create_mock_monitor(self, responses, uri, expected_results):
with client_knobs(heartbeat_frequency=0.1, events_queue_frequency=0.1):
class MockMonitor(Monitor):
def _check_with_socket(self, sock_info):
if isinstance(responses[1], Exception):
raise responses[1]
return IsMaster(responses[1]), 99
m = single_client(h=uri,
event_listeners=(self.all_listener,),
_monitor_class=MockMonitor,
_pool_class=MockPool
)
expected_len = len(expected_results)
wait_until(lambda: len(self.all_listener.results) == expected_len,
"publish all events", timeout=15)
try:
for i in range(len(expected_results)):
result = self.all_listener.results[i] if len(
self.all_listener.results) > i else None
self.assertEqual(expected_results[i],
result.__class__.__name__)
self.assertEqual(result.connection_id,
responses[0])
if expected_results[i] != 'ServerHeartbeatStartedEvent':
if isinstance(result.reply, IsMaster):
self.assertEqual(result.duration, 99)
self.assertEqual(result.reply._doc, responses[1])
else:
self.assertEqual(result.reply, responses[1])
finally:
m.close()
def test_standalone(self):
responses = (('a', 27017),
{
"ismaster": True,
"maxWireVersion": 4,
"minWireVersion": 0,
"ok": 1
})
uri = "mongodb://a:27017"
expected_results = ['ServerHeartbeatStartedEvent',
'ServerHeartbeatSucceededEvent']
self.create_mock_monitor(responses, uri, expected_results)
def test_standalone_error(self):
responses = (('a', 27017),
ConnectionFailure("SPECIAL MESSAGE"))
uri = "mongodb://a:27017"
expected_results = ['ServerHeartbeatStartedEvent',
'ServerHeartbeatFailedEvent']
self.create_mock_monitor(responses, uri, expected_results)
if __name__ == "__main__":
unittest.main()

View File

@ -39,7 +39,7 @@ class TestCommandMonitoring(unittest.TestCase):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
# Don't use any global subscribers.
monitoring._LISTENERS = monitoring._Listeners([])
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = single_client(event_listeners=[cls.listener])
@classmethod

View File

@ -33,7 +33,7 @@ class TestReadConcern(unittest.TestCase):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
# Don't use any global subscribers.
monitoring._LISTENERS = monitoring._Listeners([])
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = single_client(event_listeners=[cls.listener])
cls.db = cls.client.pymongo_test

View File

@ -0,0 +1,288 @@
# Copyright 2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run the sdam monitoring spec tests."""
import json
import os
import sys
import weakref
sys.path[0:0] = [""]
from bson.json_util import object_hook
from pymongo import monitoring
from pymongo import periodic_executor
from pymongo.ismaster import IsMaster
from pymongo.monitor import Monitor
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.server_type import SERVER_TYPE
from pymongo.topology import TOPOLOGY_TYPE
from test import unittest, client_context, client_knobs
from test.utils import (ServerAndTopologyEventListener,
single_client,
wait_until)
# Location of JSON test specifications.
_TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'sdam_monitoring')
def compare_server_descriptions(expected, actual):
if ((not expected['address'] == "%s:%s" % actual.address) or
(not SERVER_TYPE.__getattribute__(expected['type']) ==
actual.server_type)):
return False
expected_hosts = set(
expected['arbiters'] + expected['passives'] + expected['hosts'])
return expected_hosts == set("%s:%s" % s for s in actual.all_hosts)
def compare_topology_descriptions(expected, actual):
if not (TOPOLOGY_TYPE.__getattribute__(
expected['topologyType']) == actual.topology_type):
return False
expected = expected['servers']
actual = actual.server_descriptions()
if len(expected) != len(actual):
return False
for exp_server in expected:
for address, actual_server in actual.items():
if compare_server_descriptions(exp_server, actual_server):
break
else:
return False
return True
def compare_events(expected_dict, actual):
if not expected_dict:
return False, "Error: Bad expected value in YAML test"
if not actual:
return False, "Error: Event published was None"
expected_type, expected = list(expected_dict.items())[0]
if expected_type == "server_opening_event":
if not isinstance(actual, monitoring.ServerOpeningEvent):
return False, "Expected ServerOpeningEvent, got %s" % (
actual.__class__)
if not expected['address'] == "%s:%s" % actual.server_address:
return (False,
"ServerOpeningEvent published with wrong address (expected"
" %s, got %s" % (expected['address'],
actual.server_address))
elif expected_type == "server_description_changed_event":
if not isinstance(actual, monitoring.ServerDescriptionChangedEvent):
return (False,
"Expected ServerDescriptionChangedEvent, got %s" % (
actual.__class__))
if not expected['address'] == "%s:%s" % actual.server_address:
return (False, "ServerDescriptionChangedEvent has wrong address"
" (expected %s, got %s" % (expected['address'],
actual.server_address))
if not compare_server_descriptions(
expected['newDescription'], actual.new_description):
return (False, "New ServerDescription incorrect in"
" ServerDescriptionChangedEvent")
if not compare_server_descriptions(expected['previousDescription'],
actual.previous_description):
return (False, "Previous ServerDescription incorrect in"
" ServerDescriptionChangedEvent")
elif expected_type == "server_closed_event":
if not isinstance(actual, monitoring.ServerClosedEvent):
return False, "Expected ServerClosedEvent, got %s" % (
actual.__class__)
if not expected['address'] == "%s:%s" % actual.server_address:
return (False, "ServerClosedEvent published with wrong address"
" (expected %s, got %s" % (expected['address'],
actual.server_address))
elif expected_type == "topology_opening_event":
if not isinstance(actual, monitoring.TopologyOpenedEvent):
return False, "Expected TopologyOpeningEvent, got %s" % (
actual.__class__)
elif expected_type == "topology_description_changed_event":
if not isinstance(actual, monitoring.TopologyDescriptionChangedEvent):
return (False, "Expected TopologyDescriptionChangedEvent,"
" got %s" % (actual.__class__))
if not compare_topology_descriptions(expected['newDescription'],
actual.new_description):
return (False, "New TopologyDescription incorrect in "
"TopologyDescriptionChangedEvent")
if not compare_topology_descriptions(
expected['previousDescription'],
actual.previous_description):
return (False, "Previous TopologyDescription incorrect in"
" TopologyDescriptionChangedEvent")
elif expected_type == "topology_closed_event":
if not isinstance(actual, monitoring.TopologyClosedEvent):
return False, "Expected TopologyClosedEvent, got %s" % (
actual.__class__)
else:
return False, "Incorrect event: expected %s, actual %s" % (
expected_type, actual)
return True, ""
def compare_multiple_events(i, expected_results, actual_results):
events_in_a_row = []
j = i
while(j < len(expected_results) and isinstance(
actual_results[j],
actual_results[i].__class__)):
events_in_a_row.append(actual_results[j])
j += 1
message = ''
for event in events_in_a_row:
for k in range(i, j):
passed, message = compare_events(expected_results[k], event)
if passed:
expected_results[k] = None
break
else:
return i, False, message
return j, True, ''
class TestAllScenarios(unittest.TestCase):
@classmethod
@client_context.require_connection
def setUp(cls):
cls.all_listener = ServerAndTopologyEventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
@classmethod
def tearDown(cls):
monitoring._LISTENERS = cls.saved_listeners
def create_test(scenario_def):
def run_scenario(self):
responses = (r for r in scenario_def['phases'][0]['responses'])
with client_knobs(events_queue_frequency=0.1):
class MockMonitor(Monitor):
def __init__(self, server_description, topology, pool,
topology_settings):
"""Have to copy entire constructor from Monitor so that we
can override _run and change the periodic executor's
interval."""
self._server_description = server_description
self._pool = pool
self._settings = topology_settings
self._avg_round_trip_time = MovingAverage()
options = self._settings._pool_options
self._listeners = options.event_listeners
self._publish = self._listeners is not None
def target():
monitor = self_ref()
if monitor is None:
return False
MockMonitor._run(monitor) # Change target to subclass
return True
# Shorten interval
executor = periodic_executor.PeriodicExecutor(
interval=0.1,
min_interval=0.1,
target=target,
name="pymongo_server_monitor_thread")
self._executor = executor
self_ref = weakref.ref(self, executor.close)
self._topology = weakref.proxy(topology, executor.close)
def _run(self):
try:
if self._server_description.address != ('a', 27017):
# Because PyMongo doesn't keep information about
# the order of addresses, we might accidentally
# start a MockMonitor on the wrong server first,
# so we need to only mock responses for the server
# the test's response is supposed to come from.
return
response = next(responses)[1]
isMaster = IsMaster(response)
self._server_description = ServerDescription(
address=self._server_description.address,
ismaster=isMaster)
self._topology.on_change(self._server_description)
except (ReferenceError, StopIteration):
# Topology was garbage-collected.
self.close()
m = single_client(h=scenario_def['uri'],
event_listeners=(self.all_listener,),
_monitor_class=MockMonitor)
expected_results = scenario_def['phases'][0]['outcome']['events']
expected_len = len(expected_results)
wait_until(lambda: len(self.all_listener.results) >= expected_len,
"publish all events", timeout=15)
try:
i = 0
while i < expected_len:
result = self.all_listener.results[i] if len(
self.all_listener.results) > i else None
# The order of ServerOpening/ClosedEvents doesn't matter
if (isinstance(result,
monitoring.ServerOpeningEvent) or
isinstance(result,
monitoring.ServerClosedEvent)):
i, passed, message = compare_multiple_events(
i, expected_results, self.all_listener.results)
self.assertTrue(passed, message)
else:
self.assertTrue(
*compare_events(expected_results[i], result))
i += 1
finally:
m.close()
return run_scenario
def create_tests():
for dirpath, _, filenames in os.walk(_TEST_PATH):
for filename in filenames:
with open(os.path.join(dirpath, filename)) as scenario_stream:
scenario_def = json.load(
scenario_stream, object_hook=object_hook)
# Construct test from scenario.
new_test = create_test(scenario_def)
test_name = 'test_%s' % (os.path.splitext(filename)[0],)
new_test.__name__ = test_name
setattr(TestAllScenarios, new_test.__name__, new_test)
create_tests()
if __name__ == "__main__":
unittest.main()

View File

@ -55,6 +55,9 @@ class MockPool(object):
def reset(self):
pass
def remove_stale_sockets(self):
pass
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):

View File

@ -66,6 +66,9 @@ class MockPool(object):
with self._lock:
self.pool_id += 1
def remove_stale_sockets(self):
pass
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):

View File

@ -64,6 +64,39 @@ class EventListener(monitoring.CommandListener):
self.results['failed'].append(event)
class ServerAndTopologyEventListener(monitoring.ServerListener,
monitoring.TopologyListener):
"""Listens to all events."""
def __init__(self):
self.results = []
def opened(self, event):
self.results.append(event)
def description_changed(self, event):
self.results.append(event)
def closed(self, event):
self.results.append(event)
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
"""Listens to only server heartbeat events."""
def __init__(self):
self.results = []
def started(self, event):
self.results.append(event)
def succeeded(self, event):
self.results.append(event)
def failed(self, event):
self.results.append(event)
def _connection_string_noauth(h, p):
if h.startswith("mongodb://"):
return h
@ -146,7 +179,7 @@ def get_command_line(client):
def server_started_with_option(client, cmdline_opt, config_opt):
"""Check if the server was started with a particular option.
:Parameters:
- `cmdline_opt`: The command line option (i.e. --nojournal)
- `config_opt`: The config file option (i.e. nojournal)