PYTHON-1592 Remove Collection.parallel_scan (#547)

This commit is contained in:
Shane Harvey 2021-01-15 14:01:45 -08:00 committed by GitHub
parent 387bfa0bfa
commit 3c899aeb89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 14 additions and 180 deletions

View File

@ -68,7 +68,6 @@
.. automethod:: options
.. automethod:: map_reduce
.. automethod:: inline_map_reduce
.. automethod:: parallel_scan
.. automethod:: initialize_unordered_bulk_op
.. automethod:: initialize_ordered_bulk_op
.. automethod:: group

View File

@ -15,13 +15,14 @@ Breaking Changes in 4.0
.......................
- Removed support for Python 2.7, 3.4, and 3.5. Python 3.6+ is now required.
- Removed :meth:`~pymongo.database.Database.eval`,
:data:`~pymongo.database.Database.system_js` and
:class:`~pymongo.database.SystemJS`.
- Removed :meth:`pymongo.database.Database.eval`,
:data:`pymongo.database.Database.system_js` and
:class:`pymongo.database.SystemJS`.
- Removed :meth:`pymongo.mongo_client.MongoClient.fsync`,
:meth:`pymongo.mongo_client.MongoClient.unlock`, and
:attr:`pymongo.mongo_client.MongoClient.is_locked`.
- Removed :mod:`~pymongo.thread_util`.
- Removed :meth:`pymongo.collection.Collection.parallel_scan`.
- Removed :mod:`pymongo.thread_util`.
Notable improvements
....................

View File

@ -116,3 +116,12 @@ can be changed to this::
>>> from bson.code import Code
>>> result = database.command('eval', Code('function (x) {return x;}'), args=[3]).get('retval')
Collection.parallel_scan is removed
...................................
Removed :meth:`~pymongo.collection.Collection.parallel_scan`. MongoDB 4.2
removed the `parallelCollectionScan command`_. There is no replacement.
.. _parallelCollectionScan command: https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/

View File

@ -1559,93 +1559,6 @@ class Collection(common.BaseObject):
return RawBatchCursor(self, *args, **kwargs)
def parallel_scan(self, num_cursors, session=None, **kwargs):
    """**DEPRECATED**: Scan this entire collection in parallel.

    Issues the ``parallelCollectionScan`` command and returns a list of
    up to ``num_cursors`` cursors that can be iterated concurrently. As
    long as the collection is not modified during scanning, each document
    appears once in one of the cursors' result sets.

    For example, to process each document in a collection using some
    thread-safe ``process_document()`` function:

      >>> def process_cursor(cursor):
      ...     for document in cursor:
      ...         # Some thread-safe processing function:
      ...         process_document(document)
      >>>
      >>> # Get up to 4 cursors.
      ...
      >>> cursors = collection.parallel_scan(4)
      >>> threads = [
      ...     threading.Thread(target=process_cursor, args=(cursor,))
      ...     for cursor in cursors]
      >>>
      >>> for thread in threads:
      ...     thread.start()
      >>>
      >>> for thread in threads:
      ...     thread.join()
      >>>
      >>> # All documents have now been processed.

    The :meth:`parallel_scan` method obeys the :attr:`read_preference` of
    this :class:`Collection`.

    :Parameters:
      - `num_cursors`: the number of cursors to return
      - `session` (optional): a
        :class:`~pymongo.client_session.ClientSession`.
      - `**kwargs`: additional options for the parallelCollectionScan
        command can be passed as keyword arguments.

    :Returns:
      A list of command cursors, one per entry in the ``cursors`` field
      of the server reply.

    .. note:: Requires server version **>= 2.5.5**.

    .. versionchanged:: 3.7
       Deprecated.

    .. versionchanged:: 3.6
       Added ``session`` parameter.

    .. versionchanged:: 3.4
       Added back support for arbitrary keyword arguments. MongoDB 3.4
       adds support for maxTimeMS as an option to the
       parallelCollectionScan command.

    .. versionchanged:: 3.0
       Removed support for arbitrary keyword arguments, since
       the parallelCollectionScan command has no optional arguments.
    """
    # stacklevel=2 attributes the warning to the caller of parallel_scan.
    warnings.warn(
        "parallel_scan is deprecated. MongoDB 4.2 will remove "
        "the parallelCollectionScan command.",
        DeprecationWarning,
        stacklevel=2)

    # The command name must be the first key; caller-supplied options
    # are merged in afterwards.
    command_pairs = [
        ('parallelCollectionScan', self.__name),
        ('numCursors', num_cursors),
    ]
    cmd = SON(command_pairs)
    cmd.update(kwargs)

    database = self.__database
    with self._socket_for_reads(session) as (sock_info, slave_ok):
        # We call sock_info.command here directly, instead of
        # calling self._command to avoid using an implicit session.
        result = sock_info.command(
            database.name,
            cmd,
            slave_ok,
            self._read_preference_for(session),
            self.codec_options,
            read_concern=self.read_concern,
            parse_write_concern_error=True,
            session=session,
            client=database.client)

    # A session counts as explicit only when the caller supplied one.
    caller_supplied_session = session is not None
    return [
        CommandCursor(
            self, reply['cursor'], sock_info.address,
            session=session, explicit_session=caller_supplied_session)
        for reply in result['cursors']
    ]
def _count(self, cmd, collation=None, session=None):
"""Internal count helper."""
# XXX: "ns missing" checks can be removed when we drop support for

View File

@ -1737,52 +1737,6 @@ class TestCollection(IntegrationTest):
self.assertTrue(cursor.alive)
@client_context.require_no_mongos
@client_context.require_version_max(4, 1, 0)
@ignore_deprecations
def test_parallel_scan(self):
    # Insert 8000 documents, drain the parallel_scan cursors on separate
    # threads, and check that every _id was seen.
    doc_count = 8000
    db = self.db
    db.drop_collection("test")

    if client_context.has_secondaries:
        # Test that getMore messages are sent to the right server.
        db = self.client.get_database(
            db.name,
            read_preference=ReadPreference.SECONDARY,
            write_concern=WriteConcern(w=self.w))

    coll = db.test
    coll.insert_many([{'_id': i} for i in range(doc_count)])

    # Each worker appends everything its cursor yields to the shared list.
    docs = []
    workers = []
    for cursor in coll.parallel_scan(3):
        workers.append(
            threading.Thread(target=docs.extend, args=(cursor,)))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    expected_ids = set(range(doc_count))
    seen_ids = set(doc['_id'] for doc in docs)
    self.assertEqual(expected_ids, seen_ids)
@client_context.require_no_mongos
@client_context.require_version_min(3, 3, 10)
@client_context.require_version_max(4, 1, 0)
@client_context.require_test_commands
@ignore_deprecations
def test_parallel_scan_max_time_ms(self):
    # Turn on the maxTimeAlwaysTimeOut fail point, then check that
    # parallel_scan with maxTimeMS raises ExecutionTimeout.
    admin = self.client.admin
    admin.command(
        "configureFailPoint", "maxTimeAlwaysTimeOut", mode="alwaysOn")
    try:
        with self.assertRaises(ExecutionTimeout):
            self.db.test.parallel_scan(3, maxTimeMS=1)
    finally:
        # Always switch the fail point off again, even if the
        # assertion above fails.
        admin.command(
            "configureFailPoint", "maxTimeAlwaysTimeOut", mode="off")
def test_large_limit(self):
db = self.db
db.drop_collection("test_large_limit")

View File

@ -323,40 +323,6 @@ class TestSession(IntegrationTest):
self._test_ops(client, *ops)
@client_context.require_no_mongos
@client_context.require_version_max(4, 1, 0)
@ignore_deprecations
def test_parallel_collection_scan(self):
    # Run parallel_scan inside an explicit session and check that every
    # started command carries an lsid, and that all getMores for a given
    # cursor id carry the same lsid.
    listener = self.listener
    client = self.client
    coll = client.pymongo_test.collection
    coll.insert_many([{'_id': i} for i in range(1000)])
    listener.results.clear()

    def scan(session=None):
        # batch_size(2) is set before each cursor is fully consumed.
        for cur in coll.parallel_scan(4, session=session):
            cur.batch_size(2)
            list(cur)

    listener.results.clear()
    with client.start_session() as session:
        scan(session)

    # Maps cursor id -> lsid seen on that cursor's first getMore.
    cursor_lsids = {}
    for event in listener.results['started']:
        command = event.command
        self.assertIn(
            'lsid', command,
            "parallel_scan sent no lsid with %s" % (event.command_name, ))

        if event.command_name != 'getMore':
            continue

        cursor_id = command['getMore']
        if cursor_id not in cursor_lsids:
            cursor_lsids[cursor_id] = command['lsid']
        else:
            self.assertEqual(cursor_lsids[cursor_id], command['lsid'])
def test_cursor_clone(self):
coll = self.client.pymongo_test.collection
# Ensure some batches.
@ -874,14 +840,6 @@ class TestCausalConsistency(unittest.TestCase):
lambda coll, session: coll.inline_map_reduce(
'function() {}', 'function() {}', session=session),
exception=map_reduce_exc)
if (not client_context.is_mongos and
not client_context.version.at_least(4, 1, 0)):
def scan(coll, session):
cursors = coll.parallel_scan(1, session=session)
for cur in cursors:
list(cur)
self._test_reads(
lambda coll, session: scan(coll, session=session))
self.assertRaises(
ConfigurationError,