diff --git a/pymongo/collection.py b/pymongo/collection.py index 7cf62d7c6..d49d1b9e6 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -909,7 +909,7 @@ class Collection(common.BaseObject): 'tag_sets': self.tag_sets, 'secondary_acceptable_latency_ms': ( self.secondary_acceptable_latency_ms), - '_use_master': not self.read_preference} + } command_kwargs.update(kwargs) result, conn_id = self.__database._command( @@ -1290,7 +1290,7 @@ class Collection(common.BaseObject): 'tag_sets': self.tag_sets, 'secondary_acceptable_latency_ms': ( self.secondary_acceptable_latency_ms), - '_use_master': not self.read_preference} + } command_kwargs.update(kwargs) result, conn_id = self.__database._command( @@ -1365,7 +1365,6 @@ class Collection(common.BaseObject): tag_sets=self.tag_sets, secondary_acceptable_latency_ms=( self.secondary_acceptable_latency_ms), - _use_master=not self.read_preference, **kwargs)["retval"] def rename(self, new_name, **kwargs): @@ -1465,11 +1464,6 @@ class Collection(common.BaseObject): raise TypeError("'out' must be an instance of " "%s or dict" % (basestring.__name__,)) - if isinstance(out, dict) and out.get('inline'): - must_use_master = False - else: - must_use_master = True - response = self.__database.command("mapreduce", self.__name, uuid_subtype=self.uuid_subtype, map=map, reduce=reduce, @@ -1477,8 +1471,7 @@ class Collection(common.BaseObject): tag_sets=self.tag_sets, secondary_acceptable_latency_ms=( self.secondary_acceptable_latency_ms), - out=out, _use_master=must_use_master, - **kwargs) + out=out, **kwargs) if full_response or not response.get('result'): return response @@ -1528,7 +1521,6 @@ class Collection(common.BaseObject): tag_sets=self.tag_sets, secondary_acceptable_latency_ms=( self.secondary_acceptable_latency_ms), - _use_master=not self.read_preference, map=map, reduce=reduce, out={"inline": 1}, **kwargs) diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 511640b37..67f0aa769 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -70,8 +70,7 @@ class Cursor(object): await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], secondary_acceptable_latency_ms=None, - exhaust=False, compile_re=True, _must_use_master=False, - _uuid_subtype=None): + exhaust=False, compile_re=True, _uuid_subtype=None): """Create a new cursor. Should not be called directly by application developers - see @@ -152,7 +151,6 @@ class Cursor(object): self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms self.__tz_aware = collection.database.connection.tz_aware self.__compile_re = compile_re - self.__must_use_master = _must_use_master self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype self.__data = deque() @@ -238,8 +236,7 @@ class Cursor(object): "batch_size", "max_scan", "as_class", "manipulate", "read_preference", "tag_sets", "secondary_acceptable_latency_ms", - "must_use_master", "uuid_subtype", "compile_re", - "query_flags") + "uuid_subtype", "compile_re", "query_flags") data = dict((k, v) for k, v in self.__dict__.iteritems() if k.startswith('_Cursor__') and k[9:] in values_to_clone) if deepcopy: @@ -699,7 +696,6 @@ class Cursor(object): command['tag_sets'] = self.__tag_sets command['secondary_acceptable_latency_ms'] = ( self.__secondary_acceptable_latency_ms) - command['_use_master'] = not self.__read_preference if self.__max_time_ms is not None: command["maxTimeMS"] = self.__max_time_ms if self.__comment: @@ -755,7 +751,6 @@ class Cursor(object): options['tag_sets'] = self.__tag_sets options['secondary_acceptable_latency_ms'] = ( self.__secondary_acceptable_latency_ms) - options['_use_master'] = not self.__read_preference if self.__max_time_ms is not None: options['maxTimeMS'] = self.__max_time_ms if self.__comment: @@ -861,12 +856,13 @@ class Cursor(object): client = self.__collection.database.connection if message: - kwargs = {"_must_use_master": self.__must_use_master} - kwargs["read_preference"] = self.__read_preference - kwargs["tag_sets"] = self.__tag_sets - kwargs["secondary_acceptable_latency_ms"] = ( - self.__secondary_acceptable_latency_ms) - kwargs['exhaust'] = self.__exhaust + kwargs = { + "read_preference": self.__read_preference, + "tag_sets": self.__tag_sets, + "secondary_acceptable_latency_ms": + self.__secondary_acceptable_latency_ms, + "exhaust": self.__exhaust, + } if self.__connection_id is not None: kwargs["_connection_to_use"] = self.__connection_id diff --git a/pymongo/database.py b/pymongo/database.py index b27ff153b..904b484f9 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -26,7 +26,8 @@ from pymongo.errors import (CollectionInvalid, ConfigurationError, InvalidName, OperationFailure) -from pymongo import read_preferences as rp +from pymongo.read_preferences import (ReadPreference, + modes, secondary_ok_commands) def _check_name(name): @@ -276,59 +277,56 @@ class Database(common.BaseObject): """ if isinstance(command, basestring): + command_name = command.lower() command = SON([(command, value)]) + else: + command_name = command.keys()[0].lower() - command_name = command.keys()[0].lower() - must_use_master = kwargs.pop('_use_master', False) - if command_name not in rp.secondary_ok_commands: - must_use_master = True + orig = mode = kwargs.pop('read_preference', self.read_preference) + tags = kwargs.pop('tag_sets', self.tag_sets) + latency = kwargs.pop('secondary_acceptable_latency_ms', + self.secondary_acceptable_latency_ms) + as_class = kwargs.pop('as_class', None) + + if command_name not in secondary_ok_commands: + mode = ReadPreference.PRIMARY # Special-case: mapreduce can go to secondaries only if inline - if command_name == 'mapreduce': + elif command_name == 'mapreduce': out = command.get('out') or kwargs.get('out') if not isinstance(out, dict) or not out.get('inline'): - must_use_master = True + mode = ReadPreference.PRIMARY # Special-case: aggregate with $out cannot go to secondaries. - if command_name == 'aggregate': + elif command_name == 'aggregate': for stage in kwargs.get('pipeline', []): if '$out' in stage: - must_use_master = True + mode = ReadPreference.PRIMARY break - extra_opts = { - 'as_class': kwargs.pop('as_class', None), - '_must_use_master': must_use_master, - '_uuid_subtype': uuid_subtype - } + # Warn if mode will override read_preference. + if mode != orig: + warnings.warn("%s does not support %s read preference " + "and will be routed to the primary instead." % + (command_name, modes[orig]), UserWarning) + tags = [{}] + latency = None - extra_opts['read_preference'] = kwargs.pop( - 'read_preference', - self.read_preference) - extra_opts['tag_sets'] = kwargs.pop( - 'tag_sets', - self.tag_sets) - extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop( - 'secondary_acceptable_latency_ms', - self.secondary_acceptable_latency_ms) - extra_opts['compile_re'] = compile_re - - fields = kwargs.get('fields') + fields = kwargs.pop('fields', None) if fields is not None and not isinstance(fields, dict): - kwargs['fields'] = helpers._fields_list_to_dict(fields) + fields = helpers._fields_list_to_dict(fields) command.update(kwargs) - # Warn if must_use_master will override read_preference. - if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and - extra_opts['_must_use_master']): - warnings.warn("%s does not support %s read preference " - "and will be routed to the primary instead." % - (command_name, - rp.modes[extra_opts['read_preference']]), - UserWarning) - - cursor = self["$cmd"].find(command, **extra_opts).limit(-1) + cursor = self["$cmd"].find(command, + fields=fields, + limit=-1, + as_class=as_class, + read_preference=mode, + tag_sets=tags, + secondary_acceptable_latency_ms=latency, + compile_re=compile_re, + _uuid_subtype=uuid_subtype) for doc in cursor: result = doc @@ -437,7 +435,8 @@ class Database(common.BaseObject): - `include_system_collections` (optional): if ``False`` list will not include system collections (e.g ``system.indexes``) """ - results = self["system.namespaces"].find(_must_use_master=True) + results = self["system.namespaces"].find( + read_preference=ReadPreference.PRIMARY) names = [r["name"] for r in results] names = [n[len(self.__name) + 1:] for n in names if n.startswith(self.__name + ".") and "$" not in n] diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 38c343e12..a7eb2d342 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1165,10 +1165,7 @@ class MongoClient(common.BaseObject): sock_info.close() raise - # we just ignore _must_use_master here: it's only relevant for - # MasterSlaveConnection instances. - def _send_message_with_response(self, message, - _must_use_master=False, **kwargs): + def _send_message_with_response(self, message, **kwargs): """Send a message to Mongo and return the response. Sends the given message and returns the response. diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 864a4c20b..479a0c1af 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -1566,8 +1566,8 @@ class MongoReplicaSetClient(common.BaseObject): host, port = member.host raise AutoReconnect("%s:%d: %s" % (host, port, why)) - def _send_message_with_response(self, msg, _connection_to_use=None, - _must_use_master=False, **kwargs): + def _send_message_with_response(self, msg, + _connection_to_use=None, **kwargs): """Send a message to Mongo and return the response. Sends the given message and returns (host used, response). @@ -1576,16 +1576,12 @@ class MongoReplicaSetClient(common.BaseObject): - `msg`: (request_id, data) pair making up the message to send - `_connection_to_use`: Optional (host, port) of member for message, used by Cursor for getMore and killCursors messages. - - `_must_use_master`: If True, send to primary. """ self._ensure_connected() rs_state = self.__get_rs_state() tag_sets = kwargs.get('tag_sets', [{}]) mode = kwargs.get('read_preference', ReadPreference.PRIMARY) - if _must_use_master: - mode = ReadPreference.PRIMARY - tag_sets = [{}] if not rs_state.primary_member: # If we were initialized with _connect=False then connect now. @@ -1639,7 +1635,7 @@ class MongoReplicaSetClient(common.BaseObject): pinned_member.host, self.__try_read(pinned_member, msg, **kwargs)) except AutoReconnect, why: - if _must_use_master or mode == ReadPreference.PRIMARY: + if mode == ReadPreference.PRIMARY: self.disconnect() raise else: diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index bcf732b63..c304a1d64 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -480,7 +480,8 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): c.copy_database("pymongo_test", "pymongo_test1", username="mike", password="password") self.assertTrue("pymongo_test1" in c.database_names()) - res = c.pymongo_test1.test.find_one(_must_use_master=True) + res = c.pymongo_test1.test.find_one( + read_preference=ReadPreference.PRIMARY) self.assertEqual("bar", res["foo"]) finally: # Cleanup