From d32016274b95161e3b20c4244cf81fba71f21d22 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 24 Jul 2014 13:22:23 -0700 Subject: [PATCH] PYTHON-736 - Don't close sockets on OperationFailure This also speeds up returning exhaust sockets to the pool when the server returns an error and fixes the tests to run against all MongoDB versions we test against. --- pymongo/cursor.py | 12 ++++++++---- test/utils.py | 29 +++++++++++------------------ 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/pymongo/cursor.py b/pymongo/cursor.py index b5059afba..7ef62f779 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -22,8 +22,8 @@ from bson.son import SON from pymongo import helpers, message, read_preferences from pymongo.read_preferences import ReadPreference, secondary_ok_commands from pymongo.errors import (AutoReconnect, - CursorNotFound, - InvalidOperation) + InvalidOperation, + OperationFailure) _QUERY_OPTIONS = { "tailable_cursor": 2, @@ -927,7 +927,7 @@ class Cursor(object): # Exhaust cursor - no getMore message. try: response = client._exhaust_next(self.__exhaust_mgr.sock) - except: + except AutoReconnect: self.__killed = True self.__exhaust_mgr.error() raise @@ -938,8 +938,10 @@ class Cursor(object): self.__tz_aware, self.__uuid_subtype, self.__compile_re) - except CursorNotFound: + except OperationFailure: self.__killed = True + # Make sure exhaust socket is returned immediately, if necessary. + self.__die() # If this is a tailable cursor the error is likely # due to capped collection roll over. Setting # self.__killed to True ensures Cursor.alive will be @@ -951,6 +953,8 @@ class Cursor(object): # Don't send kill cursors to another server after a "not master" # error. It's completely pointless. self.__killed = True + # Make sure exhaust socket is returned immediately, if necessary. + self.__die() client.disconnect() raise diff --git a/test/utils.py b/test/utils.py index c303e5b62..e7b7f52c8 100644 --- a/test/utils.py +++ b/test/utils.py @@ -23,6 +23,8 @@ import threading import time from nose.plugins.skip import SkipTest + +from bson.son import SON from pymongo import MongoClient, MongoReplicaSetClient from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo @@ -588,15 +590,6 @@ class _TestLazyConnectMixin(object): c.max_message_size) -def collect_until(fn): - start = time.time() - while not fn(): - if (time.time() - start) > 5: - raise AssertionError("timed out") - - gc.collect() - - class _TestExhaustCursorMixin(object): """Test that clients properly handle errors from exhaust cursors. @@ -609,15 +602,18 @@ class _TestExhaustCursorMixin(object): client = self._get_client(max_pool_size=1) if is_mongos(client): raise SkipTest("Can't use exhaust cursors with mongos") + if not version.at_least(client, (2, 2, 0)): + raise SkipTest("mongod < 2.2.0 closes exhaust socket on error") collection = client.pymongo_test.test pool = get_pool(client) sock_info = one(pool.sockets) - cursor = collection.find({'$bad_query_operator': 1}, exhaust=True) + # This will cause OperationFailure in all mongo versions since + # the value for $orderby must be a document. + cursor = collection.find( + SON([('$query', {}), ('$orderby', True)]), exhaust=True) self.assertRaises(OperationFailure, cursor.next) - del cursor - collect_until(lambda: sock_info in pool.sockets) self.assertFalse(sock_info.closed) # The semaphore was decremented despite the error. @@ -639,7 +635,7 @@ class _TestExhaustCursorMixin(object): # Enough data to ensure it streams down for a few milliseconds. long_str = 'a' * (256 * 1024) - collection.insert([{'a': long_str} for _ in range(1000)]) + collection.insert([{'a': long_str} for _ in range(200)]) pool = get_pool(client) pool._check_interval_seconds = None # Never check. @@ -653,12 +649,9 @@ class _TestExhaustCursorMixin(object): # Cause a server error on getmore. client2.pymongo_test.test.drop() self.assertRaises(OperationFailure, list, cursor) - del cursor - collect_until(lambda: sock_info.closed) - self.assertFalse(sock_info in pool.sockets) - # The semaphore was decremented despite the error. - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + # Make sure the socket is still valid + self.assertEqual(0, collection.count()) def test_exhaust_query_network_error(self): # When doing an exhaust query, the socket stays checked out on success