diff --git a/gridfs/__init__.py b/gridfs/__init__.py index 8b13e0cb8..0edf03df5 100644 --- a/gridfs/__init__.py +++ b/gridfs/__init__.py @@ -125,7 +125,7 @@ class GridFS(object): return grid_file._id - def get(self, file_id): + def get(self, file_id, session=None): """Get a file from GridFS by ``"_id"``. Returns an instance of :class:`~gridfs.grid_file.GridOut`, @@ -133,14 +133,19 @@ class GridFS(object): :Parameters: - `file_id`: ``"_id"`` of the file to get + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - gout = GridOut(self.__collection, file_id) + gout = GridOut(self.__collection, file_id, session=session) # Raise NoFile now, instead of on first attribute access. gout._ensure_file() return gout - def get_version(self, filename=None, version=-1, **kwargs): + def get_version(self, filename=None, version=-1, session=None, **kwargs): """Get a file from GridFS by ``"filename"`` or metadata fields. Returns a version of the file in GridFS whose filename matches @@ -165,8 +170,13 @@ class GridFS(object): - `filename`: ``"filename"`` of the file to get, or `None` - `version` (optional): version of the file to get (defaults to -1, the most recent version uploaded) + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` - `**kwargs` (optional): find files by custom metadata. + .. versionchanged:: 3.6 + Added ``session`` parameter. + .. versionchanged:: 3.1 ``get_version`` no longer ensures indexes. """ @@ -174,19 +184,20 @@ class GridFS(object): if filename is not None: query["filename"] = filename - cursor = self.__files.find(query) + cursor = self.__files.find(query, session=session) if version < 0: skip = abs(version) - 1 cursor.limit(-1).skip(skip).sort("uploadDate", DESCENDING) else: cursor.limit(-1).skip(version).sort("uploadDate", ASCENDING) try: - grid_file = next(cursor) - return GridOut(self.__collection, file_document=grid_file) + doc = next(cursor) + return GridOut( + self.__collection, file_document=doc, session=session) except StopIteration: raise NoFile("no version %d for filename %r" % (version, filename)) - def get_last_version(self, filename=None, **kwargs): + def get_last_version(self, filename=None, session=None, **kwargs): """Get the most recent version of a file in GridFS by ``"filename"`` or metadata fields. @@ -195,12 +206,17 @@ class GridFS(object): :Parameters: - `filename`: ``"filename"`` of the file to get, or `None` + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` - `**kwargs` (optional): find files by custom metadata. + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - return self.get_version(filename=filename, **kwargs) + return self.get_version(filename=filename, session=session, **kwargs) # TODO add optional safe mode for chunk removal? - def delete(self, file_id): + def delete(self, file_id, session=None): """Delete a file from GridFS by ``"_id"``. Deletes all data belonging to the file with ``"_id"``: @@ -216,27 +232,39 @@ class GridFS(object): :Parameters: - `file_id`: ``"_id"`` of the file to delete + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. .. versionchanged:: 3.1 ``delete`` no longer ensures indexes. """ - self.__files.delete_one({"_id": file_id}) - self.__chunks.delete_many({"files_id": file_id}) + self.__files.delete_one({"_id": file_id}, session=session) + self.__chunks.delete_many({"files_id": file_id}, session=session) - def list(self): + def list(self, session=None): """List the names of all files stored in this instance of :class:`GridFS`. + :Parameters: + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. + .. versionchanged:: 3.1 ``list`` no longer ensures indexes. """ # With an index, distinct includes documents with no filename # as None. return [ - name for name in self.__files.distinct("filename") + name for name in self.__files.distinct("filename", session=session) if name is not None] - def find_one(self, filter=None, *args, **kwargs): + def find_one(self, filter=None, session=None, *args, **kwargs): """Get a single file from gridfs. All arguments to :meth:`find` are also valid arguments for @@ -252,13 +280,18 @@ class GridFS(object): the value for a query for ``"_id"`` in the file collection. - `*args` (optional): any additional positional arguments are the same as the arguments to :meth:`find`. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` - `**kwargs` (optional): any additional keyword arguments are the same as the arguments to :meth:`find`. + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ if filter is not None and not isinstance(filter, Mapping): filter = {"_id": filter} - for f in self.find(filter, *args, **kwargs): + for f in self.find(filter, *args, session=session, **kwargs): return f return None @@ -290,6 +323,10 @@ class GridFS(object): :meth:`~pymongo.collection.Collection.find` in :class:`~pymongo.collection.Collection`. + If a :class:`~pymongo.client_session.ClientSession` is passed to + :meth:`find`, all returned :class:`~gridfs.grid_file.GridOut` instances + are associated with that session. + :Parameters: - `filter` (optional): a SON object specifying elements which must be present for a document to be included in the @@ -320,7 +357,7 @@ class GridFS(object): """ return GridOutCursor(self.__collection, *args, **kwargs) - def exists(self, document_or_id=None, **kwargs): + def exists(self, document_or_id=None, session=None, **kwargs): """Check if a file exists in this instance of :class:`GridFS`. The file to check for can be specified by the value of its @@ -350,12 +387,20 @@ class GridFS(object): :Parameters: - `document_or_id` (optional): query document, or _id of the document to check for + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` - `**kwargs` (optional): keyword arguments are used as a query document, if they're present. + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ if kwargs: - return self.__files.find_one(kwargs, ["_id"]) is not None - return self.__files.find_one(document_or_id, ["_id"]) is not None + f = self.__files.find_one(kwargs, ["_id"], session=session) + else: + f = self.__files.find_one(document_or_id, ["_id"], session=session) + + return f is not None class GridFSBucket(object): @@ -409,7 +454,7 @@ class GridFSBucket(object): self._chunk_size_bytes = chunk_size_bytes def open_upload_stream(self, filename, chunk_size_bytes=None, - metadata=None): + metadata=None, session=None): """Opens a Stream that the application can write the contents of the file to. @@ -439,6 +484,11 @@ class GridFSBucket(object): - `metadata` (optional): User data for the 'metadata' field of the files collection document. If not provided the metadata field will be omitted from the files collection document. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ validate_string("filename", filename) @@ -448,10 +498,11 @@ class GridFSBucket(object): if metadata is not None: opts["metadata"] = metadata - return GridIn(self._collection, **opts) + return GridIn(self._collection, session=session, **opts) def open_upload_stream_with_id( - self, file_id, filename, chunk_size_bytes=None, metadata=None): + self, file_id, filename, chunk_size_bytes=None, metadata=None, + session=None): """Opens a Stream that the application can write the contents of the file to. @@ -485,6 +536,11 @@ class GridFSBucket(object): - `metadata` (optional): User data for the 'metadata' field of the files collection document. If not provided the metadata field will be omitted from the files collection document. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ validate_string("filename", filename) @@ -495,10 +551,10 @@ class GridFSBucket(object): if metadata is not None: opts["metadata"] = metadata - return GridIn(self._collection, **opts) + return GridIn(self._collection, session=session, **opts) def upload_from_stream(self, filename, source, chunk_size_bytes=None, - metadata=None): + metadata=None, session=None): """Uploads a user file to a GridFS bucket. Reads the contents of the user file from `source` and uploads @@ -528,15 +584,21 @@ class GridFSBucket(object): - `metadata` (optional): User data for the 'metadata' field of the files collection document. If not provided the metadata field will be omitted from the files collection document. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ with self.open_upload_stream( - filename, chunk_size_bytes, metadata) as gin: + filename, chunk_size_bytes, metadata, session=session) as gin: gin.write(source) return gin._id def upload_from_stream_with_id(self, file_id, filename, source, - chunk_size_bytes=None, metadata=None): + chunk_size_bytes=None, metadata=None, + session=None): """Uploads a user file to a GridFS bucket with a custom file id. Reads the contents of the user file from `source` and uploads @@ -567,12 +629,18 @@ class GridFSBucket(object): - `metadata` (optional): User data for the 'metadata' field of the files collection document. If not provided the metadata field will be omitted from the files collection document. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ with self.open_upload_stream_with_id( - file_id, filename, chunk_size_bytes, metadata) as gin: + file_id, filename, chunk_size_bytes, metadata, + session=session) as gin: gin.write(source) - def open_download_stream(self, file_id): + def open_download_stream(self, file_id, session=None): """Opens a Stream from which the application can read the contents of the stored file specified by file_id. @@ -591,14 +659,19 @@ class GridFSBucket(object): :Parameters: - `file_id`: The _id of the file to be downloaded. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - gout = GridOut(self._collection, file_id) + gout = GridOut(self._collection, file_id, session=session) # Raise NoFile now, instead of on first attribute access. gout._ensure_file() return gout - def download_to_stream(self, file_id, destination): + def download_to_stream(self, file_id, destination, session=None): """Downloads the contents of the stored file specified by file_id and writes the contents to `destination`. @@ -619,12 +692,17 @@ class GridFSBucket(object): :Parameters: - `file_id`: The _id of the file to be downloaded. - `destination`: a file-like object implementing :meth:`write`. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - gout = self.open_download_stream(file_id) + gout = self.open_download_stream(file_id, session=session) for chunk in gout: destination.write(chunk) - def delete(self, file_id): + def delete(self, file_id, session=None): """Given an file_id, delete this stored file's files collection document and associated chunks from a GridFS bucket. @@ -640,9 +718,14 @@ class GridFSBucket(object): :Parameters: - `file_id`: The _id of the file to be deleted. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - res = self._files.delete_one({"_id": file_id}) - self._chunks.delete_many({"files_id": file_id}) + res = self._files.delete_one({"_id": file_id}, session=session) + self._chunks.delete_many({"files_id": file_id}, session=session) if not res.deleted_count: raise NoFile( "no file could be deleted because none matched %s" % file_id) @@ -676,6 +759,10 @@ class GridFSBucket(object): :meth:`~pymongo.collection.Collection.find` in :class:`~pymongo.collection.Collection`. + If a :class:`~pymongo.client_session.ClientSession` is passed to + :meth:`find`, all returned :class:`~gridfs.grid_file.GridOut` instances + are associated with that session. + :Parameters: - `filter`: Search query. - `batch_size` (optional): The number of documents to return per @@ -691,7 +778,7 @@ class GridFSBucket(object): """ return GridOutCursor(self._collection, *args, **kwargs) - def open_download_stream_by_name(self, filename, revision=-1): + def open_download_stream_by_name(self, filename, revision=-1, session=None): """Opens a Stream from which the application can read the contents of `filename` and optional `revision`. @@ -714,6 +801,8 @@ class GridFSBucket(object): - `revision` (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision). + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` :Note: Revision numbers are defined as follows: @@ -723,12 +812,15 @@ class GridFSBucket(object): - etc... - -2 = the second most recent revision - -1 = the most recent revision + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ validate_string("filename", filename) query = {"filename": filename} - cursor = self._files.find(query) + cursor = self._files.find(query, session=session) if revision < 0: skip = abs(revision) - 1 cursor.limit(-1).skip(skip).sort("uploadDate", DESCENDING) @@ -736,12 +828,14 @@ class GridFSBucket(object): cursor.limit(-1).skip(revision).sort("uploadDate", ASCENDING) try: grid_file = next(cursor) - return GridOut(self._collection, file_document=grid_file) + return GridOut( + self._collection, file_document=grid_file, session=session) except StopIteration: raise NoFile( "no version %d for filename %r" % (revision, filename)) - def download_to_stream_by_name(self, filename, destination, revision=-1): + def download_to_stream_by_name(self, filename, destination, revision=-1, + session=None): """Write the contents of `filename` (with optional `revision`) to `destination`. @@ -764,6 +858,8 @@ class GridFSBucket(object): - `revision` (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision). + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` :Note: Revision numbers are defined as follows: @@ -773,12 +869,16 @@ class GridFSBucket(object): - etc... - -2 = the second most recent revision - -1 = the most recent revision + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ - gout = self.open_download_stream_by_name(filename, revision) + gout = self.open_download_stream_by_name( + filename, revision, session=session) for chunk in gout: destination.write(chunk) - def rename(self, file_id, new_filename): + def rename(self, file_id, new_filename, session=None): """Renames the stored file with the specified file_id. For example:: @@ -794,9 +894,15 @@ class GridFSBucket(object): :Parameters: - `file_id`: The _id of the file to be renamed. - `new_filename`: The new name of the file. + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` + + .. versionchanged:: 3.6 + Added ``session`` parameter. """ result = self._files.update_one({"_id": file_id}, - {"$set": {"filename": new_filename}}) + {"$set": {"filename": new_filename}}, + session=session) if not result.matched_count: raise NoFile("no files could be renamed %r because none " "matched file_id %i" % (new_filename, file_id)) diff --git a/gridfs/grid_file.py b/gridfs/grid_file.py index 390f8437e..5cc8f8faf 100644 --- a/gridfs/grid_file.py +++ b/gridfs/grid_file.py @@ -100,7 +100,7 @@ def _grid_out_property(field_name, docstring): class GridIn(object): """Class to write data to GridFS. """ - def __init__(self, root_collection, **kwargs): + def __init__(self, root_collection, session=None, **kwargs): """Write a file to GridFS Application developers should generally not need to @@ -136,8 +136,14 @@ class GridIn(object): :Parameters: - `root_collection`: root collection to write to + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` to use for all + commands - `**kwargs` (optional): file level options (see above) + .. versionchanged:: 3.6 + Added ``session`` parameter. + .. versionchanged:: 3.0 `root_collection` must use an acknowledged :attr:`~pymongo.collection.Collection.write_concern` @@ -164,6 +170,7 @@ class GridIn(object): # Defaults kwargs["_id"] = kwargs.get("_id", ObjectId()) kwargs["chunkSize"] = kwargs.get("chunkSize", DEFAULT_CHUNK_SIZE) + object.__setattr__(self, "_session", session) object.__setattr__(self, "_coll", coll) object.__setattr__(self, "_chunks", coll.chunks) object.__setattr__(self, "_file", kwargs) @@ -174,14 +181,16 @@ class GridIn(object): object.__setattr__(self, "_ensured_index", False) def __create_index(self, collection, index_key, unique): - doc = collection.find_one(projection={"_id": 1}) + doc = collection.find_one(projection={"_id": 1}, session=self._session) if doc is None: try: - index_keys =[index_spec['key'] for index_spec in collection.list_indexes()] + index_keys = [index_spec['key'] for index_spec in + collection.list_indexes(session=self._session)] except OperationFailure: index_keys = [] if index_key not in index_keys: - collection.create_index(index_key.items(), unique=unique) + collection.create_index( + index_key.items(), unique=unique, session=self._session) def __ensure_indexes(self): if not object.__getattribute__(self, "_ensured_index"): @@ -192,11 +201,12 @@ class GridIn(object): def abort(self): """Remove all chunks/files that may have been uploaded and close. """ - self._coll.chunks.delete_many({"files_id": self._file['_id']}) - self._coll.files.delete_one({"_id": self._file['_id']}) + self._coll.chunks.delete_many( + {"files_id": self._file['_id']}, session=self._session) + self._coll.files.delete_one( + {"_id": self._file['_id']}, session=self._session) object.__setattr__(self, "_closed", True) - @property def closed(self): """Is this file closed? @@ -255,7 +265,7 @@ class GridIn(object): "data": Binary(data)} try: - self._chunks.insert_one(chunk) + self._chunks.insert_one(chunk, session=self._session) except DuplicateKeyError: self._raise_file_exists(self._file['_id']) self._chunk_number += 1 @@ -278,7 +288,8 @@ class GridIn(object): self._file["length"] = self._position self._file["uploadDate"] = datetime.datetime.utcnow() - return self._coll.files.insert_one(self._file) + return self._coll.files.insert_one( + self._file, session=self._session) except DuplicateKeyError: self._raise_file_exists(self._id) @@ -382,7 +393,8 @@ class GridIn(object): class GridOut(object): """Class to read data out of GridFS. """ - def __init__(self, root_collection, file_id=None, file_document=None): + def __init__(self, root_collection, file_id=None, file_document=None, + session=None): """Read a file from GridFS Application developers should generally not need to @@ -399,6 +411,12 @@ class GridOut(object): - `file_id` (optional): value of ``"_id"`` for the file to read - `file_document` (optional): file document from `root_collection.files` + - `session` (optional): a + :class:`~pymongo.client_session.ClientSession` to use for all + commands + + .. versionchanged:: 3.6 + Added ``session`` parameter. .. versionchanged:: 3.0 Creating a GridOut does not immediately retrieve the file metadata @@ -414,6 +432,7 @@ class GridOut(object): self.__buffer = EMPTY self.__position = 0 self._file = file_document + self._session = session _id = _grid_out_property("_id", "The ``'_id'`` value for this file.") filename = _grid_out_property("filename", "Name of this file.") @@ -430,7 +449,8 @@ class GridOut(object): def _ensure_file(self): if not self._file: - self._file = self.__files.find_one({"_id": self.__file_id}) + self._file = self.__files.find_one({"_id": self.__file_id}, + session=self._session) if not self._file: raise NoFile("no file in gridfs collection %r with _id %r" % (self.__files, self.__file_id)) @@ -454,7 +474,8 @@ class GridOut(object): elif self.__position < int(self.length): chunk_number = int((received + self.__position) / chunk_size) chunk = self.__chunks.find_one({"files_id": self._id, - "n": chunk_number}) + "n": chunk_number}, + session=self._session) if not chunk: raise CorruptGridFile("no chunk #%d" % chunk_number) @@ -496,7 +517,8 @@ class GridOut(object): # Detect extra chunks. max_chunk_n = math.ceil(self.length / float(self.chunk_size)) chunk = self.__chunks.find_one({"files_id": self._id, - "n": {"$gte": max_chunk_n}}) + "n": {"$gte": max_chunk_n}}, + session=self._session) # According to spec, ignore extra chunks if they are empty. if chunk is not None and len(chunk['data']): raise CorruptGridFile( @@ -585,7 +607,7 @@ class GridOut(object): useful when serving files using a webserver that handles such an iterator efficiently. """ - return GridOutIterator(self, self.__chunks) + return GridOutIterator(self, self.__chunks, self._session) def close(self): """Make GridOut more generically file-like.""" @@ -605,9 +627,10 @@ class GridOut(object): class GridOutIterator(object): - def __init__(self, grid_out, chunks): + def __init__(self, grid_out, chunks, session): self.__id = grid_out._id self.__chunks = chunks + self.__session = session self.__current_chunk = 0 self.__max_chunk = math.ceil(float(grid_out.length) / grid_out.chunk_size) @@ -619,7 +642,8 @@ class GridOutIterator(object): if self.__current_chunk >= self.__max_chunk: raise StopIteration chunk = self.__chunks.find_one({"files_id": self.__id, - "n": self.__current_chunk}) + "n": self.__current_chunk}, + session=self.__session) if not chunk: raise CorruptGridFile("no chunk #%d" % self.__current_chunk) self.__current_chunk += 1 @@ -633,7 +657,8 @@ class GridOutCursor(Cursor): of an arbitrary query against the GridFS files collection. """ def __init__(self, collection, filter=None, skip=0, limit=0, - no_cursor_timeout=False, sort=None, batch_size=0): + no_cursor_timeout=False, sort=None, batch_size=0, + session=None): """Create a new cursor, similar to the normal :class:`~pymongo.cursor.Cursor`. @@ -650,14 +675,15 @@ class GridOutCursor(Cursor): super(GridOutCursor, self).__init__( collection.files, filter, skip=skip, limit=limit, no_cursor_timeout=no_cursor_timeout, sort=sort, - batch_size=batch_size) + batch_size=batch_size, session=session) def next(self): """Get next GridOut object from cursor. """ # Work around "super is not iterable" issue in Python 3.x next_file = super(GridOutCursor, self).next() - return GridOut(self.__root_collection, file_document=next_file) + return GridOut(self.__root_collection, file_document=next_file, + session=self.session) __next__ = next diff --git a/test/test_session.py b/test/test_session.py index a1d462865..9f5bb1825 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -13,8 +13,11 @@ # limitations under the License. """Test the client_session module.""" + from bson import DBRef -from pymongo import InsertOne, IndexModel, monitoring, OFF +from bson.py3compat import StringIO +from gridfs import GridFS, GridFSBucket +from pymongo import InsertOne, IndexModel, OFF, monitoring from pymongo.errors import (ConfigurationError, InvalidOperation, OperationFailure) @@ -49,6 +52,27 @@ class TestSession(IntegrationTest): monitoring._SENSITIVE_COMMANDS.update(self.sensitive_commands) super(TestSession, self).tearDown() + def _test_ops(self, client, *ops): + listener = client.event_listeners()[0][0] + + with client.start_session() as s: + for f, args, kwargs in ops: + listener.results.clear() + kwargs['session'] = s + f(*args, **kwargs) + self.assertGreaterEqual(len(listener.results['started']), 1) + for event in listener.results['started']: + self.assertTrue( + 'lsid' in event.command, + "%s sent no lsid with %s" % ( + f.__name__, event.command_name)) + + self.assertEqual( + s.session_id, + event.command['lsid'], + "%s sent wrong lsid with %s" % ( + f.__name__, event.command_name)) + @client_context.require_auth @ignore_deprecations def test_session_authenticate_multiple(self): @@ -112,23 +136,7 @@ class TestSession(IntegrationTest): (client.unlock, [], {}), ]) - with client.start_session() as s: - for f, args, kwargs in ops: - listener.results.clear() - kwargs['session'] = s - f(*args, **kwargs) - self.assertGreaterEqual(len(listener.results['started']), 1) - for event in listener.results['started']: - self.assertTrue( - 'lsid' in event.command, - "%s sent no lsid with %s" % ( - f.__name__, event.command_name)) - - self.assertEqual( - s.session_id, - event.command['lsid'], - "%s sent wrong lsid with %s" % ( - f.__name__, event.command_name)) + self._test_ops(client, *ops) @client_context.require_sessions def test_database(self): @@ -161,23 +169,7 @@ class TestSession(IntegrationTest): ops.append((db.set_profiling_level, [OFF], {})) ops.append((db.profiling_level, [], {})) - with client.start_session() as s: - for f, args, kwargs in ops: - listener.results.clear() - kwargs['session'] = s - f(*args, **kwargs) - self.assertGreaterEqual(len(listener.results['started']), 1) - for event in listener.results['started']: - self.assertTrue( - 'lsid' in event.command, - "%s sent no lsid with %s" % ( - f.__name__, event.command_name)) - - self.assertEqual( - s.session_id, - event.command['lsid'], - "%s sent wrong lsid with %s" % ( - f.__name__, event.command_name)) + self._test_ops(client, *ops) @client_context.require_sessions def test_collection(self): @@ -290,3 +282,78 @@ class TestSession(IntegrationTest): event.command['lsid'], "%s sent wrong lsid with %s" % ( name, event.command_name)) + + @client_context.require_sessions + def test_gridfs(self): + listener = SessionTestListener() + client = rs_or_single_client(event_listeners=[listener]) + client.drop_database('pymongo_test') + self.addCleanup(client.drop_database, 'pymongo_test') + + fs = GridFS(client.pymongo_test) + + def new_file(session): + grid_file = fs.new_file(_id=1, filename='f', session=session) + grid_file.write(b'a') + grid_file.close() + + self._test_ops( + client, + (new_file, [], {}), + (fs.put, [b'data'], {}), + (lambda session: fs.get(1, session=session).read(), [], {}), + (lambda session: fs.get_version('f', session=session).read(), + [], {}), + (lambda session: fs.get_last_version('f', session=session).read(), + [], {}), + (fs.list, [], {}), + (fs.find_one, [1], {}), + (lambda session: list(fs.find(session=session)), [], {}), + (fs.exists, [1], {}), + (fs.delete, [1], {})) + + @client_context.require_sessions + def test_gridfs_bucket(self): + listener = SessionTestListener() + client = rs_or_single_client(event_listeners=[listener]) + client.drop_database('pymongo_test') + self.addCleanup(client.drop_database, 'pymongo_test') + + bucket = GridFSBucket(client.pymongo_test) + + def upload(session): + stream = bucket.open_upload_stream('f', session=session) + stream.write(b'a') + stream.close() + + def upload_with_id(session): + stream = bucket.open_upload_stream_with_id(1, 'f1', session=session) + stream.write(b'a') + stream.close() + + def open_download_stream(session): + stream = bucket.open_download_stream(1, session=session) + stream.read() + + def open_download_stream_by_name(session): + stream = bucket.open_download_stream_by_name('f', session=session) + stream.read() + + def find(session): + stream = bucket.find({'_id': 1}, session=session).next() + stream.read() + + sio = StringIO() + + self._test_ops( + client, + (upload, [], {}), + (upload_with_id, [], {}), + (bucket.upload_from_stream, ['f', 'data'], {}), + (bucket.upload_from_stream_with_id, [2, 'f', 'data'], {}), + (open_download_stream, [], {}), + (open_download_stream_by_name, [], {}), + (bucket.download_to_stream, [1, sio], {}), + (bucket.download_to_stream_by_name, ['f', sio], {}), + (find, [], {}), + (bucket.rename, [1, 'f2'], {}))