diff --git a/pymongo/message.py b/pymongo/message.py index 3e1efa27a..bb2a0f63e 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -966,7 +966,12 @@ class _OpReply(object): """Construct an _OpReply from raw bytes.""" # PYTHON-945: ignore starting_from field. flags, cursor_id, _, number_returned = cls.UNPACK(msg[:20]) - return cls(flags, cursor_id, number_returned, msg[20:]) + + documents = msg[20:] + if not isinstance(msg, bytes): + # msg is a memoryview in Python 3. + documents = documents.tobytes() + return cls(flags, cursor_id, number_returned, documents) def _first_batch(sock_info, db, coll, query, ntoreturn, diff --git a/pymongo/network.py b/pymongo/network.py index 7f248f1f0..e50caea25 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -34,6 +34,8 @@ try: except ImportError: _SELECT_ERROR = OSError +from bson.py3compat import PY3 + from pymongo import helpers, message from pymongo.common import MAX_MESSAGE_SIZE from pymongo.errors import (AutoReconnect, @@ -164,22 +166,48 @@ def receive_message(sock, request_id, max_message_size=MAX_MESSAGE_SIZE): return _OpReply.unpack(_receive_data_on_socket(sock, length - 16)) -def _receive_data_on_socket(sock, length): - msg = b"" - while length: - try: - chunk = sock.recv(length) - except (IOError, OSError) as exc: - if _errno_from_exception(exc) == errno.EINTR: - continue - raise - if chunk == b"": - raise AutoReconnect("connection closed") +# memoryview was introduced in Python 2.7 but we only use it on Python 3 +# because before 2.7.4 the struct module did not support memoryview: +# https://bugs.python.org/issue10212. +# In Jython, using slice assignment on a memoryview results in a +# NullPointerException. +if not PY3: + def _receive_data_on_socket(sock, length): + buf = bytearray(length) + i = 0 + while length: + try: + chunk = sock.recv(length) + except (IOError, OSError) as exc: + if _errno_from_exception(exc) == errno.EINTR: + continue + raise + if chunk == b"": + raise AutoReconnect("connection closed") - length -= len(chunk) - msg += chunk + buf[i:i + len(chunk)] = chunk + i += len(chunk) + length -= len(chunk) - return msg + return bytes(buf) +else: + def _receive_data_on_socket(sock, length): + buf = bytearray(length) + mv = memoryview(buf) + bytes_read = 0 + while bytes_read < length: + try: + chunk_length = sock.recv_into(mv[bytes_read:]) + except (IOError, OSError) as exc: + if _errno_from_exception(exc) == errno.EINTR: + continue + raise + if chunk_length == 0: + raise AutoReconnect("connection closed") + + bytes_read += chunk_length + + return mv def _errno_from_exception(exc):