# Copyright 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
|
|
|
|
"""CommandCursor class to iterate over command results."""
|
|
|
|
import numbers
from collections import deque

from pymongo import helpers, message
from pymongo.errors import AutoReconnect, CursorNotFound
|
|
|
|
|
|
class CommandCursor(object):
    """A cursor / iterator over command cursors.

    Serves the documents of a command's ``firstBatch`` first, then fetches
    further batches with getMore messages (pinned to the connection the
    cursor was created on) until the server reports a cursor id of 0.
    """

    def __init__(self, collection, cursor_info,
                 conn_id, compile_re=True, retrieved=0):
        """Create a new command cursor.

        :Parameters:
          - `collection`: the collection the command was run against. Its
            ``codec_options`` decode later batches, its ``full_name`` is the
            fallback namespace, and ``collection.database.connection`` is
            the client used to fetch more batches and to close the cursor.
          - `cursor_info`: the command's cursor document; must contain
            ``id`` and ``firstBatch`` and may contain ``ns``.
          - `conn_id`: identifier of the connection the cursor lives on, or
            ``None``. getMore and close requests are directed to it.
          - `compile_re`: forwarded to ``helpers._unpack_response`` when
            decoding later batches (presumably controls whether BSON regular
            expressions are compiled -- confirm in ``helpers``).
          - `retrieved`: number of documents already retrieved before this
            object was created.
        """
        self.__collection = collection
        self.__id = cursor_info['id']
        self.__conn_id = conn_id
        self.__data = deque(cursor_info['firstBatch'])
        self.__codec_options = collection.codec_options
        self.__compile_re = compile_re
        self.__retrieved = retrieved
        # 0 means "server default"; it is sent as-is in getMore requests.
        self.__batch_size = 0
        # A cursor id of 0 means the server has nothing more to return, so
        # there is no server-side cursor to close later.
        self.__killed = (self.__id == 0)
        # Prefer the namespace reported by the command; fall back to the
        # collection's own namespace (evaluated only when needed).
        self.__ns = (cursor_info["ns"] if "ns" in cursor_info
                     else collection.full_name)

    def __del__(self):
        # Best effort: release the server-side cursor if this object is
        # garbage collected without an explicit close(). __die itself is a
        # no-op for cursors that are exhausted or already killed.
        self.__die()

    def __die(self):
        """Closes this cursor.

        The cursor is marked killed *before* the client is asked to close
        it, so a failing ``close_cursor`` call is attempted only once and is
        not retried later by ``close()``, ``__exit__`` or ``__del__``.
        """
        already_killed = self.__killed
        self.__killed = True
        if self.__id and not already_killed:
            client = self.__collection.database.connection
            if self.__conn_id is not None:
                client.close_cursor(self.__id, self.__conn_id)
            else:
                client.close_cursor(self.__id)

    def close(self):
        """Explicitly close / kill this cursor. Required for PyPy, Jython and
        other Python implementations that don't use reference counting
        garbage collection.
        """
        self.__die()

    def batch_size(self, batch_size):
        """Limits the number of documents returned in one batch. Each batch
        requires a round trip to the server. It can be adjusted to optimize
        performance and limit data transfer.

        .. note:: batch_size can not override MongoDB's internal limits on the
           amount of data it will return to the client in a single batch (i.e
           if you set batch size to 1,000,000,000, MongoDB will currently only
           return 4-16MB of results per batch).

        Raises :exc:`TypeError` if `batch_size` is not an integer.
        Raises :exc:`ValueError` if `batch_size` is less than ``0``.

        :Parameters:
          - `batch_size`: The size of each batch of results requested.

        Returns this cursor, so calls can be chained.
        """
        # numbers.Integral covers int (and long on Python 2) without naming
        # ``long``, which does not exist on Python 3.
        if not isinstance(batch_size, numbers.Integral):
            raise TypeError("batch_size must be an integer")
        if batch_size < 0:
            raise ValueError("batch_size must be >= 0")

        # A request size of 1 is sent as 2. In the legacy wire protocol a
        # numberToReturn of 1 behaves like -1 (single batch, then the cursor
        # is closed); presumably mirrored here for getMore -- confirm.
        self.__batch_size = 2 if batch_size == 1 else batch_size
        return self

    def __send_message(self, msg):
        """Send a getmore message and handle the response.

        On success, replaces the buffered documents with the returned
        batch, adds their count to the retrieved total and updates the
        cursor id (marking the cursor killed when the new id is 0). On
        :exc:`AutoReconnect` or :exc:`CursorNotFound` the cursor is marked
        killed and the exception is re-raised.

        :Parameters:
          - `msg`: the message built by ``message.get_more``.
        """
        client = self.__collection.database.connection
        try:
            res = client._send_message_with_response(
                msg, _connection_to_use=self.__conn_id)
            self.__conn_id, (response, _, _) = res
        except AutoReconnect:
            # Don't try to send kill cursors on another socket
            # or to another server. It can cause a _pinValue
            # assertion on some server releases if we get here
            # due to a socket timeout.
            self.__killed = True
            raise

        try:
            response = helpers._unpack_response(
                response,
                self.__id,
                self.__codec_options.document_class,
                self.__codec_options.tz_aware,
                self.__codec_options.uuid_representation,
                self.__compile_re)
        except CursorNotFound:
            # The server no longer has this cursor; nothing left to close.
            self.__killed = True
            raise
        except AutoReconnect:
            # Don't send kill cursors to another server after a "not master"
            # error. It's completely pointless.
            self.__killed = True
            client.disconnect()
            raise

        self.__id = response["cursor_id"]
        if self.__id == 0:
            self.__killed = True

        self.__retrieved += response["number_returned"]
        self.__data = deque(response["data"])

    def _refresh(self):
        """Refreshes the cursor with more data from the server.

        Returns the length of self.__data after refresh. Will exit early if
        self.__data is already non-empty or the cursor is killed. Raises
        OperationFailure when the cursor cannot be refreshed due to an error
        on the query.
        """
        if self.__data or self.__killed:
            return len(self.__data)

        if self.__id:  # Get More
            self.__send_message(
                message.get_more(self.__ns, self.__batch_size, self.__id))
        else:  # Cursor id is zero: nothing else to return.
            self.__killed = True

        return len(self.__data)

    @property
    def alive(self):
        """Does this cursor have the potential to return more data?

        Even if :attr:`alive` is ``True``, :meth:`next` can raise
        :exc:`StopIteration`. Best to use a for loop::

            for doc in collection.aggregate(pipeline):
                print(doc)

        .. note:: :attr:`alive` can be True while iterating a cursor from
          a failed server. In this case :attr:`alive` will return False after
          :meth:`next` fails to retrieve the next batch of results from the
          server.
        """
        return bool(self.__data) or not self.__killed

    @property
    def cursor_id(self):
        """Returns the id of the cursor."""
        return self.__id

    def __iter__(self):
        return self

    def next(self):
        """Advance the cursor.

        Returns the next document (passed through the database's
        ``_fix_outgoing``), fetching a new batch if the buffer is empty.
        Raises :exc:`StopIteration` when no documents remain.
        """
        if self.__data or self._refresh():
            coll = self.__collection
            return coll.database._fix_outgoing(self.__data.popleft(), coll)
        raise StopIteration

    # Python 3 spells the iterator protocol method ``__next__``.
    __next__ = next

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__die()