PYTHON-2722 Improve performance of find/aggregate_raw_batches (#1047)

This commit is contained in:
Steven Silvester 2022-09-22 15:14:40 -05:00 committed by GitHub
parent 0143881f02
commit 449cb8fb0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 245 additions and 38 deletions

View File

@ -128,7 +128,6 @@ if TYPE_CHECKING:
from array import array
from mmap import mmap
try:
from bson import _cbson # type: ignore[attr-defined]
@ -520,19 +519,32 @@ _ELEMENT_GETTER: Dict[int, Callable[..., Tuple[Any, int]]] = {
if _USE_C:
def _element_to_dict(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
data: Any,
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
raw_array: bool = False,
) -> Any:
return _cbson._element_to_dict(data, position, obj_end, opts)
return _cbson._element_to_dict(data, position, obj_end, opts, raw_array)
else:
def _element_to_dict(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
data: Any,
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
raw_array: bool = False,
) -> Any:
"""Decode a single key, value pair."""
element_type = data[position]
position += 1
element_name, position = _get_c_string(data, view, position, opts)
if raw_array and element_type == ord(BSONARR):
_, end = _get_object_size(data, position, len(data))
return element_name, view[position : end + 1], end + 1
try:
value, position = _ELEMENT_GETTER[element_type](
data, view, position, obj_end, opts, element_name
@ -551,20 +563,30 @@ else:
_T = TypeVar("_T", bound=MutableMapping[Any, Any])
def _raw_to_dict(data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T) -> _T:
def _raw_to_dict(
data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T, raw_array: bool = False
) -> _T:
data, view = get_data_and_view(data)
return _elements_to_dict(data, view, position, obj_end, opts, result)
return _elements_to_dict(data, view, position, obj_end, opts, result, raw_array=raw_array)
def _elements_to_dict(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, result: Any = None
data: Any,
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
result: Any = None,
raw_array: bool = False,
) -> Any:
"""Decode a BSON document into result."""
if result is None:
result = opts.document_class()
end = obj_end - 1
while position < end:
key, value, position = _element_to_dict(data, view, position, obj_end, opts)
key, value, position = _element_to_dict(
data, view, position, obj_end, opts, raw_array=raw_array
)
result[key] = value
if position != obj_end:
raise InvalidBSON("bad object or element length")
@ -1119,14 +1141,44 @@ def _decode_selective(rawdoc: Any, fields: Any, codec_options: Any) -> Mapping[A
return doc
def _array_of_documents_to_buffer(view: memoryview) -> bytes:
# Extract the raw bytes of each document.
position = 0
_, end = _get_object_size(view, position, len(view))
position += 4
buffers: List[memoryview] = []
append = buffers.append
while position < end - 1:
# Just skip the keys.
while view[position] != 0:
position += 1
position += 1
obj_size, _ = _get_object_size(view, position, end)
append(view[position : position + obj_size])
position += obj_size
if position != end:
raise InvalidBSON("bad object or element length")
return b"".join(buffers)
if _USE_C:
_array_of_documents_to_buffer = _cbson._array_of_documents_to_buffer # noqa: F811
def _convert_raw_document_lists_to_streams(document: Any) -> None:
"""Convert raw array of documents to a stream of BSON documents."""
cursor = document.get("cursor")
if cursor:
for key in ("firstBatch", "nextBatch"):
batch = cursor.get(key)
if batch:
stream = b"".join(doc.raw for doc in batch)
cursor[key] = [stream]
if not cursor:
return
for key in ("firstBatch", "nextBatch"):
batch = cursor.get(key)
if not batch:
continue
data = _array_of_documents_to_buffer(batch)
if data:
cursor[key] = [data]
else:
cursor[key] = []
def _decode_all_selective(data: Any, codec_options: CodecOptions, fields: Any) -> List[Any]:

View File

@ -1615,7 +1615,7 @@ invalid:
static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
unsigned* position, unsigned char type,
unsigned max, const codec_options_t* options) {
unsigned max, const codec_options_t* options, int raw_array) {
struct module_state *state = GETSTATE(self);
PyObject* value = NULL;
switch (type) {
@ -1712,11 +1712,20 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
if (size < BSON_MIN_SIZE || max < size) {
goto invalid;
}
end = *position + size - 1;
/* Check for bad eoo */
if (buffer[end]) {
goto invalid;
}
if (raw_array != 0) {
// Treat it as a binary buffer.
value = PyBytes_FromStringAndSize(buffer + *position, size);
*position += size;
break;
}
*position += 4;
value = PyList_New(0);
@ -1740,7 +1749,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
goto invalid;
}
to_append = get_value(self, name, buffer, position, bson_type,
max - (unsigned)key_size, options);
max - (unsigned)key_size, options, raw_array);
Py_LeaveRecursiveCall();
if (!to_append) {
Py_DECREF(value);
@ -2464,6 +2473,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
static int _element_to_dict(PyObject* self, const char* string,
unsigned position, unsigned max,
const codec_options_t* options,
int raw_array,
PyObject** name, PyObject** value) {
unsigned char type = (unsigned char)string[position++];
size_t name_length = strlen(string + position);
@ -2504,7 +2514,7 @@ static int _element_to_dict(PyObject* self, const char* string,
}
position += (unsigned)name_length + 1;
*value = get_value(self, *name, string, &position, type,
max - position, options);
max - position, options, raw_array);
if (!*value) {
Py_DECREF(*name);
return -1;
@ -2520,12 +2530,13 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
unsigned position;
unsigned max;
int new_position;
int raw_array = 0;
PyObject* name;
PyObject* value;
PyObject* result_tuple;
if (!PyArg_ParseTuple(args, "OIIO&", &bson, &position, &max,
convert_codec_options, &options)) {
if (!PyArg_ParseTuple(args, "OIIO&p", &bson, &position, &max,
convert_codec_options, &options, &raw_array)) {
return NULL;
}
@ -2535,8 +2546,7 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
}
string = PyBytes_AS_STRING(bson);
new_position = _element_to_dict(self, string, position, max, &options,
&name, &value);
new_position = _element_to_dict(self, string, position, max, &options, raw_array, &name, &value);
if (new_position < 0) {
return NULL;
}
@ -2560,13 +2570,14 @@ static PyObject* _elements_to_dict(PyObject* self, const char* string,
if (!dict) {
return NULL;
}
int raw_array = 0;
while (position < max) {
PyObject* name = NULL;
PyObject* value = NULL;
int new_position;
new_position = _element_to_dict(
self, string, position, max, options, &name, &value);
self, string, position, max, options, raw_array, &name, &value);
if (new_position < 0) {
Py_DECREF(dict);
return NULL;
@ -2649,7 +2660,6 @@ static PyObject* _cbson_bson_to_dict(PyObject* self, PyObject* args) {
}
string = (char*)view.buf;
memcpy(&size, string, 4);
size = (int32_t)BSON_UINT32_FROM_LE(size);
if (size < BSON_MIN_SIZE) {
@ -2797,6 +2807,124 @@ done:
return result;
}
static PyObject* _cbson_array_of_documents_to_buffer(PyObject* self, PyObject* args) {
uint32_t size;
uint32_t value_length;
uint32_t position = 0;
buffer_t buffer;
const char* string;
PyObject* arr;
PyObject* result = NULL;
Py_buffer view = {0};
if (!PyArg_ParseTuple(args, "O", &arr)) {
return NULL;
}
if (!_get_buffer(arr, &view)) {
return NULL;
}
buffer = pymongo_buffer_new();
if (!buffer) {
PyBuffer_Release(&view);
return NULL;
}
string = (char*)view.buf;
if (view.len < BSON_MIN_SIZE) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON,
"not enough data for a BSON document");
Py_DECREF(InvalidBSON);
}
goto done;
}
memcpy(&size, string, 4);
size = BSON_UINT32_FROM_LE(size);
/* save space for length */
if (pymongo_buffer_save_space(buffer, size) == -1) {
goto fail;
}
pymongo_buffer_update_position(buffer, 0);
position += 4;
while (position < size - 1) {
// Verify the value is an object.
unsigned char type = (unsigned char)string[position];
if (type != 3) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON, "array element was not an object");
Py_DECREF(InvalidBSON);
}
goto fail;
}
// Just skip the keys.
position = position + strlen(string + position) + 1;
if (position >= size || (size - position) < BSON_MIN_SIZE) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON, "invalid array content");
Py_DECREF(InvalidBSON);
}
goto fail;
}
memcpy(&value_length, string + position, 4);
value_length = BSON_UINT32_FROM_LE(value_length);
if (value_length < BSON_MIN_SIZE) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON, "invalid message size");
Py_DECREF(InvalidBSON);
}
goto fail;
}
if (view.len < size) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON, "objsize too large");
Py_DECREF(InvalidBSON);
}
goto fail;
}
if (string[size - 1]) {
PyObject* InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
PyErr_SetString(InvalidBSON, "bad eoo");
Py_DECREF(InvalidBSON);
}
goto fail;
}
if (pymongo_buffer_write(buffer, string + position, value_length) == 1) {
goto fail;
}
position += value_length;
}
/* objectify buffer */
result = Py_BuildValue("y#", pymongo_buffer_get_buffer(buffer),
(Py_ssize_t)pymongo_buffer_get_position(buffer));
goto done;
fail:
result = NULL;
done:
PyBuffer_Release(&view);
pymongo_buffer_free(buffer);
return result;
}
static PyMethodDef _CBSONMethods[] = {
{"_dict_to_bson", _cbson_dict_to_bson, METH_VARARGS,
"convert a dictionary to a string containing its BSON representation."},
@ -2806,6 +2934,7 @@ static PyMethodDef _CBSONMethods[] = {
"convert binary data to a sequence of documents."},
{"_element_to_dict", _cbson_element_to_dict, METH_VARARGS,
"Decode a single key, value pair."},
{"_array_of_documents_to_buffer", _cbson_array_of_documents_to_buffer, METH_VARARGS, "Convert raw array of documents to a stream of BSON documents"},
{NULL, NULL, 0, NULL}
};

View File

@ -60,6 +60,23 @@ from bson.codec_options import CodecOptions
from bson.son import SON
def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions, raw_array: bool = False
) -> Mapping[Any, Any]:
"""Inflates the top level fields of a BSON document.
:Parameters:
- `bson_bytes`: the BSON bytes that compose this document
- `codec_options`: An instance of
:class:`~bson.codec_options.CodecOptions` whose ``document_class``
must be :class:`RawBSONDocument`.
"""
# Use SON to preserve ordering of elements.
return _raw_to_dict(
bson_bytes, 4, len(bson_bytes) - 1, codec_options, SON(), raw_array=raw_array
)
class RawBSONDocument(Mapping[str, Any]):
"""Representation for a MongoDB document that provides access to the raw
BSON bytes that compose it.
@ -111,7 +128,7 @@ class RawBSONDocument(Mapping[str, Any]):
# it refers to this class RawBSONDocument.
if codec_options is None:
codec_options = DEFAULT_RAW_BSON_OPTIONS
elif codec_options.document_class is not RawBSONDocument:
elif not issubclass(codec_options.document_class, RawBSONDocument):
raise TypeError(
"RawBSONDocument cannot use CodecOptions with document "
"class %s" % (codec_options.document_class,)
@ -135,9 +152,13 @@ class RawBSONDocument(Mapping[str, Any]):
# We already validated the object's size when this document was
# created, so no need to do that again.
# Use SON to preserve ordering of elements.
self.__inflated_doc = _inflate_bson(self.__raw, self.__codec_options)
self.__inflated_doc = self._inflate_bson(self.__raw, self.__codec_options)
return self.__inflated_doc
@staticmethod
def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
return _inflate_bson(bson_bytes, codec_options)
def __getitem__(self, item: str) -> Any:
return self.__inflated[item]
@ -153,23 +174,23 @@ class RawBSONDocument(Mapping[str, Any]):
return NotImplemented
def __repr__(self):
return "RawBSONDocument(%r, codec_options=%r)" % (self.raw, self.__codec_options)
return "%s(%r, codec_options=%r)" % (
self.__class__.__name__,
self.raw,
self.__codec_options,
)
def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
"""Inflates the top level fields of a BSON document.
class _RawArrayBSONDocument(RawBSONDocument):
"""A RawBSONDocument that only expands sub-documents and arrays when accessed."""
:Parameters:
- `bson_bytes`: the BSON bytes that compose this document
- `codec_options`: An instance of
:class:`~bson.codec_options.CodecOptions` whose ``document_class``
must be :class:`RawBSONDocument`.
"""
# Use SON to preserve ordering of elements.
return _raw_to_dict(bson_bytes, 4, len(bson_bytes) - 1, codec_options, SON())
@staticmethod
def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
return _inflate_bson(bson_bytes, codec_options, raw_array=True)
DEFAULT_RAW_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=RawBSONDocument)
_RAW_ARRAY_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=_RawArrayBSONDocument)
"""The default :class:`~bson.codec_options.CodecOptions` for
:class:`RawBSONDocument`.
"""

View File

@ -29,7 +29,12 @@ from typing import Any, Dict, NoReturn
import bson
from bson import CodecOptions, _decode_selective, _dict_to_bson, _make_c_string, encode
from bson.int64 import Int64
from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument, _inflate_bson
from bson.raw_bson import (
_RAW_ARRAY_BSON_OPTIONS,
DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument,
_inflate_bson,
)
from bson.son import SON
try:
@ -1379,7 +1384,7 @@ class _OpMsg(object):
user_fields is used to determine which fields must not be decoded
"""
inflated_response = _decode_selective(
RawBSONDocument(self.payload_document), user_fields, DEFAULT_RAW_BSON_OPTIONS
RawBSONDocument(self.payload_document), user_fields, _RAW_ARRAY_BSON_OPTIONS
)
return [inflated_response]