PYTHON-672 - Make read preference a class

This commit introduces the following changes:

- Each read preference is now a class.
- Read preferences other than Primary accept a tag_sets parameter.
- The tag_sets attribute of MongoClient, Database, and Collection has
  been removed. Use the tag_sets parameter of the new classes instead.
- The read_preferences.ReadPreference constant still exists and should
continue to work as expected for most users.
This commit is contained in:
behackett 2014-04-14 16:28:07 -07:00
parent 2f86207246
commit e0b52baf8e
22 changed files with 486 additions and 375 deletions

View File

@ -24,7 +24,6 @@
.. autoattribute:: name
.. autoattribute:: database
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: write_concern
.. autoattribute:: uuid_subtype
.. automethod:: insert(doc_or_docs[, manipulate=True[, check_keys=True[, continue_on_error=False[, **kwargs]]]])
@ -34,7 +33,7 @@
.. automethod:: initialize_unordered_bulk_op
.. automethod:: initialize_ordered_bulk_op
.. automethod:: drop
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]])
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, await_data=False[, partial=False[, manipulate=True[, read_preference=None[, exhaust=False[, compile_re=True]]]]]]]]]]]]]]]])
.. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]])
.. automethod:: parallel_scan
.. automethod:: count

View File

@ -4,7 +4,7 @@
.. automodule:: pymongo.cursor
:synopsis: Tools for iterating over MongoDB query results
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], exhaust=False)
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, exhaust=False, compile_re=True)
:members:
.. describe:: c[index]

View File

@ -24,7 +24,6 @@
attribute of the :class:`Database` class eg: db[`collection_name`].
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: write_concern
.. autoattribute:: uuid_subtype

View File

@ -13,7 +13,10 @@
Alias for :class:`pymongo.mongo_replica_set_client.MongoReplicaSetClient`.
.. autoclass:: pymongo.read_preferences.ReadPreference
.. data:: ReadPreference
Alias for :class:`pymongo.read_preferences.ReadPreference`.
.. autofunction:: has_c
.. data:: MIN_SUPPORTED_WIRE_VERSION
@ -38,6 +41,7 @@ Sub-modules:
mongo_client
mongo_replica_set_client
pool
read_preferences
son_manipulator
cursor_manager
uri_parser

View File

@ -30,7 +30,6 @@
.. autoattribute:: min_wire_version
.. autoattribute:: max_wire_version
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: acceptable_latency_ms
.. autoattribute:: write_concern
.. autoattribute:: uuid_subtype

View File

@ -31,7 +31,6 @@
.. autoattribute:: max_wire_version
.. autoattribute:: auto_start_request
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: acceptable_latency_ms
.. autoattribute:: write_concern
.. autoattribute:: uuid_subtype

View File

@ -0,0 +1,20 @@
:mod:`read_preferences` -- Utilities for choosing which member of a replica set to read from.
=============================================================================================
.. automodule:: pymongo.read_preferences
:synopsis: Utilities for choosing which member of a replica set to read from.
.. autoclass:: pymongo.read_preferences.Primary
:inherited-members:
.. autoclass:: pymongo.read_preferences.PrimaryPreferred
:inherited-members:
.. autoclass:: pymongo.read_preferences.Secondary
:inherited-members:
.. autoclass:: pymongo.read_preferences.SecondaryPreferred
:inherited-members:
.. autoclass:: pymongo.read_preferences.Nearest
:inherited-members:
.. autodata:: ReadPreference
.. autodata:: SECONDARY_OK_COMMANDS

View File

@ -228,19 +228,20 @@ and **acceptableLatencyMS**.
Replica-set members can be `tagged
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_ according to any
criteria you choose. By default, MongoReplicaSetClient ignores tags when
choosing a member to read from, but it can be configured with the ``tag_sets``
parameter. ``tag_sets`` must be a list of dictionaries, each dict providing tag
values that the replica set member must match. MongoReplicaSetClient tries each
set of tags in turn until it finds a set of tags with at least one matching
member. For example, to prefer reads from the New York data center, but fall
back to the San Francisco data center, tag your replica set members according
to their location and create a MongoReplicaSetClient like so:
choosing a member to read from, but your read preference can be configured with
a ``tag_sets`` parameter. ``tag_sets`` must be a list of dictionaries, each
dict providing tag values that the replica set member must match.
MongoReplicaSetClient tries each set of tags in turn until it finds a set of
tags with at least one matching member. For example, to prefer reads from the
New York data center, but fall back to the San Francisco data center, tag your
replica set members according to their location and create a
MongoReplicaSetClient like so:
>>> from pymongo.read_preferences import Secondary
>>> rsc = MongoReplicaSetClient(
... "morton.local:27017",
... replicaSet='foo'
... read_preference=ReadPreference.SECONDARY,
... tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}]
... read_preference=Secondary(tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}])
... )
MongoReplicaSetClient tries to find secondaries in New York, then San Francisco,
@ -248,6 +249,8 @@ and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an
additional fallback, specify a final, empty tag set, ``{}``, which means "read
from any member that matches the mode, ignoring tags."
See :mod:`~pymongo.read_preferences` for more information.
**acceptableLatencyMS**:
If multiple members match the mode and tag sets, MongoReplicaSetClient reads

View File

@ -324,7 +324,6 @@ class GridFS(object):
examined when performing the query
- `read_preference` (optional): The read preference for
this query.
- `tag_sets` (optional): The tag sets for this query.
- `compile_re` (optional): if ``False``, don't attempt to compile
BSON regex objects into Python regexes. Return instances of
:class:`~bson.regex.Regex` instead.

View File

@ -625,7 +625,7 @@ class GridOutCursor(Cursor):
"""
def __init__(self, collection, spec=None, skip=0, limit=0,
timeout=True, sort=None, max_scan=None,
read_preference=None, tag_sets=None, compile_re=True):
read_preference=None, compile_re=True):
"""Create a new cursor, similar to the normal
:class:`~pymongo.cursor.Cursor`.
@ -641,12 +641,11 @@ class GridOutCursor(Cursor):
# Copy these settings from collection if they are not set by caller.
read_preference = read_preference or collection.files.read_preference
tag_sets = tag_sets or collection.files.tag_sets
super(GridOutCursor, self).__init__(
collection.files, spec, skip=skip, limit=limit, timeout=timeout,
sort=sort, max_scan=max_scan, read_preference=read_preference,
compile_re=compile_re, tag_sets=tag_sets)
compile_re=compile_re)
def next(self):
"""Get next GridOut object from cursor.

View File

@ -87,7 +87,6 @@ class Collection(common.BaseObject):
"""
super(Collection, self).__init__(
read_preference=database.read_preference,
tag_sets=database.tag_sets,
uuidrepresentation=database.uuid_subtype,
**database.write_concern)
@ -781,7 +780,6 @@ class Collection(common.BaseObject):
outgoing SON manipulators before returning.
- `read_preference` (optional): The read preference for
this query.
- `tag_sets` (optional): The tag sets for this query.
- `compile_re` (optional): if ``False``, don't attempt to compile
BSON regex objects into Python regexes. Return instances of
:class:`~bson.regex.Regex` instead.
@ -816,7 +814,7 @@ class Collection(common.BaseObject):
version **>= 1.5.1**
.. versionchanged:: 3.0
Removed the `network_timeout` and
Removed the `network_timeout`, `tag_sets`, and
`secondary_acceptable_latency_ms` parameters.
.. versionadded:: 2.7
@ -843,13 +841,9 @@ class Collection(common.BaseObject):
.. mongodoc:: find
"""
if not 'read_preference' in kwargs:
kwargs['read_preference'] = self.read_preference
if not 'tag_sets' in kwargs:
kwargs['tag_sets'] = self.tag_sets
return Cursor(self, *args, **kwargs)
def parallel_scan(self, num_cursors, **kwargs):
def parallel_scan(self, num_cursors, read_preference=None, **kwargs):
"""Scan this entire collection in parallel.
Returns a list of up to ``num_cursors`` cursors that can be iterated
@ -887,21 +881,20 @@ class Collection(common.BaseObject):
:Parameters:
- `num_cursors`: the number of cursors to return
- `read_preference`: the read preference to use for this scan
.. note:: Requires server version **>= 2.5.5**.
"""
compile_re = kwargs.get('compile_re', False)
command_kwargs = {
'numCursors': num_cursors,
'read_preference': self.read_preference,
'tag_sets': self.tag_sets,
}
command_kwargs.update(kwargs)
cmd = SON([('parallelCollectionScan', self.__name),
('numCursors', num_cursors)])
result, conn_id = self.__database._command(
"parallelCollectionScan", self.__name, **command_kwargs)
mode = read_preference or self.read_preference
result, conn_id = self.__database._command(cmd,
read_preference=mode,
**kwargs)
return [CommandCursor(self,
cursor['cursor'],
@ -1230,7 +1223,7 @@ class Collection(common.BaseObject):
return options
def aggregate(self, pipeline, **kwargs):
def aggregate(self, pipeline, read_preference=None, **kwargs):
"""Perform an aggregation using the aggregation framework on this
collection.
@ -1242,6 +1235,7 @@ class Collection(common.BaseObject):
:Parameters:
- `pipeline`: a single command or list of aggregation commands
- `read_preference`: read preference to use for this aggregate
- `**kwargs`: send arbitrary parameters to the aggregate command
.. note:: Requires server version **>= 2.1.0**.
@ -1272,28 +1266,29 @@ class Collection(common.BaseObject):
if isinstance(pipeline, dict):
pipeline = [pipeline]
command_kwargs = {
'pipeline': pipeline,
'read_preference': self.read_preference,
'tag_sets': self.tag_sets,
}
cmd = SON([("aggregate", self.__name),
("pipeline", pipeline)])
command_kwargs.update(kwargs)
compile_re = kwargs.get('compile_re', True)
mode = read_preference or self.read_preference
result, conn_id = self.__database._command(
"aggregate", self.__name, **command_kwargs)
cmd, uuid_subtype=self.uuid_subtype,
read_preference=mode, **kwargs)
if 'cursor' in result:
return CommandCursor(
self,
result['cursor'],
conn_id,
command_kwargs.get('compile_re', True))
compile_re)
else:
return result
# TODO key and condition ought to be optional, but deprecation
# could be painful as argument order would have to change.
def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
def group(self, key, condition, initial,
reduce, finalize=None, read_preference=None, **kwargs):
"""Perform a query similar to an SQL *group by* operation.
Returns an array of grouped items.
@ -1321,6 +1316,7 @@ class Collection(common.BaseObject):
- `initial`: initial value of the aggregation counter object
- `reduce`: aggregation function as a JavaScript string
- `finalize`: function to be called on each object in output list.
- `read_preference`: read preference to use for this group
.. versionchanged:: 2.2
Removed deprecated argument: command
@ -1345,10 +1341,10 @@ class Collection(common.BaseObject):
if finalize is not None:
group["finalize"] = Code(finalize)
mode = read_preference or self.read_preference
return self.__database.command("group", group,
uuid_subtype=self.uuid_subtype,
read_preference=self.read_preference,
tag_sets=self.tag_sets,
read_preference=mode,
**kwargs)["retval"]
def rename(self, new_name, **kwargs):
@ -1404,7 +1400,8 @@ class Collection(common.BaseObject):
"""
return self.find().distinct(key)
def map_reduce(self, map, reduce, out, full_response=False, **kwargs):
def map_reduce(self, map, reduce, out,
full_response=False, read_preference=None, **kwargs):
"""Perform a map/reduce operation on this collection.
If `full_response` is ``False`` (default) returns a
@ -1422,6 +1419,7 @@ class Collection(common.BaseObject):
e.g. SON([('replace', <collection name>), ('db', <database name>)])
- `full_response` (optional): if ``True``, return full response to
this command - otherwise just return the result collection
- `read_preference`: read preference to use for this map reduce
- `**kwargs` (optional): additional arguments to the
`map reduce command`_ may be passed as keyword arguments to this
helper method, e.g.::
@ -1448,11 +1446,11 @@ class Collection(common.BaseObject):
raise TypeError("'out' must be an instance of "
"%s or dict" % (basestring.__name__,))
mode = read_preference or self.read_preference
response = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.uuid_subtype,
read_preference=mode,
map=map, reduce=reduce,
read_preference=self.read_preference,
tag_sets=self.tag_sets,
out=out, **kwargs)
if full_response or not response.get('result'):
@ -1464,7 +1462,8 @@ class Collection(common.BaseObject):
else:
return self.__database[response["result"]]
def inline_map_reduce(self, map, reduce, full_response=False, **kwargs):
def inline_map_reduce(self, map, reduce,
full_response=False, read_preference=None, **kwargs):
"""Perform an inline map/reduce operation on this collection.
Perform the map/reduce operation on the server in RAM. A result
@ -1486,6 +1485,7 @@ class Collection(common.BaseObject):
- `reduce`: reduce function (as a JavaScript string)
- `full_response` (optional): if ``True``, return full response to
this command - otherwise just return the result collection
- `read_preference`: read preference to use for this map reduce
- `**kwargs` (optional): additional arguments to the
`map reduce command`_ may be passed as keyword arguments to this
helper method, e.g.::
@ -1497,10 +1497,10 @@ class Collection(common.BaseObject):
.. versionadded:: 1.10
"""
mode = read_preference or self.read_preference
res = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.uuid_subtype,
read_preference=self.read_preference,
tag_sets=self.tag_sets,
read_preference=mode,
map=map, reduce=reduce,
out={"inline": 1}, **kwargs)

View File

@ -15,11 +15,9 @@
"""Functions and classes common to multiple pymongo modules."""
import sys
import warnings
from pymongo import read_preferences
from pymongo.auth import MECHANISMS
from pymongo.read_preferences import ReadPreference
from pymongo.errors import ConfigurationError
from bson.binary import (OLD_UUID_SUBTYPE, UUID_SUBTYPE,
JAVA_LEGACY, CSHARP_LEGACY)
@ -183,40 +181,23 @@ def validate_timeout_or_none(option, value):
def validate_read_preference(dummy, value):
"""Validate read preference for a MongoReplicaSetClient.
"""Validate a read preference.
"""
if value in read_preferences.modes:
return value
if not isinstance(value, read_preferences.ServerMode):
raise ConfigurationError("%r is not a "
"valid read preference." % (value,))
return value
# Also allow string form of enum for uri_parser
def validate_read_preference_mode(dummy, name):
"""Validate read preference mode for a MongoReplicaSetClient.
"""
try:
return read_preferences.mongos_enum(value)
return read_preferences.read_pref_mode_from_name(name)
except ValueError:
raise ConfigurationError("Not a valid read preference")
def validate_tag_sets(dummy, value):
"""Validate tag sets for a MongoReplicaSetClient.
"""
if value is None:
return [{}]
if not isinstance(value, list):
raise ConfigurationError((
"Tag sets %s invalid, must be a list") % repr(value))
if len(value) == 0:
raise ConfigurationError((
"Tag sets %s invalid, must be None or contain at least one set of"
" tags") % repr(value))
for tags in value:
if not isinstance(tags, dict):
raise ConfigurationError(
"Tag set %s invalid, must be a dict" % repr(tags))
return value
def validate_auth_mechanism(option, value):
"""Validate the authMechanism URI option.
"""
@ -247,9 +228,26 @@ def validate_uuid_subtype(dummy, value):
return value
def validate_read_preference_tags(name, value):
"""Parse readPreferenceTags if passed as a client kwarg.
"""
# Parsed in uri_parser.parse_uri
if isinstance(value, list):
return value
tags = {}
try:
for tag in value.split(","):
key, val = tag.split(":")
tags[key] = val
except Exception:
raise ConfigurationError("%r not a valid value for %s" % (value, name))
return [tags]
# jounal is an alias for j,
# wtimeoutms is an alias for wtimeout,
# readpreferencetags is an alias for tag_sets.
VALIDATORS = {
'replicaset': validate_basestring,
'w': validate_int_or_basestring,
@ -267,10 +265,9 @@ VALIDATORS = {
'ssl_certfile': validate_readable,
'ssl_cert_reqs': validate_cert_reqs,
'ssl_ca_certs': validate_readable,
'readpreference': validate_read_preference,
'read_preference': validate_read_preference,
'readpreferencetags': validate_tag_sets,
'tag_sets': validate_tag_sets,
'readpreference': validate_read_preference_mode,
'readpreferencetags': validate_read_preference_tags,
'acceptablelatencyms': validate_positive_float,
'auto_start_request': validate_boolean,
'use_greenlets': validate_boolean,
@ -338,15 +335,10 @@ class BaseObject(object):
def __init__(self, **options):
self.__read_pref = ReadPreference.PRIMARY
self.__tag_sets = [{}]
self.__read_pref = read_preferences.ReadPreference.PRIMARY
self.__uuid_subtype = OLD_UUID_SUBTYPE
self.__write_concern = WriteConcern()
self.__set_options(options)
if (self.__read_pref == ReadPreference.PRIMARY
and self.__tag_sets != [{}]):
raise ConfigurationError(
"ReadPreference PRIMARY cannot be combined with tags")
def __set_write_concern_option(self, option, value):
"""Validates and sets getlasterror options for this
@ -360,10 +352,16 @@ class BaseObject(object):
def __set_options(self, options):
"""Validates and sets all options passed to this object."""
for option, value in options.iteritems():
if option in ('read_preference', "readpreference"):
if option == 'read_preference':
self.__read_pref = validate_read_preference(option, value)
elif option in ('tag_sets', 'readpreferencetags'):
self.__tag_sets = validate_tag_sets(option, value)
elif option == 'readpreference':
klass = read_preferences.read_pref_class_from_mode(value)
if value == 0:
# Primary, no tags
self.__read_pref = klass()
continue
tags = options.get('readpreferencetags', None)
self.__read_pref = klass(tags)
elif option == 'uuidrepresentation':
self.__uuid_subtype = validate_uuid_subtype(option, value)
elif option in WRITE_CONCERN_OPTIONS:
@ -453,32 +451,10 @@ class BaseObject(object):
def __set_read_pref(self, value):
"""Property setter for read_preference"""
self.__read_pref = validate_read_preference('read_preference', value)
self.__read_pref = validate_read_preference(None, value)
read_preference = property(__get_read_pref, __set_read_pref)
def __get_tag_sets(self):
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
read only from members whose ``dc`` tag has the value ``"ny"``.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." MongoReplicaSetClient tries each set of tags in turn
until it finds a set of tags with at least one matching member.
.. seealso:: `Data-Center Awareness
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
.. versionadded:: 2.3
"""
return self.__tag_sets
def __set_tag_sets(self, value):
"""Property setter for tag_sets"""
self.__tag_sets = validate_tag_sets('tag_sets', value)
tag_sets = property(__get_tag_sets, __set_tag_sets)
def __get_uuid_subtype(self):
"""This attribute specifies which BSON Binary subtype is used when
storing UUIDs. Historically UUIDs have been stored as BSON Binary

View File

@ -20,7 +20,7 @@ from bson import RE_TYPE
from bson.code import Code
from bson.son import SON
from pymongo import helpers, message, read_preferences
from pymongo.read_preferences import ReadPreference, secondary_ok_commands
from pymongo.read_preferences import ReadPreference, SECONDARY_OK_COMMANDS
from pymongo.errors import (AutoReconnect,
CursorNotFound,
InvalidOperation)
@ -68,8 +68,7 @@ class Cursor(object):
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, as_class=None,
await_data=False, partial=False, manipulate=True,
read_preference=ReadPreference.PRIMARY,
tag_sets=[{}], exhaust=False, compile_re=True,
read_preference=None, exhaust=False, compile_re=True,
_uuid_subtype=None):
"""Create a new cursor.
@ -146,8 +145,7 @@ class Cursor(object):
self.__comment = None
self.__as_class = as_class
self.__manipulate = manipulate
self.__read_preference = read_preference
self.__tag_sets = tag_sets
self.__read_preference = read_preference or collection.read_preference
self.__tz_aware = collection.database.connection.tz_aware
self.__compile_re = compile_re
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
@ -160,7 +158,7 @@ class Cursor(object):
self.__query_flags = 0
if tailable:
self.__query_flags |= _QUERY_OPTIONS["tailable_cursor"]
if read_preference != ReadPreference.PRIMARY:
if self.__read_preference != ReadPreference.PRIMARY:
self.__query_flags |= _QUERY_OPTIONS["slave_okay"]
if not timeout:
self.__query_flags |= _QUERY_OPTIONS["no_timeout"]
@ -233,7 +231,7 @@ class Cursor(object):
"comment", "max", "min",
"snapshot", "ordering", "explain", "hint",
"batch_size", "max_scan", "as_class",
"manipulate", "read_preference", "tag_sets",
"manipulate", "read_preference",
"uuid_subtype", "compile_re", "query_flags")
data = dict((k, v) for k, v in self.__dict__.iteritems()
if k.startswith('_Cursor__') and k[9:] in values_to_clone)
@ -301,22 +299,15 @@ class Cursor(object):
if (self.__collection.database.connection.is_mongos and
self.__read_preference != ReadPreference.PRIMARY):
has_tags = self.__tag_sets and self.__tag_sets != [{}]
# For maximum backwards compatibility, don't set $readPreference
# for SECONDARY_PREFERRED unless tags are in use. Just rely on
# the slaveOkay bit (set automatically if read preference is not
# PRIMARY), which has the same behavior.
if (self.__read_preference != ReadPreference.SECONDARY_PREFERRED or
has_tags):
read_pref = {
'mode': read_preferences.mongos_mode(self.__read_preference)
}
if has_tags:
read_pref['tags'] = self.__tag_sets
operators['$readPreference'] = read_pref
mode = self.__read_preference.mode
tag_sets = self.__read_preference.tag_sets
if (mode != ReadPreference.SECONDARY_PREFERRED.mode or
tag_sets != [{}]):
operators['$readPreference'] = self.__read_preference.document
if operators:
# Make a shallow copy so we can cleanly rewind or clone.
@ -328,7 +319,7 @@ class Cursor(object):
if self.collection.name == "$cmd":
# Don't change commands that can't be sent to secondaries
command_name = spec and spec.keys()[0].lower() or ""
if command_name not in secondary_ok_commands:
if command_name not in SECONDARY_OK_COMMANDS:
return spec
elif command_name == 'mapreduce':
# mapreduce shouldn't be changed if its not inline
@ -691,8 +682,6 @@ class Cursor(object):
command = {
"query": self.__spec,
"fields": self.__fields,
"read_preference": self.__read_preference,
"tag_sets": self.__tag_sets,
}
if self.__max_time_ms is not None:
command["maxTimeMS"] = self.__max_time_ms
@ -710,6 +699,7 @@ class Cursor(object):
allowable_errors=["ns missing"],
uuid_subtype=self.__uuid_subtype,
compile_re=self.__compile_re,
read_preference=self.__read_preference,
**command)
if r.get("errmsg", "") == "ns missing":
return 0
@ -744,9 +734,6 @@ class Cursor(object):
options = {"key": key}
if self.__spec:
options["query"] = self.__spec
options['read_preference'] = self.__read_preference
options['tag_sets'] = self.__tag_sets
if self.__max_time_ms is not None:
options['maxTimeMS'] = self.__max_time_ms
if self.__comment:
@ -757,6 +744,7 @@ class Cursor(object):
self.__collection.name,
uuid_subtype=self.__uuid_subtype,
compile_re=self.__compile_re,
read_preference=self.__read_preference,
**options)["values"]
def explain(self):
@ -854,7 +842,6 @@ class Cursor(object):
if message:
kwargs = {
"read_preference": self.__read_preference,
"tag_sets": self.__tag_sets,
"exhaust": self.__exhaust,
}
if self.__connection_id is not None:

View File

@ -27,7 +27,7 @@ from pymongo.errors import (CollectionInvalid,
InvalidName,
OperationFailure)
from pymongo.read_preferences import (ReadPreference,
modes, secondary_ok_commands)
SECONDARY_OK_COMMANDS)
def _check_name(name):
@ -62,7 +62,6 @@ class Database(common.BaseObject):
"""
super(Database,
self).__init__(read_preference=connection.read_preference,
tag_sets=connection.tag_sets,
uuidrepresentation=connection.uuid_subtype,
**connection.write_concern)
@ -270,7 +269,8 @@ class Database(common.BaseObject):
def _command(self, command, value=1,
check=True, allowable_errors=None,
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs):
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True,
read_preference=None, **kwargs):
"""Internal command helper.
"""
@ -280,22 +280,25 @@ class Database(common.BaseObject):
else:
command_name = command.keys()[0].lower()
orig = mode = kwargs.pop('read_preference', self.read_preference)
tags = kwargs.pop('tag_sets', self.tag_sets)
as_class = kwargs.pop('as_class', None)
fields = kwargs.pop('fields', None)
if fields is not None and not isinstance(fields, dict):
fields = helpers._fields_list_to_dict(fields)
command.update(kwargs)
if command_name not in secondary_ok_commands:
orig = mode = read_preference or self.read_preference
if command_name not in SECONDARY_OK_COMMANDS:
mode = ReadPreference.PRIMARY
# Special-case: mapreduce can go to secondaries only if inline
elif command_name == 'mapreduce':
out = command.get('out') or kwargs.get('out')
out = command.get('out')
if not isinstance(out, dict) or not out.get('inline'):
mode = ReadPreference.PRIMARY
# Special-case: aggregate with $out cannot go to secondaries.
elif command_name == 'aggregate':
for stage in kwargs.get('pipeline', []):
for stage in command.get('pipeline', []):
if '$out' in stage:
mode = ReadPreference.PRIMARY
break
@ -304,21 +307,14 @@ class Database(common.BaseObject):
if mode != orig:
warnings.warn("%s does not support %s read preference "
"and will be routed to the primary instead." %
(command_name, modes[orig]), UserWarning)
tags = [{}]
(command_name, orig.name), UserWarning)
fields = kwargs.pop('fields', None)
if fields is not None and not isinstance(fields, dict):
fields = helpers._fields_list_to_dict(fields)
command.update(kwargs)
cursor = self["$cmd"].find(command,
fields=fields,
limit=-1,
as_class=as_class,
read_preference=mode,
tag_sets=tags,
compile_re=compile_re,
_uuid_subtype=uuid_subtype)
for doc in cursor:
@ -333,7 +329,8 @@ class Database(common.BaseObject):
def command(self, command, value=1,
check=True, allowable_errors=[],
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs):
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True,
read_preference=None, **kwargs):
"""Issue a MongoDB command.
Send command `command` to the database and return the
@ -384,21 +381,13 @@ class Database(common.BaseObject):
:exc:`~bson.errors.InvalidBSON` errors when receiving
Python-incompatible regular expressions, for example from
``currentOp``
- `read_preference`: The read preference for this connection.
See :class:`~pymongo.read_preferences.ReadPreference` for available
options.
- `tag_sets`: Read from replica-set members with these tags.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." MongoReplicaSetClient tries each set of tags in
turn until it finds a set of tags with at least one matching
member.
- `read_preference`: The read preference for this operation.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
.. versionchanged:: 3.0
Removed the `secondary_acceptable_latency_ms` option.
Removed the `tag_sets` and `secondary_acceptable_latency_ms`
options.
.. versionchanged:: 2.7
Added ``compile_re`` option.
.. versionchanged:: 2.3
@ -416,7 +405,8 @@ class Database(common.BaseObject):
.. mongodoc:: commands
"""
return self._command(command, value, check, allowable_errors,
uuid_subtype, compile_re, **kwargs)[0]
uuid_subtype, compile_re,
read_preference, **kwargs)[0]
def collection_names(self, include_system_collections=True):
"""Get a list of all the collection names in this database.

View File

@ -114,10 +114,10 @@ class Member(object):
assert not self.is_mongos, \
"Tried to match read preference mode on a mongos Member"
if mode == ReadPreference.PRIMARY and not self.is_primary:
if mode == ReadPreference.PRIMARY.mode and not self.is_primary:
return False
if mode == ReadPreference.SECONDARY and not self.is_secondary:
if mode == ReadPreference.SECONDARY.mode and not self.is_secondary:
return False
# If we're not primary or secondary, then we're in a state like

View File

@ -191,12 +191,7 @@ class MongoClient(common.BaseObject):
:class:`~pymongo.errors.AutoReconnect` "not master".
See :class:`~pymongo.read_preferences.ReadPreference` for all
available read preference options.
- `tag_sets`: Ignored unless connecting to a replica set via mongos.
Specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags.
- `acceptable_latency_ms`: (integer) When used with mongos
- `acceptableLatencyMS`: (integer) When used with mongos
high availability, any mongos whose ping time is within
acceptable_latency_ms of the nearest member may be chosen
as the new primary during a failover. Default 15 milliseconds.

View File

@ -37,7 +37,6 @@ import socket
import struct
import threading
import time
import warnings
import weakref
from bson.py3compat import b
@ -51,7 +50,7 @@ from pymongo import (auth,
uri_parser)
from pymongo.member import Member
from pymongo.read_preferences import (
ReadPreference, select_member, modes, MovingAverage)
ReadPreference, select_member, MovingAverage)
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
@ -262,21 +261,21 @@ class RSState(object):
"""Return a Member instance or None for the given (host, port)."""
return self._host_to_member.get(host)
def pin_host(self, host, mode, tag_sets):
def pin_host(self, host, pref):
"""Pin this thread / greenlet to a member.
`host` is a (host, port) pair. The remaining parameters are a read
preference.
`host` is a (host, port) pair. The `pref` parameter is a
read_preferences.ServerMode subclass (the read preference).
"""
# Fun fact: Unlike in thread_util.ThreadIdent, we needn't lock around
# assignment here. Assignment to a threadlocal is only unsafe if it
# can cause other Python code to run implicitly.
self._threadlocal.host = host
self._threadlocal.read_preference = (mode, tag_sets)
self._threadlocal.read_preference = pref
def keep_pinned_host(self, mode, tag_sets):
def keep_pinned_host(self, pref):
"""Does a read pref match the last used by this thread / greenlet?"""
return self._threadlocal.read_preference == (mode, tag_sets)
return self._threadlocal.read_preference == pref
@property
def pinned_host(self):
@ -542,14 +541,7 @@ class MongoReplicaSetClient(common.BaseObject):
- `read_preference`: The read preference for this client.
See :class:`~pymongo.read_preferences.ReadPreference` for available
options.
- `tag_sets`: Read from replica-set members with these tags.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." :class:`MongoReplicaSetClient` tries each set of
tags in turn until it finds a set of tags with at least one matching
member.
- `acceptable_latency_ms`: (integer) Any replica-set member
- `acceptableLatencyMS`: (integer) Any replica-set member
whose ping time is within acceptable_latency_ms of the
nearest member may accept reads. Default 15 milliseconds.
@ -794,7 +786,7 @@ class MongoReplicaSetClient(common.BaseObject):
if connect:
# Try to authenticate even during failover.
member = select_member(
self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED)
self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED.mode)
if not member:
raise AutoReconnect(
@ -1589,8 +1581,7 @@ class MongoReplicaSetClient(common.BaseObject):
self._ensure_connected()
rs_state = self.__get_rs_state()
tag_sets = kwargs.get('tag_sets', [{}])
mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
pref = kwargs.get('read_preference', ReadPreference.PRIMARY)
if not rs_state.primary_member:
# If we were initialized with _connect=False then connect now.
@ -1598,7 +1589,7 @@ class MongoReplicaSetClient(common.BaseObject):
# if one is not already in progress. If caller requested the
# primary, wait to see if it's up, otherwise continue with
# known-good members.
sync = (rs_state.initial or mode == ReadPreference.PRIMARY)
sync = (rs_state.initial or pref == ReadPreference.PRIMARY)
self.__schedule_refresh(sync=sync)
rs_state = self.__rs_state
@ -1632,15 +1623,14 @@ class MongoReplicaSetClient(common.BaseObject):
pinned_host = rs_state.pinned_host
pinned_member = rs_state.get(pinned_host)
if (pinned_member
and pinned_member.matches_mode(mode)
and pinned_member.matches_tag_sets(tag_sets) # TODO: REMOVE?
and rs_state.keep_pinned_host(mode, tag_sets)):
and pinned_member.matches_mode(pref.mode)
and rs_state.keep_pinned_host(pref)):
try:
return (
pinned_member.host,
self.__try_read(pinned_member, msg, **kwargs))
except AutoReconnect, why:
if mode == ReadPreference.PRIMARY:
if pref == ReadPreference.PRIMARY:
self.disconnect()
raise
else:
@ -1653,8 +1643,8 @@ class MongoReplicaSetClient(common.BaseObject):
while len(errors) < MAX_RETRY:
member = select_member(
members=members,
mode=mode,
tag_sets=tag_sets,
mode=pref.mode,
tag_sets=pref.tag_sets,
latency=self.__acceptable_latency)
if not member:
@ -1669,27 +1659,24 @@ class MongoReplicaSetClient(common.BaseObject):
if self.in_request():
# Keep reading from this member in this thread / greenlet
# unless read preference changes
rs_state.pin_host(member.host, mode, tag_sets)
rs_state.pin_host(member.host, pref)
return member.host, response
except AutoReconnect, why:
if mode == ReadPreference.PRIMARY:
if pref == ReadPreference.PRIMARY:
raise
errors.append(str(why))
members.remove(member)
# Ran out of tries
if mode == ReadPreference.PRIMARY:
if pref == ReadPreference.PRIMARY:
msg = "No replica set primary available for query"
elif mode == ReadPreference.SECONDARY:
elif pref.mode == ReadPreference.SECONDARY.mode:
msg = "No replica set secondary available for query"
else:
msg = "No replica set members available for query"
msg += " with ReadPreference %s" % modes[mode]
if tag_sets != [{}]:
msg += " and tags " + repr(tag_sets)
msg += " with read preference %r" % (pref,)
# Format a message like:
# 'No replica set secondary available for query with ReadPreference

View File

@ -16,75 +16,260 @@
import random
from collections import namedtuple
from pymongo.errors import ConfigurationError
class ReadPreference:
"""An enum that defines the read preference modes supported by PyMongo.
Used in three cases:
_PRIMARY = 0
_PRIMARY_PREFERRED = 1
_SECONDARY = 2
_SECONDARY_PREFERRED = 3
_NEAREST = 4
:class:`~pymongo.mongo_client.MongoClient` connected to a single host:
* `PRIMARY`: Queries are allowed if the host is standalone or the replica
set primary.
* All other modes allow queries to standalone servers, to the primary, or
to secondaries.
:class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
sharded cluster of replica sets:
* `PRIMARY`: Queries are sent to the primary of a shard.
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
otherwise a secondary.
* `SECONDARY`: Queries are distributed among shard secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
or the primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members of a shard.
:class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`:
* `PRIMARY`: Queries are sent to the primary of the replica set.
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
otherwise a secondary.
* `SECONDARY`: Queries are distributed among secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
or the primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members.
"""
PRIMARY = 0
PRIMARY_PREFERRED = 1
SECONDARY = 2
SECONDARY_ONLY = 2
SECONDARY_PREFERRED = 3
NEAREST = 4
# For formatting error messages
modes = {
ReadPreference.PRIMARY: 'PRIMARY',
ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
ReadPreference.SECONDARY: 'SECONDARY',
ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
ReadPreference.NEAREST: 'NEAREST',
}
_mongos_modes = [
_MONGOS_MODES = (
'primary',
'primaryPreferred',
'secondary',
'secondaryPreferred',
'nearest',
]
)
def mongos_mode(mode):
return _mongos_modes[mode]
def mongos_enum(enum):
return _mongos_modes.index(enum)
def _validate_tag_sets(tag_sets):
"""Validate tag sets for a MongoReplicaSetClient.
"""
if tag_sets is None:
return [{}]
def select_primary(members):
if not isinstance(tag_sets, list):
raise ConfigurationError((
"Tag sets %r invalid, must be a list") % (tag_sets,))
if len(tag_sets) == 0:
raise ConfigurationError((
"Tag sets %r invalid, must be None or contain at least one set of"
" tags") % (tag_sets,))
for tags in tag_sets:
if not isinstance(tags, dict):
raise ConfigurationError(
"Tag set %r invalid, must be a dict" % (tags,))
return tag_sets
class ServerMode(object):
"""Base class for all read preferences.
"""
__slots__ = ("__mode", "__mongos_mode", "__tag_sets")
def __init__(self, mode, tag_sets=None):
if mode == _PRIMARY and tag_sets is not None:
raise ConfigurationError("PRIMARY cannot be combined with tags")
self.__mode = mode
self.__mongos_mode = _MONGOS_MODES[mode]
self.__tag_sets = _validate_tag_sets(tag_sets)
@property
def name(self):
"""The name of this read preference.
"""
return self.__class__.__name__
@property
def document(self):
"""Read preference as a document.
"""
return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}
@property
def mode(self):
"""The mode of this read preference instance.
"""
return self.__mode
@property
def tag_sets(self):
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
read only from members whose ``dc`` tag has the value ``"ny"``.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." MongoReplicaSetClient tries each set of tags in turn
until it finds a set of tags with at least one matching member.
.. seealso:: `Data-Center Awareness
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
"""
return self.__tag_sets
def __repr__(self):
return "%s(%r)" % (self.name, self.__tag_sets)
def __eq__(self, other):
return self.mode == other.mode and self.tag_sets == other.tag_sets
class Primary(ServerMode):
"""Primary read preference.
* When directly connected to one mongod queries are allowed if the server
is standalone or a replica set primary.
* When connected to a mongos queries are sent to the primary of a shard.
* When connected to a replica set queries are sent to the primary of
the replica set.
"""
def __init__(self):
super(Primary, self).__init__(_PRIMARY)
def __repr__(self):
return "Primary()"
def __eq__(self, other):
return other.mode == _PRIMARY
class PrimaryPreferred(ServerMode):
"""PrimaryPreferred read preference.
* When directly connected to one mongod queries are allowed to standalone
servers, to a replica set primary, or to replica set secondaries.
* When connected to a mongos queries are sent to the primary of a shard if
available, otherwise a shard secondary.
* When connected to a replica set queries are sent to the primary if
available, otherwise a secondary.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
available.
"""
def __init__(self, tag_sets=None):
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
class Secondary(ServerMode):
"""Secondary read preference.
* When directly connected to one mongod queries are allowed to standalone
servers, to a replica set primary, or to replica set secondaries.
* When connected to a mongos queries are distributed among shard
secondaries. An error is raised if no secondaries are available.
* When connected to a replica set queries are distributed among
secondaries. An error is raised if no secondaries are available.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, tag_sets=None):
super(Secondary, self).__init__(_SECONDARY, tag_sets)
class SecondaryPreferred(ServerMode):
"""SecondaryPreferred read preference.
* When directly connected to one mongod queries are allowed to standalone
servers, to a replica set primary, or to replica set secondaries.
* When connected to a mongos queries are distributed among shard
secondaries, or the shard primary if no secondary is available.
* When connected to a replica set queries are distributed among
secondaries, or the primary if no secondary is available.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, tag_sets=None):
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
class Nearest(ServerMode):
"""Nearest read preference.
* When directly connected to one mongod queries are allowed to standalone
servers, to a replica set primary, or to replica set secondaries.
* When connected to a mongos queries are distributed among all members of
a shard.
* When connected to a replica set queries are distributed among all
members.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, tag_sets=None):
super(Nearest, self).__init__(_NEAREST, tag_sets)
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
Secondary, SecondaryPreferred, Nearest)
def read_pref_class_from_mode(mode):
"""Get the read preference class for a specific mode.
"""
return _ALL_READ_PREFERENCES[mode]
_MODES = (
'PRIMARY',
'PRIMARY_PREFERRED',
'SECONDARY',
'SECONDARY_PREFERRED',
'NEAREST',
)
ReadPreference = namedtuple("ReadPreference", _MODES)(
Primary(), PrimaryPreferred(), Secondary(), SecondaryPreferred(), Nearest())
"""An enum that defines the read preference modes supported by PyMongo.
Used in three cases:
:class:`~pymongo.mongo_client.MongoClient` connected to a single mongod:
* `PRIMARY`: Queries are allowed if the server is standalone or a replica
set primary.
* All other modes allow queries to standalone servers, to a replica set
primary, or to replica set secondaries.
:class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
sharded cluster of replica sets:
* `PRIMARY`: Queries are sent to the primary of a shard.
* `PRIMARY_PREFERRED`: Queries are sent to the shard primary if available,
otherwise a shard secondary.
* `SECONDARY`: Queries are distributed among shard secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
or the shard primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members of a shard.
:class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`:
* `PRIMARY`: Queries are sent to the primary of the replica set.
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
otherwise a secondary.
* `SECONDARY`: Queries are distributed among secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
or the primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members.
"""
def read_pref_mode_from_name(name):
"""Get the read preference mode from mongos/uri name.
"""
return _MONGOS_MODES.index(name)
def _select_primary(members):
"""Get the primary member.
"""
for member in members:
if member.is_primary:
return member
@ -92,7 +277,9 @@ def select_primary(members):
return None
def select_member_with_tags(members, tags, secondary_only, latency):
def _select_member_with_tags(members, tags, secondary_only, latency):
"""Get the member matching the given tags, and acceptable latency.
"""
candidates = []
for candidate in members:
@ -118,57 +305,40 @@ def select_member_with_tags(members, tags, secondary_only, latency):
return random.choice(near_candidates)
def select_member(
members,
mode=ReadPreference.PRIMARY,
tag_sets=None,
latency=15
):
def select_member(members, mode, tag_sets=[{}], latency=15):
"""Return a Member or None.
"""
if tag_sets is None:
tag_sets = [{}]
if mode == _PRIMARY:
return _select_primary(members)
# For brevity
PRIMARY = ReadPreference.PRIMARY
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
SECONDARY = ReadPreference.SECONDARY
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
NEAREST = ReadPreference.NEAREST
if mode == PRIMARY:
if tag_sets != [{}]:
raise ConfigurationError("PRIMARY cannot be combined with tags")
return select_primary(members)
elif mode == PRIMARY_PREFERRED:
elif mode == _PRIMARY_PREFERRED:
# Recurse.
candidate_primary = select_member(members, PRIMARY, [{}], latency)
candidate_primary = select_member(members, _PRIMARY, [{}], latency)
if candidate_primary:
return candidate_primary
else:
return select_member(members, SECONDARY, tag_sets, latency)
return select_member(members, _SECONDARY, tag_sets, latency)
elif mode == SECONDARY:
elif mode == _SECONDARY:
for tags in tag_sets:
candidate = select_member_with_tags(members, tags, True, latency)
candidate = _select_member_with_tags(members, tags, True, latency)
if candidate:
return candidate
return None
elif mode == SECONDARY_PREFERRED:
elif mode == _SECONDARY_PREFERRED:
# Recurse.
candidate_secondary = select_member(
members, SECONDARY, tag_sets, latency)
members, _SECONDARY, tag_sets, latency)
if candidate_secondary:
return candidate_secondary
else:
return select_member(members, PRIMARY, [{}], latency)
return select_member(members, _PRIMARY, [{}], latency)
elif mode == NEAREST:
elif mode == _NEAREST:
for tags in tag_sets:
candidate = select_member_with_tags(members, tags, False, latency)
candidate = _select_member_with_tags(members, tags, False, latency)
if candidate:
return candidate
@ -176,23 +346,23 @@ def select_member(
return None
else:
raise ConfigurationError("Invalid mode %s" % repr(mode))
raise ConfigurationError("Invalid mode %d" % (mode,))
"""Commands that may be sent to replica-set secondaries, depending on
ReadPreference and tags. All other commands are always run on the primary.
"""
secondary_ok_commands = frozenset([
SECONDARY_OK_COMMANDS = frozenset([
"group", "aggregate", "collstats", "dbstats", "count", "distinct",
"geonear", "geosearch", "geowalk", "mapreduce", "getnonce", "authenticate",
"text", "parallelcollectionscan"
])
"""Commands that may be sent to replica-set secondaries, depending on
ReadPreference and tags. All other commands are always run on the primary.
"""
class MovingAverage(object):
"""Immutable structure to track a 5-sample moving average.
"""
def __init__(self, samples):
"""Immutable structure to track a 5-sample moving average.
"""
self.samples = samples[-5:]
assert self.samples
self.average = sum(self.samples) / float(len(self.samples))
@ -202,4 +372,6 @@ class MovingAverage(object):
return MovingAverage(self.samples + [sample])
def get(self):
"""Get the calculated average.
"""
return self.average

View File

@ -34,7 +34,7 @@ from pymongo.member import Member
from pymongo.mongo_replica_set_client import Monitor
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.mongo_client import MongoClient, _partition_node
from pymongo.read_preferences import ReadPreference, modes
from pymongo.read_preferences import ReadPreference
from test import utils, version
from test.utils import one
@ -800,9 +800,8 @@ class TestReadPreference(HATestCase):
# Reading with a different mode unpinned, hooray!
break
else:
self.fail(
"Changing from mode %s to mode %s never unpinned" % (
modes[mode0], modes[mode1]))
self.fail("Changing from mode %r to mode "
"%r never unpinned" % (mode0, mode1))
# Now verify changing the tag_sets unpins the member.
tags0 = [{'a': 'a'}, {}]

View File

@ -25,8 +25,11 @@ sys.path[0:0] = [""]
from bson.son import SON
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.read_preferences import (ReadPreference, modes, MovingAverage,
secondary_ok_commands)
from pymongo.read_preferences import (ReadPreference, MovingAverage,
PrimaryPreferred,
Secondary, SecondaryPreferred,
Nearest, ServerMode,
SECONDARY_OK_COMMANDS)
from pymongo.errors import ConfigurationError
from test.test_replica_set_client import TestReplicaSetClientBase
@ -78,8 +81,7 @@ class TestReadPreferencesBase(TestReplicaSetClientBase):
class TestReadPreferences(TestReadPreferencesBase):
def test_mode_validation(self):
# 'modes' are imported from read_preferences.py
for mode in modes:
for mode in ReadPreference:
self.assertEqual(mode, self._get_client(
read_preference=mode).read_preference)
@ -88,33 +90,33 @@ class TestReadPreferences(TestReadPreferencesBase):
def test_tag_sets_validation(self):
# Can't use tags with PRIMARY
self.assertRaises(ConfigurationError, self._get_client,
tag_sets=[{'k': 'v'}])
self.assertRaises(ConfigurationError, ServerMode,
0, tag_sets=[{'k': 'v'}])
# ... but empty tag sets are ok with PRIMARY
self.assertEqual([{}], self._get_client(tag_sets=[{}]).tag_sets)
self.assertRaises(ConfigurationError, ServerMode,
0, tag_sets=[{}])
S = ReadPreference.SECONDARY
self.assertEqual([{}], self._get_client(read_preference=S).tag_sets)
S = Secondary([{}])
self.assertEqual([{}],
self._get_client(read_preference=S).read_preference.tag_sets)
self.assertEqual([{'k': 'v'}], self._get_client(
read_preference=S, tag_sets=[{'k': 'v'}]).tag_sets)
S = Secondary([{'k': 'v'}])
self.assertEqual([{'k': 'v'}],
self._get_client(read_preference=S).read_preference.tag_sets)
self.assertEqual([{'k': 'v'}, {}], self._get_client(
read_preference=S, tag_sets=[{'k': 'v'}, {}]).tag_sets)
S = Secondary([{'k': 'v'}, {}])
self.assertEqual([{'k': 'v'}, {}],
self._get_client(read_preference=S).read_preference.tag_sets)
self.assertRaises(ConfigurationError, self._get_client,
read_preference=S, tag_sets=[])
self.assertRaises(ConfigurationError, Secondary, tag_sets=[])
# One dict not ok, must be a list of dicts
self.assertRaises(ConfigurationError, self._get_client,
read_preference=S, tag_sets={'k': 'v'})
self.assertRaises(ConfigurationError, Secondary, tag_sets={'k': 'v'})
self.assertRaises(ConfigurationError, self._get_client,
read_preference=S, tag_sets='foo')
self.assertRaises(ConfigurationError, Secondary, tag_sets='foo')
self.assertRaises(ConfigurationError, self._get_client,
read_preference=S, tag_sets=['foo'])
self.assertRaises(ConfigurationError, Secondary, tag_sets=['foo'])
def test_latency_validation(self):
self.assertEqual(17, self._get_client(
@ -150,12 +152,6 @@ class TestReadPreferences(TestReadPreferencesBase):
self.assertReadsFrom('secondary',
read_preference=ReadPreference.SECONDARY_PREFERRED)
def test_secondary_only(self):
# Test deprecated mode SECONDARY_ONLY, which is now a synonym for
# SECONDARY
self.assertEqual(
ReadPreference.SECONDARY, ReadPreference.SECONDARY_ONLY)
def test_nearest(self):
# With high acceptableLatencyMS, expect to read from any
# member
@ -237,7 +233,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
def _test_fn(self, obedient, fn):
if not obedient:
for mode in modes:
for mode in ReadPreference:
self.c.read_preference = mode
# Run it a few times to make sure we don't just get lucky the
@ -511,24 +507,20 @@ class TestMongosConnection(unittest.TestCase):
'$readPreference' in cursor._Cursor__query_spec())
# Copy these constants for brevity
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
SECONDARY = ReadPreference.SECONDARY
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
NEAREST = ReadPreference.NEAREST
SLAVE_OKAY = _QUERY_OPTIONS['slave_okay']
# Test non-PRIMARY modes which can be combined with tags
for kwarg, value, mongos_mode in (
('read_preference', PRIMARY_PREFERRED, 'primaryPreferred'),
('read_preference', SECONDARY, 'secondary'),
('read_preference', SECONDARY_PREFERRED, 'secondaryPreferred'),
('read_preference', NEAREST, 'nearest'),
for mode, mongos_mode in (
(PrimaryPreferred, 'primaryPreferred'),
(Secondary, 'secondary'),
(SecondaryPreferred, 'secondaryPreferred'),
(Nearest, 'nearest'),
):
for tag_sets in (
None, [{}]
):
# Create a client e.g. with read_preference=NEAREST
c = get_client(tag_sets=tag_sets, **{kwarg: value})
c = get_client(read_preference=mode(tag_sets))
self.assertEqual(is_mongos, c.is_mongos)
cursor = c.pymongo_test.test.find()
@ -567,7 +559,7 @@ class TestMongosConnection(unittest.TestCase):
[{'dc': 'la'}, {'dc': 'sf'}],
[{'dc': 'la'}, {'dc': 'sf'}, {}],
):
c = get_client(tag_sets=tag_sets, **{kwarg: value})
c = get_client(read_preference=mode(tag_sets))
self.assertEqual(is_mongos, c.is_mongos)
cursor = c.pymongo_test.test.find()
@ -586,7 +578,7 @@ class TestMongosConnection(unittest.TestCase):
raise SkipTest("Only mongos have read_prefs added to the spec")
# Ensure secondary_ok_commands have readPreference
for cmd in secondary_ok_commands:
for cmd in SECONDARY_OK_COMMANDS:
if cmd == 'mapreduce': # map reduce is a special case
continue
command = SON([(cmd, 1)])

View File

@ -16,7 +16,6 @@
# TODO: anywhere we wait for refresh in tests, consider just refreshing w/ sync
import copy
import datetime
import signal
import socket
@ -34,8 +33,7 @@ from nose.plugins.skip import SkipTest
from bson.son import SON
from bson.tz_util import utc
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import ReadPreference
from pymongo.member import PRIMARY, SECONDARY, OTHER
from pymongo.read_preferences import ReadPreference, Secondary, Nearest
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.mongo_replica_set_client import _partition_node, have_gevent
from pymongo.database import Database
@ -240,20 +238,18 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
# Make sure MRSC's properties are copied to Database and Collection
for obj in c, c.pymongo_test, c.pymongo_test.test:
self.assertEqual(obj.read_preference, ReadPreference.PRIMARY)
self.assertEqual(obj.tag_sets, [{}])
self.assertEqual(obj.write_concern, {})
cursor = c.pymongo_test.test.find()
self.assertEqual(
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
self.assertEqual([{}], cursor._Cursor__tag_sets)
c.close()
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
secondary = Secondary(tag_sets)
c = MongoReplicaSetClient(pair, replicaSet=self.name, max_pool_size=25,
document_class=SON, tz_aware=True,
read_preference=ReadPreference.SECONDARY,
tag_sets=copy.deepcopy(tag_sets),
read_preference=secondary,
acceptablelatencyms=77)
c.admin.command('ping')
self.assertEqual(c.primary, self.primary)
@ -264,21 +260,17 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
self.assertEqual(c.tz_aware, True)
for obj in c, c.pymongo_test, c.pymongo_test.test:
self.assertEqual(obj.read_preference, ReadPreference.SECONDARY)
self.assertEqual(obj.tag_sets, tag_sets)
self.assertEqual(obj.read_preference, secondary)
cursor = c.pymongo_test.test.find()
self.assertEqual(
ReadPreference.SECONDARY, cursor._Cursor__read_preference)
self.assertEqual(tag_sets, cursor._Cursor__tag_sets)
secondary, cursor._Cursor__read_preference)
cursor = c.pymongo_test.test.find(
read_preference=ReadPreference.NEAREST,
tag_sets=[{'dc':'ny'}, {}])
nearest = Nearest([{'dc': 'ny'}, {}])
cursor = c.pymongo_test.test.find(read_preference=nearest)
self.assertEqual(
ReadPreference.NEAREST, cursor._Cursor__read_preference)
self.assertEqual([{'dc':'ny'}, {}], cursor._Cursor__tag_sets)
nearest, cursor._Cursor__read_preference)
if version.at_least(c, (1, 7, 4)):
self.assertEqual(c.max_bson_size, 16777216)
@ -704,12 +696,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
ConnectionFailure,
collection.find_one,
{'$where': delay(5)},
read_preference=SECONDARY)
read_preference=ReadPreference.SECONDARY)
rs_state = c._MongoReplicaSetClient__rs_state
secondary_host = one(rs_state.secondaries)
self.assertTrue(rs_state.get(secondary_host))
collection.find_one(read_preference=SECONDARY) # No error.
collection.find_one(read_preference=ReadPreference.SECONDARY) # No error.
def test_waitQueueTimeoutMS(self):
client = self._get_client(waitQueueTimeoutMS=2000)

View File

@ -267,7 +267,7 @@ class TestURI(unittest.TestCase):
"test.yield_historical.in"))
res = copy.deepcopy(orig)
res['options'] = {'readpreference': ReadPreference.SECONDARY}
res['options'] = {'readpreference': ReadPreference.SECONDARY.mode}
self.assertEqual(res,
parse_uri("mongodb://localhost/?readPreference=secondary"))
@ -324,7 +324,7 @@ class TestURI(unittest.TestCase):
"@localhost/foo?authMechanism=GSSAPI"))
res = copy.deepcopy(orig)
res['options'] = {'readpreference': ReadPreference.SECONDARY,
res['options'] = {'readpreference': ReadPreference.SECONDARY.mode,
'readpreferencetags': [
{'dc': 'west', 'use': 'website'},
{'dc': 'east', 'use': 'website'}]}
@ -338,7 +338,7 @@ class TestURI(unittest.TestCase):
"readpreferencetags=dc:east,use:website"))
res = copy.deepcopy(orig)
res['options'] = {'readpreference': ReadPreference.SECONDARY,
res['options'] = {'readpreference': ReadPreference.SECONDARY.mode,
'readpreferencetags': [
{'dc': 'west', 'use': 'website'},
{'dc': 'east', 'use': 'website'},