some tests and fixes for new gridfs api
This commit is contained in:
parent
0baad44c7e
commit
0997fbf9e8
@ -14,5 +14,5 @@
|
||||
|
||||
.. autoattribute:: _id
|
||||
|
||||
.. autoclass:: GridFile(file-spec, database[, mode='r'[, collection='fs']])
|
||||
.. autoclass:: GridFile
|
||||
:members:
|
||||
|
||||
@ -38,20 +38,24 @@ except AttributeError: # before 2.5
|
||||
_SEEK_END = 2
|
||||
|
||||
|
||||
"""Default chunk size, in bytes."""
|
||||
DEFAULT_CHUNK_SIZE = 256 * 1024
|
||||
|
||||
|
||||
def _create_property(field_name, docstring,
|
||||
read_only=False, closed_only=False):
|
||||
"""Helper for creating properties to read/write to files.
|
||||
"""
|
||||
def getter(self):
|
||||
if closed_only and not self.__closed:
|
||||
if closed_only and not self._closed:
|
||||
raise AttributeError("can only get %r on a closed file" %
|
||||
field_name)
|
||||
return self.__file.get(field_name, None)
|
||||
return self._file.get(field_name, None)
|
||||
def setter(self, value):
|
||||
if self.__closed:
|
||||
if self._closed:
|
||||
raise AttributeError("cannot set %r on a closed file" %
|
||||
field_name)
|
||||
self.__file[field_name] = value
|
||||
self._file[field_name] = value
|
||||
|
||||
if read_only:
|
||||
docstring = docstring + "\n\nThis attribute is read-only."""
|
||||
@ -107,32 +111,40 @@ class GridIn(object):
|
||||
if not isinstance(root_collection, Collection):
|
||||
raise TypeError("root_collection must be an instance of Collection")
|
||||
|
||||
# Convert from kwargs to a file document
|
||||
# Handle alternative naming
|
||||
if "content_type" in kwargs:
|
||||
kwargs["contentType"] = kwargs.pop("content_type")
|
||||
if "chunk_size" in kwargs:
|
||||
kwargs["chunkSize"] = kwargs.pop("chunk_size")
|
||||
|
||||
# Move bonus kwargs into metadata
|
||||
to_move = []
|
||||
for key in kwargs:
|
||||
if key not in ["_id", "filename", "contentType",
|
||||
"chunkSize", "aliases", "metadata"]:
|
||||
kwargs["metadata"] = kwargs.get("metadata", {})
|
||||
to_move.append(key)
|
||||
if to_move:
|
||||
kwargs["metadata"] = kwargs.get("metadata", {})
|
||||
for key in to_move:
|
||||
kwargs["metadata"][key] = kwargs.pop(key)
|
||||
if "_id" not in kwargs:
|
||||
kwargs["_id"] = ObjectId()
|
||||
|
||||
# Defaults
|
||||
kwargs["_id"] = kwargs.get("_id", ObjectId())
|
||||
kwargs["chunkSize"] = kwargs.get("chunkSize", DEFAULT_CHUNK_SIZE)
|
||||
|
||||
self.__coll = root_collection
|
||||
self.__chunks = root_collection.chunks
|
||||
self.__file = kwargs
|
||||
self._file = kwargs
|
||||
self.__buffer = StringIO()
|
||||
self.__position = 0
|
||||
self.__chunk_number = 0
|
||||
self.__closed = False
|
||||
self._closed = False
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
"""Is this file closed?
|
||||
"""
|
||||
return self.__closed
|
||||
return self._closed
|
||||
|
||||
_id = _create_property("_id", "The ``'_id'`` value for this file.",
|
||||
read_only=True)
|
||||
@ -158,7 +170,7 @@ class GridIn(object):
|
||||
return
|
||||
assert(len(data) <= self.chunk_size)
|
||||
|
||||
chunk = {"files_id": self.__file["_id"],
|
||||
chunk = {"files_id": self._file["_id"],
|
||||
"n": self.__chunk_number,
|
||||
"data": Binary(data)}
|
||||
|
||||
@ -181,10 +193,10 @@ class GridIn(object):
|
||||
md5 = self.__coll.database.command("filemd5", self._id,
|
||||
root=self.__coll.name)["md5"]
|
||||
|
||||
self.__file["md5"] = md5
|
||||
self.__file["length"] = self.__position
|
||||
self.__file["uploadDate"] = datetime.datetime.utcnow()
|
||||
return self.__coll.files.insert(self.__file)
|
||||
self._file["md5"] = md5
|
||||
self._file["length"] = self.__position
|
||||
self._file["uploadDate"] = datetime.datetime.utcnow()
|
||||
return self.__coll.files.insert(self._file)
|
||||
|
||||
def close(self):
|
||||
"""Flush the file and close it.
|
||||
@ -192,9 +204,9 @@ class GridIn(object):
|
||||
A closed file cannot be written any more. Calling
|
||||
:meth:`close` more than once is allowed.
|
||||
"""
|
||||
if not self.__closed:
|
||||
if not self._closed:
|
||||
self.__flush()
|
||||
self.__closed = True
|
||||
self._closed = True
|
||||
|
||||
# TODO should support writing unicode to a file. this means that files will
|
||||
# need to have an encoding attribute.
|
||||
@ -210,7 +222,7 @@ class GridIn(object):
|
||||
:Parameters:
|
||||
- `data`: string of bytes to be written to the file
|
||||
"""
|
||||
if self.__closed:
|
||||
if self._closed:
|
||||
raise ValueError("cannot write to a closed file")
|
||||
|
||||
if not isinstance(data, str):
|
||||
@ -270,9 +282,9 @@ class GridOut(object):
|
||||
raise TypeError("root_collection must be an instance of Collection")
|
||||
|
||||
self.__chunks = root_collection.chunks
|
||||
self.__file = root_collection.files.find_one({"_id": file_id})
|
||||
self._file = root_collection.files.find_one({"_id": file_id})
|
||||
|
||||
if not self.__file:
|
||||
if not self._file:
|
||||
raise NoFile("no file in gridfs collection %r with _id %r" %
|
||||
(root_collection, file_id))
|
||||
|
||||
@ -315,7 +327,7 @@ class GridOut(object):
|
||||
size = remainder
|
||||
|
||||
data = self.__buffer
|
||||
chunk_number = (len(bytes) + self.__position) / self.chunk_size
|
||||
chunk_number = (len(data) + self.__position) / self.chunk_size
|
||||
|
||||
while len(data) < size:
|
||||
chunk = self.__chunks.find_one({"files_id": self._id,
|
||||
|
||||
@ -15,421 +15,336 @@
|
||||
"""Tests for the grid_file module.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
import qcheck
|
||||
from gridfs.grid_file import (_SEEK_CUR,
|
||||
_SEEK_END,
|
||||
GridIn,
|
||||
GridFile,
|
||||
GridOut)
|
||||
from gridfs.errors import (NoFile,
|
||||
UnsupportedAPI)
|
||||
from pymongo.objectid import ObjectId
|
||||
from test_connection import get_connection
|
||||
from gridfs.grid_file import GridFile, _SEEK_END, _SEEK_CUR
|
||||
import qcheck
|
||||
|
||||
|
||||
class TestGridFile(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.db = get_connection().pymongo_test
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
|
||||
def test_basic(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
f = GridIn(self.db.fs, filename="test")
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
self.assertEqual(1, self.db.fs.files.find().count())
|
||||
self.assertEqual(1, self.db.fs.chunks.find().count())
|
||||
|
||||
self.assertEqual(self.db.fs.files.find().count(), 0)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 0)
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("hello world")
|
||||
file.close()
|
||||
|
||||
self.assertEqual(self.db.fs.files.find().count(), 1)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 1)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db)
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual("hello world", g.read())
|
||||
|
||||
# make sure it's still there...
|
||||
file = GridFile({"filename": "test"}, self.db)
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual("hello world", g.read())
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.close()
|
||||
|
||||
self.assertEqual(self.db.fs.files.find().count(), 1)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 0)
|
||||
f = GridIn(self.db.fs, filename="test")
|
||||
f.close()
|
||||
self.assertEqual(2, self.db.fs.files.find().count())
|
||||
self.assertEqual(1, self.db.fs.chunks.find().count())
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db)
|
||||
self.assertEqual(file.read(), "")
|
||||
file.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual("", g.read())
|
||||
|
||||
def test_md5(self):
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("hello world\n")
|
||||
file.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db)
|
||||
self.assertEqual(file.md5, "6f5902ac237024bdd0c176cb93063dc4")
|
||||
file.close()
|
||||
f = GridIn(self.db.fs)
|
||||
f.write("hello world\n")
|
||||
f.close()
|
||||
self.assertEqual("6f5902ac237024bdd0c176cb93063dc4", f.md5)
|
||||
|
||||
def test_alternate_collection(self):
|
||||
self.db.pymongo_test.files.remove({})
|
||||
self.db.pymongo_test.chunks.remove({})
|
||||
self.db.alt.files.remove({})
|
||||
self.db.alt.chunks.remove({})
|
||||
|
||||
self.assertEqual(self.db.pymongo_test.files.find().count(), 0)
|
||||
self.assertEqual(self.db.pymongo_test.chunks.find().count(), 0)
|
||||
file = GridFile({"filename": "test"}, self.db, "w",
|
||||
collection="pymongo_test")
|
||||
file.write("hello world")
|
||||
file.close()
|
||||
f = GridIn(self.db.alt)
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
|
||||
self.assertEqual(self.db.pymongo_test.files.find().count(), 1)
|
||||
self.assertEqual(self.db.pymongo_test.chunks.find().count(), 1)
|
||||
self.assertEqual(1, self.db.alt.files.find().count())
|
||||
self.assertEqual(1, self.db.alt.chunks.find().count())
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db,
|
||||
collection="pymongo_test")
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.close()
|
||||
g = GridOut(self.db.alt, f._id)
|
||||
self.assertEqual("hello world", g.read())
|
||||
|
||||
# test that md5 still works...
|
||||
self.assertEqual(file.md5, "5eb63bbbe01eeed093cb22bb8f5acdc3")
|
||||
self.assertEqual("5eb63bbbe01eeed093cb22bb8f5acdc3", g.md5)
|
||||
|
||||
# make sure it's still there...
|
||||
file = GridFile({"filename": "test"}, self.db,
|
||||
collection="pymongo_test")
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.close()
|
||||
def test_grid_file(self):
|
||||
self.assertRaises(UnsupportedAPI, GridFile)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w",
|
||||
collection="pymongo_test")
|
||||
file.close()
|
||||
def test_grid_in_default_opts(self):
|
||||
self.assertRaises(TypeError, GridIn, "foo")
|
||||
|
||||
self.assertEqual(self.db.pymongo_test.files.find().count(), 1)
|
||||
self.assertEqual(self.db.pymongo_test.chunks.find().count(), 0)
|
||||
a = GridIn(self.db.fs)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db,
|
||||
collection="pymongo_test")
|
||||
self.assertEqual(file.read(), "")
|
||||
file.close()
|
||||
self.assert_(isinstance(a._id, ObjectId))
|
||||
self.assertRaises(AttributeError, setattr, a, "_id", 5)
|
||||
|
||||
def test_create_grid_file(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
self.assertEqual(None, a.name)
|
||||
a.name = "my_file"
|
||||
self.assertEqual("my_file", a.name)
|
||||
|
||||
# just write a blank file so that reads on {} don't fail
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.close()
|
||||
self.assertEqual(None, a.content_type)
|
||||
a.content_type = "text/html"
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
|
||||
self.assertRaises(TypeError, GridFile, "hello", self.db)
|
||||
self.assertRaises(TypeError, GridFile, None, self.db)
|
||||
self.assertRaises(TypeError, GridFile, 5, self.db)
|
||||
self.assertRaises(AttributeError, getattr, a, "length")
|
||||
self.assertRaises(AttributeError, setattr, a, "length", 5)
|
||||
|
||||
self.assertRaises(TypeError, GridFile, {}, "hello")
|
||||
self.assertRaises(TypeError, GridFile, {}, None)
|
||||
self.assertRaises(TypeError, GridFile, {}, 5)
|
||||
GridFile({}, self.db).close()
|
||||
self.assertEqual(256 * 1024, a.chunk_size)
|
||||
self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)
|
||||
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, None)
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, 5)
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, [])
|
||||
self.assertRaises(ValueError, GridFile, {}, self.db, "m")
|
||||
self.assertRaises(ValueError, GridFile, {}, self.db, u"m")
|
||||
GridFile({}, self.db, "r").close()
|
||||
GridFile({}, self.db, u"r").close()
|
||||
GridFile({}, self.db, "w").close()
|
||||
GridFile({}, self.db, u"w").close()
|
||||
self.assertRaises(AttributeError, getattr, a, "upload_date")
|
||||
self.assertRaises(AttributeError, setattr, a, "upload_date", 5)
|
||||
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, "r", None)
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, "r", 5)
|
||||
self.assertRaises(TypeError, GridFile, {}, self.db, "r", [])
|
||||
self.assertEqual(None, a.aliases)
|
||||
a.aliases = ["foo"]
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
|
||||
self.assertRaises(IOError, GridFile, {"filename": "mike"}, self.db)
|
||||
GridFile({"filename": "test"}, self.db).close()
|
||||
self.assertEqual(None, a.metadata)
|
||||
a.metadata = {"foo": 1}
|
||||
self.assertEqual({"foo": 1}, a.metadata)
|
||||
|
||||
def test_properties(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
self.assertRaises(AttributeError, getattr, a, "md5")
|
||||
self.assertRaises(AttributeError, setattr, a, "md5", 5)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
self.assertEqual(file.mode, "w")
|
||||
self.failIf(file.closed)
|
||||
file.close()
|
||||
self.assert_(file.closed)
|
||||
a.close()
|
||||
|
||||
self.assertRaises(IOError, GridFile, {"filename": "mike"}, self.db)
|
||||
a = GridFile({"filename": "test"}, self.db)
|
||||
self.assert_(isinstance(a._id, ObjectId))
|
||||
self.assertRaises(AttributeError, setattr, a, "_id", 5)
|
||||
|
||||
self.assertEqual(a.mode, "r")
|
||||
self.failIf(a.closed)
|
||||
self.assertEqual("my_file", a.name)
|
||||
self.assertRaises(AttributeError, setattr, a, "name", "foo")
|
||||
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
self.assertRaises(AttributeError, setattr, a, "content_type", "foo")
|
||||
|
||||
self.assertEqual(0, a.length)
|
||||
self.assertRaises(AttributeError, setattr, a, "length", 5)
|
||||
|
||||
self.assertEqual(256 * 1024, a.chunk_size)
|
||||
self.assertRaises(AttributeError, setattr, a, "chunk_size", 5)
|
||||
|
||||
self.assertEqual(a.length, 0)
|
||||
self.assertEqual(a.content_type, None)
|
||||
self.assertEqual(a.name, "test")
|
||||
self.assertEqual(a.chunk_size, 256000)
|
||||
self.assert_(isinstance(a.upload_date, datetime.datetime))
|
||||
self.assertEqual(a.aliases, None)
|
||||
self.assertEqual(a.metadata, None)
|
||||
self.assertEqual(a.md5, "d41d8cd98f00b204e9800998ecf8427e")
|
||||
self.assertRaises(AttributeError, setattr, a, "upload_date", 5)
|
||||
|
||||
a.content_type = "something"
|
||||
self.assertEqual(a.content_type, "something")
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
self.assertRaises(AttributeError, setattr, a, "aliases", [])
|
||||
|
||||
def set_length():
|
||||
a.length = 10
|
||||
self.assertRaises(AttributeError, set_length)
|
||||
self.assertEqual({"foo": 1}, a.metadata)
|
||||
self.assertRaises(AttributeError, setattr, a, "metadata", {})
|
||||
|
||||
def set_chunk_size():
|
||||
a.chunk_size = 100
|
||||
self.assertRaises(AttributeError, set_chunk_size)
|
||||
self.assertEqual("d41d8cd98f00b204e9800998ecf8427e", a.md5)
|
||||
self.assertRaises(AttributeError, setattr, a, "md5", 5)
|
||||
|
||||
def set_upload_date():
|
||||
a.upload_date = datetime.datetime.utcnow()
|
||||
self.assertRaises(AttributeError, set_upload_date)
|
||||
def test_grid_in_custom_opts(self):
|
||||
self.assertRaises(TypeError, GridIn, "foo")
|
||||
|
||||
a.aliases = ["hello", "world"]
|
||||
self.assertEqual(a.aliases, ["hello", "world"])
|
||||
a = GridIn(self.db.fs, _id=5, filename="my_file",
|
||||
contentType="text/html", chunkSize=1000, aliases=["foo"],
|
||||
metadata={"foo": 1, "bar": 2}, bar=3, baz="hello")
|
||||
|
||||
a.metadata = {"something": "else"}
|
||||
self.assertEqual(a.metadata, {"something": "else"})
|
||||
self.assertEqual(5, a._id)
|
||||
self.assertEqual("my_file", a.name)
|
||||
self.assertEqual("text/html", a.content_type)
|
||||
self.assertEqual(1000, a.chunk_size)
|
||||
self.assertEqual(["foo"], a.aliases)
|
||||
self.assertEqual({"foo": 1, "bar": 3, "baz": "hello"}, a.metadata)
|
||||
|
||||
def set_name():
|
||||
a.name = "hello"
|
||||
self.assertRaises(AttributeError, set_name)
|
||||
b = GridIn(self.db.fs, content_type="text/html", chunk_size=1000, baz=100)
|
||||
self.assertEqual("text/html", b.content_type)
|
||||
self.assertEqual(1000, b.chunk_size)
|
||||
self.assertEqual({"baz": 100}, b.metadata)
|
||||
|
||||
def set_md5():
|
||||
a.md5 = "what"
|
||||
self.assertRaises(AttributeError, set_md5)
|
||||
def test_grid_out_default_opts(self):
|
||||
self.assertRaises(TypeError, GridOut, "foo")
|
||||
|
||||
self.assertRaises(NoFile, GridOut, self.db.fs, 5)
|
||||
|
||||
a = GridIn(self.db.fs)
|
||||
a.close()
|
||||
|
||||
def test_rename(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
b = GridOut(self.db.fs, a._id)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.close()
|
||||
self.assertEqual(a._id, b._id)
|
||||
self.assertEqual(0, b.length)
|
||||
self.assertEqual(None, b.content_type)
|
||||
self.assertEqual(256 * 1024, b.chunk_size)
|
||||
self.assert_(isinstance(b.upload_date, datetime.datetime))
|
||||
self.assertEqual(None, b.aliases)
|
||||
self.assertEqual(None, b.metadata)
|
||||
self.assertEqual("d41d8cd98f00b204e9800998ecf8427e", b.md5)
|
||||
|
||||
self.assertRaises(IOError, GridFile, {"filename": "mike"}, self.db)
|
||||
a = GridFile({"filename": "test"}, self.db)
|
||||
for attr in ["_id", "name", "content_type", "length", "chunk_size",
|
||||
"upload_date", "aliases", "metadata", "md5"]:
|
||||
self.assertRaises(AttributeError, setattr, b, attr, 5)
|
||||
|
||||
a.rename("mike")
|
||||
self.assertEqual("mike", a.name)
|
||||
def test_grid_out_custom_opts(self):
|
||||
a = GridIn(self.db.fs, _id=5, filename="my_file",
|
||||
contentType="text/html", chunkSize=1000, aliases=["foo"],
|
||||
metadata={"foo": 1, "bar": 2}, bar=3, baz="hello")
|
||||
a.write("hello world")
|
||||
a.close()
|
||||
|
||||
self.assertRaises(IOError, GridFile, {"filename": "test"}, self.db)
|
||||
GridFile({"filename": "mike"}, self.db).close()
|
||||
b = GridOut(self.db.fs, 5)
|
||||
|
||||
def test_flush_close(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
self.assertEqual(5, b._id)
|
||||
self.assertEqual(11, b.length)
|
||||
self.assertEqual("text/html", b.content_type)
|
||||
self.assertEqual(1000, b.chunk_size)
|
||||
self.assert_(isinstance(b.upload_date, datetime.datetime))
|
||||
self.assertEqual(["foo"], b.aliases)
|
||||
self.assertEqual({"foo": 1, "bar": 3, "baz": "hello"}, b.metadata)
|
||||
self.assertEqual("5eb63bbbe01eeed093cb22bb8f5acdc3", b.md5)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.flush()
|
||||
file.close()
|
||||
file.close()
|
||||
self.assertRaises(ValueError, file.write, "test")
|
||||
for attr in ["_id", "name", "content_type", "length", "chunk_size",
|
||||
"upload_date", "aliases", "metadata", "md5"]:
|
||||
self.assertRaises(AttributeError, setattr, b, attr, 5)
|
||||
|
||||
file = GridFile({}, self.db)
|
||||
self.assertEqual(file.read(), "")
|
||||
file.close()
|
||||
def test_write_file_like(self):
|
||||
a = GridIn(self.db.fs)
|
||||
a.write("hello world")
|
||||
a.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("mike")
|
||||
file.flush()
|
||||
file.write("test")
|
||||
file.flush()
|
||||
file.write("huh")
|
||||
file.flush()
|
||||
file.flush()
|
||||
file.close()
|
||||
file.close()
|
||||
self.assertRaises(ValueError, file.write, "test")
|
||||
file = GridFile({}, self.db)
|
||||
self.assertEqual(file.read(), "miketesthuh")
|
||||
file.close()
|
||||
b = GridOut(self.db.fs, a._id)
|
||||
|
||||
def test_overwrite(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
c = GridIn(self.db.fs)
|
||||
c.write(b)
|
||||
c.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("test")
|
||||
file.close()
|
||||
d = GridOut(self.db.fs, c._id)
|
||||
self.assertEqual("hello world", d.read())
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("mike")
|
||||
file.close()
|
||||
|
||||
f = GridFile({}, self.db)
|
||||
self.assertEqual(f.read(), "mike")
|
||||
def test_close(self):
|
||||
f = GridIn(self.db.fs)
|
||||
f.close()
|
||||
self.assertRaises(ValueError, f.write, "test")
|
||||
f.close()
|
||||
|
||||
def test_multi_chunk_file(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
|
||||
random_string = qcheck.gen_string(qcheck.lift(300000))()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write(random_string)
|
||||
file.close()
|
||||
|
||||
self.assertEqual(self.db.fs.files.find().count(), 1)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 2)
|
||||
|
||||
f = GridFile({}, self.db)
|
||||
self.assertEqual(f.read(), random_string)
|
||||
f = GridIn(self.db.fs)
|
||||
f.write(random_string)
|
||||
f.close()
|
||||
|
||||
def test_small_chunks(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
self.assertEqual(1, self.db.fs.files.find().count())
|
||||
self.assertEqual(2, self.db.fs.chunks.find().count())
|
||||
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(random_string, g.read())
|
||||
|
||||
def test_small_chunks(self):
|
||||
self.files = 0
|
||||
self.chunks = 0
|
||||
|
||||
def helper(data):
|
||||
filename = qcheck.gen_printable_string(qcheck.lift(20))()
|
||||
|
||||
f = GridFile({"filename": filename, "chunkSize": 1}, self.db, "w")
|
||||
f = GridIn(self.db.fs, chunkSize=1)
|
||||
f.write(data)
|
||||
f.close()
|
||||
|
||||
self.files += 1
|
||||
self.chunks += len(data)
|
||||
|
||||
self.assertEqual(self.db.fs.files.find().count(), self.files)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), self.chunks)
|
||||
self.assertEqual(self.files, self.db.fs.files.find().count())
|
||||
self.assertEqual(self.chunks, self.db.fs.chunks.find().count())
|
||||
|
||||
f = GridFile({"filename": filename}, self.db)
|
||||
self.assertEqual(f.read(), data)
|
||||
f.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(data, g.read())
|
||||
|
||||
f = GridFile({"filename": filename}, self.db)
|
||||
self.assertEqual(f.read(10) + f.read(10), data)
|
||||
f.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(data, g.read(10) + g.read(10))
|
||||
return True
|
||||
|
||||
qcheck.check_unittest(self, helper,
|
||||
qcheck.gen_string(qcheck.gen_range(0, 20)))
|
||||
|
||||
def test_seek(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
f = GridIn(self.db.fs, chunkSize=3)
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
|
||||
file = GridFile({"filename": "test", "chunkSize": 3}, self.db, "w")
|
||||
file.write("hello world")
|
||||
self.assertRaises(ValueError, file.seek, 0)
|
||||
file.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual("hello world", g.read())
|
||||
g.seek(0)
|
||||
self.assertEqual("hello world", g.read())
|
||||
g.seek(1)
|
||||
self.assertEqual("ello world", g.read())
|
||||
self.assertRaises(IOError, g.seek, -1)
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "r")
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.seek(0)
|
||||
self.assertEqual(file.read(), "hello world")
|
||||
file.seek(1)
|
||||
self.assertEqual(file.read(), "ello world")
|
||||
self.assertRaises(IOError, file.seek, -1)
|
||||
g.seek(-3, _SEEK_END)
|
||||
self.assertEqual("rld", g.read())
|
||||
g.seek(0, _SEEK_END)
|
||||
self.assertEqual("", g.read())
|
||||
self.assertRaises(IOError, g.seek, -100, _SEEK_END)
|
||||
|
||||
file.seek(-3, _SEEK_END)
|
||||
self.assertEqual(file.read(), "rld")
|
||||
file.seek(0, _SEEK_END)
|
||||
self.assertEqual(file.read(), "")
|
||||
self.assertRaises(IOError, file.seek, -100, _SEEK_END)
|
||||
|
||||
file.seek(3)
|
||||
file.seek(3, _SEEK_CUR)
|
||||
self.assertEqual(file.read(), "world")
|
||||
self.assertRaises(IOError, file.seek, -100, _SEEK_CUR)
|
||||
|
||||
file.close()
|
||||
g.seek(3)
|
||||
g.seek(3, _SEEK_CUR)
|
||||
self.assertEqual("world", g.read())
|
||||
self.assertRaises(IOError, g.seek, -100, _SEEK_CUR)
|
||||
|
||||
def test_tell(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
f = GridIn(self.db.fs, chunkSize=3)
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
|
||||
file = GridFile({"filename": "test", "chunkSize": 3}, self.db, "w")
|
||||
file.write("hello world")
|
||||
self.assertRaises(ValueError, file.tell)
|
||||
file.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "r")
|
||||
self.assertEqual(file.tell(), 0)
|
||||
file.read(0)
|
||||
self.assertEqual(file.tell(), 0)
|
||||
file.read(1)
|
||||
self.assertEqual(file.tell(), 1)
|
||||
file.read(2)
|
||||
self.assertEqual(file.tell(), 3)
|
||||
file.read()
|
||||
self.assertEqual(file.tell(), file.length)
|
||||
|
||||
file.close()
|
||||
|
||||
def test_modes(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
self.assertRaises(ValueError, file.read)
|
||||
file.write("hello")
|
||||
file.close()
|
||||
self.assertRaises(ValueError, file.read)
|
||||
self.assertRaises(ValueError, file.write, "hello")
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "r")
|
||||
self.assertRaises(ValueError, file.write, "hello")
|
||||
file.read()
|
||||
file.close()
|
||||
self.assertRaises(ValueError, file.read)
|
||||
self.assertRaises(ValueError, file.write, "hello")
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual(0, g.tell())
|
||||
g.read(0)
|
||||
self.assertEqual(0, g.tell())
|
||||
g.read(1)
|
||||
self.assertEqual(1, g.tell())
|
||||
g.read(2)
|
||||
self.assertEqual(3, g.tell())
|
||||
g.read()
|
||||
self.assertEqual(g.length, g.tell())
|
||||
|
||||
def test_multiple_reads(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
f = GridIn(self.db.fs, chunkSize=3)
|
||||
f.write("hello world")
|
||||
f.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "w")
|
||||
file.write("hello world")
|
||||
file.close()
|
||||
|
||||
file = GridFile({"filename": "test"}, self.db, "r")
|
||||
self.assertEqual(file.read(2), "he")
|
||||
self.assertEqual(file.read(2), "ll")
|
||||
self.assertEqual(file.read(2), "o ")
|
||||
self.assertEqual(file.read(2), "wo")
|
||||
self.assertEqual(file.read(2), "rl")
|
||||
self.assertEqual(file.read(2), "d")
|
||||
self.assertEqual(file.read(2), "")
|
||||
file.close()
|
||||
|
||||
def test_spec_with_id(self):
|
||||
# This was raising a TypeError at one point - make sure it doesn't
|
||||
file = GridFile({"_id": "foobar", "filename": "foobar"}, self.db, "w")
|
||||
file.close()
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
self.assertEqual("he", g.read(2))
|
||||
self.assertEqual("ll", g.read(2))
|
||||
self.assertEqual("o ", g.read(2))
|
||||
self.assertEqual("wo", g.read(2))
|
||||
self.assertEqual("rl", g.read(2))
|
||||
self.assertEqual("d", g.read(2))
|
||||
self.assertEqual("", g.read(2))
|
||||
|
||||
def test_read_chunks_unaligned_buffer_size(self):
|
||||
self.db.fs.files.remove({})
|
||||
self.db.fs.chunks.remove({})
|
||||
|
||||
in_data = "This is a text that doesn't quite fit in a single 16-byte chunk."
|
||||
f = GridFile({"filename":"test", "chunkSize":16}, self.db, "w")
|
||||
f = GridIn(self.db.fs, chunkSize=16)
|
||||
f.write(in_data)
|
||||
f.close()
|
||||
|
||||
f = GridFile({"filename":"test"}, self.db)
|
||||
g = GridOut(self.db.fs, f._id)
|
||||
out_data = ''
|
||||
while 1:
|
||||
s = f.read(13)
|
||||
s = g.read(13)
|
||||
if not s:
|
||||
break
|
||||
out_data += s
|
||||
f.close()
|
||||
|
||||
self.assertEqual(in_data, out_data)
|
||||
|
||||
def test_id(self):
|
||||
file = GridFile({"_id": "test"}, self.db, "w")
|
||||
self.assertEqual("test", file._id)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -15,12 +15,14 @@
|
||||
"""Tests for the gridfs package.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
import threading
|
||||
import sys
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
import gridfs
|
||||
from gridfs.errors import NoFile
|
||||
from test_connection import get_connection
|
||||
|
||||
|
||||
@ -59,164 +61,183 @@ class TestGridfs(unittest.TestCase):
|
||||
self.db.drop_collection("pymongo_test.files")
|
||||
self.db.drop_collection("pymongo_test.chunks")
|
||||
self.fs = gridfs.GridFS(self.db)
|
||||
self.alt = gridfs.GridFS(self.db, "pymongo_test")
|
||||
|
||||
def test_open(self):
|
||||
self.assertRaises(IOError, self.fs.open, "my file", "r")
|
||||
f = self.fs.open("my file", "w")
|
||||
f.write("hello gridfs world!")
|
||||
f.close()
|
||||
def test_basic(self):
|
||||
oid = self.fs.put("hello world")
|
||||
self.assertEqual("hello world", self.fs.get(oid).read())
|
||||
self.assertEqual(1, self.db.fs.files.count())
|
||||
self.assertEqual(1, self.db.fs.chunks.count())
|
||||
|
||||
self.fs.delete(oid)
|
||||
self.assertRaises(NoFile, self.fs.get, oid)
|
||||
self.assertEqual(0, self.db.fs.files.count())
|
||||
self.assertEqual(0, self.db.fs.chunks.count())
|
||||
|
||||
self.assertRaises(NoFile, self.fs.get, "foo")
|
||||
oid = self.fs.put("hello world", _id="foo")
|
||||
self.assertEqual("foo", oid)
|
||||
self.assertEqual("hello world", self.fs.get("foo").read())
|
||||
|
||||
g = self.fs.open("my file", "r")
|
||||
self.assertEqual("hello gridfs world!", g.read())
|
||||
g.close()
|
||||
|
||||
def test_list(self):
|
||||
self.assertEqual(self.fs.list(), [])
|
||||
self.assertEqual([], self.fs.list())
|
||||
self.fs.put("hello world")
|
||||
self.assertEqual([], self.fs.list())
|
||||
|
||||
f = self.fs.open("mike", "w")
|
||||
f.close()
|
||||
|
||||
f = self.fs.open("test", "w")
|
||||
f.close()
|
||||
|
||||
f = self.fs.open("hello world", "w")
|
||||
f.close()
|
||||
self.fs.put("", filename="mike")
|
||||
self.fs.put("foo", filename="test")
|
||||
self.fs.put("", filename="hello world")
|
||||
|
||||
self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
set(self.fs.list()))
|
||||
|
||||
def test_remove(self):
|
||||
self.assertRaises(TypeError, self.fs.remove, 5)
|
||||
self.assertRaises(TypeError, self.fs.remove, None)
|
||||
self.assertRaises(TypeError, self.fs.remove, [])
|
||||
def test_empty_file(self):
|
||||
oid = self.fs.put("")
|
||||
self.assertEqual("", self.fs.get(oid).read())
|
||||
self.assertEqual(1, self.db.fs.files.count())
|
||||
self.assertEqual(0, self.db.fs.chunks.count())
|
||||
|
||||
f = self.fs.open("mike", "w")
|
||||
f.write("hi")
|
||||
f.close()
|
||||
f = self.fs.open("test", "w")
|
||||
f.write("bye")
|
||||
f.close()
|
||||
f = self.fs.open("hello world", "w")
|
||||
f.write("fly")
|
||||
f.close()
|
||||
self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
set(self.fs.list()))
|
||||
self.assertEqual(self.db.fs.files.find().count(), 3)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 3)
|
||||
raw = self.db.fs.files.find_one()
|
||||
self.assertEqual(0, raw["length"])
|
||||
self.assertEqual(oid, raw["_id"])
|
||||
self.assert_(isinstance(raw["uploadDate"], datetime.datetime))
|
||||
self.assertEqual(256*1024, raw["chunkSize"])
|
||||
self.assert_(isinstance(raw["md5"], basestring))
|
||||
|
||||
self.fs.remove("test")
|
||||
|
||||
self.assertEqual(set(["mike", "hello world"]), set(self.fs.list()))
|
||||
self.assertEqual(self.db.fs.files.find().count(), 2)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 2)
|
||||
f = self.fs.open("mike")
|
||||
self.assertEqual(f.read(), "hi")
|
||||
f.close()
|
||||
f = self.fs.open("hello world")
|
||||
self.assertEqual(f.read(), "fly")
|
||||
f.close()
|
||||
self.assertRaises(IOError, self.fs.open, "test")
|
||||
# def test_remove(self):
|
||||
# self.assertRaises(TypeError, self.fs.remove, 5)
|
||||
# self.assertRaises(TypeError, self.fs.remove, None)
|
||||
# self.assertRaises(TypeError, self.fs.remove, [])
|
||||
|
||||
self.fs.remove({})
|
||||
# f = self.fs.open("mike", "w")
|
||||
# f.write("hi")
|
||||
# f.close()
|
||||
# f = self.fs.open("test", "w")
|
||||
# f.write("bye")
|
||||
# f.close()
|
||||
# f = self.fs.open("hello world", "w")
|
||||
# f.write("fly")
|
||||
# f.close()
|
||||
# self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
# set(self.fs.list()))
|
||||
# self.assertEqual(self.db.fs.files.find().count(), 3)
|
||||
# self.assertEqual(self.db.fs.chunks.find().count(), 3)
|
||||
|
||||
self.assertEqual([], self.fs.list())
|
||||
self.assertEqual(self.db.fs.files.find().count(), 0)
|
||||
self.assertEqual(self.db.fs.chunks.find().count(), 0)
|
||||
self.assertRaises(IOError, self.fs.open, "test")
|
||||
self.assertRaises(IOError, self.fs.open, "mike")
|
||||
self.assertRaises(IOError, self.fs.open, "hello world")
|
||||
# self.fs.remove("test")
|
||||
|
||||
def test_open_alt_coll(self):
|
||||
f = self.fs.open("my file", "w", "pymongo_test")
|
||||
f.write("hello gridfs world!")
|
||||
f.close()
|
||||
# self.assertEqual(set(["mike", "hello world"]), set(self.fs.list()))
|
||||
# self.assertEqual(self.db.fs.files.find().count(), 2)
|
||||
# self.assertEqual(self.db.fs.chunks.find().count(), 2)
|
||||
# f = self.fs.open("mike")
|
||||
# self.assertEqual(f.read(), "hi")
|
||||
# f.close()
|
||||
# f = self.fs.open("hello world")
|
||||
# self.assertEqual(f.read(), "fly")
|
||||
# f.close()
|
||||
# self.assertRaises(IOError, self.fs.open, "test")
|
||||
|
||||
self.assertRaises(IOError, self.fs.open, "my file", "r")
|
||||
g = self.fs.open("my file", "r", "pymongo_test")
|
||||
self.assertEqual("hello gridfs world!", g.read())
|
||||
g.close()
|
||||
# self.fs.remove({})
|
||||
|
||||
def test_list_alt_coll(self):
|
||||
f = self.fs.open("mike", "w", "pymongo_test")
|
||||
f.close()
|
||||
# self.assertEqual([], self.fs.list())
|
||||
# self.assertEqual(self.db.fs.files.find().count(), 0)
|
||||
# self.assertEqual(self.db.fs.chunks.find().count(), 0)
|
||||
# self.assertRaises(IOError, self.fs.open, "test")
|
||||
# self.assertRaises(IOError, self.fs.open, "mike")
|
||||
# self.assertRaises(IOError, self.fs.open, "hello world")
|
||||
|
||||
f = self.fs.open("test", "w", "pymongo_test")
|
||||
f.close()
|
||||
# def test_open_alt_coll(self):
|
||||
# f = self.fs.open("my file", "w", "pymongo_test")
|
||||
# f.write("hello gridfs world!")
|
||||
# f.close()
|
||||
|
||||
f = self.fs.open("hello world", "w", "pymongo_test")
|
||||
f.close()
|
||||
# self.assertRaises(IOError, self.fs.open, "my file", "r")
|
||||
# g = self.fs.open("my file", "r", "pymongo_test")
|
||||
# self.assertEqual("hello gridfs world!", g.read())
|
||||
# g.close()
|
||||
|
||||
self.assertEqual([], self.fs.list())
|
||||
self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
set(self.fs.list("pymongo_test")))
|
||||
# def test_list_alt_coll(self):
|
||||
# f = self.fs.open("mike", "w", "pymongo_test")
|
||||
# f.close()
|
||||
|
||||
def test_remove_alt_coll(self):
|
||||
f = self.fs.open("mike", "w", "pymongo_test")
|
||||
f.write("hi")
|
||||
f.close()
|
||||
f = self.fs.open("test", "w", "pymongo_test")
|
||||
f.write("bye")
|
||||
f.close()
|
||||
f = self.fs.open("hello world", "w", "pymongo_test")
|
||||
f.write("fly")
|
||||
f.close()
|
||||
# f = self.fs.open("test", "w", "pymongo_test")
|
||||
# f.close()
|
||||
|
||||
self.fs.remove("test")
|
||||
self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
set(self.fs.list("pymongo_test")))
|
||||
self.fs.remove("test", "pymongo_test")
|
||||
self.assertEqual(set(["mike", "hello world"]),
|
||||
set(self.fs.list("pymongo_test")))
|
||||
# f = self.fs.open("hello world", "w", "pymongo_test")
|
||||
# f.close()
|
||||
|
||||
f = self.fs.open("mike", collection="pymongo_test")
|
||||
self.assertEqual(f.read(), "hi")
|
||||
f.close()
|
||||
f = self.fs.open("hello world", collection="pymongo_test")
|
||||
self.assertEqual(f.read(), "fly")
|
||||
f.close()
|
||||
# self.assertEqual([], self.fs.list())
|
||||
# self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
# set(self.fs.list("pymongo_test")))
|
||||
|
||||
self.fs.remove({}, "pymongo_test")
|
||||
# def test_remove_alt_coll(self):
|
||||
# f = self.fs.open("mike", "w", "pymongo_test")
|
||||
# f.write("hi")
|
||||
# f.close()
|
||||
# f = self.fs.open("test", "w", "pymongo_test")
|
||||
# f.write("bye")
|
||||
# f.close()
|
||||
# f = self.fs.open("hello world", "w", "pymongo_test")
|
||||
# f.write("fly")
|
||||
# f.close()
|
||||
|
||||
self.assertEqual([], self.fs.list("pymongo_test"))
|
||||
self.assertEqual(self.db.pymongo_test.files.find().count(), 0)
|
||||
self.assertEqual(self.db.pymongo_test.chunks.find().count(), 0)
|
||||
# self.fs.remove("test")
|
||||
# self.assertEqual(set(["mike", "test", "hello world"]),
|
||||
# set(self.fs.list("pymongo_test")))
|
||||
# self.fs.remove("test", "pymongo_test")
|
||||
# self.assertEqual(set(["mike", "hello world"]),
|
||||
# set(self.fs.list("pymongo_test")))
|
||||
|
||||
def test_threaded_reads(self):
|
||||
f = self.fs.open("test", "w")
|
||||
f.write("hello")
|
||||
f.close()
|
||||
# f = self.fs.open("mike", collection="pymongo_test")
|
||||
# self.assertEqual(f.read(), "hi")
|
||||
# f.close()
|
||||
# f = self.fs.open("hello world", collection="pymongo_test")
|
||||
# self.assertEqual(f.read(), "fly")
|
||||
# f.close()
|
||||
|
||||
threads = []
|
||||
for i in range(10):
|
||||
threads.append(JustRead(self.fs))
|
||||
threads[i].start()
|
||||
# self.fs.remove({}, "pymongo_test")
|
||||
|
||||
for i in range(10):
|
||||
threads[i].join()
|
||||
# self.assertEqual([], self.fs.list("pymongo_test"))
|
||||
# self.assertEqual(self.db.pymongo_test.files.find().count(), 0)
|
||||
# self.assertEqual(self.db.pymongo_test.chunks.find().count(), 0)
|
||||
|
||||
def test_threaded_writes(self):
|
||||
threads = []
|
||||
for i in range(10):
|
||||
threads.append(JustWrite(self.fs))
|
||||
threads[i].start()
|
||||
# def test_threaded_reads(self):
|
||||
# f = self.fs.open("test", "w")
|
||||
# f.write("hello")
|
||||
# f.close()
|
||||
|
||||
for i in range(10):
|
||||
threads[i].join()
|
||||
# threads = []
|
||||
# for i in range(10):
|
||||
# threads.append(JustRead(self.fs))
|
||||
# threads[i].start()
|
||||
|
||||
f = self.fs.open("test")
|
||||
self.assertEqual(f.read(), "hello")
|
||||
f.close()
|
||||
# for i in range(10):
|
||||
# threads[i].join()
|
||||
|
||||
# NOTE I do recognize how gross this is. There is no good way to
|
||||
# test the with statement because it is a syntax error in older
|
||||
# python versions. One option would be to use eval and skip the
|
||||
# test if it is a syntax error.
|
||||
if sys.version_info[:2] == (2, 5):
|
||||
import gridfs15
|
||||
test_with_statement = gridfs15.test_with_statement
|
||||
elif sys.version_info[:3] >= (2, 6, 0):
|
||||
import gridfs16
|
||||
test_with_statement = gridfs16.test_with_statement
|
||||
# def test_threaded_writes(self):
|
||||
# threads = []
|
||||
# for i in range(10):
|
||||
# threads.append(JustWrite(self.fs))
|
||||
# threads[i].start()
|
||||
|
||||
# for i in range(10):
|
||||
# threads[i].join()
|
||||
|
||||
# f = self.fs.open("test")
|
||||
# self.assertEqual(f.read(), "hello")
|
||||
# f.close()
|
||||
|
||||
# # NOTE I do recognize how gross this is. There is no good way to
|
||||
# # test the with statement because it is a syntax error in older
|
||||
# # python versions. One option would be to use eval and skip the
|
||||
# # test if it is a syntax error.
|
||||
# if sys.version_info[:2] == (2, 5):
|
||||
# import gridfs15
|
||||
# test_with_statement = gridfs15.test_with_statement
|
||||
# elif sys.version_info[:3] >= (2, 6, 0):
|
||||
# import gridfs16
|
||||
# test_with_statement = gridfs16.test_with_statement
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user