* no longer keeps the whole file in memory for write
* cache chunk_size so we dont read the file each time * dont modify user file_spec argument Signed-off-by: Mike Dirolf <mike@10gen.com>
This commit is contained in:
parent
af4516f0dc
commit
df7ee8e618
@ -19,6 +19,10 @@ import datetime
|
||||
import math
|
||||
import os
|
||||
from threading import Condition
|
||||
try:
|
||||
from cStringIO import StringIO
|
||||
except:
|
||||
from StringIO import StringIO
|
||||
|
||||
from pymongo import _SEEK_SET
|
||||
from pymongo import _SEEK_CUR
|
||||
@ -97,7 +101,7 @@ class GridFile(object):
|
||||
raise ValueError("mode must be one of ('r', 'w')")
|
||||
|
||||
self.__collection = database[collection]
|
||||
self.__collection.chunks.ensure_index([("files_id", ASCENDING), ("n", ASCENDING)])
|
||||
self.__collection.chunks.ensure_index([("files_id", ASCENDING), ("n", ASCENDING)], unique=True)
|
||||
|
||||
_files_lock.acquire()
|
||||
|
||||
@ -108,10 +112,11 @@ class GridFile(object):
|
||||
if mode == "r":
|
||||
_files_lock.release()
|
||||
raise IOError("No such file: %r" % file_spec)
|
||||
file_spec["length"] = 0
|
||||
file_spec["uploadDate"] = datetime.datetime.utcnow()
|
||||
file_spec.setdefault("chunkSize", 256000)
|
||||
self.__id = self.__collection.files.insert(file_spec)
|
||||
grid_file = file_spec.copy()
|
||||
grid_file["length"] = 0
|
||||
grid_file["uploadDate"] = datetime.datetime.utcnow()
|
||||
grid_file.setdefault("chunkSize", 256000)
|
||||
self.__id = self.__collection.files.insert(grid_file)
|
||||
|
||||
# we use repr(self.__id) here because we need it to be string and
|
||||
# filename gets tricky with renaming. this is a hack.
|
||||
@ -124,8 +129,10 @@ class GridFile(object):
|
||||
if mode == "w":
|
||||
self.__erase()
|
||||
self.__buffer = ""
|
||||
self.__write_buffer = StringIO()
|
||||
self.__position = 0
|
||||
self.__chunk_number = 0
|
||||
self.__chunk_size = grid_file["chunkSize"]
|
||||
self.__closed = False
|
||||
|
||||
def __erase(self):
|
||||
@ -179,50 +186,51 @@ class GridFile(object):
|
||||
grid_file["filename"] = filename
|
||||
self.__collection.files.save(grid_file)
|
||||
|
||||
def __max_chunk(self):
|
||||
return self.__collection.chunks.find_one({"files_id": self.__id, "n": self.__chunk_number})
|
||||
def __flush_write_buffer(self):
|
||||
"""Flush the write buffer contents out to a chunk.
|
||||
|
||||
def __new_chunk(self, n):
|
||||
chunk = {"files_id": self.__id,
|
||||
"n": n,
|
||||
"data": ""}
|
||||
self.__collection.chunks.insert(chunk)
|
||||
return chunk
|
||||
|
||||
def __write_buffer_to_chunks(self):
|
||||
"""Write the buffer contents out to chunks.
|
||||
"""
|
||||
while len(self.__buffer):
|
||||
max_chunk = self.__max_chunk()
|
||||
if not max_chunk:
|
||||
max_chunk = self.__new_chunk(self.__chunk_number)
|
||||
space = (self.__chunk_number + 1) * self.chunk_size - self.__position
|
||||
if not space:
|
||||
self.__chunk_number += 1
|
||||
max_chunk = self.__new_chunk(self.__chunk_number)
|
||||
space = self.chunk_size
|
||||
to_write = len(self.__buffer) > space and space or len(self.__buffer)
|
||||
data = self.__write_buffer.getvalue()
|
||||
|
||||
max_chunk["data"] = Binary(max_chunk["data"] + self.__buffer[:to_write])
|
||||
self.__collection.chunks.save(max_chunk)
|
||||
self.__buffer = self.__buffer[to_write:]
|
||||
self.__position += to_write
|
||||
if not data:
|
||||
return
|
||||
|
||||
assert(len(data) <= self.__chunk_size)
|
||||
|
||||
chunk = {"files_id": self.__id,
|
||||
"n": self.__chunk_number,
|
||||
"data": Binary(data) }
|
||||
|
||||
self.__collection.chunks.update({"files_id": self.__id,
|
||||
"n": self.__chunk_number},
|
||||
chunk,
|
||||
upsert=True)
|
||||
|
||||
if len(data) == self.__chunk_size:
|
||||
self.__chunk_number += 1
|
||||
self.__position += len(data)
|
||||
self.__write_buffer.close()
|
||||
self.__write_buffer = StringIO()
|
||||
|
||||
def flush(self):
|
||||
"""Flush the GridFile to the database.
|
||||
|
||||
Updates md5 and length to reflect the currently written chunks
|
||||
(there may still be buffered data to be written to the next
|
||||
chunk).
|
||||
"""
|
||||
self.__assert_open()
|
||||
if self.mode != "w":
|
||||
return
|
||||
|
||||
self.__write_buffer_to_chunks()
|
||||
self.__flush_write_buffer()
|
||||
|
||||
md5 = self.__collection.database()._command(SON([("filemd5", self.__id),
|
||||
("root", self.__collection.name())]))["md5"]
|
||||
|
||||
grid_file = self.__collection.files.find_one({"_id": self.__id})
|
||||
grid_file["md5"] = md5
|
||||
grid_file["length"] = self.__position + len(self.__buffer)
|
||||
grid_file["length"] = self.__position + self.__write_buffer.tell()
|
||||
self.__collection.files.save(grid_file)
|
||||
|
||||
def close(self):
|
||||
@ -267,7 +275,7 @@ class GridFile(object):
|
||||
size = remainder
|
||||
|
||||
bytes = self.__buffer
|
||||
chunk_number = math.floor(self.__position / self.chunk_size)
|
||||
chunk_number = math.floor(self.__position / self.__chunk_size)
|
||||
|
||||
while len(bytes) < size:
|
||||
chunk = self.__collection.chunks.find_one({"files_id": self.__id, "n": chunk_number})
|
||||
@ -275,7 +283,7 @@ class GridFile(object):
|
||||
raise CorruptGridFile("no chunk for n = " + chunk_number)
|
||||
|
||||
if not bytes:
|
||||
bytes += chunk["data"][self.__position % self.chunk_size:]
|
||||
bytes += chunk["data"][self.__position % self.__chunk_size:]
|
||||
else:
|
||||
bytes += chunk["data"]
|
||||
|
||||
@ -304,10 +312,16 @@ class GridFile(object):
|
||||
if not isinstance(str, types.StringType):
|
||||
raise TypeError("can only write strings")
|
||||
|
||||
if not len(str):
|
||||
return
|
||||
while str:
|
||||
space = self.__chunk_size - self.__write_buffer.tell()
|
||||
|
||||
self.__buffer += str
|
||||
if len(str) <= space:
|
||||
self.__write_buffer.write(str)
|
||||
break
|
||||
else:
|
||||
self.__write_buffer.write(str[:space])
|
||||
self.__flush_write_buffer()
|
||||
str = str[space:]
|
||||
|
||||
def tell(self):
|
||||
"""Return the GridFile's current position (read-mode files only).
|
||||
|
||||
Loading…
Reference in New Issue
Block a user