diff --git a/pymongo/message.py b/pymongo/message.py index 19361cbf2..23da7deb6 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -73,13 +73,16 @@ def _maybe_add_read_preference(spec, read_preference): """Add $readPreference to spec when appropriate.""" mode = read_preference.mode tag_sets = read_preference.tag_sets + max_staleness = read_preference.max_staleness # Only add $readPreference if it's something other than primary to avoid # problems with mongos versions that don't support read preferences. Also, # for maximum backwards compatibility, don't add $readPreference for - # secondaryPreferred unless tags are in use (setting the slaveOkay bit - # has the same effect). + # secondaryPreferred unless tags or maxStalenessMS are in use (setting the + # slaveOkay bit has the same effect). if mode and ( - mode != ReadPreference.SECONDARY_PREFERRED.mode or tag_sets != [{}]): + mode != ReadPreference.SECONDARY_PREFERRED.mode + or tag_sets != [{}] + or max_staleness): if "$query" not in spec: spec = SON([("$query", spec)]) diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index 1be0df014..19cfc2432 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -63,11 +63,11 @@ def _validate_tag_sets(tag_sets): def _validate_max_staleness(max_staleness): - """Validate maxStalenessMS.""" + """Validate max_staleness.""" if max_staleness is None: return 0.0 - errmsg = "maxStalenessMS must be an integer or float" + errmsg = "max_staleness must be an integer or float" try: max_staleness = float(max_staleness) except ValueError: @@ -75,6 +75,10 @@ def _validate_max_staleness(max_staleness): except TypeError: raise TypeError(errmsg) + if not 0 < max_staleness < 1e9: + raise ValueError( + "max_staleness must be greater than 0 and less than one billion") + return max_staleness @@ -100,9 +104,12 @@ class _ServerMode(object): def document(self): """Read preference as a document. """ - if self.__tag_sets in (None, [{}]): - return {'mode': self.__mongos_mode} - return {'mode': self.__mongos_mode, 'tags': self.__tag_sets} + doc = {'mode': self.__mongos_mode} + if self.__tag_sets not in (None, [{}]): + doc['tags'] = self.__tag_sets + if self.__max_staleness: + doc['maxStalenessMS'] = int(self.__max_staleness * 1000) + return doc @property def mode(self): @@ -130,6 +137,8 @@ class _ServerMode(object): """The maximum estimated length of time (in seconds) a replica set secondary can fall behind the primary in replication before it will no longer be selected for operations.""" + if not self.__max_staleness: + return None return self.__max_staleness @property @@ -146,8 +155,8 @@ class _ServerMode(object): return 5 if self.__max_staleness else 0 def __repr__(self): - return "%s(tag_sets=%r)" % ( - self.name, self.__tag_sets) + return "%s(tag_sets=%r, max_staleness=%r)" % ( + self.name, self.__tag_sets, self.max_staleness) def __eq__(self, other): if isinstance(other, _ServerMode): diff --git a/test/test_max_staleness.py b/test/test_max_staleness.py index 2e307829e..c6c9f351e 100644 --- a/test/test_max_staleness.py +++ b/test/test_max_staleness.py @@ -159,6 +159,8 @@ def create_test(scenario_def): mode_string = mode_string[:1].lower() + mode_string[1:] mode = read_preferences.read_pref_mode_from_name(mode_string) max_staleness = pref_def.get('maxStalenessMS', 0) / 1000.0 + if not max_staleness: + max_staleness = None tag_sets = pref_def.get('tag_sets') if scenario_def.get('error'): diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index a8488eb6f..18fcb6dbc 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -24,7 +24,7 @@ sys.path[0:0] = [""] from bson.py3compat import MAXSIZE from bson.son import SON -from pymongo.errors import ConfigurationError +from pymongo.errors import ConfigurationError, OperationFailure from pymongo.message import _maybe_add_read_preference from pymongo.mongo_client import MongoClient from pymongo.read_preferences import (ReadPreference, MovingAverage, @@ -446,6 +446,77 @@ class TestMovingAverage(unittest.TestCase): class TestMongosAndReadPreference(unittest.TestCase): + def test_read_preference_document(self): + + pref = Primary() + self.assertEqual( + pref.document, + {'mode': 'primary'}) + + pref = PrimaryPreferred() + self.assertEqual( + pref.document, + {'mode': 'primaryPreferred'}) + pref = PrimaryPreferred(tag_sets=[{'dc': 'sf'}]) + self.assertEqual( + pref.document, + {'mode': 'primaryPreferred', 'tags': [{'dc': 'sf'}]}) + pref = PrimaryPreferred( + tag_sets=[{'dc': 'sf'}], max_staleness=30) + self.assertEqual( + pref.document, + {'mode': 'primaryPreferred', + 'tags': [{'dc': 'sf'}], + 'maxStalenessMS': 30000}) + + pref = Secondary() + self.assertEqual( + pref.document, + {'mode': 'secondary'}) + pref = Secondary(tag_sets=[{'dc': 'sf'}]) + self.assertEqual( + pref.document, + {'mode': 'secondary', 'tags': [{'dc': 'sf'}]}) + pref = Secondary( + tag_sets=[{'dc': 'sf'}], max_staleness=30) + self.assertEqual( + pref.document, + {'mode': 'secondary', + 'tags': [{'dc': 'sf'}], + 'maxStalenessMS': 30000}) + + pref = SecondaryPreferred() + self.assertEqual( + pref.document, + {'mode': 'secondaryPreferred'}) + pref = SecondaryPreferred(tag_sets=[{'dc': 'sf'}]) + self.assertEqual( + pref.document, + {'mode': 'secondaryPreferred', 'tags': [{'dc': 'sf'}]}) + pref = SecondaryPreferred( + tag_sets=[{'dc': 'sf'}], max_staleness=30) + self.assertEqual( + pref.document, + {'mode': 'secondaryPreferred', + 'tags': [{'dc': 'sf'}], + 'maxStalenessMS': 30000}) + + pref = Nearest() + self.assertEqual( + pref.document, + {'mode': 'nearest'}) + pref = Nearest(tag_sets=[{'dc': 'sf'}]) + self.assertEqual( + pref.document, + {'mode': 'nearest', 'tags': [{'dc': 'sf'}]}) + pref = Nearest( + tag_sets=[{'dc': 'sf'}], max_staleness=30) + self.assertEqual( + pref.document, + {'mode': 'nearest', + 'tags': [{'dc': 'sf'}], + 'maxStalenessMS': 30000}) + def test_maybe_add_read_preference(self): # Primary doesn't add $readPreference @@ -470,12 +541,17 @@ class TestMongosAndReadPreference(unittest.TestCase): self.assertEqual( out, SON([("$query", {}), ("$readPreference", pref.document)])) - # SecondaryPreferred without tag_sets doesn't add $readPreference + # SecondaryPreferred without tag_sets or max_staleness doesn't add + # $readPreference pref = SecondaryPreferred() out = _maybe_add_read_preference({}, pref) self.assertEqual(out, {}) pref = SecondaryPreferred(tag_sets=[{'dc': 'nyc'}]) out = _maybe_add_read_preference({}, pref) + self.assertEqual( + out, SON([("$query", {}), ("$readPreference", pref.document)])) + pref = SecondaryPreferred(max_staleness=120) + out = _maybe_add_read_preference({}, pref) self.assertEqual( out, SON([("$query", {}), ("$readPreference", pref.document)])) @@ -533,6 +609,39 @@ class TestMongosAndReadPreference(unittest.TestCase): self.assertEqual(first_id, results[-1]["_id"]) self.assertEqual(last_id, results[0]["_id"]) + @client_context.require_mongos + @client_context.require_version_min(3, 3, 12) + def test_mongos_max_staleness(self): + # Sanity check that we're sending maxStalenessMS + coll = client_context.client.pymongo_test.get_collection( + "test", read_preference=SecondaryPreferred(max_staleness=120)) + # No error + coll.find_one() + + coll = client_context.client.pymongo_test.get_collection( + "test", read_preference=SecondaryPreferred(max_staleness=10)) + try: + coll.find_one() + except OperationFailure as exc: + self.assertEqual(160, exc.code) + else: + self.fail("mongos accepted invalid staleness") + + coll = single_client( + readPreference='secondaryPreferred', + maxStalenessMS=120000).pymongo_test.test + # No error + coll.find_one() + + coll = single_client( + readPreference='secondaryPreferred', + maxStalenessMS=10000).pymongo_test.test + try: + coll.find_one() + except OperationFailure as exc: + self.assertEqual(160, exc.code) + else: + self.fail("mongos accepted invalid staleness") if __name__ == "__main__": unittest.main() diff --git a/test/test_topology.py b/test/test_topology.py index 3d3066d0f..f1e0eb22b 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -710,12 +710,12 @@ class TestServerSelectionErrors(TopologyTest): self.assertMessage( 'No replica set members match selector' - ' "Secondary(tag_sets=None)"', + ' "Secondary(tag_sets=None, max_staleness=None)"', t, ReadPreference.SECONDARY) self.assertMessage( "No replica set members match selector" - " \"Secondary(tag_sets=[{'dc': 'ny'}])\"", + " \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=None)\"", t, Secondary(tag_sets=[{'dc': 'ny'}])) def test_bad_replica_set_name(self):