PYTHON-1105 Configurable heartbeatFrequencyMS.
This commit is contained in:
parent
7d81a01a81
commit
a2f97f92a4
@ -140,6 +140,8 @@ class ClientOptions(object):
|
||||
self.__write_concern = _parse_write_concern(options)
|
||||
self.__read_concern = _parse_read_concern(options)
|
||||
self.__connect = options.get('connect')
|
||||
self.__heartbeat_frequency = options.get(
|
||||
'heartbeatfrequencyms', common.HEARTBEAT_FREQUENCY)
|
||||
|
||||
@property
|
||||
def _options(self):
|
||||
@ -171,6 +173,11 @@ class ClientOptions(object):
|
||||
"""The server selection timeout for this instance in seconds."""
|
||||
return self.__server_selection_timeout
|
||||
|
||||
@property
|
||||
def heartbeat_frequency(self):
|
||||
"""The monitoring frequency in seconds."""
|
||||
return self.__heartbeat_frequency
|
||||
|
||||
@property
|
||||
def pool_options(self):
|
||||
"""A :class:`~pymongo.pool.PoolOptions` instance."""
|
||||
|
||||
@ -458,6 +458,7 @@ TIMEOUT_VALIDATORS = {
|
||||
'sockettimeoutms': validate_timeout_or_none,
|
||||
'waitqueuetimeoutms': validate_timeout_or_none,
|
||||
'serverselectiontimeoutms': validate_timeout_or_zero,
|
||||
'heartbeatfrequencyms': validate_timeout_or_none,
|
||||
'maxidletimems': validate_timeout_or_none,
|
||||
}
|
||||
|
||||
|
||||
@ -159,6 +159,9 @@ class MongoClient(common.BaseObject):
|
||||
- `socketKeepAlive`: (boolean) Whether to send periodic keep-alive
|
||||
packets on connected sockets. Defaults to ``False`` (do not send
|
||||
keep-alive packets).
|
||||
- `heartbeatFrequencyMS`: (optional) The number of milliseconds
|
||||
between periodic server checks, or None to accept the default
|
||||
frequency of 10 seconds.
|
||||
- `event_listeners`: a list or tuple of event listeners. See
|
||||
:mod:`~pymongo.monitoring` for details.
|
||||
|
||||
@ -389,7 +392,8 @@ class MongoClient(common.BaseObject):
|
||||
monitor_class=monitor_class,
|
||||
condition_class=condition_class,
|
||||
local_threshold_ms=options.local_threshold_ms,
|
||||
server_selection_timeout=options.server_selection_timeout)
|
||||
server_selection_timeout=options.server_selection_timeout,
|
||||
heartbeat_frequency=options.heartbeat_frequency)
|
||||
|
||||
self._topology = Topology(self._topology_settings)
|
||||
if connect:
|
||||
|
||||
@ -58,7 +58,7 @@ class Monitor(object):
|
||||
return True
|
||||
|
||||
executor = periodic_executor.PeriodicExecutor(
|
||||
interval=common.HEARTBEAT_FREQUENCY,
|
||||
interval=self._settings.heartbeat_frequency,
|
||||
min_interval=common.MIN_HEARTBEAT_INTERVAL,
|
||||
target=target,
|
||||
name="pymongo_server_monitor_thread")
|
||||
|
||||
@ -17,8 +17,9 @@
|
||||
import threading
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
from pymongo import monitor, pool
|
||||
from pymongo import common, monitor, pool
|
||||
from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.topology_description import TOPOLOGY_TYPE
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.server_description import ServerDescription
|
||||
@ -33,11 +34,16 @@ class TopologySettings(object):
|
||||
monitor_class=None,
|
||||
condition_class=None,
|
||||
local_threshold_ms=LOCAL_THRESHOLD_MS,
|
||||
server_selection_timeout=SERVER_SELECTION_TIMEOUT):
|
||||
server_selection_timeout=SERVER_SELECTION_TIMEOUT,
|
||||
heartbeat_frequency=common.HEARTBEAT_FREQUENCY):
|
||||
"""Represent MongoClient's configuration.
|
||||
|
||||
Take a list of (host, port) pairs and optional replica set name.
|
||||
"""
|
||||
if heartbeat_frequency < common.MIN_HEARTBEAT_INTERVAL:
|
||||
raise ConfigurationError("%s cannot be less than %.1f" % (
|
||||
'heartbeatFrequencyMS', common.MIN_HEARTBEAT_INTERVAL))
|
||||
|
||||
self._seeds = seeds or [('localhost', 27017)]
|
||||
self._replica_set_name = replica_set_name
|
||||
self._pool_class = pool_class or pool.Pool
|
||||
@ -46,6 +52,7 @@ class TopologySettings(object):
|
||||
self._condition_class = condition_class or threading.Condition
|
||||
self._local_threshold_ms = local_threshold_ms
|
||||
self._server_selection_timeout = server_selection_timeout
|
||||
self._heartbeat_frequency = heartbeat_frequency
|
||||
self._direct = (len(self._seeds) == 1 and not replica_set_name)
|
||||
self._topology_id = ObjectId()
|
||||
|
||||
@ -82,6 +89,10 @@ class TopologySettings(object):
|
||||
def server_selection_timeout(self):
|
||||
return self._server_selection_timeout
|
||||
|
||||
@property
|
||||
def heartbeat_frequency(self):
|
||||
return self._heartbeat_frequency
|
||||
|
||||
@property
|
||||
def direct(self):
|
||||
"""Connect directly to a single server, or use a set of servers?
|
||||
|
||||
@ -58,24 +58,31 @@ class client_knobs(object):
|
||||
def __init__(
|
||||
self,
|
||||
heartbeat_frequency=None,
|
||||
min_heartbeat_interval=None,
|
||||
kill_cursor_frequency=None,
|
||||
events_queue_frequency=None):
|
||||
self.heartbeat_frequency = heartbeat_frequency
|
||||
self.min_heartbeat_interval = min_heartbeat_interval
|
||||
self.kill_cursor_frequency = kill_cursor_frequency
|
||||
self.events_queue_frequency = events_queue_frequency
|
||||
|
||||
self.old_heartbeat_frequency = None
|
||||
self.old_min_heartbeat_interval = None
|
||||
self.old_kill_cursor_frequency = None
|
||||
self.old_events_queue_frequency = None
|
||||
|
||||
def enable(self):
|
||||
self.old_heartbeat_frequency = common.HEARTBEAT_FREQUENCY
|
||||
self.old_min_heartbeat_interval = common.MIN_HEARTBEAT_INTERVAL
|
||||
self.old_kill_cursor_frequency = common.KILL_CURSOR_FREQUENCY
|
||||
self.old_events_queue_frequency = common.EVENTS_QUEUE_FREQUENCY
|
||||
|
||||
if self.heartbeat_frequency is not None:
|
||||
common.HEARTBEAT_FREQUENCY = self.heartbeat_frequency
|
||||
|
||||
if self.min_heartbeat_interval is not None:
|
||||
common.MIN_HEARTBEAT_INTERVAL = self.min_heartbeat_interval
|
||||
|
||||
if self.kill_cursor_frequency is not None:
|
||||
common.KILL_CURSOR_FREQUENCY = self.kill_cursor_frequency
|
||||
|
||||
@ -87,6 +94,7 @@ class client_knobs(object):
|
||||
|
||||
def disable(self):
|
||||
common.HEARTBEAT_FREQUENCY = self.old_heartbeat_frequency
|
||||
common.MIN_HEARTBEAT_INTERVAL = self.old_min_heartbeat_interval
|
||||
common.KILL_CURSOR_FREQUENCY = self.old_kill_cursor_frequency
|
||||
common.EVENTS_QUEUE_FREQUENCY = self.old_events_queue_frequency
|
||||
|
||||
@ -358,7 +366,8 @@ class MockClientTest(unittest.TestCase):
|
||||
super(MockClientTest, self).setUp()
|
||||
|
||||
self.client_knobs = client_knobs(
|
||||
heartbeat_frequency=0.001)
|
||||
heartbeat_frequency=0.001,
|
||||
min_heartbeat_interval=0.001)
|
||||
|
||||
self.client_knobs.enable()
|
||||
|
||||
|
||||
@ -40,6 +40,7 @@ from pymongo.errors import (AutoReconnect,
|
||||
OperationFailure,
|
||||
NetworkTimeout,
|
||||
InvalidURI)
|
||||
from pymongo.monitoring import ServerHeartbeatStartedEvent
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.pool import SocketInfo
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
@ -61,6 +62,7 @@ from test import (client_context,
|
||||
from test.pymongo_mocks import MockClient
|
||||
from test.utils import (assertRaisesExactly,
|
||||
delay,
|
||||
HeartbeatEventListener,
|
||||
remove_all_users,
|
||||
server_is_master_with_slave,
|
||||
get_pool,
|
||||
@ -69,6 +71,7 @@ from test.utils import (assertRaisesExactly,
|
||||
wait_until,
|
||||
rs_or_single_client,
|
||||
rs_or_single_client_noauth,
|
||||
single_client,
|
||||
lazy_client_trial,
|
||||
NTHREADS)
|
||||
|
||||
@ -899,6 +902,25 @@ class TestClient(IntegrationTest):
|
||||
101, 1234, client.codec_options),
|
||||
address=('not-a-member', 27017))
|
||||
|
||||
def test_heartbeat_frequency_ms(self):
|
||||
listener = HeartbeatEventListener()
|
||||
uri = "mongodb://%s:%d/?heartbeatFrequencyMS=500" % (host, port)
|
||||
client = single_client(uri, event_listeners=[listener])
|
||||
time.sleep(3)
|
||||
started_events = [r for r in listener.results
|
||||
if isinstance(r, ServerHeartbeatStartedEvent)]
|
||||
|
||||
# Frequency is 500ms, expect 5 or 6 events in 3 sec, but be forgiving.
|
||||
self.assertGreaterEqual(len(started_events), 4)
|
||||
client.close()
|
||||
|
||||
def test_small_heartbeat_frequency_ms(self):
|
||||
uri = "mongodb://example/?heartbeatFrequencyMS=499"
|
||||
with self.assertRaises(ConfigurationError) as context:
|
||||
MongoClient(uri)
|
||||
|
||||
self.assertIn('heartbeatFrequencyMS', str(context.exception))
|
||||
|
||||
|
||||
class TestExhaustCursor(IntegrationTest):
|
||||
"""Test that clients properly handle errors from exhaust cursors."""
|
||||
|
||||
@ -71,7 +71,9 @@ class TestHeartbeatMonitoring(unittest.TestCase):
|
||||
monitoring._LISTENERS = cls.saved_listeners
|
||||
|
||||
def create_mock_monitor(self, responses, uri, expected_results):
|
||||
with client_knobs(heartbeat_frequency=0.1, events_queue_frequency=0.1):
|
||||
with client_knobs(heartbeat_frequency=0.1,
|
||||
min_heartbeat_interval=0.1,
|
||||
events_queue_frequency=0.1):
|
||||
class MockMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
if isinstance(responses[1], Exception):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user