From ee9ecd05ee5f8d83bb598c80269adf8632562663 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Wed, 29 Jan 2014 12:55:49 -0800 Subject: [PATCH] Introduce DocumentTooLarge exception PYTHON-630 The idea here is to unify the handling of oversize documents when using the bulk API in MongoDB 2.6 and previous versions. This also means that using bulk Collection.insert against legacy servers will attempt to insert all documents previous to the oversize document before raising. --- pymongo/_cmessagemodule.c | 110 +++++++++++++++------------- pymongo/bulk.py | 13 +++- pymongo/errors.py | 5 ++ pymongo/message.py | 71 +++++++++--------- pymongo/mongo_client.py | 12 +-- pymongo/mongo_replica_set_client.py | 12 +-- test/test_bulk.py | 41 +++++++++-- test/test_collection.py | 7 +- 8 files changed, 167 insertions(+), 104 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index 80a88ee5e..4aa2681f4 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -542,27 +542,48 @@ static PyObject* _cbson_get_more_message(PyObject* self, PyObject* args) { static void _set_document_too_large(int size, long max) { - PyObject* InvalidDocument = _error("InvalidDocument"); - if (InvalidDocument) { + PyObject* DocumentTooLarge = _error("DocumentTooLarge"); + if (DocumentTooLarge) { #if PY_MAJOR_VERSION >= 3 PyObject* error = PyUnicode_FromFormat(DOC_TOO_LARGE_FMT, size, max); #else PyObject* error = PyString_FromFormat(DOC_TOO_LARGE_FMT, size, max); #endif if (error) { - PyErr_SetObject(InvalidDocument, error); + PyErr_SetObject(DocumentTooLarge, error); Py_DECREF(error); } - Py_DECREF(InvalidDocument); + Py_DECREF(DocumentTooLarge); } } +static PyObject* +_send_insert(PyObject* self, PyObject* client, + PyObject* gle_args, buffer_t buffer, + char* coll_name, int coll_len, int request_id, int safe) { + + PyObject* result; + if (safe) { + if (!add_last_error(self, buffer, request_id, + coll_name, coll_len, gle_args)) { + return NULL; + } + } + + result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, + buffer_get_buffer(buffer), + buffer_get_position(buffer)); + + return PyObject_CallMethod(client, "_send_message", "NN", + result, PyBool_FromLong((long)safe)); +} + static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { struct module_state *state = GETSTATE(self); /* NOTE just using a random number as the request_id */ int request_id = rand(); - int options = 0; + int send_safe, options = 0; int length_location, message_length; int collection_name_length; char* collection_name = NULL; @@ -574,7 +595,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyObject* result; PyObject* max_bson_size_obj; PyObject* max_message_size_obj; - PyObject* send_message_result; unsigned char check_keys; unsigned char safe; unsigned char continue_on_error; @@ -598,6 +618,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { if (continue_on_error) { options += 1; } + /* + * If we are doing unacknowledged writes *and* continue_on_error + * is True it's pointless (and slower) to send GLE. + */ + send_safe = (safe || !continue_on_error); max_bson_size_obj = PyObject_GetAttrString(client, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 @@ -651,7 +676,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { while ((doc = PyIter_Next(iterator)) != NULL) { int before = buffer_get_position(buffer); int cur_size; - empty = 0; if (!write_dict(state->_cbson, buffer, doc, check_keys, uuid_subtype, 1)) { Py_DECREF(doc); goto iterfail; @@ -660,15 +684,28 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { cur_size = buffer_get_position(buffer) - before; if (cur_size > max_bson_size) { + /* If we've encoded anything send it before raising. */ + if (!empty) { + buffer_update_position(buffer, before); + message_length = buffer_get_position(buffer) - length_location; + memcpy(buffer_get_buffer(buffer) + length_location, + &message_length, 4); + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, send_safe); + if (!result) + goto iterfail; + Py_DECREF(result); + } _set_document_too_large(cur_size, max_bson_size); goto iterfail; } + empty = 0; /* We have enough data, send this batch. */ if (buffer_get_position(buffer) > max_message_size) { int new_request_id = rand(); int message_start; - PyObject* send_gle = Py_False; buffer_t new_buffer = buffer_new(); if (!new_buffer) { PyErr_NoMemory(); @@ -696,29 +733,16 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - /* If we are doing unacknowledged writes *and* continue_on_error - * is True it's pointless (and slower) to send GLE. */ - if (safe || !continue_on_error) { - send_gle = Py_True; - if (!add_last_error(self, buffer, request_id, collection_name, - collection_name_length, last_error_args)) { - buffer_free(new_buffer); - goto iterfail; - } - } - /* Objectify buffer */ - result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, - buffer_get_buffer(buffer), - buffer_get_position(buffer)); + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, send_safe); + buffer_free(buffer); buffer = new_buffer; request_id = new_request_id; length_location = message_start; - send_message_result = PyObject_CallMethod(client, "_send_message", - "NO", result, send_gle); - - if (!send_message_result) { + if (!result) { PyObject *etype = NULL, *evalue = NULL, *etrace = NULL; PyObject* OperationFailure; PyErr_Fetch(&etype, &evalue, &etrace); @@ -757,7 +781,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyErr_Restore(etype, evalue, etrace); goto iterfail; } else { - Py_DECREF(send_message_result); + Py_DECREF(result); } } } @@ -779,33 +803,21 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - if (safe) { - if (!add_last_error(self, buffer, request_id, collection_name, - collection_name_length, last_error_args)) { - goto insertfail; - } - } + /* Send the last (or only) batch */ + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, safe); PyMem_Free(collection_name); - - /* objectify buffer */ - result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, - buffer_get_buffer(buffer), - buffer_get_position(buffer)); buffer_free(buffer); - /* Send the last (or only) batch */ - send_message_result = PyObject_CallMethod(client, "_send_message", "NN", - result, - PyBool_FromLong((long)safe)); - - if (!send_message_result) { + if (!result) { Py_XDECREF(exc_type); Py_XDECREF(exc_value); Py_XDECREF(exc_trace); return NULL; } else { - Py_DECREF(send_message_result); + Py_DECREF(result); } if (exc_type) { @@ -1050,15 +1062,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (op == _INSERT) { _set_document_too_large(cur_size, max_bson_size); } else { - PyObject* InvalidDocument = _error("InvalidDocument"); - if (InvalidDocument) { + PyObject* DocumentTooLarge = _error("DocumentTooLarge"); + if (DocumentTooLarge) { /* * There's nothing intelligent we can say * about size for update and remove. */ - PyErr_SetString(InvalidDocument, + PyErr_SetString(DocumentTooLarge, "command document too large"); - Py_DECREF(InvalidDocument); + Py_DECREF(DocumentTooLarge); } } goto cmditerfail; diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 1a7e5ef2e..d828d567d 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -17,6 +17,7 @@ from bson.objectid import ObjectId from bson.son import SON from pymongo.errors import (BulkWriteError, + DocumentTooLarge, InvalidOperation, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, @@ -26,6 +27,8 @@ from pymongo.message import (_INSERT, _UPDATE, _DELETE, _DELETE_ALL = 0 _DELETE_ONE = 1 +# For backwards compatibility. See MongoDB src/mongo/base/error_codes.err +_BAD_VALUE = 2 _UNKNOWN_ERROR = 8 _WRITE_CONCERN_ERROR = 64 @@ -84,7 +87,7 @@ def _merge_legacy(run, full_result, result, index): # will fail. note = result.get("jnote", result.get("wnote")) if note: - raise OperationFailure(note, 2, result) + raise OperationFailure(note, _BAD_VALUE, result) affected = result.get('n', 0) @@ -368,6 +371,14 @@ class _Bulk(object): multi=(not operation['limit']), **write_concern) _merge_legacy(run, full_result, result, idx) + except DocumentTooLarge, exc: + # MongoDB 2.6 uses error code 2 for "too large". + error = _make_error( + run.index(idx), _BAD_VALUE, str(exc), operation) + full_result['writeErrors'].append(error) + if self.ordered: + stop = True + break except OperationFailure, exc: if not exc.details: # Some error not related to the write operation diff --git a/pymongo/errors.py b/pymongo/errors.py index b76878750..0825e7bc1 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -172,3 +172,8 @@ class ExceededMaxWaiters(Exception): """ pass + +class DocumentTooLarge(InvalidDocument): + """Raised when an encoded document is too large for the connected server. + """ + pass diff --git a/pymongo/message.py b/pymongo/message.py index 662e8e168..34f41be1e 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -34,7 +34,7 @@ try: _use_c = True except ImportError: _use_c = False -from pymongo.errors import InvalidDocument, InvalidOperation, OperationFailure +from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure MAX_INT32 = 2147483647 @@ -217,6 +217,7 @@ def _do_batched_insert(collection_name, docs, check_keys, final_message += error_message return request_id, final_message + send_safe = safe or not continue_on_error last_error = None begin = struct.pack(" client.max_bson_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (encoded_length, client.max_bson_size)) + too_large = (encoded_length > client.max_bson_size) + message_length += encoded_length - if message_length < client.max_message_size: + if message_length < client.max_message_size and not too_large: data.append(encoded) + has_docs = True continue - # We have enough data, send this message. - send_safe = safe or not continue_on_error - try: - client._send_message(_insert_message(_EMPTY.join(data), - send_safe), send_safe) - # Exception type could be OperationFailure or a subtype - # (e.g. DuplicateKeyError) - except OperationFailure, exc: - # Like it says, continue on error... - if continue_on_error: - # Store exception details to re-raise after the final batch. - last_error = exc - # With unacknowledged writes just return at the first error. - elif not safe: - return - # With acknowledged writes raise immediately. - else: - raise + if has_docs: + # We have enough data, send this message. + try: + client._send_message(_insert_message(_EMPTY.join(data), + send_safe), send_safe) + # Exception type could be OperationFailure or a subtype + # (e.g. DuplicateKeyError) + except OperationFailure, exc: + # Like it says, continue on error... + if continue_on_error: + # Store exception details to re-raise after the final batch. + last_error = exc + # With unacknowledged writes just return at the first error. + elif not safe: + return + # With acknowledged writes raise immediately. + else: + raise + + if too_large: + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (encoded_length, client.max_bson_size)) + message_length = len(begin) + encoded_length data = [begin, encoded] @@ -352,14 +357,14 @@ def _do_batched_write_command(namespace, operation, command, if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size: if not idx: if operation == _INSERT: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % (len(value), - max_bson_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % (len(value), + max_bson_size)) # There's nothing intelligent we can say # about size for update and remove - raise InvalidDocument("command document too large") + raise DocumentTooLarge("command document too large") result = send_message() results.append((idx_offset, result)) if ordered and "writeErrors" in result: diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index dc887620f..b0e0f07c6 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -56,8 +56,8 @@ from pymongo.cursor_manager import CursorManager from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, + DocumentTooLarge, DuplicateKeyError, - InvalidDocument, InvalidURI, OperationFailure) from pymongo.member import Member @@ -1049,11 +1049,11 @@ class MongoClient(common.BaseObject): if len(message) == 3: (request_id, data, max_doc_size) = message if max_doc_size > self.max_bson_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (max_doc_size, self.max_bson_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (max_doc_size, self.max_bson_size)) return (request_id, data) else: # get_more and kill_cursors messages diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 6fde0b2fc..6d04f1a59 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -55,8 +55,8 @@ from pymongo.read_preferences import ( from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, + DocumentTooLarge, DuplicateKeyError, - InvalidDocument, OperationFailure, InvalidOperation) from pymongo.thread_util import DummyLock @@ -1433,11 +1433,11 @@ class MongoReplicaSetClient(common.BaseObject): if len(msg) == 3: request_id, data, max_doc_size = msg if max_doc_size > max_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (max_doc_size, max_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (max_doc_size, max_size)) return (request_id, data) # get_more and kill_cursors messages # don't include BSON documents. diff --git a/test/test_bulk.py b/test/test_bulk.py index a68800b7c..58f36fccd 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -17,15 +17,12 @@ import sys import unittest -from nose.plugins.skip import SkipTest - sys.path[0:0] = [""] from pymongo.errors import (BulkWriteError, InvalidOperation, OperationFailure) from test.test_client import get_client from test.utils import server_started_with_option -from test import version class TestBulk(unittest.TestCase): @@ -359,13 +356,11 @@ class TestBulk(unittest.TestCase): self.coll.drop_index([('a', 1)]) def test_large_inserts_ordered(self): - client = self.coll.database.connection - if not version.at_least(client, (2, 5, 4)): - raise SkipTest('Legacy server...') big = 'x' * self.coll.database.connection.max_bson_size batch = self.coll.initialize_ordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) try: batch.execute() @@ -375,6 +370,40 @@ class TestBulk(unittest.TestCase): else: self.fail("Error not raised") + self.assertEqual(1, result['nInserted']) + + self.coll.remove() + + big = 'x' * (1024 * 1024 * 4) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1, 'big': big}) + batch.insert({'a': 2, 'big': big}) + batch.insert({'a': 3, 'big': big}) + batch.insert({'a': 4, 'big': big}) + batch.insert({'a': 5, 'big': big}) + batch.insert({'a': 6, 'big': big}) + result = batch.execute() + + self.assertEqual(6, result['nInserted']) + self.assertEqual(6, self.coll.count()) + + def test_large_inserts_unordered(self): + big = 'x' * self.coll.database.connection.max_bson_size + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) + + try: + batch.execute() + except BulkWriteError, exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.coll.remove() big = 'x' * (1024 * 1024 * 4) diff --git a/test/test_collection.py b/test/test_collection.py index 1cc12c4b1..44e41217f 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -43,7 +43,8 @@ from pymongo import message as message_module from pymongo.collection import Collection from pymongo.cursor import Cursor from pymongo.son_manipulator import SONManipulator -from pymongo.errors import (DuplicateKeyError, +from pymongo.errors import (DocumentTooLarge, + DuplicateKeyError, InvalidDocument, InvalidName, InvalidOperation, @@ -1752,7 +1753,7 @@ class TestCollection(unittest.TestCase): if version.at_least(self.db.connection, (1, 7, 4)): self.assertEqual(max_size, 16777216) - expected = InvalidDocument + expected = DocumentTooLarge if version.at_least(self.client, (2, 5, 4, -1)): # Document too large handled by the server expected = OperationFailure @@ -1767,7 +1768,7 @@ class TestCollection(unittest.TestCase): self.db.test.insert({"bar": "x"}) # Use w=0 here to test legacy doc size checking in all server versions - self.assertRaises(InvalidDocument, self.db.test.update, + self.assertRaises(DocumentTooLarge, self.db.test.update, {"bar": "x"}, {"bar": "x" * (max_size - 14)}, w=0) # This will pass with OP_UPDATE or the update command. self.db.test.update({"bar": "x"}, {"bar": "x" * (max_size - 15)})