PYTHON-814 - API and behavior changes for Database.command.

Database.command no longer obeys Database.read_preference or
Database.codec_options. Explicit parameters have been added
for both options. See the ticket for a full explanation.
This commit is contained in:
Bernie Hackett 2015-01-09 13:18:36 -08:00
parent 4e13a39db9
commit 755d5c74eb
6 changed files with 116 additions and 380 deletions

View File

@ -16,5 +16,4 @@
:inherited-members:
.. autodata:: ReadPreference
.. autodata:: SECONDARY_OK_COMMANDS

View File

@ -27,8 +27,7 @@ from bson.py3compat import (iteritems,
from bson.son import SON
from pymongo import helpers, message
from pymongo.codec_options import CodecOptions
from pymongo.read_preferences import (ReadPreference,
SECONDARY_OK_COMMANDS)
from pymongo.read_preferences import ReadPreference
from pymongo.errors import (AutoReconnect,
InvalidOperation,
NotMasterError,
@ -289,38 +288,22 @@ class Cursor(object):
# Only set $readPreference if it's something other than
# PRIMARY to avoid problems with mongos versions that
# don't support read preferences.
rpref = self.__read_preference
if (self.__collection.database.connection.is_mongos and
self.__read_preference.mode != ReadPreference.PRIMARY.mode):
rpref != ReadPreference.PRIMARY):
# For maximum backwards compatibility, don't set $readPreference
# for SECONDARY_PREFERRED unless tags are in use. Just rely on
# the slaveOkay bit (set automatically if read preference is not
# PRIMARY), which has the same behavior.
mode = self.__read_preference.mode
tag_sets = self.__read_preference.tag_sets
if (mode != ReadPreference.SECONDARY_PREFERRED.mode or
tag_sets != [{}]):
operators['$readPreference'] = self.__read_preference.document
if (rpref.mode != ReadPreference.SECONDARY_PREFERRED.mode or
rpref.tag_sets != [{}]):
operators['$readPreference'] = rpref.document
if operators:
# Make a shallow copy so we can cleanly rewind or clone.
spec = self.__spec.copy()
# Only commands that can be run on secondaries should have any
# operators added to the spec. Command queries can be issued
# by db.command or calling find_one on $cmd directly
if self.collection.name == "$cmd":
# Don't change commands that can't be sent to secondaries
command_name = spec and next(iter(spec)).lower() or ""
if command_name not in SECONDARY_OK_COMMANDS:
return spec
elif command_name == 'mapreduce':
# mapreduce shouldn't be changed if its not inline
out = spec.get('out')
if (not isinstance(out, Mapping) or not
out.get('inline')):
return spec
# White-listed commands must be wrapped in $query.
if "$query" not in spec:
# $query has to come first

View File

@ -14,7 +14,6 @@
"""Database level operations."""
import collections
import warnings
from bson.code import Code
@ -22,15 +21,14 @@ from bson.dbref import DBRef
from bson.py3compat import iteritems, string_type, _unicode
from bson.son import SON
from pymongo import auth, common, helpers
from pymongo.codec_options import CodecOptions
from pymongo.collection import Collection
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (CollectionInvalid,
ConfigurationError,
InvalidName,
OperationFailure)
from pymongo.read_preferences import (make_read_preference,
ReadPreference,
SECONDARY_OK_COMMANDS)
from pymongo.read_preferences import ReadPreference
from pymongo.son_manipulator import SONManipulator
@ -98,6 +96,7 @@ class Database(common.BaseObject):
_check_name(name)
self.__name = _unicode(name)
self.__cmd_name = _unicode(name + ".$cmd")
self.__connection = connection
self.__incoming_manipulators = []
@ -328,63 +327,25 @@ class Database(common.BaseObject):
son = manipulator.transform_outgoing(son, collection)
return son
def _command(self, command, value=1,
check=True, allowable_errors=None,
read_preference=None, **kwargs):
"""Internal command helper.
"""
def _command(self, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=CodecOptions(), **kwargs):
"""Internal command helper."""
if isinstance(command, string_type):
command_name = command.lower()
command = SON([(command, value)])
else:
command_name = next(iter(command)).lower()
command.update(kwargs)
orig = pref = read_preference or self.read_preference
tags = kwargs.pop('tags_sets', None)
if tags:
warnings.warn("The secondary_acceptable_latency_ms "
"and tag_sets options are deprecated",
DeprecationWarning, stacklevel=3)
mode = orig.mode
tags = tags or orig.tag_sets
orig = make_read_preference(mode, tags)
if command_name not in SECONDARY_OK_COMMANDS:
pref = ReadPreference.PRIMARY
# Special-case: mapreduce can go to secondaries only if inline
elif command_name == 'mapreduce':
out = command.get('out')
if (not isinstance(out, collections.Mapping) or not
out.get('inline')):
pref = ReadPreference.PRIMARY
# Special-case: aggregate with $out cannot go to secondaries.
elif command_name == 'aggregate':
for stage in command.get('pipeline', []):
if '$out' in stage:
pref = ReadPreference.PRIMARY
break
# Warn if mode will override read_preference.
if pref.mode != orig.mode:
warnings.warn("%s does not support %s read preference "
"and will be routed to the primary instead." %
(command_name, orig.name), UserWarning, stacklevel=3)
return helpers._command(self.connection,
self.name + ".$cmd",
return helpers._command(self.__connection,
self.__cmd_name,
command,
pref,
self.codec_options,
read_preference,
codec_options,
check,
allowable_errors)
def command(self, command, value=1,
check=True, allowable_errors=None,
read_preference=None, **kwargs):
def command(self, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=CodecOptions(), **kwargs):
"""Issue a MongoDB command.
Send command `command` to the database and return the
@ -428,19 +389,23 @@ class Database(common.BaseObject):
- `allowable_errors`: if `check` is ``True``, error messages
in this list will be ignored by error-checking
- `read_preference`: The read preference for this operation.
- `tag_sets` **DEPRECATED**
- `secondary_acceptable_latency_ms` **DEPRECATED**
- `codec_options`: A :class:`~pymongo.codec_options.CodecOptions`
instance.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
.. note:: :meth:`command` does **not** obey :attr:`read_preference`
or :attr:`codec_options`. You must use the `read_preference` and
`codec_options` parameters instead.
.. versionchanged:: 3.0
Deprecated the `tag_sets` option.
Removed the `secondary_acceptable_latency_ms` option.
Removed the `as_class`, `fields`, `uuid_subtype`, `tag_sets`,
and `secondary_acceptable_latency_ms` option.
Removed `compile_re` option: PyMongo now always represents BSON
regular expressions as :class:`~bson.regex.Regex` objects. Use
:meth:`~bson.regex.Regex.try_compile` to attempt to convert from a
BSON regular expression to a Python regular expression object.
Removed the as_class, uuid_subtype, and fields options.
Added the `codec_options` parameter.
.. versionchanged:: 2.7
Added `compile_re` option. If set to False, PyMongo represented BSON
@ -460,7 +425,7 @@ class Database(common.BaseObject):
.. mongodoc:: commands
"""
return self._command(command, value, check, allowable_errors,
read_preference, **kwargs)[0]
read_preference, codec_options, **kwargs)[0]
def collection_names(self, include_system_collections=True):
"""Get a list of all the collection names in this database.
@ -472,9 +437,7 @@ class Database(common.BaseObject):
client = self.__connection
if client._writable_max_wire_version() > 2:
res, addr = self._command("listCollections",
cursor={},
read_preference=ReadPreference.PRIMARY)
res, addr = self._command("listCollections", cursor={})
# MongoDB 2.8rc2
if "collections" in res:
results = res["collections"]
@ -511,8 +474,7 @@ class Database(common.BaseObject):
self.__connection._purge_index(self.__name, name)
self.command("drop", _unicode(name), allowable_errors=["ns not found"],
read_preference=ReadPreference.PRIMARY)
self.command("drop", _unicode(name), allowable_errors=["ns not found"])
def validate_collection(self, name_or_collection,
scandata=False, full=False):
@ -545,8 +507,7 @@ class Database(common.BaseObject):
"%s or Collection" % (string_type.__name__,))
result = self.command("validate", _unicode(name),
scandata=scandata, full=full,
read_preference=ReadPreference.PRIMARY)
scandata=scandata, full=full)
valid = True
# Pre 1.9 results
@ -595,8 +556,7 @@ class Database(common.BaseObject):
.. mongodoc:: profiling
"""
result = self.command("profile", -1,
read_preference=ReadPreference.PRIMARY)
result = self.command("profile", -1)
assert result["was"] >= 0 and result["was"] <= 2
return result["was"]
@ -636,11 +596,9 @@ class Database(common.BaseObject):
raise TypeError("slow_ms must be an integer")
if slow_ms is not None:
self.command("profile", level, slowms=slow_ms,
read_preference=ReadPreference.PRIMARY)
self.command("profile", level, slowms=slow_ms)
else:
self.command("profile", level,
read_preference=ReadPreference.PRIMARY)
self.command("profile", level)
def profiling_info(self):
"""Returns a list containing current profiling information.
@ -662,8 +620,7 @@ class Database(common.BaseObject):
warnings.warn("Database.error() is deprecated",
DeprecationWarning, stacklevel=2)
error = self.command("getlasterror",
read_preference=ReadPreference.PRIMARY)
error = self.command("getlasterror")
error_msg = error.get("err", "")
if error_msg is None:
return None
@ -690,8 +647,7 @@ class Database(common.BaseObject):
warnings.warn("last_status() is deprecated",
DeprecationWarning, stacklevel=2)
return self.command("getlasterror",
read_preference=ReadPreference.PRIMARY)
return self.command("getlasterror")
def previous_error(self):
"""**DEPRECATED**: Get the most recent error on this database.
@ -710,8 +666,7 @@ class Database(common.BaseObject):
warnings.warn("previous_error() is deprecated",
DeprecationWarning, stacklevel=2)
error = self.command("getpreverror",
read_preference=ReadPreference.PRIMARY)
error = self.command("getpreverror")
if error.get("err", 0) is None:
return None
return error
@ -732,8 +687,7 @@ class Database(common.BaseObject):
warnings.warn("reset_error_history() is deprecated",
DeprecationWarning, stacklevel=2)
self.command("reseterror",
read_preference=ReadPreference.PRIMARY)
self.command("reseterror")
def __iter__(self):
return self
@ -789,8 +743,7 @@ class Database(common.BaseObject):
else:
command_name = "updateUser"
self.command(command_name, name,
read_preference=ReadPreference.PRIMARY, **opts)
self.command(command_name, name, **opts)
def _legacy_add_user(self, name, password, read_only, **kwargs):
"""Uses v1 system to add users, i.e. saving to system.users.
@ -859,8 +812,7 @@ class Database(common.BaseObject):
"read_only and roles together")
try:
uinfo = self.command("usersInfo", name,
read_preference=ReadPreference.PRIMARY)
uinfo = self.command("usersInfo", name)
# Create the user if not found in uinfo, otherwise update one.
self._create_or_update_user(
(not uinfo["users"]), name, password, read_only, **kwargs)
@ -890,7 +842,6 @@ class Database(common.BaseObject):
try:
self.command("dropUser", name,
read_preference=ReadPreference.PRIMARY,
writeConcern=self._get_wc_override())
except OperationFailure as exc:
# See comment in add_user try / except above.
@ -1033,9 +984,7 @@ class Database(common.BaseObject):
if not isinstance(code, Code):
code = Code(code)
result = self.command("$eval", code,
read_preference=ReadPreference.PRIMARY,
args=args)
result = self.command("$eval", code, args=args)
return result.get("retval", None)
def __call__(self, *args, **kwargs):

View File

@ -344,16 +344,6 @@ def read_pref_mode_from_name(name):
return _MONGOS_MODES.index(name)
SECONDARY_OK_COMMANDS = frozenset([
"group", "aggregate", "collstats", "dbstats", "count", "distinct",
"geonear", "geosearch", "geowalk", "mapreduce", "getnonce", "authenticate",
"text", "parallelcollectionscan"
])
"""Commands that may be sent to replica-set secondaries, depending on
ReadPreference and tags. All other commands are always run on the primary.
"""
class MovingAverage(object):
"""Tracks an exponentially-weighted moving average."""
def __init__(self):

View File

@ -897,18 +897,6 @@ class TestDatabase(IntegrationTest):
self.assertEqual('outer', str(context.exception))
def test_command_read_pref_warning(self):
with warnings.catch_warnings():
warnings.simplefilter("error", UserWarning)
self.assertRaises(UserWarning, self.client.pymongo_test.command,
'ping', read_preference=ReadPreference.SECONDARY)
try:
self.client.pymongo_test.command(
'dbStats',
read_preference=ReadPreference.SECONDARY_PREFERRED)
except UserWarning:
self.fail("Shouldn't have raised UserWarning.")
@client_context.require_version_min(2, 5, 3, -1)
@client_context.require_test_commands
def test_command_max_time_ms(self):

View File

@ -15,19 +15,16 @@
"""Test the replica_set_connection module."""
import random
import sys
import warnings
sys.path[0:0] = [""]
from bson.py3compat import MAXSIZE
from bson.son import SON
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import (ReadPreference, MovingAverage,
Primary, PrimaryPreferred,
Secondary, SecondaryPreferred,
Nearest, ServerMode,
SECONDARY_OK_COMMANDS)
Nearest, ServerMode)
from pymongo.server_selectors import any_server_selector
from pymongo.server_type import SERVER_TYPE
from pymongo.errors import ConfigurationError
@ -36,7 +33,6 @@ from test.test_replica_set_client import TestReplicaSetClientBase
from test import (client_context,
host,
port,
SkipTest,
unittest,
utils,
IntegrationTest,
@ -213,6 +209,15 @@ class ReadPrefTester(MongoClient):
server, fn, *args, **kwargs)
_PREF_MAP = [
(Primary, SERVER_TYPE.RSPrimary),
(PrimaryPreferred, SERVER_TYPE.RSPrimary),
(Secondary, SERVER_TYPE.RSSecondary),
(SecondaryPreferred, SERVER_TYPE.RSSecondary),
(Nearest, 'any')
]
class TestCommandAndReadPreference(TestReplicaSetClientBase):
def setUp(self):
@ -246,163 +251,47 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
self.assertEqual(SERVER_TYPE._fields[server_type],
SERVER_TYPE._fields[server.description.server_type])
def _test_fn(self, obedient, fn):
if not obedient:
for mode in ReadPreference:
self.c.read_preference = mode
def _test_fn(self, server_type, fn):
for _ in range(10):
if server_type == 'any':
used = set()
for _ in range(1000):
server = self.executed_on_which_server(self.c, fn)
used.add(server.description.address)
if len(used) == len(self.c.secondaries) + 1:
# Success
break
# Run it a few times to make sure we don't just get lucky the
# first time.
for _ in range(10):
self.assertExecutedOn(SERVER_TYPE.RSPrimary, self.c, fn)
else:
for mode, server_type in [
(Primary, SERVER_TYPE.RSPrimary),
(PrimaryPreferred, SERVER_TYPE.RSPrimary),
(Secondary, SERVER_TYPE.RSSecondary),
(SecondaryPreferred, SERVER_TYPE.RSSecondary),
(Nearest, 'any'),
]:
self.c.read_preference = mode()
for i in range(10):
if server_type == 'any':
used = set()
for j in range(1000):
server = self.executed_on_which_server(self.c, fn)
used.add(server.description.address)
if len(used) == len(self.c.secondaries) + 1:
# Success
break
unused = self.c.secondaries.union(
set([self.c.primary])
).difference(used)
if unused:
self.fail(
"Some members not used for NEAREST: %s" % (
unused))
else:
self.assertExecutedOn(server_type, self.c, fn)
unused = self.c.secondaries.union(
set([self.c.primary])
).difference(used)
if unused:
self.fail(
"Some members not used for NEAREST: %s" % (
unused))
else:
self.assertExecutedOn(server_type, self.c, fn)
def _test_primary_helper(self, func):
# Helpers that ignore read preference.
self._test_fn(SERVER_TYPE.RSPrimary, func)
def _test_coll_helper(self, secondary_ok, coll, meth, *args, **kwargs):
for mode, server_type in _PREF_MAP:
new_coll = coll.with_options(read_preference=mode())
func = lambda: getattr(new_coll, meth)(*args, **kwargs)
if secondary_ok:
self._test_fn(server_type, func)
else:
self._test_fn(SERVER_TYPE.RSPrimary, func)
def test_command(self):
# Test generic 'command' method. Some commands obey read preference,
# most don't.
# Disobedient commands, always go to primary
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
self._test_fn(False, lambda: self.c.pymongo_test.command('ping'))
self._test_fn(False, lambda: self.c.admin.command('buildinfo'))
# Obedient commands.
self._test_fn(True, lambda: self.c.pymongo_test.command('group', {
'ns': 'test', 'key': {'a': 1}, '$reduce': 'function(obj, prev) { }',
'initial': {}}))
self._test_fn(True, lambda: self.c.pymongo_test.command('dbStats'))
# collStats fails if no collection
self.c.pymongo_test.test.insert({}, w=self.w)
self._test_fn(True, lambda: self.c.pymongo_test.command(
'collStats', 'test'))
# Count
self._test_fn(True, lambda: self.c.pymongo_test.command(
'count', 'test'))
self._test_fn(True, lambda: self.c.pymongo_test.command(
'count', 'test', query={'a': 1}))
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('count', 'test'), ('query', {'a': 1})])))
# Distinct
self._test_fn(True, lambda: self.c.pymongo_test.command(
'distinct', 'test', key='a'))
self._test_fn(True, lambda: self.c.pymongo_test.command(
'distinct', 'test', key='a', query={'a': 1}))
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('distinct', 'test'), ('key', 'a'), ('query', {'a': 1})])))
# Geo stuff.
self.c.pymongo_test.test.create_index([('location', '2d')])
self.c.pymongo_test.test.create_index([('location', 'geoHaystack'),
('key', 1)], bucketSize=100)
# Attempt to await replication of indexes.
self.c.pymongo_test.test2.insert({}, w=self.w)
self.c.pymongo_test.test2.remove({}, w=self.w)
self._test_fn(True, lambda: self.c.pymongo_test.command(
'geoNear', 'test', near=[0, 0]))
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('geoNear', 'test'), ('near', [0, 0])])))
self._test_fn(True, lambda: self.c.pymongo_test.command(
'geoSearch', 'test', near=[33, 33], maxDistance=6,
search={'type': 'restaurant' }, limit=30))
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('geoSearch', 'test'), ('near', [33, 33]), ('maxDistance', 6),
('search', {'type': 'restaurant'}), ('limit', 30)])))
if self.client_version.at_least(2, 1, 0):
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('aggregate', 'test'),
('pipeline', [])
])))
# 'text' command introduced in 2.3 and removed in 2.8.
if (self.client_version.at_least(2, 3, 2)
and not self.client_version.at_least(2, 7, 9)):
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
utils.enable_text_search(self.c)
db = self.c.pymongo_test
# Only way to create an index and wait for all members to build it.
db.test.create_index([('t', 'text')])
db.test.insert({}, w=self.w)
db.test.remove({}, w=self.w)
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('text', 'test'),
('search', 'foo')])))
self.c.pymongo_test.test.drop_indexes()
def test_map_reduce_command(self):
# mapreduce fails if no collection
self.c.pymongo_test.test.insert({}, w=self.w)
# Non-inline mapreduce always goes to primary, doesn't obey read prefs.
# Test with command in a SON and with kwargs
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
self._test_fn(False, lambda: self.c.pymongo_test.command(SON([
('mapreduce', 'test'),
('map', 'function() { }'),
('reduce', 'function() { }'),
('out', 'mr_out')
])))
self._test_fn(False, lambda: self.c.pymongo_test.command(
'mapreduce', 'test', map='function() { }',
reduce='function() { }', out='mr_out'))
self._test_fn(False, lambda: self.c.pymongo_test.command(
'mapreduce', 'test', map='function() { }',
reduce='function() { }', out={'replace': 'some_collection'}))
# Inline mapreduce obeys read prefs
self._test_fn(True, lambda: self.c.pymongo_test.command(
'mapreduce', 'test', map='function() { }',
reduce='function() { }', out={'inline': True}))
self._test_fn(True, lambda: self.c.pymongo_test.command(SON([
('mapreduce', 'test'),
('map', 'function() { }'),
('reduce', 'function() { }'),
('out', {'inline': True})
])))
# Test that the generic command helper obeys the read preference
# passed to it.
for mode, server_type in _PREF_MAP:
func = lambda: self.c.pymongo_test.command('dbStats',
read_preference=mode())
self._test_fn(server_type, func)
@client_context.require_version_min(2, 5, 2)
def test_aggregate_command_with_out(self):
@ -412,85 +301,62 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
self.c.pymongo_test.test.insert({"x": 2, "y": 1}, w=self.w)
self.c.pymongo_test.test.insert({"x": 2, "y": 2}, w=self.w)
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
# Aggregate with $out always goes to primary, doesn't obey
# read prefs.
# Test aggregate command sent directly to db.command.
self._test_fn(False, lambda: self.c.pymongo_test.command(
"aggregate", "test",
pipeline=[{"$match": {"x": 1}}, {"$out": "agg_out"}]
))
# Test aggregate when sent through the collection aggregate
# function.
self._test_fn(False, lambda: self.c.pymongo_test.test.aggregate(
[{"$match": {"x": 2}}, {"$out": "agg_out"}]
))
# Test aggregate when sent through the collection aggregate
# function. Aggregate with $out always goes to primary, doesn't obey
# read prefs.
self._test_coll_helper(False, self.c.pymongo_test.test, 'aggregate',
[{"$match": {"x": 2}}, {"$out": "agg_out"}])
self.c.pymongo_test.drop_collection("test")
self.c.pymongo_test.drop_collection("agg_out")
def test_create_collection(self):
# Collections should be created on primary, obviously
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
self._test_fn(False, lambda: self.c.pymongo_test.command(
'create', 'some_collection%s' % random.randint(0, MAXSIZE)))
self._test_fn(False, lambda: self.c.pymongo_test.create_collection(
self._test_primary_helper(
lambda: self.c.pymongo_test.create_collection(
'some_collection%s' % random.randint(0, MAXSIZE)))
def test_drop_collection(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
self._test_fn(False, lambda: self.c.pymongo_test.drop_collection(
'some_collection'))
self._test_primary_helper(
lambda: self.c.pymongo_test.drop_collection('some_collection'))
self._test_fn(False,
lambda: self.c.pymongo_test.some_collection.drop())
self._test_primary_helper(
lambda: self.c.pymongo_test.some_collection.drop())
def test_group(self):
self._test_fn(True, lambda: self.c.pymongo_test.test.group(
{'a': 1}, {}, {}, 'function() { }'))
self._test_coll_helper(True, self.c.pymongo_test.test, 'group',
{'a': 1}, {}, {}, 'function() { }')
def test_map_reduce(self):
# mapreduce fails if no collection
self.c.pymongo_test.test.insert({}, w=self.w)
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
self._test_fn(False, lambda: self.c.pymongo_test.test.map_reduce(
'function() { }', 'function() { }', 'mr_out'))
self._test_coll_helper(False, self.c.pymongo_test.test, 'map_reduce',
'function() { }', 'function() { }', 'mr_out')
self._test_fn(True, lambda: self.c.pymongo_test.test.map_reduce(
'function() { }', 'function() { }', {'inline': 1}))
self._test_coll_helper(True, self.c.pymongo_test.test, 'map_reduce',
'function() { }', 'function() { }',
{'inline': 1})
def test_inline_map_reduce(self):
# mapreduce fails if no collection
self.c.pymongo_test.test.insert({}, w=self.w)
self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
'function() { }', 'function() { }'))
self._test_fn(True, lambda: self.c.pymongo_test.test.inline_map_reduce(
'function() { }', 'function() { }', full_response=True))
self._test_coll_helper(True, self.c.pymongo_test.test,
'inline_map_reduce',
'function() { }', 'function() { }')
def test_count(self):
self._test_fn(True, lambda: self.c.pymongo_test.test.count())
self._test_fn(True, lambda: self.c.pymongo_test.test.find().count())
self._test_coll_helper(True, self.c.pymongo_test.test, 'count')
def test_distinct(self):
self._test_fn(True, lambda: self.c.pymongo_test.test.distinct('a'))
self._test_fn(True,
lambda: self.c.pymongo_test.test.find().distinct('a'))
self._test_coll_helper(True, self.c.pymongo_test.test, 'distinct', 'a')
def test_aggregate(self):
if self.client_version.at_least(2, 1, 0):
self._test_fn(True,
lambda: self.c.pymongo_test.test.aggregate(
[{'$project': {'_id': 1}}]))
self._test_coll_helper(True, self.c.pymongo_test.test,
'aggregate',
[{'$project': {'_id': 1}}])
class TestMovingAverage(unittest.TestCase):
@ -592,45 +458,6 @@ class TestMongosConnection(IntegrationTest):
self.assertFalse(
'$readPreference' in cursor._Cursor__query_spec())
def test_only_secondary_ok_commands_have_read_prefs(self):
c = single_client(host, port, read_preference=ReadPreference.SECONDARY)
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
is_mongos = utils.is_mongos(c)
if not is_mongos:
raise SkipTest("Only mongos have read_prefs added to the spec")
# Ensure secondary_ok_commands have readPreference
for cmd in SECONDARY_OK_COMMANDS:
if cmd == 'mapreduce': # map reduce is a special case
continue
command = SON([(cmd, 1)])
cursor = c.pymongo_test["$cmd"].find(command.copy())
# White-listed commands also have to be wrapped in $query
command = SON([('$query', command)])
command['$readPreference'] = {'mode': 'secondary'}
self.assertEqual(command, cursor._Cursor__query_spec())
# map_reduce inline should have read prefs
command = SON([('mapreduce', 'test'), ('out', {'inline': 1})])
cursor = c.pymongo_test["$cmd"].find(command.copy())
# White-listed commands also have to be wrapped in $query
command = SON([('$query', command)])
command['$readPreference'] = {'mode': 'secondary'}
self.assertEqual(command, cursor._Cursor__query_spec())
# map_reduce that outputs to a collection shouldn't have read prefs
command = SON([('mapreduce', 'test'), ('out', {'mrtest': 1})])
cursor = c.pymongo_test["$cmd"].find(command.copy())
self.assertEqual(command, cursor._Cursor__query_spec())
# Other commands shouldn't be changed
for cmd in ('drop', 'create', 'any-future-cmd'):
command = SON([(cmd, 1)])
cursor = c.pymongo_test["$cmd"].find(command.copy())
self.assertEqual(command, cursor._Cursor__query_spec())
if __name__ == "__main__":
unittest.main()