From 4d422586970599a179df06c8ebe95c5d28edd976 Mon Sep 17 00:00:00 2001 From: behackett Date: Mon, 5 Aug 2013 11:29:11 -0700 Subject: [PATCH] Support exhaust cursor flag PYTHON-265 --- doc/api/pymongo/collection.rst | 2 +- pymongo/collection.py | 26 +++++ pymongo/cursor.py | 138 ++++++++++++++++++++------- pymongo/master_slave_connection.py | 17 ++-- pymongo/mongo_client.py | 40 ++++---- pymongo/mongo_replica_set_client.py | 47 +++++---- test/test_collection.py | 52 ++++++++++ test/test_cursor.py | 2 +- test/test_master_slave_connection.py | 2 +- 9 files changed, 243 insertions(+), 83 deletions(-) diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index 672d02ea9..3288d8916 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -33,7 +33,7 @@ .. automethod:: update(spec, document[, upsert=False[, manipulate=False[, safe=None[, multi=False[, check_keys=True[, **kwargs]]]]]]) .. automethod:: remove([spec_or_id=None[, safe=None[, **kwargs]]]) .. automethod:: drop - .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, **kwargs]]]]]]]]]]]]]]]]) + .. automethod:: find([spec=None[, fields=None[, skip=0[, limit=0[, timeout=True[, snapshot=False[, tailable=False[, sort=None[, max_scan=None[, as_class=None[, slave_okay=False[, await_data=False[, partial=False[, manipulate=True[, read_preference=ReadPreference.PRIMARY[, exhaust=False[,**kwargs]]]]]]]]]]]]]]]]]) .. automethod:: find_one([spec_or_id=None[, *args[, **kwargs]]]) .. automethod:: count .. 
automethod:: create_index diff --git a/pymongo/collection.py b/pymongo/collection.py index 9b1cbd986..5fc4568c7 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -679,6 +679,32 @@ class Collection(common.BaseObject): the nearest member may accept reads. Default 15 milliseconds. **Ignored by mongos** and must be configured on the command line. See the localThreshold_ option for more information. + - `exhaust` (optional): If ``True`` create an "exhaust" cursor. + MongoDB will stream batched results to the client without waiting + for the client to request each batch, reducing latency. + + .. note:: There are a number of caveats to using the `exhaust` + parameter: + + 1. The `exhaust` and `limit` options are incompatible and can + not be used together. + + 2. The `exhaust` option is not supported by mongos and can not be + used with a sharded cluster. + + 3. A :class:`~pymongo.cursor.Cursor` instance created with the + `exhaust` option requires an exclusive :class:`~socket.socket` + connection to MongoDB. If the :class:`~pymongo.cursor.Cursor` is + discarded without being completely iterated the underlying + :class:`~socket.socket` connection will be closed and discarded + without being returned to the connection pool. + + 4. A :class:`~pymongo.cursor.Cursor` instance created with the + `exhaust` option in a :doc:`request </examples/requests>` **must** + be completely iterated before executing any other operation. + + 5. The `network_timeout` option is ignored when using the + `exhaust` option. .. note:: The `manipulate` parameter may default to False in a future release. diff --git a/pymongo/cursor.py b/pymongo/cursor.py index e01f81bca..33ec0526a 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -34,6 +34,28 @@ _QUERY_OPTIONS = { "partial": 128} +# This has to be an old style class due to +# http://bugs.jython.org/issue1057 +class _SocketManager: + """Used with exhaust cursors to ensure the socket is returned. 
+ """ + def __init__(self, sock, pool): + self.sock = sock + self.pool = pool + self.__closed = False + + def __del__(self): + self.close() + + def close(self): + """Return this instance's socket to the connection pool. + """ + if not self.__closed: + self.__closed = True + self.pool.maybe_return_socket(self.sock) + self.sock, self.pool = None, None + + # TODO might be cool to be able to do find().include("foo") or # find().exclude(["bar", "baz"]) or find().slice("a", 1, 2) as an # alternative to the fields specifier. @@ -46,7 +68,7 @@ class Cursor(object): max_scan=None, as_class=None, slave_okay=False, await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], - secondary_acceptable_latency_ms=None, + secondary_acceptable_latency_ms=None, exhaust=False, _must_use_master=False, _uuid_subtype=None, **kwargs): """Create a new cursor. @@ -78,6 +100,8 @@ class Cursor(object): raise TypeError("await_data must be an instance of bool") if not isinstance(partial, bool): raise TypeError("partial must be an instance of bool") + if not isinstance(exhaust, bool): + raise TypeError("exhaust must be an instance of bool") if fields is not None: if not fields: @@ -95,6 +119,15 @@ class Cursor(object): self.__limit = limit self.__batch_size = 0 + # Exhaust cursor support + if self.__collection.database.connection.is_mongos and exhaust: + raise InvalidOperation('Exhaust cursors are ' + 'not supported by mongos') + if limit and exhaust: + raise InvalidOperation("Can't use limit and exhaust together.") + self.__exhaust = exhaust + self.__exhaust_mgr = None + # This is ugly. People want to be able to do cursor[5:5] and # get an empty result set (old behavior was an # exception). It's hard to do that right, though, because the @@ -193,11 +226,19 @@ class Cursor(object): """Closes this cursor. 
""" if self.__id and not self.__killed: - connection = self.__collection.database.connection - if self.__connection_id is not None: - connection.close_cursor(self.__id, self.__connection_id) + if self.__exhaust and self.__exhaust_mgr: + # If this is an exhaust cursor and we haven't completely + # exhausted the result set we *must* close the socket + # to stop the server from sending more data. + self.__exhaust_mgr.sock.close() else: - connection.close_cursor(self.__id) + connection = self.__collection.database.connection + if self.__connection_id is not None: + connection.close_cursor(self.__id, self.__connection_id) + else: + connection.close_cursor(self.__id) + if self.__exhaust and self.__exhaust_mgr: + self.__exhaust_mgr.close() self.__killed = True def close(self): @@ -299,6 +340,8 @@ class Cursor(object): options |= _QUERY_OPTIONS["no_timeout"] if self.__await_data: options |= _QUERY_OPTIONS["await_data"] + if self.__exhaust: + options |= _QUERY_OPTIONS["exhaust"] if self.__partial: options |= _QUERY_OPTIONS["partial"] return options @@ -319,6 +362,14 @@ class Cursor(object): raise TypeError("mask must be an int") self.__check_okay_to_chain() + if mask & _QUERY_OPTIONS["exhaust"]: + if self.__limit: + raise InvalidOperation("Can't use limit and exhaust together.") + if self.__collection.database.connection.is_mongos: + raise InvalidOperation('Exhaust cursors are ' + 'not supported by mongos') + self.__exhaust = True + self.__query_flags |= mask return self @@ -332,6 +383,9 @@ class Cursor(object): raise TypeError("mask must be an int") self.__check_okay_to_chain() + if mask & _QUERY_OPTIONS["exhaust"]: + self.__exhaust = False + self.__query_flags &= ~mask return self @@ -350,6 +404,8 @@ class Cursor(object): """ if not isinstance(limit, int): raise TypeError("limit must be an int") + if self.__exhaust: + raise InvalidOperation("Can't use limit and exhaust together.") self.__check_okay_to_chain() self.__empty = False @@ -689,34 +745,38 @@ class 
Cursor(object): def __send_message(self, message): """Send a query or getmore message and handles the response. + + If message is ``None`` this is an exhaust cursor, which reads + the next result batch off the exhaust socket instead of + sending getMore messages to the server. """ - db = self.__collection.database - kwargs = {"_must_use_master": self.__must_use_master} - kwargs["read_preference"] = self.__read_preference - kwargs["tag_sets"] = self.__tag_sets - kwargs["secondary_acceptable_latency_ms"] = ( - self.__secondary_acceptable_latency_ms) - if self.__connection_id is not None: - kwargs["_connection_to_use"] = self.__connection_id - kwargs.update(self.__kwargs) + client = self.__collection.database.connection - try: - response = db.connection._send_message_with_response(message, - **kwargs) - except AutoReconnect: - # Don't try to send kill cursors on another socket - # or to another server. It can cause a _pinValue - # assertion on some server releases if we get here - # due to a socket timeout. - self.__killed = True - raise + if message: + kwargs = {"_must_use_master": self.__must_use_master} + kwargs["read_preference"] = self.__read_preference + kwargs["tag_sets"] = self.__tag_sets + kwargs["secondary_acceptable_latency_ms"] = ( + self.__secondary_acceptable_latency_ms) + kwargs['exhaust'] = self.__exhaust + if self.__connection_id is not None: + kwargs["_connection_to_use"] = self.__connection_id + kwargs.update(self.__kwargs) - if isinstance(response, tuple): - (connection_id, response) = response - else: - connection_id = None - - self.__connection_id = connection_id + try: + res = client._send_message_with_response(message, **kwargs) + self.__connection_id, (response, sock, pool) = res + if self.__exhaust: + self.__exhaust_mgr = _SocketManager(sock, pool) + except AutoReconnect: + # Don't try to send kill cursors on another socket + # or to another server. 
It can cause a _pinValue + # assertion on some server releases if we get here + # due to a socket timeout. + self.__killed = True + raise + else: # exhaust cursor - no getMore message + response = client._exhaust_next(self.__exhaust_mgr.sock) try: response = helpers._unpack_response(response, self.__id, @@ -727,7 +787,7 @@ class Cursor(object): # Don't send kill cursors to another server after a "not master" # error. It's completely pointless. self.__killed = True - db.connection.disconnect() + client.disconnect() raise self.__id = response["cursor_id"] @@ -743,6 +803,11 @@ class Cursor(object): if self.__limit and self.__id and self.__limit <= self.__retrieved: self.__die() + # Don't wait for garbage collection to call __del__, return the + # socket to the pool now. + if self.__exhaust and self.__id == 0: + self.__exhaust_mgr.close() + def _refresh(self): """Refreshes the cursor with more data from Mongo. @@ -776,9 +841,14 @@ class Cursor(object): else: limit = self.__batch_size - self.__send_message( - message.get_more(self.__collection.full_name, - limit, self.__id)) + # Exhaust cursors don't send getMore messages. 
+ if self.__exhaust: + self.__send_message(None) + else: + self.__send_message( + message.get_more(self.__collection.full_name, + limit, self.__id)) + else: # Cursor id is zero nothing else to return self.__killed = True diff --git a/pymongo/master_slave_connection.py b/pymongo/master_slave_connection.py index dbf00fac8..0ba826ba5 100644 --- a/pymongo/master_slave_connection.py +++ b/pymongo/master_slave_connection.py @@ -178,20 +178,20 @@ class MasterSlaveConnection(BaseObject): """ if _connection_to_use is not None: if _connection_to_use == -1: - return (-1, - self.__master._send_message_with_response(message, - **kwargs)) + member = self.__master + conn = -1 else: - return (_connection_to_use, - self.__slaves[_connection_to_use] - ._send_message_with_response(message, **kwargs)) + member = self.__slaves[_connection_to_use] + conn = _connection_to_use + return (conn, + member._send_message_with_response(message, **kwargs)[1]) # _must_use_master is set for commands, which must be sent to the # master instance. any queries in a request must be sent to the # master since that is where writes go. if _must_use_master or self.in_request(): return (-1, self.__master._send_message_with_response(message, - **kwargs)) + **kwargs)[1]) # Iterate through the slaves randomly until we have success. Raise # reconnect if they all fail. @@ -199,7 +199,8 @@ class MasterSlaveConnection(BaseObject): try: slave = self.__slaves[connection_id] return (connection_id, - slave._send_message_with_response(message, **kwargs)) + slave._send_message_with_response(message, + **kwargs)[1]) except AutoReconnect: pass diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index b02afa001..7b229b3ff 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -975,16 +975,18 @@ class MongoClient(common.BaseObject): message += chunk return message - def __receive_message_on_socket(self, operation, request_id, sock_info): - """Receive a message in response to `request_id` on `sock`. 
+ def __receive_message_on_socket(self, operation, rqst_id, sock_info): + """Receive a message in response to `rqst_id` on `sock`. Returns the response data with the header removed. """ header = self.__receive_data_on_socket(16, sock_info) length = struct.unpack("= 1.1") diff --git a/test/test_cursor.py b/test/test_cursor.py index ef04f9af4..806ea1178 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -476,7 +476,7 @@ class TestCursor(unittest.TestCase): partial=True, manipulate=False, fields={'_id': False}).limit(2) - cursor.add_option(64) + cursor.add_option(128) cursor2 = cursor.clone() self.assertEqual(cursor._Cursor__skip, cursor2._Cursor__skip) diff --git a/test/test_master_slave_connection.py b/test/test_master_slave_connection.py index a69bdda35..ac39b1199 100644 --- a/test/test_master_slave_connection.py +++ b/test/test_master_slave_connection.py @@ -124,7 +124,7 @@ class TestMasterSlaveConnection(unittest.TestCase, TestRequestMixin): Slave.calls += 1 if self._fail: raise AutoReconnect() - return 'sent' + return (None, 'sent') class NotRandomList(object): last_idx = -1