PYTHON-1066 - Add parallel_scan support for arbitrary keyword args
This commit is contained in:
parent
c44add7c90
commit
f25df9799e
@ -1144,7 +1144,7 @@ class Collection(common.BaseObject):
|
||||
"""
|
||||
return Cursor(self, *args, **kwargs)
|
||||
|
||||
def parallel_scan(self, num_cursors):
|
||||
def parallel_scan(self, num_cursors, **kwargs):
|
||||
"""Scan this entire collection in parallel.
|
||||
|
||||
Returns a list of up to ``num_cursors`` cursors that can be iterated
|
||||
@ -1180,15 +1180,23 @@ class Collection(common.BaseObject):
|
||||
|
||||
:Parameters:
|
||||
- `num_cursors`: the number of cursors to return
|
||||
- `**kwargs`: additional options for the parallelCollectionScan
|
||||
command can be passed as keyword arguments.
|
||||
|
||||
.. note:: Requires server version **>= 2.5.5**.
|
||||
|
||||
.. versionchanged:: 3.4
|
||||
Added back support for arbitrary keyword arguments. MongoDB 3.4
|
||||
adds support for maxTimeMS as an option to the
|
||||
parallelCollectionScan command.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Removed support for arbitrary keyword arguments, since
|
||||
the parallelCollectionScan command has no optional arguments.
|
||||
"""
|
||||
cmd = SON([('parallelCollectionScan', self.__name),
|
||||
('numCursors', num_cursors)])
|
||||
cmd.update(kwargs)
|
||||
|
||||
with self._socket_for_reads() as (sock_info, slave_ok):
|
||||
result = self._command(sock_info, cmd, slave_ok,
|
||||
|
||||
@ -42,6 +42,7 @@ from pymongo.command_cursor import CommandCursor
|
||||
from pymongo.cursor import CursorType
|
||||
from pymongo.errors import (DocumentTooLarge,
|
||||
DuplicateKeyError,
|
||||
ExecutionTimeout,
|
||||
InvalidDocument,
|
||||
InvalidName,
|
||||
InvalidOperation,
|
||||
@ -1559,6 +1560,22 @@ class TestCollection(IntegrationTest):
|
||||
set(range(8000)),
|
||||
set(doc['_id'] for doc in docs))
|
||||
|
||||
@client_context.require_version_min(3, 3, 10)
|
||||
@client_context.require_test_commands
|
||||
def test_parallel_scan_max_time_ms(self):
|
||||
self.client.admin.command("configureFailPoint",
|
||||
"maxTimeAlwaysTimeOut",
|
||||
mode="alwaysOn")
|
||||
try:
|
||||
self.assertRaises(ExecutionTimeout,
|
||||
self.db.test.parallel_scan,
|
||||
3,
|
||||
maxTimeMS=1)
|
||||
finally:
|
||||
self.client.admin.command("configureFailPoint",
|
||||
"maxTimeAlwaysTimeOut",
|
||||
mode="off")
|
||||
|
||||
def test_group(self):
|
||||
db = self.db
|
||||
db.drop_collection("test")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user