diff --git a/doc/api/pymongo/command_cursor.rst b/doc/api/pymongo/command_cursor.rst new file mode 100644 index 000000000..9c84f19a6 --- /dev/null +++ b/doc/api/pymongo/command_cursor.rst @@ -0,0 +1,7 @@ +:mod:`command_cursor` -- Tools for iterating over MongoDB command results +========================================================================= + +.. automodule:: pymongo.command_cursor + :synopsis: Tools for iterating over MongoDB command results + :members: + diff --git a/doc/api/pymongo/index.rst b/doc/api/pymongo/index.rst index f1240c86c..d187932a3 100644 --- a/doc/api/pymongo/index.rst +++ b/doc/api/pymongo/index.rst @@ -31,6 +31,7 @@ Sub-modules: connection database collection + command_cursor cursor bulk errors diff --git a/pymongo/collection.py b/pymongo/collection.py index 0ebb82057..cb77f2ef7 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -23,6 +23,7 @@ from pymongo import (bulk, common, helpers, message) +from pymongo.command_cursor import CommandCursor from pymongo.cursor import Cursor from pymongo.errors import InvalidName, OperationFailure from pymongo.helpers import _check_write_command_response @@ -1196,13 +1197,17 @@ class Collection(common.BaseObject): With server version **>= 2.5.1**, pass ``cursor={}`` to retrieve unlimited aggregation results - with a :class:`~pymongo.cursor.Cursor`:: + with a :class:`~pymongo.command_cursor.CommandCursor`:: pipeline = [{'$project': {'name': {'$toUpper': '$name'}}}] cursor = collection.aggregate(pipeline, cursor={}) for doc in cursor: print doc + .. versionchanged:: 2.7 + When the cursor option is used, return + :class:`~pymongo.command_cursor.CommandCursor` instead of + :class:`~pymongo.cursor.Cursor`. .. versionchanged:: 2.6 Added cursor support. .. versionadded:: 2.3 @@ -1228,17 +1233,19 @@ class Collection(common.BaseObject): '_use_master': use_master} command_kwargs.update(kwargs) - command_response = self.__database.command( + result, conn_id = self.__database._command( "aggregate", self.__name, **command_kwargs) - if 'cursor' in command_response: - cursor_info = command_response['cursor'] - return Cursor( + if 'cursor' in result: + cursor_info = result['cursor'] + return CommandCursor( self, - _first_batch=cursor_info['firstBatch'], - _cursor_id=cursor_info['id']) + cursor_info['id'], + conn_id, + cursor_info['firstBatch'], + command_kwargs.get('compile_re', True)) else: - return command_response + return result # TODO key and condition ought to be optional, but deprecation # could be painful as argument order would have to change. diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py new file mode 100644 index 000000000..8891bd4c2 --- /dev/null +++ b/pymongo/command_cursor.py @@ -0,0 +1,162 @@ +# Copyright 2014 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CommandCursor class to iterate over command results.""" + +from collections import deque + +from pymongo import helpers, message +from pymongo.errors import AutoReconnect + + +class CommandCursor(object): + """A cursor / iterator over command cursors. + """ + + def __init__(self, collection, cursor_id, + conn_id, initial=None, compile_re=True): + """Create a new command cursor. + """ + self.__collection = collection + self.__id = cursor_id + self.__conn_id = conn_id + self.__data = deque(initial or []) + self.__decode_opts = ( + collection.database.connection.document_class, + collection.database.connection.tz_aware, + collection.uuid_subtype, + compile_re + ) + self.__retrieved = 0 + self.__batch_size = 0 + self.__killed = False + + def __del__(self): + if self.__id and not self.__killed: + self.__die() + + def __die(self): + """Closes this cursor. + """ + if self.__id and not self.__killed: + client = self.__collection.database.connection + if self.__conn_id is not None: + client.close_cursor(self.__id, self.__conn_id) + else: + client.close_cursor(self.__id) + self.__killed = True + + def close(self): + """Explicitly close / kill this cursor. Required for PyPy, Jython and + other Python implementations that don't use reference counting + garbage collection. + """ + self.__die() + + def batch_size(self, batch_size): + """Limits the number of documents returned in one batch. Each batch + requires a round trip to the server. It can be adjusted to optimize + performance and limit data transfer. + + .. note:: batch_size can not override MongoDB's internal limits on the + amount of data it will return to the client in a single batch (i.e + if you set batch size to 1,000,000,000, MongoDB will currently only + return 4-16MB of results per batch). + + Raises :exc:`TypeError` if `batch_size` is not an integer. + Raises :exc:`ValueError` if `batch_size` is less than ``0``. + + :Parameters: + - `batch_size`: The size of each batch of results requested. + """ + if not isinstance(batch_size, (int, long)): + raise TypeError("batch_size must be an integer") + if batch_size < 0: + raise ValueError("batch_size must be >= 0") + + self.__batch_size = batch_size == 1 and 2 or batch_size + return self + + def __send_message(self, msg): + """Send a getmore message and handle the response. + """ + client = self.__collection.database.connection + try: + res = client._send_message_with_response( + msg, _connection_to_use=self.__conn_id) + self.__conn_id, (response, dummy0, dummy1) = res + except AutoReconnect: + # Don't try to send kill cursors on another socket + # or to another server. It can cause a _pinValue + # assertion on some server releases if we get here + # due to a socket timeout. + self.__killed = True + raise + + try: + response = helpers._unpack_response(response, + self.__id, + *self.__decode_opts) + except AutoReconnect: + # Don't send kill cursors to another server after a "not master" + # error. It's completely pointless. + self.__killed = True + client.disconnect() + raise + self.__id = response["cursor_id"] + + assert response["starting_from"] == self.__retrieved, ( + "Result batch started from %s, expected %s" % ( + response['starting_from'], self.__retrieved)) + + self.__retrieved += response["number_returned"] + self.__data = deque(response["data"]) + + def __refresh(self): + """Refreshes the cursor with more data from the server. + + Returns the length of self.__data after refresh. Will exit early if + self.__data is already non-empty. Raises OperationFailure when the + cursor cannot be refreshed due to an error on the query. + """ + if len(self.__data) or self.__killed: + return len(self.__data) + + if self.__id: # Get More + self.__send_message( + message.get_more(self.__collection.full_name, + self.__batch_size, self.__id)) + + else: # Cursor id is zero nothing else to return + self.__killed = True + + return len(self.__data) + + def __iter__(self): + return self + + def next(self): + """Advance the cursor. + """ + if len(self.__data) or self.__refresh(): + coll = self.__collection + return coll.database._fix_incoming(self.__data.popleft(), coll) + else: + raise StopIteration + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__die() diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 556295c3f..872118412 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -189,6 +189,17 @@ class Cursor(object): """ return self.__collection + @property + def _connection_id(self): + """The server/client/pool this cursor lives on. + + Could be (host, port), -1, or None depending on what + client class executed the initial query or this cursor + being advanced at all. + """ + # TODO: Make this public after sorting out client issues. + return self.__connection_id + def __del__(self): if self.__id and not self.__killed: self.__die() diff --git a/pymongo/database.py b/pymongo/database.py index d35d2a5e9..fd9428390 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -271,6 +271,77 @@ class Database(common.BaseObject): son = manipulator.transform_outgoing(son, collection) return son + def _command(self, command, value=1, + check=True, allowable_errors=None, + uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs): + """Internal command helper. + """ + + if isinstance(command, basestring): + command = SON([(command, value)]) + + command_name = command.keys()[0].lower() + must_use_master = kwargs.pop('_use_master', False) + if command_name not in rp.secondary_ok_commands: + must_use_master = True + + # Special-case: mapreduce can go to secondaries only if inline + if command_name == 'mapreduce': + out = command.get('out') or kwargs.get('out') + if not isinstance(out, dict) or not out.get('inline'): + must_use_master = True + + # Special-case: aggregate with $out cannot go to secondaries. + if command_name == 'aggregate': + for stage in kwargs.get('pipeline', []): + if '$out' in stage: + must_use_master = True + break + + extra_opts = { + 'as_class': kwargs.pop('as_class', None), + 'slave_okay': kwargs.pop('slave_okay', self.slave_okay), + '_must_use_master': must_use_master, + '_uuid_subtype': uuid_subtype + } + + extra_opts['read_preference'] = kwargs.pop( + 'read_preference', + self.read_preference) + extra_opts['tag_sets'] = kwargs.pop( + 'tag_sets', + self.tag_sets) + extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop( + 'secondary_acceptable_latency_ms', + self.secondary_acceptable_latency_ms) + extra_opts['compile_re'] = compile_re + + fields = kwargs.get('fields') + if fields is not None and not isinstance(fields, dict): + kwargs['fields'] = helpers._fields_list_to_dict(fields) + + command.update(kwargs) + + # Warn if must_use_master will override read_preference. + if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and + extra_opts['_must_use_master']): + warnings.warn("%s does not support %s read preference " + "and will be routed to the primary instead." % + (command_name, + rp.modes[extra_opts['read_preference']]), + UserWarning) + + cursor = self["$cmd"].find(command, **extra_opts).limit(-1) + for doc in cursor: + result = doc + + if check: + msg = "command %s failed: %%s" % repr(command).replace("%", "%%") + helpers._check_command_response(result, self.connection.disconnect, + msg, allowable_errors) + + return result, cursor._connection_id + def command(self, command, value=1, check=True, allowable_errors=[], uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs): @@ -360,69 +431,8 @@ class Database(common.BaseObject): .. mongodoc:: commands .. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold """ - - if isinstance(command, basestring): - command = SON([(command, value)]) - - command_name = command.keys()[0].lower() - must_use_master = kwargs.pop('_use_master', False) - if command_name not in rp.secondary_ok_commands: - must_use_master = True - - # Special-case: mapreduce can go to secondaries only if inline - if command_name == 'mapreduce': - out = command.get('out') or kwargs.get('out') - if not isinstance(out, dict) or not out.get('inline'): - must_use_master = True - - # Special-case: aggregate with $out cannot go to secondaries. - if command_name == 'aggregate': - for stage in kwargs.get('pipeline', []): - if '$out' in stage: - must_use_master = True - break - - extra_opts = { - 'as_class': kwargs.pop('as_class', None), - 'slave_okay': kwargs.pop('slave_okay', self.slave_okay), - '_must_use_master': must_use_master, - '_uuid_subtype': uuid_subtype - } - - extra_opts['read_preference'] = kwargs.pop( - 'read_preference', - self.read_preference) - extra_opts['tag_sets'] = kwargs.pop( - 'tag_sets', - self.tag_sets) - extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop( - 'secondary_acceptable_latency_ms', - self.secondary_acceptable_latency_ms) - extra_opts['compile_re'] = compile_re - - fields = kwargs.get('fields') - if fields is not None and not isinstance(fields, dict): - kwargs['fields'] = helpers._fields_list_to_dict(fields) - - command.update(kwargs) - - # Warn if must_use_master will override read_preference. - if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and - extra_opts['_must_use_master']): - warnings.warn("%s does not support %s read preference " - "and will be routed to the primary instead." % - (command_name, - rp.modes[extra_opts['read_preference']]), - UserWarning) - - result = self["$cmd"].find_one(command, **extra_opts) - - if check: - msg = "command %s failed: %%s" % repr(command).replace("%", "%%") - helpers._check_command_response(result, self.connection.disconnect, - msg, allowable_errors) - - return result + return self._command(command, value, check, allowable_errors, + uuid_subtype, compile_re, **kwargs)[0] def collection_names(self, include_system_collections=True): """Get a list of all the collection names in this database. diff --git a/test/test_collection.py b/test/test_collection.py index 9d82d2e53..52742e0fb 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -41,7 +41,9 @@ from pymongo import (ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED) from pymongo import message as message_module from pymongo.collection import Collection -from pymongo.cursor import Cursor +from pymongo.command_cursor import CommandCursor +from pymongo.mongo_replica_set_client import MongoReplicaSetClient +from pymongo.read_preferences import ReadPreference from pymongo.son_manipulator import SONManipulator from pymongo.errors import (DocumentTooLarge, DuplicateKeyError, @@ -1349,25 +1351,29 @@ class TestCollection(unittest.TestCase): db = self.db projection = {'$project': {'_id': '$_id'}} cursor = db.test.aggregate(projection, cursor={}) - self.assertTrue(isinstance(cursor, Cursor)) - self.assertRaises(InvalidOperation, cursor.rewind) - self.assertRaises(InvalidOperation, cursor.clone) - self.assertRaises(InvalidOperation, cursor.count) - self.assertRaises(InvalidOperation, cursor.explain) + self.assertTrue(isinstance(cursor, CommandCursor)) def test_aggregation_cursor(self): if not version.at_least(self.db.connection, (2, 5, 1)): raise SkipTest("Aggregation cursor requires MongoDB >= 2.5.1") db = self.db + ismaster = db.command('ismaster') + setname = ismaster.get('setName') + w = len(ismaster.get('hosts', [])) or 1 + if setname: + db = MongoReplicaSetClient(host=self.client.host, + port=self.client.port, + replicaSet=setname)[db.name] + db.read_preference = ReadPreference.SECONDARY - # A small collection which returns only an initial batch, - # and a larger one that requires a getMore. for collection_size in (10, 1000): db.drop_collection("test") - db.test.insert([{'_id': i} for i in range(collection_size)]) + db.test.insert([{'_id': i} for i in range(collection_size)], w=w) expected_sum = sum(range(collection_size)) + # Use batchSize to ensure multiple getMore messages cursor = db.test.aggregate( - {'$project': {'_id': '$_id'}}, cursor={}) + {'$project': {'_id': '$_id'}}, + cursor={'batchSize': 5}) self.assertEqual( expected_sum,