PYTHON-2243 Raise informative error message when attempting a GridFS operation in a transaction (#454)
This commit is contained in:
parent
7e2790cc44
commit
5d92b2f552
@ -53,6 +53,13 @@ Version 3.11 adds support for MongoDB 4.4. Highlights include:
|
||||
:meth:`~pymongo.database.Database.command` to run the ``reIndex`` command
|
||||
instead.
|
||||
|
||||
Unavoidable breaking changes:
|
||||
|
||||
- :class:`~gridfs.GridFSBucket` and :class:`~gridfs.GridFS` do not support
|
||||
multi-document transactions. Running a GridFS operation in a transaction
|
||||
now always raises the following error:
|
||||
``InvalidOperation: GridFS does not support multi-document transactions``
|
||||
|
||||
.. _validate command: https://docs.mongodb.com/manual/reference/command/validate/
|
||||
|
||||
Issues Resolved
|
||||
|
||||
@ -26,7 +26,8 @@ from gridfs.grid_file import (GridIn,
|
||||
GridOut,
|
||||
GridOutCursor,
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
_clear_entity_type_registry)
|
||||
_clear_entity_type_registry,
|
||||
_disallow_transactions)
|
||||
from pymongo import (ASCENDING,
|
||||
DESCENDING)
|
||||
from pymongo.common import UNAUTHORIZED_CODES, validate_string
|
||||
@ -50,6 +51,10 @@ class GridFS(object):
|
||||
computed for uploaded files. Useful in environments where MD5
|
||||
cannot be used for regulatory or other reasons. Defaults to False.
|
||||
|
||||
.. versionchanged:: 3.11
|
||||
Running a GridFS operation in a transaction now always raises an
|
||||
error. GridFS does not support multi-document transactions.
|
||||
|
||||
.. versionchanged:: 3.1
|
||||
Indexes are only ensured on the first write to the DB.
|
||||
|
||||
@ -68,7 +73,6 @@ class GridFS(object):
|
||||
raise ConfigurationError('database must use '
|
||||
'acknowledged write_concern')
|
||||
|
||||
self.__database = database
|
||||
self.__collection = database[collection]
|
||||
self.__files = self.__collection.files
|
||||
self.__chunks = self.__collection.chunks
|
||||
@ -88,8 +92,6 @@ class GridFS(object):
|
||||
:Parameters:
|
||||
- `**kwargs` (optional): keyword arguments for file creation
|
||||
"""
|
||||
# No need for __ensure_index_files_id() here; GridIn ensures
|
||||
# the (files_id, n) index when needed.
|
||||
return GridIn(
|
||||
self.__collection, disable_md5=self.__disable_md5, **kwargs)
|
||||
|
||||
@ -192,6 +194,7 @@ class GridFS(object):
|
||||
if filename is not None:
|
||||
query["filename"] = filename
|
||||
|
||||
_disallow_transactions(session)
|
||||
cursor = self.__files.find(query, session=session)
|
||||
if version < 0:
|
||||
skip = abs(version) - 1
|
||||
@ -249,6 +252,7 @@ class GridFS(object):
|
||||
.. versionchanged:: 3.1
|
||||
``delete`` no longer ensures indexes.
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
self.__files.delete_one({"_id": file_id}, session=session)
|
||||
self.__chunks.delete_many({"files_id": file_id}, session=session)
|
||||
|
||||
@ -266,6 +270,7 @@ class GridFS(object):
|
||||
.. versionchanged:: 3.1
|
||||
``list`` no longer ensures indexes.
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
# With an index, distinct includes documents with no filename
|
||||
# as None.
|
||||
return [
|
||||
@ -299,6 +304,7 @@ class GridFS(object):
|
||||
if filter is not None and not isinstance(filter, abc.Mapping):
|
||||
filter = {"_id": filter}
|
||||
|
||||
_disallow_transactions(session)
|
||||
for f in self.find(filter, *args, session=session, **kwargs):
|
||||
return f
|
||||
|
||||
@ -403,6 +409,7 @@ class GridFS(object):
|
||||
.. versionchanged:: 3.6
|
||||
Added ``session`` parameter.
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
if kwargs:
|
||||
f = self.__files.find_one(kwargs, ["_id"], session=session)
|
||||
else:
|
||||
@ -439,6 +446,10 @@ class GridFSBucket(object):
|
||||
computed for uploaded files. Useful in environments where MD5
|
||||
cannot be used for regulatory or other reasons. Defaults to False.
|
||||
|
||||
.. versionchanged:: 3.11
|
||||
Running a GridFS operation in a transaction now always raises an
|
||||
error. GridFSBucket does not support multi-document transactions.
|
||||
|
||||
.. versionadded:: 3.1
|
||||
|
||||
.. mongodoc:: gridfs
|
||||
@ -452,7 +463,6 @@ class GridFSBucket(object):
|
||||
if not wtc.acknowledged:
|
||||
raise ConfigurationError('write concern must be acknowledged')
|
||||
|
||||
self._db = db
|
||||
self._bucket_name = bucket_name
|
||||
self._collection = db[bucket_name]
|
||||
self._disable_md5 = disable_md5
|
||||
@ -746,6 +756,7 @@ class GridFSBucket(object):
|
||||
.. versionchanged:: 3.6
|
||||
Added ``session`` parameter.
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
res = self._files.delete_one({"_id": file_id}, session=session)
|
||||
self._chunks.delete_many({"files_id": file_id}, session=session)
|
||||
if not res.deleted_count:
|
||||
@ -839,9 +850,8 @@ class GridFSBucket(object):
|
||||
Added ``session`` parameter.
|
||||
"""
|
||||
validate_string("filename", filename)
|
||||
|
||||
query = {"filename": filename}
|
||||
|
||||
_disallow_transactions(session)
|
||||
cursor = self._files.find(query, session=session)
|
||||
if revision < 0:
|
||||
skip = abs(revision) - 1
|
||||
@ -922,6 +932,7 @@ class GridFSBucket(object):
|
||||
.. versionchanged:: 3.6
|
||||
Added ``session`` parameter.
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
result = self._files.update_one({"_id": file_id},
|
||||
{"$set": {"filename": new_filename}},
|
||||
session=session)
|
||||
|
||||
@ -31,6 +31,7 @@ from pymongo.cursor import Cursor
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
CursorNotFound,
|
||||
DuplicateKeyError,
|
||||
InvalidOperation,
|
||||
OperationFailure)
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
@ -105,6 +106,12 @@ def _clear_entity_type_registry(entity, **kwargs):
|
||||
return entity.with_options(codec_options=codecopts, **kwargs)
|
||||
|
||||
|
||||
def _disallow_transactions(session):
|
||||
if session and session.in_transaction:
|
||||
raise InvalidOperation(
|
||||
'GridFS does not support multi-document transactions')
|
||||
|
||||
|
||||
class GridIn(object):
|
||||
"""Class to write data to GridFS.
|
||||
"""
|
||||
@ -168,6 +175,7 @@ class GridIn(object):
|
||||
if not root_collection.write_concern.acknowledged:
|
||||
raise ConfigurationError('root_collection must use '
|
||||
'acknowledged write_concern')
|
||||
_disallow_transactions(session)
|
||||
|
||||
# Handle alternative naming
|
||||
if "content_type" in kwargs:
|
||||
@ -207,6 +215,7 @@ class GridIn(object):
|
||||
|
||||
def __ensure_indexes(self):
|
||||
if not object.__getattribute__(self, "_ensured_index"):
|
||||
_disallow_transactions(self._session)
|
||||
self.__create_index(self._coll.files, _F_INDEX, False)
|
||||
self.__create_index(self._coll.chunks, _C_INDEX, True)
|
||||
object.__setattr__(self, "_ensured_index", True)
|
||||
@ -456,6 +465,7 @@ class GridOut(object):
|
||||
if not isinstance(root_collection, Collection):
|
||||
raise TypeError("root_collection must be an "
|
||||
"instance of Collection")
|
||||
_disallow_transactions(session)
|
||||
|
||||
root_collection = _clear_entity_type_registry(root_collection)
|
||||
|
||||
@ -483,6 +493,7 @@ class GridOut(object):
|
||||
|
||||
def _ensure_file(self):
|
||||
if not self._file:
|
||||
_disallow_transactions(self._session)
|
||||
self._file = self.__files.find_one({"_id": self.__file_id},
|
||||
session=self._session)
|
||||
if not self._file:
|
||||
@ -718,6 +729,7 @@ class _GridOutChunkIterator(object):
|
||||
filter = {"files_id": self._id}
|
||||
if self._next_chunk > 0:
|
||||
filter["n"] = {"$gte": self._next_chunk}
|
||||
_disallow_transactions(self._session)
|
||||
self._cursor = self._chunks.find(filter, sort=[("n", 1)],
|
||||
session=self._session)
|
||||
|
||||
@ -810,6 +822,7 @@ class GridOutCursor(Cursor):
|
||||
|
||||
.. mongodoc:: cursors
|
||||
"""
|
||||
_disallow_transactions(session)
|
||||
collection = _clear_entity_type_registry(collection)
|
||||
|
||||
# Hold on to the base "fs" collection to create GridOut objects later.
|
||||
@ -823,6 +836,7 @@ class GridOutCursor(Cursor):
|
||||
def next(self):
|
||||
"""Get next GridOut object from cursor.
|
||||
"""
|
||||
_disallow_transactions(self.session)
|
||||
# Work around "super is not iterable" issue in Python 3.x
|
||||
next_file = super(GridOutCursor, self).next()
|
||||
return GridOut(self.__root_collection, file_document=next_file,
|
||||
|
||||
@ -19,16 +19,21 @@ import sys
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from bson.py3compat import StringIO
|
||||
|
||||
from pymongo import client_session, WriteConcern
|
||||
from pymongo.client_session import TransactionOptions
|
||||
from pymongo.errors import (CollectionInvalid,
|
||||
ConfigurationError,
|
||||
ConnectionFailure,
|
||||
InvalidOperation,
|
||||
OperationFailure)
|
||||
from pymongo.operations import IndexModel, InsertOne
|
||||
from pymongo.read_concern import ReadConcern
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
from gridfs import GridFS, GridFSBucket
|
||||
|
||||
from test import unittest, client_context
|
||||
from test.utils import (rs_client, single_client,
|
||||
wait_until, OvertCommandListener,
|
||||
@ -215,8 +220,7 @@ class TestTransactions(TransactionsBase):
|
||||
@client_context.require_transactions
|
||||
@client_context.require_version_min(4, 3, 4)
|
||||
def test_create_collection(self):
|
||||
client = rs_client()
|
||||
self.addCleanup(client.close)
|
||||
client = client_context.client
|
||||
db = client.pymongo_test
|
||||
coll = db.test_create_collection
|
||||
self.addCleanup(coll.drop)
|
||||
@ -241,6 +245,48 @@ class TestTransactions(TransactionsBase):
|
||||
db.create_collection(coll.name, session=s)
|
||||
self.assertEqual(ctx.exception.code, 48) # NamespaceExists
|
||||
|
||||
@client_context.require_transactions
|
||||
def test_gridfs_does_not_support_transactions(self):
|
||||
client = client_context.client
|
||||
db = client.pymongo_test
|
||||
gfs = GridFS(db)
|
||||
bucket = GridFSBucket(db)
|
||||
|
||||
def gridfs_find(*args, **kwargs):
|
||||
return gfs.find(*args, **kwargs).next()
|
||||
|
||||
def gridfs_open_upload_stream(*args, **kwargs):
|
||||
bucket.open_upload_stream(*args, **kwargs).write(b'1')
|
||||
|
||||
gridfs_ops = [
|
||||
(gfs.put, (b'123',)),
|
||||
(gfs.get, (1,)),
|
||||
(gfs.get_version, ('name',)),
|
||||
(gfs.get_last_version, ('name',)),
|
||||
(gfs.delete, (1, )),
|
||||
(gfs.list, ()),
|
||||
(gfs.find_one, ()),
|
||||
(gridfs_find, ()),
|
||||
(gfs.exists, ()),
|
||||
(gridfs_open_upload_stream, ('name',)),
|
||||
(bucket.upload_from_stream, ('name', b'data',)),
|
||||
(bucket.download_to_stream, (1, StringIO(),)),
|
||||
(bucket.download_to_stream_by_name, ('name', StringIO(),)),
|
||||
(bucket.delete, (1,)),
|
||||
(bucket.find, ()),
|
||||
(bucket.open_download_stream, (1,)),
|
||||
(bucket.open_download_stream_by_name, ('name',)),
|
||||
(bucket.rename, (1, 'new-name',)),
|
||||
]
|
||||
|
||||
with client.start_session() as s, s.start_transaction():
|
||||
for op, args in gridfs_ops:
|
||||
with self.assertRaisesRegex(
|
||||
InvalidOperation,
|
||||
'GridFS does not support multi-document transactions',
|
||||
):
|
||||
op(*args, session=s)
|
||||
|
||||
|
||||
class PatchSessionTimeout(object):
|
||||
"""Patches the client_session's with_transaction timeout for testing."""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user