parallel_collection_scan -> parallel_scan PYTHON-633

Spec change. Also added better documentation with a
very basic example.
This commit is contained in:
Bernie Hackett 2014-02-13 09:16:53 -08:00
parent 0b831cb831
commit 26139557eb
4 changed files with 32 additions and 9 deletions

View File

@ -37,7 +37,7 @@
.. automethod:: drop
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]]])
.. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]])
.. automethod:: parallel_collection_scan
.. automethod:: parallel_scan
.. automethod:: count
.. automethod:: create_index
.. automethod:: ensure_index

View File

@ -13,7 +13,7 @@ fixes. Highlights include:
:meth:`~pymongo.cursor.Cursor.max_time_ms`.
- Support for writing :meth:`~pymongo.collection.Collection.aggregate`
output to a collection.
- A new :meth:`~pymongo.collection.Collection.parallel_collection_scan` helper.
- A new :meth:`~pymongo.collection.Collection.parallel_scan` helper.
- :class:`~pymongo.errors.OperationFailure` and its subclasses now include
a :attr:`~pymongo.errors.OperationFailure.details` attribute with complete
error details from the server.

View File

@ -859,12 +859,35 @@ class Collection(common.BaseObject):
self.secondary_acceptable_latency_ms)
return Cursor(self, *args, **kwargs)
def parallel_collection_scan(self, num_cursors, **kwargs):
"""Scan this collection in parallel.
def parallel_scan(self, num_cursors, **kwargs):
"""Scan this entire collection in parallel.
Returns a list of :class:`~pymongo.command_cursor.CommandCursor`
instances that can be iterated concurrently by one or more threads
or greenlets.
Returns a list of up to ``num_cursors`` cursors that can be iterated
concurrently. As long as the collection is not modified during
scanning, each document appears once in one of the cursors' result
sets.
For example, to process each document in a collection using some
thread-safe ``process_document()`` function::
def process_cursor(cursor):
for document in cursor:
# Some thread-safe processing function:
process_document(document)
# Get up to 4 cursors.
cursors = collection.parallel_scan(4)
threads = [
threading.Thread(target=process_cursor, args=(cursor,))
for cursor in cursors]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# All documents have now been processed.
With :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,

View File

@ -1381,7 +1381,7 @@ class TestCollection(unittest.TestCase):
expected_sum,
sum(doc['_id'] for doc in cursor))
def test_parallel_collection_scan(self):
def test_parallel_scan(self):
if not version.at_least(self.db.connection, (2, 5, 5)):
raise SkipTest("Requires MongoDB >= 2.5.5")
db = self.db
@ -1396,7 +1396,7 @@ class TestCollection(unittest.TestCase):
coll.insert(({'_id': i} for i in xrange(8000)), w=self.w)
docs = []
threads = [threading.Thread(target=docs.extend, args=(cursor,))
for cursor in coll.parallel_collection_scan(3)]
for cursor in coll.parallel_scan(3)]
for t in threads:
t.start()
for t in threads: