PYTHON-879 - Backport new read preference classes from 3.x.
This commit is contained in:
parent
8d072d59e8
commit
32c06e0494
@ -13,7 +13,10 @@
|
||||
|
||||
Alias for :class:`pymongo.mongo_replica_set_client.MongoReplicaSetClient`.
|
||||
|
||||
.. autoclass:: pymongo.read_preferences.ReadPreference
|
||||
.. data:: ReadPreference
|
||||
|
||||
Alias for :class:`pymongo.read_preferences.ReadPreference`
|
||||
|
||||
.. autofunction:: has_c
|
||||
.. data:: MIN_SUPPORTED_WIRE_VERSION
|
||||
|
||||
@ -40,6 +43,7 @@ Sub-modules:
|
||||
mongo_client
|
||||
mongo_replica_set_client
|
||||
pool
|
||||
read_preferences
|
||||
replica_set_connection
|
||||
son_manipulator
|
||||
cursor_manager
|
||||
|
||||
19
doc/api/pymongo/read_preferences.rst
Normal file
19
doc/api/pymongo/read_preferences.rst
Normal file
@ -0,0 +1,19 @@
|
||||
:mod:`read_preferences` -- Utilities for choosing which member of a replica set to read from.
|
||||
=============================================================================================
|
||||
|
||||
.. automodule:: pymongo.read_preferences
|
||||
:synopsis: Utilities for choosing which member of a replica set to read from.
|
||||
|
||||
.. autoclass:: pymongo.read_preferences.Primary
|
||||
:inherited-members:
|
||||
.. autoclass:: pymongo.read_preferences.PrimaryPreferred
|
||||
:inherited-members:
|
||||
.. autoclass:: pymongo.read_preferences.Secondary
|
||||
:inherited-members:
|
||||
.. autoclass:: pymongo.read_preferences.SecondaryPreferred
|
||||
:inherited-members:
|
||||
.. autoclass:: pymongo.read_preferences.Nearest
|
||||
:inherited-members:
|
||||
|
||||
.. autoclass:: ReadPreference
|
||||
|
||||
@ -19,6 +19,13 @@ import random
|
||||
from pymongo.errors import ConfigurationError
|
||||
|
||||
|
||||
_PRIMARY = 0
|
||||
_PRIMARY_PREFERRED = 1
|
||||
_SECONDARY = 2
|
||||
_SECONDARY_PREFERRED = 3
|
||||
_NEAREST = 4
|
||||
|
||||
|
||||
class ReadPreference:
|
||||
"""An enum that defines the read preference modes supported by PyMongo.
|
||||
Used in three cases:
|
||||
@ -54,20 +61,20 @@ class ReadPreference:
|
||||
* `NEAREST`: Queries are distributed among all members.
|
||||
"""
|
||||
|
||||
PRIMARY = 0
|
||||
PRIMARY_PREFERRED = 1
|
||||
SECONDARY = 2
|
||||
SECONDARY_ONLY = 2
|
||||
SECONDARY_PREFERRED = 3
|
||||
NEAREST = 4
|
||||
PRIMARY = _PRIMARY
|
||||
PRIMARY_PREFERRED = _PRIMARY_PREFERRED
|
||||
SECONDARY = _SECONDARY
|
||||
SECONDARY_ONLY = _SECONDARY
|
||||
SECONDARY_PREFERRED = _SECONDARY_PREFERRED
|
||||
NEAREST = _NEAREST
|
||||
|
||||
# For formatting error messages
|
||||
modes = {
|
||||
ReadPreference.PRIMARY: 'PRIMARY',
|
||||
ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
|
||||
ReadPreference.SECONDARY: 'SECONDARY',
|
||||
ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
|
||||
ReadPreference.NEAREST: 'NEAREST',
|
||||
_PRIMARY: 'PRIMARY',
|
||||
_PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
|
||||
_SECONDARY: 'SECONDARY',
|
||||
_SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
|
||||
_NEAREST: 'NEAREST',
|
||||
}
|
||||
|
||||
_mongos_modes = [
|
||||
@ -118,38 +125,29 @@ def select_member_with_tags(members, tags, secondary_only, latency):
|
||||
return random.choice(near_candidates)
|
||||
|
||||
|
||||
def select_member(
|
||||
members,
|
||||
mode=ReadPreference.PRIMARY,
|
||||
tag_sets=None,
|
||||
latency=15
|
||||
):
|
||||
def select_member(members,
|
||||
mode=ReadPreference.PRIMARY,
|
||||
tag_sets=None,
|
||||
latency=15):
|
||||
"""Return a Member or None.
|
||||
"""
|
||||
if tag_sets is None:
|
||||
tag_sets = [{}]
|
||||
|
||||
# For brevity
|
||||
PRIMARY = ReadPreference.PRIMARY
|
||||
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
|
||||
SECONDARY = ReadPreference.SECONDARY
|
||||
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
|
||||
NEAREST = ReadPreference.NEAREST
|
||||
|
||||
if mode == PRIMARY:
|
||||
if mode == _PRIMARY:
|
||||
if tag_sets != [{}]:
|
||||
raise ConfigurationError("PRIMARY cannot be combined with tags")
|
||||
return select_primary(members)
|
||||
|
||||
elif mode == PRIMARY_PREFERRED:
|
||||
elif mode == _PRIMARY_PREFERRED:
|
||||
# Recurse.
|
||||
candidate_primary = select_member(members, PRIMARY, [{}], latency)
|
||||
candidate_primary = select_member(members, _PRIMARY, [{}], latency)
|
||||
if candidate_primary:
|
||||
return candidate_primary
|
||||
else:
|
||||
return select_member(members, SECONDARY, tag_sets, latency)
|
||||
return select_member(members, _SECONDARY, tag_sets, latency)
|
||||
|
||||
elif mode == SECONDARY:
|
||||
elif mode == _SECONDARY:
|
||||
for tags in tag_sets:
|
||||
candidate = select_member_with_tags(members, tags, True, latency)
|
||||
if candidate:
|
||||
@ -157,16 +155,16 @@ def select_member(
|
||||
|
||||
return None
|
||||
|
||||
elif mode == SECONDARY_PREFERRED:
|
||||
elif mode == _SECONDARY_PREFERRED:
|
||||
# Recurse.
|
||||
candidate_secondary = select_member(
|
||||
members, SECONDARY, tag_sets, latency)
|
||||
members, _SECONDARY, tag_sets, latency)
|
||||
if candidate_secondary:
|
||||
return candidate_secondary
|
||||
else:
|
||||
return select_member(members, PRIMARY, [{}], latency)
|
||||
return select_member(members, _PRIMARY, [{}], latency)
|
||||
|
||||
elif mode == NEAREST:
|
||||
elif mode == _NEAREST:
|
||||
for tags in tag_sets:
|
||||
candidate = select_member_with_tags(members, tags, False, latency)
|
||||
if candidate:
|
||||
@ -203,3 +201,208 @@ class MovingAverage(object):
|
||||
|
||||
def get(self):
|
||||
return self.average
|
||||
|
||||
|
||||
def _validate_tag_sets(tag_sets):
|
||||
"""Validate tag sets for a MongoReplicaSetClient.
|
||||
"""
|
||||
if tag_sets is None:
|
||||
return tag_sets
|
||||
|
||||
if not isinstance(tag_sets, list):
|
||||
raise TypeError((
|
||||
"Tag sets %r invalid, must be a list") % (tag_sets,))
|
||||
if len(tag_sets) == 0:
|
||||
raise ValueError((
|
||||
"Tag sets %r invalid, must be None or contain at least one set of"
|
||||
" tags") % (tag_sets,))
|
||||
|
||||
for tags in tag_sets:
|
||||
if not isinstance(tags, dict):
|
||||
raise TypeError(
|
||||
"Tag set %r invalid, must be an instance of dict, or"
|
||||
"bson.son.SON" % (tags,))
|
||||
|
||||
return tag_sets
|
||||
|
||||
|
||||
class _ServerMode(object):
|
||||
"""Base class for all read preferences.
|
||||
"""
|
||||
|
||||
__slots__ = ("__mongos_mode", "__mode", "__tag_sets")
|
||||
|
||||
def __init__(self, mode, tag_sets=None):
|
||||
if mode == _PRIMARY and tag_sets is not None:
|
||||
raise ConfigurationError("Read preference primary "
|
||||
"cannot be combined with tags")
|
||||
self.__mongos_mode = _mongos_modes[mode]
|
||||
self.__mode = mode
|
||||
self.__tag_sets = _validate_tag_sets(tag_sets)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this read preference.
|
||||
"""
|
||||
return self.__class__.__name__
|
||||
|
||||
@property
|
||||
def document(self):
|
||||
"""Read preference as a document.
|
||||
"""
|
||||
if self.__tag_sets in (None, [{}]):
|
||||
return {'mode': self.__mongos_mode}
|
||||
return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}
|
||||
|
||||
@property
|
||||
def mode(self):
|
||||
"""The mode of this read preference instance.
|
||||
"""
|
||||
return self.__mode
|
||||
|
||||
@property
|
||||
def tag_sets(self):
|
||||
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
|
||||
read only from members whose ``dc`` tag has the value ``"ny"``.
|
||||
To specify a priority-order for tag sets, provide a list of
|
||||
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
|
||||
set, ``{}``, means "read from any member that matches the mode,
|
||||
ignoring tags." MongoReplicaSetClient tries each set of tags in turn
|
||||
until it finds a set of tags with at least one matching member.
|
||||
|
||||
.. seealso:: `Data-Center Awareness
|
||||
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
|
||||
"""
|
||||
if self.__tag_sets:
|
||||
return list(self.__tag_sets)
|
||||
return [{}]
|
||||
|
||||
def __repr__(self):
|
||||
return "%s(tag_sets=%r)" % (
|
||||
self.name, self.__tag_sets)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, _ServerMode):
|
||||
return (self.mode == other.mode and
|
||||
self.tag_sets == other.tag_sets)
|
||||
raise NotImplementedError
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
|
||||
def __getstate__(self):
|
||||
"""Return value of object for pickling.
|
||||
Needed explicitly because __slots__() defined.
|
||||
"""
|
||||
return {'mode': self.__mode, 'tag_sets': self.__tag_sets}
|
||||
|
||||
def __setstate__(self, value):
|
||||
"""Restore from pickling."""
|
||||
self.__mode = value['mode']
|
||||
self.__mongos_mode = _mongos_modes[self.__mode]
|
||||
self.__tag_sets = _validate_tag_sets(value['tag_sets'])
|
||||
|
||||
|
||||
class Primary(_ServerMode):
|
||||
"""Primary read preference.
|
||||
|
||||
* When directly connected to one mongod queries are allowed if the server
|
||||
is standalone or a replica set primary.
|
||||
* When connected to a mongos queries are sent to the primary of a shard.
|
||||
* When connected to a replica set queries are sent to the primary of
|
||||
the replica set.
|
||||
|
||||
.. versionadded:: 2.9
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(Primary, self).__init__(_PRIMARY)
|
||||
|
||||
def __repr__(self):
|
||||
return "Primary()"
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, _ServerMode):
|
||||
return other.mode == _PRIMARY
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PrimaryPreferred(_ServerMode):
|
||||
"""PrimaryPreferred read preference.
|
||||
|
||||
* When directly connected to one mongod queries are allowed to standalone
|
||||
servers, to a replica set primary, or to replica set secondaries.
|
||||
* When connected to a mongos queries are sent to the primary of a shard if
|
||||
available, otherwise a shard secondary.
|
||||
* When connected to a replica set queries are sent to the primary if
|
||||
available, otherwise a secondary.
|
||||
|
||||
:Parameters:
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
|
||||
available.
|
||||
|
||||
.. versionadded:: 2.9
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
|
||||
|
||||
|
||||
class Secondary(_ServerMode):
|
||||
"""Secondary read preference.
|
||||
|
||||
* When directly connected to one mongod queries are allowed to standalone
|
||||
servers, to a replica set primary, or to replica set secondaries.
|
||||
* When connected to a mongos queries are distributed among shard
|
||||
secondaries. An error is raised if no secondaries are available.
|
||||
* When connected to a replica set queries are distributed among
|
||||
secondaries. An error is raised if no secondaries are available.
|
||||
|
||||
:Parameters:
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
|
||||
.. versionadded:: 2.9
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(Secondary, self).__init__(_SECONDARY, tag_sets)
|
||||
|
||||
|
||||
class SecondaryPreferred(_ServerMode):
|
||||
"""SecondaryPreferred read preference.
|
||||
|
||||
* When directly connected to one mongod queries are allowed to standalone
|
||||
servers, to a replica set primary, or to replica set secondaries.
|
||||
* When connected to a mongos queries are distributed among shard
|
||||
secondaries, or the shard primary if no secondary is available.
|
||||
* When connected to a replica set queries are distributed among
|
||||
secondaries, or the primary if no secondary is available.
|
||||
|
||||
:Parameters:
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
|
||||
.. versionadded:: 2.9
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
|
||||
|
||||
|
||||
class Nearest(_ServerMode):
|
||||
"""Nearest read preference.
|
||||
|
||||
* When directly connected to one mongod queries are allowed to standalone
|
||||
servers, to a replica set primary, or to replica set secondaries.
|
||||
* When connected to a mongos queries are distributed among all members of
|
||||
a shard.
|
||||
* When connected to a replica set queries are distributed among all
|
||||
members.
|
||||
|
||||
:Parameters:
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
|
||||
.. versionadded:: 2.9
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(Nearest, self).__init__(_NEAREST, tag_sets)
|
||||
|
||||
@ -13,6 +13,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""Test the replica_set_connection module."""
|
||||
|
||||
import copy
|
||||
import pickle
|
||||
import random
|
||||
import sys
|
||||
import unittest
|
||||
@ -28,7 +31,9 @@ from pymongo.master_slave_connection import MasterSlaveConnection
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
|
||||
from pymongo.read_preferences import (ReadPreference, modes, MovingAverage,
|
||||
secondary_ok_commands)
|
||||
secondary_ok_commands, Primary,
|
||||
PrimaryPreferred, Secondary,
|
||||
SecondaryPreferred, Nearest)
|
||||
from pymongo.errors import ConfigurationError
|
||||
|
||||
from test.test_replica_set_client import TestReplicaSetClientBase
|
||||
@ -710,5 +715,101 @@ class TestMongosConnection(unittest.TestCase):
|
||||
cursor = c.pymongo_test["$cmd"].find(command.copy())
|
||||
self.assertEqual(command, cursor._Cursor__query_spec())
|
||||
|
||||
|
||||
class TestServerModes(unittest.TestCase):
|
||||
prefs = [Primary(), Secondary(), Nearest(tag_sets=[{'a': 1}, {'b': 2}])]
|
||||
|
||||
def test_primary(self):
|
||||
pref = Primary()
|
||||
self.assertEqual(pref.mode, 0)
|
||||
self.assertEqual(pref.name, 'Primary')
|
||||
self.assertEqual(pref.document, {'mode': 'primary'})
|
||||
self.assertEqual(pref.tag_sets, [{}])
|
||||
self.assertEqual(pref, Primary())
|
||||
self.assertNotEqual(pref, PrimaryPreferred())
|
||||
|
||||
def test_primary_preferred(self):
|
||||
pref = PrimaryPreferred()
|
||||
self.assertEqual(pref.mode, 1)
|
||||
self.assertEqual(pref.name, 'PrimaryPreferred')
|
||||
self.assertEqual(pref.document, {'mode': 'primaryPreferred'})
|
||||
self.assertEqual(pref.tag_sets, [{}])
|
||||
self.assertEqual(pref, PrimaryPreferred())
|
||||
self.assertNotEqual(pref, Primary())
|
||||
|
||||
pref = PrimaryPreferred([{"dc": "ny"}])
|
||||
self.assertEqual(pref.document,
|
||||
{'mode': 'primaryPreferred', 'tags': [{"dc": "ny"}]})
|
||||
self.assertEqual(pref.tag_sets, [{"dc": "ny"}])
|
||||
self.assertEqual(pref, PrimaryPreferred([{"dc": "ny"}]))
|
||||
self.assertNotEqual(pref, PrimaryPreferred())
|
||||
self.assertNotEqual(pref, PrimaryPreferred([{"dc": "sf"}]))
|
||||
|
||||
def test_secondary(self):
|
||||
pref = Secondary()
|
||||
self.assertEqual(pref.mode, 2)
|
||||
self.assertEqual(pref.name, 'Secondary')
|
||||
self.assertEqual(pref.document, {'mode': 'secondary'})
|
||||
self.assertEqual(pref.tag_sets, [{}])
|
||||
self.assertEqual(pref, Secondary())
|
||||
self.assertNotEqual(pref, Primary())
|
||||
|
||||
pref = Secondary([{"dc": "ny"}])
|
||||
self.assertEqual(pref.document,
|
||||
{'mode': 'secondary', 'tags': [{"dc": "ny"}]})
|
||||
self.assertEqual(pref.tag_sets, [{"dc": "ny"}])
|
||||
self.assertEqual(pref, Secondary([{"dc": "ny"}]))
|
||||
self.assertNotEqual(pref, Secondary())
|
||||
self.assertNotEqual(pref, Secondary([{"dc": "sf"}]))
|
||||
|
||||
def test_secondary_preferred(self):
|
||||
pref = SecondaryPreferred()
|
||||
self.assertEqual(pref.mode, 3)
|
||||
self.assertEqual(pref.name, 'SecondaryPreferred')
|
||||
self.assertEqual(pref.document, {'mode': 'secondaryPreferred'})
|
||||
self.assertEqual(pref.tag_sets, [{}])
|
||||
self.assertEqual(pref, SecondaryPreferred())
|
||||
self.assertNotEqual(pref, Primary())
|
||||
|
||||
pref = SecondaryPreferred([{"dc": "ny"}])
|
||||
self.assertEqual(pref.document,
|
||||
{'mode': 'secondaryPreferred', 'tags': [{"dc": "ny"}]})
|
||||
self.assertEqual(pref.tag_sets, [{"dc": "ny"}])
|
||||
self.assertEqual(pref, SecondaryPreferred([{"dc": "ny"}]))
|
||||
self.assertNotEqual(pref, SecondaryPreferred())
|
||||
self.assertNotEqual(pref, SecondaryPreferred([{"dc": "sf"}]))
|
||||
|
||||
def test_nearest(self):
|
||||
pref = Nearest()
|
||||
self.assertEqual(pref.mode, 4)
|
||||
self.assertEqual(pref.name, 'Nearest')
|
||||
self.assertEqual(pref.document, {'mode': 'nearest'})
|
||||
self.assertEqual(pref.tag_sets, [{}])
|
||||
self.assertEqual(pref, Nearest())
|
||||
self.assertNotEqual(pref, Primary())
|
||||
|
||||
pref = Nearest([{"dc": "ny"}])
|
||||
self.assertEqual(pref.document,
|
||||
{'mode': 'nearest', 'tags': [{"dc": "ny"}]})
|
||||
self.assertEqual(pref.tag_sets, [{"dc": "ny"}])
|
||||
self.assertEqual(pref, Nearest([{"dc": "ny"}]))
|
||||
self.assertNotEqual(pref, Nearest())
|
||||
self.assertNotEqual(pref, Nearest([{"dc": "sf"}]))
|
||||
|
||||
def test_pickle(self):
|
||||
for pref in self.prefs:
|
||||
self.assertEqual(pref, pickle.loads(pickle.dumps(pref)))
|
||||
|
||||
def test_copy(self):
|
||||
for pref in self.prefs:
|
||||
self.assertEqual(pref, copy.copy(pref))
|
||||
|
||||
def test_tag_sets_validation(self):
|
||||
self.assertRaises(TypeError, Secondary, {})
|
||||
self.assertRaises(ValueError, Secondary, [])
|
||||
self.assertRaises(TypeError, Secondary, ["foo"])
|
||||
self.assertRaises(TypeError, Secondary, [{"dc": "ny"}, "foo"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user