Fix aggregate cursor read preference issues PYTHON-636

The cursor returned by aggregate now knows what server
it was created on, allowing it to return getMore results
when the read preference is not PRIMARY.

This also introduces a simplified cursor class for use
with commands that return a cursor id and perhaps an initial
set of results. None of the methods of cursor.Cursor (other
than batch_size) make sense in this case.
This commit is contained in:
Bernie Hackett 2014-02-10 18:55:50 -08:00
parent 4307f225b8
commit cdacc2f4b9
7 changed files with 285 additions and 81 deletions

View File

@ -0,0 +1,7 @@
:mod:`command_cursor` -- Tools for iterating over MongoDB command results
=========================================================================
.. automodule:: pymongo.command_cursor
:synopsis: Tools for iterating over MongoDB command results
:members:

View File

@ -31,6 +31,7 @@ Sub-modules:
connection
database
collection
command_cursor
cursor
bulk
errors

View File

@ -23,6 +23,7 @@ from pymongo import (bulk,
common,
helpers,
message)
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.errors import InvalidName, OperationFailure
from pymongo.helpers import _check_write_command_response
@ -1196,13 +1197,17 @@ class Collection(common.BaseObject):
With server version **>= 2.5.1**, pass
``cursor={}`` to retrieve unlimited aggregation results
with a :class:`~pymongo.cursor.Cursor`::
with a :class:`~pymongo.command_cursor.CommandCursor`::
pipeline = [{'$project': {'name': {'$toUpper': '$name'}}}]
cursor = collection.aggregate(pipeline, cursor={})
for doc in cursor:
print doc
.. versionchanged:: 2.7
When the cursor option is used, return
:class:`~pymongo.command_cursor.CommandCursor` instead of
:class:`~pymongo.cursor.Cursor`.
.. versionchanged:: 2.6
Added cursor support.
.. versionadded:: 2.3
@ -1228,17 +1233,19 @@ class Collection(common.BaseObject):
'_use_master': use_master}
command_kwargs.update(kwargs)
command_response = self.__database.command(
result, conn_id = self.__database._command(
"aggregate", self.__name, **command_kwargs)
if 'cursor' in command_response:
cursor_info = command_response['cursor']
return Cursor(
if 'cursor' in result:
cursor_info = result['cursor']
return CommandCursor(
self,
_first_batch=cursor_info['firstBatch'],
_cursor_id=cursor_info['id'])
cursor_info['id'],
conn_id,
cursor_info['firstBatch'],
command_kwargs.get('compile_re', True))
else:
return command_response
return result
# TODO key and condition ought to be optional, but deprecation
# could be painful as argument order would have to change.

162
pymongo/command_cursor.py Normal file
View File

@ -0,0 +1,162 @@
# Copyright 2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""CommandCursor class to iterate over command results."""
from collections import deque
from pymongo import helpers, message
from pymongo.errors import AutoReconnect
class CommandCursor(object):
"""A cursor / iterator over command cursors.
"""
def __init__(self, collection, cursor_id,
conn_id, initial=None, compile_re=True):
"""Create a new command cursor.
"""
self.__collection = collection
self.__id = cursor_id
self.__conn_id = conn_id
self.__data = deque(initial or [])
self.__decode_opts = (
collection.database.connection.document_class,
collection.database.connection.tz_aware,
collection.uuid_subtype,
compile_re
)
self.__retrieved = 0
self.__batch_size = 0
self.__killed = False
def __del__(self):
if self.__id and not self.__killed:
self.__die()
def __die(self):
"""Closes this cursor.
"""
if self.__id and not self.__killed:
client = self.__collection.database.connection
if self.__conn_id is not None:
client.close_cursor(self.__id, self.__conn_id)
else:
client.close_cursor(self.__id)
self.__killed = True
def close(self):
"""Explicitly close / kill this cursor. Required for PyPy, Jython and
other Python implementations that don't use reference counting
garbage collection.
"""
self.__die()
def batch_size(self, batch_size):
"""Limits the number of documents returned in one batch. Each batch
requires a round trip to the server. It can be adjusted to optimize
performance and limit data transfer.
.. note:: batch_size can not override MongoDB's internal limits on the
amount of data it will return to the client in a single batch (i.e
if you set batch size to 1,000,000,000, MongoDB will currently only
return 4-16MB of results per batch).
Raises :exc:`TypeError` if `batch_size` is not an integer.
Raises :exc:`ValueError` if `batch_size` is less than ``0``.
:Parameters:
- `batch_size`: The size of each batch of results requested.
"""
if not isinstance(batch_size, (int, long)):
raise TypeError("batch_size must be an integer")
if batch_size < 0:
raise ValueError("batch_size must be >= 0")
self.__batch_size = batch_size == 1 and 2 or batch_size
return self
def __send_message(self, msg):
"""Send a getmore message and handle the response.
"""
client = self.__collection.database.connection
try:
res = client._send_message_with_response(
msg, _connection_to_use=self.__conn_id)
self.__conn_id, (response, dummy0, dummy1) = res
except AutoReconnect:
# Don't try to send kill cursors on another socket
# or to another server. It can cause a _pinValue
# assertion on some server releases if we get here
# due to a socket timeout.
self.__killed = True
raise
try:
response = helpers._unpack_response(response,
self.__id,
*self.__decode_opts)
except AutoReconnect:
# Don't send kill cursors to another server after a "not master"
# error. It's completely pointless.
self.__killed = True
client.disconnect()
raise
self.__id = response["cursor_id"]
assert response["starting_from"] == self.__retrieved, (
"Result batch started from %s, expected %s" % (
response['starting_from'], self.__retrieved))
self.__retrieved += response["number_returned"]
self.__data = deque(response["data"])
def __refresh(self):
"""Refreshes the cursor with more data from the server.
Returns the length of self.__data after refresh. Will exit early if
self.__data is already non-empty. Raises OperationFailure when the
cursor cannot be refreshed due to an error on the query.
"""
if len(self.__data) or self.__killed:
return len(self.__data)
if self.__id: # Get More
self.__send_message(
message.get_more(self.__collection.full_name,
self.__batch_size, self.__id))
else: # Cursor id is zero nothing else to return
self.__killed = True
return len(self.__data)
def __iter__(self):
return self
def next(self):
"""Advance the cursor.
"""
if len(self.__data) or self.__refresh():
coll = self.__collection
return coll.database._fix_incoming(self.__data.popleft(), coll)
else:
raise StopIteration
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.__die()

View File

@ -189,6 +189,17 @@ class Cursor(object):
"""
return self.__collection
@property
def _connection_id(self):
"""The server/client/pool this cursor lives on.
Could be (host, port), -1, or None depending on what
client class executed the initial query or this cursor
being advanced at all.
"""
# TODO: Make this public after sorting out client issues.
return self.__connection_id
def __del__(self):
if self.__id and not self.__killed:
self.__die()

View File

@ -271,6 +271,77 @@ class Database(common.BaseObject):
son = manipulator.transform_outgoing(son, collection)
return son
def _command(self, command, value=1,
check=True, allowable_errors=None,
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs):
"""Internal command helper.
"""
if isinstance(command, basestring):
command = SON([(command, value)])
command_name = command.keys()[0].lower()
must_use_master = kwargs.pop('_use_master', False)
if command_name not in rp.secondary_ok_commands:
must_use_master = True
# Special-case: mapreduce can go to secondaries only if inline
if command_name == 'mapreduce':
out = command.get('out') or kwargs.get('out')
if not isinstance(out, dict) or not out.get('inline'):
must_use_master = True
# Special-case: aggregate with $out cannot go to secondaries.
if command_name == 'aggregate':
for stage in kwargs.get('pipeline', []):
if '$out' in stage:
must_use_master = True
break
extra_opts = {
'as_class': kwargs.pop('as_class', None),
'slave_okay': kwargs.pop('slave_okay', self.slave_okay),
'_must_use_master': must_use_master,
'_uuid_subtype': uuid_subtype
}
extra_opts['read_preference'] = kwargs.pop(
'read_preference',
self.read_preference)
extra_opts['tag_sets'] = kwargs.pop(
'tag_sets',
self.tag_sets)
extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
'secondary_acceptable_latency_ms',
self.secondary_acceptable_latency_ms)
extra_opts['compile_re'] = compile_re
fields = kwargs.get('fields')
if fields is not None and not isinstance(fields, dict):
kwargs['fields'] = helpers._fields_list_to_dict(fields)
command.update(kwargs)
# Warn if must_use_master will override read_preference.
if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and
extra_opts['_must_use_master']):
warnings.warn("%s does not support %s read preference "
"and will be routed to the primary instead." %
(command_name,
rp.modes[extra_opts['read_preference']]),
UserWarning)
cursor = self["$cmd"].find(command, **extra_opts).limit(-1)
for doc in cursor:
result = doc
if check:
msg = "command %s failed: %%s" % repr(command).replace("%", "%%")
helpers._check_command_response(result, self.connection.disconnect,
msg, allowable_errors)
return result, cursor._connection_id
def command(self, command, value=1,
check=True, allowable_errors=[],
uuid_subtype=OLD_UUID_SUBTYPE, compile_re=True, **kwargs):
@ -360,69 +431,8 @@ class Database(common.BaseObject):
.. mongodoc:: commands
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
"""
if isinstance(command, basestring):
command = SON([(command, value)])
command_name = command.keys()[0].lower()
must_use_master = kwargs.pop('_use_master', False)
if command_name not in rp.secondary_ok_commands:
must_use_master = True
# Special-case: mapreduce can go to secondaries only if inline
if command_name == 'mapreduce':
out = command.get('out') or kwargs.get('out')
if not isinstance(out, dict) or not out.get('inline'):
must_use_master = True
# Special-case: aggregate with $out cannot go to secondaries.
if command_name == 'aggregate':
for stage in kwargs.get('pipeline', []):
if '$out' in stage:
must_use_master = True
break
extra_opts = {
'as_class': kwargs.pop('as_class', None),
'slave_okay': kwargs.pop('slave_okay', self.slave_okay),
'_must_use_master': must_use_master,
'_uuid_subtype': uuid_subtype
}
extra_opts['read_preference'] = kwargs.pop(
'read_preference',
self.read_preference)
extra_opts['tag_sets'] = kwargs.pop(
'tag_sets',
self.tag_sets)
extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
'secondary_acceptable_latency_ms',
self.secondary_acceptable_latency_ms)
extra_opts['compile_re'] = compile_re
fields = kwargs.get('fields')
if fields is not None and not isinstance(fields, dict):
kwargs['fields'] = helpers._fields_list_to_dict(fields)
command.update(kwargs)
# Warn if must_use_master will override read_preference.
if (extra_opts['read_preference'] != rp.ReadPreference.PRIMARY and
extra_opts['_must_use_master']):
warnings.warn("%s does not support %s read preference "
"and will be routed to the primary instead." %
(command_name,
rp.modes[extra_opts['read_preference']]),
UserWarning)
result = self["$cmd"].find_one(command, **extra_opts)
if check:
msg = "command %s failed: %%s" % repr(command).replace("%", "%%")
helpers._check_command_response(result, self.connection.disconnect,
msg, allowable_errors)
return result
return self._command(command, value, check, allowable_errors,
uuid_subtype, compile_re, **kwargs)[0]
def collection_names(self, include_system_collections=True):
"""Get a list of all the collection names in this database.

View File

@ -41,7 +41,9 @@ from pymongo import (ASCENDING, DESCENDING, GEO2D,
GEOHAYSTACK, GEOSPHERE, HASHED)
from pymongo import message as message_module
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.command_cursor import CommandCursor
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.read_preferences import ReadPreference
from pymongo.son_manipulator import SONManipulator
from pymongo.errors import (DocumentTooLarge,
DuplicateKeyError,
@ -1349,25 +1351,29 @@ class TestCollection(unittest.TestCase):
db = self.db
projection = {'$project': {'_id': '$_id'}}
cursor = db.test.aggregate(projection, cursor={})
self.assertTrue(isinstance(cursor, Cursor))
self.assertRaises(InvalidOperation, cursor.rewind)
self.assertRaises(InvalidOperation, cursor.clone)
self.assertRaises(InvalidOperation, cursor.count)
self.assertRaises(InvalidOperation, cursor.explain)
self.assertTrue(isinstance(cursor, CommandCursor))
def test_aggregation_cursor(self):
if not version.at_least(self.db.connection, (2, 5, 1)):
raise SkipTest("Aggregation cursor requires MongoDB >= 2.5.1")
db = self.db
ismaster = db.command('ismaster')
setname = ismaster.get('setName')
w = len(ismaster.get('hosts', [])) or 1
if setname:
db = MongoReplicaSetClient(host=self.client.host,
port=self.client.port,
replicaSet=setname)[db.name]
db.read_preference = ReadPreference.SECONDARY
# A small collection which returns only an initial batch,
# and a larger one that requires a getMore.
for collection_size in (10, 1000):
db.drop_collection("test")
db.test.insert([{'_id': i} for i in range(collection_size)])
db.test.insert([{'_id': i} for i in range(collection_size)], w=w)
expected_sum = sum(range(collection_size))
# Use batchSize to ensure multiple getMore messages
cursor = db.test.aggregate(
{'$project': {'_id': '$_id'}}, cursor={})
{'$project': {'_id': '$_id'}},
cursor={'batchSize': 5})
self.assertEqual(
expected_sum,