diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index d68653070..dddf8dc34 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -68,7 +68,6 @@ .. automethod:: options .. automethod:: map_reduce .. automethod:: inline_map_reduce - .. automethod:: parallel_scan .. automethod:: initialize_unordered_bulk_op .. automethod:: initialize_ordered_bulk_op .. automethod:: group diff --git a/doc/changelog.rst b/doc/changelog.rst index 904db340f..943ec9625 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -15,13 +15,14 @@ Breaking Changes in 4.0 ....................... - Removed support for Python 2.7, 3.4, and 3.5. Python 3.6+ is now required. -- Removed :meth:`~pymongo.database.Database.eval`, - :data:`~pymongo.database.Database.system_js` and - :class:`~pymongo.database.SystemJS`. +- Removed :meth:`pymongo.database.Database.eval`, + :data:`pymongo.database.Database.system_js` and + :class:`pymongo.database.SystemJS`. - Removed :meth:`pymongo.mongo_client.MongoClient.fsync`, :meth:`pymongo.mongo_client.MongoClient.unlock`, and :attr:`pymongo.mongo_client.MongoClient.is_locked`. -- Removed :mod:`~pymongo.thread_util`. +- Removed :meth:`pymongo.collection.Collection.parallel_scan`. +- Removed :mod:`pymongo.thread_util`. Notable improvements .................... diff --git a/doc/migrate-to-pymongo4.rst b/doc/migrate-to-pymongo4.rst index b0f747679..8871f80e0 100644 --- a/doc/migrate-to-pymongo4.rst +++ b/doc/migrate-to-pymongo4.rst @@ -116,3 +116,12 @@ can be changed to this:: >>> from bson.code import Code >>> result = database.command('eval', Code('function (x) {return x;}'), args=[3]).get('retval') + + +Collection.parallel_scan is removed +................................... + +Removed :meth:`~pymongo.collection.Collection.parallel_scan`. MongoDB 4.2 +removed the `parallelCollectionScan command`_. There is no replacement. + +.. _parallelCollectionScan command: https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/ diff --git a/pymongo/collection.py b/pymongo/collection.py index 4adce0a38..025c1723e 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -1559,93 +1559,6 @@ class Collection(common.BaseObject): return RawBatchCursor(self, *args, **kwargs) - def parallel_scan(self, num_cursors, session=None, **kwargs): - """**DEPRECATED**: Scan this entire collection in parallel. - - Returns a list of up to ``num_cursors`` cursors that can be iterated - concurrently. As long as the collection is not modified during - scanning, each document appears once in one of the cursors result - sets. - - For example, to process each document in a collection using some - thread-safe ``process_document()`` function: - - >>> def process_cursor(cursor): - ... for document in cursor: - ... # Some thread-safe processing function: - ... process_document(document) - >>> - >>> # Get up to 4 cursors. - ... - >>> cursors = collection.parallel_scan(4) - >>> threads = [ - ... threading.Thread(target=process_cursor, args=(cursor,)) - ... for cursor in cursors] - >>> - >>> for thread in threads: - ... thread.start() - >>> - >>> for thread in threads: - ... thread.join() - >>> - >>> # All documents have now been processed. - - The :meth:`parallel_scan` method obeys the :attr:`read_preference` of - this :class:`Collection`. - - :Parameters: - - `num_cursors`: the number of cursors to return - - `session` (optional): a - :class:`~pymongo.client_session.ClientSession`. - - `**kwargs`: additional options for the parallelCollectionScan - command can be passed as keyword arguments. - - .. note:: Requires server version **>= 2.5.5**. - - .. versionchanged:: 3.7 - Deprecated. - - .. versionchanged:: 3.6 - Added ``session`` parameter. - - .. versionchanged:: 3.4 - Added back support for arbitrary keyword arguments. MongoDB 3.4 - adds support for maxTimeMS as an option to the - parallelCollectionScan command. - - .. versionchanged:: 3.0 - Removed support for arbitrary keyword arguments, since - the parallelCollectionScan command has no optional arguments. - """ - warnings.warn("parallel_scan is deprecated. MongoDB 4.2 will remove " - "the parallelCollectionScan command.", - DeprecationWarning, stacklevel=2) - cmd = SON([('parallelCollectionScan', self.__name), - ('numCursors', num_cursors)]) - cmd.update(kwargs) - - with self._socket_for_reads(session) as (sock_info, slave_ok): - # We call sock_info.command here directly, instead of - # calling self._command to avoid using an implicit session. - result = sock_info.command( - self.__database.name, - cmd, - slave_ok, - self._read_preference_for(session), - self.codec_options, - read_concern=self.read_concern, - parse_write_concern_error=True, - session=session, - client=self.__database.client) - - cursors = [] - for cursor in result['cursors']: - cursors.append(CommandCursor( - self, cursor['cursor'], sock_info.address, - session=session, explicit_session=session is not None)) - - return cursors - def _count(self, cmd, collation=None, session=None): """Internal count helper.""" # XXX: "ns missing" checks can be removed when we drop support for diff --git a/test/test_collection.py b/test/test_collection.py index 96a3e454a..c2b06ca37 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1737,52 +1737,6 @@ class TestCollection(IntegrationTest): self.assertTrue(cursor.alive) - @client_context.require_no_mongos - @client_context.require_version_max(4, 1, 0) - @ignore_deprecations - def test_parallel_scan(self): - db = self.db - db.drop_collection("test") - if client_context.has_secondaries: - # Test that getMore messages are sent to the right server. - db = self.client.get_database( - db.name, - read_preference=ReadPreference.SECONDARY, - write_concern=WriteConcern(w=self.w)) - - coll = db.test - coll.insert_many([{'_id': i} for i in range(8000)]) - docs = [] - threads = [threading.Thread(target=docs.extend, args=(cursor,)) - for cursor in coll.parallel_scan(3)] - for t in threads: - t.start() - for t in threads: - t.join() - - self.assertEqual( - set(range(8000)), - set(doc['_id'] for doc in docs)) - - @client_context.require_no_mongos - @client_context.require_version_min(3, 3, 10) - @client_context.require_version_max(4, 1, 0) - @client_context.require_test_commands - @ignore_deprecations - def test_parallel_scan_max_time_ms(self): - self.client.admin.command("configureFailPoint", - "maxTimeAlwaysTimeOut", - mode="alwaysOn") - try: - self.assertRaises(ExecutionTimeout, - self.db.test.parallel_scan, - 3, - maxTimeMS=1) - finally: - self.client.admin.command("configureFailPoint", - "maxTimeAlwaysTimeOut", - mode="off") - def test_large_limit(self): db = self.db db.drop_collection("test_large_limit") diff --git a/test/test_session.py b/test/test_session.py index b36654b16..61f8ff420 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -323,40 +323,6 @@ class TestSession(IntegrationTest): self._test_ops(client, *ops) - @client_context.require_no_mongos - @client_context.require_version_max(4, 1, 0) - @ignore_deprecations - def test_parallel_collection_scan(self): - listener = self.listener - client = self.client - coll = client.pymongo_test.collection - coll.insert_many([{'_id': i} for i in range(1000)]) - - listener.results.clear() - - def scan(session=None): - cursors = coll.parallel_scan(4, session=session) - for c in cursors: - c.batch_size(2) - list(c) - - listener.results.clear() - with client.start_session() as session: - scan(session) - cursor_lsids = {} - for event in listener.results['started']: - self.assertIn( - 'lsid', event.command, - "parallel_scan sent no lsid with %s" % (event.command_name, )) - - if event.command_name == 'getMore': - cursor_id = event.command['getMore'] - if cursor_id in cursor_lsids: - self.assertEqual(cursor_lsids[cursor_id], - event.command['lsid']) - else: - cursor_lsids[cursor_id] = event.command['lsid'] - def test_cursor_clone(self): coll = self.client.pymongo_test.collection # Ensure some batches. @@ -874,14 +840,6 @@ class TestCausalConsistency(unittest.TestCase): lambda coll, session: coll.inline_map_reduce( 'function() {}', 'function() {}', session=session), exception=map_reduce_exc) - if (not client_context.is_mongos and - not client_context.version.at_least(4, 1, 0)): - def scan(coll, session): - cursors = coll.parallel_scan(1, session=session) - for cur in cursors: - list(cur) - self._test_reads( - lambda coll, session: scan(coll, session=session)) self.assertRaises( ConfigurationError,