PYTHON-4669 - Update Async GridFS APIs for Motor Compatibility (#1821)
This commit is contained in:
parent
5a49ccc759
commit
4e74c8274e
@ -1176,24 +1176,6 @@ class AsyncGridIn:
|
||||
raise AttributeError("GridIn object has no attribute '%s'" % name)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
# For properties of this instance like _buffer, or descriptors set on
|
||||
# the class like filename, use regular __setattr__
|
||||
if name in self.__dict__ or name in self.__class__.__dict__:
|
||||
object.__setattr__(self, name, value)
|
||||
else:
|
||||
if _IS_SYNC:
|
||||
# All other attributes are part of the document in db.fs.files.
|
||||
# Store them to be sent to server on close() or if closed, send
|
||||
# them now.
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
else:
|
||||
raise AttributeError(
|
||||
"AsyncGridIn does not support __setattr__. Use AsyncGridIn.set() instead"
|
||||
)
|
||||
|
||||
async def set(self, name: str, value: Any) -> None:
|
||||
# For properties of this instance like _buffer, or descriptors set on
|
||||
# the class like filename, use regular __setattr__
|
||||
if name in self.__dict__ or name in self.__class__.__dict__:
|
||||
@ -1204,9 +1186,17 @@ class AsyncGridIn:
|
||||
# them now.
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
await self._coll.files.update_one(
|
||||
{"_id": self._file["_id"]}, {"$set": {name: value}}
|
||||
)
|
||||
if _IS_SYNC:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
else:
|
||||
raise AttributeError(
|
||||
"AsyncGridIn does not support __setattr__ after being closed(). Set the attribute before closing the file or use AsyncGridIn.set() instead"
|
||||
)
|
||||
|
||||
async def set(self, name: str, value: Any) -> None:
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
await self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
|
||||
async def _flush_data(self, data: Any, force: bool = False) -> None:
|
||||
"""Flush `data` to a chunk."""
|
||||
@ -1400,7 +1390,11 @@ class AsyncGridIn:
|
||||
return False
|
||||
|
||||
|
||||
class AsyncGridOut(io.IOBase):
|
||||
GRIDOUT_BASE_CLASS = io.IOBase if _IS_SYNC else object # type: Any
|
||||
|
||||
|
||||
class AsyncGridOut(GRIDOUT_BASE_CLASS): # type: ignore
|
||||
|
||||
"""Class to read data out of GridFS."""
|
||||
|
||||
def __init__(
|
||||
@ -1460,6 +1454,8 @@ class AsyncGridOut(io.IOBase):
|
||||
self._position = 0
|
||||
self._file = file_document
|
||||
self._session = session
|
||||
if not _IS_SYNC:
|
||||
self.closed = False
|
||||
|
||||
_id: Any = _a_grid_out_property("_id", "The ``'_id'`` value for this file.")
|
||||
filename: str = _a_grid_out_property("filename", "Name of this file.")
|
||||
@ -1486,16 +1482,43 @@ class AsyncGridOut(io.IOBase):
|
||||
_file: Any
|
||||
_chunk_iter: Any
|
||||
|
||||
async def __anext__(self) -> bytes:
|
||||
return super().__next__()
|
||||
if not _IS_SYNC:
|
||||
closed: bool
|
||||
|
||||
def __next__(self) -> bytes: # noqa: F811, RUF100
|
||||
if _IS_SYNC:
|
||||
return super().__next__()
|
||||
else:
|
||||
raise TypeError(
|
||||
"AsyncGridOut does not support synchronous iteration. Use `async for` instead"
|
||||
)
|
||||
async def __anext__(self) -> bytes:
|
||||
line = await self.readline()
|
||||
if line:
|
||||
return line
|
||||
raise StopAsyncIteration()
|
||||
|
||||
async def to_list(self) -> list[bytes]:
|
||||
return [x async for x in self] # noqa: C416, RUF100
|
||||
|
||||
async def readline(self, size: int = -1) -> bytes:
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
return await self._read_size_or_line(size=size, line=True)
|
||||
|
||||
async def readlines(self, size: int = -1) -> list[bytes]:
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
await self.open()
|
||||
lines = []
|
||||
remainder = int(self.length) - self._position
|
||||
bytes_read = 0
|
||||
while remainder > 0:
|
||||
line = await self._read_size_or_line(line=True)
|
||||
bytes_read += len(line)
|
||||
lines.append(line)
|
||||
remainder = int(self.length) - self._position
|
||||
if 0 < size < bytes_read:
|
||||
break
|
||||
|
||||
return lines
|
||||
|
||||
async def open(self) -> None:
|
||||
if not self._file:
|
||||
@ -1616,18 +1639,11 @@ class AsyncGridOut(io.IOBase):
|
||||
"""
|
||||
return await self._read_size_or_line(size=size)
|
||||
|
||||
async def readline(self, size: int = -1) -> bytes: # type: ignore[override]
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
return await self._read_size_or_line(size=size, line=True)
|
||||
|
||||
def tell(self) -> int:
|
||||
"""Return the current position of this file."""
|
||||
return self._position
|
||||
|
||||
async def seek(self, pos: int, whence: int = _SEEK_SET) -> int: # type: ignore[override]
|
||||
async def seek(self, pos: int, whence: int = _SEEK_SET) -> int:
|
||||
"""Set the current position of this file.
|
||||
|
||||
:param pos: the position (or offset if using relative
|
||||
@ -1690,12 +1706,15 @@ class AsyncGridOut(io.IOBase):
|
||||
"""
|
||||
return self
|
||||
|
||||
async def close(self) -> None: # type: ignore[override]
|
||||
async def close(self) -> None:
|
||||
"""Make GridOut more generically file-like."""
|
||||
if self._chunk_iter:
|
||||
await self._chunk_iter.close()
|
||||
self._chunk_iter = None
|
||||
super().close()
|
||||
if _IS_SYNC:
|
||||
super().close()
|
||||
else:
|
||||
self.closed = True
|
||||
|
||||
def write(self, value: Any) -> NoReturn:
|
||||
raise io.UnsupportedOperation("write")
|
||||
|
||||
@ -38,7 +38,15 @@ def _a_grid_in_property(
|
||||
) -> Any:
|
||||
"""Create a GridIn property."""
|
||||
|
||||
warn_str = ""
|
||||
if docstring.startswith("DEPRECATED,"):
|
||||
warn_str = (
|
||||
f"GridIn property '{field_name}' is deprecated and will be removed in PyMongo 5.0"
|
||||
)
|
||||
|
||||
def getter(self: Any) -> Any:
|
||||
if warn_str:
|
||||
warnings.warn(warn_str, stacklevel=2, category=DeprecationWarning)
|
||||
if closed_only and not self._closed:
|
||||
raise AttributeError("can only get %r on a closed file" % field_name)
|
||||
# Protect against PHP-237
|
||||
@ -46,6 +54,15 @@ def _a_grid_in_property(
|
||||
return self._file.get(field_name, 0)
|
||||
return self._file.get(field_name, None)
|
||||
|
||||
def setter(self: Any, value: Any) -> Any:
|
||||
if warn_str:
|
||||
warnings.warn(warn_str, stacklevel=2, category=DeprecationWarning)
|
||||
if self._closed:
|
||||
raise InvalidOperation(
|
||||
"AsyncGridIn does not support __setattr__ after being closed(). Set the attribute before closing the file or use AsyncGridIn.set() instead"
|
||||
)
|
||||
self._file[field_name] = value
|
||||
|
||||
if read_only:
|
||||
docstring += "\n\nThis attribute is read-only."
|
||||
elif closed_only:
|
||||
@ -56,6 +73,8 @@ def _a_grid_in_property(
|
||||
"has been called.",
|
||||
)
|
||||
|
||||
if not read_only and not closed_only:
|
||||
return property(getter, setter, doc=docstring)
|
||||
return property(getter, doc=docstring)
|
||||
|
||||
|
||||
|
||||
@ -1166,24 +1166,6 @@ class GridIn:
|
||||
raise AttributeError("GridIn object has no attribute '%s'" % name)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
# For properties of this instance like _buffer, or descriptors set on
|
||||
# the class like filename, use regular __setattr__
|
||||
if name in self.__dict__ or name in self.__class__.__dict__:
|
||||
object.__setattr__(self, name, value)
|
||||
else:
|
||||
if _IS_SYNC:
|
||||
# All other attributes are part of the document in db.fs.files.
|
||||
# Store them to be sent to server on close() or if closed, send
|
||||
# them now.
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
else:
|
||||
raise AttributeError(
|
||||
"GridIn does not support __setattr__. Use GridIn.set() instead"
|
||||
)
|
||||
|
||||
def set(self, name: str, value: Any) -> None:
|
||||
# For properties of this instance like _buffer, or descriptors set on
|
||||
# the class like filename, use regular __setattr__
|
||||
if name in self.__dict__ or name in self.__class__.__dict__:
|
||||
@ -1194,7 +1176,17 @@ class GridIn:
|
||||
# them now.
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
if _IS_SYNC:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
else:
|
||||
raise AttributeError(
|
||||
"GridIn does not support __setattr__ after being closed(). Set the attribute before closing the file or use GridIn.set() instead"
|
||||
)
|
||||
|
||||
def set(self, name: str, value: Any) -> None:
|
||||
self._file[name] = value
|
||||
if self._closed:
|
||||
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
|
||||
|
||||
def _flush_data(self, data: Any, force: bool = False) -> None:
|
||||
"""Flush `data` to a chunk."""
|
||||
@ -1388,7 +1380,11 @@ class GridIn:
|
||||
return False
|
||||
|
||||
|
||||
class GridOut(io.IOBase):
|
||||
GRIDOUT_BASE_CLASS = io.IOBase if _IS_SYNC else object # type: Any
|
||||
|
||||
|
||||
class GridOut(GRIDOUT_BASE_CLASS): # type: ignore
|
||||
|
||||
"""Class to read data out of GridFS."""
|
||||
|
||||
def __init__(
|
||||
@ -1448,6 +1444,8 @@ class GridOut(io.IOBase):
|
||||
self._position = 0
|
||||
self._file = file_document
|
||||
self._session = session
|
||||
if not _IS_SYNC:
|
||||
self.closed = False
|
||||
|
||||
_id: Any = _grid_out_property("_id", "The ``'_id'`` value for this file.")
|
||||
filename: str = _grid_out_property("filename", "Name of this file.")
|
||||
@ -1474,14 +1472,43 @@ class GridOut(io.IOBase):
|
||||
_file: Any
|
||||
_chunk_iter: Any
|
||||
|
||||
def __next__(self) -> bytes:
|
||||
return super().__next__()
|
||||
if not _IS_SYNC:
|
||||
closed: bool
|
||||
|
||||
def __next__(self) -> bytes: # noqa: F811, RUF100
|
||||
if _IS_SYNC:
|
||||
return super().__next__()
|
||||
else:
|
||||
raise TypeError("GridOut does not support synchronous iteration. Use `for` instead")
|
||||
def __next__(self) -> bytes:
|
||||
line = self.readline()
|
||||
if line:
|
||||
return line
|
||||
raise StopIteration()
|
||||
|
||||
def to_list(self) -> list[bytes]:
|
||||
return [x for x in self] # noqa: C416, RUF100
|
||||
|
||||
def readline(self, size: int = -1) -> bytes:
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
return self._read_size_or_line(size=size, line=True)
|
||||
|
||||
def readlines(self, size: int = -1) -> list[bytes]:
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
self.open()
|
||||
lines = []
|
||||
remainder = int(self.length) - self._position
|
||||
bytes_read = 0
|
||||
while remainder > 0:
|
||||
line = self._read_size_or_line(line=True)
|
||||
bytes_read += len(line)
|
||||
lines.append(line)
|
||||
remainder = int(self.length) - self._position
|
||||
if 0 < size < bytes_read:
|
||||
break
|
||||
|
||||
return lines
|
||||
|
||||
def open(self) -> None:
|
||||
if not self._file:
|
||||
@ -1602,18 +1629,11 @@ class GridOut(io.IOBase):
|
||||
"""
|
||||
return self._read_size_or_line(size=size)
|
||||
|
||||
def readline(self, size: int = -1) -> bytes: # type: ignore[override]
|
||||
"""Read one line or up to `size` bytes from the file.
|
||||
|
||||
:param size: the maximum number of bytes to read
|
||||
"""
|
||||
return self._read_size_or_line(size=size, line=True)
|
||||
|
||||
def tell(self) -> int:
|
||||
"""Return the current position of this file."""
|
||||
return self._position
|
||||
|
||||
def seek(self, pos: int, whence: int = _SEEK_SET) -> int: # type: ignore[override]
|
||||
def seek(self, pos: int, whence: int = _SEEK_SET) -> int:
|
||||
"""Set the current position of this file.
|
||||
|
||||
:param pos: the position (or offset if using relative
|
||||
@ -1676,12 +1696,15 @@ class GridOut(io.IOBase):
|
||||
"""
|
||||
return self
|
||||
|
||||
def close(self) -> None: # type: ignore[override]
|
||||
def close(self) -> None:
|
||||
"""Make GridOut more generically file-like."""
|
||||
if self._chunk_iter:
|
||||
self._chunk_iter.close()
|
||||
self._chunk_iter = None
|
||||
super().close()
|
||||
if _IS_SYNC:
|
||||
super().close()
|
||||
else:
|
||||
self.closed = True
|
||||
|
||||
def write(self, value: Any) -> NoReturn:
|
||||
raise io.UnsupportedOperation("write")
|
||||
|
||||
@ -70,8 +70,13 @@ def _handle_reauth(func: F) -> F:
|
||||
|
||||
if sys.version_info >= (3, 10):
|
||||
anext = builtins.anext
|
||||
aiter = builtins.aiter
|
||||
else:
|
||||
|
||||
async def anext(cls: Any) -> Any:
|
||||
"""Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext."""
|
||||
return await cls.__anext__()
|
||||
|
||||
def aiter(cls: Any) -> Any:
|
||||
"""Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext."""
|
||||
return cls.__aiter__()
|
||||
|
||||
@ -521,7 +521,7 @@ class Topology:
|
||||
if server:
|
||||
await server.pool.reset(interrupt_connections=interrupt_connections)
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
# Wake anything waiting in select_servers().
|
||||
self._condition.notify_all()
|
||||
|
||||
async def on_change(
|
||||
|
||||
@ -70,8 +70,13 @@ def _handle_reauth(func: F) -> F:
|
||||
|
||||
if sys.version_info >= (3, 10):
|
||||
next = builtins.next
|
||||
iter = builtins.iter
|
||||
else:
|
||||
|
||||
def next(cls: Any) -> Any:
|
||||
"""Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#next."""
|
||||
return cls.__next__()
|
||||
|
||||
def iter(cls: Any) -> Any:
|
||||
"""Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#next."""
|
||||
return cls.__iter__()
|
||||
|
||||
@ -521,7 +521,7 @@ class Topology:
|
||||
if server:
|
||||
server.pool.reset(interrupt_connections=interrupt_connections)
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
# Wake anything waiting in select_servers().
|
||||
self._condition.notify_all()
|
||||
|
||||
def on_change(
|
||||
|
||||
871
test/asynchronous/test_grid_file.py
Normal file
871
test/asynchronous/test_grid_file.py
Normal file
@ -0,0 +1,871 @@
|
||||
#
|
||||
# Copyright 2009-present MongoDB, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Tests for the grid_file module."""
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import sys
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
from test.asynchronous import AsyncIntegrationTest, AsyncUnitTest, async_client_context
|
||||
|
||||
from pymongo.asynchronous.database import AsyncDatabase
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from test import IntegrationTest, qcheck, unittest
|
||||
from test.utils import EventListener, async_rs_or_single_client, rs_or_single_client
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
from gridfs import GridFS
|
||||
from gridfs.asynchronous.grid_file import (
|
||||
_SEEK_CUR,
|
||||
_SEEK_END,
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
AsyncGridFS,
|
||||
AsyncGridIn,
|
||||
AsyncGridOut,
|
||||
AsyncGridOutCursor,
|
||||
)
|
||||
from gridfs.errors import NoFile
|
||||
from pymongo import AsyncMongoClient
|
||||
from pymongo.asynchronous.helpers import aiter, anext
|
||||
from pymongo.errors import ConfigurationError, InvalidOperation, ServerSelectionTimeoutError
|
||||
from pymongo.message import _CursorAddress
|
||||
|
||||
_IS_SYNC = False
|
||||
|
||||
|
||||
class AsyncTestGridFileNoConnect(AsyncUnitTest):
|
||||
"""Test GridFile features on a client that does not connect."""
|
||||
|
||||
db: AsyncDatabase
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.db = AsyncMongoClient(connect=False).pymongo_test
|
||||
|
||||
def test_grid_in_custom_opts(self):
|
||||
self.assertRaises(TypeError, AsyncGridIn, "foo")
|
||||
|
||||
a = AsyncGridIn(
|
||||
self.db.fs,
|
||||
_id=5,
|
||||
filename="my_file",
|
||||
contentType="text/html",
|
||||
chunkSize=1000,
|
||||
aliases=["foo"],
|
||||
metadata={"foo": 1, "bar": 2},
|
||||
bar=3,
|
||||
baz="hello",
|
||||
)
|
||||
|
||||
self.assertEqual(5, a._id)
|
||||
self.assertEqual("my_file", a.filename)
|
||||
self.assertEqual("my_file", a.name)
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
self.assertEqual(1000, a.chunk_size)
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
self.assertEqual({"foo": 1, "bar": 2}, a.metadata)
|
||||
self.assertEqual(3, a.bar)
|
||||
self.assertEqual("hello", a.baz)
|
||||
self.assertRaises(AttributeError, getattr, a, "mike")
|
||||
|
||||
b = AsyncGridIn(self.db.fs, content_type="text/html", chunk_size=1000, baz=100)
|
||||
self.assertEqual("text/html", b.content_type)
|
||||
self.assertEqual(1000, b.chunk_size)
|
||||
self.assertEqual(100, b.baz)
|
||||
|
||||
|
||||
class AsyncTestGridFile(AsyncIntegrationTest):
|
||||
async def asyncSetUp(self):
|
||||
await self.cleanup_colls(self.db.fs.files, self.db.fs.chunks)
|
||||
|
||||
async def test_basic(self):
|
||||
f = AsyncGridIn(self.db.fs, filename="test")
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
self.assertEqual(1, await self.db.fs.files.count_documents({}))
|
||||
self.assertEqual(1, await self.db.fs.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"hello world", await g.read())
|
||||
|
||||
# make sure it's still there...
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"hello world", await g.read())
|
||||
|
||||
f = AsyncGridIn(self.db.fs, filename="test")
|
||||
await f.close()
|
||||
self.assertEqual(2, await self.db.fs.files.count_documents({}))
|
||||
self.assertEqual(1, await self.db.fs.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"", await g.read())
|
||||
|
||||
# test that reading 0 returns proper type
|
||||
self.assertEqual(b"", await g.read(0))
|
||||
|
||||
async def test_md5(self):
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
await f.write(b"hello world\n")
|
||||
await f.close()
|
||||
self.assertEqual(None, f.md5)
|
||||
|
||||
async def test_alternate_collection(self):
|
||||
await self.db.alt.files.delete_many({})
|
||||
await self.db.alt.chunks.delete_many({})
|
||||
|
||||
f = AsyncGridIn(self.db.alt)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
|
||||
self.assertEqual(1, await self.db.alt.files.count_documents({}))
|
||||
self.assertEqual(1, await self.db.alt.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.alt, f._id)
|
||||
self.assertEqual(b"hello world", await g.read())
|
||||
|
||||
async def test_grid_in_default_opts(self):
|
||||
self.assertRaises(TypeError, AsyncGridIn, "foo")
|
||||
|
||||
a = AsyncGridIn(self.db.fs)
|
||||
|
||||
self.assertTrue(isinstance(a._id, ObjectId))
|
||||
self.assertRaises(AttributeError, setattr, a, "_id", 5)
|
||||
|
||||
self.assertEqual(None, a.filename)
|
||||
self.assertEqual(None, a.name)
|
||||
a.filename = "my_file"
|
||||
self.assertEqual("my_file", a.filename)
|
||||
self.assertEqual("my_file", a.name)
|
||||
|
||||
self.assertEqual(None, a.content_type)
|
||||
a.content_type = "text/html"
|
||||
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "length")
|
||||
self.assertRaises(AttributeError, setattr, a, "length", 5)
|
||||
|
||||
self.assertEqual(255 * 1024, a.chunk_size)
|
||||
self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "upload_date")
|
||||
self.assertRaises(AttributeError, setattr, a, "upload_date", 5)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "aliases")
|
||||
a.aliases = ["foo"]
|
||||
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "metadata")
|
||||
a.metadata = {"foo": 1}
|
||||
|
||||
self.assertEqual({"foo": 1}, a.metadata)
|
||||
|
||||
self.assertRaises(AttributeError, setattr, a, "md5", 5)
|
||||
|
||||
await a.close()
|
||||
|
||||
if _IS_SYNC:
|
||||
a.forty_two = 42
|
||||
else:
|
||||
self.assertRaises(AttributeError, setattr, a, "forty_two", 42)
|
||||
await a.set("forty_two", 42)
|
||||
|
||||
self.assertEqual(42, a.forty_two)
|
||||
|
||||
self.assertTrue(isinstance(a._id, ObjectId))
|
||||
self.assertRaises(AttributeError, setattr, a, "_id", 5)
|
||||
|
||||
self.assertEqual("my_file", a.filename)
|
||||
self.assertEqual("my_file", a.name)
|
||||
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
|
||||
self.assertEqual(0, a.length)
|
||||
self.assertRaises(AttributeError, setattr, a, "length", 5)
|
||||
|
||||
self.assertEqual(255 * 1024, a.chunk_size)
|
||||
self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)
|
||||
|
||||
self.assertTrue(isinstance(a.upload_date, datetime.datetime))
|
||||
self.assertRaises(AttributeError, setattr, a, "upload_date", 5)
|
||||
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
|
||||
self.assertEqual({"foo": 1}, a.metadata)
|
||||
|
||||
self.assertEqual(None, a.md5)
|
||||
self.assertRaises(AttributeError, setattr, a, "md5", 5)
|
||||
|
||||
# Make sure custom attributes that were set both before and after
|
||||
# a.close() are reflected in b. PYTHON-411.
|
||||
b = await AsyncGridFS(self.db).get_last_version(filename=a.filename)
|
||||
self.assertEqual(a.metadata, b.metadata)
|
||||
self.assertEqual(a.aliases, b.aliases)
|
||||
self.assertEqual(a.forty_two, b.forty_two)
|
||||
|
||||
async def test_grid_out_default_opts(self):
|
||||
self.assertRaises(TypeError, AsyncGridOut, "foo")
|
||||
|
||||
gout = AsyncGridOut(self.db.fs, 5)
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
await gout.open()
|
||||
gout.name
|
||||
|
||||
a = AsyncGridIn(self.db.fs)
|
||||
await a.close()
|
||||
|
||||
b = AsyncGridOut(self.db.fs, a._id)
|
||||
if not _IS_SYNC:
|
||||
await b.open()
|
||||
|
||||
self.assertEqual(a._id, b._id)
|
||||
self.assertEqual(0, b.length)
|
||||
self.assertEqual(None, b.content_type)
|
||||
self.assertEqual(None, b.name)
|
||||
self.assertEqual(None, b.filename)
|
||||
self.assertEqual(255 * 1024, b.chunk_size)
|
||||
self.assertTrue(isinstance(b.upload_date, datetime.datetime))
|
||||
self.assertEqual(None, b.aliases)
|
||||
self.assertEqual(None, b.metadata)
|
||||
self.assertEqual(None, b.md5)
|
||||
|
||||
for attr in [
|
||||
"_id",
|
||||
"name",
|
||||
"content_type",
|
||||
"length",
|
||||
"chunk_size",
|
||||
"upload_date",
|
||||
"aliases",
|
||||
"metadata",
|
||||
"md5",
|
||||
]:
|
||||
self.assertRaises(AttributeError, setattr, b, attr, 5)
|
||||
|
||||
async def test_grid_out_cursor_options(self):
|
||||
self.assertRaises(
|
||||
TypeError, AsyncGridOutCursor.__init__, self.db.fs, {}, projection={"filename": 1}
|
||||
)
|
||||
|
||||
cursor = AsyncGridOutCursor(self.db.fs, {})
|
||||
cursor_clone = cursor.clone()
|
||||
|
||||
cursor_dict = cursor.__dict__.copy()
|
||||
cursor_dict.pop("_session")
|
||||
cursor_clone_dict = cursor_clone.__dict__.copy()
|
||||
cursor_clone_dict.pop("_session")
|
||||
self.assertDictEqual(cursor_dict, cursor_clone_dict)
|
||||
|
||||
self.assertRaises(NotImplementedError, cursor.add_option, 0)
|
||||
self.assertRaises(NotImplementedError, cursor.remove_option, 0)
|
||||
|
||||
async def test_grid_out_custom_opts(self):
|
||||
one = AsyncGridIn(
|
||||
self.db.fs,
|
||||
_id=5,
|
||||
filename="my_file",
|
||||
contentType="text/html",
|
||||
chunkSize=1000,
|
||||
aliases=["foo"],
|
||||
metadata={"foo": 1, "bar": 2},
|
||||
bar=3,
|
||||
baz="hello",
|
||||
)
|
||||
await one.write(b"hello world")
|
||||
await one.close()
|
||||
|
||||
two = AsyncGridOut(self.db.fs, 5)
|
||||
|
||||
if not _IS_SYNC:
|
||||
await two.open()
|
||||
|
||||
self.assertEqual("my_file", two.name)
|
||||
self.assertEqual("my_file", two.filename)
|
||||
self.assertEqual(5, two._id)
|
||||
self.assertEqual(11, two.length)
|
||||
self.assertEqual("text/html", two.content_type)
|
||||
self.assertEqual(1000, two.chunk_size)
|
||||
self.assertTrue(isinstance(two.upload_date, datetime.datetime))
|
||||
self.assertEqual(["foo"], two.aliases)
|
||||
self.assertEqual({"foo": 1, "bar": 2}, two.metadata)
|
||||
self.assertEqual(3, two.bar)
|
||||
self.assertEqual(None, two.md5)
|
||||
|
||||
for attr in [
|
||||
"_id",
|
||||
"name",
|
||||
"content_type",
|
||||
"length",
|
||||
"chunk_size",
|
||||
"upload_date",
|
||||
"aliases",
|
||||
"metadata",
|
||||
"md5",
|
||||
]:
|
||||
self.assertRaises(AttributeError, setattr, two, attr, 5)
|
||||
|
||||
async def test_grid_out_file_document(self):
|
||||
one = AsyncGridIn(self.db.fs)
|
||||
await one.write(b"foo bar")
|
||||
await one.close()
|
||||
|
||||
two = AsyncGridOut(self.db.fs, file_document=await self.db.fs.files.find_one())
|
||||
self.assertEqual(b"foo bar", await two.read())
|
||||
|
||||
three = AsyncGridOut(self.db.fs, 5, file_document=await self.db.fs.files.find_one())
|
||||
self.assertEqual(b"foo bar", await three.read())
|
||||
|
||||
four = AsyncGridOut(self.db.fs, file_document={})
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
await four.open()
|
||||
four.name
|
||||
|
||||
async def test_write_file_like(self):
|
||||
one = AsyncGridIn(self.db.fs)
|
||||
await one.write(b"hello world")
|
||||
await one.close()
|
||||
|
||||
two = AsyncGridOut(self.db.fs, one._id)
|
||||
|
||||
three = AsyncGridIn(self.db.fs)
|
||||
await three.write(two)
|
||||
await three.close()
|
||||
|
||||
four = AsyncGridOut(self.db.fs, three._id)
|
||||
self.assertEqual(b"hello world", await four.read())
|
||||
|
||||
five = AsyncGridIn(self.db.fs, chunk_size=2)
|
||||
await five.write(b"hello")
|
||||
buffer = BytesIO(b" world")
|
||||
await five.write(buffer)
|
||||
await five.write(b" and mongodb")
|
||||
await five.close()
|
||||
self.assertEqual(
|
||||
b"hello world and mongodb", await AsyncGridOut(self.db.fs, five._id).read()
|
||||
)
|
||||
|
||||
async def test_write_lines(self):
|
||||
a = AsyncGridIn(self.db.fs)
|
||||
await a.writelines([b"hello ", b"world"])
|
||||
await a.close()
|
||||
|
||||
self.assertEqual(b"hello world", await AsyncGridOut(self.db.fs, a._id).read())
|
||||
|
||||
async def test_close(self):
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
await f.close()
|
||||
with self.assertRaises(ValueError):
|
||||
await f.write("test")
|
||||
await f.close()
|
||||
|
||||
async def test_closed(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=5)
|
||||
await f.write(b"Hello world.\nHow are you?")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
if not _IS_SYNC:
|
||||
await g.open()
|
||||
self.assertFalse(g.closed)
|
||||
await g.read(1)
|
||||
self.assertFalse(g.closed)
|
||||
await g.read(100)
|
||||
self.assertFalse(g.closed)
|
||||
await g.close()
|
||||
self.assertTrue(g.closed)
|
||||
|
||||
async def test_multi_chunk_file(self):
|
||||
random_string = b"a" * (DEFAULT_CHUNK_SIZE + 1000)
|
||||
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
await f.write(random_string)
|
||||
await f.close()
|
||||
|
||||
self.assertEqual(1, await self.db.fs.files.count_documents({}))
|
||||
self.assertEqual(2, await self.db.fs.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(random_string, await g.read())
|
||||
|
||||
# TODO: https://jira.mongodb.org/browse/PYTHON-4708
|
||||
@async_client_context.require_sync
|
||||
async def test_small_chunks(self):
|
||||
self.files = 0
|
||||
self.chunks = 0
|
||||
|
||||
async def helper(data):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=1)
|
||||
await f.write(data)
|
||||
await f.close()
|
||||
|
||||
self.files += 1
|
||||
self.chunks += len(data)
|
||||
|
||||
self.assertEqual(self.files, await self.db.fs.files.count_documents({}))
|
||||
self.assertEqual(self.chunks, await self.db.fs.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(data, await g.read())
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(data, await g.read(10) + await g.read(10))
|
||||
return True
|
||||
|
||||
qcheck.check_unittest(self, helper, qcheck.gen_string(qcheck.gen_range(0, 20)))
|
||||
|
||||
async def test_seek(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=3)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"hello world", await g.read())
|
||||
await g.seek(0)
|
||||
self.assertEqual(b"hello world", await g.read())
|
||||
await g.seek(1)
|
||||
self.assertEqual(b"ello world", await g.read())
|
||||
with self.assertRaises(IOError):
|
||||
await g.seek(-1)
|
||||
|
||||
await g.seek(-3, _SEEK_END)
|
||||
self.assertEqual(b"rld", await g.read())
|
||||
await g.seek(0, _SEEK_END)
|
||||
self.assertEqual(b"", await g.read())
|
||||
with self.assertRaises(IOError):
|
||||
await g.seek(-100, _SEEK_END)
|
||||
|
||||
await g.seek(3)
|
||||
await g.seek(3, _SEEK_CUR)
|
||||
self.assertEqual(b"world", await g.read())
|
||||
with self.assertRaises(IOError):
|
||||
await g.seek(-100, _SEEK_CUR)
|
||||
|
||||
async def test_tell(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=3)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(0, g.tell())
|
||||
await g.read(0)
|
||||
self.assertEqual(0, g.tell())
|
||||
await g.read(1)
|
||||
self.assertEqual(1, g.tell())
|
||||
await g.read(2)
|
||||
self.assertEqual(3, g.tell())
|
||||
await g.read()
|
||||
self.assertEqual(g.length, g.tell())
|
||||
|
||||
async def test_multiple_reads(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=3)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"he", await g.read(2))
|
||||
self.assertEqual(b"ll", await g.read(2))
|
||||
self.assertEqual(b"o ", await g.read(2))
|
||||
self.assertEqual(b"wo", await g.read(2))
|
||||
self.assertEqual(b"rl", await g.read(2))
|
||||
self.assertEqual(b"d", await g.read(2))
|
||||
self.assertEqual(b"", await g.read(2))
|
||||
|
||||
async def test_readline(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=5)
|
||||
await f.write(
|
||||
b"""Hello world,
|
||||
How are you?
|
||||
Hope all is well.
|
||||
Bye"""
|
||||
)
|
||||
await f.close()
|
||||
|
||||
# Try read(), then readline().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"H", await g.read(1))
|
||||
self.assertEqual(b"ello world,\n", await g.readline())
|
||||
self.assertEqual(b"How a", await g.readline(5))
|
||||
self.assertEqual(b"", await g.readline(0))
|
||||
self.assertEqual(b"re you?\n", await g.readline())
|
||||
self.assertEqual(b"Hope all is well.\n", await g.readline(1000))
|
||||
self.assertEqual(b"Bye", await g.readline())
|
||||
self.assertEqual(b"", await g.readline())
|
||||
|
||||
# Try readline() first, then read().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"He", await g.readline(2))
|
||||
self.assertEqual(b"l", await g.read(1))
|
||||
self.assertEqual(b"lo", await g.readline(2))
|
||||
self.assertEqual(b" world,\n", await g.readline())
|
||||
|
||||
# Only readline().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"H", await g.readline(1))
|
||||
self.assertEqual(b"e", await g.readline(1))
|
||||
self.assertEqual(b"llo world,\n", await g.readline())
|
||||
|
||||
async def test_readlines(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=5)
|
||||
await f.write(
|
||||
b"""Hello world,
|
||||
How are you?
|
||||
Hope all is well.
|
||||
Bye"""
|
||||
)
|
||||
await f.close()
|
||||
|
||||
# Try read(), then readlines().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"He", await g.read(2))
|
||||
self.assertEqual([b"llo world,\n", b"How are you?\n"], await g.readlines(11))
|
||||
self.assertEqual([b"Hope all is well.\n", b"Bye"], await g.readlines())
|
||||
self.assertEqual([], await g.readlines())
|
||||
|
||||
# Try readline(), then readlines().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"Hello world,\n", await g.readline())
|
||||
self.assertEqual([b"How are you?\n", b"Hope all is well.\n"], await g.readlines(13))
|
||||
self.assertEqual(b"Bye", await g.readline())
|
||||
self.assertEqual([], await g.readlines())
|
||||
|
||||
# Only readlines().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
|
||||
await g.readlines(),
|
||||
)
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
|
||||
await g.readlines(0),
|
||||
)
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual([b"Hello world,\n"], await g.readlines(1))
|
||||
self.assertEqual([b"How are you?\n"], await g.readlines(12))
|
||||
self.assertEqual([b"Hope all is well.\n", b"Bye"], await g.readlines(18))
|
||||
|
||||
# Try readlines() first, then read().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual([b"Hello world,\n"], await g.readlines(1))
|
||||
self.assertEqual(b"H", await g.read(1))
|
||||
self.assertEqual([b"ow are you?\n", b"Hope all is well.\n"], await g.readlines(29))
|
||||
self.assertEqual([b"Bye"], await g.readlines(1))
|
||||
|
||||
# Try readlines() first, then readline().
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual([b"Hello world,\n"], await g.readlines(1))
|
||||
self.assertEqual(b"How are you?\n", await g.readline())
|
||||
self.assertEqual([b"Hope all is well.\n"], await g.readlines(17))
|
||||
self.assertEqual(b"Bye", await g.readline())
|
||||
|
||||
async def test_iterator(self):
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
await f.close()
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([], list(g))
|
||||
else:
|
||||
self.assertEqual([], await g.to_list())
|
||||
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
await f.write(b"hello world\nhere are\nsome lines.")
|
||||
await f.close()
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], list(g))
|
||||
else:
|
||||
self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], await g.to_list())
|
||||
|
||||
self.assertEqual(b"", await g.read(5))
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([], list(g))
|
||||
else:
|
||||
self.assertEqual([], await g.to_list())
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"hello world\n", await anext(aiter(g)))
|
||||
self.assertEqual(b"here", await g.read(4))
|
||||
self.assertEqual(b" are\n", await anext(aiter(g)))
|
||||
self.assertEqual(b"some lines", await g.read(10))
|
||||
self.assertEqual(b".", await anext(aiter(g)))
|
||||
with self.assertRaises(StopAsyncIteration):
|
||||
await aiter(g).__anext__()
|
||||
|
||||
f = AsyncGridIn(self.db.fs, chunk_size=2)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([b"hello world"], list(g))
|
||||
else:
|
||||
self.assertEqual([b"hello world"], await g.to_list())
|
||||
|
||||
async def test_read_unaligned_buffer_size(self):
|
||||
in_data = b"This is a text that doesn't quite fit in a single 16-byte chunk."
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=16)
|
||||
await f.write(in_data)
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
out_data = b""
|
||||
while 1:
|
||||
s = await g.read(13)
|
||||
if not s:
|
||||
break
|
||||
out_data += s
|
||||
|
||||
self.assertEqual(in_data, out_data)
|
||||
|
||||
async def test_readchunk(self):
|
||||
in_data = b"a" * 10
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=3)
|
||||
await f.write(in_data)
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(3, len(await g.readchunk()))
|
||||
|
||||
self.assertEqual(2, len(await g.read(2)))
|
||||
self.assertEqual(1, len(await g.readchunk()))
|
||||
|
||||
self.assertEqual(3, len(await g.read(3)))
|
||||
|
||||
self.assertEqual(1, len(await g.readchunk()))
|
||||
|
||||
self.assertEqual(0, len(await g.readchunk()))
|
||||
|
||||
async def test_write_unicode(self):
|
||||
f = AsyncGridIn(self.db.fs)
|
||||
with self.assertRaises(TypeError):
|
||||
await f.write("foo")
|
||||
|
||||
f = AsyncGridIn(self.db.fs, encoding="utf-8")
|
||||
await f.write("foo")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"foo", await g.read())
|
||||
|
||||
f = AsyncGridIn(self.db.fs, encoding="iso-8859-1")
|
||||
await f.write("aé")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
self.assertEqual("aé".encode("iso-8859-1"), await g.read())
|
||||
|
||||
async def test_set_after_close(self):
|
||||
f = AsyncGridIn(self.db.fs, _id="foo", bar="baz")
|
||||
|
||||
self.assertEqual("foo", f._id)
|
||||
self.assertEqual("baz", f.bar)
|
||||
self.assertRaises(AttributeError, getattr, f, "baz")
|
||||
self.assertRaises(AttributeError, getattr, f, "uploadDate")
|
||||
|
||||
self.assertRaises(AttributeError, setattr, f, "_id", 5)
|
||||
if _IS_SYNC:
|
||||
f.bar = "foo"
|
||||
f.baz = 5
|
||||
else:
|
||||
await f.set("bar", "foo")
|
||||
await f.set("baz", 5)
|
||||
|
||||
self.assertEqual("foo", f._id)
|
||||
self.assertEqual("foo", f.bar)
|
||||
self.assertEqual(5, f.baz)
|
||||
self.assertRaises(AttributeError, getattr, f, "uploadDate")
|
||||
|
||||
await f.close()
|
||||
|
||||
self.assertEqual("foo", f._id)
|
||||
self.assertEqual("foo", f.bar)
|
||||
self.assertEqual(5, f.baz)
|
||||
self.assertTrue(f.uploadDate)
|
||||
|
||||
self.assertRaises(AttributeError, setattr, f, "_id", 5)
|
||||
if _IS_SYNC:
|
||||
f.bar = "a"
|
||||
f.baz = "b"
|
||||
else:
|
||||
await f.set("bar", "a")
|
||||
await f.set("baz", "b")
|
||||
self.assertRaises(AttributeError, setattr, f, "upload_date", 5)
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
if not _IS_SYNC:
|
||||
await g.open()
|
||||
self.assertEqual("a", g.bar)
|
||||
self.assertEqual("b", g.baz)
|
||||
# Versions 2.0.1 and older saved a _closed field for some reason.
|
||||
self.assertRaises(AttributeError, getattr, g, "_closed")
|
||||
|
||||
async def test_context_manager(self):
|
||||
contents = b"Imagine this is some important data..."
|
||||
|
||||
async with AsyncGridIn(self.db.fs, filename="important") as infile:
|
||||
await infile.write(contents)
|
||||
|
||||
async with AsyncGridOut(self.db.fs, infile._id) as outfile:
|
||||
self.assertEqual(contents, await outfile.read())
|
||||
|
||||
async def test_exception_file_non_existence(self):
|
||||
contents = b"Imagine this is some important data..."
|
||||
|
||||
with self.assertRaises(ConnectionError):
|
||||
async with AsyncGridIn(self.db.fs, filename="important") as infile:
|
||||
await infile.write(contents)
|
||||
raise ConnectionError("Test exception")
|
||||
|
||||
# Expectation: File chunks are written, entry in files doesn't appear.
|
||||
self.assertEqual(
|
||||
await self.db.fs.chunks.count_documents({"files_id": infile._id}), infile._chunk_number
|
||||
)
|
||||
|
||||
self.assertIsNone(await self.db.fs.files.find_one({"_id": infile._id}))
|
||||
self.assertTrue(infile.closed)
|
||||
|
||||
async def test_prechunked_string(self):
|
||||
async def write_me(s, chunk_size):
|
||||
buf = BytesIO(s)
|
||||
infile = AsyncGridIn(self.db.fs)
|
||||
while True:
|
||||
to_write = buf.read(chunk_size)
|
||||
if to_write == b"":
|
||||
break
|
||||
await infile.write(to_write)
|
||||
await infile.close()
|
||||
buf.close()
|
||||
|
||||
outfile = AsyncGridOut(self.db.fs, infile._id)
|
||||
data = await outfile.read()
|
||||
self.assertEqual(s, data)
|
||||
|
||||
s = b"x" * DEFAULT_CHUNK_SIZE * 4
|
||||
# Test with default chunk size
|
||||
await write_me(s, DEFAULT_CHUNK_SIZE)
|
||||
# Multiple
|
||||
await write_me(s, DEFAULT_CHUNK_SIZE * 3)
|
||||
# Custom
|
||||
await write_me(s, 262300)
|
||||
|
||||
async def test_grid_out_lazy_connect(self):
|
||||
fs = self.db.fs
|
||||
outfile = AsyncGridOut(fs, file_id=-1)
|
||||
with self.assertRaises(NoFile):
|
||||
await outfile.read()
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
await outfile.open()
|
||||
outfile.filename
|
||||
|
||||
infile = AsyncGridIn(fs, filename=1)
|
||||
await infile.close()
|
||||
|
||||
outfile = AsyncGridOut(fs, infile._id)
|
||||
await outfile.read()
|
||||
outfile.filename
|
||||
|
||||
outfile = AsyncGridOut(fs, infile._id)
|
||||
await outfile.readchunk()
|
||||
|
||||
async def test_grid_in_lazy_connect(self):
|
||||
client = AsyncMongoClient("badhost", connect=False, serverSelectionTimeoutMS=10)
|
||||
fs = client.db.fs
|
||||
infile = AsyncGridIn(fs, file_id=-1, chunk_size=1)
|
||||
with self.assertRaises(ServerSelectionTimeoutError):
|
||||
await infile.write(b"data")
|
||||
with self.assertRaises(ServerSelectionTimeoutError):
|
||||
await infile.close()
|
||||
|
||||
async def test_unacknowledged(self):
|
||||
# w=0 is prohibited.
|
||||
with self.assertRaises(ConfigurationError):
|
||||
AsyncGridIn((await async_rs_or_single_client(w=0)).pymongo_test.fs)
|
||||
|
||||
async def test_survive_cursor_not_found(self):
|
||||
# By default the find command returns 101 documents in the first batch.
|
||||
# Use 102 batches to cause a single getMore.
|
||||
chunk_size = 1024
|
||||
data = b"d" * (102 * chunk_size)
|
||||
listener = EventListener()
|
||||
client = await async_rs_or_single_client(event_listeners=[listener])
|
||||
db = client.pymongo_test
|
||||
async with AsyncGridIn(db.fs, chunk_size=chunk_size) as infile:
|
||||
await infile.write(data)
|
||||
|
||||
async with AsyncGridOut(db.fs, infile._id) as outfile:
|
||||
self.assertEqual(len(await outfile.readchunk()), chunk_size)
|
||||
|
||||
# Kill the cursor to simulate the cursor timing out on the server
|
||||
# when an application spends a long time between two calls to
|
||||
# readchunk().
|
||||
assert await client.address is not None
|
||||
await client._close_cursor_now(
|
||||
outfile._chunk_iter._cursor.cursor_id,
|
||||
_CursorAddress(await client.address, db.fs.chunks.full_name), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
# Read the rest of the file without error.
|
||||
self.assertEqual(len(await outfile.read()), len(data) - chunk_size)
|
||||
|
||||
# Paranoid, ensure that a getMore was actually sent.
|
||||
self.assertIn("getMore", listener.started_command_names())
|
||||
|
||||
@async_client_context.require_sync
|
||||
async def test_zip(self):
|
||||
zf = BytesIO()
|
||||
z = zipfile.ZipFile(zf, "w")
|
||||
z.writestr("test.txt", b"hello world")
|
||||
z.close()
|
||||
zf.seek(0)
|
||||
|
||||
f = AsyncGridIn(self.db.fs, filename="test.zip")
|
||||
await f.write(zf)
|
||||
await f.close()
|
||||
self.assertEqual(1, await self.db.fs.files.count_documents({}))
|
||||
self.assertEqual(1, await self.db.fs.chunks.count_documents({}))
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
z = zipfile.ZipFile(g)
|
||||
self.assertSequenceEqual(z.namelist(), ["test.txt"])
|
||||
self.assertEqual(z.read("test.txt"), b"hello world")
|
||||
|
||||
async def test_grid_out_unsupported_operations(self):
|
||||
f = AsyncGridIn(self.db.fs, chunkSize=3)
|
||||
await f.write(b"hello world")
|
||||
await f.close()
|
||||
|
||||
g = AsyncGridOut(self.db.fs, f._id)
|
||||
|
||||
self.assertRaises(io.UnsupportedOperation, g.writelines, [b"some", b"lines"])
|
||||
self.assertRaises(io.UnsupportedOperation, g.write, b"some text")
|
||||
self.assertRaises(io.UnsupportedOperation, g.fileno)
|
||||
self.assertRaises(io.UnsupportedOperation, g.truncate)
|
||||
|
||||
self.assertFalse(g.writable())
|
||||
self.assertFalse(g.isatty())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@ -21,6 +21,7 @@ import io
|
||||
import sys
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
from test import IntegrationTest, UnitTest, client_context
|
||||
|
||||
from pymongo.synchronous.database import Database
|
||||
|
||||
@ -36,16 +37,20 @@ from gridfs.synchronous.grid_file import (
|
||||
_SEEK_CUR,
|
||||
_SEEK_END,
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
GridFS,
|
||||
GridIn,
|
||||
GridOut,
|
||||
GridOutCursor,
|
||||
)
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import ConfigurationError, ServerSelectionTimeoutError
|
||||
from pymongo.errors import ConfigurationError, InvalidOperation, ServerSelectionTimeoutError
|
||||
from pymongo.message import _CursorAddress
|
||||
from pymongo.synchronous.helpers import iter, next
|
||||
|
||||
_IS_SYNC = True
|
||||
|
||||
|
||||
class TestGridFileNoConnect(unittest.TestCase):
|
||||
class TestGridFileNoConnect(UnitTest):
|
||||
"""Test GridFile features on a client that does not connect."""
|
||||
|
||||
db: Database
|
||||
@ -151,6 +156,7 @@ class TestGridFile(IntegrationTest):
|
||||
|
||||
self.assertEqual(None, a.content_type)
|
||||
a.content_type = "text/html"
|
||||
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "length")
|
||||
@ -164,17 +170,24 @@ class TestGridFile(IntegrationTest):
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "aliases")
|
||||
a.aliases = ["foo"]
|
||||
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
|
||||
self.assertRaises(AttributeError, getattr, a, "metadata")
|
||||
a.metadata = {"foo": 1}
|
||||
|
||||
self.assertEqual({"foo": 1}, a.metadata)
|
||||
|
||||
self.assertRaises(AttributeError, setattr, a, "md5", 5)
|
||||
|
||||
a.close()
|
||||
|
||||
a.forty_two = 42
|
||||
if _IS_SYNC:
|
||||
a.forty_two = 42
|
||||
else:
|
||||
self.assertRaises(AttributeError, setattr, a, "forty_two", 42)
|
||||
a.set("forty_two", 42)
|
||||
|
||||
self.assertEqual(42, a.forty_two)
|
||||
|
||||
self.assertTrue(isinstance(a._id, ObjectId))
|
||||
@ -213,12 +226,16 @@ class TestGridFile(IntegrationTest):
|
||||
|
||||
gout = GridOut(self.db.fs, 5)
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
gout.open()
|
||||
gout.name
|
||||
|
||||
a = GridIn(self.db.fs)
|
||||
a.close()
|
||||
|
||||
b = GridOut(self.db.fs, a._id)
|
||||
if not _IS_SYNC:
|
||||
b.open()
|
||||
|
||||
self.assertEqual(a._id, b._id)
|
||||
self.assertEqual(0, b.length)
|
||||
@ -278,6 +295,9 @@ class TestGridFile(IntegrationTest):
|
||||
|
||||
two = GridOut(self.db.fs, 5)
|
||||
|
||||
if not _IS_SYNC:
|
||||
two.open()
|
||||
|
||||
self.assertEqual("my_file", two.name)
|
||||
self.assertEqual("my_file", two.filename)
|
||||
self.assertEqual(5, two._id)
|
||||
@ -316,6 +336,8 @@ class TestGridFile(IntegrationTest):
|
||||
|
||||
four = GridOut(self.db.fs, file_document={})
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
four.open()
|
||||
four.name
|
||||
|
||||
def test_write_file_like(self):
|
||||
@ -350,7 +372,8 @@ class TestGridFile(IntegrationTest):
|
||||
def test_close(self):
|
||||
f = GridIn(self.db.fs)
|
||||
f.close()
|
||||
self.assertRaises(ValueError, f.write, "test")
|
||||
with self.assertRaises(ValueError):
|
||||
f.write("test")
|
||||
f.close()
|
||||
|
||||
def test_closed(self):
|
||||
@ -359,6 +382,8 @@ class TestGridFile(IntegrationTest):
|
||||
f.close()
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
if not _IS_SYNC:
|
||||
g.open()
|
||||
self.assertFalse(g.closed)
|
||||
g.read(1)
|
||||
self.assertFalse(g.closed)
|
||||
@ -380,6 +405,8 @@ class TestGridFile(IntegrationTest):
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(random_string, g.read())
|
||||
|
||||
# TODO: https://jira.mongodb.org/browse/PYTHON-4708
|
||||
@client_context.require_sync
|
||||
def test_small_chunks(self):
|
||||
self.files = 0
|
||||
self.chunks = 0
|
||||
@ -415,18 +442,21 @@ class TestGridFile(IntegrationTest):
|
||||
self.assertEqual(b"hello world", g.read())
|
||||
g.seek(1)
|
||||
self.assertEqual(b"ello world", g.read())
|
||||
self.assertRaises(IOError, g.seek, -1)
|
||||
with self.assertRaises(IOError):
|
||||
g.seek(-1)
|
||||
|
||||
g.seek(-3, _SEEK_END)
|
||||
self.assertEqual(b"rld", g.read())
|
||||
g.seek(0, _SEEK_END)
|
||||
self.assertEqual(b"", g.read())
|
||||
self.assertRaises(IOError, g.seek, -100, _SEEK_END)
|
||||
with self.assertRaises(IOError):
|
||||
g.seek(-100, _SEEK_END)
|
||||
|
||||
g.seek(3)
|
||||
g.seek(3, _SEEK_CUR)
|
||||
self.assertEqual(b"world", g.read())
|
||||
self.assertRaises(IOError, g.seek, -100, _SEEK_CUR)
|
||||
with self.assertRaises(IOError):
|
||||
g.seek(-100, _SEEK_CUR)
|
||||
|
||||
def test_tell(self):
|
||||
f = GridIn(self.db.fs, chunkSize=3)
|
||||
@ -519,12 +549,14 @@ Bye"""
|
||||
# Only readlines().
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"], g.readlines()
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
|
||||
g.readlines(),
|
||||
)
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"], g.readlines(0)
|
||||
[b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
|
||||
g.readlines(0),
|
||||
)
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
@ -550,15 +582,25 @@ Bye"""
|
||||
f = GridIn(self.db.fs)
|
||||
f.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual([], list(g))
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([], list(g))
|
||||
else:
|
||||
self.assertEqual([], g.to_list())
|
||||
|
||||
f = GridIn(self.db.fs)
|
||||
f.write(b"hello world\nhere are\nsome lines.")
|
||||
f.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], list(g))
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], list(g))
|
||||
else:
|
||||
self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], g.to_list())
|
||||
|
||||
self.assertEqual(b"", g.read(5))
|
||||
self.assertEqual([], list(g))
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([], list(g))
|
||||
else:
|
||||
self.assertEqual([], g.to_list())
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(b"hello world\n", next(iter(g)))
|
||||
@ -566,13 +608,17 @@ Bye"""
|
||||
self.assertEqual(b" are\n", next(iter(g)))
|
||||
self.assertEqual(b"some lines", g.read(10))
|
||||
self.assertEqual(b".", next(iter(g)))
|
||||
self.assertRaises(StopIteration, iter(g).__next__)
|
||||
with self.assertRaises(StopIteration):
|
||||
iter(g).__next__()
|
||||
|
||||
f = GridIn(self.db.fs, chunk_size=2)
|
||||
f.write(b"hello world")
|
||||
f.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual([b"hello world"], list(g))
|
||||
if _IS_SYNC:
|
||||
self.assertEqual([b"hello world"], list(g))
|
||||
else:
|
||||
self.assertEqual([b"hello world"], g.to_list())
|
||||
|
||||
def test_read_unaligned_buffer_size(self):
|
||||
in_data = b"This is a text that doesn't quite fit in a single 16-byte chunk."
|
||||
@ -610,7 +656,8 @@ Bye"""
|
||||
|
||||
def test_write_unicode(self):
|
||||
f = GridIn(self.db.fs)
|
||||
self.assertRaises(TypeError, f.write, "foo")
|
||||
with self.assertRaises(TypeError):
|
||||
f.write("foo")
|
||||
|
||||
f = GridIn(self.db.fs, encoding="utf-8")
|
||||
f.write("foo")
|
||||
@ -635,8 +682,12 @@ Bye"""
|
||||
self.assertRaises(AttributeError, getattr, f, "uploadDate")
|
||||
|
||||
self.assertRaises(AttributeError, setattr, f, "_id", 5)
|
||||
f.bar = "foo"
|
||||
f.baz = 5
|
||||
if _IS_SYNC:
|
||||
f.bar = "foo"
|
||||
f.baz = 5
|
||||
else:
|
||||
f.set("bar", "foo")
|
||||
f.set("baz", 5)
|
||||
|
||||
self.assertEqual("foo", f._id)
|
||||
self.assertEqual("foo", f.bar)
|
||||
@ -651,11 +702,17 @@ Bye"""
|
||||
self.assertTrue(f.uploadDate)
|
||||
|
||||
self.assertRaises(AttributeError, setattr, f, "_id", 5)
|
||||
f.bar = "a"
|
||||
f.baz = "b"
|
||||
if _IS_SYNC:
|
||||
f.bar = "a"
|
||||
f.baz = "b"
|
||||
else:
|
||||
f.set("bar", "a")
|
||||
f.set("baz", "b")
|
||||
self.assertRaises(AttributeError, setattr, f, "upload_date", 5)
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
if not _IS_SYNC:
|
||||
g.open()
|
||||
self.assertEqual("a", g.bar)
|
||||
self.assertEqual("b", g.baz)
|
||||
# Versions 2.0.1 and older saved a _closed field for some reason.
|
||||
@ -713,8 +770,12 @@ Bye"""
|
||||
def test_grid_out_lazy_connect(self):
|
||||
fs = self.db.fs
|
||||
outfile = GridOut(fs, file_id=-1)
|
||||
self.assertRaises(NoFile, outfile.read)
|
||||
self.assertRaises(NoFile, getattr, outfile, "filename")
|
||||
with self.assertRaises(NoFile):
|
||||
outfile.read()
|
||||
with self.assertRaises(NoFile):
|
||||
if not _IS_SYNC:
|
||||
outfile.open()
|
||||
outfile.filename
|
||||
|
||||
infile = GridIn(fs, filename=1)
|
||||
infile.close()
|
||||
@ -730,13 +791,15 @@ Bye"""
|
||||
client = MongoClient("badhost", connect=False, serverSelectionTimeoutMS=10)
|
||||
fs = client.db.fs
|
||||
infile = GridIn(fs, file_id=-1, chunk_size=1)
|
||||
self.assertRaises(ServerSelectionTimeoutError, infile.write, b"data")
|
||||
self.assertRaises(ServerSelectionTimeoutError, infile.close)
|
||||
with self.assertRaises(ServerSelectionTimeoutError):
|
||||
infile.write(b"data")
|
||||
with self.assertRaises(ServerSelectionTimeoutError):
|
||||
infile.close()
|
||||
|
||||
def test_unacknowledged(self):
|
||||
# w=0 is prohibited.
|
||||
with self.assertRaises(ConfigurationError):
|
||||
GridIn(rs_or_single_client(w=0).pymongo_test.fs)
|
||||
GridIn((rs_or_single_client(w=0)).pymongo_test.fs)
|
||||
|
||||
def test_survive_cursor_not_found(self):
|
||||
# By default the find command returns 101 documents in the first batch.
|
||||
@ -758,7 +821,7 @@ Bye"""
|
||||
assert client.address is not None
|
||||
client._close_cursor_now(
|
||||
outfile._chunk_iter._cursor.cursor_id,
|
||||
_CursorAddress(client.address, db.fs.chunks.full_name),
|
||||
_CursorAddress(client.address, db.fs.chunks.full_name), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
# Read the rest of the file without error.
|
||||
@ -767,6 +830,7 @@ Bye"""
|
||||
# Paranoid, ensure that a getMore was actually sent.
|
||||
self.assertIn("getMore", listener.started_command_names())
|
||||
|
||||
@client_context.require_sync
|
||||
def test_zip(self):
|
||||
zf = BytesIO()
|
||||
z = zipfile.ZipFile(zf, "w")
|
||||
|
||||
@ -47,6 +47,7 @@ replacements = {
|
||||
"asynchronous": "synchronous",
|
||||
"Asynchronous": "Synchronous",
|
||||
"anext": "next",
|
||||
"aiter": "iter",
|
||||
"_ALock": "_Lock",
|
||||
"_ACondition": "_Condition",
|
||||
"AsyncGridFS": "GridFS",
|
||||
@ -98,6 +99,8 @@ replacements = {
|
||||
"default_async": "default",
|
||||
"aclose": "close",
|
||||
"PyMongo|async": "PyMongo",
|
||||
"AsyncTestGridFile": "TestGridFile",
|
||||
"AsyncTestGridFileNoConnect": "TestGridFileNoConnect",
|
||||
}
|
||||
|
||||
docstring_replacements: dict[tuple[str, str], str] = {
|
||||
@ -160,6 +163,7 @@ converted_tests = [
|
||||
"test_cursor.py",
|
||||
"test_database.py",
|
||||
"test_encryption.py",
|
||||
"test_grid_file.py",
|
||||
"test_logger.py",
|
||||
"test_session.py",
|
||||
"test_transactions.py",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user