diff --git a/pymongo/client_options.py b/pymongo/client_options.py index c2afe9b3b..0d71bdc40 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -140,6 +140,8 @@ class ClientOptions(object): self.__write_concern = _parse_write_concern(options) self.__read_concern = _parse_read_concern(options) self.__connect = options.get('connect') + self.__heartbeat_frequency = options.get( + 'heartbeatfrequencyms', common.HEARTBEAT_FREQUENCY) @property def _options(self): @@ -171,6 +173,11 @@ class ClientOptions(object): """The server selection timeout for this instance in seconds.""" return self.__server_selection_timeout + @property + def heartbeat_frequency(self): + """The monitoring frequency in seconds.""" + return self.__heartbeat_frequency + @property def pool_options(self): """A :class:`~pymongo.pool.PoolOptions` instance.""" diff --git a/pymongo/common.py b/pymongo/common.py index 3ac788349..984afd951 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -458,6 +458,7 @@ TIMEOUT_VALIDATORS = { 'sockettimeoutms': validate_timeout_or_none, 'waitqueuetimeoutms': validate_timeout_or_none, 'serverselectiontimeoutms': validate_timeout_or_zero, + 'heartbeatfrequencyms': validate_timeout_or_none, 'maxidletimems': validate_timeout_or_none, } diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index a554e1be8..4fdc654c3 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -159,6 +159,9 @@ class MongoClient(common.BaseObject): - `socketKeepAlive`: (boolean) Whether to send periodic keep-alive packets on connected sockets. Defaults to ``False`` (do not send keep-alive packets). + - `heartbeatFrequencyMS`: (optional) The number of milliseconds + between periodic server checks, or None to accept the default + frequency of 10 seconds. - `event_listeners`: a list or tuple of event listeners. See :mod:`~pymongo.monitoring` for details. @@ -389,7 +392,8 @@ class MongoClient(common.BaseObject): monitor_class=monitor_class, condition_class=condition_class, local_threshold_ms=options.local_threshold_ms, - server_selection_timeout=options.server_selection_timeout) + server_selection_timeout=options.server_selection_timeout, + heartbeat_frequency=options.heartbeat_frequency) self._topology = Topology(self._topology_settings) if connect: diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 2f2d5172f..102eb30f2 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -58,7 +58,7 @@ class Monitor(object): return True executor = periodic_executor.PeriodicExecutor( - interval=common.HEARTBEAT_FREQUENCY, + interval=self._settings.heartbeat_frequency, min_interval=common.MIN_HEARTBEAT_INTERVAL, target=target, name="pymongo_server_monitor_thread") diff --git a/pymongo/settings.py b/pymongo/settings.py index 7e2675f0c..900a9275f 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -17,8 +17,9 @@ import threading from bson.objectid import ObjectId -from pymongo import monitor, pool +from pymongo import common, monitor, pool from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT +from pymongo.errors import ConfigurationError from pymongo.topology_description import TOPOLOGY_TYPE from pymongo.pool import PoolOptions from pymongo.server_description import ServerDescription @@ -33,11 +34,16 @@ class TopologySettings(object): monitor_class=None, condition_class=None, local_threshold_ms=LOCAL_THRESHOLD_MS, - server_selection_timeout=SERVER_SELECTION_TIMEOUT): + server_selection_timeout=SERVER_SELECTION_TIMEOUT, + heartbeat_frequency=common.HEARTBEAT_FREQUENCY): """Represent MongoClient's configuration. Take a list of (host, port) pairs and optional replica set name. """ + if heartbeat_frequency < common.MIN_HEARTBEAT_INTERVAL: + raise ConfigurationError("%s cannot be less than %.1f" % ( + 'heartbeatFrequencyMS', common.MIN_HEARTBEAT_INTERVAL)) + self._seeds = seeds or [('localhost', 27017)] self._replica_set_name = replica_set_name self._pool_class = pool_class or pool.Pool @@ -46,6 +52,7 @@ class TopologySettings(object): self._condition_class = condition_class or threading.Condition self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout + self._heartbeat_frequency = heartbeat_frequency self._direct = (len(self._seeds) == 1 and not replica_set_name) self._topology_id = ObjectId() @@ -82,6 +89,10 @@ class TopologySettings(object): def server_selection_timeout(self): return self._server_selection_timeout + @property + def heartbeat_frequency(self): + return self._heartbeat_frequency + @property def direct(self): """Connect directly to a single server, or use a set of servers? diff --git a/test/__init__.py b/test/__init__.py index 2965060d9..1be9519c6 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -58,24 +58,31 @@ class client_knobs(object): def __init__( self, heartbeat_frequency=None, + min_heartbeat_interval=None, kill_cursor_frequency=None, events_queue_frequency=None): self.heartbeat_frequency = heartbeat_frequency + self.min_heartbeat_interval = min_heartbeat_interval self.kill_cursor_frequency = kill_cursor_frequency self.events_queue_frequency = events_queue_frequency self.old_heartbeat_frequency = None + self.old_min_heartbeat_interval = None self.old_kill_cursor_frequency = None self.old_events_queue_frequency = None def enable(self): self.old_heartbeat_frequency = common.HEARTBEAT_FREQUENCY + self.old_min_heartbeat_interval = common.MIN_HEARTBEAT_INTERVAL self.old_kill_cursor_frequency = common.KILL_CURSOR_FREQUENCY self.old_events_queue_frequency = common.EVENTS_QUEUE_FREQUENCY if self.heartbeat_frequency is not None: common.HEARTBEAT_FREQUENCY = self.heartbeat_frequency + if self.min_heartbeat_interval is not None: + common.MIN_HEARTBEAT_INTERVAL = self.min_heartbeat_interval + if self.kill_cursor_frequency is not None: common.KILL_CURSOR_FREQUENCY = self.kill_cursor_frequency @@ -87,6 +94,7 @@ class client_knobs(object): def disable(self): common.HEARTBEAT_FREQUENCY = self.old_heartbeat_frequency + common.MIN_HEARTBEAT_INTERVAL = self.old_min_heartbeat_interval common.KILL_CURSOR_FREQUENCY = self.old_kill_cursor_frequency common.EVENTS_QUEUE_FREQUENCY = self.old_events_queue_frequency @@ -358,7 +366,8 @@ class MockClientTest(unittest.TestCase): super(MockClientTest, self).setUp() self.client_knobs = client_knobs( - heartbeat_frequency=0.001) + heartbeat_frequency=0.001, + min_heartbeat_interval=0.001) self.client_knobs.enable() diff --git a/test/test_client.py b/test/test_client.py index c50807ebd..f1c62935a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -40,6 +40,7 @@ from pymongo.errors import (AutoReconnect, OperationFailure, NetworkTimeout, InvalidURI) +from pymongo.monitoring import ServerHeartbeatStartedEvent from pymongo.mongo_client import MongoClient from pymongo.pool import SocketInfo from pymongo.read_preferences import ReadPreference @@ -61,6 +62,7 @@ from test import (client_context, from test.pymongo_mocks import MockClient from test.utils import (assertRaisesExactly, delay, + HeartbeatEventListener, remove_all_users, server_is_master_with_slave, get_pool, @@ -69,6 +71,7 @@ from test.utils import (assertRaisesExactly, wait_until, rs_or_single_client, rs_or_single_client_noauth, + single_client, lazy_client_trial, NTHREADS) @@ -899,6 +902,25 @@ class TestClient(IntegrationTest): 101, 1234, client.codec_options), address=('not-a-member', 27017)) + def test_heartbeat_frequency_ms(self): + listener = HeartbeatEventListener() + uri = "mongodb://%s:%d/?heartbeatFrequencyMS=500" % (host, port) + client = single_client(uri, event_listeners=[listener]) + time.sleep(3) + started_events = [r for r in listener.results + if isinstance(r, ServerHeartbeatStartedEvent)] + + # Frequency is 500ms, expect 5 or 6 events in 3 sec, but be forgiving. + self.assertGreaterEqual(len(started_events), 4) + client.close() + + def test_small_heartbeat_frequency_ms(self): + uri = "mongodb://example/?heartbeatFrequencyMS=499" + with self.assertRaises(ConfigurationError) as context: + MongoClient(uri) + + self.assertIn('heartbeatFrequencyMS', str(context.exception)) + class TestExhaustCursor(IntegrationTest): """Test that clients properly handle errors from exhaust cursors.""" diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py index a37da0d7d..32889f2f6 100644 --- a/test/test_heartbeat_monitoring.py +++ b/test/test_heartbeat_monitoring.py @@ -71,7 +71,9 @@ class TestHeartbeatMonitoring(unittest.TestCase): monitoring._LISTENERS = cls.saved_listeners def create_mock_monitor(self, responses, uri, expected_results): - with client_knobs(heartbeat_frequency=0.1, events_queue_frequency=0.1): + with client_knobs(heartbeat_frequency=0.1, + min_heartbeat_interval=0.1, + events_queue_frequency=0.1): class MockMonitor(Monitor): def _check_with_socket(self, sock_info): if isinstance(responses[1], Exception):