Introduce DocumentTooLarge exception PYTHON-630

The idea here is to unify the handling of oversize documents when using
the bulk API in MongoDB 2.6 and previous versions. This also means that
using bulk Collection.insert against legacy servers will attempt to insert
all documents previous to the oversize document before raising.
This commit is contained in:
Bernie Hackett 2014-01-29 12:55:49 -08:00
parent af08da3bcb
commit ee9ecd05ee
8 changed files with 167 additions and 104 deletions

View File

@ -542,27 +542,48 @@ static PyObject* _cbson_get_more_message(PyObject* self, PyObject* args) {
static void
_set_document_too_large(int size, long max) {
PyObject* InvalidDocument = _error("InvalidDocument");
if (InvalidDocument) {
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
if (DocumentTooLarge) {
#if PY_MAJOR_VERSION >= 3
PyObject* error = PyUnicode_FromFormat(DOC_TOO_LARGE_FMT, size, max);
#else
PyObject* error = PyString_FromFormat(DOC_TOO_LARGE_FMT, size, max);
#endif
if (error) {
PyErr_SetObject(InvalidDocument, error);
PyErr_SetObject(DocumentTooLarge, error);
Py_DECREF(error);
}
Py_DECREF(InvalidDocument);
Py_DECREF(DocumentTooLarge);
}
}
static PyObject*
_send_insert(PyObject* self, PyObject* client,
PyObject* gle_args, buffer_t buffer,
char* coll_name, int coll_len, int request_id, int safe) {
PyObject* result;
if (safe) {
if (!add_last_error(self, buffer, request_id,
coll_name, coll_len, gle_args)) {
return NULL;
}
}
result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
buffer_get_buffer(buffer),
buffer_get_position(buffer));
return PyObject_CallMethod(client, "_send_message", "NN",
result, PyBool_FromLong((long)safe));
}
static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
struct module_state *state = GETSTATE(self);
/* NOTE just using a random number as the request_id */
int request_id = rand();
int options = 0;
int send_safe, options = 0;
int length_location, message_length;
int collection_name_length;
char* collection_name = NULL;
@ -574,7 +595,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
PyObject* result;
PyObject* max_bson_size_obj;
PyObject* max_message_size_obj;
PyObject* send_message_result;
unsigned char check_keys;
unsigned char safe;
unsigned char continue_on_error;
@ -598,6 +618,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
if (continue_on_error) {
options += 1;
}
/*
* If we are doing unacknowledged writes *and* continue_on_error
* is True it's pointless (and slower) to send GLE.
*/
send_safe = (safe || !continue_on_error);
max_bson_size_obj = PyObject_GetAttrString(client, "max_bson_size");
#if PY_MAJOR_VERSION >= 3
@ -651,7 +676,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
while ((doc = PyIter_Next(iterator)) != NULL) {
int before = buffer_get_position(buffer);
int cur_size;
empty = 0;
if (!write_dict(state->_cbson, buffer, doc, check_keys, uuid_subtype, 1)) {
Py_DECREF(doc);
goto iterfail;
@ -660,15 +684,28 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
cur_size = buffer_get_position(buffer) - before;
if (cur_size > max_bson_size) {
/* If we've encoded anything send it before raising. */
if (!empty) {
buffer_update_position(buffer, before);
message_length = buffer_get_position(buffer) - length_location;
memcpy(buffer_get_buffer(buffer) + length_location,
&message_length, 4);
result = _send_insert(self, client, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe);
if (!result)
goto iterfail;
Py_DECREF(result);
}
_set_document_too_large(cur_size, max_bson_size);
goto iterfail;
}
empty = 0;
/* We have enough data, send this batch. */
if (buffer_get_position(buffer) > max_message_size) {
int new_request_id = rand();
int message_start;
PyObject* send_gle = Py_False;
buffer_t new_buffer = buffer_new();
if (!new_buffer) {
PyErr_NoMemory();
@ -696,29 +733,16 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
message_length = buffer_get_position(buffer) - length_location;
memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);
/* If we are doing unacknowledged writes *and* continue_on_error
* is True it's pointless (and slower) to send GLE. */
if (safe || !continue_on_error) {
send_gle = Py_True;
if (!add_last_error(self, buffer, request_id, collection_name,
collection_name_length, last_error_args)) {
buffer_free(new_buffer);
goto iterfail;
}
}
/* Objectify buffer */
result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
buffer_get_buffer(buffer),
buffer_get_position(buffer));
result = _send_insert(self, client, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe);
buffer_free(buffer);
buffer = new_buffer;
request_id = new_request_id;
length_location = message_start;
send_message_result = PyObject_CallMethod(client, "_send_message",
"NO", result, send_gle);
if (!send_message_result) {
if (!result) {
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject* OperationFailure;
PyErr_Fetch(&etype, &evalue, &etrace);
@ -757,7 +781,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
PyErr_Restore(etype, evalue, etrace);
goto iterfail;
} else {
Py_DECREF(send_message_result);
Py_DECREF(result);
}
}
}
@ -779,33 +803,21 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
message_length = buffer_get_position(buffer) - length_location;
memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);
if (safe) {
if (!add_last_error(self, buffer, request_id, collection_name,
collection_name_length, last_error_args)) {
goto insertfail;
}
}
/* Send the last (or only) batch */
result = _send_insert(self, client, last_error_args, buffer,
collection_name, collection_name_length,
request_id, safe);
PyMem_Free(collection_name);
/* objectify buffer */
result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
buffer_get_buffer(buffer),
buffer_get_position(buffer));
buffer_free(buffer);
/* Send the last (or only) batch */
send_message_result = PyObject_CallMethod(client, "_send_message", "NN",
result,
PyBool_FromLong((long)safe));
if (!send_message_result) {
if (!result) {
Py_XDECREF(exc_type);
Py_XDECREF(exc_value);
Py_XDECREF(exc_trace);
return NULL;
} else {
Py_DECREF(send_message_result);
Py_DECREF(result);
}
if (exc_type) {
@ -1050,15 +1062,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
if (op == _INSERT) {
_set_document_too_large(cur_size, max_bson_size);
} else {
PyObject* InvalidDocument = _error("InvalidDocument");
if (InvalidDocument) {
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
if (DocumentTooLarge) {
/*
* There's nothing intelligent we can say
* about size for update and remove.
*/
PyErr_SetString(InvalidDocument,
PyErr_SetString(DocumentTooLarge,
"command document too large");
Py_DECREF(InvalidDocument);
Py_DECREF(DocumentTooLarge);
}
}
goto cmditerfail;

View File

@ -17,6 +17,7 @@
from bson.objectid import ObjectId
from bson.son import SON
from pymongo.errors import (BulkWriteError,
DocumentTooLarge,
InvalidOperation,
OperationFailure)
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
@ -26,6 +27,8 @@ from pymongo.message import (_INSERT, _UPDATE, _DELETE,
_DELETE_ALL = 0
_DELETE_ONE = 1
# For backwards compatibility. See MongoDB src/mongo/base/error_codes.err
_BAD_VALUE = 2
_UNKNOWN_ERROR = 8
_WRITE_CONCERN_ERROR = 64
@ -84,7 +87,7 @@ def _merge_legacy(run, full_result, result, index):
# will fail.
note = result.get("jnote", result.get("wnote"))
if note:
raise OperationFailure(note, 2, result)
raise OperationFailure(note, _BAD_VALUE, result)
affected = result.get('n', 0)
@ -368,6 +371,14 @@ class _Bulk(object):
multi=(not operation['limit']),
**write_concern)
_merge_legacy(run, full_result, result, idx)
except DocumentTooLarge, exc:
# MongoDB 2.6 uses error code 2 for "too large".
error = _make_error(
run.index(idx), _BAD_VALUE, str(exc), operation)
full_result['writeErrors'].append(error)
if self.ordered:
stop = True
break
except OperationFailure, exc:
if not exc.details:
# Some error not related to the write operation

View File

@ -172,3 +172,8 @@ class ExceededMaxWaiters(Exception):
"""
pass
class DocumentTooLarge(InvalidDocument):
"""Raised when an encoded document is too large for the connected server.
"""
pass

View File

@ -34,7 +34,7 @@ try:
_use_c = True
except ImportError:
_use_c = False
from pymongo.errors import InvalidDocument, InvalidOperation, OperationFailure
from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure
MAX_INT32 = 2147483647
@ -217,6 +217,7 @@ def _do_batched_insert(collection_name, docs, check_keys,
final_message += error_message
return request_id, final_message
send_safe = safe or not continue_on_error
last_error = None
begin = struct.pack("<i", int(continue_on_error))
begin += bson._make_c_string(collection_name)
@ -224,38 +225,42 @@ def _do_batched_insert(collection_name, docs, check_keys,
data = [begin]
has_docs = False
for doc in docs:
has_docs = True
encoded = bson.BSON.encode(doc, check_keys, uuid_subtype)
encoded_length = len(encoded)
if encoded_length > client.max_bson_size:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(encoded_length, client.max_bson_size))
too_large = (encoded_length > client.max_bson_size)
message_length += encoded_length
if message_length < client.max_message_size:
if message_length < client.max_message_size and not too_large:
data.append(encoded)
has_docs = True
continue
# We have enough data, send this message.
send_safe = safe or not continue_on_error
try:
client._send_message(_insert_message(_EMPTY.join(data),
send_safe), send_safe)
# Exception type could be OperationFailure or a subtype
# (e.g. DuplicateKeyError)
except OperationFailure, exc:
# Like it says, continue on error...
if continue_on_error:
# Store exception details to re-raise after the final batch.
last_error = exc
# With unacknowledged writes just return at the first error.
elif not safe:
return
# With acknowledged writes raise immediately.
else:
raise
if has_docs:
# We have enough data, send this message.
try:
client._send_message(_insert_message(_EMPTY.join(data),
send_safe), send_safe)
# Exception type could be OperationFailure or a subtype
# (e.g. DuplicateKeyError)
except OperationFailure, exc:
# Like it says, continue on error...
if continue_on_error:
# Store exception details to re-raise after the final batch.
last_error = exc
# With unacknowledged writes just return at the first error.
elif not safe:
return
# With acknowledged writes raise immediately.
else:
raise
if too_large:
raise DocumentTooLarge("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(encoded_length, client.max_bson_size))
message_length = len(begin) + encoded_length
data = [begin, encoded]
@ -352,14 +357,14 @@ def _do_batched_write_command(namespace, operation, command,
if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size:
if not idx:
if operation == _INSERT:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." % (len(value),
max_bson_size))
raise DocumentTooLarge("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." % (len(value),
max_bson_size))
# There's nothing intelligent we can say
# about size for update and remove
raise InvalidDocument("command document too large")
raise DocumentTooLarge("command document too large")
result = send_message()
results.append((idx_offset, result))
if ordered and "writeErrors" in result:

View File

@ -56,8 +56,8 @@ from pymongo.cursor_manager import CursorManager
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
DocumentTooLarge,
DuplicateKeyError,
InvalidDocument,
InvalidURI,
OperationFailure)
from pymongo.member import Member
@ -1049,11 +1049,11 @@ class MongoClient(common.BaseObject):
if len(message) == 3:
(request_id, data, max_doc_size) = message
if max_doc_size > self.max_bson_size:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, self.max_bson_size))
raise DocumentTooLarge("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, self.max_bson_size))
return (request_id, data)
else:
# get_more and kill_cursors messages

View File

@ -55,8 +55,8 @@ from pymongo.read_preferences import (
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
DocumentTooLarge,
DuplicateKeyError,
InvalidDocument,
OperationFailure,
InvalidOperation)
from pymongo.thread_util import DummyLock
@ -1433,11 +1433,11 @@ class MongoReplicaSetClient(common.BaseObject):
if len(msg) == 3:
request_id, data, max_doc_size = msg
if max_doc_size > max_size:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, max_size))
raise DocumentTooLarge("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, max_size))
return (request_id, data)
# get_more and kill_cursors messages
# don't include BSON documents.

View File

@ -17,15 +17,12 @@
import sys
import unittest
from nose.plugins.skip import SkipTest
sys.path[0:0] = [""]
from pymongo.errors import (BulkWriteError,
InvalidOperation, OperationFailure)
from test.test_client import get_client
from test.utils import server_started_with_option
from test import version
class TestBulk(unittest.TestCase):
@ -359,13 +356,11 @@ class TestBulk(unittest.TestCase):
self.coll.drop_index([('a', 1)])
def test_large_inserts_ordered(self):
client = self.coll.database.connection
if not version.at_least(client, (2, 5, 4)):
raise SkipTest('Legacy server...')
big = 'x' * self.coll.database.connection.max_bson_size
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({'b': 1, 'a': 1})
batch.insert({'big': big})
batch.insert({'b': 2, 'a': 2})
try:
batch.execute()
@ -375,6 +370,40 @@ class TestBulk(unittest.TestCase):
else:
self.fail("Error not raised")
self.assertEqual(1, result['nInserted'])
self.coll.remove()
big = 'x' * (1024 * 1024 * 4)
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({'a': 1, 'big': big})
batch.insert({'a': 2, 'big': big})
batch.insert({'a': 3, 'big': big})
batch.insert({'a': 4, 'big': big})
batch.insert({'a': 5, 'big': big})
batch.insert({'a': 6, 'big': big})
result = batch.execute()
self.assertEqual(6, result['nInserted'])
self.assertEqual(6, self.coll.count())
def test_large_inserts_unordered(self):
big = 'x' * self.coll.database.connection.max_bson_size
batch = self.coll.initialize_unordered_bulk_op()
batch.insert({'b': 1, 'a': 1})
batch.insert({'big': big})
batch.insert({'b': 2, 'a': 2})
try:
batch.execute()
except BulkWriteError, exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(2, result['nInserted'])
self.coll.remove()
big = 'x' * (1024 * 1024 * 4)

View File

@ -43,7 +43,8 @@ from pymongo import message as message_module
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.son_manipulator import SONManipulator
from pymongo.errors import (DuplicateKeyError,
from pymongo.errors import (DocumentTooLarge,
DuplicateKeyError,
InvalidDocument,
InvalidName,
InvalidOperation,
@ -1752,7 +1753,7 @@ class TestCollection(unittest.TestCase):
if version.at_least(self.db.connection, (1, 7, 4)):
self.assertEqual(max_size, 16777216)
expected = InvalidDocument
expected = DocumentTooLarge
if version.at_least(self.client, (2, 5, 4, -1)):
# Document too large handled by the server
expected = OperationFailure
@ -1767,7 +1768,7 @@ class TestCollection(unittest.TestCase):
self.db.test.insert({"bar": "x"})
# Use w=0 here to test legacy doc size checking in all server versions
self.assertRaises(InvalidDocument, self.db.test.update,
self.assertRaises(DocumentTooLarge, self.db.test.update,
{"bar": "x"}, {"bar": "x" * (max_size - 14)}, w=0)
# This will pass with OP_UPDATE or the update command.
self.db.test.update({"bar": "x"}, {"bar": "x" * (max_size - 15)})