PYTHON-525 Remove (_must)_use_master.
Left over from MasterSlaveConnection.
This commit is contained in:
parent
de23b63994
commit
9404ff1f12
@ -909,7 +909,7 @@ class Collection(common.BaseObject):
|
||||
'tag_sets': self.tag_sets,
|
||||
'secondary_acceptable_latency_ms': (
|
||||
self.secondary_acceptable_latency_ms),
|
||||
'_use_master': not self.read_preference}
|
||||
}
|
||||
command_kwargs.update(kwargs)
|
||||
|
||||
result, conn_id = self.__database._command(
|
||||
@ -1290,7 +1290,7 @@ class Collection(common.BaseObject):
|
||||
'tag_sets': self.tag_sets,
|
||||
'secondary_acceptable_latency_ms': (
|
||||
self.secondary_acceptable_latency_ms),
|
||||
'_use_master': not self.read_preference}
|
||||
}
|
||||
|
||||
command_kwargs.update(kwargs)
|
||||
result, conn_id = self.__database._command(
|
||||
@ -1365,7 +1365,6 @@ class Collection(common.BaseObject):
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
_use_master=not self.read_preference,
|
||||
**kwargs)["retval"]
|
||||
|
||||
def rename(self, new_name, **kwargs):
|
||||
@ -1465,11 +1464,6 @@ class Collection(common.BaseObject):
|
||||
raise TypeError("'out' must be an instance of "
|
||||
"%s or dict" % (basestring.__name__,))
|
||||
|
||||
if isinstance(out, dict) and out.get('inline'):
|
||||
must_use_master = False
|
||||
else:
|
||||
must_use_master = True
|
||||
|
||||
response = self.__database.command("mapreduce", self.__name,
|
||||
uuid_subtype=self.uuid_subtype,
|
||||
map=map, reduce=reduce,
|
||||
@ -1477,8 +1471,7 @@ class Collection(common.BaseObject):
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
out=out, _use_master=must_use_master,
|
||||
**kwargs)
|
||||
out=out, **kwargs)
|
||||
|
||||
if full_response or not response.get('result'):
|
||||
return response
|
||||
@ -1528,7 +1521,6 @@ class Collection(common.BaseObject):
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
_use_master=not self.read_preference,
|
||||
map=map, reduce=reduce,
|
||||
out={"inline": 1}, **kwargs)
|
||||
|
||||
|
||||
@ -70,8 +70,7 @@ class Cursor(object):
|
||||
await_data=False, partial=False, manipulate=True,
|
||||
read_preference=ReadPreference.PRIMARY,
|
||||
tag_sets=[{}], secondary_acceptable_latency_ms=None,
|
||||
exhaust=False, compile_re=True, _must_use_master=False,
|
||||
_uuid_subtype=None):
|
||||
exhaust=False, compile_re=True, _uuid_subtype=None):
|
||||
"""Create a new cursor.
|
||||
|
||||
Should not be called directly by application developers - see
|
||||
@ -152,7 +151,6 @@ class Cursor(object):
|
||||
self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
|
||||
self.__tz_aware = collection.database.connection.tz_aware
|
||||
self.__compile_re = compile_re
|
||||
self.__must_use_master = _must_use_master
|
||||
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
|
||||
|
||||
self.__data = deque()
|
||||
@ -238,8 +236,7 @@ class Cursor(object):
|
||||
"batch_size", "max_scan", "as_class",
|
||||
"manipulate", "read_preference", "tag_sets",
|
||||
"secondary_acceptable_latency_ms",
|
||||
"must_use_master", "uuid_subtype", "compile_re",
|
||||
"query_flags")
|
||||
"uuid_subtype", "compile_re", "query_flags")
|
||||
data = dict((k, v) for k, v in self.__dict__.iteritems()
|
||||
if k.startswith('_Cursor__') and k[9:] in values_to_clone)
|
||||
if deepcopy:
|
||||
@ -699,7 +696,6 @@ class Cursor(object):
|
||||
command['tag_sets'] = self.__tag_sets
|
||||
command['secondary_acceptable_latency_ms'] = (
|
||||
self.__secondary_acceptable_latency_ms)
|
||||
command['_use_master'] = not self.__read_preference
|
||||
if self.__max_time_ms is not None:
|
||||
command["maxTimeMS"] = self.__max_time_ms
|
||||
if self.__comment:
|
||||
@ -755,7 +751,6 @@ class Cursor(object):
|
||||
options['tag_sets'] = self.__tag_sets
|
||||
options['secondary_acceptable_latency_ms'] = (
|
||||
self.__secondary_acceptable_latency_ms)
|
||||
options['_use_master'] = not self.__read_preference
|
||||
if self.__max_time_ms is not None:
|
||||
options['maxTimeMS'] = self.__max_time_ms
|
||||
if self.__comment:
|
||||
@ -861,12 +856,13 @@ class Cursor(object):
|
||||
client = self.__collection.database.connection
|
||||
|
||||
if message:
|
||||
kwargs = {"_must_use_master": self.__must_use_master}
|
||||
kwargs["read_preference"] = self.__read_preference
|
||||
kwargs["tag_sets"] = self.__tag_sets
|
||||
kwargs["secondary_acceptable_latency_ms"] = (
|
||||
self.__secondary_acceptable_latency_ms)
|
||||
kwargs['exhaust'] = self.__exhaust
|
||||
kwargs = {
|
||||
"read_preference": self.__read_preference,
|
||||
"tag_sets": self.__tag_sets,
|
||||
"secondary_acceptable_latency_ms":
|
||||
self.__secondary_acceptable_latency_ms,
|
||||
"exhaust": self.__exhaust,
|
||||
}
|
||||
if self.__connection_id is not None:
|
||||
kwargs["_connection_to_use"] = self.__connection_id
|
||||
|
||||
|
||||
@ -26,7 +26,8 @@ from pymongo.errors import (CollectionInvalid,
|
||||
ConfigurationError,
|
||||
InvalidName,
|
||||
OperationFailure)
|
||||
from pymongo import read_preferences as rp
|
||||
from pymongo.read_preferences import (ReadPreference,
|
||||
modes, secondary_ok_commands)
|
||||
|
||||
|
||||
def _check_name(name):
|
||||
@ -276,59 +277,56 @@ class Database(common.BaseObject):
|
||||
"""
|
||||
|
||||
if isinstance(command, basestring):
|
||||
command_name = command.lower()
|
||||
command = SON([(command, value)])
|
||||
else:
|
||||
command_name = command.keys()[0].lower()
|
||||
|
||||
command_name = command.keys()[0].lower()
|
||||
must_use_master = kwargs.pop('_use_master', False)
|
||||
if command_name not in rp.secondary_ok_commands:
|
||||
must_use_master = True
|
||||
orig = mode = kwargs.pop('read_preference', self.read_preference)
|
||||
tags = kwargs.pop('tag_sets', self.tag_sets)
|
||||
latency = kwargs.pop('secondary_acceptable_latency_ms',
|
||||
self.secondary_acceptable_latency_ms)
|
||||
as_class = kwargs.pop('as_class', None)
|
||||
|
||||
if command_name not in secondary_ok_commands:
|
||||
mode = ReadPreference.PRIMARY
|
||||
|
||||
# Special-case: mapreduce can go to secondaries only if inline
|
||||
if command_name == 'mapreduce':
|
||||
elif command_name == 'mapreduce':
|
||||
out = command.get('out') or kwargs.get('out')
|
||||
if not isinstance(out, dict) or not out.get('inline'):
|
||||
must_use_master = True
|
||||
mode = ReadPreference.PRIMARY
|
||||
|
||||
# Special-case: aggregate with $out cannot go to secondaries.
|
||||
if command_name == 'aggregate':
|
||||
elif command_name == 'aggregate':
|
||||
for stage in kwargs.get('pipeline', []):
|
||||
if '$out' in stage:
|
||||
must_use_master = True
|
||||
mode = ReadPreference.PRIMARY
|
||||
break
|
||||
|
||||
extra_opts = {
|
||||
'as_class': kwargs.pop('as_class', None),
|
||||
'_must_use_master': must_use_master,
|
||||
'_uuid_subtype': uuid_subtype
|
||||
}
|
||||
# Warn if mode will override read_preference.
|
||||
if mode != orig:
|
||||
warnings.warn("%s does not support %s read preference "
|
||||
"and will be routed to the primary instead." %
|
||||
(command_name, modes[orig]), UserWarning)
|
||||
tags = [{}]
|
||||
latency = None
|
||||
|
||||
extra_opts['read_preference'] = kwargs.pop(
|
||||
'read_preference',
|
||||
self.read_preference)
|
||||
extra_opts['tag_sets'] = kwargs.pop(
|
||||
'tag_sets',
|
||||
self.tag_sets)
|
||||
extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
|
||||
'secondary_acceptable_latency_ms',
|
||||
self.secondary_acceptable_latency_ms)
|
||||
extra_opts['compile_re'] = compile_re
|
||||
|
||||
fields = kwargs.get('fields')
|
||||
fields = kwargs.pop('fields', None)
|
||||
if fields is not None and not isinstance(fields, dict):
|
||||
kwargs['fields'] = helpers._fields_list_to_dict(fields)
|
||||
fields = helpers._fields_list_to_dict(fields)
|
||||
|
||||
command.update(kwargs)
|
||||
|
||||
# Warn if must_use_master will override read_preference.
|
||||
if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and
|
||||
extra_opts['_must_use_master']):
|
||||
warnings.warn("%s does not support %s read preference "
|
||||
"and will be routed to the primary instead." %
|
||||
(command_name,
|
||||
rp.modes[extra_opts['read_preference']]),
|
||||
UserWarning)
|
||||
|
||||
cursor = self["$cmd"].find(command, **extra_opts).limit(-1)
|
||||
cursor = self["$cmd"].find(command,
|
||||
fields=fields,
|
||||
limit=-1,
|
||||
as_class=as_class,
|
||||
read_preference=mode,
|
||||
tag_sets=tags,
|
||||
secondary_acceptable_latency_ms=latency,
|
||||
compile_re=compile_re,
|
||||
_uuid_subtype=uuid_subtype)
|
||||
for doc in cursor:
|
||||
result = doc
|
||||
|
||||
@ -437,7 +435,8 @@ class Database(common.BaseObject):
|
||||
- `include_system_collections` (optional): if ``False`` list
|
||||
will not include system collections (e.g ``system.indexes``)
|
||||
"""
|
||||
results = self["system.namespaces"].find(_must_use_master=True)
|
||||
results = self["system.namespaces"].find(
|
||||
read_preference=ReadPreference.PRIMARY)
|
||||
names = [r["name"] for r in results]
|
||||
names = [n[len(self.__name) + 1:] for n in names
|
||||
if n.startswith(self.__name + ".") and "$" not in n]
|
||||
|
||||
@ -1165,10 +1165,7 @@ class MongoClient(common.BaseObject):
|
||||
sock_info.close()
|
||||
raise
|
||||
|
||||
# we just ignore _must_use_master here: it's only relevant for
|
||||
# MasterSlaveConnection instances.
|
||||
def _send_message_with_response(self, message,
|
||||
_must_use_master=False, **kwargs):
|
||||
def _send_message_with_response(self, message, **kwargs):
|
||||
"""Send a message to Mongo and return the response.
|
||||
|
||||
Sends the given message and returns the response.
|
||||
|
||||
@ -1566,8 +1566,8 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
host, port = member.host
|
||||
raise AutoReconnect("%s:%d: %s" % (host, port, why))
|
||||
|
||||
def _send_message_with_response(self, msg, _connection_to_use=None,
|
||||
_must_use_master=False, **kwargs):
|
||||
def _send_message_with_response(self, msg,
|
||||
_connection_to_use=None, **kwargs):
|
||||
"""Send a message to Mongo and return the response.
|
||||
|
||||
Sends the given message and returns (host used, response).
|
||||
@ -1576,16 +1576,12 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
- `msg`: (request_id, data) pair making up the message to send
|
||||
- `_connection_to_use`: Optional (host, port) of member for message,
|
||||
used by Cursor for getMore and killCursors messages.
|
||||
- `_must_use_master`: If True, send to primary.
|
||||
"""
|
||||
self._ensure_connected()
|
||||
|
||||
rs_state = self.__get_rs_state()
|
||||
tag_sets = kwargs.get('tag_sets', [{}])
|
||||
mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
|
||||
if _must_use_master:
|
||||
mode = ReadPreference.PRIMARY
|
||||
tag_sets = [{}]
|
||||
|
||||
if not rs_state.primary_member:
|
||||
# If we were initialized with _connect=False then connect now.
|
||||
@ -1639,7 +1635,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
pinned_member.host,
|
||||
self.__try_read(pinned_member, msg, **kwargs))
|
||||
except AutoReconnect, why:
|
||||
if _must_use_master or mode == ReadPreference.PRIMARY:
|
||||
if mode == ReadPreference.PRIMARY:
|
||||
self.disconnect()
|
||||
raise
|
||||
else:
|
||||
|
||||
@ -480,7 +480,8 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
c.copy_database("pymongo_test", "pymongo_test1",
|
||||
username="mike", password="password")
|
||||
self.assertTrue("pymongo_test1" in c.database_names())
|
||||
res = c.pymongo_test1.test.find_one(_must_use_master=True)
|
||||
res = c.pymongo_test1.test.find_one(
|
||||
read_preference=ReadPreference.PRIMARY)
|
||||
self.assertEqual("bar", res["foo"])
|
||||
finally:
|
||||
# Cleanup
|
||||
|
||||
Loading…
Reference in New Issue
Block a user