PYTHON-4146 Improve GridFS upload performance by batch writing chunks with insert_many (#1478)
This commit is contained in:
parent
c3458e9d8e
commit
2fc4282acd
@ -21,21 +21,23 @@ import math
|
||||
import os
|
||||
from typing import Any, Iterable, Mapping, NoReturn, Optional
|
||||
|
||||
from bson.binary import Binary
|
||||
from bson.int64 import Int64
|
||||
from bson.objectid import ObjectId
|
||||
from gridfs.errors import CorruptGridFile, FileExists, NoFile
|
||||
from pymongo import ASCENDING
|
||||
from pymongo.client_session import ClientSession
|
||||
from pymongo.collection import Collection
|
||||
from pymongo.common import MAX_MESSAGE_SIZE
|
||||
from pymongo.cursor import Cursor
|
||||
from pymongo.errors import (
|
||||
BulkWriteError,
|
||||
ConfigurationError,
|
||||
CursorNotFound,
|
||||
DuplicateKeyError,
|
||||
InvalidOperation,
|
||||
OperationFailure,
|
||||
)
|
||||
from pymongo.helpers import _check_write_command_response
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
_SEEK_SET = os.SEEK_SET
|
||||
@ -48,6 +50,13 @@ NEWLN = b"\n"
|
||||
"""Default chunk size, in bytes."""
|
||||
# Slightly under a power of 2, to work well with server's record allocations.
|
||||
DEFAULT_CHUNK_SIZE = 255 * 1024
|
||||
# The number of chunked bytes to buffer before calling insert_many.
|
||||
_UPLOAD_BUFFER_SIZE = MAX_MESSAGE_SIZE
|
||||
# The number of chunk documents to buffer before calling insert_many.
|
||||
_UPLOAD_BUFFER_CHUNKS = 100000
|
||||
# Rough BSON overhead of a chunk document not including the chunk data itself.
|
||||
# Essentially len(encode({"_id": ObjectId(), "files_id": ObjectId(), "n": 1, "data": ""}))
|
||||
_CHUNK_OVERHEAD = 60
|
||||
|
||||
_C_INDEX: dict[str, Any] = {"files_id": ASCENDING, "n": ASCENDING}
|
||||
_F_INDEX: dict[str, Any] = {"filename": ASCENDING, "uploadDate": ASCENDING}
|
||||
@ -198,6 +207,8 @@ class GridIn:
|
||||
object.__setattr__(self, "_chunk_number", 0)
|
||||
object.__setattr__(self, "_closed", False)
|
||||
object.__setattr__(self, "_ensured_index", False)
|
||||
object.__setattr__(self, "_buffered_docs", [])
|
||||
object.__setattr__(self, "_buffered_docs_size", 0)
|
||||
|
||||
def __create_index(self, collection: Collection, index_key: Any, unique: bool) -> None:
|
||||
doc = collection.find_one(projection={"_id": 1}, session=self._session)
|
||||
@ -249,6 +260,8 @@ class GridIn:
|
||||
|
||||
_buffer: io.BytesIO
|
||||
_closed: bool
|
||||
_buffered_docs: list[dict[str, Any]]
|
||||
_buffered_docs_size: int
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
if name in self._file:
|
||||
@ -268,32 +281,52 @@ class GridIn:
|
||||
if self._closed:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
|
||||
def __flush_data(self, data: Any) -> None:
|
||||
def __flush_data(self, data: Any, force: bool = False) -> None:
|
||||
"""Flush `data` to a chunk."""
|
||||
self.__ensure_indexes()
|
||||
if not data:
|
||||
return
|
||||
assert len(data) <= self.chunk_size
|
||||
|
||||
chunk = {"files_id": self._file["_id"], "n": self._chunk_number, "data": Binary(data)}
|
||||
|
||||
try:
|
||||
self._chunks.insert_one(chunk, session=self._session)
|
||||
except DuplicateKeyError:
|
||||
self._raise_file_exists(self._file["_id"])
|
||||
if data:
|
||||
self._buffered_docs.append(
|
||||
{"files_id": self._file["_id"], "n": self._chunk_number, "data": data}
|
||||
)
|
||||
self._buffered_docs_size += len(data) + _CHUNK_OVERHEAD
|
||||
if not self._buffered_docs:
|
||||
return
|
||||
# Limit to 100,000 chunks or 32MB (+1 chunk) of data.
|
||||
if (
|
||||
force
|
||||
or self._buffered_docs_size >= _UPLOAD_BUFFER_SIZE
|
||||
or len(self._buffered_docs) >= _UPLOAD_BUFFER_CHUNKS
|
||||
):
|
||||
try:
|
||||
self._chunks.insert_many(self._buffered_docs, session=self._session)
|
||||
except BulkWriteError as exc:
|
||||
# For backwards compatibility, raise an insert_one style exception.
|
||||
write_errors = exc.details["writeErrors"]
|
||||
for err in write_errors:
|
||||
if err.get("code") in (11000, 11001, 12582): # Duplicate key errors
|
||||
self._raise_file_exists(self._file["_id"])
|
||||
result = {"writeErrors": write_errors}
|
||||
wces = exc.details["writeConcernErrors"]
|
||||
if wces:
|
||||
result["writeConcernError"] = wces[-1]
|
||||
_check_write_command_response(result)
|
||||
raise
|
||||
self._buffered_docs = []
|
||||
self._buffered_docs_size = 0
|
||||
self._chunk_number += 1
|
||||
self._position += len(data)
|
||||
|
||||
def __flush_buffer(self) -> None:
|
||||
def __flush_buffer(self, force: bool = False) -> None:
|
||||
"""Flush the buffer contents out to a chunk."""
|
||||
self.__flush_data(self._buffer.getvalue())
|
||||
self.__flush_data(self._buffer.getvalue(), force=force)
|
||||
self._buffer.close()
|
||||
self._buffer = io.BytesIO()
|
||||
|
||||
def __flush(self) -> Any:
|
||||
"""Flush the file to the database."""
|
||||
try:
|
||||
self.__flush_buffer()
|
||||
self.__flush_buffer(force=True)
|
||||
# The GridFS spec says length SHOULD be an Int64.
|
||||
self._file["length"] = Int64(self._position)
|
||||
self._file["uploadDate"] = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||
|
||||
@ -21,6 +21,7 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
from io import BytesIO
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -30,7 +31,7 @@ from test.utils import joinall, one, rs_client, rs_or_single_client, single_clie
|
||||
import gridfs
|
||||
from bson.binary import Binary
|
||||
from gridfs.errors import CorruptGridFile, FileExists, NoFile
|
||||
from gridfs.grid_file import GridOutCursor
|
||||
from gridfs.grid_file import DEFAULT_CHUNK_SIZE, GridOutCursor
|
||||
from pymongo.database import Database
|
||||
from pymongo.errors import (
|
||||
ConfigurationError,
|
||||
@ -344,8 +345,18 @@ class TestGridfs(IntegrationTest):
|
||||
one.write(b"some content")
|
||||
one.close()
|
||||
|
||||
# Attempt to upload a file with more chunks to the same _id.
|
||||
with patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", DEFAULT_CHUNK_SIZE):
|
||||
two = self.fs.new_file(_id=123)
|
||||
self.assertRaises(FileExists, two.write, b"x" * DEFAULT_CHUNK_SIZE * 3)
|
||||
# Original file is still readable (no extra chunks were uploaded).
|
||||
self.assertEqual(self.fs.get(123).read(), b"some content")
|
||||
|
||||
two = self.fs.new_file(_id=123)
|
||||
self.assertRaises(FileExists, two.write, b"x" * 262146)
|
||||
two.write(b"some content")
|
||||
self.assertRaises(FileExists, two.close)
|
||||
# Original file is still readable.
|
||||
self.assertEqual(self.fs.get(123).read(), b"some content")
|
||||
|
||||
def test_exists(self):
|
||||
oid = self.fs.put(b"hello")
|
||||
|
||||
@ -18,9 +18,14 @@ from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import itertools
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from io import BytesIO
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from test import IntegrationTest, client_context, unittest
|
||||
from test.utils import joinall, one, rs_client, rs_or_single_client, single_client
|
||||
|
||||
@ -34,6 +39,7 @@ from pymongo.errors import (
|
||||
ConfigurationError,
|
||||
NotPrimaryError,
|
||||
ServerSelectionTimeoutError,
|
||||
WriteConcernError,
|
||||
)
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
@ -276,6 +282,39 @@ class TestGridfs(IntegrationTest):
|
||||
)
|
||||
self.assertEqual(b"custom id", self.fs.open_download_stream(oid).read())
|
||||
|
||||
@patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 3)
|
||||
@client_context.require_failCommand_fail_point
|
||||
def test_upload_bulk_write_error(self):
|
||||
# Test BulkWriteError from insert_many is converted to an insert_one style error.
|
||||
expected_wce = {
|
||||
"code": 100,
|
||||
"codeName": "UnsatisfiableWriteConcern",
|
||||
"errmsg": "Not enough data-bearing nodes",
|
||||
}
|
||||
cause_wce = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 2},
|
||||
"data": {"failCommands": ["insert"], "writeConcernError": expected_wce},
|
||||
}
|
||||
gin = self.fs.open_upload_stream("test_file", chunk_size_bytes=1)
|
||||
with self.fail_point(cause_wce):
|
||||
# Assert we raise WriteConcernError, not BulkWriteError.
|
||||
with self.assertRaises(WriteConcernError):
|
||||
gin.write(b"hello world")
|
||||
# 3 chunks were uploaded.
|
||||
self.assertEqual(3, self.db.fs.chunks.count_documents({"files_id": gin._id}))
|
||||
gin.abort()
|
||||
|
||||
@patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 10)
|
||||
def test_upload_batching(self):
|
||||
with self.fs.open_upload_stream("test_file", chunk_size_bytes=1) as gin:
|
||||
gin.write(b"s" * (10 - 1))
|
||||
# No chunks were uploaded yet.
|
||||
self.assertEqual(0, self.db.fs.chunks.count_documents({"files_id": gin._id}))
|
||||
gin.write(b"s")
|
||||
# All chunks were uploaded since we hit the _UPLOAD_BUFFER_CHUNKS limit.
|
||||
self.assertEqual(10, self.db.fs.chunks.count_documents({"files_id": gin._id}))
|
||||
|
||||
def test_open_upload_stream(self):
|
||||
gin = self.fs.open_upload_stream("from_stream")
|
||||
gin.write(b"from stream")
|
||||
@ -362,6 +401,7 @@ class TestGridfs(IntegrationTest):
|
||||
self.assertRaises(NoFile, self.fs.open_download_stream_by_name, "first_name")
|
||||
self.assertEqual(b"testing", self.fs.open_download_stream_by_name("second_name").read())
|
||||
|
||||
@patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", 5)
|
||||
def test_abort(self):
|
||||
gin = self.fs.open_upload_stream("test_filename", chunk_size_bytes=5)
|
||||
gin.write(b"test1")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user