diff --git a/pymongo/collection.py b/pymongo/collection.py index 24cb410aa..dba0ae2fc 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -1144,7 +1144,7 @@ class Collection(common.BaseObject): """ return Cursor(self, *args, **kwargs) - def parallel_scan(self, num_cursors): + def parallel_scan(self, num_cursors, **kwargs): """Scan this entire collection in parallel. Returns a list of up to ``num_cursors`` cursors that can be iterated @@ -1180,15 +1180,23 @@ class Collection(common.BaseObject): :Parameters: - `num_cursors`: the number of cursors to return + - `**kwargs`: additional options for the parallelCollectionScan + command can be passed as keyword arguments. .. note:: Requires server version **>= 2.5.5**. + .. versionchanged:: 3.4 + Added back support for arbitrary keyword arguments. MongoDB 3.4 + adds support for maxTimeMS as an option to the + parallelCollectionScan command. + .. versionchanged:: 3.0 Removed support for arbitrary keyword arguments, since the parallelCollectionScan command has no optional arguments. """ cmd = SON([('parallelCollectionScan', self.__name), ('numCursors', num_cursors)]) + cmd.update(kwargs) with self._socket_for_reads() as (sock_info, slave_ok): result = self._command(sock_info, cmd, slave_ok, diff --git a/test/test_collection.py b/test/test_collection.py index e7ed71955..ab2b845e4 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -42,6 +42,7 @@ from pymongo.command_cursor import CommandCursor from pymongo.cursor import CursorType from pymongo.errors import (DocumentTooLarge, DuplicateKeyError, + ExecutionTimeout, InvalidDocument, InvalidName, InvalidOperation, @@ -1559,6 +1560,22 @@ class TestCollection(IntegrationTest): set(range(8000)), set(doc['_id'] for doc in docs)) + @client_context.require_version_min(3, 3, 10) + @client_context.require_test_commands + def test_parallel_scan_max_time_ms(self): + self.client.admin.command("configureFailPoint", + "maxTimeAlwaysTimeOut", + mode="alwaysOn") + try: + self.assertRaises(ExecutionTimeout, + self.db.test.parallel_scan, + 3, + maxTimeMS=1) + finally: + self.client.admin.command("configureFailPoint", + "maxTimeAlwaysTimeOut", + mode="off") + def test_group(self): db = self.db db.drop_collection("test")