PYTHON-736 - Don't close sockets on OperationFailure
This also speeds up returning exhaust sockets to the pool when the server returns an error and fixes the tests to run against all MongoDB versions we test against.
This commit is contained in:
parent
9ad421a58a
commit
d32016274b
@ -22,8 +22,8 @@ from bson.son import SON
|
||||
from pymongo import helpers, message, read_preferences
|
||||
from pymongo.read_preferences import ReadPreference, secondary_ok_commands
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
CursorNotFound,
|
||||
InvalidOperation)
|
||||
InvalidOperation,
|
||||
OperationFailure)
|
||||
|
||||
_QUERY_OPTIONS = {
|
||||
"tailable_cursor": 2,
|
||||
@ -927,7 +927,7 @@ class Cursor(object):
|
||||
# Exhaust cursor - no getMore message.
|
||||
try:
|
||||
response = client._exhaust_next(self.__exhaust_mgr.sock)
|
||||
except:
|
||||
except AutoReconnect:
|
||||
self.__killed = True
|
||||
self.__exhaust_mgr.error()
|
||||
raise
|
||||
@ -938,8 +938,10 @@ class Cursor(object):
|
||||
self.__tz_aware,
|
||||
self.__uuid_subtype,
|
||||
self.__compile_re)
|
||||
except CursorNotFound:
|
||||
except OperationFailure:
|
||||
self.__killed = True
|
||||
# Make sure exhaust socket is returned immediately, if necessary.
|
||||
self.__die()
|
||||
# If this is a tailable cursor the error is likely
|
||||
# due to capped collection roll over. Setting
|
||||
# self.__killed to True ensures Cursor.alive will be
|
||||
@ -951,6 +953,8 @@ class Cursor(object):
|
||||
# Don't send kill cursors to another server after a "not master"
|
||||
# error. It's completely pointless.
|
||||
self.__killed = True
|
||||
# Make sure exhaust socket is returned immediately, if necessary.
|
||||
self.__die()
|
||||
client.disconnect()
|
||||
raise
|
||||
|
||||
|
||||
@ -23,6 +23,8 @@ import threading
|
||||
import time
|
||||
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
from bson.son import SON
|
||||
from pymongo import MongoClient, MongoReplicaSetClient
|
||||
from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure
|
||||
from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo
|
||||
@ -588,15 +590,6 @@ class _TestLazyConnectMixin(object):
|
||||
c.max_message_size)
|
||||
|
||||
|
||||
def collect_until(fn):
|
||||
start = time.time()
|
||||
while not fn():
|
||||
if (time.time() - start) > 5:
|
||||
raise AssertionError("timed out")
|
||||
|
||||
gc.collect()
|
||||
|
||||
|
||||
class _TestExhaustCursorMixin(object):
|
||||
"""Test that clients properly handle errors from exhaust cursors.
|
||||
|
||||
@ -609,15 +602,18 @@ class _TestExhaustCursorMixin(object):
|
||||
client = self._get_client(max_pool_size=1)
|
||||
if is_mongos(client):
|
||||
raise SkipTest("Can't use exhaust cursors with mongos")
|
||||
if not version.at_least(client, (2, 2, 0)):
|
||||
raise SkipTest("mongod < 2.2.0 closes exhaust socket on error")
|
||||
|
||||
collection = client.pymongo_test.test
|
||||
pool = get_pool(client)
|
||||
|
||||
sock_info = one(pool.sockets)
|
||||
cursor = collection.find({'$bad_query_operator': 1}, exhaust=True)
|
||||
# This will cause OperationFailure in all mongo versions since
|
||||
# the value for $orderby must be a document.
|
||||
cursor = collection.find(
|
||||
SON([('$query', {}), ('$orderby', True)]), exhaust=True)
|
||||
self.assertRaises(OperationFailure, cursor.next)
|
||||
del cursor
|
||||
collect_until(lambda: sock_info in pool.sockets)
|
||||
self.assertFalse(sock_info.closed)
|
||||
|
||||
# The semaphore was decremented despite the error.
|
||||
@ -639,7 +635,7 @@ class _TestExhaustCursorMixin(object):
|
||||
|
||||
# Enough data to ensure it streams down for a few milliseconds.
|
||||
long_str = 'a' * (256 * 1024)
|
||||
collection.insert([{'a': long_str} for _ in range(1000)])
|
||||
collection.insert([{'a': long_str} for _ in range(200)])
|
||||
|
||||
pool = get_pool(client)
|
||||
pool._check_interval_seconds = None # Never check.
|
||||
@ -653,12 +649,9 @@ class _TestExhaustCursorMixin(object):
|
||||
# Cause a server error on getmore.
|
||||
client2.pymongo_test.test.drop()
|
||||
self.assertRaises(OperationFailure, list, cursor)
|
||||
del cursor
|
||||
collect_until(lambda: sock_info.closed)
|
||||
self.assertFalse(sock_info in pool.sockets)
|
||||
|
||||
# The semaphore was decremented despite the error.
|
||||
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
|
||||
# Make sure the socket is still valid
|
||||
self.assertEqual(0, collection.count())
|
||||
|
||||
def test_exhaust_query_network_error(self):
|
||||
# When doing an exhaust query, the socket stays checked out on success
|
||||
|
||||
Loading…
Reference in New Issue
Block a user