PYTHON-1150 - Add maxStalenessMS to $readPreference
This commit is contained in:
parent
1a45a0fa08
commit
5f16e33a6d
@ -73,13 +73,16 @@ def _maybe_add_read_preference(spec, read_preference):
|
||||
"""Add $readPreference to spec when appropriate."""
|
||||
mode = read_preference.mode
|
||||
tag_sets = read_preference.tag_sets
|
||||
max_staleness = read_preference.max_staleness
|
||||
# Only add $readPreference if it's something other than primary to avoid
|
||||
# problems with mongos versions that don't support read preferences. Also,
|
||||
# for maximum backwards compatibility, don't add $readPreference for
|
||||
# secondaryPreferred unless tags are in use (setting the slaveOkay bit
|
||||
# has the same effect).
|
||||
# secondaryPreferred unless tags or maxStalenessMS are in use (setting the
|
||||
# slaveOkay bit has the same effect).
|
||||
if mode and (
|
||||
mode != ReadPreference.SECONDARY_PREFERRED.mode or tag_sets != [{}]):
|
||||
mode != ReadPreference.SECONDARY_PREFERRED.mode
|
||||
or tag_sets != [{}]
|
||||
or max_staleness):
|
||||
|
||||
if "$query" not in spec:
|
||||
spec = SON([("$query", spec)])
|
||||
|
||||
@ -63,11 +63,11 @@ def _validate_tag_sets(tag_sets):
|
||||
|
||||
|
||||
def _validate_max_staleness(max_staleness):
|
||||
"""Validate maxStalenessMS."""
|
||||
"""Validate max_staleness."""
|
||||
if max_staleness is None:
|
||||
return 0.0
|
||||
|
||||
errmsg = "maxStalenessMS must be an integer or float"
|
||||
errmsg = "max_staleness must be an integer or float"
|
||||
try:
|
||||
max_staleness = float(max_staleness)
|
||||
except ValueError:
|
||||
@ -75,6 +75,10 @@ def _validate_max_staleness(max_staleness):
|
||||
except TypeError:
|
||||
raise TypeError(errmsg)
|
||||
|
||||
if not 0 < max_staleness < 1e9:
|
||||
raise ValueError(
|
||||
"max_staleness must be greater than 0 and less than one billion")
|
||||
|
||||
return max_staleness
|
||||
|
||||
|
||||
@ -100,9 +104,12 @@ class _ServerMode(object):
|
||||
def document(self):
|
||||
"""Read preference as a document.
|
||||
"""
|
||||
if self.__tag_sets in (None, [{}]):
|
||||
return {'mode': self.__mongos_mode}
|
||||
return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}
|
||||
doc = {'mode': self.__mongos_mode}
|
||||
if self.__tag_sets not in (None, [{}]):
|
||||
doc['tags'] = self.__tag_sets
|
||||
if self.__max_staleness:
|
||||
doc['maxStalenessMS'] = int(self.__max_staleness * 1000)
|
||||
return doc
|
||||
|
||||
@property
|
||||
def mode(self):
|
||||
@ -130,6 +137,8 @@ class _ServerMode(object):
|
||||
"""The maximum estimated length of time (in seconds) a replica set
|
||||
secondary can fall behind the primary in replication before it will
|
||||
no longer be selected for operations."""
|
||||
if not self.__max_staleness:
|
||||
return None
|
||||
return self.__max_staleness
|
||||
|
||||
@property
|
||||
@ -146,8 +155,8 @@ class _ServerMode(object):
|
||||
return 5 if self.__max_staleness else 0
|
||||
|
||||
def __repr__(self):
|
||||
return "%s(tag_sets=%r)" % (
|
||||
self.name, self.__tag_sets)
|
||||
return "%s(tag_sets=%r, max_staleness=%r)" % (
|
||||
self.name, self.__tag_sets, self.max_staleness)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, _ServerMode):
|
||||
|
||||
@ -159,6 +159,8 @@ def create_test(scenario_def):
|
||||
mode_string = mode_string[:1].lower() + mode_string[1:]
|
||||
mode = read_preferences.read_pref_mode_from_name(mode_string)
|
||||
max_staleness = pref_def.get('maxStalenessMS', 0) / 1000.0
|
||||
if not max_staleness:
|
||||
max_staleness = None
|
||||
tag_sets = pref_def.get('tag_sets')
|
||||
|
||||
if scenario_def.get('error'):
|
||||
|
||||
@ -24,7 +24,7 @@ sys.path[0:0] = [""]
|
||||
|
||||
from bson.py3compat import MAXSIZE
|
||||
from bson.son import SON
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.errors import ConfigurationError, OperationFailure
|
||||
from pymongo.message import _maybe_add_read_preference
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.read_preferences import (ReadPreference, MovingAverage,
|
||||
@ -446,6 +446,77 @@ class TestMovingAverage(unittest.TestCase):
|
||||
|
||||
class TestMongosAndReadPreference(unittest.TestCase):
|
||||
|
||||
def test_read_preference_document(self):
|
||||
|
||||
pref = Primary()
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'primary'})
|
||||
|
||||
pref = PrimaryPreferred()
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'primaryPreferred'})
|
||||
pref = PrimaryPreferred(tag_sets=[{'dc': 'sf'}])
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'primaryPreferred', 'tags': [{'dc': 'sf'}]})
|
||||
pref = PrimaryPreferred(
|
||||
tag_sets=[{'dc': 'sf'}], max_staleness=30)
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'primaryPreferred',
|
||||
'tags': [{'dc': 'sf'}],
|
||||
'maxStalenessMS': 30000})
|
||||
|
||||
pref = Secondary()
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondary'})
|
||||
pref = Secondary(tag_sets=[{'dc': 'sf'}])
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondary', 'tags': [{'dc': 'sf'}]})
|
||||
pref = Secondary(
|
||||
tag_sets=[{'dc': 'sf'}], max_staleness=30)
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondary',
|
||||
'tags': [{'dc': 'sf'}],
|
||||
'maxStalenessMS': 30000})
|
||||
|
||||
pref = SecondaryPreferred()
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondaryPreferred'})
|
||||
pref = SecondaryPreferred(tag_sets=[{'dc': 'sf'}])
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondaryPreferred', 'tags': [{'dc': 'sf'}]})
|
||||
pref = SecondaryPreferred(
|
||||
tag_sets=[{'dc': 'sf'}], max_staleness=30)
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'secondaryPreferred',
|
||||
'tags': [{'dc': 'sf'}],
|
||||
'maxStalenessMS': 30000})
|
||||
|
||||
pref = Nearest()
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'nearest'})
|
||||
pref = Nearest(tag_sets=[{'dc': 'sf'}])
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'nearest', 'tags': [{'dc': 'sf'}]})
|
||||
pref = Nearest(
|
||||
tag_sets=[{'dc': 'sf'}], max_staleness=30)
|
||||
self.assertEqual(
|
||||
pref.document,
|
||||
{'mode': 'nearest',
|
||||
'tags': [{'dc': 'sf'}],
|
||||
'maxStalenessMS': 30000})
|
||||
|
||||
def test_maybe_add_read_preference(self):
|
||||
|
||||
# Primary doesn't add $readPreference
|
||||
@ -470,12 +541,17 @@ class TestMongosAndReadPreference(unittest.TestCase):
|
||||
self.assertEqual(
|
||||
out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
||||
|
||||
# SecondaryPreferred without tag_sets doesn't add $readPreference
|
||||
# SecondaryPreferred without tag_sets or max_staleness doesn't add
|
||||
# $readPreference
|
||||
pref = SecondaryPreferred()
|
||||
out = _maybe_add_read_preference({}, pref)
|
||||
self.assertEqual(out, {})
|
||||
pref = SecondaryPreferred(tag_sets=[{'dc': 'nyc'}])
|
||||
out = _maybe_add_read_preference({}, pref)
|
||||
self.assertEqual(
|
||||
out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
||||
pref = SecondaryPreferred(max_staleness=120)
|
||||
out = _maybe_add_read_preference({}, pref)
|
||||
self.assertEqual(
|
||||
out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
||||
|
||||
@ -533,6 +609,39 @@ class TestMongosAndReadPreference(unittest.TestCase):
|
||||
self.assertEqual(first_id, results[-1]["_id"])
|
||||
self.assertEqual(last_id, results[0]["_id"])
|
||||
|
||||
@client_context.require_mongos
|
||||
@client_context.require_version_min(3, 3, 12)
|
||||
def test_mongos_max_staleness(self):
|
||||
# Sanity check that we're sending maxStalenessMS
|
||||
coll = client_context.client.pymongo_test.get_collection(
|
||||
"test", read_preference=SecondaryPreferred(max_staleness=120))
|
||||
# No error
|
||||
coll.find_one()
|
||||
|
||||
coll = client_context.client.pymongo_test.get_collection(
|
||||
"test", read_preference=SecondaryPreferred(max_staleness=10))
|
||||
try:
|
||||
coll.find_one()
|
||||
except OperationFailure as exc:
|
||||
self.assertEqual(160, exc.code)
|
||||
else:
|
||||
self.fail("mongos accepted invalid staleness")
|
||||
|
||||
coll = single_client(
|
||||
readPreference='secondaryPreferred',
|
||||
maxStalenessMS=120000).pymongo_test.test
|
||||
# No error
|
||||
coll.find_one()
|
||||
|
||||
coll = single_client(
|
||||
readPreference='secondaryPreferred',
|
||||
maxStalenessMS=10000).pymongo_test.test
|
||||
try:
|
||||
coll.find_one()
|
||||
except OperationFailure as exc:
|
||||
self.assertEqual(160, exc.code)
|
||||
else:
|
||||
self.fail("mongos accepted invalid staleness")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -710,12 +710,12 @@ class TestServerSelectionErrors(TopologyTest):
|
||||
|
||||
self.assertMessage(
|
||||
'No replica set members match selector'
|
||||
' "Secondary(tag_sets=None)"',
|
||||
' "Secondary(tag_sets=None, max_staleness=None)"',
|
||||
t, ReadPreference.SECONDARY)
|
||||
|
||||
self.assertMessage(
|
||||
"No replica set members match selector"
|
||||
" \"Secondary(tag_sets=[{'dc': 'ny'}])\"",
|
||||
" \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=None)\"",
|
||||
t, Secondary(tag_sets=[{'dc': 'ny'}]))
|
||||
|
||||
def test_bad_replica_set_name(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user