diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index 03e770556..81c17bfc2 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -24,7 +24,6 @@ .. autoattribute:: name .. autoattribute:: database .. autoattribute:: read_preference - .. autoattribute:: tag_sets .. autoattribute:: write_concern .. autoattribute:: uuid_subtype .. automethod:: insert(doc_or_docs[, manipulate=True[, check_keys=True[, continue_on_error=False[, **kwargs]]]]) @@ -34,7 +33,7 @@ .. automethod:: initialize_unordered_bulk_op .. automethod:: initialize_ordered_bulk_op .. automethod:: drop - .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]]) + .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, await_data=False[, partial=False[, manipulate=True[, read_preference=None[, exhaust=False[, compile_re=True]]]]]]]]]]]]]]]]) .. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]]) .. automethod:: parallel_scan .. automethod:: count diff --git a/doc/api/pymongo/cursor.rst b/doc/api/pymongo/cursor.rst index f86c5296c..ea3aca50f 100644 --- a/doc/api/pymongo/cursor.rst +++ b/doc/api/pymongo/cursor.rst @@ -4,7 +4,7 @@ .. automodule:: pymongo.cursor :synopsis: Tools for iterating over MongoDB query results - .. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], exhaust=False) + .. 
autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, exhaust=False, compile_re=True) :members: .. describe:: c[index] diff --git a/doc/api/pymongo/database.rst b/doc/api/pymongo/database.rst index 99336ac38..9cb0b94b3 100644 --- a/doc/api/pymongo/database.rst +++ b/doc/api/pymongo/database.rst @@ -24,7 +24,6 @@ attribute of the :class:`Database` class eg: db[`collection_name`]. .. autoattribute:: read_preference - .. autoattribute:: tag_sets .. autoattribute:: write_concern .. autoattribute:: uuid_subtype diff --git a/doc/api/pymongo/index.rst b/doc/api/pymongo/index.rst index 5106b4ea1..ced76518a 100644 --- a/doc/api/pymongo/index.rst +++ b/doc/api/pymongo/index.rst @@ -13,7 +13,10 @@ Alias for :class:`pymongo.mongo_replica_set_client.MongoReplicaSetClient`. - .. autoclass:: pymongo.read_preferences.ReadPreference + .. data:: ReadPreference + + Alias for :class:`pymongo.read_preferences.ReadPreference`. + .. autofunction:: has_c .. data:: MIN_SUPPORTED_WIRE_VERSION @@ -38,6 +41,7 @@ Sub-modules: mongo_client mongo_replica_set_client pool + read_preferences son_manipulator cursor_manager uri_parser diff --git a/doc/api/pymongo/mongo_client.rst b/doc/api/pymongo/mongo_client.rst index c06572f02..a420ae340 100644 --- a/doc/api/pymongo/mongo_client.rst +++ b/doc/api/pymongo/mongo_client.rst @@ -30,7 +30,6 @@ .. autoattribute:: min_wire_version .. autoattribute:: max_wire_version .. autoattribute:: read_preference - .. autoattribute:: tag_sets .. autoattribute:: acceptable_latency_ms .. autoattribute:: write_concern .. 
autoattribute:: uuid_subtype diff --git a/doc/api/pymongo/mongo_replica_set_client.rst b/doc/api/pymongo/mongo_replica_set_client.rst index c917db243..be8e9dbc5 100644 --- a/doc/api/pymongo/mongo_replica_set_client.rst +++ b/doc/api/pymongo/mongo_replica_set_client.rst @@ -31,7 +31,6 @@ .. autoattribute:: max_wire_version .. autoattribute:: auto_start_request .. autoattribute:: read_preference - .. autoattribute:: tag_sets .. autoattribute:: acceptable_latency_ms .. autoattribute:: write_concern .. autoattribute:: uuid_subtype diff --git a/doc/api/pymongo/read_preferences.rst b/doc/api/pymongo/read_preferences.rst new file mode 100644 index 000000000..0f9fd159f --- /dev/null +++ b/doc/api/pymongo/read_preferences.rst @@ -0,0 +1,20 @@ +:mod:`read_preferences` -- Utilities for choosing which member of a replica set to read from. +============================================================================================= + +.. automodule:: pymongo.read_preferences + :synopsis: Utilities for choosing which member of a replica set to read from. + + .. autoclass:: pymongo.read_preferences.Primary + :inherited-members: + .. autoclass:: pymongo.read_preferences.PrimaryPreferred + :inherited-members: + .. autoclass:: pymongo.read_preferences.Secondary + :inherited-members: + .. autoclass:: pymongo.read_preferences.SecondaryPreferred + :inherited-members: + .. autoclass:: pymongo.read_preferences.Nearest + :inherited-members: + + .. autodata:: ReadPreference + .. autodata:: SECONDARY_OK_COMMANDS + diff --git a/doc/examples/high_availability.rst b/doc/examples/high_availability.rst index f7240d3b7..0218d20ee 100644 --- a/doc/examples/high_availability.rst +++ b/doc/examples/high_availability.rst @@ -228,19 +228,20 @@ and **acceptableLatencyMS**. Replica-set members can be `tagged `_ according to any criteria you choose. By default, MongoReplicaSetClient ignores tags when -choosing a member to read from, but it can be configured with the ``tag_sets`` -parameter. 
``tag_sets`` must be a list of dictionaries, each dict providing tag -values that the replica set member must match. MongoReplicaSetClient tries each -set of tags in turn until it finds a set of tags with at least one matching -member. For example, to prefer reads from the New York data center, but fall -back to the San Francisco data center, tag your replica set members according -to their location and create a MongoReplicaSetClient like so: +choosing a member to read from, but your read preference can be configured with +a ``tag_sets`` parameter. ``tag_sets`` must be a list of dictionaries, each +dict providing tag values that the replica set member must match. +MongoReplicaSetClient tries each set of tags in turn until it finds a set of +tags with at least one matching member. For example, to prefer reads from the +New York data center, but fall back to the San Francisco data center, tag your +replica set members according to their location and create a +MongoReplicaSetClient like so: + >>> from pymongo.read_preferences import Secondary >>> rsc = MongoReplicaSetClient( ... "morton.local:27017", ... replicaSet='foo' - ... read_preference=ReadPreference.SECONDARY, - ... tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}] + ... read_preference=Secondary(tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}]) ... ) MongoReplicaSetClient tries to find secondaries in New York, then San Francisco, @@ -248,6 +249,8 @@ and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an additional fallback, specify a final, empty tag set, ``{}``, which means "read from any member that matches the mode, ignoring tags." +See :mod:`~pymongo.read_preferences` for more information. 
+ **acceptableLatencyMS**: If multiple members match the mode and tag sets, MongoReplicaSetClient reads diff --git a/gridfs/__init__.py b/gridfs/__init__.py index c9a15d7eb..2d915359a 100644 --- a/gridfs/__init__.py +++ b/gridfs/__init__.py @@ -324,7 +324,6 @@ class GridFS(object): examined when performing the query - `read_preference` (optional): The read preference for this query. - - `tag_sets` (optional): The tag sets for this query. - `compile_re` (optional): if ``False``, don't attempt to compile BSON regex objects into Python regexes. Return instances of :class:`~bson.regex.Regex` instead. diff --git a/gridfs/grid_file.py b/gridfs/grid_file.py index 7f45f478e..48482322e 100644 --- a/gridfs/grid_file.py +++ b/gridfs/grid_file.py @@ -625,7 +625,7 @@ class GridOutCursor(Cursor): """ def __init__(self, collection, spec=None, skip=0, limit=0, timeout=True, sort=None, max_scan=None, - read_preference=None, tag_sets=None, compile_re=True): + read_preference=None, compile_re=True): """Create a new cursor, similar to the normal :class:`~pymongo.cursor.Cursor`. @@ -641,12 +641,11 @@ class GridOutCursor(Cursor): # Copy these settings from collection if they are not set by caller. read_preference = read_preference or collection.files.read_preference - tag_sets = tag_sets or collection.files.tag_sets super(GridOutCursor, self).__init__( collection.files, spec, skip=skip, limit=limit, timeout=timeout, sort=sort, max_scan=max_scan, read_preference=read_preference, - compile_re=compile_re, tag_sets=tag_sets) + compile_re=compile_re) def next(self): """Get next GridOut object from cursor. 
diff --git a/pymongo/collection.py b/pymongo/collection.py index 7d4e4bb40..dd858c269 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -87,7 +87,6 @@ class Collection(common.BaseObject): """ super(Collection, self).__init__( read_preference=database.read_preference, - tag_sets=database.tag_sets, uuidrepresentation=database.uuid_subtype, **database.write_concern) @@ -781,7 +780,6 @@ class Collection(common.BaseObject): outgoing SON manipulators before returning. - `read_preference` (optional): The read preference for this query. - - `tag_sets` (optional): The tag sets for this query. - `compile_re` (optional): if ``False``, don't attempt to compile BSON regex objects into Python regexes. Return instances of :class:`~bson.regex.Regex` instead. @@ -816,7 +814,7 @@ class Collection(common.BaseObject): version **>= 1.5.1** .. versionchanged:: 3.0 - Removed the `network_timeout` and + Removed the `network_timeout`, `tag_sets`, and `secondary_acceptable_latency_ms` parameters. .. versionadded:: 2.7 @@ -843,13 +841,9 @@ class Collection(common.BaseObject): .. mongodoc:: find """ - if not 'read_preference' in kwargs: - kwargs['read_preference'] = self.read_preference - if not 'tag_sets' in kwargs: - kwargs['tag_sets'] = self.tag_sets return Cursor(self, *args, **kwargs) - def parallel_scan(self, num_cursors, **kwargs): + def parallel_scan(self, num_cursors, read_preference=None, **kwargs): """Scan this entire collection in parallel. Returns a list of up to ``num_cursors`` cursors that can be iterated @@ -887,21 +881,20 @@ class Collection(common.BaseObject): :Parameters: - `num_cursors`: the number of cursors to return + - `read_preference`: the read preference to use for this scan .. note:: Requires server version **>= 2.5.5**. 
""" compile_re = kwargs.get('compile_re', False) - command_kwargs = { - 'numCursors': num_cursors, - 'read_preference': self.read_preference, - 'tag_sets': self.tag_sets, - } - command_kwargs.update(kwargs) + cmd = SON([('parallelCollectionScan', self.__name), + ('numCursors', num_cursors)]) - result, conn_id = self.__database._command( - "parallelCollectionScan", self.__name, **command_kwargs) + mode = read_preference or self.read_preference + result, conn_id = self.__database._command(cmd, + read_preference=mode, + **kwargs) return [CommandCursor(self, cursor['cursor'], @@ -1230,7 +1223,7 @@ class Collection(common.BaseObject): return options - def aggregate(self, pipeline, **kwargs): + def aggregate(self, pipeline, read_preference=None, **kwargs): """Perform an aggregation using the aggregation framework on this collection. @@ -1242,6 +1235,7 @@ class Collection(common.BaseObject): :Parameters: - `pipeline`: a single command or list of aggregation commands + - `read_preference`: read preference to use for this aggregate - `**kwargs`: send arbitrary parameters to the aggregate command .. note:: Requires server version **>= 2.1.0**. 
@@ -1272,28 +1266,29 @@ class Collection(common.BaseObject): if isinstance(pipeline, dict): pipeline = [pipeline] - command_kwargs = { - 'pipeline': pipeline, - 'read_preference': self.read_preference, - 'tag_sets': self.tag_sets, - } + cmd = SON([("aggregate", self.__name), + ("pipeline", pipeline)]) - command_kwargs.update(kwargs) + compile_re = kwargs.get('compile_re', True) + + mode = read_preference or self.read_preference result, conn_id = self.__database._command( - "aggregate", self.__name, **command_kwargs) + cmd, uuid_subtype=self.uuid_subtype, + read_preference=mode, **kwargs) if 'cursor' in result: return CommandCursor( self, result['cursor'], conn_id, - command_kwargs.get('compile_re', True)) + compile_re) else: return result # TODO key and condition ought to be optional, but deprecation # could be painful as argument order would have to change. - def group(self, key, condition, initial, reduce, finalize=None, **kwargs): + def group(self, key, condition, initial, + reduce, finalize=None, read_preference=None, **kwargs): """Perform a query similar to an SQL *group by* operation. Returns an array of grouped items. @@ -1321,6 +1316,7 @@ class Collection(common.BaseObject): - `initial`: initial value of the aggregation counter object - `reduce`: aggregation function as a JavaScript string - `finalize`: function to be called on each object in output list. + - `read_preference`: read preference to use for this group .. 
versionchanged:: 2.2 Removed deprecated argument: command @@ -1345,10 +1341,10 @@ class Collection(common.BaseObject): if finalize is not None: group["finalize"] = Code(finalize) + mode = read_preference or self.read_preference return self.__database.command("group", group, uuid_subtype=self.uuid_subtype, - read_preference=self.read_preference, - tag_sets=self.tag_sets, + read_preference=mode, **kwargs)["retval"] def rename(self, new_name, **kwargs): @@ -1404,7 +1400,8 @@ class Collection(common.BaseObject): """ return self.find().distinct(key) - def map_reduce(self, map, reduce, out, full_response=False, **kwargs): + def map_reduce(self, map, reduce, out, + full_response=False, read_preference=None, **kwargs): """Perform a map/reduce operation on this collection. If `full_response` is ``False`` (default) returns a @@ -1422,6 +1419,7 @@ class Collection(common.BaseObject): e.g. SON([('replace', ), ('db', )]) - `full_response` (optional): if ``True``, return full response to this command - otherwise just return the result collection + - `read_preference`: read preference to use for this map reduce - `**kwargs` (optional): additional arguments to the `map reduce command`_ may be passed as keyword arguments to this helper method, e.g.:: @@ -1448,11 +1446,11 @@ class Collection(common.BaseObject): raise TypeError("'out' must be an instance of " "%s or dict" % (basestring.__name__,)) + mode = read_preference or self.read_preference response = self.__database.command("mapreduce", self.__name, uuid_subtype=self.uuid_subtype, + read_preference=mode, map=map, reduce=reduce, - read_preference=self.read_preference, - tag_sets=self.tag_sets, out=out, **kwargs) if full_response or not response.get('result'): @@ -1464,7 +1462,8 @@ class Collection(common.BaseObject): else: return self.__database[response["result"]] - def inline_map_reduce(self, map, reduce, full_response=False, **kwargs): + def inline_map_reduce(self, map, reduce, + full_response=False, read_preference=None, 
**kwargs): """Perform an inline map/reduce operation on this collection. Perform the map/reduce operation on the server in RAM. A result @@ -1486,6 +1485,7 @@ class Collection(common.BaseObject): - `reduce`: reduce function (as a JavaScript string) - `full_response` (optional): if ``True``, return full response to this command - otherwise just return the result collection + - `read_preference`: read preference to use for this map reduce - `**kwargs` (optional): additional arguments to the `map reduce command`_ may be passed as keyword arguments to this helper method, e.g.:: @@ -1497,10 +1497,10 @@ class Collection(common.BaseObject): .. versionadded:: 1.10 """ + mode = read_preference or self.read_preference res = self.__database.command("mapreduce", self.__name, uuid_subtype=self.uuid_subtype, - read_preference=self.read_preference, - tag_sets=self.tag_sets, + read_preference=mode, map=map, reduce=reduce, out={"inline": 1}, **kwargs) diff --git a/pymongo/common.py b/pymongo/common.py index 90777f321..554606859 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -15,11 +15,9 @@ """Functions and classes common to multiple pymongo modules.""" import sys -import warnings from pymongo import read_preferences from pymongo.auth import MECHANISMS -from pymongo.read_preferences import ReadPreference from pymongo.errors import ConfigurationError from bson.binary import (OLD_UUID_SUBTYPE, UUID_SUBTYPE, JAVA_LEGACY, CSHARP_LEGACY) @@ -183,40 +181,23 @@ def validate_timeout_or_none(option, value): def validate_read_preference(dummy, value): - """Validate read preference for a MongoReplicaSetClient. + """Validate a read preference. """ - if value in read_preferences.modes: - return value + if not isinstance(value, read_preferences.ServerMode): + raise ConfigurationError("%r is not a " + "valid read preference." 
% (value,)) + return value - # Also allow string form of enum for uri_parser + +def validate_read_preference_mode(dummy, name): + """Validate read preference mode for a MongoReplicaSetClient. + """ try: - return read_preferences.mongos_enum(value) + return read_preferences.read_pref_mode_from_name(name) except ValueError: raise ConfigurationError("Not a valid read preference") -def validate_tag_sets(dummy, value): - """Validate tag sets for a MongoReplicaSetClient. - """ - if value is None: - return [{}] - - if not isinstance(value, list): - raise ConfigurationError(( - "Tag sets %s invalid, must be a list") % repr(value)) - if len(value) == 0: - raise ConfigurationError(( - "Tag sets %s invalid, must be None or contain at least one set of" - " tags") % repr(value)) - - for tags in value: - if not isinstance(tags, dict): - raise ConfigurationError( - "Tag set %s invalid, must be a dict" % repr(tags)) - - return value - - def validate_auth_mechanism(option, value): """Validate the authMechanism URI option. """ @@ -247,9 +228,26 @@ def validate_uuid_subtype(dummy, value): return value +def validate_read_preference_tags(name, value): + """Parse readPreferenceTags if passed as a client kwarg. + """ + # Parsed in uri_parser.parse_uri + if isinstance(value, list): + return value + + tags = {} + try: + for tag in value.split(","): + key, val = tag.split(":") + tags[key] = val + except Exception: + raise ConfigurationError("%r not a valid value for %s" % (value, name)) + return [tags] + + + # jounal is an alias for j, # wtimeoutms is an alias for wtimeout, -# readpreferencetags is an alias for tag_sets. 
VALIDATORS = { 'replicaset': validate_basestring, 'w': validate_int_or_basestring, @@ -267,10 +265,9 @@ VALIDATORS = { 'ssl_certfile': validate_readable, 'ssl_cert_reqs': validate_cert_reqs, 'ssl_ca_certs': validate_readable, - 'readpreference': validate_read_preference, 'read_preference': validate_read_preference, - 'readpreferencetags': validate_tag_sets, - 'tag_sets': validate_tag_sets, + 'readpreference': validate_read_preference_mode, + 'readpreferencetags': validate_read_preference_tags, 'acceptablelatencyms': validate_positive_float, 'auto_start_request': validate_boolean, 'use_greenlets': validate_boolean, @@ -338,15 +335,10 @@ class BaseObject(object): def __init__(self, **options): - self.__read_pref = ReadPreference.PRIMARY - self.__tag_sets = [{}] + self.__read_pref = read_preferences.ReadPreference.PRIMARY self.__uuid_subtype = OLD_UUID_SUBTYPE self.__write_concern = WriteConcern() self.__set_options(options) - if (self.__read_pref == ReadPreference.PRIMARY - and self.__tag_sets != [{}]): - raise ConfigurationError( - "ReadPreference PRIMARY cannot be combined with tags") def __set_write_concern_option(self, option, value): """Validates and sets getlasterror options for this @@ -360,10 +352,16 @@ class BaseObject(object): def __set_options(self, options): """Validates and sets all options passed to this object.""" for option, value in options.iteritems(): - if option in ('read_preference', "readpreference"): + if option == 'read_preference': self.__read_pref = validate_read_preference(option, value) - elif option in ('tag_sets', 'readpreferencetags'): - self.__tag_sets = validate_tag_sets(option, value) + elif option == 'readpreference': + klass = read_preferences.read_pref_class_from_mode(value) + if value == 0: + # Primary, no tags + self.__read_pref = klass() + continue + tags = options.get('readpreferencetags', None) + self.__read_pref = klass(tags) elif option == 'uuidrepresentation': self.__uuid_subtype = validate_uuid_subtype(option, value) elif 
option in WRITE_CONCERN_OPTIONS: @@ -453,32 +451,10 @@ class BaseObject(object): def __set_read_pref(self, value): """Property setter for read_preference""" - self.__read_pref = validate_read_preference('read_preference', value) + self.__read_pref = validate_read_preference(None, value) read_preference = property(__get_read_pref, __set_read_pref) - def __get_tag_sets(self): - """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to - read only from members whose ``dc`` tag has the value ``"ny"``. - To specify a priority-order for tag sets, provide a list of - tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag - set, ``{}``, means "read from any member that matches the mode, - ignoring tags." MongoReplicaSetClient tries each set of tags in turn - until it finds a set of tags with at least one matching member. - - .. seealso:: `Data-Center Awareness - `_ - - .. versionadded:: 2.3 - """ - return self.__tag_sets - - def __set_tag_sets(self, value): - """Property setter for tag_sets""" - self.__tag_sets = validate_tag_sets('tag_sets', value) - - tag_sets = property(__get_tag_sets, __set_tag_sets) - def __get_uuid_subtype(self): """This attribute specifies which BSON Binary subtype is used when storing UUIDs. 
Historically UUIDs have been stored as BSON Binary diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 19036b1c3..b6c73e1c9 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -20,7 +20,7 @@ from bson import RE_TYPE from bson.code import Code from bson.son import SON from pymongo import helpers, message, read_preferences -from pymongo.read_preferences import ReadPreference, secondary_ok_commands +from pymongo.read_preferences import ReadPreference, SECONDARY_OK_COMMANDS from pymongo.errors import (AutoReconnect, CursorNotFound, InvalidOperation) @@ -68,8 +68,7 @@ class Cursor(object): timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, - read_preference=ReadPreference.PRIMARY, - tag_sets=[{}], exhaust=False, compile_re=True, + read_preference=None, exhaust=False, compile_re=True, _uuid_subtype=None): """Create a new cursor. @@ -146,8 +145,7 @@ class Cursor(object): self.__comment = None self.__as_class = as_class self.__manipulate = manipulate - self.__read_preference = read_preference - self.__tag_sets = tag_sets + self.__read_preference = read_preference or collection.read_preference self.__tz_aware = collection.database.connection.tz_aware self.__compile_re = compile_re self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype @@ -160,7 +158,7 @@ class Cursor(object): self.__query_flags = 0 if tailable: self.__query_flags |= _QUERY_OPTIONS["tailable_cursor"] - if read_preference != ReadPreference.PRIMARY: + if self.__read_preference != ReadPreference.PRIMARY: self.__query_flags |= _QUERY_OPTIONS["slave_okay"] if not timeout: self.__query_flags |= _QUERY_OPTIONS["no_timeout"] @@ -233,7 +231,7 @@ class Cursor(object): "comment", "max", "min", "snapshot", "ordering", "explain", "hint", "batch_size", "max_scan", "as_class", - "manipulate", "read_preference", "tag_sets", + "manipulate", "read_preference", "uuid_subtype", "compile_re", "query_flags") data = dict((k, v) 
for k, v in self.__dict__.iteritems() if k.startswith('_Cursor__') and k[9:] in values_to_clone) @@ -301,22 +299,15 @@ class Cursor(object): if (self.__collection.database.connection.is_mongos and self.__read_preference != ReadPreference.PRIMARY): - has_tags = self.__tag_sets and self.__tag_sets != [{}] - # For maximum backwards compatibility, don't set $readPreference # for SECONDARY_PREFERRED unless tags are in use. Just rely on # the slaveOkay bit (set automatically if read preference is not # PRIMARY), which has the same behavior. - if (self.__read_preference != ReadPreference.SECONDARY_PREFERRED or - has_tags): - - read_pref = { - 'mode': read_preferences.mongos_mode(self.__read_preference) - } - if has_tags: - read_pref['tags'] = self.__tag_sets - - operators['$readPreference'] = read_pref + mode = self.__read_preference.mode + tag_sets = self.__read_preference.tag_sets + if (mode != ReadPreference.SECONDARY_PREFERRED.mode or + tag_sets != [{}]): + operators['$readPreference'] = self.__read_preference.document if operators: # Make a shallow copy so we can cleanly rewind or clone. 
@@ -328,7 +319,7 @@ class Cursor(object): if self.collection.name == "$cmd": # Don't change commands that can't be sent to secondaries command_name = spec and spec.keys()[0].lower() or "" - if command_name not in secondary_ok_commands: + if command_name not in SECONDARY_OK_COMMANDS: return spec elif command_name == 'mapreduce': # mapreduce shouldn't be changed if its not inline @@ -691,8 +682,6 @@ class Cursor(object): command = { "query": self.__spec, "fields": self.__fields, - "read_preference": self.__read_preference, - "tag_sets": self.__tag_sets, } if self.__max_time_ms is not None: command["maxTimeMS"] = self.__max_time_ms @@ -710,6 +699,7 @@ class Cursor(object): allowable_errors=["ns missing"], uuid_subtype=self.__uuid_subtype, compile_re=self.__compile_re, + read_preference=self.__read_preference, **command) if r.get("errmsg", "") == "ns missing": return 0 @@ -744,9 +734,6 @@ class Cursor(object): options = {"key": key} if self.__spec: options["query"] = self.__spec - - options['read_preference'] = self.__read_preference - options['tag_sets'] = self.__tag_sets if self.__max_time_ms is not None: options['maxTimeMS'] = self.__max_time_ms if self.__comment: @@ -757,6 +744,7 @@ class Cursor(object): self.__collection.name, uuid_subtype=self.__uuid_subtype, compile_re=self.__compile_re, + read_preference=self.__read_preference, **options)["values"] def explain(self): @@ -854,7 +842,6 @@ class Cursor(object): if message: kwargs = { "read_preference": self.__read_preference, - "tag_sets": self.__tag_sets, "exhaust": self.__exhaust, } if self.__connection_id is not None: diff --git a/pymongo/database.py b/pymongo/database.py index a83081086..a43010076 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -27,7 +27,7 @@ from pymongo.errors import (CollectionInvalid, InvalidName, OperationFailure) from pymongo.read_preferences import (ReadPreference, - modes, secondary_ok_commands) + SECONDARY_OK_COMMANDS) def _check_name(name): @@ -62,7 +62,6 @@ class 
Database(common.BaseObject): """ super(Database, self).__init__(read_preference=connection.read_preference, - tag_sets=connection.tag_sets, uuidrepresentation=connection.uuid_subtype, **connection.write_concern) @@ -270,7 +269,8 @@ class Database(common.BaseObject): def _command(self, command, value=1, check=True, allowable_errors=None, - uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs): + uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, + read_preference=None, **kwargs): """Internal command helper. """ @@ -280,22 +280,25 @@ class Database(common.BaseObject): else: command_name = command.keys()[0].lower() - orig = mode = kwargs.pop('read_preference', self.read_preference) - tags = kwargs.pop('tag_sets', self.tag_sets) as_class = kwargs.pop('as_class', None) + fields = kwargs.pop('fields', None) + if fields is not None and not isinstance(fields, dict): + fields = helpers._fields_list_to_dict(fields) + command.update(kwargs) - if command_name not in secondary_ok_commands: + orig = mode = read_preference or self.read_preference + if command_name not in SECONDARY_OK_COMMANDS: mode = ReadPreference.PRIMARY # Special-case: mapreduce can go to secondaries only if inline elif command_name == 'mapreduce': - out = command.get('out') or kwargs.get('out') + out = command.get('out') if not isinstance(out, dict) or not out.get('inline'): mode = ReadPreference.PRIMARY # Special-case: aggregate with $out cannot go to secondaries. elif command_name == 'aggregate': - for stage in kwargs.get('pipeline', []): + for stage in command.get('pipeline', []): if '$out' in stage: mode = ReadPreference.PRIMARY break @@ -304,21 +307,14 @@ class Database(common.BaseObject): if mode != orig: warnings.warn("%s does not support %s read preference " "and will be routed to the primary instead." 
% - (command_name, modes[orig]), UserWarning) - tags = [{}] + (command_name, orig.name), UserWarning) - fields = kwargs.pop('fields', None) - if fields is not None and not isinstance(fields, dict): - fields = helpers._fields_list_to_dict(fields) - - command.update(kwargs) cursor = self["$cmd"].find(command, fields=fields, limit=-1, as_class=as_class, read_preference=mode, - tag_sets=tags, compile_re=compile_re, _uuid_subtype=uuid_subtype) for doc in cursor: @@ -333,7 +329,8 @@ class Database(common.BaseObject): def command(self, command, value=1, check=True, allowable_errors=[], - uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs): + uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, + read_preference=None, **kwargs): """Issue a MongoDB command. Send command `command` to the database and return the @@ -384,21 +381,13 @@ class Database(common.BaseObject): :exc:`~bson.errors.InvalidBSON` errors when receiving Python-incompatible regular expressions, for example from ``currentOp`` - - `read_preference`: The read preference for this connection. - See :class:`~pymongo.read_preferences.ReadPreference` for available - options. - - `tag_sets`: Read from replica-set members with these tags. - To specify a priority-order for tag sets, provide a list of - tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag - set, ``{}``, means "read from any member that matches the mode, - ignoring tags." MongoReplicaSetClient tries each set of tags in - turn until it finds a set of tags with at least one matching - member. + - `read_preference`: The read preference for this operation. - `**kwargs` (optional): additional keyword arguments will be added to the command document before it is sent .. versionchanged:: 3.0 - Removed the `secondary_acceptable_latency_ms` option. + Removed the `tag_sets` and `secondary_acceptable_latency_ms` + options. .. versionchanged:: 2.7 Added ``compile_re`` option. .. 
versionchanged:: 2.3 @@ -416,7 +405,8 @@ class Database(common.BaseObject): .. mongodoc:: commands """ return self._command(command, value, check, allowable_errors, - uuid_subtype, compile_re, **kwargs)[0] + uuid_subtype, compile_re, + read_preference, **kwargs)[0] def collection_names(self, include_system_collections=True): """Get a list of all the collection names in this database. diff --git a/pymongo/member.py b/pymongo/member.py index dbdafdf0f..5a360152c 100644 --- a/pymongo/member.py +++ b/pymongo/member.py @@ -114,10 +114,10 @@ class Member(object): assert not self.is_mongos, \ "Tried to match read preference mode on a mongos Member" - if mode == ReadPreference.PRIMARY and not self.is_primary: + if mode == ReadPreference.PRIMARY.mode and not self.is_primary: return False - if mode == ReadPreference.SECONDARY and not self.is_secondary: + if mode == ReadPreference.SECONDARY.mode and not self.is_secondary: return False # If we're not primary or secondary, then we're in a state like diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 468e86fbe..a5b5cc45c 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -191,12 +191,7 @@ class MongoClient(common.BaseObject): :class:`~pymongo.errors.AutoReconnect` "not master". See :class:`~pymongo.read_preferences.ReadPreference` for all available read preference options. - - `tag_sets`: Ignored unless connecting to a replica set via mongos. - Specify a priority-order for tag sets, provide a list of - tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag - set, ``{}``, means "read from any member that matches the mode, - ignoring tags. - - `acceptable_latency_ms`: (integer) When used with mongos + - `acceptableLatencyMS`: (integer) When used with mongos high availability, any mongos whose ping time is within acceptable_latency_ms of the nearest member may be chosen as the new primary during a failover. Default 15 milliseconds. 
diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index c109bf1d7..a8f28aac6 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -37,7 +37,6 @@ import socket import struct import threading import time -import warnings import weakref from bson.py3compat import b @@ -51,7 +50,7 @@ from pymongo import (auth, uri_parser) from pymongo.member import Member from pymongo.read_preferences import ( - ReadPreference, select_member, modes, MovingAverage) + ReadPreference, select_member, MovingAverage) from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, @@ -262,21 +261,21 @@ class RSState(object): """Return a Member instance or None for the given (host, port).""" return self._host_to_member.get(host) - def pin_host(self, host, mode, tag_sets): + def pin_host(self, host, pref): """Pin this thread / greenlet to a member. - `host` is a (host, port) pair. The remaining parameters are a read - preference. + `host` is a (host, port) pair. The `pref` parameter is the read + preference, an instance of a read_preferences.ServerMode subclass. """ # Fun fact: Unlike in thread_util.ThreadIdent, we needn't lock around # assignment here. Assignment to a threadlocal is only unsafe if it # can cause other Python code to run implicitly. self._threadlocal.host = host - self._threadlocal.read_preference = (mode, tag_sets) + self._threadlocal.read_preference = pref - def keep_pinned_host(self, mode, tag_sets): + def keep_pinned_host(self, pref): """Does a read pref match the last used by this thread / greenlet?""" - return self._threadlocal.read_preference == (mode, tag_sets) + return self._threadlocal.read_preference == pref @property def pinned_host(self): @@ -542,14 +541,7 @@ class MongoReplicaSetClient(common.BaseObject): - `read_preference`: The read preference for this client. See :class:`~pymongo.read_preferences.ReadPreference` for available options. 
- - `tag_sets`: Read from replica-set members with these tags. - To specify a priority-order for tag sets, provide a list of - tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag - set, ``{}``, means "read from any member that matches the mode, - ignoring tags." :class:`MongoReplicaSetClient` tries each set of - tags in turn until it finds a set of tags with at least one matching - member. - - `acceptable_latency_ms`: (integer) Any replica-set member + - `acceptableLatencyMS`: (integer) Any replica-set member whose ping time is within acceptable_latency_ms of the nearest member may accept reads. Default 15 milliseconds. @@ -794,7 +786,7 @@ class MongoReplicaSetClient(common.BaseObject): if connect: # Try to authenticate even during failover. member = select_member( - self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED) + self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED.mode) if not member: raise AutoReconnect( @@ -1589,8 +1581,7 @@ class MongoReplicaSetClient(common.BaseObject): self._ensure_connected() rs_state = self.__get_rs_state() - tag_sets = kwargs.get('tag_sets', [{}]) - mode = kwargs.get('read_preference', ReadPreference.PRIMARY) + pref = kwargs.get('read_preference', ReadPreference.PRIMARY) if not rs_state.primary_member: # If we were initialized with _connect=False then connect now. @@ -1598,7 +1589,7 @@ class MongoReplicaSetClient(common.BaseObject): # if one is not already in progress. If caller requested the # primary, wait to see if it's up, otherwise continue with # known-good members. 
- sync = (rs_state.initial or mode == ReadPreference.PRIMARY) + sync = (rs_state.initial or pref == ReadPreference.PRIMARY) self.__schedule_refresh(sync=sync) rs_state = self.__rs_state @@ -1632,15 +1623,14 @@ class MongoReplicaSetClient(common.BaseObject): pinned_host = rs_state.pinned_host pinned_member = rs_state.get(pinned_host) if (pinned_member - and pinned_member.matches_mode(mode) - and pinned_member.matches_tag_sets(tag_sets) # TODO: REMOVE? - and rs_state.keep_pinned_host(mode, tag_sets)): + and pinned_member.matches_mode(pref.mode) + and rs_state.keep_pinned_host(pref)): try: return ( pinned_member.host, self.__try_read(pinned_member, msg, **kwargs)) except AutoReconnect, why: - if mode == ReadPreference.PRIMARY: + if pref == ReadPreference.PRIMARY: self.disconnect() raise else: @@ -1653,8 +1643,8 @@ class MongoReplicaSetClient(common.BaseObject): while len(errors) < MAX_RETRY: member = select_member( members=members, - mode=mode, - tag_sets=tag_sets, + mode=pref.mode, + tag_sets=pref.tag_sets, latency=self.__acceptable_latency) if not member: @@ -1669,27 +1659,24 @@ class MongoReplicaSetClient(common.BaseObject): if self.in_request(): # Keep reading from this member in this thread / greenlet # unless read preference changes - rs_state.pin_host(member.host, mode, tag_sets) + rs_state.pin_host(member.host, pref) return member.host, response except AutoReconnect, why: - if mode == ReadPreference.PRIMARY: + if pref == ReadPreference.PRIMARY: raise errors.append(str(why)) members.remove(member) # Ran out of tries - if mode == ReadPreference.PRIMARY: + if pref == ReadPreference.PRIMARY: msg = "No replica set primary available for query" - elif mode == ReadPreference.SECONDARY: + elif pref.mode == ReadPreference.SECONDARY.mode: msg = "No replica set secondary available for query" else: msg = "No replica set members available for query" - msg += " with ReadPreference %s" % modes[mode] - - if tag_sets != [{}]: - msg += " and tags " + repr(tag_sets) + msg += " 
with read preference %r" % (pref,) # Format a message like: # 'No replica set secondary available for query with ReadPreference diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index 5be7e3cff..4552184ca 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -16,75 +16,260 @@ import random +from collections import namedtuple + from pymongo.errors import ConfigurationError -class ReadPreference: - """An enum that defines the read preference modes supported by PyMongo. - Used in three cases: +_PRIMARY = 0 +_PRIMARY_PREFERRED = 1 +_SECONDARY = 2 +_SECONDARY_PREFERRED = 3 +_NEAREST = 4 - :class:`~pymongo.mongo_client.MongoClient` connected to a single host: - * `PRIMARY`: Queries are allowed if the host is standalone or the replica - set primary. - * All other modes allow queries to standalone servers, to the primary, or - to secondaries. - - :class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a - sharded cluster of replica sets: - - * `PRIMARY`: Queries are sent to the primary of a shard. - * `PRIMARY_PREFERRED`: Queries are sent to the primary if available, - otherwise a secondary. - * `SECONDARY`: Queries are distributed among shard secondaries. An error - is raised if no secondaries are available. - * `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries, - or the primary if no secondary is available. - * `NEAREST`: Queries are distributed among all members of a shard. - - :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`: - - * `PRIMARY`: Queries are sent to the primary of the replica set. - * `PRIMARY_PREFERRED`: Queries are sent to the primary if available, - otherwise a secondary. - * `SECONDARY`: Queries are distributed among secondaries. An error - is raised if no secondaries are available. - * `SECONDARY_PREFERRED`: Queries are distributed among secondaries, - or the primary if no secondary is available. - * `NEAREST`: Queries are distributed among all members. 
- """ - - PRIMARY = 0 - PRIMARY_PREFERRED = 1 - SECONDARY = 2 - SECONDARY_ONLY = 2 - SECONDARY_PREFERRED = 3 - NEAREST = 4 - -# For formatting error messages -modes = { - ReadPreference.PRIMARY: 'PRIMARY', - ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED', - ReadPreference.SECONDARY: 'SECONDARY', - ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED', - ReadPreference.NEAREST: 'NEAREST', -} - -_mongos_modes = [ +_MONGOS_MODES = ( 'primary', 'primaryPreferred', 'secondary', 'secondaryPreferred', 'nearest', -] +) -def mongos_mode(mode): - return _mongos_modes[mode] -def mongos_enum(enum): - return _mongos_modes.index(enum) +def _validate_tag_sets(tag_sets): + """Validate tag sets for a MongoReplicaSetClient. + """ + if tag_sets is None: + return [{}] -def select_primary(members): + if not isinstance(tag_sets, list): + raise ConfigurationError(( + "Tag sets %r invalid, must be a list") % (tag_sets,)) + if len(tag_sets) == 0: + raise ConfigurationError(( + "Tag sets %r invalid, must be None or contain at least one set of" + " tags") % (tag_sets,)) + + for tags in tag_sets: + if not isinstance(tags, dict): + raise ConfigurationError( + "Tag set %r invalid, must be a dict" % (tags,)) + + return tag_sets + + +class ServerMode(object): + """Base class for all read preferences. + """ + + __slots__ = ("__mode", "__mongos_mode", "__tag_sets") + + def __init__(self, mode, tag_sets=None): + if mode == _PRIMARY and tag_sets is not None: + raise ConfigurationError("PRIMARY cannot be combined with tags") + self.__mode = mode + self.__mongos_mode = _MONGOS_MODES[mode] + self.__tag_sets = _validate_tag_sets(tag_sets) + + @property + def name(self): + """The name of this read preference. + """ + return self.__class__.__name__ + + @property + def document(self): + """Read preference as a document. + """ + return {'mode': self.__mongos_mode, 'tags': self.__tag_sets} + + @property + def mode(self): + """The mode of this read preference instance. 
+ """ + return self.__mode + + @property + def tag_sets(self): + """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to + read only from members whose ``dc`` tag has the value ``"ny"``. + To specify a priority-order for tag sets, provide a list of + tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag + set, ``{}``, means "read from any member that matches the mode, + ignoring tags." MongoReplicaSetClient tries each set of tags in turn + until it finds a set of tags with at least one matching member. + + .. seealso:: `Data-Center Awareness + <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_ + """ + return self.__tag_sets + + def __repr__(self): + return "%s(%r)" % (self.name, self.__tag_sets) + + def __eq__(self, other): + return self.mode == other.mode and self.tag_sets == other.tag_sets + + +class Primary(ServerMode): + """Primary read preference. + + * When directly connected to one mongod queries are allowed if the server + is standalone or a replica set primary. + * When connected to a mongos queries are sent to the primary of a shard. + * When connected to a replica set queries are sent to the primary of + the replica set. + """ + + def __init__(self): + super(Primary, self).__init__(_PRIMARY) + + def __repr__(self): + return "Primary()" + + def __eq__(self, other): + return other.mode == _PRIMARY + + +class PrimaryPreferred(ServerMode): + """PrimaryPreferred read preference. + + * When directly connected to one mongod queries are allowed to standalone + servers, to a replica set primary, or to replica set secondaries. + * When connected to a mongos queries are sent to the primary of a shard if + available, otherwise a shard secondary. + * When connected to a replica set queries are sent to the primary if + available, otherwise a secondary. + + :Parameters: + - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not + available. 
+ """ + + def __init__(self, tag_sets=None): + super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets) + + +class Secondary(ServerMode): + """Secondary read preference. + + * When directly connected to one mongod queries are allowed to standalone + servers, to a replica set primary, or to replica set secondaries. + * When connected to a mongos queries are distributed among shard + secondaries. An error is raised if no secondaries are available. + * When connected to a replica set queries are distributed among + secondaries. An error is raised if no secondaries are available. + + :Parameters: + - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + """ + + def __init__(self, tag_sets=None): + super(Secondary, self).__init__(_SECONDARY, tag_sets) + + +class SecondaryPreferred(ServerMode): + """SecondaryPreferred read preference. + + * When directly connected to one mongod queries are allowed to standalone + servers, to a replica set primary, or to replica set secondaries. + * When connected to a mongos queries are distributed among shard + secondaries, or the shard primary if no secondary is available. + * When connected to a replica set queries are distributed among + secondaries, or the primary if no secondary is available. + + :Parameters: + - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + """ + + def __init__(self, tag_sets=None): + super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets) + + +class Nearest(ServerMode): + """Nearest read preference. + + * When directly connected to one mongod queries are allowed to standalone + servers, to a replica set primary, or to replica set secondaries. + * When connected to a mongos queries are distributed among all members of + a shard. + * When connected to a replica set queries are distributed among all + members. 
+ + :Parameters: + - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + """ + + def __init__(self, tag_sets=None): + super(Nearest, self).__init__(_NEAREST, tag_sets) + + +_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, + Secondary, SecondaryPreferred, Nearest) + + +def read_pref_class_from_mode(mode): + """Get the read preference class for a specific mode. + """ + return _ALL_READ_PREFERENCES[mode] + + +_MODES = ( + 'PRIMARY', + 'PRIMARY_PREFERRED', + 'SECONDARY', + 'SECONDARY_PREFERRED', + 'NEAREST', +) + + +ReadPreference = namedtuple("ReadPreference", _MODES)( + Primary(), PrimaryPreferred(), Secondary(), SecondaryPreferred(), Nearest()) +"""An enum that defines the read preference modes supported by PyMongo. +Used in three cases: + +:class:`~pymongo.mongo_client.MongoClient` connected to a single mongod: + +* `PRIMARY`: Queries are allowed if the server is standalone or a replica + set primary. +* All other modes allow queries to standalone servers, to a replica set + primary, or to replica set secondaries. + +:class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a +sharded cluster of replica sets: + +* `PRIMARY`: Queries are sent to the primary of a shard. +* `PRIMARY_PREFERRED`: Queries are sent to the shard primary if available, + otherwise a shard secondary. +* `SECONDARY`: Queries are distributed among shard secondaries. An error + is raised if no secondaries are available. +* `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries, + or the shard primary if no secondary is available. +* `NEAREST`: Queries are distributed among all members of a shard. + +:class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`: + +* `PRIMARY`: Queries are sent to the primary of the replica set. +* `PRIMARY_PREFERRED`: Queries are sent to the primary if available, + otherwise a secondary. +* `SECONDARY`: Queries are distributed among secondaries. An error + is raised if no secondaries are available. 
+* `SECONDARY_PREFERRED`: Queries are distributed among secondaries, + or the primary if no secondary is available. +* `NEAREST`: Queries are distributed among all members. +""" + + +def read_pref_mode_from_name(name): + """Get the read preference mode from mongos/uri name. + """ + return _MONGOS_MODES.index(name) + + +def _select_primary(members): + """Get the primary member. + """ for member in members: if member.is_primary: return member @@ -92,7 +277,9 @@ def select_primary(members): return None -def select_member_with_tags(members, tags, secondary_only, latency): +def _select_member_with_tags(members, tags, secondary_only, latency): + """Get the member matching the given tags, and acceptable latency. + """ candidates = [] for candidate in members: @@ -118,57 +305,40 @@ def select_member_with_tags(members, tags, secondary_only, latency): return random.choice(near_candidates) -def select_member( - members, - mode=ReadPreference.PRIMARY, - tag_sets=None, - latency=15 -): +def select_member(members, mode, tag_sets=[{}], latency=15): """Return a Member or None. """ - if tag_sets is None: - tag_sets = [{}] + if mode == _PRIMARY: + return _select_primary(members) - # For brevity - PRIMARY = ReadPreference.PRIMARY - PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED - SECONDARY = ReadPreference.SECONDARY - SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED - NEAREST = ReadPreference.NEAREST - - if mode == PRIMARY: - if tag_sets != [{}]: - raise ConfigurationError("PRIMARY cannot be combined with tags") - return select_primary(members) - - elif mode == PRIMARY_PREFERRED: + elif mode == _PRIMARY_PREFERRED: # Recurse. 
- candidate_primary = select_member(members, PRIMARY, [{}], latency) + candidate_primary = select_member(members, _PRIMARY, [{}], latency) if candidate_primary: return candidate_primary else: - return select_member(members, SECONDARY, tag_sets, latency) + return select_member(members, _SECONDARY, tag_sets, latency) - elif mode == SECONDARY: + elif mode == _SECONDARY: for tags in tag_sets: - candidate = select_member_with_tags(members, tags, True, latency) + candidate = _select_member_with_tags(members, tags, True, latency) if candidate: return candidate return None - elif mode == SECONDARY_PREFERRED: + elif mode == _SECONDARY_PREFERRED: # Recurse. candidate_secondary = select_member( - members, SECONDARY, tag_sets, latency) + members, _SECONDARY, tag_sets, latency) if candidate_secondary: return candidate_secondary else: - return select_member(members, PRIMARY, [{}], latency) + return select_member(members, _PRIMARY, [{}], latency) - elif mode == NEAREST: + elif mode == _NEAREST: for tags in tag_sets: - candidate = select_member_with_tags(members, tags, False, latency) + candidate = _select_member_with_tags(members, tags, False, latency) if candidate: return candidate @@ -176,23 +346,23 @@ def select_member( return None else: - raise ConfigurationError("Invalid mode %s" % repr(mode)) + raise ConfigurationError("Invalid mode %d" % (mode,)) -"""Commands that may be sent to replica-set secondaries, depending on - ReadPreference and tags. All other commands are always run on the primary. -""" -secondary_ok_commands = frozenset([ +SECONDARY_OK_COMMANDS = frozenset([ "group", "aggregate", "collstats", "dbstats", "count", "distinct", "geonear", "geosearch", "geowalk", "mapreduce", "getnonce", "authenticate", "text", "parallelcollectionscan" ]) +"""Commands that may be sent to replica-set secondaries, depending on + ReadPreference and tags. All other commands are always run on the primary. 
+""" class MovingAverage(object): + """Immutable structure to track a 5-sample moving average. + """ def __init__(self, samples): - """Immutable structure to track a 5-sample moving average. - """ self.samples = samples[-5:] assert self.samples self.average = sum(self.samples) / float(len(self.samples)) @@ -202,4 +372,6 @@ class MovingAverage(object): return MovingAverage(self.samples + [sample]) def get(self): + """Get the calculated average. + """ return self.average diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index f1edca50c..7819e5b66 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -34,7 +34,7 @@ from pymongo.member import Member from pymongo.mongo_replica_set_client import Monitor from pymongo.mongo_replica_set_client import MongoReplicaSetClient from pymongo.mongo_client import MongoClient, _partition_node -from pymongo.read_preferences import ReadPreference, modes +from pymongo.read_preferences import ReadPreference from test import utils, version from test.utils import one @@ -800,9 +800,8 @@ class TestReadPreference(HATestCase): # Reading with a different mode unpinned, hooray! break else: - self.fail( - "Changing from mode %s to mode %s never unpinned" % ( - modes[mode0], modes[mode1])) + self.fail("Changing from mode %r to mode " + "%r never unpinned" % (mode0, mode1)) # Now verify changing the tag_sets unpins the member. 
tags0 = [{'a': 'a'}, {}] diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 17648e6de..4bc643a50 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -25,8 +25,11 @@ sys.path[0:0] = [""] from bson.son import SON from pymongo.cursor import _QUERY_OPTIONS from pymongo.mongo_replica_set_client import MongoReplicaSetClient -from pymongo.read_preferences import (ReadPreference, modes, MovingAverage, - secondary_ok_commands) +from pymongo.read_preferences import (ReadPreference, MovingAverage, + PrimaryPreferred, + Secondary, SecondaryPreferred, + Nearest, ServerMode, + SECONDARY_OK_COMMANDS) from pymongo.errors import ConfigurationError from test.test_replica_set_client import TestReplicaSetClientBase @@ -78,8 +81,7 @@ class TestReadPreferencesBase(TestReplicaSetClientBase): class TestReadPreferences(TestReadPreferencesBase): def test_mode_validation(self): - # 'modes' are imported from read_preferences.py - for mode in modes: + for mode in ReadPreference: self.assertEqual(mode, self._get_client( read_preference=mode).read_preference) @@ -88,33 +90,33 @@ class TestReadPreferences(TestReadPreferencesBase): def test_tag_sets_validation(self): # Can't use tags with PRIMARY - self.assertRaises(ConfigurationError, self._get_client, - tag_sets=[{'k': 'v'}]) + self.assertRaises(ConfigurationError, ServerMode, + 0, tag_sets=[{'k': 'v'}]) # ... 
but empty tag sets are ok with PRIMARY - self.assertEqual([{}], self._get_client(tag_sets=[{}]).tag_sets) + self.assertRaises(ConfigurationError, ServerMode, + 0, tag_sets=[{}]) - S = ReadPreference.SECONDARY - self.assertEqual([{}], self._get_client(read_preference=S).tag_sets) + S = Secondary([{}]) + self.assertEqual([{}], + self._get_client(read_preference=S).read_preference.tag_sets) - self.assertEqual([{'k': 'v'}], self._get_client( - read_preference=S, tag_sets=[{'k': 'v'}]).tag_sets) + S = Secondary([{'k': 'v'}]) + self.assertEqual([{'k': 'v'}], + self._get_client(read_preference=S).read_preference.tag_sets) - self.assertEqual([{'k': 'v'}, {}], self._get_client( - read_preference=S, tag_sets=[{'k': 'v'}, {}]).tag_sets) + S = Secondary([{'k': 'v'}, {}]) + self.assertEqual([{'k': 'v'}, {}], + self._get_client(read_preference=S).read_preference.tag_sets) - self.assertRaises(ConfigurationError, self._get_client, - read_preference=S, tag_sets=[]) + self.assertRaises(ConfigurationError, Secondary, tag_sets=[]) # One dict not ok, must be a list of dicts - self.assertRaises(ConfigurationError, self._get_client, - read_preference=S, tag_sets={'k': 'v'}) + self.assertRaises(ConfigurationError, Secondary, tag_sets={'k': 'v'}) - self.assertRaises(ConfigurationError, self._get_client, - read_preference=S, tag_sets='foo') + self.assertRaises(ConfigurationError, Secondary, tag_sets='foo') - self.assertRaises(ConfigurationError, self._get_client, - read_preference=S, tag_sets=['foo']) + self.assertRaises(ConfigurationError, Secondary, tag_sets=['foo']) def test_latency_validation(self): self.assertEqual(17, self._get_client( @@ -150,12 +152,6 @@ class TestReadPreferences(TestReadPreferencesBase): self.assertReadsFrom('secondary', read_preference=ReadPreference.SECONDARY_PREFERRED) - def test_secondary_only(self): - # Test deprecated mode SECONDARY_ONLY, which is now a synonym for - # SECONDARY - self.assertEqual( - ReadPreference.SECONDARY, ReadPreference.SECONDARY_ONLY) - 
def test_nearest(self): # With high acceptableLatencyMS, expect to read from any # member @@ -237,7 +233,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): def _test_fn(self, obedient, fn): if not obedient: - for mode in modes: + for mode in ReadPreference: self.c.read_preference = mode # Run it a few times to make sure we don't just get lucky the @@ -511,24 +507,20 @@ class TestMongosConnection(unittest.TestCase): '$readPreference' in cursor._Cursor__query_spec()) # Copy these constants for brevity - PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED - SECONDARY = ReadPreference.SECONDARY - SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED - NEAREST = ReadPreference.NEAREST SLAVE_OKAY = _QUERY_OPTIONS['slave_okay'] # Test non-PRIMARY modes which can be combined with tags - for kwarg, value, mongos_mode in ( - ('read_preference', PRIMARY_PREFERRED, 'primaryPreferred'), - ('read_preference', SECONDARY, 'secondary'), - ('read_preference', SECONDARY_PREFERRED, 'secondaryPreferred'), - ('read_preference', NEAREST, 'nearest'), + for mode, mongos_mode in ( + (PrimaryPreferred, 'primaryPreferred'), + (Secondary, 'secondary'), + (SecondaryPreferred, 'secondaryPreferred'), + (Nearest, 'nearest'), ): for tag_sets in ( None, [{}] ): # Create a client e.g. 
with read_preference=NEAREST - c = get_client(tag_sets=tag_sets, **{kwarg: value}) + c = get_client(read_preference=mode(tag_sets)) self.assertEqual(is_mongos, c.is_mongos) cursor = c.pymongo_test.test.find() @@ -567,7 +559,7 @@ class TestMongosConnection(unittest.TestCase): [{'dc': 'la'}, {'dc': 'sf'}], [{'dc': 'la'}, {'dc': 'sf'}, {}], ): - c = get_client(tag_sets=tag_sets, **{kwarg: value}) + c = get_client(read_preference=mode(tag_sets)) self.assertEqual(is_mongos, c.is_mongos) cursor = c.pymongo_test.test.find() @@ -586,7 +578,7 @@ class TestMongosConnection(unittest.TestCase): raise SkipTest("Only mongos have read_prefs added to the spec") # Ensure secondary_ok_commands have readPreference - for cmd in secondary_ok_commands: + for cmd in SECONDARY_OK_COMMANDS: if cmd == 'mapreduce': # map reduce is a special case continue command = SON([(cmd, 1)]) diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 685fece19..16afcb33d 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -16,7 +16,6 @@ # TODO: anywhere we wait for refresh in tests, consider just refreshing w/ sync -import copy import datetime import signal import socket @@ -34,8 +33,7 @@ from nose.plugins.skip import SkipTest from bson.son import SON from bson.tz_util import utc from pymongo.mongo_client import MongoClient -from pymongo.read_preferences import ReadPreference -from pymongo.member import PRIMARY, SECONDARY, OTHER +from pymongo.read_preferences import ReadPreference, Secondary, Nearest from pymongo.mongo_replica_set_client import MongoReplicaSetClient from pymongo.mongo_replica_set_client import _partition_node, have_gevent from pymongo.database import Database @@ -240,20 +238,18 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): # Make sure MRSC's properties are copied to Database and Collection for obj in c, c.pymongo_test, c.pymongo_test.test: self.assertEqual(obj.read_preference, ReadPreference.PRIMARY) 
- self.assertEqual(obj.tag_sets, [{}]) self.assertEqual(obj.write_concern, {}) cursor = c.pymongo_test.test.find() self.assertEqual( ReadPreference.PRIMARY, cursor._Cursor__read_preference) - self.assertEqual([{}], cursor._Cursor__tag_sets) c.close() tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}] + secondary = Secondary(tag_sets) c = MongoReplicaSetClient(pair, replicaSet=self.name, max_pool_size=25, document_class=SON, tz_aware=True, - read_preference=ReadPreference.SECONDARY, - tag_sets=copy.deepcopy(tag_sets), + read_preference=secondary, acceptablelatencyms=77) c.admin.command('ping') self.assertEqual(c.primary, self.primary) @@ -264,21 +260,17 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): self.assertEqual(c.tz_aware, True) for obj in c, c.pymongo_test, c.pymongo_test.test: - self.assertEqual(obj.read_preference, ReadPreference.SECONDARY) - self.assertEqual(obj.tag_sets, tag_sets) + self.assertEqual(obj.read_preference, secondary) cursor = c.pymongo_test.test.find() self.assertEqual( - ReadPreference.SECONDARY, cursor._Cursor__read_preference) - self.assertEqual(tag_sets, cursor._Cursor__tag_sets) + secondary, cursor._Cursor__read_preference) - cursor = c.pymongo_test.test.find( - read_preference=ReadPreference.NEAREST, - tag_sets=[{'dc':'ny'}, {}]) + nearest = Nearest([{'dc': 'ny'}, {}]) + cursor = c.pymongo_test.test.find(read_preference=nearest) self.assertEqual( - ReadPreference.NEAREST, cursor._Cursor__read_preference) - self.assertEqual([{'dc':'ny'}, {}], cursor._Cursor__tag_sets) + nearest, cursor._Cursor__read_preference) if version.at_least(c, (1, 7, 4)): self.assertEqual(c.max_bson_size, 16777216) @@ -704,12 +696,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): ConnectionFailure, collection.find_one, {'$where': delay(5)}, - read_preference=SECONDARY) + read_preference=ReadPreference.SECONDARY) rs_state = c._MongoReplicaSetClient__rs_state secondary_host = one(rs_state.secondaries) 
self.assertTrue(rs_state.get(secondary_host)) - collection.find_one(read_preference=SECONDARY) # No error. + collection.find_one(read_preference=ReadPreference.SECONDARY) # No error. def test_waitQueueTimeoutMS(self): client = self._get_client(waitQueueTimeoutMS=2000) diff --git a/test/test_uri_parser.py b/test/test_uri_parser.py index 5eb3794bb..5ee22aabc 100644 --- a/test/test_uri_parser.py +++ b/test/test_uri_parser.py @@ -267,7 +267,7 @@ class TestURI(unittest.TestCase): "test.yield_historical.in")) res = copy.deepcopy(orig) - res['options'] = {'readpreference': ReadPreference.SECONDARY} + res['options'] = {'readpreference': ReadPreference.SECONDARY.mode} self.assertEqual(res, parse_uri("mongodb://localhost/?readPreference=secondary")) @@ -324,7 +324,7 @@ class TestURI(unittest.TestCase): "@localhost/foo?authMechanism=GSSAPI")) res = copy.deepcopy(orig) - res['options'] = {'readpreference': ReadPreference.SECONDARY, + res['options'] = {'readpreference': ReadPreference.SECONDARY.mode, 'readpreferencetags': [ {'dc': 'west', 'use': 'website'}, {'dc': 'east', 'use': 'website'}]} @@ -338,7 +338,7 @@ class TestURI(unittest.TestCase): "readpreferencetags=dc:east,use:website")) res = copy.deepcopy(orig) - res['options'] = {'readpreference': ReadPreference.SECONDARY, + res['options'] = {'readpreference': ReadPreference.SECONDARY.mode, 'readpreferencetags': [ {'dc': 'west', 'use': 'website'}, {'dc': 'east', 'use': 'website'},