From 26139557ebea83eaae8267ac351fc1a89e513f31 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 13 Feb 2014 09:16:53 -0800 Subject: [PATCH] parallel_collection_scan -> parallel_scan PYTHON-633 Spec change. Also added better documentation with a very basic example. --- doc/api/pymongo/collection.rst | 2 +- doc/changelog.rst | 2 +- pymongo/collection.py | 33 ++++++++++++++++++++++++++++----- test/test_collection.py | 4 ++-- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index a37630c19..4315b8814 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -37,7 +37,7 @@ .. automethod:: drop .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]]]) .. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]]) - .. automethod:: parallel_collection_scan + .. automethod:: parallel_scan .. automethod:: count .. automethod:: create_index .. automethod:: ensure_index diff --git a/doc/changelog.rst b/doc/changelog.rst index f0dbf063b..8e90d1afc 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -13,7 +13,7 @@ fixes. Highlights include: :meth:`~pymongo.cursor.Cursor.max_time_ms`. - Support for writing :meth:`~pymongo.collection.Collection.aggregate` output to a collection. -- A new :meth:`~pymongo.collection.Collection.parallel_collection_scan` helper. +- A new :meth:`~pymongo.collection.Collection.parallel_scan` helper. - :class:`~pymongo.errors.OperationFailure` and its subclasses now include a :attr:`~pymongo.errors.OperationFailure.details` attribute with complete error details from the server. diff --git a/pymongo/collection.py b/pymongo/collection.py index df16b5080..f07e0031e 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -859,12 +859,35 @@ class Collection(common.BaseObject): self.secondary_acceptable_latency_ms) return Cursor(self, *args, **kwargs) - def parallel_collection_scan(self, num_cursors, **kwargs): - """Scan this collection in parallel. + def parallel_scan(self, num_cursors, **kwargs): + """Scan this entire collection in parallel. - Returns a list of :class:`~pymongo.command_cursor.CommandCursor` - instances that can be iterated concurrently by one or more threads - or greenlets. + Returns a list of up to ``num_cursors`` cursors that can be iterated + concurrently. As long as the collection is not modified during + scanning, each document appears once in one of the cursors' result + sets. + + For example, to process each document in a collection using some + thread-safe ``process_document()`` function:: + + def process_cursor(cursor): + for document in cursor: + # Some thread-safe processing function: + process_document(document) + + # Get up to 4 cursors. + cursors = collection.parallel_scan(4) + threads = [ + threading.Thread(target=process_cursor, args=(cursor,)) + for cursor in cursors] + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + # All documents have now been processed. With :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient` or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`, diff --git a/test/test_collection.py b/test/test_collection.py index fe7a90433..56f6fd1ec 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1381,7 +1381,7 @@ class TestCollection(unittest.TestCase): expected_sum, sum(doc['_id'] for doc in cursor)) - def test_parallel_collection_scan(self): + def test_parallel_scan(self): if not version.at_least(self.db.connection, (2, 5, 5)): raise SkipTest("Requires MongoDB >= 2.5.5") db = self.db @@ -1396,7 +1396,7 @@ class TestCollection(unittest.TestCase): coll.insert(({'_id': i} for i in xrange(8000)), w=self.w) docs = [] threads = [threading.Thread(target=docs.extend, args=(cursor,)) - for cursor in coll.parallel_collection_scan(3)] + for cursor in coll.parallel_scan(3)] for t in threads: t.start() for t in threads: