diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index 8083a229a..a37630c19 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -37,6 +37,7 @@ .. automethod:: drop .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]]]) .. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]]) + .. automethod:: parallel_collection_scan .. automethod:: count .. automethod:: create_index .. automethod:: ensure_index diff --git a/pymongo/collection.py b/pymongo/collection.py index cb77f2ef7..df16b5080 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -859,6 +859,49 @@ class Collection(common.BaseObject): self.secondary_acceptable_latency_ms) return Cursor(self, *args, **kwargs) + def parallel_collection_scan(self, num_cursors, **kwargs): + """Scan this collection in parallel. + + Returns a list of :class:`~pymongo.command_cursor.CommandCursor` + instances that can be iterated concurrently by one or more threads + or greenlets. + + With :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient` + or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`, + if the `read_preference` attribute of this instance is not set to + :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or the + (deprecated) `slave_okay` attribute of this instance is set to `True` + the command will be sent to a secondary or slave. + + :Parameters: + - `num_cursors`: the number of cursors to return + - `**kwargs`: extra options for the parallelCollectionScan command; + `compile_re` (default `True`) is also passed to each cursor + + .. note:: Requires server version **>= 2.5.5**. 
+ + """ + use_master = not self.slave_okay and not self.read_preference + compile_re = kwargs.get('compile_re', True) + + command_kwargs = { + 'numCursors': num_cursors, + 'read_preference': self.read_preference, + 'tag_sets': self.tag_sets, + 'secondary_acceptable_latency_ms': ( + self.secondary_acceptable_latency_ms), + 'slave_okay': self.slave_okay, + '_use_master': use_master} + command_kwargs.update(kwargs) + + result, conn_id = self.__database._command( + "parallelCollectionScan", self.__name, **command_kwargs) + + return [CommandCursor(self, + cursor['cursor'], + conn_id, + compile_re) for cursor in result['cursors']] + def count(self): """Get the number of documents in this collection. @@ -1237,12 +1280,10 @@ class Collection(common.BaseObject): "aggregate", self.__name, **command_kwargs) if 'cursor' in result: - cursor_info = result['cursor'] return CommandCursor( self, - cursor_info['id'], + result['cursor'], conn_id, - cursor_info['firstBatch'], command_kwargs.get('compile_re', True)) else: return result diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 8891bd4c2..04b9ffc10 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -24,21 +24,20 @@ class CommandCursor(object): """A cursor / iterator over command cursors. """ - def __init__(self, collection, cursor_id, - conn_id, initial=None, compile_re=True): + def __init__(self, collection, cursor_info, conn_id, compile_re=True): """Create a new command cursor. 
""" self.__collection = collection - self.__id = cursor_id + self.__id = cursor_info['id'] self.__conn_id = conn_id - self.__data = deque(initial or []) + self.__data = deque(cursor_info['firstBatch']) self.__decode_opts = ( collection.database.connection.document_class, collection.database.connection.tz_aware, collection.uuid_subtype, compile_re ) - self.__retrieved = 0 + self.__retrieved = cursor_info.get('_retrieved', 0) self.__batch_size = 0 self.__killed = False diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 872118412..5551d09c9 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -70,8 +70,7 @@ class Cursor(object): read_preference=ReadPreference.PRIMARY, tag_sets=[{}], secondary_acceptable_latency_ms=None, exhaust=False, compile_re=True, _must_use_master=False, - _uuid_subtype=None, _first_batch=None, _cursor_id=None, - _retrieved=0, **kwargs): + _uuid_subtype=None, **kwargs): """Create a new cursor. Should not be called directly by application developers - see @@ -79,8 +78,7 @@ class Cursor(object): .. mongodoc:: cursors """ - self.__id = _cursor_id - self.__is_command_cursor = _cursor_id is not None + self.__id = None if spec is None: spec = {} @@ -159,9 +157,9 @@ class Cursor(object): self.__must_use_master = _must_use_master self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype - self.__data = deque(_first_batch or []) + self.__data = deque() self.__connection_id = None - self.__retrieved = _retrieved + self.__retrieved = 0 self.__killed = False self.__query_flags = 0 @@ -213,7 +211,6 @@ class Cursor(object): be sent to the server, even if the resultant data has already been retrieved by this cursor. 
""" - self.__check_not_command_cursor('rewind') self.__data = deque() self.__id = None self.__connection_id = None @@ -233,7 +230,6 @@ class Cursor(object): return self._clone(True) def _clone(self, deepcopy=True): - self.__check_not_command_cursor('clone') clone = self._clone_base() values_to_clone = ("spec", "fields", "skip", "limit", "max_time_ms", "comment", "max", "min", @@ -383,13 +379,6 @@ class Cursor(object): if self.__retrieved or self.__id is not None: raise InvalidOperation("cannot set options after executing query") - def __check_not_command_cursor(self, method_name): - """Check if calling a method on this cursor is valid. - """ - if self.__is_command_cursor: - raise InvalidOperation( - "cannot call %s on a command cursor" % method_name) - def add_option(self, mask): """Set arbitrary query flags using a bitmask. @@ -703,7 +692,6 @@ class Cursor(object): """ if not isinstance(with_limit_and_skip, bool): raise TypeError("with_limit_and_skip must be an instance of bool") - self.__check_not_command_cursor('count') command = {"query": self.__spec, "fields": self.__fields} command['read_preference'] = self.__read_preference @@ -757,7 +745,6 @@ class Cursor(object): .. versionadded:: 1.2 """ - self.__check_not_command_cursor('distinct') if not isinstance(key, basestring): raise TypeError("key must be an instance " "of %s" % (basestring.__name__,)) @@ -790,7 +777,6 @@ class Cursor(object): .. 
mongodoc:: explain """ - self.__check_not_command_cursor('explain') c = self.clone() c.__explain = True diff --git a/test/test_collection.py b/test/test_collection.py index 52742e0fb..928a3d58a 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -69,6 +69,9 @@ class TestCollection(unittest.TestCase): def setUp(self): self.client = get_client() self.db = self.client.pymongo_test + ismaster = self.db.command('ismaster') + self.setname = ismaster.get('setName') + self.w = len(ismaster.get('hosts', [])) or 1 def tearDown(self): self.db.drop_collection("test_large_limit") @@ -1357,18 +1360,17 @@ class TestCollection(unittest.TestCase): if not version.at_least(self.db.connection, (2, 5, 1)): raise SkipTest("Aggregation cursor requires MongoDB >= 2.5.1") db = self.db - ismaster = db.command('ismaster') - setname = ismaster.get('setName') - w = len(ismaster.get('hosts', [])) or 1 - if setname: + if self.setname: db = MongoReplicaSetClient(host=self.client.host, port=self.client.port, - replicaSet=setname)[db.name] + replicaSet=self.setname)[db.name] + # Test that getMore messages are sent to the right server. 
db.read_preference = ReadPreference.SECONDARY for collection_size in (10, 1000): db.drop_collection("test") - db.test.insert([{'_id': i} for i in range(collection_size)], w=w) + db.test.insert([{'_id': i} for i in range(collection_size)], + w=self.w) expected_sum = sum(range(collection_size)) # Use batchSize to ensure multiple getMore messages cursor = db.test.aggregate( @@ -1379,6 +1381,28 @@ class TestCollection(unittest.TestCase): expected_sum, sum(doc['_id'] for doc in cursor)) + def test_parallel_collection_scan(self): + if not version.at_least(self.db.connection, (2, 5, 5)): + raise SkipTest("Requires MongoDB >= 2.5.5") + db = self.db + db.drop_collection("test") + if self.setname: + db = MongoReplicaSetClient(host=self.client.host, + port=self.client.port, + replicaSet=self.setname)[db.name] + # Test that getMore messages are sent to the right server. + db.read_preference = ReadPreference.SECONDARY + coll = db.test + coll.insert(({'_id': i} for i in xrange(8000)), w=self.w) + docs = [] + threads = [threading.Thread(target=docs.extend, args=(cursor,)) + for cursor in coll.parallel_collection_scan(3)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(len(docs), db.test.count()) + def test_group(self): db = self.db db.drop_collection("test") diff --git a/test/test_cursor.py b/test/test_cursor.py index 953d8d6b7..8151681bf 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -31,7 +31,6 @@ from pymongo import (ASCENDING, DESCENDING, ALL, OFF) -from pymongo.cursor import Cursor from pymongo.database import Database from pymongo.errors import (InvalidOperation, OperationFailure, @@ -992,10 +991,6 @@ class TestCursor(unittest.TestCase): self.assertEqual(50, len(list(self.db.test.find() .max_scan(90).max_scan(50)))) - def test_cursor_retrieved(self): - cursor = Cursor(self.db.test, _retrieved=10) - self.assertEqual(10, cursor._Cursor__retrieved) - def test_with_statement(self): if sys.version_info < (2, 6): raise 
SkipTest("With statement requires Python >= 2.6")