PYTHON-1759 Find/getMore command failures reset servers's SDAM state (#406)

This commit is contained in:
Shane Harvey 2019-02-14 15:19:38 -08:00
parent 0ca7ccd52c
commit 05565783d9
3 changed files with 48 additions and 56 deletions

View File

@ -149,13 +149,14 @@ class CommandCursor(object):
reply = response.data
try:
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
helpers._check_command_response(first)
with client._reset_on_error(self.__address):
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
helpers._check_command_response(first)
except OperationFailure as exc:
kill()
@ -174,7 +175,6 @@ class CommandCursor(object):
listeners.publish_command_failure(
duration(), exc.details, "getMore", rqst_id, self.__address)
client._reset_server_and_request_check(self.address)
raise
except Exception as exc:
if publish:

View File

@ -973,13 +973,14 @@ class Cursor(object):
raise
try:
docs = self._unpack_response(response=reply,
cursor_id=self.__id,
codec_options=self.__codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
helpers._check_command_response(first)
with client._reset_on_error(self.__address):
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
helpers._check_command_response(first)
except OperationFailure as exc:
self.__killed = True
@ -1009,7 +1010,6 @@ class Cursor(object):
listeners.publish_command_failure(
duration(), exc.details, cmd_name, rqst_id, self.__address)
client._reset_server_and_request_check(self.__address)
raise
except Exception as exc:
if publish:

View File

@ -1113,32 +1113,9 @@ class MongoClient(common.BaseObject):
@contextlib.contextmanager
def _get_socket(self, server):
try:
with self._reset_on_error(server.description.address):
with server.get_socket(self.__all_credentials) as sock_info:
yield sock_info
except NetworkTimeout:
# The socket has been closed. Don't reset the server.
# Server Discovery And Monitoring Spec: "When an application
# operation fails because of any network error besides a socket
# timeout...."
raise
except NotMasterError:
# "When the client sees a "not master" error it MUST replace the
# server's description with type Unknown. It MUST request an
# immediate check of the server."
self._reset_server_and_request_check(server.description.address)
raise
except ConnectionFailure:
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
self.__reset_server(server.description.address)
raise
except OperationFailure as exc:
if exc.code in helpers._RETRYABLE_ERROR_CODES:
# Do not request an immediate check since the server is likely
# shutting down.
self.__reset_server(server.description.address)
raise
def _select_server(self, server_selector, session, address=None):
"""Select a server to run an operation on this client.
@ -1217,30 +1194,45 @@ class MongoClient(common.BaseObject):
and server.description.server_type != SERVER_TYPE.Mongos) or (
operation.read_preference != ReadPreference.PRIMARY)
return self._reset_on_error(
server,
server.send_message_with_response,
operation,
set_slave_ok,
self.__all_credentials,
self._event_listeners,
exhaust)
with self._reset_on_error(server.description.address):
return server.send_message_with_response(
operation,
set_slave_ok,
self.__all_credentials,
self._event_listeners,
exhaust)
def _reset_on_error(self, server, func, *args, **kwargs):
"""Execute an operation. Reset the server on network error.
@contextlib.contextmanager
def _reset_on_error(self, server_address):
"""On "not master" or "node is recovering" errors reset the server
according to the SDAM spec.
Returns fn()'s return value on success. On error, clears the server's
pool and marks the server Unknown.
Re-raises any exception thrown by fn().
Unpin the session on transient transaction errors.
"""
try:
return func(*args, **kwargs)
yield
except NetworkTimeout:
# The socket has been closed. Don't reset the server.
# Server Discovery And Monitoring Spec: "When an application
# operation fails because of any network error besides a socket
# timeout...."
raise
except NotMasterError:
# "When the client sees a "not master" error it MUST replace the
# server's description with type Unknown. It MUST request an
# immediate check of the server."
self._reset_server_and_request_check(server_address)
raise
except ConnectionFailure:
self.__reset_server(server.description.address)
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
self.__reset_server(server_address)
raise
except OperationFailure as exc:
if exc.code in helpers._RETRYABLE_ERROR_CODES:
# Do not request an immediate check since the server is likely
# shutting down.
self.__reset_server(server_address)
raise
def _retry_with_session(self, retryable, func, session, bulk):