# mongo-python-driver/test/asynchronous/test_grid_file.py
# Noah Stapp a1ade45dd3
# PYTHON-4881 - Use OvertCommandListener wherever sensitive events are not needed (#1943)
# Co-authored-by: Steven Silvester <steven.silvester@ieee.org>
# 2024-10-18 13:32:09 -04:00
#
# 874 lines
# 29 KiB
# Python

#
# Copyright 2009-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the grid_file module."""
from __future__ import annotations
import datetime
import io
import sys
import zipfile
from io import BytesIO
from test.asynchronous import (
AsyncIntegrationTest,
AsyncUnitTest,
async_client_context,
qcheck,
unittest,
)
from pymongo.asynchronous.database import AsyncDatabase
sys.path[0:0] = [""]
from test.utils import OvertCommandListener
from bson.objectid import ObjectId
from gridfs.asynchronous.grid_file import (
_SEEK_CUR,
_SEEK_END,
DEFAULT_CHUNK_SIZE,
AsyncGridFS,
AsyncGridIn,
AsyncGridOut,
AsyncGridOutCursor,
)
from gridfs.errors import NoFile
from pymongo import AsyncMongoClient
from pymongo.asynchronous.helpers import aiter, anext
from pymongo.errors import ConfigurationError, ServerSelectionTimeoutError
from pymongo.message import _CursorAddress
# False in this asynchronous test module: the ``if _IS_SYNC`` branches below
# pick the plain-attribute / iteration forms only in a synchronous variant,
# and the awaitable ``open()`` / ``set()`` / ``to_list()`` forms here.
# NOTE(review): presumably rewritten textually when the sync copy of this
# module is generated -- keep the exact spelling of this line.
_IS_SYNC = False
class AsyncTestGridFileNoConnect(AsyncUnitTest):
    """Test GridFile features on a client that does not connect."""

    db: AsyncDatabase

    @classmethod
    def setUpClass(cls):
        # The client is built with connect=False, so only client-side
        # construction of AsyncGridIn objects is exercised here.
        lazy_client = AsyncMongoClient(connect=False)
        cls.db = lazy_client.pymongo_test

    def test_grid_in_custom_opts(self):
        """Constructor keyword options (camelCase and snake_case) become attributes."""
        # A plain string is not an acceptable root collection.
        self.assertRaises(TypeError, AsyncGridIn, "foo")

        grid_in = AsyncGridIn(
            self.db.fs,
            _id=5,
            filename="my_file",
            contentType="text/html",
            chunkSize=1000,
            aliases=["foo"],
            metadata={"foo": 1, "bar": 2},
            bar=3,
            baz="hello",
        )
        # (attribute, expected value) pairs, checked in declaration order.
        expected_attrs = (
            ("_id", 5),
            ("filename", "my_file"),
            ("name", "my_file"),
            ("content_type", "text/html"),
            ("chunk_size", 1000),
            ("aliases", ["foo"]),
            ("metadata", {"foo": 1, "bar": 2}),
            ("bar", 3),
            ("baz", "hello"),
        )
        for attr_name, expected in expected_attrs:
            self.assertEqual(expected, getattr(grid_in, attr_name))
        # Options that were never supplied are not invented.
        self.assertRaises(AttributeError, getattr, grid_in, "mike")

        # The snake_case spellings of the options are accepted as well.
        snake_in = AsyncGridIn(self.db.fs, content_type="text/html", chunk_size=1000, baz=100)
        snake_expected = (("content_type", "text/html"), ("chunk_size", 1000), ("baz", 100))
        for attr_name, expected in snake_expected:
            self.assertEqual(expected, getattr(snake_in, attr_name))
class AsyncTestGridFile(AsyncIntegrationTest):
    """Integration tests for AsyncGridIn / AsyncGridOut against a live server.

    Each test writes into the default ``fs`` bucket of ``self.db`` (or an
    explicitly named alternate bucket) and reads the data back.
    """

    async def asyncSetUp(self):
        # Reset the default bucket's collections (presumably emptied by
        # cleanup_colls) so the exact document counts asserted below hold.
        await self.cleanup_colls(self.db.fs.files, self.db.fs.chunks)

    async def test_basic(self):
        """Round-trip a small file; an empty file adds a files doc but no chunk."""
        f = AsyncGridIn(self.db.fs, filename="test")
        await f.write(b"hello world")
        await f.close()
        self.assertEqual(1, await self.db.fs.files.count_documents({}))
        self.assertEqual(1, await self.db.fs.chunks.count_documents({}))

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"hello world", await g.read())

        # make sure it's still there...
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"hello world", await g.read())

        # An empty file: one more files document, chunk count unchanged.
        f = AsyncGridIn(self.db.fs, filename="test")
        await f.close()
        self.assertEqual(2, await self.db.fs.files.count_documents({}))
        self.assertEqual(1, await self.db.fs.chunks.count_documents({}))

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"", await g.read())

        # test that reading 0 returns proper type
        self.assertEqual(b"", await g.read(0))

    async def test_md5(self):
        """The md5 attribute is None after close (no checksum is recorded)."""
        f = AsyncGridIn(self.db.fs)
        await f.write(b"hello world\n")
        await f.close()
        self.assertEqual(None, f.md5)

    async def test_alternate_collection(self):
        """Files can be stored in a bucket other than the default ``fs``."""
        # asyncSetUp only resets ``fs``; clear the ``alt`` bucket explicitly.
        await self.db.alt.files.delete_many({})
        await self.db.alt.chunks.delete_many({})

        f = AsyncGridIn(self.db.alt)
        await f.write(b"hello world")
        await f.close()

        self.assertEqual(1, await self.db.alt.files.count_documents({}))
        self.assertEqual(1, await self.db.alt.chunks.count_documents({}))

        g = AsyncGridOut(self.db.alt, f._id)
        self.assertEqual(b"hello world", await g.read())

    async def test_grid_in_default_opts(self):
        """Default attributes of AsyncGridIn before and after close."""
        self.assertRaises(TypeError, AsyncGridIn, "foo")

        a = AsyncGridIn(self.db.fs)

        # _id is generated automatically and is read-only.
        self.assertTrue(isinstance(a._id, ObjectId))
        self.assertRaises(AttributeError, setattr, a, "_id", 5)

        self.assertEqual(None, a.filename)
        self.assertEqual(None, a.name)
        a.filename = "my_file"
        self.assertEqual("my_file", a.filename)
        self.assertEqual("my_file", a.name)

        self.assertEqual(None, a.content_type)
        a.content_type = "text/html"
        self.assertEqual("text/html", a.content_type)

        # length and upload_date only exist once the file is closed.
        self.assertRaises(AttributeError, getattr, a, "length")
        self.assertRaises(AttributeError, setattr, a, "length", 5)

        self.assertEqual(255 * 1024, a.chunk_size)
        self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)

        self.assertRaises(AttributeError, getattr, a, "upload_date")
        self.assertRaises(AttributeError, setattr, a, "upload_date", 5)

        self.assertRaises(AttributeError, getattr, a, "aliases")
        a.aliases = ["foo"]
        self.assertEqual(["foo"], a.aliases)

        self.assertRaises(AttributeError, getattr, a, "metadata")
        a.metadata = {"foo": 1}
        self.assertEqual({"foo": 1}, a.metadata)

        self.assertRaises(AttributeError, setattr, a, "md5", 5)

        await a.close()

        # After close, custom attributes cannot be assigned directly in the
        # async API; they must go through the awaitable set().
        if _IS_SYNC:
            a.forty_two = 42
        else:
            self.assertRaises(AttributeError, setattr, a, "forty_two", 42)
            await a.set("forty_two", 42)
        self.assertEqual(42, a.forty_two)

        self.assertTrue(isinstance(a._id, ObjectId))
        self.assertRaises(AttributeError, setattr, a, "_id", 5)

        self.assertEqual("my_file", a.filename)
        self.assertEqual("my_file", a.name)

        self.assertEqual("text/html", a.content_type)

        self.assertEqual(0, a.length)
        self.assertRaises(AttributeError, setattr, a, "length", 5)

        self.assertEqual(255 * 1024, a.chunk_size)
        self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)

        self.assertTrue(isinstance(a.upload_date, datetime.datetime))
        self.assertRaises(AttributeError, setattr, a, "upload_date", 5)

        self.assertEqual(["foo"], a.aliases)

        self.assertEqual({"foo": 1}, a.metadata)

        self.assertEqual(None, a.md5)
        self.assertRaises(AttributeError, setattr, a, "md5", 5)

        # Make sure custom attributes that were set both before and after
        # a.close() are reflected in b. PYTHON-411.
        b = await AsyncGridFS(self.db).get_last_version(filename=a.filename)
        self.assertEqual(a.metadata, b.metadata)
        self.assertEqual(a.aliases, b.aliases)
        self.assertEqual(a.forty_two, b.forty_two)

    async def test_grid_out_default_opts(self):
        """Default attributes of AsyncGridOut; all of them are read-only."""
        self.assertRaises(TypeError, AsyncGridOut, "foo")

        # A missing file is only detected once the file document is loaded.
        gout = AsyncGridOut(self.db.fs, 5)
        with self.assertRaises(NoFile):
            if not _IS_SYNC:
                await gout.open()
            gout.name

        a = AsyncGridIn(self.db.fs)
        await a.close()

        b = AsyncGridOut(self.db.fs, a._id)
        if not _IS_SYNC:
            await b.open()

        self.assertEqual(a._id, b._id)
        self.assertEqual(0, b.length)
        self.assertEqual(None, b.content_type)
        self.assertEqual(None, b.name)
        self.assertEqual(None, b.filename)
        self.assertEqual(255 * 1024, b.chunk_size)
        self.assertTrue(isinstance(b.upload_date, datetime.datetime))
        self.assertEqual(None, b.aliases)
        self.assertEqual(None, b.metadata)
        self.assertEqual(None, b.md5)

        for attr in [
            "_id",
            "name",
            "content_type",
            "length",
            "chunk_size",
            "upload_date",
            "aliases",
            "metadata",
            "md5",
        ]:
            self.assertRaises(AttributeError, setattr, b, attr, 5)

    async def test_grid_out_cursor_options(self):
        """AsyncGridOutCursor rejects projection, clones faithfully, forbids options."""
        self.assertRaises(
            TypeError, AsyncGridOutCursor.__init__, self.db.fs, {}, projection={"filename": 1}
        )

        cursor = AsyncGridOutCursor(self.db.fs, {})
        cursor_clone = cursor.clone()

        # The session object is not compared; every other field must match.
        cursor_dict = cursor.__dict__.copy()
        cursor_dict.pop("_session")
        cursor_clone_dict = cursor_clone.__dict__.copy()
        cursor_clone_dict.pop("_session")
        self.assertDictEqual(cursor_dict, cursor_clone_dict)

        self.assertRaises(NotImplementedError, cursor.add_option, 0)
        self.assertRaises(NotImplementedError, cursor.remove_option, 0)

    async def test_grid_out_custom_opts(self):
        """Custom AsyncGridIn options are visible (read-only) through AsyncGridOut."""
        one = AsyncGridIn(
            self.db.fs,
            _id=5,
            filename="my_file",
            contentType="text/html",
            chunkSize=1000,
            aliases=["foo"],
            metadata={"foo": 1, "bar": 2},
            bar=3,
            baz="hello",
        )
        await one.write(b"hello world")
        await one.close()

        two = AsyncGridOut(self.db.fs, 5)

        if not _IS_SYNC:
            await two.open()

        self.assertEqual("my_file", two.name)
        self.assertEqual("my_file", two.filename)
        self.assertEqual(5, two._id)
        self.assertEqual(11, two.length)
        self.assertEqual("text/html", two.content_type)
        self.assertEqual(1000, two.chunk_size)
        self.assertTrue(isinstance(two.upload_date, datetime.datetime))
        self.assertEqual(["foo"], two.aliases)
        self.assertEqual({"foo": 1, "bar": 2}, two.metadata)
        self.assertEqual(3, two.bar)
        self.assertEqual(None, two.md5)

        for attr in [
            "_id",
            "name",
            "content_type",
            "length",
            "chunk_size",
            "upload_date",
            "aliases",
            "metadata",
            "md5",
        ]:
            self.assertRaises(AttributeError, setattr, two, attr, 5)

    async def test_grid_out_file_document(self):
        """AsyncGridOut can be built from a pre-fetched file document."""
        one = AsyncGridIn(self.db.fs)
        await one.write(b"foo bar")
        await one.close()

        two = AsyncGridOut(self.db.fs, file_document=await self.db.fs.files.find_one())
        self.assertEqual(b"foo bar", await two.read())

        # When both are given, the file document wins over the bogus id 5.
        three = AsyncGridOut(self.db.fs, 5, file_document=await self.db.fs.files.find_one())
        self.assertEqual(b"foo bar", await three.read())

        four = AsyncGridOut(self.db.fs, file_document={})
        with self.assertRaises(NoFile):
            if not _IS_SYNC:
                await four.open()
            four.name

    async def test_write_file_like(self):
        """write() accepts AsyncGridOut and BytesIO sources as well as bytes."""
        one = AsyncGridIn(self.db.fs)
        await one.write(b"hello world")
        await one.close()

        two = AsyncGridOut(self.db.fs, one._id)

        three = AsyncGridIn(self.db.fs)
        await three.write(two)
        await three.close()

        four = AsyncGridOut(self.db.fs, three._id)
        self.assertEqual(b"hello world", await four.read())

        # Mix bytes and file-like writes with a tiny chunk size.
        five = AsyncGridIn(self.db.fs, chunk_size=2)
        await five.write(b"hello")
        buffer = BytesIO(b" world")
        await five.write(buffer)
        await five.write(b" and mongodb")
        await five.close()
        self.assertEqual(
            b"hello world and mongodb", await AsyncGridOut(self.db.fs, five._id).read()
        )

    async def test_write_lines(self):
        """writelines() concatenates its inputs."""
        a = AsyncGridIn(self.db.fs)
        await a.writelines([b"hello ", b"world"])
        await a.close()

        self.assertEqual(b"hello world", await AsyncGridOut(self.db.fs, a._id).read())

    async def test_close(self):
        """Writing after close raises ValueError; closing twice is allowed."""
        f = AsyncGridIn(self.db.fs)
        await f.close()
        with self.assertRaises(ValueError):
            await f.write("test")
        await f.close()

    async def test_closed(self):
        """AsyncGridOut.closed stays False while reading and becomes True on close."""
        f = AsyncGridIn(self.db.fs, chunkSize=5)
        await f.write(b"Hello world.\nHow are you?")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        if not _IS_SYNC:
            await g.open()
        self.assertFalse(g.closed)
        await g.read(1)
        self.assertFalse(g.closed)
        await g.read(100)
        self.assertFalse(g.closed)
        await g.close()
        self.assertTrue(g.closed)

    async def test_multi_chunk_file(self):
        """Data slightly larger than the default chunk size spans two chunks."""
        random_string = b"a" * (DEFAULT_CHUNK_SIZE + 1000)

        f = AsyncGridIn(self.db.fs)
        await f.write(random_string)
        await f.close()

        self.assertEqual(1, await self.db.fs.files.count_documents({}))
        self.assertEqual(2, await self.db.fs.chunks.count_documents({}))

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(random_string, await g.read())

    async def test_small_chunks(self):
        """Property test: with chunkSize=1 every byte becomes its own chunk."""
        # Running totals across all generated inputs; kept as locals rather
        # than attributes so no state leaks onto the TestCase instance.
        files = 0
        chunks = 0

        async def helper(data):
            nonlocal files, chunks
            f = AsyncGridIn(self.db.fs, chunkSize=1)
            await f.write(data)
            await f.close()

            files += 1
            chunks += len(data)

            self.assertEqual(files, await self.db.fs.files.count_documents({}))
            self.assertEqual(chunks, await self.db.fs.chunks.count_documents({}))

            g = AsyncGridOut(self.db.fs, f._id)
            self.assertEqual(data, await g.read())

            # Generated strings are shorter than 20 bytes, so two 10-byte
            # reads always cover the whole file.
            g = AsyncGridOut(self.db.fs, f._id)
            self.assertEqual(data, await g.read(10) + await g.read(10))
            return True

        await qcheck.check_unittest(self, helper, qcheck.gen_string(qcheck.gen_range(0, 20)))

    async def test_seek(self):
        """seek() with SET/CUR/END whence values; negative targets raise IOError."""
        f = AsyncGridIn(self.db.fs, chunkSize=3)
        await f.write(b"hello world")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"hello world", await g.read())
        await g.seek(0)
        self.assertEqual(b"hello world", await g.read())
        await g.seek(1)
        self.assertEqual(b"ello world", await g.read())
        with self.assertRaises(IOError):
            await g.seek(-1)

        await g.seek(-3, _SEEK_END)
        self.assertEqual(b"rld", await g.read())
        await g.seek(0, _SEEK_END)
        self.assertEqual(b"", await g.read())
        with self.assertRaises(IOError):
            await g.seek(-100, _SEEK_END)

        await g.seek(3)
        await g.seek(3, _SEEK_CUR)
        self.assertEqual(b"world", await g.read())
        with self.assertRaises(IOError):
            await g.seek(-100, _SEEK_CUR)

    async def test_tell(self):
        """tell() tracks the read position across chunk boundaries."""
        f = AsyncGridIn(self.db.fs, chunkSize=3)
        await f.write(b"hello world")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(0, g.tell())
        await g.read(0)
        self.assertEqual(0, g.tell())
        await g.read(1)
        self.assertEqual(1, g.tell())
        await g.read(2)
        self.assertEqual(3, g.tell())
        await g.read()
        self.assertEqual(g.length, g.tell())

    async def test_multiple_reads(self):
        """Successive fixed-size reads that straddle 3-byte chunks."""
        f = AsyncGridIn(self.db.fs, chunkSize=3)
        await f.write(b"hello world")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"he", await g.read(2))
        self.assertEqual(b"ll", await g.read(2))
        self.assertEqual(b"o ", await g.read(2))
        self.assertEqual(b"wo", await g.read(2))
        self.assertEqual(b"rl", await g.read(2))
        self.assertEqual(b"d", await g.read(2))
        self.assertEqual(b"", await g.read(2))

    async def test_readline(self):
        """readline(), with and without a size limit, interleaved with read()."""
        f = AsyncGridIn(self.db.fs, chunkSize=5)
        await f.write(
            b"""Hello world,
How are you?
Hope all is well.
Bye"""
        )
        await f.close()

        # Try read(), then readline().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"H", await g.read(1))
        self.assertEqual(b"ello world,\n", await g.readline())
        self.assertEqual(b"How a", await g.readline(5))
        self.assertEqual(b"", await g.readline(0))
        self.assertEqual(b"re you?\n", await g.readline())
        self.assertEqual(b"Hope all is well.\n", await g.readline(1000))
        self.assertEqual(b"Bye", await g.readline())
        self.assertEqual(b"", await g.readline())

        # Try readline() first, then read().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"He", await g.readline(2))
        self.assertEqual(b"l", await g.read(1))
        self.assertEqual(b"lo", await g.readline(2))
        self.assertEqual(b" world,\n", await g.readline())

        # Only readline().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"H", await g.readline(1))
        self.assertEqual(b"e", await g.readline(1))
        self.assertEqual(b"llo world,\n", await g.readline())

    async def test_readlines(self):
        """readlines() with size hints, interleaved with read() and readline()."""
        f = AsyncGridIn(self.db.fs, chunkSize=5)
        await f.write(
            b"""Hello world,
How are you?
Hope all is well.
Bye"""
        )
        await f.close()

        # Try read(), then readlines().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"He", await g.read(2))
        self.assertEqual([b"llo world,\n", b"How are you?\n"], await g.readlines(11))
        self.assertEqual([b"Hope all is well.\n", b"Bye"], await g.readlines())
        self.assertEqual([], await g.readlines())

        # Try readline(), then readlines().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"Hello world,\n", await g.readline())
        self.assertEqual([b"How are you?\n", b"Hope all is well.\n"], await g.readlines(13))
        self.assertEqual(b"Bye", await g.readline())
        self.assertEqual([], await g.readlines())

        # Only readlines().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(
            [b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
            await g.readlines(),
        )

        # A hint of 0 means "no limit".
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(
            [b"Hello world,\n", b"How are you?\n", b"Hope all is well.\n", b"Bye"],
            await g.readlines(0),
        )

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual([b"Hello world,\n"], await g.readlines(1))
        self.assertEqual([b"How are you?\n"], await g.readlines(12))
        self.assertEqual([b"Hope all is well.\n", b"Bye"], await g.readlines(18))

        # Try readlines() first, then read().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual([b"Hello world,\n"], await g.readlines(1))
        self.assertEqual(b"H", await g.read(1))
        self.assertEqual([b"ow are you?\n", b"Hope all is well.\n"], await g.readlines(29))
        self.assertEqual([b"Bye"], await g.readlines(1))

        # Try readlines() first, then readline().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual([b"Hello world,\n"], await g.readlines(1))
        self.assertEqual(b"How are you?\n", await g.readline())
        self.assertEqual([b"Hope all is well.\n"], await g.readlines(17))
        self.assertEqual(b"Bye", await g.readline())

    async def test_iterator(self):
        """Iterating an AsyncGridOut yields lines and shares position with read()."""
        f = AsyncGridIn(self.db.fs)
        await f.close()
        g = AsyncGridOut(self.db.fs, f._id)
        if _IS_SYNC:
            self.assertEqual([], list(g))
        else:
            self.assertEqual([], await g.to_list())

        f = AsyncGridIn(self.db.fs)
        await f.write(b"hello world\nhere are\nsome lines.")
        await f.close()
        g = AsyncGridOut(self.db.fs, f._id)
        if _IS_SYNC:
            self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], list(g))
        else:
            self.assertEqual([b"hello world\n", b"here are\n", b"some lines."], await g.to_list())

        # The iterator consumed the file, so read() and re-iteration are empty.
        self.assertEqual(b"", await g.read(5))
        if _IS_SYNC:
            self.assertEqual([], list(g))
        else:
            self.assertEqual([], await g.to_list())

        # Interleave single-step iteration with read().
        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"hello world\n", await anext(aiter(g)))
        self.assertEqual(b"here", await g.read(4))
        self.assertEqual(b" are\n", await anext(aiter(g)))
        self.assertEqual(b"some lines", await g.read(10))
        self.assertEqual(b".", await anext(aiter(g)))
        with self.assertRaises(StopAsyncIteration):
            await aiter(g).__anext__()

        # A line spanning many 2-byte chunks is still yielded whole.
        f = AsyncGridIn(self.db.fs, chunk_size=2)
        await f.write(b"hello world")
        await f.close()
        g = AsyncGridOut(self.db.fs, f._id)
        if _IS_SYNC:
            self.assertEqual([b"hello world"], list(g))
        else:
            self.assertEqual([b"hello world"], await g.to_list())

    async def test_read_unaligned_buffer_size(self):
        """Reading with a size unrelated to chunkSize reassembles the data exactly."""
        in_data = b"This is a text that doesn't quite fit in a single 16-byte chunk."
        f = AsyncGridIn(self.db.fs, chunkSize=16)
        await f.write(in_data)
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        out_data = b""
        while 1:
            s = await g.read(13)
            if not s:
                break
            out_data += s

        self.assertEqual(in_data, out_data)

    async def test_readchunk(self):
        """readchunk() returns only the remainder of the current chunk."""
        in_data = b"a" * 10
        f = AsyncGridIn(self.db.fs, chunkSize=3)
        await f.write(in_data)
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(3, len(await g.readchunk()))

        self.assertEqual(2, len(await g.read(2)))
        self.assertEqual(1, len(await g.readchunk()))

        self.assertEqual(3, len(await g.read(3)))

        self.assertEqual(1, len(await g.readchunk()))

        self.assertEqual(0, len(await g.readchunk()))

    async def test_write_unicode(self):
        """str input is rejected without an encoding and encoded with one."""
        f = AsyncGridIn(self.db.fs)
        with self.assertRaises(TypeError):
            await f.write("foo")

        f = AsyncGridIn(self.db.fs, encoding="utf-8")
        await f.write("foo")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(b"foo", await g.read())

        # Use a non-ASCII character so the chosen encoding actually matters:
        # "\u00e9" (e-acute) is b"\xe9" in iso-8859-1 but two bytes in
        # utf-8. Written as an escape so the source stays pure ASCII.
        text = "a\u00e9"
        f = AsyncGridIn(self.db.fs, encoding="iso-8859-1")
        await f.write(text)
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)
        self.assertEqual(text.encode("iso-8859-1"), await g.read())

    async def test_set_after_close(self):
        """Custom attributes can be updated before and after close and persist."""
        f = AsyncGridIn(self.db.fs, _id="foo", bar="baz")

        self.assertEqual("foo", f._id)
        self.assertEqual("baz", f.bar)
        self.assertRaises(AttributeError, getattr, f, "baz")
        self.assertRaises(AttributeError, getattr, f, "uploadDate")

        self.assertRaises(AttributeError, setattr, f, "_id", 5)
        if _IS_SYNC:
            f.bar = "foo"
            f.baz = 5
        else:
            await f.set("bar", "foo")
            await f.set("baz", 5)

        self.assertEqual("foo", f._id)
        self.assertEqual("foo", f.bar)
        self.assertEqual(5, f.baz)
        self.assertRaises(AttributeError, getattr, f, "uploadDate")

        await f.close()

        self.assertEqual("foo", f._id)
        self.assertEqual("foo", f.bar)
        self.assertEqual(5, f.baz)
        self.assertTrue(f.uploadDate)

        self.assertRaises(AttributeError, setattr, f, "_id", 5)
        if _IS_SYNC:
            f.bar = "a"
            f.baz = "b"
        else:
            await f.set("bar", "a")
            await f.set("baz", "b")
        self.assertRaises(AttributeError, setattr, f, "upload_date", 5)

        # Values set after close must be visible to a fresh reader.
        g = AsyncGridOut(self.db.fs, f._id)
        if not _IS_SYNC:
            await g.open()
        self.assertEqual("a", g.bar)
        self.assertEqual("b", g.baz)
        # Versions 2.0.1 and older saved a _closed field for some reason.
        self.assertRaises(AttributeError, getattr, g, "_closed")

    async def test_context_manager(self):
        """AsyncGridIn/AsyncGridOut work as async context managers."""
        contents = b"Imagine this is some important data..."

        async with AsyncGridIn(self.db.fs, filename="important") as infile:
            await infile.write(contents)

        async with AsyncGridOut(self.db.fs, infile._id) as outfile:
            self.assertEqual(contents, await outfile.read())

    async def test_exception_file_non_existence(self):
        """An exception inside ``async with AsyncGridIn`` leaves no files document."""
        contents = b"Imagine this is some important data..."

        with self.assertRaises(ConnectionError):
            async with AsyncGridIn(self.db.fs, filename="important") as infile:
                await infile.write(contents)
                raise ConnectionError("Test exception")

        # Expectation: File chunks are written, entry in files doesn't appear.
        self.assertEqual(
            await self.db.fs.chunks.count_documents({"files_id": infile._id}), infile._chunk_number
        )

        self.assertIsNone(await self.db.fs.files.find_one({"_id": infile._id}))
        self.assertTrue(infile.closed)

    async def test_prechunked_string(self):
        """Writing in pieces of various sizes reassembles the original bytes."""

        async def write_me(s, chunk_size):
            infile = AsyncGridIn(self.db.fs)
            # The with-block guarantees the buffer is closed even if a
            # write fails.
            with BytesIO(s) as buf:
                while True:
                    to_write = buf.read(chunk_size)
                    if to_write == b"":
                        break
                    await infile.write(to_write)
            await infile.close()

            outfile = AsyncGridOut(self.db.fs, infile._id)
            data = await outfile.read()
            self.assertEqual(s, data)

        s = b"x" * DEFAULT_CHUNK_SIZE * 4
        # Test with default chunk size
        await write_me(s, DEFAULT_CHUNK_SIZE)
        # Multiple
        await write_me(s, DEFAULT_CHUNK_SIZE * 3)
        # Custom
        await write_me(s, 262300)

    async def test_grid_out_lazy_connect(self):
        """AsyncGridOut defers the server lookup until data or metadata is needed."""
        fs = self.db.fs
        outfile = AsyncGridOut(fs, file_id=-1)
        with self.assertRaises(NoFile):
            await outfile.read()
        with self.assertRaises(NoFile):
            if not _IS_SYNC:
                await outfile.open()
            outfile.filename

        infile = AsyncGridIn(fs, filename=1)
        await infile.close()

        # After read() has loaded the file, attribute access must not raise.
        outfile = AsyncGridOut(fs, infile._id)
        await outfile.read()
        outfile.filename

        outfile = AsyncGridOut(fs, infile._id)
        await outfile.readchunk()

    async def test_grid_in_lazy_connect(self):
        """AsyncGridIn only contacts the server on write/close."""
        client = self.simple_client("badhost", connect=False, serverSelectionTimeoutMS=10)
        fs = client.db.fs
        infile = AsyncGridIn(fs, file_id=-1, chunk_size=1)
        with self.assertRaises(ServerSelectionTimeoutError):
            await infile.write(b"data")
        with self.assertRaises(ServerSelectionTimeoutError):
            await infile.close()

    async def test_unacknowledged(self):
        """AsyncGridIn refuses an unacknowledged (w=0) collection."""
        # w=0 is prohibited.
        with self.assertRaises(ConfigurationError):
            AsyncGridIn((await self.async_rs_or_single_client(w=0)).pymongo_test.fs)

    async def test_survive_cursor_not_found(self):
        """Reading continues after the server-side chunk cursor is killed."""
        # By default the find command returns 101 documents in the first batch.
        # Use 102 chunks (one document each) so that a single getMore is needed.
        chunk_size = 1024
        data = b"d" * (102 * chunk_size)
        listener = OvertCommandListener()
        client = await self.async_rs_or_single_client(event_listeners=[listener])
        db = client.pymongo_test
        async with AsyncGridIn(db.fs, chunk_size=chunk_size) as infile:
            await infile.write(data)

        async with AsyncGridOut(db.fs, infile._id) as outfile:
            self.assertEqual(len(await outfile.readchunk()), chunk_size)

            # Kill the cursor to simulate the cursor timing out on the server
            # when an application spends a long time between two calls to
            # readchunk().
            assert await client.address is not None
            await client._close_cursor_now(
                outfile._chunk_iter._cursor.cursor_id,
                _CursorAddress(await client.address, db.fs.chunks.full_name),  # type: ignore[arg-type]
            )

            # Read the rest of the file without error.
            self.assertEqual(len(await outfile.read()), len(data) - chunk_size)

        # Paranoid, ensure that a getMore was actually sent.
        self.assertIn("getMore", listener.started_command_names())

    @async_client_context.require_sync
    async def test_zip(self):
        """A stored zip archive can be opened directly from a GridOut (sync only)."""
        zf = BytesIO()
        z = zipfile.ZipFile(zf, "w")
        z.writestr("test.txt", b"hello world")
        z.close()
        zf.seek(0)

        f = AsyncGridIn(self.db.fs, filename="test.zip")
        await f.write(zf)
        await f.close()
        self.assertEqual(1, await self.db.fs.files.count_documents({}))
        self.assertEqual(1, await self.db.fs.chunks.count_documents({}))

        # zipfile needs a synchronous file object, hence require_sync above.
        g = AsyncGridOut(self.db.fs, f._id)
        z = zipfile.ZipFile(g)
        self.assertSequenceEqual(z.namelist(), ["test.txt"])
        self.assertEqual(z.read("test.txt"), b"hello world")

    async def test_grid_out_unsupported_operations(self):
        """Write-side and terminal io operations raise io.UnsupportedOperation."""
        f = AsyncGridIn(self.db.fs, chunkSize=3)
        await f.write(b"hello world")
        await f.close()

        g = AsyncGridOut(self.db.fs, f._id)

        self.assertRaises(io.UnsupportedOperation, g.writelines, [b"some", b"lines"])
        self.assertRaises(io.UnsupportedOperation, g.write, b"some text")
        self.assertRaises(io.UnsupportedOperation, g.fileno)
        self.assertRaises(io.UnsupportedOperation, g.truncate)

        self.assertFalse(g.writable())
        self.assertFalse(g.isatty())
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()