Support exhaust cursor flag PYTHON-265

This commit is contained in:
behackett 2013-08-05 11:29:11 -07:00
parent 40a32cc4a2
commit 4d42258697
9 changed files with 243 additions and 83 deletions

View File

@ -33,7 +33,7 @@
.. automethod:: update(spec, document[, upsert=False[, manipulate=False[, safe=None[, multi=False[, check_keys=True[, **kwargs]]]]]])
.. automethod:: remove([spec_or_id=None[, safe=None[, **kwargs]]])
.. automethod:: drop
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, **kwargs]]]]]]]]]]]]]]]])
.. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False[, **kwargs]]]]]]]]]]]]]]]]])
.. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]])
.. automethod:: count
.. automethod:: create_index

View File

@ -679,6 +679,32 @@ class Collection(common.BaseObject):
the nearest member may accept reads. Default 15 milliseconds.
**Ignored by mongos** and must be configured on the command line.
See the localThreshold_ option for more information.
- `exhaust` (optional): If ``True`` create an "exhaust" cursor.
MongoDB will stream batched results to the client without waiting
for the client to request each batch, reducing latency.
.. note:: There are a number of caveats to using the `exhaust`
parameter:
1. The `exhaust` and `limit` options are incompatible and cannot
be used together.
2. The `exhaust` option is not supported by mongos and cannot be
used with a sharded cluster.
3. A :class:`~pymongo.cursor.Cursor` instance created with the
`exhaust` option requires an exclusive :class:`~socket.socket`
connection to MongoDB. If the :class:`~pymongo.cursor.Cursor` is
discarded without being completely iterated, the underlying
:class:`~socket.socket` connection will be closed and discarded
without being returned to the connection pool.
4. A :class:`~pymongo.cursor.Cursor` instance created with the
`exhaust` option in a :doc:`request </examples/requests>` **must**
be completely iterated before executing any other operation.
5. The `network_timeout` option is ignored when using the
`exhaust` option.
.. note:: The `manipulate` parameter may default to False in
a future release.

View File

@ -34,6 +34,28 @@ _QUERY_OPTIONS = {
"partial": 128}
# This has to be an old style class due to
# http://bugs.jython.org/issue1057
class _SocketManager:
"""Used with exhaust cursors to ensure the socket is returned.
"""
def __init__(self, sock, pool):
self.sock = sock
self.pool = pool
self.__closed = False
def __del__(self):
self.close()
def close(self):
"""Return this instance's socket to the connection pool.
"""
if not self.__closed:
self.__closed = True
self.pool.maybe_return_socket(self.sock)
self.sock, self.pool = None, None
# TODO might be cool to be able to do find().include("foo") or
# find().exclude(["bar", "baz"]) or find().slice("a", 1, 2) as an
# alternative to the fields specifier.
@ -46,7 +68,7 @@ class Cursor(object):
max_scan=None, as_class=None, slave_okay=False,
await_data=False, partial=False, manipulate=True,
read_preference=ReadPreference.PRIMARY, tag_sets=[{}],
secondary_acceptable_latency_ms=None,
secondary_acceptable_latency_ms=None, exhaust=False,
_must_use_master=False, _uuid_subtype=None, **kwargs):
"""Create a new cursor.
@ -78,6 +100,8 @@ class Cursor(object):
raise TypeError("await_data must be an instance of bool")
if not isinstance(partial, bool):
raise TypeError("partial must be an instance of bool")
if not isinstance(exhaust, bool):
raise TypeError("exhaust must be an instance of bool")
if fields is not None:
if not fields:
@ -95,6 +119,15 @@ class Cursor(object):
self.__limit = limit
self.__batch_size = 0
# Exhaust cursor support
if self.__collection.database.connection.is_mongos and exhaust:
raise InvalidOperation('Exhaust cursors are '
'not supported by mongos')
if limit and exhaust:
raise InvalidOperation("Can't use limit and exhaust together.")
self.__exhaust = exhaust
self.__exhaust_mgr = None
# This is ugly. People want to be able to do cursor[5:5] and
# get an empty result set (old behavior was an
# exception). It's hard to do that right, though, because the
@ -193,11 +226,19 @@ class Cursor(object):
"""Closes this cursor.
"""
if self.__id and not self.__killed:
connection = self.__collection.database.connection
if self.__connection_id is not None:
connection.close_cursor(self.__id, self.__connection_id)
if self.__exhaust and self.__exhaust_mgr:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
self.__exhaust_mgr.sock.close()
else:
connection.close_cursor(self.__id)
connection = self.__collection.database.connection
if self.__connection_id is not None:
connection.close_cursor(self.__id, self.__connection_id)
else:
connection.close_cursor(self.__id)
if self.__exhaust and self.__exhaust_mgr:
self.__exhaust_mgr.close()
self.__killed = True
def close(self):
@ -299,6 +340,8 @@ class Cursor(object):
options |= _QUERY_OPTIONS["no_timeout"]
if self.__await_data:
options |= _QUERY_OPTIONS["await_data"]
if self.__exhaust:
options |= _QUERY_OPTIONS["exhaust"]
if self.__partial:
options |= _QUERY_OPTIONS["partial"]
return options
@ -319,6 +362,14 @@ class Cursor(object):
raise TypeError("mask must be an int")
self.__check_okay_to_chain()
if mask & _QUERY_OPTIONS["exhaust"]:
if self.__limit:
raise InvalidOperation("Can't use limit and exhaust together.")
if self.__collection.database.connection.is_mongos:
raise InvalidOperation('Exhaust cursors are '
'not supported by mongos')
self.__exhaust = True
self.__query_flags |= mask
return self
@ -332,6 +383,9 @@ class Cursor(object):
raise TypeError("mask must be an int")
self.__check_okay_to_chain()
if mask & _QUERY_OPTIONS["exhaust"]:
self.__exhaust = False
self.__query_flags &= ~mask
return self
@ -350,6 +404,8 @@ class Cursor(object):
"""
if not isinstance(limit, int):
raise TypeError("limit must be an int")
if self.__exhaust:
raise InvalidOperation("Can't use limit and exhaust together.")
self.__check_okay_to_chain()
self.__empty = False
@ -689,34 +745,38 @@ class Cursor(object):
def __send_message(self, message):
"""Send a query or getmore message and handles the response.
If message is ``None`` this is an exhaust cursor, which reads
the next result batch off the exhaust socket instead of
sending getMore messages to the server.
"""
db = self.__collection.database
kwargs = {"_must_use_master": self.__must_use_master}
kwargs["read_preference"] = self.__read_preference
kwargs["tag_sets"] = self.__tag_sets
kwargs["secondary_acceptable_latency_ms"] = (
self.__secondary_acceptable_latency_ms)
if self.__connection_id is not None:
kwargs["_connection_to_use"] = self.__connection_id
kwargs.update(self.__kwargs)
client = self.__collection.database.connection
try:
response = db.connection._send_message_with_response(message,
**kwargs)
except AutoReconnect:
# Don't try to send kill cursors on another socket
# or to another server. It can cause a _pinValue
# assertion on some server releases if we get here
# due to a socket timeout.
self.__killed = True
raise
if message:
kwargs = {"_must_use_master": self.__must_use_master}
kwargs["read_preference"] = self.__read_preference
kwargs["tag_sets"] = self.__tag_sets
kwargs["secondary_acceptable_latency_ms"] = (
self.__secondary_acceptable_latency_ms)
kwargs['exhaust'] = self.__exhaust
if self.__connection_id is not None:
kwargs["_connection_to_use"] = self.__connection_id
kwargs.update(self.__kwargs)
if isinstance(response, tuple):
(connection_id, response) = response
else:
connection_id = None
self.__connection_id = connection_id
try:
res = client._send_message_with_response(message, **kwargs)
self.__connection_id, (response, sock, pool) = res
if self.__exhaust:
self.__exhaust_mgr = _SocketManager(sock, pool)
except AutoReconnect:
# Don't try to send kill cursors on another socket
# or to another server. It can cause a _pinValue
# assertion on some server releases if we get here
# due to a socket timeout.
self.__killed = True
raise
else: # exhaust cursor - no getMore message
response = client._exhaust_next(self.__exhaust_mgr.sock)
try:
response = helpers._unpack_response(response, self.__id,
@ -727,7 +787,7 @@ class Cursor(object):
# Don't send kill cursors to another server after a "not master"
# error. It's completely pointless.
self.__killed = True
db.connection.disconnect()
client.disconnect()
raise
self.__id = response["cursor_id"]
@ -743,6 +803,11 @@ class Cursor(object):
if self.__limit and self.__id and self.__limit <= self.__retrieved:
self.__die()
# Don't wait for garbage collection to call __del__, return the
# socket to the pool now.
if self.__exhaust and self.__id == 0:
self.__exhaust_mgr.close()
def _refresh(self):
"""Refreshes the cursor with more data from Mongo.
@ -776,9 +841,14 @@ class Cursor(object):
else:
limit = self.__batch_size
self.__send_message(
message.get_more(self.__collection.full_name,
limit, self.__id))
# Exhaust cursors don't send getMore messages.
if self.__exhaust:
self.__send_message(None)
else:
self.__send_message(
message.get_more(self.__collection.full_name,
limit, self.__id))
else: # Cursor id is zero nothing else to return
self.__killed = True

View File

@ -178,20 +178,20 @@ class MasterSlaveConnection(BaseObject):
"""
if _connection_to_use is not None:
if _connection_to_use == -1:
return (-1,
self.__master._send_message_with_response(message,
**kwargs))
member = self.__master
conn = -1
else:
return (_connection_to_use,
self.__slaves[_connection_to_use]
._send_message_with_response(message, **kwargs))
member = self.__slaves[_connection_to_use]
conn = _connection_to_use
return (conn,
member._send_message_with_response(message, **kwargs)[1])
# _must_use_master is set for commands, which must be sent to the
# master instance. any queries in a request must be sent to the
# master since that is where writes go.
if _must_use_master or self.in_request():
return (-1, self.__master._send_message_with_response(message,
**kwargs))
**kwargs)[1])
# Iterate through the slaves randomly until we have success. Raise
# reconnect if they all fail.
@ -199,7 +199,8 @@ class MasterSlaveConnection(BaseObject):
try:
slave = self.__slaves[connection_id]
return (connection_id,
slave._send_message_with_response(message, **kwargs))
slave._send_message_with_response(message,
**kwargs)[1])
except AutoReconnect:
pass

View File

@ -975,16 +975,18 @@ class MongoClient(common.BaseObject):
message += chunk
return message
def __receive_message_on_socket(self, operation, request_id, sock_info):
"""Receive a message in response to `request_id` on `sock`.
def __receive_message_on_socket(self, operation, rqst_id, sock_info):
"""Receive a message in response to `rqst_id` on `sock`.
Returns the response data with the header removed.
"""
header = self.__receive_data_on_socket(16, sock_info)
length = struct.unpack("<i", header[:4])[0]
msg_req_id = struct.unpack("<i", header[8:12])[0]
assert request_id == msg_req_id, \
"ids don't match %r %r" % (request_id, msg_req_id)
# No rqst_id for exhaust cursor "getMore".
if rqst_id is not None:
resp_id = struct.unpack("<i", header[8:12])[0]
assert rqst_id == resp_id, "ids don't match %r %r" % (rqst_id,
resp_id)
assert operation == struct.unpack("<i", header[12:])[0]
return self.__receive_data_on_socket(length - 16, sock_info)
@ -1012,28 +1014,30 @@ class MongoClient(common.BaseObject):
- `message`: (request_id, data) pair making up the message to send
"""
sock_info = self.__socket()
exhaust = kwargs.get('exhaust')
try:
try:
if "network_timeout" in kwargs:
if not exhaust and "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs["network_timeout"])
return self.__send_and_receive(message, sock_info)
response = self.__send_and_receive(message, sock_info)
if not exhaust:
if "network_timeout" in kwargs:
sock_info.sock.settimeout(self.__net_timeout)
return (None, (response, sock_info, self.__pool))
except (ConnectionFailure, socket.error), e:
self.disconnect()
raise AutoReconnect(str(e))
finally:
if "network_timeout" in kwargs:
try:
# Restore the socket's original timeout and return it to
# the pool
sock_info.sock.settimeout(self.__net_timeout)
self.__pool.maybe_return_socket(sock_info)
except socket.error:
# There was an exception and we've closed the socket
pass
else:
if not exhaust:
self.__pool.maybe_return_socket(sock_info)
def _exhaust_next(self, sock_info):
"""Used with exhaust cursors to get the next batch off the socket.
"""
return self.__receive_message_on_socket(1, None, sock_info)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. This ensures consistent reads,

View File

@ -1327,16 +1327,18 @@ class MongoReplicaSetClient(common.BaseObject):
message += chunk
return message
def __recv_msg(self, operation, request_id, sock):
"""Receive a message in response to `request_id` on `sock`.
def __recv_msg(self, operation, rqst_id, sock):
"""Receive a message in response to `rqst_id` on `sock`.
Returns the response data with the header removed.
"""
header = self.__recv_data(16, sock)
length = struct.unpack("<i", header[:4])[0]
resp_id = struct.unpack("<i", header[8:12])[0]
assert resp_id == request_id, "ids don't match %r %r" % (resp_id,
request_id)
# No rqst_id for exhaust cursor "getMore".
if rqst_id is not None:
resp_id = struct.unpack("<i", header[8:12])[0]
assert rqst_id == resp_id, "ids don't match %r %r" % (rqst_id,
resp_id)
assert operation == struct.unpack("<i", header[12:])[0]
return self.__recv_data(length - 16, sock)
@ -1421,29 +1423,29 @@ class MongoReplicaSetClient(common.BaseObject):
Can raise socket.error.
"""
sock_info = None
exhaust = kwargs.get('exhaust')
rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
try:
try:
sock_info = self.__socket(member)
sock_info = self.__socket(member)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs['network_timeout'])
if not exhaust and "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs['network_timeout'])
rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
response = self.__recv_msg(1, rqst_id, sock_info)
sock_info.sock.sendall(data)
response = self.__recv_msg(1, rqst_id, sock_info)
if not exhaust:
if "network_timeout" in kwargs:
sock_info.sock.settimeout(self.__net_timeout)
return response
except:
if sock_info is not None:
sock_info.close()
raise
finally:
if sock_info:
member.pool.maybe_return_socket(sock_info)
return response, sock_info, member.pool
except:
if sock_info is not None:
sock_info.close()
member.pool.maybe_return_socket(sock_info)
raise
def __try_read(self, member, msg, **kwargs):
"""Attempt a read from a member; on failure mark the member "down" and
wake up the monitor thread to refresh as soon as possible.
@ -1592,6 +1594,11 @@ class MongoReplicaSetClient(common.BaseObject):
raise AutoReconnect(msg, errors)
def _exhaust_next(self, sock_info):
"""Used with exhaust cursors to get the next batch off the socket.
"""
return self.__recv_msg(1, None, sock_info)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. For

View File

@ -1521,6 +1521,58 @@ class TestCollection(unittest.TestCase):
list(self.db.test.find(timeout=False))
list(self.db.test.find(timeout=True))
def test_exhaust(self):
    """Check ``find(exhaust=True)``: argument validation, the conflict
    with ``limit``, and what happens to the pooled socket when an exhaust
    cursor is fully iterated versus discarded early.
    """
    # Against mongos the only expected behavior is a refusal.
    if is_mongos(self.db.connection):
        self.assertRaises(InvalidOperation,
                          self.db.test.find, exhaust=True)
        return
    # exhaust must be a bool.
    self.assertRaises(TypeError, self.db.test.find, exhaust=5)
    # Limit is incompatible with exhaust.
    self.assertRaises(InvalidOperation,
                      self.db.test.find, exhaust=True, limit=5)
    cur = self.db.test.find(exhaust=True)
    self.assertRaises(InvalidOperation, cur.limit, 5)
    # The same conflict must be detected when the option is set through
    # add_option (64 is presumably the exhaust bit -- confirm against
    # _QUERY_OPTIONS).
    cur = self.db.test.find(limit=5)
    self.assertRaises(InvalidOperation, cur.add_option, 64)
    cur = self.db.test.find()
    cur.add_option(64)
    self.assertRaises(InvalidOperation, cur.limit, 5)
    self.db.drop_collection("test")
    # Insert enough documents to require more than one batch
    self.db.test.insert([{'i': i} for i in xrange(150)])
    client = get_client(max_pool_size=1)
    # Reach into the client's name-mangled private pool so the test can
    # count the sockets currently sitting in it.
    socks = client._MongoClient__pool.sockets
    self.assertEqual(1, len(socks))
    # Make sure the socket is returned after exhaustion.
    cur = client[self.db.name].test.find(exhaust=True)
    cur.next()
    # While the cursor is mid-iteration the socket is out of the pool.
    self.assertEqual(0, len(socks))
    for doc in cur:
        pass
    self.assertEqual(1, len(socks))
    # Same as previous but don't call next()
    for doc in client[self.db.name].test.find(exhaust=True):
        pass
    self.assertEqual(1, len(socks))
    # If the Cursor instance is discarded before being
    # completely iterated we have to close and
    # discard the socket.
    cur = client[self.db.name].test.find(exhaust=True)
    cur.next()
    self.assertEqual(0, len(socks))
    if sys.platform.startswith('java') or 'PyPy' in sys.version:
        # Don't wait for GC or use gc.collect(), it's unreliable.
        cur.close()
    # Drop the last reference to the partially iterated cursor.
    cur = None
    # The socket should be discarded.
    self.assertEqual(0, len(socks))
def test_distinct(self):
if not version.at_least(self.db.connection, (1, 1)):
raise SkipTest("distinct command requires MongoDB >= 1.1")

View File

@ -476,7 +476,7 @@ class TestCursor(unittest.TestCase):
partial=True,
manipulate=False,
fields={'_id': False}).limit(2)
cursor.add_option(64)
cursor.add_option(128)
cursor2 = cursor.clone()
self.assertEqual(cursor._Cursor__skip, cursor2._Cursor__skip)

View File

@ -124,7 +124,7 @@ class TestMasterSlaveConnection(unittest.TestCase, TestRequestMixin):
Slave.calls += 1
if self._fail:
raise AutoReconnect()
return 'sent'
return (None, 'sent')
class NotRandomList(object):
last_idx = -1