diff --git a/pymongo/cursor.py b/pymongo/cursor.py index b5059afba..7ef62f779 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -22,8 +22,8 @@ from bson.son import SON from pymongo import helpers, message, read_preferences from pymongo.read_preferences import ReadPreference, secondary_ok_commands from pymongo.errors import (AutoReconnect, - CursorNotFound, - InvalidOperation) + InvalidOperation, + OperationFailure) _QUERY_OPTIONS = { "tailable_cursor": 2, @@ -927,7 +927,7 @@ class Cursor(object): # Exhaust cursor - no getMore message. try: response = client._exhaust_next(self.__exhaust_mgr.sock) - except: + except AutoReconnect: self.__killed = True self.__exhaust_mgr.error() raise @@ -938,8 +938,10 @@ class Cursor(object): self.__tz_aware, self.__uuid_subtype, self.__compile_re) - except CursorNotFound: + except OperationFailure: self.__killed = True + # Make sure exhaust socket is returned immediately, if necessary. + self.__die() # If this is a tailable cursor the error is likely # due to capped collection roll over. Setting # self.__killed to True ensures Cursor.alive will be @@ -951,6 +953,8 @@ class Cursor(object): # Don't send kill cursors to another server after a "not master" # error. It's completely pointless. self.__killed = True + # Make sure exhaust socket is returned immediately, if necessary. + self.__die() client.disconnect() raise diff --git a/test/utils.py b/test/utils.py index c303e5b62..e7b7f52c8 100644 --- a/test/utils.py +++ b/test/utils.py @@ -23,6 +23,8 @@ import threading import time from nose.plugins.skip import SkipTest + +from bson.son import SON from pymongo import MongoClient, MongoReplicaSetClient from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo @@ -588,15 +590,6 @@ class _TestLazyConnectMixin(object): c.max_message_size) -def collect_until(fn): - start = time.time() - while not fn(): - if (time.time() - start) > 5: - raise AssertionError("timed out") - - gc.collect() - - class _TestExhaustCursorMixin(object): """Test that clients properly handle errors from exhaust cursors. @@ -609,15 +602,18 @@ class _TestExhaustCursorMixin(object): client = self._get_client(max_pool_size=1) if is_mongos(client): raise SkipTest("Can't use exhaust cursors with mongos") + if not version.at_least(client, (2, 2, 0)): + raise SkipTest("mongod < 2.2.0 closes exhaust socket on error") collection = client.pymongo_test.test pool = get_pool(client) sock_info = one(pool.sockets) - cursor = collection.find({'$bad_query_operator': 1}, exhaust=True) + # This will cause OperationFailure in all mongo versions since + # the value for $orderby must be a document. + cursor = collection.find( + SON([('$query', {}), ('$orderby', True)]), exhaust=True) self.assertRaises(OperationFailure, cursor.next) - del cursor - collect_until(lambda: sock_info in pool.sockets) self.assertFalse(sock_info.closed) # The semaphore was decremented despite the error. @@ -639,7 +635,7 @@ class _TestExhaustCursorMixin(object): # Enough data to ensure it streams down for a few milliseconds. long_str = 'a' * (256 * 1024) - collection.insert([{'a': long_str} for _ in range(1000)]) + collection.insert([{'a': long_str} for _ in range(200)]) pool = get_pool(client) pool._check_interval_seconds = None # Never check. @@ -653,12 +649,9 @@ class _TestExhaustCursorMixin(object): # Cause a server error on getmore. client2.pymongo_test.test.drop() self.assertRaises(OperationFailure, list, cursor) - del cursor - collect_until(lambda: sock_info.closed) - self.assertFalse(sock_info in pool.sockets) - # The semaphore was decremented despite the error. - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + # Make sure the socket is still valid + self.assertEqual(0, collection.count()) def test_exhaust_query_network_error(self): # When doing an exhaust query, the socket stays checked out on success