diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 6f752e1a1..010683ae6 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -131,6 +131,7 @@ def _check_command_response(response, msg=None, allowable_errors=None): response.get("code"), response) + # TODO: remove, this is moving to _check_gle_response if response.get("wtimeout", False): # MongoDB versions before 1.8.0 return the error message in an "errmsg" # field. If "errmsg" exists "err" will also exist set to None, so we @@ -179,6 +180,46 @@ def _check_command_response(response, msg=None, allowable_errors=None): raise OperationFailure(msg % errmsg, code, response) +def _check_gle_response(response): + """Return getlasterror response as a dict, or raise OperationFailure.""" + response = _unpack_response(response) + + assert response["number_returned"] == 1 + result = response["data"][0] + + # Did getlasterror itself fail? + _check_command_response(result) + + if result.get("wtimeout", False): + # MongoDB versions before 1.8.0 return the error message in an "errmsg" + # field. If "errmsg" exists "err" will also exist set to None, so we + # have to check for "errmsg" first. + raise WTimeoutError(result.get("errmsg", result.get("err")), + result.get("code"), + result) + + error_msg = result.get("err", "") + if error_msg is None: + return result + + if error_msg.startswith("not master"): + raise NotMasterError(error_msg) + + details = result + + # mongos returns the error code in an error object for some errors. + if "errObjects" in result: + for errobj in result["errObjects"]: + if errobj.get("err") == error_msg: + details = errobj + break + + code = details.get("code") + if code in (11000, 11001, 12582): + raise DuplicateKeyError(details["err"], code, result) + raise OperationFailure(details["err"], code, result) + + def _command(client, namespace, command, read_preference, codec_options, check=True, allowable_errors=None): """Internal command helper.""" diff --git a/pymongo/ismaster.py b/pymongo/ismaster.py index f9dc501c1..1c398c98d 100644 --- a/pymongo/ismaster.py +++ b/pymongo/ismaster.py @@ -46,12 +46,20 @@ def _get_server_type(doc): class IsMaster(object): - __slots__ = ('_doc', '_server_type') + __slots__ = ('_doc', '_server_type', '_is_writable', '_is_readable') def __init__(self, doc): """Parse an ismaster response from the server.""" self._server_type = _get_server_type(doc) self._doc = doc + self._is_writable = self._server_type in ( + SERVER_TYPE.RSPrimary, + SERVER_TYPE.Standalone, + SERVER_TYPE.Mongos) + + self._is_readable = ( + self.server_type == SERVER_TYPE.RSSecondary + or self._is_writable) @property def server_type(self): @@ -102,3 +110,11 @@ class IsMaster(object): @property def max_wire_version(self): return self._doc.get('maxWireVersion', common.MAX_WIRE_VERSION) + + @property + def is_writable(self): + return self._is_writable + + @property + def is_readable(self): + return self._is_readable diff --git a/pymongo/message.py b/pymongo/message.py index 65fc8ebe0..66376cee6 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -280,8 +280,9 @@ def _do_batched_insert(collection_name, docs, check_keys, if has_docs: # We have enough data, send this message. try: - client._send_message(_insert_message(data.getvalue(), - send_safe), send_safe) + request_id, msg = _insert_message(data.getvalue(), send_safe) + with client._get_socket_for_writes() as sock_info: + sock_info.legacy_write(msg, request_id, send_safe) # Exception type could be OperationFailure or a subtype # (e.g. DuplicateKeyError) except OperationFailure as exc: @@ -311,13 +312,16 @@ def _do_batched_insert(collection_name, docs, check_keys, if not has_docs: raise InvalidOperation("cannot do an empty bulk insert") - client._send_message(_insert_message(data.getvalue(), safe), safe) + request_id, msg = _insert_message(data.getvalue(), safe) + with client._get_socket_for_writes() as sock_info: + sock_info.legacy_write(msg, request_id, safe) # Re-raise any exception stored due to continue_on_error if last_error is not None: raise last_error -if _use_c: - _do_batched_insert = _cmessage._do_batched_insert +# TODO: reenable +# if _use_c: +# _do_batched_insert = _cmessage._do_batched_insert def _do_batched_write_command(namespace, operation, command, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 7c592e7f2..343d4c34b 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -31,6 +31,7 @@ access: Database(MongoClient('localhost', 27017), u'test-database') """ +import contextlib import datetime import threading import warnings @@ -671,7 +672,22 @@ class MongoClient(common.BaseObject): self._topology.open() return self._topology + @contextlib.contextmanager + def _get_socket_for_writes(self): + server = self._get_topology().select_server(writable_server_selector) + with server.get_socket(self.__all_credentials) as sock_info: + # TODO: refactor with _reset_on_error + try: + yield sock_info + except NetworkTimeout: + # The socket has been closed. Don't reset the server. + raise + except ConnectionFailure: + self.__reset_server(server.description.address) + raise + def __check_gle_response(self, response, is_command): + # TODO: remove """Check a response to a lastError message for errors. `response` is a byte string representing a response to the message. diff --git a/pymongo/pool.py b/pymongo/pool.py index fc50df12a..034fe7ede 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -18,10 +18,11 @@ import socket import threading from bson.py3compat import u, itervalues -from pymongo import auth, thread_util +from pymongo import auth, helpers, thread_util from pymongo.errors import (AutoReconnect, ConnectionFailure, NetworkTimeout, + NotMasterError, OperationFailure) from pymongo.ismaster import IsMaster from pymongo.monotonic import time as _time @@ -183,6 +184,26 @@ class SocketInfo(object): except BaseException as error: self._raise_connection_failure(error) + def legacy_write(self, message, request_id, with_last_error): + """Send OP_INSERT, etc., optionally returning response as a dict. + + Can raise ConnectionFailure or OperationFailure. + + :Parameters: + - `message`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message, + perhaps with a getlasterror command appended. + - `request_id`: an int. + - `with_last_error`: True if a getlasterror command is appended. + """ + if not with_last_error and not self.ismaster.is_writable: + # Write won't succeed, bail as if we'd done a getlasterror. + raise NotMasterError("not master") + + self.send_message(message) + if with_last_error: + response = self.receive_message(1, request_id) + return helpers._check_gle_response(response) + def check_auth(self, all_credentials): """Update this socket's authentication. diff --git a/pymongo/server_description.py b/pymongo/server_description.py index 0787cfa2e..db478b0bb 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -54,16 +54,8 @@ class ServerDescription(object): self._max_write_batch_size = ismaster.max_write_batch_size self._min_wire_version = ismaster.min_wire_version self._max_wire_version = ismaster.max_wire_version - - self._is_writable = self.server_type in ( - SERVER_TYPE.RSPrimary, - SERVER_TYPE.Standalone, - SERVER_TYPE.Mongos) - - self._is_readable = ( - self.server_type == SERVER_TYPE.RSSecondary - or self._is_writable) - + self._is_writable = ismaster.is_writable + self._is_readable = ismaster.is_readable self._round_trip_time = round_trip_time self._error = error