PYTHON-857 - Use client._get_socket_for_writes for OP_INSERT.

We must get a socket *before* choosing whether to use write commands or legacy
writes. I will update the code in steps, getting a socket earlier and earlier
in the write path.

This commit gets a socket just before sending OP_INSERT in _do_batched_insert,
and temporarily disables the C version of _do_batched_insert.
This commit is contained in:
A. Jesse Jiryu Davis 2015-03-16 12:34:38 -04:00
parent 54b52f2abd
commit 27c961a2ab
6 changed files with 107 additions and 17 deletions

View File

@ -131,6 +131,7 @@ def _check_command_response(response, msg=None, allowable_errors=None):
response.get("code"),
response)
# TODO: remove, this is moving to _check_gle_response
if response.get("wtimeout", False):
# MongoDB versions before 1.8.0 return the error message in an "errmsg"
# field. If "errmsg" exists "err" will also exist set to None, so we
@ -179,6 +180,46 @@ def _check_command_response(response, msg=None, allowable_errors=None):
raise OperationFailure(msg % errmsg, code, response)
def _check_gle_response(response):
"""Return getlasterror response as a dict, or raise OperationFailure."""
response = _unpack_response(response)
assert response["number_returned"] == 1
result = response["data"][0]
# Did getlasterror itself fail?
_check_command_response(result)
if result.get("wtimeout", False):
# MongoDB versions before 1.8.0 return the error message in an "errmsg"
# field. If "errmsg" exists "err" will also exist set to None, so we
# have to check for "errmsg" first.
raise WTimeoutError(result.get("errmsg", result.get("err")),
result.get("code"),
result)
error_msg = result.get("err", "")
if error_msg is None:
return result
if error_msg.startswith("not master"):
raise NotMasterError(error_msg)
details = result
# mongos returns the error code in an error object for some errors.
if "errObjects" in result:
for errobj in result["errObjects"]:
if errobj.get("err") == error_msg:
details = errobj
break
code = details.get("code")
if code in (11000, 11001, 12582):
raise DuplicateKeyError(details["err"], code, result)
raise OperationFailure(details["err"], code, result)
def _command(client, namespace, command, read_preference,
codec_options, check=True, allowable_errors=None):
"""Internal command helper."""

View File

@ -46,12 +46,20 @@ def _get_server_type(doc):
class IsMaster(object):
__slots__ = ('_doc', '_server_type')
__slots__ = ('_doc', '_server_type', '_is_writable', '_is_readable')
def __init__(self, doc):
"""Parse an ismaster response from the server."""
self._server_type = _get_server_type(doc)
self._doc = doc
self._is_writable = self._server_type in (
SERVER_TYPE.RSPrimary,
SERVER_TYPE.Standalone,
SERVER_TYPE.Mongos)
self._is_readable = (
self.server_type == SERVER_TYPE.RSSecondary
or self._is_writable)
@property
def server_type(self):
@ -102,3 +110,11 @@ class IsMaster(object):
@property
def max_wire_version(self):
return self._doc.get('maxWireVersion', common.MAX_WIRE_VERSION)
@property
def is_writable(self):
return self._is_writable
@property
def is_readable(self):
return self._is_readable

View File

@ -280,8 +280,9 @@ def _do_batched_insert(collection_name, docs, check_keys,
if has_docs:
# We have enough data, send this message.
try:
client._send_message(_insert_message(data.getvalue(),
send_safe), send_safe)
request_id, msg = _insert_message(data.getvalue(), send_safe)
with client._get_socket_for_writes() as sock_info:
sock_info.legacy_write(msg, request_id, send_safe)
# Exception type could be OperationFailure or a subtype
# (e.g. DuplicateKeyError)
except OperationFailure as exc:
@ -311,13 +312,16 @@ def _do_batched_insert(collection_name, docs, check_keys,
if not has_docs:
raise InvalidOperation("cannot do an empty bulk insert")
client._send_message(_insert_message(data.getvalue(), safe), safe)
request_id, msg = _insert_message(data.getvalue(), safe)
with client._get_socket_for_writes() as sock_info:
sock_info.legacy_write(msg, request_id, safe)
# Re-raise any exception stored due to continue_on_error
if last_error is not None:
raise last_error
if _use_c:
_do_batched_insert = _cmessage._do_batched_insert
# TODO: reenable
# if _use_c:
# _do_batched_insert = _cmessage._do_batched_insert
def _do_batched_write_command(namespace, operation, command,

View File

@ -31,6 +31,7 @@ access:
Database(MongoClient('localhost', 27017), u'test-database')
"""
import contextlib
import datetime
import threading
import warnings
@ -671,7 +672,22 @@ class MongoClient(common.BaseObject):
self._topology.open()
return self._topology
@contextlib.contextmanager
def _get_socket_for_writes(self):
server = self._get_topology().select_server(writable_server_selector)
with server.get_socket(self.__all_credentials) as sock_info:
# TODO: refactor with _reset_on_error
try:
yield sock_info
except NetworkTimeout:
# The socket has been closed. Don't reset the server.
raise
except ConnectionFailure:
self.__reset_server(server.description.address)
raise
def __check_gle_response(self, response, is_command):
# TODO: remove
"""Check a response to a lastError message for errors.
`response` is a byte string representing a response to the message.

View File

@ -18,10 +18,11 @@ import socket
import threading
from bson.py3compat import u, itervalues
from pymongo import auth, thread_util
from pymongo import auth, helpers, thread_util
from pymongo.errors import (AutoReconnect,
ConnectionFailure,
NetworkTimeout,
NotMasterError,
OperationFailure)
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
@ -183,6 +184,26 @@ class SocketInfo(object):
except BaseException as error:
self._raise_connection_failure(error)
def legacy_write(self, message, request_id, with_last_error):
"""Send OP_INSERT, etc., optionally returning response as a dict.
Can raise ConnectionFailure or OperationFailure.
:Parameters:
- `message`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message,
perhaps with a getlasterror command appended.
- `request_id`: an int.
- `with_last_error`: True if a getlasterror command is appended.
"""
if not with_last_error and not self.ismaster.is_writable:
# Write won't succeed, bail as if we'd done a getlasterror.
raise NotMasterError("not master")
self.send_message(message)
if with_last_error:
response = self.receive_message(1, request_id)
return helpers._check_gle_response(response)
def check_auth(self, all_credentials):
"""Update this socket's authentication.

View File

@ -54,16 +54,8 @@ class ServerDescription(object):
self._max_write_batch_size = ismaster.max_write_batch_size
self._min_wire_version = ismaster.min_wire_version
self._max_wire_version = ismaster.max_wire_version
self._is_writable = self.server_type in (
SERVER_TYPE.RSPrimary,
SERVER_TYPE.Standalone,
SERVER_TYPE.Mongos)
self._is_readable = (
self.server_type == SERVER_TYPE.RSSecondary
or self._is_writable)
self._is_writable = ismaster.is_writable
self._is_readable = ismaster.is_readable
self._round_trip_time = round_trip_time
self._error = error