Add parallel_collection_scan helper PYTHON-633

This commit adds a helper for the new parallelCollectionScan
command introduced in MongoDB 2.5.5. The helper returns
"num_cursors" instances of CommandCursor that can be iterated
by one or more threads concurrently to scan the entire collection.

This commit also removes the remaining command cursor
hacks from cursor.Cursor.
This commit is contained in:
Bernie Hackett 2014-02-11 13:04:53 -08:00
parent cdacc2f4b9
commit 940d73f672
6 changed files with 81 additions and 37 deletions

View File

@ -37,6 +37,7 @@
.. automethod:: drop
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False, [compile_re=True, [,**kwargs]]]]]]]]]]]]]]]]]])
.. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]])
.. automethod:: parallel_collection_scan
.. automethod:: count
.. automethod:: create_index
.. automethod:: ensure_index

View File

@ -859,6 +859,47 @@ class Collection(common.BaseObject):
self.secondary_acceptable_latency_ms)
return Cursor(self, *args, **kwargs)
def parallel_collection_scan(self, num_cursors, **kwargs):
"""Scan this collection in parallel.
Returns a list of :class:`~pymongo.command_cursor.CommandCursor`
instances that can be iterated concurrently by one or more threads
or greenlets.
With :class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or the
(deprecated) `slave_okay` attribute of this instance is set to `True`
the command will be sent to a secondary or slave.
:Parameters:
- `num_cursors`: the number of cursors to return
.. note:: Requires server version **>= 2.5.5**.
"""
use_master = not self.slave_okay and not self.read_preference
compile_re = kwargs.get('compile_re', False)
command_kwargs = {
'numCursors': num_cursors,
'read_preference': self.read_preference,
'tag_sets': self.tag_sets,
'secondary_acceptable_latency_ms': (
self.secondary_acceptable_latency_ms),
'slave_okay': self.slave_okay,
'_use_master': use_master}
command_kwargs.update(kwargs)
result, conn_id = self.__database._command(
"parallelCollectionScan", self.__name, **command_kwargs)
return [CommandCursor(self,
cursor['cursor'],
conn_id,
compile_re) for cursor in result['cursors']]
def count(self):
"""Get the number of documents in this collection.
@ -1237,12 +1278,10 @@ class Collection(common.BaseObject):
"aggregate", self.__name, **command_kwargs)
if 'cursor' in result:
cursor_info = result['cursor']
return CommandCursor(
self,
cursor_info['id'],
result['cursor'],
conn_id,
cursor_info['firstBatch'],
command_kwargs.get('compile_re', True))
else:
return result

View File

@ -24,21 +24,20 @@ class CommandCursor(object):
"""A cursor / iterator over command cursors.
"""
def __init__(self, collection, cursor_id,
conn_id, initial=None, compile_re=True):
def __init__(self, collection, cursor_info, conn_id, compile_re=True):
"""Create a new command cursor.
"""
self.__collection = collection
self.__id = cursor_id
self.__id = cursor_info['id']
self.__conn_id = conn_id
self.__data = deque(initial or [])
self.__data = deque(cursor_info['firstBatch'])
self.__decode_opts = (
collection.database.connection.document_class,
collection.database.connection.tz_aware,
collection.uuid_subtype,
compile_re
)
self.__retrieved = 0
self.__retrieved = cursor_info.get('_retrieved', 0)
self.__batch_size = 0
self.__killed = False

View File

@ -70,8 +70,7 @@ class Cursor(object):
read_preference=ReadPreference.PRIMARY,
tag_sets=[{}], secondary_acceptable_latency_ms=None,
exhaust=False, compile_re=True, _must_use_master=False,
_uuid_subtype=None, _first_batch=None, _cursor_id=None,
_retrieved=0, **kwargs):
_uuid_subtype=None, **kwargs):
"""Create a new cursor.
Should not be called directly by application developers - see
@ -79,8 +78,7 @@ class Cursor(object):
.. mongodoc:: cursors
"""
self.__id = _cursor_id
self.__is_command_cursor = _cursor_id is not None
self.__id = None
if spec is None:
spec = {}
@ -159,9 +157,9 @@ class Cursor(object):
self.__must_use_master = _must_use_master
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
self.__data = deque(_first_batch or [])
self.__data = deque()
self.__connection_id = None
self.__retrieved = _retrieved
self.__retrieved = 0
self.__killed = False
self.__query_flags = 0
@ -213,7 +211,6 @@ class Cursor(object):
be sent to the server, even if the resultant data has already been
retrieved by this cursor.
"""
self.__check_not_command_cursor('rewind')
self.__data = deque()
self.__id = None
self.__connection_id = None
@ -233,7 +230,6 @@ class Cursor(object):
return self._clone(True)
def _clone(self, deepcopy=True):
self.__check_not_command_cursor('clone')
clone = self._clone_base()
values_to_clone = ("spec", "fields", "skip", "limit", "max_time_ms",
"comment", "max", "min",
@ -383,13 +379,6 @@ class Cursor(object):
if self.__retrieved or self.__id is not None:
raise InvalidOperation("cannot set options after executing query")
def __check_not_command_cursor(self, method_name):
"""Check if calling a method on this cursor is valid.
"""
if self.__is_command_cursor:
raise InvalidOperation(
"cannot call %s on a command cursor" % method_name)
def add_option(self, mask):
"""Set arbitrary query flags using a bitmask.
@ -703,7 +692,6 @@ class Cursor(object):
"""
if not isinstance(with_limit_and_skip, bool):
raise TypeError("with_limit_and_skip must be an instance of bool")
self.__check_not_command_cursor('count')
command = {"query": self.__spec, "fields": self.__fields}
command['read_preference'] = self.__read_preference
@ -757,7 +745,6 @@ class Cursor(object):
.. versionadded:: 1.2
"""
self.__check_not_command_cursor('distinct')
if not isinstance(key, basestring):
raise TypeError("key must be an instance "
"of %s" % (basestring.__name__,))
@ -790,7 +777,6 @@ class Cursor(object):
.. mongodoc:: explain
"""
self.__check_not_command_cursor('explain')
c = self.clone()
c.__explain = True

View File

@ -69,6 +69,9 @@ class TestCollection(unittest.TestCase):
def setUp(self):
self.client = get_client()
self.db = self.client.pymongo_test
ismaster = self.db.command('ismaster')
self.setname = ismaster.get('setName')
self.w = len(ismaster.get('hosts', [])) or 1
def tearDown(self):
self.db.drop_collection("test_large_limit")
@ -1357,18 +1360,17 @@ class TestCollection(unittest.TestCase):
if not version.at_least(self.db.connection, (2, 5, 1)):
raise SkipTest("Aggregation cursor requires MongoDB >= 2.5.1")
db = self.db
ismaster = db.command('ismaster')
setname = ismaster.get('setName')
w = len(ismaster.get('hosts', [])) or 1
if setname:
if self.setname:
db = MongoReplicaSetClient(host=self.client.host,
port=self.client.port,
replicaSet=setname)[db.name]
replicaSet=self.setname)[db.name]
# Test that getMore messages are sent to the right server.
db.read_preference = ReadPreference.SECONDARY
for collection_size in (10, 1000):
db.drop_collection("test")
db.test.insert([{'_id': i} for i in range(collection_size)], w=w)
db.test.insert([{'_id': i} for i in range(collection_size)],
w=self.w)
expected_sum = sum(range(collection_size))
# Use batchSize to ensure multiple getMore messages
cursor = db.test.aggregate(
@ -1379,6 +1381,28 @@ class TestCollection(unittest.TestCase):
expected_sum,
sum(doc['_id'] for doc in cursor))
def test_parallel_collection_scan(self):
if not version.at_least(self.db.connection, (2, 5, 5)):
raise SkipTest("Requires MongoDB >= 2.5.5")
db = self.db
db.drop_collection("test")
if self.setname:
db = MongoReplicaSetClient(host=self.client.host,
port=self.client.port,
replicaSet=self.setname)[db.name]
# Test that getMore messages are sent to the right server.
db.read_preference = ReadPreference.SECONDARY
coll = db.test
coll.insert(({'_id': i} for i in xrange(8000)), w=self.w)
docs = []
threads = [threading.Thread(target=docs.extend, args=(cursor,))
for cursor in coll.parallel_collection_scan(3)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(len(docs), db.test.count())
def test_group(self):
db = self.db
db.drop_collection("test")

View File

@ -31,7 +31,6 @@ from pymongo import (ASCENDING,
DESCENDING,
ALL,
OFF)
from pymongo.cursor import Cursor
from pymongo.database import Database
from pymongo.errors import (InvalidOperation,
OperationFailure,
@ -992,10 +991,6 @@ class TestCursor(unittest.TestCase):
self.assertEqual(50, len(list(self.db.test.find()
.max_scan(90).max_scan(50))))
def test_cursor_retrieved(self):
cursor = Cursor(self.db.test, _retrieved=10)
self.assertEqual(10, cursor._Cursor__retrieved)
def test_with_statement(self):
if sys.version_info < (2, 6):
raise SkipTest("With statement requires Python >= 2.6")