PYTHON-1513 Efficiently read data using memoryview and recv_into
On Python 2, preallocate a bytearray and use slice assignment instead of using str +=. On Python 3, preallocate a bytearray and use a memoryview with recv_into.
This commit is contained in:
parent
9dd16cefc4
commit
04693ae33a
@ -966,7 +966,12 @@ class _OpReply(object):
|
||||
"""Construct an _OpReply from raw bytes."""
|
||||
# PYTHON-945: ignore starting_from field.
|
||||
flags, cursor_id, _, number_returned = cls.UNPACK(msg[:20])
|
||||
return cls(flags, cursor_id, number_returned, msg[20:])
|
||||
|
||||
documents = msg[20:]
|
||||
if not isinstance(msg, bytes):
|
||||
# msg is a memoryview in Python 3.
|
||||
documents = documents.tobytes()
|
||||
return cls(flags, cursor_id, number_returned, documents)
|
||||
|
||||
|
||||
def _first_batch(sock_info, db, coll, query, ntoreturn,
|
||||
|
||||
@ -34,6 +34,8 @@ try:
|
||||
except ImportError:
|
||||
_SELECT_ERROR = OSError
|
||||
|
||||
from bson.py3compat import PY3
|
||||
|
||||
from pymongo import helpers, message
|
||||
from pymongo.common import MAX_MESSAGE_SIZE
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
@ -164,22 +166,48 @@ def receive_message(sock, request_id, max_message_size=MAX_MESSAGE_SIZE):
|
||||
return _OpReply.unpack(_receive_data_on_socket(sock, length - 16))
|
||||
|
||||
|
||||
def _receive_data_on_socket(sock, length):
|
||||
msg = b""
|
||||
while length:
|
||||
try:
|
||||
chunk = sock.recv(length)
|
||||
except (IOError, OSError) as exc:
|
||||
if _errno_from_exception(exc) == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
if chunk == b"":
|
||||
raise AutoReconnect("connection closed")
|
||||
# memoryview was introduced in Python 2.7 but we only use it on Python 3
|
||||
# because before 2.7.4 the struct module did not support memoryview:
|
||||
# https://bugs.python.org/issue10212.
|
||||
# In Jython, using slice assignment on a memoryview results in a
|
||||
# NullPointerException.
|
||||
if not PY3:
|
||||
def _receive_data_on_socket(sock, length):
|
||||
buf = bytearray(length)
|
||||
i = 0
|
||||
while length:
|
||||
try:
|
||||
chunk = sock.recv(length)
|
||||
except (IOError, OSError) as exc:
|
||||
if _errno_from_exception(exc) == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
if chunk == b"":
|
||||
raise AutoReconnect("connection closed")
|
||||
|
||||
length -= len(chunk)
|
||||
msg += chunk
|
||||
buf[i:i + len(chunk)] = chunk
|
||||
i += len(chunk)
|
||||
length -= len(chunk)
|
||||
|
||||
return msg
|
||||
return bytes(buf)
|
||||
else:
|
||||
def _receive_data_on_socket(sock, length):
|
||||
buf = bytearray(length)
|
||||
mv = memoryview(buf)
|
||||
bytes_read = 0
|
||||
while bytes_read < length:
|
||||
try:
|
||||
chunk_length = sock.recv_into(mv[bytes_read:])
|
||||
except (IOError, OSError) as exc:
|
||||
if _errno_from_exception(exc) == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
if chunk_length == 0:
|
||||
raise AutoReconnect("connection closed")
|
||||
|
||||
bytes_read += chunk_length
|
||||
|
||||
return mv
|
||||
|
||||
|
||||
def _errno_from_exception(exc):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user