From 2fc4282acd4e18bf1dcc76d9ac978b83ab37fa76 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 22 Jan 2024 10:53:53 -0800 Subject: [PATCH] PYTHON-4146 Improve GridFS upload performance by batch writing chunks with insert_many (#1478) --- gridfs/grid_file.py | 61 +++++++++++++++++++++++++++++--------- test/test_gridfs.py | 15 ++++++++-- test/test_gridfs_bucket.py | 40 +++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 16 deletions(-) diff --git a/gridfs/grid_file.py b/gridfs/grid_file.py index 16fa103fd..971170c6c 100644 --- a/gridfs/grid_file.py +++ b/gridfs/grid_file.py @@ -21,21 +21,23 @@ import math import os from typing import Any, Iterable, Mapping, NoReturn, Optional -from bson.binary import Binary from bson.int64 import Int64 from bson.objectid import ObjectId from gridfs.errors import CorruptGridFile, FileExists, NoFile from pymongo import ASCENDING from pymongo.client_session import ClientSession from pymongo.collection import Collection +from pymongo.common import MAX_MESSAGE_SIZE from pymongo.cursor import Cursor from pymongo.errors import ( + BulkWriteError, ConfigurationError, CursorNotFound, DuplicateKeyError, InvalidOperation, OperationFailure, ) +from pymongo.helpers import _check_write_command_response from pymongo.read_preferences import ReadPreference _SEEK_SET = os.SEEK_SET @@ -48,6 +50,13 @@ NEWLN = b"\n" """Default chunk size, in bytes.""" # Slightly under a power of 2, to work well with server's record allocations. DEFAULT_CHUNK_SIZE = 255 * 1024 +# The number of chunked bytes to buffer before calling insert_many. +_UPLOAD_BUFFER_SIZE = MAX_MESSAGE_SIZE +# The number of chunk documents to buffer before calling insert_many. +_UPLOAD_BUFFER_CHUNKS = 100000 +# Rough BSON overhead of a chunk document not including the chunk data itself. +# Essentially len(encode({"_id": ObjectId(), "files_id": ObjectId(), "n": 1, "data": ""})) +_CHUNK_OVERHEAD = 60 _C_INDEX: dict[str, Any] = {"files_id": ASCENDING, "n": ASCENDING} _F_INDEX: dict[str, Any] = {"filename": ASCENDING, "uploadDate": ASCENDING} @@ -198,6 +207,8 @@ class GridIn: object.__setattr__(self, "_chunk_number", 0) object.__setattr__(self, "_closed", False) object.__setattr__(self, "_ensured_index", False) + object.__setattr__(self, "_buffered_docs", []) + object.__setattr__(self, "_buffered_docs_size", 0) def __create_index(self, collection: Collection, index_key: Any, unique: bool) -> None: doc = collection.find_one(projection={"_id": 1}, session=self._session) @@ -249,6 +260,8 @@ class GridIn: _buffer: io.BytesIO _closed: bool + _buffered_docs: list[dict[str, Any]] + _buffered_docs_size: int def __getattr__(self, name: str) -> Any: if name in self._file: @@ -268,32 +281,52 @@ class GridIn: if self._closed: self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}}) - def __flush_data(self, data: Any) -> None: + def __flush_data(self, data: Any, force: bool = False) -> None: """Flush `data` to a chunk.""" self.__ensure_indexes() - if not data: - return assert len(data) <= self.chunk_size - - chunk = {"files_id": self._file["_id"], "n": self._chunk_number, "data": Binary(data)} - - try: - self._chunks.insert_one(chunk, session=self._session) - except DuplicateKeyError: - self._raise_file_exists(self._file["_id"]) + if data: + self._buffered_docs.append( + {"files_id": self._file["_id"], "n": self._chunk_number, "data": data} + ) + self._buffered_docs_size += len(data) + _CHUNK_OVERHEAD + if not self._buffered_docs: + return + # Limit to 100,000 chunks or 32MB (+1 chunk) of data. + if ( + force + or self._buffered_docs_size >= _UPLOAD_BUFFER_SIZE + or len(self._buffered_docs) >= _UPLOAD_BUFFER_CHUNKS + ): + try: + self._chunks.insert_many(self._buffered_docs, session=self._session) + except BulkWriteError as exc: + # For backwards compatibility, raise an insert_one style exception. + write_errors = exc.details["writeErrors"] + for err in write_errors: + if err.get("code") in (11000, 11001, 12582): # Duplicate key errors + self._raise_file_exists(self._file["_id"]) + result = {"writeErrors": write_errors} + wces = exc.details["writeConcernErrors"] + if wces: + result["writeConcernError"] = wces[-1] + _check_write_command_response(result) + raise + self._buffered_docs = [] + self._buffered_docs_size = 0 self._chunk_number += 1 self._position += len(data) - def __flush_buffer(self) -> None: + def __flush_buffer(self, force: bool = False) -> None: """Flush the buffer contents out to a chunk.""" - self.__flush_data(self._buffer.getvalue()) + self.__flush_data(self._buffer.getvalue(), force=force) self._buffer.close() self._buffer = io.BytesIO() def __flush(self) -> Any: """Flush the file to the database.""" try: - self.__flush_buffer() + self.__flush_buffer(force=True) # The GridFS spec says length SHOULD be an Int64. self._file["length"] = Int64(self._position) self._file["uploadDate"] = datetime.datetime.now(tz=datetime.timezone.utc) diff --git a/test/test_gridfs.py b/test/test_gridfs.py index f94736708..88fccd654 100644 --- a/test/test_gridfs.py +++ b/test/test_gridfs.py @@ -21,6 +21,7 @@ import sys import threading import time from io import BytesIO +from unittest.mock import patch sys.path[0:0] = [""] @@ -30,7 +31,7 @@ from test.utils import joinall, one, rs_client, rs_or_single_client, single_clie import gridfs from bson.binary import Binary from gridfs.errors import CorruptGridFile, FileExists, NoFile -from gridfs.grid_file import GridOutCursor +from gridfs.grid_file import DEFAULT_CHUNK_SIZE, GridOutCursor from pymongo.database import Database from pymongo.errors import ( ConfigurationError, @@ -344,8 +345,18 @@ class TestGridfs(IntegrationTest): one.write(b"some content") one.close() + # Attempt to upload a file with more chunks to the same _id. + with patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", DEFAULT_CHUNK_SIZE): + two = self.fs.new_file(_id=123) + self.assertRaises(FileExists, two.write, b"x" * DEFAULT_CHUNK_SIZE * 3) + # Original file is still readable (no extra chunks were uploaded). + self.assertEqual(self.fs.get(123).read(), b"some content") + two = self.fs.new_file(_id=123) - self.assertRaises(FileExists, two.write, b"x" * 262146) + two.write(b"some content") + self.assertRaises(FileExists, two.close) + # Original file is still readable. + self.assertEqual(self.fs.get(123).read(), b"some content") def test_exists(self): oid = self.fs.put(b"hello") diff --git a/test/test_gridfs_bucket.py b/test/test_gridfs_bucket.py index 53e5cad54..f1e7800ce 100644 --- a/test/test_gridfs_bucket.py +++ b/test/test_gridfs_bucket.py @@ -18,9 +18,14 @@ from __future__ import annotations import datetime import itertools +import sys import threading import time from io import BytesIO +from unittest.mock import patch + +sys.path[0:0] = [""] + from test import IntegrationTest, client_context, unittest from test.utils import joinall, one, rs_client, rs_or_single_client, single_client @@ -34,6 +39,7 @@ from pymongo.errors import ( ConfigurationError, NotPrimaryError, ServerSelectionTimeoutError, + WriteConcernError, ) from pymongo.mongo_client import MongoClient from pymongo.read_preferences import ReadPreference @@ -276,6 +282,39 @@ class TestGridfs(IntegrationTest): ) self.assertEqual(b"custom id", self.fs.open_download_stream(oid).read()) + @patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 3) + @client_context.require_failCommand_fail_point + def test_upload_bulk_write_error(self): + # Test BulkWriteError from insert_many is converted to an insert_one style error. + expected_wce = { + "code": 100, + "codeName": "UnsatisfiableWriteConcern", + "errmsg": "Not enough data-bearing nodes", + } + cause_wce = { + "configureFailPoint": "failCommand", + "mode": {"times": 2}, + "data": {"failCommands": ["insert"], "writeConcernError": expected_wce}, + } + gin = self.fs.open_upload_stream("test_file", chunk_size_bytes=1) + with self.fail_point(cause_wce): + # Assert we raise WriteConcernError, not BulkWriteError. + with self.assertRaises(WriteConcernError): + gin.write(b"hello world") + # 3 chunks were uploaded. + self.assertEqual(3, self.db.fs.chunks.count_documents({"files_id": gin._id})) + gin.abort() + + @patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 10) + def test_upload_batching(self): + with self.fs.open_upload_stream("test_file", chunk_size_bytes=1) as gin: + gin.write(b"s" * (10 - 1)) + # No chunks were uploaded yet. + self.assertEqual(0, self.db.fs.chunks.count_documents({"files_id": gin._id})) + gin.write(b"s") + # All chunks were uploaded since we hit the _UPLOAD_BUFFER_CHUNKS limit. + self.assertEqual(10, self.db.fs.chunks.count_documents({"files_id": gin._id})) + def test_open_upload_stream(self): gin = self.fs.open_upload_stream("from_stream") gin.write(b"from stream") @@ -362,6 +401,7 @@ class TestGridfs(IntegrationTest): self.assertRaises(NoFile, self.fs.open_download_stream_by_name, "first_name") self.assertEqual(b"testing", self.fs.open_download_stream_by_name("second_name").read()) + @patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", 5) def test_abort(self): gin = self.fs.open_upload_stream("test_filename", chunk_size_bytes=5) gin.write(b"test1")