PYTHON-1104 - Support idleWritePeriodMS.

This commit is contained in:
A. Jesse Jiryu Davis 2016-11-02 13:59:00 -04:00
parent f4922da97c
commit 497e316d34
14 changed files with 503 additions and 22 deletions

View File

@ -40,6 +40,11 @@ MIN_WIRE_VERSION = 0
MAX_WIRE_VERSION = 0
MAX_WRITE_BATCH_SIZE = 1000
# Server Discovery and Monitoring Spec: The isMaster response of a replica set
# member running MongoDB 3.4 or later may contain idleWritePeriodMillis. Set the
# ServerDescription's idleWritePeriod to this value if present, else 10 seconds.
IDLE_WRITE_PERIOD = 10
# What this version of PyMongo supports.
MIN_SUPPORTED_WIRE_VERSION = 0
MAX_SUPPORTED_WIRE_VERSION = 5

View File

@ -144,3 +144,10 @@ class IsMaster(object):
@property
def last_write_date(self):
return self._doc.get('lastWriteDate')
@property
def idle_write_period(self):
if 'idleWritePeriodMillis' in self._doc:
return self._doc['idleWritePeriodMillis'] / 1000.0
else:
return common.IDLE_WRITE_PERIOD

View File

@ -31,9 +31,30 @@ from pymongo.errors import ConfigurationError
from pymongo.server_type import SERVER_TYPE
def _validate_max_staleness(max_staleness,
heartbeat_frequency,
idle_write_period):
if max_staleness < heartbeat_frequency + idle_write_period:
raise ConfigurationError(
"maxStalenessSeconds must be at least heartbeatFrequencyMS +"
" %d seconds. maxStalenessSeconds is set to %d,"
" heartbeatFrequencyMS is set to %d." % (
idle_write_period, max_staleness,
heartbeat_frequency * 1000))
def _with_primary(max_staleness, selection):
"""Apply max_staleness, in seconds, to a Selection with a known primary."""
primary = selection.primary
assert primary
# Server Selection Spec: If the TopologyType is ReplicaSetWithPrimary, a
# client MUST raise an error if maxStaleness < heartbeatFrequency +
# (primary's idleWritePeriod).
_validate_max_staleness(max_staleness,
selection.heartbeat_frequency,
primary.idle_write_period)
sds = []
for s in selection.server_descriptions:
@ -54,9 +75,25 @@ def _with_primary(max_staleness, selection):
def _no_primary(max_staleness, selection):
"""Apply max_staleness, in seconds, to a Selection with no known primary."""
# Secondary that's replicated the most recent writes.
smax = selection.secondary_with_max_last_write_date()
if not smax:
# No secondaries and no primary, short-circuit out of here.
return selection.with_server_descriptions([])
# Secondary we've most recently checked.
srecent = selection.secondary_with_max_last_update_time()
assert srecent
sds = []
# Server Selection Spec: If the TopologyType is ReplicaSetNoPrimary, a
# client MUST raise an error if maxStaleness < heartbeatFrequency +
# (idleWritePeriod of secondary with greatest lastUpdateTime).
_validate_max_staleness(max_staleness,
selection.heartbeat_frequency,
srecent.idle_write_period)
for s in selection.server_descriptions:
if s.server_type == SERVER_TYPE.RSSecondary:
# See max-staleness.rst for explanation of this formula.
@ -77,13 +114,6 @@ def select(max_staleness, selection):
if not max_staleness:
return selection
# Server Selection Spec: "A driver MUST raise an error if the
# TopologyType is ReplicaSetWithPrimary or ReplicaSetNoPrimary and
# maxStalenessSeconds * 1000 is less than twice heartbeatFrequencyMS."
if max_staleness < 2 * selection.heartbeat_frequency:
raise ConfigurationError(
"maxStalenessSeconds must be twice heartbeatFrequencyMS")
if selection.primary:
return _with_primary(max_staleness, selection)
else:

View File

@ -45,7 +45,8 @@ class ServerDescription(object):
'_primary', '_max_bson_size', '_max_message_size',
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
'_round_trip_time', '_me', '_is_writable', '_is_readable', '_error',
'_set_version', '_election_id', '_last_write_date', '_last_update_time')
'_set_version', '_election_id', '_last_write_date', '_last_update_time',
'_idle_write_period')
def __init__(
self,
@ -74,6 +75,7 @@ class ServerDescription(object):
self._round_trip_time = round_trip_time
self._me = ismaster.me
self._last_update_time = _time()
self._idle_write_period = ismaster.idle_write_period
self._error = error
if ismaster.last_write_date:
@ -164,6 +166,10 @@ class ServerDescription(object):
def last_update_time(self):
return self._last_update_time
@property
def idle_write_period(self):
return self._idle_write_period
@property
def round_trip_time(self):
"""The current average latency or None."""

View File

@ -51,16 +51,16 @@ class Selection(object):
self.primary)
def secondary_with_max_last_write_date(self):
smax = None
for s in self.topology_description.known_servers:
if s.server_type == SERVER_TYPE.RSSecondary:
if not smax:
smax = s
else:
if s.last_write_date > smax.last_write_date:
smax = s
secondaries = secondary_server_selector(self)
if secondaries.server_descriptions:
return max(secondaries.server_descriptions,
key=lambda sd: sd.last_write_date)
return smax
def secondary_with_max_last_update_time(self):
secondaries = secondary_server_selector(self)
if secondaries.server_descriptions:
return max(secondaries.server_descriptions,
key=lambda sd: sd.last_update_time)
@property
def primary_selection(self):
@ -71,6 +71,10 @@ class Selection(object):
def heartbeat_frequency(self):
return self.topology_description.heartbeat_frequency
@property
def topology_type(self):
return self.topology_description.topology_type
def __bool__(self):
return bool(self.server_descriptions)

View File

@ -0,0 +1,81 @@
{
"heartbeatFrequencyMS": 500,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessSeconds": 10,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,39 @@
{
"error": true,
"heartbeatFrequencyMS": 500,
"read_preference": {
"maxStalenessSeconds": 10.5,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,37 @@
{
"error": true,
"heartbeatFrequencyMS": 500,
"read_preference": {
"maxStalenessSeconds": 10.4,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,39 @@
{
"error": true,
"heartbeatFrequencyMS": 500,
"read_preference": {
"maxStalenessSeconds": 10.5,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,81 @@
{
"heartbeatFrequencyMS": 500,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessSeconds": 10,
"mode": "Secondary"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 11000,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"idleWritePeriodMillis": 9000,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -1,7 +1,8 @@
{
"error": true,
"heartbeatFrequencyMS": 500,
"read_preference": {
"maxStalenessSeconds": 1,
"maxStalenessSeconds": 10.4,
"mode": "Nearest"
},
"topology_description": {

View File

@ -0,0 +1,76 @@
{
"heartbeatFrequencyMS": 1000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessSeconds": 11,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,76 @@
{
"heartbeatFrequencyMS": 1000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"read_preference": {
"maxStalenessSeconds": 11,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -111,11 +111,10 @@ def make_server_description(server, hosts):
ismaster_response['msg'] = 'isdbgrid'
ismaster_response['lastWriteDate'] = make_last_write_date(server)
if 'maxWireVersion' in server:
ismaster_response['maxWireVersion'] = server['maxWireVersion']
if 'tags' in server:
ismaster_response['tags'] = server['tags']
for field in 'maxWireVersion', 'tags', 'idleWritePeriodMillis':
if field in server:
ismaster_response[field] = server[field]
# Sets _last_update_time to now.
sd = ServerDescription(clean_node(server['address']),