It's not safe to return the pinned connection to the pool from within Cursor.__del__ because the Pool's lock may be held by a Python thread while the cyclic garbage collector runs. Instead we send the cursor cleanup request to the client's background thread. The thread will send killCursors on the pinned socket and then return the socket to the pool. Also fixed a similar bug when garbage collecting a pinned session.
328 lines
12 KiB
Python
328 lines
12 KiB
Python
# Copyright 2014-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""CommandCursor class to iterate over command results."""
|
|
|
|
from collections import deque
|
|
|
|
from bson import _convert_raw_document_lists_to_streams
|
|
from pymongo.cursor import _SocketManager, _CURSOR_CLOSED_ERRORS
|
|
from pymongo.errors import (ConnectionFailure,
|
|
InvalidOperation,
|
|
OperationFailure)
|
|
from pymongo.message import (_CursorAddress,
|
|
_GetMore,
|
|
_RawBatchGetMore)
|
|
from pymongo.response import PinnedResponse
|
|
|
|
|
|
class CommandCursor(object):
    """A cursor / iterator over command cursors."""

    # Message class used by _refresh() to build getMore operations;
    # subclasses may override it.
    _getmore_class = _GetMore

    def __init__(self, collection, cursor_info, address, retrieved=0,
                 batch_size=0, max_await_time_ms=None, session=None,
                 explicit_session=False):
        """Create a new command cursor.

        The parameter 'retrieved' is unused.

        :Parameters:
          - `collection`: The collection this cursor belongs to. Its
            ``database.client`` is used to run getMore and cleanup
            operations, and its ``full_name`` is the fallback namespace.
          - `cursor_info`: Mapping describing the server cursor. Must contain
            ``'id'`` and ``'firstBatch'``; ``'postBatchResumeToken'`` and
            ``'ns'`` are optional.
          - `address`: Address passed along with every getMore and with the
            cursor cleanup request.
          - `retrieved`: Unused.
          - `batch_size`: Initial batch size, validated by
            :meth:`batch_size`.
          - `max_await_time_ms`: An integer or None; forwarded to getMore.
          - `session`: The session associated with this cursor, or None.
          - `explicit_session`: True if `session` was supplied by the
            application; only such sessions are exposed via :attr:`session`
            and left un-ended by this cursor.

        Raises :exc:`TypeError` if `batch_size` or `max_await_time_ms` has
        the wrong type and :exc:`ValueError` if `batch_size` is negative.
        """
        self.__sock_mgr = None
        self.__collection = collection
        self.__id = cursor_info['id']
        self.__data = deque(cursor_info['firstBatch'])
        self.__postbatchresumetoken = cursor_info.get('postBatchResumeToken')
        self.__address = address
        self.__batch_size = batch_size
        self.__max_await_time_ms = max_await_time_ms
        self.__session = session
        self.__explicit_session = explicit_session
        # A cursor id of 0 means there is nothing left to fetch, so the
        # cursor starts out killed and an implicit session is ended at once.
        self.__killed = (self.__id == 0)
        if self.__killed:
            self.__end_session(True)

        # Prefer the namespace supplied with the cursor, falling back to the
        # collection's own name.
        if "ns" in cursor_info:
            self.__ns = cursor_info["ns"]
        else:
            self.__ns = collection.full_name

        self.batch_size(batch_size)

        if (not isinstance(max_await_time_ms, int)
                and max_await_time_ms is not None):
            raise TypeError("max_await_time_ms must be an integer or None")

    def __del__(self):
        # Garbage collection requests a non-synchronous cleanup
        # (synchronous=False); the client decides how to carry it out.
        self.__die()

    def __die(self, synchronous=False):
        """Closes this cursor.

        Marks the cursor as killed and hands the cursor id, address, pinned
        socket manager and session to the client's ``_cleanup_cursor``.
        killCursors is skipped (cursor id 0, address None) when the cursor id
        is already zero or the cursor was killed before.

        :Parameters:
          - `synchronous`: Passed through to ``_cleanup_cursor``; True when
            called from :meth:`close`, False when called from ``__del__``.
        """
        already_killed = self.__killed
        self.__killed = True
        if self.__id and not already_killed:
            cursor_id = self.__id
            # Address killCursors to the namespace this cursor actually
            # lives in -- the same one _refresh() uses for getMore -- which
            # can differ from the collection's full name when cursor_info
            # carried its own 'ns'.
            address = _CursorAddress(self.__address, self.__ns)
        else:
            # Skip killCursors.
            cursor_id = 0
            address = None
        self.__collection.database.client._cleanup_cursor(
            synchronous,
            cursor_id,
            address,
            self.__sock_mgr,
            self.__session,
            self.__explicit_session)
        # Drop our references; an explicit session stays reachable through
        # the `session` property.
        if not self.__explicit_session:
            self.__session = None
        self.__sock_mgr = None

    def __end_session(self, synchronous):
        """End the session if this cursor holds a non-explicit one.

        :Parameters:
          - `synchronous`: Forwarded as the ``lock`` argument of the
            session's ``_end_session``.
        """
        if self.__session and not self.__explicit_session:
            self.__session._end_session(lock=synchronous)
            self.__session = None

    def close(self):
        """Explicitly close / kill this cursor.
        """
        self.__die(True)

    def batch_size(self, batch_size):
        """Limits the number of documents returned in one batch. Each batch
        requires a round trip to the server. It can be adjusted to optimize
        performance and limit data transfer.

        .. note:: batch_size can not override MongoDB's internal limits on the
           amount of data it will return to the client in a single batch (i.e
           if you set batch size to 1,000,000,000, MongoDB will currently only
           return 4-16MB of results per batch).

        Raises :exc:`TypeError` if `batch_size` is not an integer.
        Raises :exc:`ValueError` if `batch_size` is less than ``0``.

        Returns this cursor so calls can be chained.

        :Parameters:
          - `batch_size`: The size of each batch of results requested.
        """
        if not isinstance(batch_size, int):
            raise TypeError("batch_size must be an integer")
        if batch_size < 0:
            raise ValueError("batch_size must be >= 0")

        # A requested batch size of 1 is stored as 2.
        # NOTE(review): the reason is not visible in this file -- confirm.
        self.__batch_size = 2 if batch_size == 1 else batch_size
        return self

    def _has_next(self):
        """Returns `True` if the cursor has documents remaining from the
        previous batch."""
        return bool(self.__data)

    @property
    def _post_batch_resume_token(self):
        """Retrieve the postBatchResumeToken from the response to a
        changeStream aggregate or getMore."""
        return self.__postbatchresumetoken

    def _maybe_pin_connection(self, sock_info):
        """Pin `sock_info` to this cursor when the client asks for it.

        Does nothing if the client's ``_should_pin_cursor`` returns a false
        value for this cursor's session, or if a socket manager is already
        held.

        :Parameters:
          - `sock_info`: The connection to pin.
        """
        client = self.__collection.database.client
        if not client._should_pin_cursor(self.__session):
            return
        if not self.__sock_mgr:
            sock_info.pin_cursor()
            sock_mgr = _SocketManager(sock_info, False)
            # Ensure the connection gets returned when the entire result is
            # returned in the first batch.
            if self.__id == 0:
                sock_mgr.close()
            else:
                self.__sock_mgr = sock_mgr

    def __send_message(self, operation):
        """Send a getmore message and handle the response.

        On any exception the cursor is closed and the exception re-raised.
        After a successful response the cursor id, resume token and buffered
        documents are updated, and the cursor is closed if the server
        reports a cursor id of 0.

        :Parameters:
          - `operation`: The getMore operation to run.
        """
        client = self.__collection.database.client
        try:
            response = client._run_operation(
                operation, self._unpack_response, address=self.__address)
        except OperationFailure as exc:
            if exc.code in _CURSOR_CLOSED_ERRORS:
                # Don't send killCursors because the cursor is already closed.
                self.__killed = True
            # Return the session and pinned connection, if necessary.
            self.close()
            raise
        except ConnectionFailure:
            # Don't send killCursors because the cursor is already closed.
            self.__killed = True
            # Return the session and pinned connection, if necessary.
            self.close()
            raise
        except Exception:
            self.close()
            raise

        if isinstance(response, PinnedResponse):
            # Keep the pinned connection for subsequent getMores.
            if not self.__sock_mgr:
                self.__sock_mgr = _SocketManager(response.socket_info,
                                                 response.more_to_come)
        if response.from_command:
            cursor = response.docs[0]['cursor']
            documents = cursor['nextBatch']
            self.__postbatchresumetoken = cursor.get('postBatchResumeToken')
            self.__id = cursor['id']
        else:
            documents = response.docs
            self.__id = response.data.cursor_id

        # A cursor id of 0 means there is nothing further to fetch.
        if self.__id == 0:
            self.close()
        self.__data = deque(documents)

    def _unpack_response(self, response, cursor_id, codec_options,
                         user_fields=None, legacy_response=False):
        """Unpack a server reply by delegating to
        ``response.unpack_response``."""
        return response.unpack_response(cursor_id, codec_options, user_fields,
                                        legacy_response)

    def _refresh(self):
        """Refreshes the cursor with more data from the server.

        Returns the length of self.__data after refresh. Will exit early if
        self.__data is already non-empty. Raises OperationFailure when the
        cursor cannot be refreshed due to an error on the query.
        """
        if self.__data or self.__killed:
            return len(self.__data)

        if self.__id:  # Get More
            dbname, collname = self.__ns.split('.', 1)
            read_pref = self.__collection._read_preference_for(self.session)
            self.__send_message(
                self._getmore_class(dbname,
                                    collname,
                                    self.__batch_size,
                                    self.__id,
                                    self.__collection.codec_options,
                                    read_pref,
                                    self.__session,
                                    self.__collection.database.client,
                                    self.__max_await_time_ms,
                                    self.__sock_mgr, False))
        else:  # Cursor id is zero nothing else to return
            self.__die(True)

        return len(self.__data)

    @property
    def alive(self):
        """Does this cursor have the potential to return more data?

        Even if :attr:`alive` is ``True``, :meth:`next` can raise
        :exc:`StopIteration`. Best to use a for loop::

            for doc in collection.aggregate(pipeline):
                print(doc)

        .. note:: :attr:`alive` can be True while iterating a cursor from
          a failed server. In this case :attr:`alive` will return False after
          :meth:`next` fails to retrieve the next batch of results from the
          server.
        """
        return bool(self.__data) or not self.__killed

    @property
    def cursor_id(self):
        """Returns the id of the cursor."""
        return self.__id

    @property
    def address(self):
        """The (host, port) of the server used, or None.

        .. versionadded:: 3.0
        """
        return self.__address

    @property
    def session(self):
        """The cursor's :class:`~pymongo.client_session.ClientSession`, or None.

        .. versionadded:: 3.6
        """
        # Only explicit sessions are exposed.
        if self.__explicit_session:
            return self.__session
        return None

    def __iter__(self):
        return self

    def next(self):
        """Advance the cursor."""
        # Block until a document is returnable.
        while self.alive:
            doc = self._try_next(True)
            if doc is not None:
                return doc

        raise StopIteration

    __next__ = next

    def _try_next(self, get_more_allowed):
        """Advance the cursor blocking for at most one getMore command.

        Returns the next buffered document, or None if none is available.
        """
        if not self.__data and not self.__killed and get_more_allowed:
            self._refresh()
        if self.__data:
            return self.__data.popleft()
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
|
|
|
|
|
|
class RawBatchCommandCursor(CommandCursor):
    # getMore operations for this cursor are built with the raw-batch
    # message class.
    _getmore_class = _RawBatchGetMore

    def __init__(self, collection, cursor_info, address, retrieved=0,
                 batch_size=0, max_await_time_ms=None, session=None,
                 explicit_session=False):
        """Create a new cursor / iterator over raw batches of BSON data.

        Should not be called directly by application developers -
        see :meth:`~pymongo.collection.Collection.aggregate_raw_batches`
        instead.

        All arguments are forwarded unchanged to
        :class:`CommandCursor`; `cursor_info` must not carry a non-empty
        ``'firstBatch'``.

        .. mongodoc:: cursors
        """
        first_batch = cursor_info.get('firstBatch')
        assert not first_batch
        super(RawBatchCommandCursor, self).__init__(
            collection,
            cursor_info,
            address,
            retrieved=retrieved,
            batch_size=batch_size,
            max_await_time_ms=max_await_time_ms,
            session=session,
            explicit_session=explicit_session)

    def _unpack_response(self, response, cursor_id, codec_options,
                         user_fields=None, legacy_response=False):
        """Return the reply's raw response without decoding it.

        `codec_options` is accepted for signature compatibility with the
        base class but is not used here.
        """
        batches = response.raw_response(cursor_id, user_fields=user_fields)
        if legacy_response:
            return batches
        # OP_MSG returns firstBatch/nextBatch documents as a BSON array
        # Re-assemble the array of documents into a document stream
        _convert_raw_document_lists_to_streams(batches[0])
        return batches

    def __getitem__(self, index):
        """Indexing is not supported on raw batch cursors; always raises
        :exc:`~pymongo.errors.InvalidOperation`."""
        raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")
|