PYTHON-1332 - Session param for GridFS methods

Also refactor session tests.
This commit is contained in:
A. Jesse Jiryu Davis 2017-09-06 16:42:12 -04:00
parent ab3ab3fdaf
commit e86742d27b
3 changed files with 293 additions and 94 deletions

View File

@ -125,7 +125,7 @@ class GridFS(object):
return grid_file._id
def get(self, file_id):
def get(self, file_id, session=None):
"""Get a file from GridFS by ``"_id"``.
Returns an instance of :class:`~gridfs.grid_file.GridOut`,
@ -133,14 +133,19 @@ class GridFS(object):
:Parameters:
- `file_id`: ``"_id"`` of the file to get
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
gout = GridOut(self.__collection, file_id)
gout = GridOut(self.__collection, file_id, session=session)
# Raise NoFile now, instead of on first attribute access.
gout._ensure_file()
return gout
def get_version(self, filename=None, version=-1, **kwargs):
def get_version(self, filename=None, version=-1, session=None, **kwargs):
"""Get a file from GridFS by ``"filename"`` or metadata fields.
Returns a version of the file in GridFS whose filename matches
@ -165,8 +170,13 @@ class GridFS(object):
- `filename`: ``"filename"`` of the file to get, or `None`
- `version` (optional): version of the file to get (defaults
to -1, the most recent version uploaded)
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
- `**kwargs` (optional): find files by custom metadata.
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionchanged:: 3.1
``get_version`` no longer ensures indexes.
"""
@ -174,19 +184,20 @@ class GridFS(object):
if filename is not None:
query["filename"] = filename
cursor = self.__files.find(query)
cursor = self.__files.find(query, session=session)
if version < 0:
skip = abs(version) - 1
cursor.limit(-1).skip(skip).sort("uploadDate", DESCENDING)
else:
cursor.limit(-1).skip(version).sort("uploadDate", ASCENDING)
try:
grid_file = next(cursor)
return GridOut(self.__collection, file_document=grid_file)
doc = next(cursor)
return GridOut(
self.__collection, file_document=doc, session=session)
except StopIteration:
raise NoFile("no version %d for filename %r" % (version, filename))
def get_last_version(self, filename=None, **kwargs):
def get_last_version(self, filename=None, session=None, **kwargs):
"""Get the most recent version of a file in GridFS by ``"filename"``
or metadata fields.
@ -195,12 +206,17 @@ class GridFS(object):
:Parameters:
- `filename`: ``"filename"`` of the file to get, or `None`
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
- `**kwargs` (optional): find files by custom metadata.
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
return self.get_version(filename=filename, **kwargs)
return self.get_version(filename=filename, session=session, **kwargs)
# TODO add optional safe mode for chunk removal?
def delete(self, file_id):
def delete(self, file_id, session=None):
"""Delete a file from GridFS by ``"_id"``.
Deletes all data belonging to the file with ``"_id"``:
@ -216,27 +232,39 @@ class GridFS(object):
:Parameters:
- `file_id`: ``"_id"`` of the file to delete
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionchanged:: 3.1
``delete`` no longer ensures indexes.
"""
self.__files.delete_one({"_id": file_id})
self.__chunks.delete_many({"files_id": file_id})
self.__files.delete_one({"_id": file_id}, session=session)
self.__chunks.delete_many({"files_id": file_id}, session=session)
def list(self):
def list(self, session=None):
"""List the names of all files stored in this instance of
:class:`GridFS`.
:Parameters:
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionchanged:: 3.1
``list`` no longer ensures indexes.
"""
# With an index, distinct includes documents with no filename
# as None.
return [
name for name in self.__files.distinct("filename")
name for name in self.__files.distinct("filename", session=session)
if name is not None]
def find_one(self, filter=None, *args, **kwargs):
def find_one(self, filter=None, session=None, *args, **kwargs):
"""Get a single file from gridfs.
All arguments to :meth:`find` are also valid arguments for
@ -252,13 +280,18 @@ class GridFS(object):
the value for a query for ``"_id"`` in the file collection.
- `*args` (optional): any additional positional arguments are
the same as the arguments to :meth:`find`.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
- `**kwargs` (optional): any additional keyword arguments
are the same as the arguments to :meth:`find`.
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
if filter is not None and not isinstance(filter, Mapping):
filter = {"_id": filter}
for f in self.find(filter, *args, **kwargs):
for f in self.find(filter, *args, session=session, **kwargs):
return f
return None
@ -290,6 +323,10 @@ class GridFS(object):
:meth:`~pymongo.collection.Collection.find`
in :class:`~pymongo.collection.Collection`.
If a :class:`~pymongo.client_session.ClientSession` is passed to
:meth:`find`, all returned :class:`~gridfs.grid_file.GridOut` instances
are associated with that session.
:Parameters:
- `filter` (optional): a SON object specifying elements which
must be present for a document to be included in the
@ -320,7 +357,7 @@ class GridFS(object):
"""
return GridOutCursor(self.__collection, *args, **kwargs)
def exists(self, document_or_id=None, **kwargs):
def exists(self, document_or_id=None, session=None, **kwargs):
"""Check if a file exists in this instance of :class:`GridFS`.
The file to check for can be specified by the value of its
@ -350,12 +387,20 @@ class GridFS(object):
:Parameters:
- `document_or_id` (optional): query document, or _id of the
document to check for
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
- `**kwargs` (optional): keyword arguments are used as a
query document, if they're present.
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
if kwargs:
return self.__files.find_one(kwargs, ["_id"]) is not None
return self.__files.find_one(document_or_id, ["_id"]) is not None
f = self.__files.find_one(kwargs, ["_id"], session=session)
else:
f = self.__files.find_one(document_or_id, ["_id"], session=session)
return f is not None
class GridFSBucket(object):
@ -409,7 +454,7 @@ class GridFSBucket(object):
self._chunk_size_bytes = chunk_size_bytes
def open_upload_stream(self, filename, chunk_size_bytes=None,
metadata=None):
metadata=None, session=None):
"""Opens a Stream that the application can write the contents of the
file to.
@ -439,6 +484,11 @@ class GridFSBucket(object):
- `metadata` (optional): User data for the 'metadata' field of the
files collection document. If not provided the metadata field will
be omitted from the files collection document.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
validate_string("filename", filename)
@ -448,10 +498,11 @@ class GridFSBucket(object):
if metadata is not None:
opts["metadata"] = metadata
return GridIn(self._collection, **opts)
return GridIn(self._collection, session=session, **opts)
def open_upload_stream_with_id(
self, file_id, filename, chunk_size_bytes=None, metadata=None):
self, file_id, filename, chunk_size_bytes=None, metadata=None,
session=None):
"""Opens a Stream that the application can write the contents of the
file to.
@ -485,6 +536,11 @@ class GridFSBucket(object):
- `metadata` (optional): User data for the 'metadata' field of the
files collection document. If not provided the metadata field will
be omitted from the files collection document.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
validate_string("filename", filename)
@ -495,10 +551,10 @@ class GridFSBucket(object):
if metadata is not None:
opts["metadata"] = metadata
return GridIn(self._collection, **opts)
return GridIn(self._collection, session=session, **opts)
def upload_from_stream(self, filename, source, chunk_size_bytes=None,
metadata=None):
metadata=None, session=None):
"""Uploads a user file to a GridFS bucket.
Reads the contents of the user file from `source` and uploads
@ -528,15 +584,21 @@ class GridFSBucket(object):
- `metadata` (optional): User data for the 'metadata' field of the
files collection document. If not provided the metadata field will
be omitted from the files collection document.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
with self.open_upload_stream(
filename, chunk_size_bytes, metadata) as gin:
filename, chunk_size_bytes, metadata, session=session) as gin:
gin.write(source)
return gin._id
def upload_from_stream_with_id(self, file_id, filename, source,
chunk_size_bytes=None, metadata=None):
chunk_size_bytes=None, metadata=None,
session=None):
"""Uploads a user file to a GridFS bucket with a custom file id.
Reads the contents of the user file from `source` and uploads
@ -567,12 +629,18 @@ class GridFSBucket(object):
- `metadata` (optional): User data for the 'metadata' field of the
files collection document. If not provided the metadata field will
be omitted from the files collection document.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
with self.open_upload_stream_with_id(
file_id, filename, chunk_size_bytes, metadata) as gin:
file_id, filename, chunk_size_bytes, metadata,
session=session) as gin:
gin.write(source)
def open_download_stream(self, file_id):
def open_download_stream(self, file_id, session=None):
"""Opens a Stream from which the application can read the contents of
the stored file specified by file_id.
@ -591,14 +659,19 @@ class GridFSBucket(object):
:Parameters:
- `file_id`: The _id of the file to be downloaded.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
gout = GridOut(self._collection, file_id)
gout = GridOut(self._collection, file_id, session=session)
# Raise NoFile now, instead of on first attribute access.
gout._ensure_file()
return gout
def download_to_stream(self, file_id, destination):
def download_to_stream(self, file_id, destination, session=None):
"""Downloads the contents of the stored file specified by file_id and
writes the contents to `destination`.
@ -619,12 +692,17 @@ class GridFSBucket(object):
:Parameters:
- `file_id`: The _id of the file to be downloaded.
- `destination`: a file-like object implementing :meth:`write`.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
gout = self.open_download_stream(file_id)
gout = self.open_download_stream(file_id, session=session)
for chunk in gout:
destination.write(chunk)
def delete(self, file_id):
def delete(self, file_id, session=None):
"""Given an file_id, delete this stored file's files collection document
and associated chunks from a GridFS bucket.
@ -640,9 +718,14 @@ class GridFSBucket(object):
:Parameters:
- `file_id`: The _id of the file to be deleted.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
res = self._files.delete_one({"_id": file_id})
self._chunks.delete_many({"files_id": file_id})
res = self._files.delete_one({"_id": file_id}, session=session)
self._chunks.delete_many({"files_id": file_id}, session=session)
if not res.deleted_count:
raise NoFile(
"no file could be deleted because none matched %s" % file_id)
@ -676,6 +759,10 @@ class GridFSBucket(object):
:meth:`~pymongo.collection.Collection.find`
in :class:`~pymongo.collection.Collection`.
If a :class:`~pymongo.client_session.ClientSession` is passed to
:meth:`find`, all returned :class:`~gridfs.grid_file.GridOut` instances
are associated with that session.
:Parameters:
- `filter`: Search query.
- `batch_size` (optional): The number of documents to return per
@ -691,7 +778,7 @@ class GridFSBucket(object):
"""
return GridOutCursor(self._collection, *args, **kwargs)
def open_download_stream_by_name(self, filename, revision=-1):
def open_download_stream_by_name(self, filename, revision=-1, session=None):
"""Opens a Stream from which the application can read the contents of
`filename` and optional `revision`.
@ -714,6 +801,8 @@ class GridFSBucket(object):
- `revision` (optional): Which revision (documents with the same
filename and different uploadDate) of the file to retrieve.
Defaults to -1 (the most recent revision).
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
:Note: Revision numbers are defined as follows:
@ -723,12 +812,15 @@ class GridFSBucket(object):
- etc...
- -2 = the second most recent revision
- -1 = the most recent revision
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
validate_string("filename", filename)
query = {"filename": filename}
cursor = self._files.find(query)
cursor = self._files.find(query, session=session)
if revision < 0:
skip = abs(revision) - 1
cursor.limit(-1).skip(skip).sort("uploadDate", DESCENDING)
@ -736,12 +828,14 @@ class GridFSBucket(object):
cursor.limit(-1).skip(revision).sort("uploadDate", ASCENDING)
try:
grid_file = next(cursor)
return GridOut(self._collection, file_document=grid_file)
return GridOut(
self._collection, file_document=grid_file, session=session)
except StopIteration:
raise NoFile(
"no version %d for filename %r" % (revision, filename))
def download_to_stream_by_name(self, filename, destination, revision=-1):
def download_to_stream_by_name(self, filename, destination, revision=-1,
session=None):
"""Write the contents of `filename` (with optional `revision`) to
`destination`.
@ -764,6 +858,8 @@ class GridFSBucket(object):
- `revision` (optional): Which revision (documents with the same
filename and different uploadDate) of the file to retrieve.
Defaults to -1 (the most recent revision).
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
:Note: Revision numbers are defined as follows:
@ -773,12 +869,16 @@ class GridFSBucket(object):
- etc...
- -2 = the second most recent revision
- -1 = the most recent revision
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
gout = self.open_download_stream_by_name(filename, revision)
gout = self.open_download_stream_by_name(
filename, revision, session=session)
for chunk in gout:
destination.write(chunk)
def rename(self, file_id, new_filename):
def rename(self, file_id, new_filename, session=None):
"""Renames the stored file with the specified file_id.
For example::
@ -794,9 +894,15 @@ class GridFSBucket(object):
:Parameters:
- `file_id`: The _id of the file to be renamed.
- `new_filename`: The new name of the file.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
result = self._files.update_one({"_id": file_id},
{"$set": {"filename": new_filename}})
{"$set": {"filename": new_filename}},
session=session)
if not result.matched_count:
raise NoFile("no files could be renamed %r because none "
"matched file_id %i" % (new_filename, file_id))

View File

@ -100,7 +100,7 @@ def _grid_out_property(field_name, docstring):
class GridIn(object):
"""Class to write data to GridFS.
"""
def __init__(self, root_collection, **kwargs):
def __init__(self, root_collection, session=None, **kwargs):
"""Write a file to GridFS
Application developers should generally not need to
@ -136,8 +136,14 @@ class GridIn(object):
:Parameters:
- `root_collection`: root collection to write to
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession` to use for all
commands
- `**kwargs` (optional): file level options (see above)
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionchanged:: 3.0
`root_collection` must use an acknowledged
:attr:`~pymongo.collection.Collection.write_concern`
@ -164,6 +170,7 @@ class GridIn(object):
# Defaults
kwargs["_id"] = kwargs.get("_id", ObjectId())
kwargs["chunkSize"] = kwargs.get("chunkSize", DEFAULT_CHUNK_SIZE)
object.__setattr__(self, "_session", session)
object.__setattr__(self, "_coll", coll)
object.__setattr__(self, "_chunks", coll.chunks)
object.__setattr__(self, "_file", kwargs)
@ -174,14 +181,16 @@ class GridIn(object):
object.__setattr__(self, "_ensured_index", False)
def __create_index(self, collection, index_key, unique):
doc = collection.find_one(projection={"_id": 1})
doc = collection.find_one(projection={"_id": 1}, session=self._session)
if doc is None:
try:
index_keys =[index_spec['key'] for index_spec in collection.list_indexes()]
index_keys = [index_spec['key'] for index_spec in
collection.list_indexes(session=self._session)]
except OperationFailure:
index_keys = []
if index_key not in index_keys:
collection.create_index(index_key.items(), unique=unique)
collection.create_index(
index_key.items(), unique=unique, session=self._session)
def __ensure_indexes(self):
if not object.__getattribute__(self, "_ensured_index"):
@ -192,11 +201,12 @@ class GridIn(object):
def abort(self):
"""Remove all chunks/files that may have been uploaded and close.
"""
self._coll.chunks.delete_many({"files_id": self._file['_id']})
self._coll.files.delete_one({"_id": self._file['_id']})
self._coll.chunks.delete_many(
{"files_id": self._file['_id']}, session=self._session)
self._coll.files.delete_one(
{"_id": self._file['_id']}, session=self._session)
object.__setattr__(self, "_closed", True)
@property
def closed(self):
"""Is this file closed?
@ -255,7 +265,7 @@ class GridIn(object):
"data": Binary(data)}
try:
self._chunks.insert_one(chunk)
self._chunks.insert_one(chunk, session=self._session)
except DuplicateKeyError:
self._raise_file_exists(self._file['_id'])
self._chunk_number += 1
@ -278,7 +288,8 @@ class GridIn(object):
self._file["length"] = self._position
self._file["uploadDate"] = datetime.datetime.utcnow()
return self._coll.files.insert_one(self._file)
return self._coll.files.insert_one(
self._file, session=self._session)
except DuplicateKeyError:
self._raise_file_exists(self._id)
@ -382,7 +393,8 @@ class GridIn(object):
class GridOut(object):
"""Class to read data out of GridFS.
"""
def __init__(self, root_collection, file_id=None, file_document=None):
def __init__(self, root_collection, file_id=None, file_document=None,
session=None):
"""Read a file from GridFS
Application developers should generally not need to
@ -399,6 +411,12 @@ class GridOut(object):
- `file_id` (optional): value of ``"_id"`` for the file to read
- `file_document` (optional): file document from
`root_collection.files`
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession` to use for all
commands
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionchanged:: 3.0
Creating a GridOut does not immediately retrieve the file metadata
@ -414,6 +432,7 @@ class GridOut(object):
self.__buffer = EMPTY
self.__position = 0
self._file = file_document
self._session = session
_id = _grid_out_property("_id", "The ``'_id'`` value for this file.")
filename = _grid_out_property("filename", "Name of this file.")
@ -430,7 +449,8 @@ class GridOut(object):
def _ensure_file(self):
if not self._file:
self._file = self.__files.find_one({"_id": self.__file_id})
self._file = self.__files.find_one({"_id": self.__file_id},
session=self._session)
if not self._file:
raise NoFile("no file in gridfs collection %r with _id %r" %
(self.__files, self.__file_id))
@ -454,7 +474,8 @@ class GridOut(object):
elif self.__position < int(self.length):
chunk_number = int((received + self.__position) / chunk_size)
chunk = self.__chunks.find_one({"files_id": self._id,
"n": chunk_number})
"n": chunk_number},
session=self._session)
if not chunk:
raise CorruptGridFile("no chunk #%d" % chunk_number)
@ -496,7 +517,8 @@ class GridOut(object):
# Detect extra chunks.
max_chunk_n = math.ceil(self.length / float(self.chunk_size))
chunk = self.__chunks.find_one({"files_id": self._id,
"n": {"$gte": max_chunk_n}})
"n": {"$gte": max_chunk_n}},
session=self._session)
# According to spec, ignore extra chunks if they are empty.
if chunk is not None and len(chunk['data']):
raise CorruptGridFile(
@ -585,7 +607,7 @@ class GridOut(object):
useful when serving files using a webserver that handles
such an iterator efficiently.
"""
return GridOutIterator(self, self.__chunks)
return GridOutIterator(self, self.__chunks, self._session)
def close(self):
"""Make GridOut more generically file-like."""
@ -605,9 +627,10 @@ class GridOut(object):
class GridOutIterator(object):
def __init__(self, grid_out, chunks):
def __init__(self, grid_out, chunks, session):
self.__id = grid_out._id
self.__chunks = chunks
self.__session = session
self.__current_chunk = 0
self.__max_chunk = math.ceil(float(grid_out.length) /
grid_out.chunk_size)
@ -619,7 +642,8 @@ class GridOutIterator(object):
if self.__current_chunk >= self.__max_chunk:
raise StopIteration
chunk = self.__chunks.find_one({"files_id": self.__id,
"n": self.__current_chunk})
"n": self.__current_chunk},
session=self.__session)
if not chunk:
raise CorruptGridFile("no chunk #%d" % self.__current_chunk)
self.__current_chunk += 1
@ -633,7 +657,8 @@ class GridOutCursor(Cursor):
of an arbitrary query against the GridFS files collection.
"""
def __init__(self, collection, filter=None, skip=0, limit=0,
no_cursor_timeout=False, sort=None, batch_size=0):
no_cursor_timeout=False, sort=None, batch_size=0,
session=None):
"""Create a new cursor, similar to the normal
:class:`~pymongo.cursor.Cursor`.
@ -650,14 +675,15 @@ class GridOutCursor(Cursor):
super(GridOutCursor, self).__init__(
collection.files, filter, skip=skip, limit=limit,
no_cursor_timeout=no_cursor_timeout, sort=sort,
batch_size=batch_size)
batch_size=batch_size, session=session)
def next(self):
"""Get next GridOut object from cursor.
"""
# Work around "super is not iterable" issue in Python 3.x
next_file = super(GridOutCursor, self).next()
return GridOut(self.__root_collection, file_document=next_file)
return GridOut(self.__root_collection, file_document=next_file,
session=self.session)
__next__ = next

View File

@ -13,8 +13,11 @@
# limitations under the License.
"""Test the client_session module."""
from bson import DBRef
from pymongo import InsertOne, IndexModel, monitoring, OFF
from bson.py3compat import StringIO
from gridfs import GridFS, GridFSBucket
from pymongo import InsertOne, IndexModel, OFF, monitoring
from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
@ -49,6 +52,27 @@ class TestSession(IntegrationTest):
monitoring._SENSITIVE_COMMANDS.update(self.sensitive_commands)
super(TestSession, self).tearDown()
def _test_ops(self, client, *ops):
listener = client.event_listeners()[0][0]
with client.start_session() as s:
for f, args, kwargs in ops:
listener.results.clear()
kwargs['session'] = s
f(*args, **kwargs)
self.assertGreaterEqual(len(listener.results['started']), 1)
for event in listener.results['started']:
self.assertTrue(
'lsid' in event.command,
"%s sent no lsid with %s" % (
f.__name__, event.command_name))
self.assertEqual(
s.session_id,
event.command['lsid'],
"%s sent wrong lsid with %s" % (
f.__name__, event.command_name))
@client_context.require_auth
@ignore_deprecations
def test_session_authenticate_multiple(self):
@ -112,23 +136,7 @@ class TestSession(IntegrationTest):
(client.unlock, [], {}),
])
with client.start_session() as s:
for f, args, kwargs in ops:
listener.results.clear()
kwargs['session'] = s
f(*args, **kwargs)
self.assertGreaterEqual(len(listener.results['started']), 1)
for event in listener.results['started']:
self.assertTrue(
'lsid' in event.command,
"%s sent no lsid with %s" % (
f.__name__, event.command_name))
self.assertEqual(
s.session_id,
event.command['lsid'],
"%s sent wrong lsid with %s" % (
f.__name__, event.command_name))
self._test_ops(client, *ops)
@client_context.require_sessions
def test_database(self):
@ -161,23 +169,7 @@ class TestSession(IntegrationTest):
ops.append((db.set_profiling_level, [OFF], {}))
ops.append((db.profiling_level, [], {}))
with client.start_session() as s:
for f, args, kwargs in ops:
listener.results.clear()
kwargs['session'] = s
f(*args, **kwargs)
self.assertGreaterEqual(len(listener.results['started']), 1)
for event in listener.results['started']:
self.assertTrue(
'lsid' in event.command,
"%s sent no lsid with %s" % (
f.__name__, event.command_name))
self.assertEqual(
s.session_id,
event.command['lsid'],
"%s sent wrong lsid with %s" % (
f.__name__, event.command_name))
self._test_ops(client, *ops)
@client_context.require_sessions
def test_collection(self):
@ -290,3 +282,78 @@ class TestSession(IntegrationTest):
event.command['lsid'],
"%s sent wrong lsid with %s" % (
name, event.command_name))
@client_context.require_sessions
def test_gridfs(self):
listener = SessionTestListener()
client = rs_or_single_client(event_listeners=[listener])
client.drop_database('pymongo_test')
self.addCleanup(client.drop_database, 'pymongo_test')
fs = GridFS(client.pymongo_test)
def new_file(session):
grid_file = fs.new_file(_id=1, filename='f', session=session)
grid_file.write(b'a')
grid_file.close()
self._test_ops(
client,
(new_file, [], {}),
(fs.put, [b'data'], {}),
(lambda session: fs.get(1, session=session).read(), [], {}),
(lambda session: fs.get_version('f', session=session).read(),
[], {}),
(lambda session: fs.get_last_version('f', session=session).read(),
[], {}),
(fs.list, [], {}),
(fs.find_one, [1], {}),
(lambda session: list(fs.find(session=session)), [], {}),
(fs.exists, [1], {}),
(fs.delete, [1], {}))
@client_context.require_sessions
def test_gridfs_bucket(self):
listener = SessionTestListener()
client = rs_or_single_client(event_listeners=[listener])
client.drop_database('pymongo_test')
self.addCleanup(client.drop_database, 'pymongo_test')
bucket = GridFSBucket(client.pymongo_test)
def upload(session):
stream = bucket.open_upload_stream('f', session=session)
stream.write(b'a')
stream.close()
def upload_with_id(session):
stream = bucket.open_upload_stream_with_id(1, 'f1', session=session)
stream.write(b'a')
stream.close()
def open_download_stream(session):
stream = bucket.open_download_stream(1, session=session)
stream.read()
def open_download_stream_by_name(session):
stream = bucket.open_download_stream_by_name('f', session=session)
stream.read()
def find(session):
stream = bucket.find({'_id': 1}, session=session).next()
stream.read()
sio = StringIO()
self._test_ops(
client,
(upload, [], {}),
(upload_with_id, [], {}),
(bucket.upload_from_stream, ['f', 'data'], {}),
(bucket.upload_from_stream_with_id, [2, 'f', 'data'], {}),
(open_download_stream, [], {}),
(open_download_stream_by_name, [], {}),
(bucket.download_to_stream, [1, sio], {}),
(bucket.download_to_stream_by_name, ['f', sio], {}),
(find, [], {}),
(bucket.rename, [1, 'f2'], {}))