From 05565783d9c1a50e1ff76200ca6149db4f45a2d0 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 14 Feb 2019 15:19:38 -0800 Subject: [PATCH] PYTHON-1759 Find/getMore command failures reset server's SDAM state (#406) --- pymongo/command_cursor.py | 16 ++++----- pymongo/cursor.py | 16 ++++----- pymongo/mongo_client.py | 72 +++++++++++++++++---------------------- 3 files changed, 48 insertions(+), 56 deletions(-) diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 0875cd4e0..bf7d86984 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -149,13 +149,14 @@ class CommandCursor(object): reply = response.data try: - docs = self._unpack_response(reply, - self.__id, - self.__collection.codec_options) - if from_command: - first = docs[0] - client._receive_cluster_time(first, self.__session) - helpers._check_command_response(first) + with client._reset_on_error(self.__address): + docs = self._unpack_response(reply, + self.__id, + self.__collection.codec_options) + if from_command: + first = docs[0] + client._receive_cluster_time(first, self.__session) + helpers._check_command_response(first) except OperationFailure as exc: kill() @@ -174,7 +175,6 @@ class CommandCursor(object): listeners.publish_command_failure( duration(), exc.details, "getMore", rqst_id, self.__address) - client._reset_server_and_request_check(self.address) raise except Exception as exc: if publish: diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 6c45fb6d7..ead35d88f 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -973,13 +973,14 @@ class Cursor(object): raise try: - docs = self._unpack_response(response=reply, - cursor_id=self.__id, - codec_options=self.__codec_options) - if from_command: - first = docs[0] - client._receive_cluster_time(first, self.__session) - helpers._check_command_response(first) + with client._reset_on_error(self.__address): + docs = self._unpack_response(reply, + self.__id, + self.__collection.codec_options) + if 
from_command: + first = docs[0] + client._receive_cluster_time(first, self.__session) + helpers._check_command_response(first) except OperationFailure as exc: self.__killed = True @@ -1009,7 +1010,6 @@ class Cursor(object): listeners.publish_command_failure( duration(), exc.details, cmd_name, rqst_id, self.__address) - client._reset_server_and_request_check(self.__address) raise except Exception as exc: if publish: diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 889b61048..15b21bc77 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1113,32 +1113,9 @@ class MongoClient(common.BaseObject): @contextlib.contextmanager def _get_socket(self, server): - try: + with self._reset_on_error(server.description.address): with server.get_socket(self.__all_credentials) as sock_info: yield sock_info - except NetworkTimeout: - # The socket has been closed. Don't reset the server. - # Server Discovery And Monitoring Spec: "When an application - # operation fails because of any network error besides a socket - # timeout...." - raise - except NotMasterError: - # "When the client sees a "not master" error it MUST replace the - # server's description with type Unknown. It MUST request an - # immediate check of the server." - self._reset_server_and_request_check(server.description.address) - raise - except ConnectionFailure: - # "Client MUST replace the server's description with type Unknown - # ... MUST NOT request an immediate check of the server." - self.__reset_server(server.description.address) - raise - except OperationFailure as exc: - if exc.code in helpers._RETRYABLE_ERROR_CODES: - # Do not request an immediate check since the server is likely - # shutting down. - self.__reset_server(server.description.address) - raise def _select_server(self, server_selector, session, address=None): """Select a server to run an operation on this client. 
@@ -1217,30 +1194,45 @@ class MongoClient(common.BaseObject): and server.description.server_type != SERVER_TYPE.Mongos) or ( operation.read_preference != ReadPreference.PRIMARY) - return self._reset_on_error( - server, - server.send_message_with_response, - operation, - set_slave_ok, - self.__all_credentials, - self._event_listeners, - exhaust) + with self._reset_on_error(server.description.address): + return server.send_message_with_response( + operation, + set_slave_ok, + self.__all_credentials, + self._event_listeners, + exhaust) - def _reset_on_error(self, server, func, *args, **kwargs): - """Execute an operation. Reset the server on network error. + @contextlib.contextmanager + def _reset_on_error(self, server_address): + """On "not master" or "node is recovering" errors reset the server + according to the SDAM spec. - Returns fn()'s return value on success. On error, clears the server's - pool and marks the server Unknown. - - Re-raises any exception thrown by fn(). + Unpin the session on transient transaction errors. """ try: - return func(*args, **kwargs) + yield except NetworkTimeout: # The socket has been closed. Don't reset the server. + # Server Discovery And Monitoring Spec: "When an application + # operation fails because of any network error besides a socket + # timeout...." + raise + except NotMasterError: + # "When the client sees a "not master" error it MUST replace the + # server's description with type Unknown. It MUST request an + # immediate check of the server." + self._reset_server_and_request_check(server_address) raise except ConnectionFailure: - self.__reset_server(server.description.address) + # "Client MUST replace the server's description with type Unknown + # ... MUST NOT request an immediate check of the server." + self.__reset_server(server_address) + raise + except OperationFailure as exc: + if exc.code in helpers._RETRYABLE_ERROR_CODES: + # Do not request an immediate check since the server is likely + # shutting down. 
+ self.__reset_server(server_address) raise def _retry_with_session(self, retryable, func, session, bulk):