diff --git a/bson/__init__.py b/bson/__init__.py
index dc2e29238..c6a81d97e 100644
--- a/bson/__init__.py
+++ b/bson/__init__.py
@@ -128,7 +128,6 @@ if TYPE_CHECKING:
     from array import array
     from mmap import mmap
 
-
 try:
     from bson import _cbson  # type: ignore[attr-defined]
 
@@ -520,19 +519,32 @@ _ELEMENT_GETTER: Dict[int, Callable[..., Tuple[Any, int]]] = {
 if _USE_C:
 
     def _element_to_dict(
-        data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
+        data: Any,
+        view: Any,
+        position: int,
+        obj_end: int,
+        opts: CodecOptions,
+        raw_array: bool = False,
     ) -> Any:
-        return _cbson._element_to_dict(data, position, obj_end, opts)
+        return _cbson._element_to_dict(data, position, obj_end, opts, raw_array)
 
 else:
 
     def _element_to_dict(
-        data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
+        data: Any,
+        view: Any,
+        position: int,
+        obj_end: int,
+        opts: CodecOptions,
+        raw_array: bool = False,
     ) -> Any:
         """Decode a single key, value pair."""
         element_type = data[position]
         position += 1
         element_name, position = _get_c_string(data, view, position, opts)
+        if raw_array and element_type == ord(BSONARR):
+            _, end = _get_object_size(data, position, len(data))
+            return element_name, view[position : end + 1], end + 1
         try:
             value, position = _ELEMENT_GETTER[element_type](
                 data, view, position, obj_end, opts, element_name
@@ -551,20 +563,30 @@ else:
 _T = TypeVar("_T", bound=MutableMapping[Any, Any])
 
 
-def _raw_to_dict(data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T) -> _T:
+def _raw_to_dict(
+    data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T, raw_array: bool = False
+) -> _T:
     data, view = get_data_and_view(data)
-    return _elements_to_dict(data, view, position, obj_end, opts, result)
+    return _elements_to_dict(data, view, position, obj_end, opts, result, raw_array=raw_array)
 
 
 def _elements_to_dict(
-    data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, result: Any = None
+    data: Any,
+    view: Any,
+    position: int,
+    obj_end: int,
+    opts: CodecOptions,
+    result: Any = None,
+    raw_array: bool = False,
 ) -> Any:
     """Decode a BSON document into result."""
     if result is None:
         result = opts.document_class()
     end = obj_end - 1
     while position < end:
-        key, value, position = _element_to_dict(data, view, position, obj_end, opts)
+        key, value, position = _element_to_dict(
+            data, view, position, obj_end, opts, raw_array=raw_array
+        )
         result[key] = value
     if position != obj_end:
         raise InvalidBSON("bad object or element length")
@@ -1119,14 +1141,53 @@ def _decode_selective(rawdoc: Any, fields: Any, codec_options: Any) -> Mapping[A
     return doc
 
 
+def _array_of_documents_to_buffer(view: memoryview) -> bytes:
+    """Concatenate the raw bytes of each document in a BSON array.
+
+    ``view`` holds a complete BSON array whose elements are embedded
+    documents. Raises ``InvalidBSON`` if an element is not a document or
+    the element lengths do not add up to the array length.
+    """
+    position = 0
+    _, end = _get_object_size(view, position, len(view))
+    position += 4
+    buffers: List[memoryview] = []
+    append = buffers.append
+    while position < end - 1:
+        # Mirror the C extension: each element must be an embedded document
+        # (BSON type 0x03).
+        if view[position] != 3:
+            raise InvalidBSON("array element was not an object")
+        # Just skip the keys.
+        while view[position] != 0:
+            position += 1
+        position += 1
+        obj_size, _ = _get_object_size(view, position, end)
+        append(view[position : position + obj_size])
+        position += obj_size
+    if position != end:
+        raise InvalidBSON("bad object or element length")
+    return b"".join(buffers)
+
+
+if _USE_C:
+    _array_of_documents_to_buffer = _cbson._array_of_documents_to_buffer  # noqa: F811
+
+
 def _convert_raw_document_lists_to_streams(document: Any) -> None:
+    """Convert raw array of documents to a stream of BSON documents."""
     cursor = document.get("cursor")
-    if cursor:
-        for key in ("firstBatch", "nextBatch"):
-            batch = cursor.get(key)
-            if batch:
-                stream = b"".join(doc.raw for doc in batch)
-                cursor[key] = [stream]
+    if not cursor:
+        return
+    for key in ("firstBatch", "nextBatch"):
+        batch = cursor.get(key)
+        if not batch:
+            continue
+        data = _array_of_documents_to_buffer(batch)
+        if data:
+            cursor[key] = [data]
+        else:
+            cursor[key] = []
 
 
 def _decode_all_selective(data: Any, codec_options: CodecOptions, fields: Any) -> List[Any]:
diff --git
a/bson/_cbsonmodule.c b/bson/_cbsonmodule.c
index 019f049bb..8678e8050 100644
--- a/bson/_cbsonmodule.c
+++ b/bson/_cbsonmodule.c
@@ -1615,7 +1615,7 @@ invalid:
 
 static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
                            unsigned* position, unsigned char type,
-                           unsigned max, const codec_options_t* options) {
+                           unsigned max, const codec_options_t* options, int raw_array) {
     struct module_state *state = GETSTATE(self);
     PyObject* value = NULL;
     switch (type) {
@@ -1712,11 +1712,20 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
             if (size < BSON_MIN_SIZE || max < size) {
                 goto invalid;
             }
+
             end = *position + size - 1;
             /* Check for bad eoo */
             if (buffer[end]) {
                 goto invalid;
             }
+
+            if (raw_array != 0) {
+                // Treat it as a binary buffer.
+                value = PyBytes_FromStringAndSize(buffer + *position, size);
+                *position += size;
+                break;
+            }
+
             *position += 4;
 
             value = PyList_New(0);
@@ -1740,7 +1749,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
                     goto invalid;
                 }
                 to_append = get_value(self, name, buffer, position, bson_type,
-                                      max - (unsigned)key_size, options);
+                                      max - (unsigned)key_size, options, raw_array);
                 Py_LeaveRecursiveCall();
                 if (!to_append) {
                     Py_DECREF(value);
@@ -2464,6 +2473,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
 static int _element_to_dict(PyObject* self, const char* string,
                             unsigned position, unsigned max,
                             const codec_options_t* options,
+                            int raw_array,
                             PyObject** name, PyObject** value) {
     unsigned char type = (unsigned char)string[position++];
     size_t name_length = strlen(string + position);
@@ -2504,7 +2514,7 @@ static int _element_to_dict(PyObject* self, const char* string,
     }
     position += (unsigned)name_length + 1;
     *value = get_value(self, *name, string, &position, type,
-                       max - position, options);
+                       max - position, options, raw_array);
     if (!*value) {
         Py_DECREF(*name);
         return -1;
@@ -2520,12 +2530,13 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
     unsigned position;
     unsigned max;
     int new_position;
+    int raw_array = 0;
     PyObject* name;
     PyObject* value;
     PyObject* result_tuple;
 
-    if (!PyArg_ParseTuple(args, "OIIO&", &bson, &position, &max,
-                          convert_codec_options, &options)) {
+    if (!PyArg_ParseTuple(args, "OIIO&p", &bson, &position, &max,
+                          convert_codec_options, &options, &raw_array)) {
         return NULL;
     }
 
@@ -2535,8 +2546,7 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
     }
 
     string = PyBytes_AS_STRING(bson);
-    new_position = _element_to_dict(self, string, position, max, &options,
-                                    &name, &value);
+    new_position = _element_to_dict(self, string, position, max, &options, raw_array, &name, &value);
     if (new_position < 0) {
         return NULL;
     }
@@ -2560,13 +2570,14 @@ static PyObject* _elements_to_dict(PyObject* self, const char* string,
     if (!dict) {
         return NULL;
     }
+    int raw_array = 0;
     while (position < max) {
         PyObject* name = NULL;
         PyObject* value = NULL;
         int new_position;
 
         new_position = _element_to_dict(
-            self, string, position, max, options, &name, &value);
+            self, string, position, max, options, raw_array, &name, &value);
         if (new_position < 0) {
             Py_DECREF(dict);
             return NULL;
@@ -2649,7 +2660,6 @@ static PyObject* _cbson_bson_to_dict(PyObject* self, PyObject* args) {
     }
 
     string = (char*)view.buf;
-
     memcpy(&size, string, 4);
     size = (int32_t)BSON_UINT32_FROM_LE(size);
     if (size < BSON_MIN_SIZE) {
@@ -2797,6 +2807,156 @@ done:
     return result;
 }
 
+
+/* Convert a raw BSON array of documents into one contiguous buffer.
+ *
+ * Expects one argument whose buffer holds a complete BSON array in which
+ * every element is an embedded document (type 0x03). Returns a bytes object
+ * with those documents concatenated and the array keys dropped, or NULL
+ * with InvalidBSON set when the array is malformed.
+ */
+static PyObject* _cbson_array_of_documents_to_buffer(PyObject* self, PyObject* args) {
+    uint32_t size;
+    uint32_t value_length;
+    uint32_t position = 0;
+    buffer_t buffer;
+    const char* string;
+    PyObject* arr;
+    PyObject* result = NULL;
+    Py_buffer view = {0};
+
+    if (!PyArg_ParseTuple(args, "O", &arr)) {
+        return NULL;
+    }
+
+    if (!_get_buffer(arr, &view)) {
+        return NULL;
+    }
+
+    buffer = pymongo_buffer_new();
+    if (!buffer) {
+        PyBuffer_Release(&view);
+        return NULL;
+    }
+
+    string = (char*)view.buf;
+
+    if (view.len < BSON_MIN_SIZE) {
+        PyObject* InvalidBSON = _error("InvalidBSON");
+        if (InvalidBSON) {
+            PyErr_SetString(InvalidBSON,
+                            "not enough data for a BSON document");
+            Py_DECREF(InvalidBSON);
+        }
+        goto done;
+    }
+
+    memcpy(&size, string, 4);
+    size = BSON_UINT32_FROM_LE(size);
+
+    /* Validate the declared size once, before anything is read through
+     * it. A size below the minimum would make `size - 1` wrap around, and
+     * a size larger than the view would let the loop below read past the
+     * end of the buffer. */
+    if (size < BSON_MIN_SIZE) {
+        PyObject* InvalidBSON = _error("InvalidBSON");
+        if (InvalidBSON) {
+            PyErr_SetString(InvalidBSON, "invalid message size");
+            Py_DECREF(InvalidBSON);
+        }
+        goto fail;
+    }
+
+    if ((size_t)view.len < (size_t)size) {
+        PyObject* InvalidBSON = _error("InvalidBSON");
+        if (InvalidBSON) {
+            PyErr_SetString(InvalidBSON, "objsize too large");
+            Py_DECREF(InvalidBSON);
+        }
+        goto fail;
+    }
+
+    if (string[size - 1]) {
+        PyObject* InvalidBSON = _error("InvalidBSON");
+        if (InvalidBSON) {
+            PyErr_SetString(InvalidBSON, "bad eoo");
+            Py_DECREF(InvalidBSON);
+        }
+        goto fail;
+    }
+
+    /* save space for length */
+    if (pymongo_buffer_save_space(buffer, size) == -1) {
+        goto fail;
+    }
+    pymongo_buffer_update_position(buffer, 0);
+
+    position += 4;
+    while (position < size - 1) {
+        // Verify the value is an object.
+        unsigned char type = (unsigned char)string[position];
+        if (type != 3) {
+            PyObject* InvalidBSON = _error("InvalidBSON");
+            if (InvalidBSON) {
+                PyErr_SetString(InvalidBSON, "array element was not an object");
+                Py_DECREF(InvalidBSON);
+            }
+            goto fail;
+        }
+
+        // Just skip the keys.
+        position = position + strlen(string + position) + 1;
+
+        if (position >= size || (size - position) < BSON_MIN_SIZE) {
+            PyObject* InvalidBSON = _error("InvalidBSON");
+            if (InvalidBSON) {
+                PyErr_SetString(InvalidBSON, "invalid array content");
+                Py_DECREF(InvalidBSON);
+            }
+            goto fail;
+        }
+
+        memcpy(&value_length, string + position, 4);
+        value_length = BSON_UINT32_FROM_LE(value_length);
+        if (value_length < BSON_MIN_SIZE) {
+            PyObject* InvalidBSON = _error("InvalidBSON");
+            if (InvalidBSON) {
+                PyErr_SetString(InvalidBSON, "invalid message size");
+                Py_DECREF(InvalidBSON);
+            }
+            goto fail;
+        }
+
+        /* The document must end before the array's trailing NUL;
+         * otherwise the copy below would read past the array. */
+        if (value_length > size - position - 1) {
+            PyObject* InvalidBSON = _error("InvalidBSON");
+            if (InvalidBSON) {
+                PyErr_SetString(InvalidBSON, "invalid array content");
+                Py_DECREF(InvalidBSON);
+            }
+            goto fail;
+        }
+
+        if (pymongo_buffer_write(buffer, string + position, value_length) == 1) {
+            goto fail;
+        }
+        position += value_length;
+    }
+
+    /* objectify buffer */
+    result = Py_BuildValue("y#", pymongo_buffer_get_buffer(buffer),
+                           (Py_ssize_t)pymongo_buffer_get_position(buffer));
+    goto done;
+fail:
+    result = NULL;
+done:
+    PyBuffer_Release(&view);
+    pymongo_buffer_free(buffer);
+    return result;
+}
+
+
 static PyMethodDef _CBSONMethods[] = {
     {"_dict_to_bson", _cbson_dict_to_bson, METH_VARARGS,
      "convert a dictionary to a string containing its BSON representation."},
@@ -2806,6 +2966,7 @@ static PyMethodDef _CBSONMethods[] = {
      "convert binary data to a sequence of documents."},
     {"_element_to_dict", _cbson_element_to_dict, METH_VARARGS,
      "Decode a single key, value pair."},
+    {"_array_of_documents_to_buffer", _cbson_array_of_documents_to_buffer, METH_VARARGS, "Convert raw array of documents to a stream of BSON documents"},
     {NULL, NULL, 0, NULL}
 };
 
diff --git a/bson/raw_bson.py b/bson/raw_bson.py
index ca7207f0a..6a80ea70c 100644
--- a/bson/raw_bson.py
+++ b/bson/raw_bson.py
@@ -60,6 +60,25 @@ from bson.codec_options import CodecOptions
 from bson.son import SON
 
 
+def _inflate_bson(
+    bson_bytes: bytes, codec_options: CodecOptions, raw_array: bool = False
+) -> Mapping[Any, Any]:
+    """Inflates the top level fields of a BSON document.
+
+    :Parameters:
+      - `bson_bytes`: the BSON bytes that compose this document
+      - `codec_options`: An instance of
+        :class:`~bson.codec_options.CodecOptions` whose ``document_class``
+        must be :class:`RawBSONDocument`.
+      - `raw_array`: If ``True``, array values are returned as raw,
+        undecoded BSON instead of being decoded into lists.
+    """
+    # Use SON to preserve ordering of elements.
+    return _raw_to_dict(
+        bson_bytes, 4, len(bson_bytes) - 1, codec_options, SON(), raw_array=raw_array
+    )
+
+
 class RawBSONDocument(Mapping[str, Any]):
     """Representation for a MongoDB document that provides access to the raw
     BSON bytes that compose it.
@@ -111,7 +130,7 @@ class RawBSONDocument(Mapping[str, Any]):
         # it refers to this class RawBSONDocument.
         if codec_options is None:
             codec_options = DEFAULT_RAW_BSON_OPTIONS
-        elif codec_options.document_class is not RawBSONDocument:
+        elif not issubclass(codec_options.document_class, RawBSONDocument):
             raise TypeError(
                 "RawBSONDocument cannot use CodecOptions with document "
                 "class %s" % (codec_options.document_class,)
@@ -135,9 +154,13 @@ class RawBSONDocument(Mapping[str, Any]):
             # We already validated the object's size when this document was
             # created, so no need to do that again.
             # Use SON to preserve ordering of elements.
-            self.__inflated_doc = _inflate_bson(self.__raw, self.__codec_options)
+            self.__inflated_doc = self._inflate_bson(self.__raw, self.__codec_options)
         return self.__inflated_doc
 
+    @staticmethod
+    def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
+        return _inflate_bson(bson_bytes, codec_options)
+
     def __getitem__(self, item: str) -> Any:
         return self.__inflated[item]
 
@@ -153,23 +176,25 @@ class RawBSONDocument(Mapping[str, Any]):
         return NotImplemented
 
     def __repr__(self):
-        return "RawBSONDocument(%r, codec_options=%r)" % (self.raw, self.__codec_options)
+        return "%s(%r, codec_options=%r)" % (
+            self.__class__.__name__,
+            self.raw,
+            self.__codec_options,
+        )
 
 
-def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
-    """Inflates the top level fields of a BSON document.
+class _RawArrayBSONDocument(RawBSONDocument):
+    """A RawBSONDocument that only expands sub-documents and arrays when accessed."""
 
-    :Parameters:
-      - `bson_bytes`: the BSON bytes that compose this document
-      - `codec_options`: An instance of
-        :class:`~bson.codec_options.CodecOptions` whose ``document_class``
-        must be :class:`RawBSONDocument`.
-    """
-    # Use SON to preserve ordering of elements.
-    return _raw_to_dict(bson_bytes, 4, len(bson_bytes) - 1, codec_options, SON())
+    @staticmethod
+    def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
+        return _inflate_bson(bson_bytes, codec_options, raw_array=True)
 
 
 DEFAULT_RAW_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=RawBSONDocument)
 """The default :class:`~bson.codec_options.CodecOptions` for
 :class:`RawBSONDocument`.
 """
+
+# Options for _RawArrayBSONDocument, which inflates with raw_array=True.
+_RAW_ARRAY_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=_RawArrayBSONDocument)
diff --git a/pymongo/message.py b/pymongo/message.py
index 8f37fdc06..960832cb9 100644
--- a/pymongo/message.py
+++ b/pymongo/message.py
@@ -29,7 +29,12 @@ from typing import Any, Dict, NoReturn
 import bson
 from bson import CodecOptions, _decode_selective, _dict_to_bson, _make_c_string, encode
 from bson.int64 import Int64
-from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument, _inflate_bson
+from bson.raw_bson import (
+    _RAW_ARRAY_BSON_OPTIONS,
+    DEFAULT_RAW_BSON_OPTIONS,
+    RawBSONDocument,
+    _inflate_bson,
+)
 from bson.son import SON
 
 try:
@@ -1379,7 +1384,7 @@ class _OpMsg(object):
         user_fields is used to determine which fields must not be decoded
         """
         inflated_response = _decode_selective(
-            RawBSONDocument(self.payload_document), user_fields, DEFAULT_RAW_BSON_OPTIONS
+            RawBSONDocument(self.payload_document), user_fields, _RAW_ARRAY_BSON_OPTIONS
         )
         return [inflated_response]
 