diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index 80a88ee5e..4aa2681f4 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -542,27 +542,48 @@ static PyObject* _cbson_get_more_message(PyObject* self, PyObject* args) { static void _set_document_too_large(int size, long max) { - PyObject* InvalidDocument = _error("InvalidDocument"); - if (InvalidDocument) { + PyObject* DocumentTooLarge = _error("DocumentTooLarge"); + if (DocumentTooLarge) { #if PY_MAJOR_VERSION >= 3 PyObject* error = PyUnicode_FromFormat(DOC_TOO_LARGE_FMT, size, max); #else PyObject* error = PyString_FromFormat(DOC_TOO_LARGE_FMT, size, max); #endif if (error) { - PyErr_SetObject(InvalidDocument, error); + PyErr_SetObject(DocumentTooLarge, error); Py_DECREF(error); } - Py_DECREF(InvalidDocument); + Py_DECREF(DocumentTooLarge); } } +static PyObject* +_send_insert(PyObject* self, PyObject* client, + PyObject* gle_args, buffer_t buffer, + char* coll_name, int coll_len, int request_id, int safe) { + + PyObject* result; + if (safe) { + if (!add_last_error(self, buffer, request_id, + coll_name, coll_len, gle_args)) { + return NULL; + } + } + + result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, + buffer_get_buffer(buffer), + buffer_get_position(buffer)); + + return PyObject_CallMethod(client, "_send_message", "NN", + result, PyBool_FromLong((long)safe)); +} + static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { struct module_state *state = GETSTATE(self); /* NOTE just using a random number as the request_id */ int request_id = rand(); - int options = 0; + int send_safe, options = 0; int length_location, message_length; int collection_name_length; char* collection_name = NULL; @@ -574,7 +595,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyObject* result; PyObject* max_bson_size_obj; PyObject* max_message_size_obj; - PyObject* send_message_result; unsigned char check_keys; unsigned char safe; unsigned char continue_on_error; @@ -598,6 +618,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { if (continue_on_error) { options += 1; } + /* + * If we are doing unacknowledged writes *and* continue_on_error + * is True it's pointless (and slower) to send GLE. + */ + send_safe = (safe || !continue_on_error); max_bson_size_obj = PyObject_GetAttrString(client, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 @@ -651,7 +676,6 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { while ((doc = PyIter_Next(iterator)) != NULL) { int before = buffer_get_position(buffer); int cur_size; - empty = 0; if (!write_dict(state->_cbson, buffer, doc, check_keys, uuid_subtype, 1)) { Py_DECREF(doc); goto iterfail; @@ -660,15 +684,28 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { cur_size = buffer_get_position(buffer) - before; if (cur_size > max_bson_size) { + /* If we've encoded anything send it before raising. */ + if (!empty) { + buffer_update_position(buffer, before); + message_length = buffer_get_position(buffer) - length_location; + memcpy(buffer_get_buffer(buffer) + length_location, + &message_length, 4); + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, send_safe); + if (!result) + goto iterfail; + Py_DECREF(result); + } _set_document_too_large(cur_size, max_bson_size); goto iterfail; } + empty = 0; /* We have enough data, send this batch. */ if (buffer_get_position(buffer) > max_message_size) { int new_request_id = rand(); int message_start; - PyObject* send_gle = Py_False; buffer_t new_buffer = buffer_new(); if (!new_buffer) { PyErr_NoMemory(); @@ -696,29 +733,16 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - /* If we are doing unacknowledged writes *and* continue_on_error - * is True it's pointless (and slower) to send GLE. */ - if (safe || !continue_on_error) { - send_gle = Py_True; - if (!add_last_error(self, buffer, request_id, collection_name, - collection_name_length, last_error_args)) { - buffer_free(new_buffer); - goto iterfail; - } - } - /* Objectify buffer */ - result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, - buffer_get_buffer(buffer), - buffer_get_position(buffer)); + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, send_safe); + buffer_free(buffer); buffer = new_buffer; request_id = new_request_id; length_location = message_start; - send_message_result = PyObject_CallMethod(client, "_send_message", - "NO", result, send_gle); - - if (!send_message_result) { + if (!result) { PyObject *etype = NULL, *evalue = NULL, *etrace = NULL; PyObject* OperationFailure; PyErr_Fetch(&etype, &evalue, &etrace); @@ -757,7 +781,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyErr_Restore(etype, evalue, etrace); goto iterfail; } else { - Py_DECREF(send_message_result); + Py_DECREF(result); } } } @@ -779,33 +803,21 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - if (safe) { - if (!add_last_error(self, buffer, request_id, collection_name, - collection_name_length, last_error_args)) { - goto insertfail; - } - } + /* Send the last (or only) batch */ + result = _send_insert(self, client, last_error_args, buffer, + collection_name, collection_name_length, + request_id, safe); PyMem_Free(collection_name); - - /* objectify buffer */ - result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id, - buffer_get_buffer(buffer), - buffer_get_position(buffer)); buffer_free(buffer); - /* Send the last (or only) batch */ - send_message_result = PyObject_CallMethod(client, "_send_message", "NN", - result, - PyBool_FromLong((long)safe)); - - if (!send_message_result) { + if (!result) { Py_XDECREF(exc_type); Py_XDECREF(exc_value); Py_XDECREF(exc_trace); return NULL; } else { - Py_DECREF(send_message_result); + Py_DECREF(result); } if (exc_type) { @@ -1050,15 +1062,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (op == _INSERT) { _set_document_too_large(cur_size, max_bson_size); } else { - PyObject* InvalidDocument = _error("InvalidDocument"); - if (InvalidDocument) { + PyObject* DocumentTooLarge = _error("DocumentTooLarge"); + if (DocumentTooLarge) { /* * There's nothing intelligent we can say * about size for update and remove. */ - PyErr_SetString(InvalidDocument, + PyErr_SetString(DocumentTooLarge, "command document too large"); - Py_DECREF(InvalidDocument); + Py_DECREF(DocumentTooLarge); } } goto cmditerfail; diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 1a7e5ef2e..d828d567d 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -17,6 +17,7 @@ from bson.objectid import ObjectId from bson.son import SON from pymongo.errors import (BulkWriteError, + DocumentTooLarge, InvalidOperation, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, @@ -26,6 +27,8 @@ from pymongo.message import (_INSERT, _UPDATE, _DELETE, _DELETE_ALL = 0 _DELETE_ONE = 1 +# For backwards compatibility. See MongoDB src/mongo/base/error_codes.err +_BAD_VALUE = 2 _UNKNOWN_ERROR = 8 _WRITE_CONCERN_ERROR = 64 @@ -84,7 +87,7 @@ def _merge_legacy(run, full_result, result, index): # will fail. note = result.get("jnote", result.get("wnote")) if note: - raise OperationFailure(note, 2, result) + raise OperationFailure(note, _BAD_VALUE, result) affected = result.get('n', 0) @@ -368,6 +371,14 @@ class _Bulk(object): multi=(not operation['limit']), **write_concern) _merge_legacy(run, full_result, result, idx) + except DocumentTooLarge, exc: + # MongoDB 2.6 uses error code 2 for "too large". + error = _make_error( + run.index(idx), _BAD_VALUE, str(exc), operation) + full_result['writeErrors'].append(error) + if self.ordered: + stop = True + break except OperationFailure, exc: if not exc.details: # Some error not related to the write operation diff --git a/pymongo/errors.py b/pymongo/errors.py index b76878750..0825e7bc1 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -172,3 +172,8 @@ class ExceededMaxWaiters(Exception): """ pass + +class DocumentTooLarge(InvalidDocument): + """Raised when an encoded document is too large for the connected server. + """ + pass diff --git a/pymongo/message.py b/pymongo/message.py index 662e8e168..34f41be1e 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -34,7 +34,7 @@ try: _use_c = True except ImportError: _use_c = False -from pymongo.errors import InvalidDocument, InvalidOperation, OperationFailure +from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure MAX_INT32 = 2147483647 @@ -217,6 +217,7 @@ def _do_batched_insert(collection_name, docs, check_keys, final_message += error_message return request_id, final_message + send_safe = safe or not continue_on_error last_error = None begin = struct.pack(" client.max_bson_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (encoded_length, client.max_bson_size)) + too_large = (encoded_length > client.max_bson_size) + message_length += encoded_length - if message_length < client.max_message_size: + if message_length < client.max_message_size and not too_large: data.append(encoded) + has_docs = True continue - # We have enough data, send this message. - send_safe = safe or not continue_on_error - try: - client._send_message(_insert_message(_EMPTY.join(data), - send_safe), send_safe) - # Exception type could be OperationFailure or a subtype - # (e.g. DuplicateKeyError) - except OperationFailure, exc: - # Like it says, continue on error... - if continue_on_error: - # Store exception details to re-raise after the final batch. - last_error = exc - # With unacknowledged writes just return at the first error. - elif not safe: - return - # With acknowledged writes raise immediately. - else: - raise + if has_docs: + # We have enough data, send this message. + try: + client._send_message(_insert_message(_EMPTY.join(data), + send_safe), send_safe) + # Exception type could be OperationFailure or a subtype + # (e.g. DuplicateKeyError) + except OperationFailure, exc: + # Like it says, continue on error... + if continue_on_error: + # Store exception details to re-raise after the final batch. + last_error = exc + # With unacknowledged writes just return at the first error. + elif not safe: + return + # With acknowledged writes raise immediately. + else: + raise + + if too_large: + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (encoded_length, client.max_bson_size)) + message_length = len(begin) + encoded_length data = [begin, encoded] @@ -352,14 +357,14 @@ def _do_batched_write_command(namespace, operation, command, if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size: if not idx: if operation == _INSERT: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % (len(value), - max_bson_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % (len(value), + max_bson_size)) # There's nothing intelligent we can say # about size for update and remove - raise InvalidDocument("command document too large") + raise DocumentTooLarge("command document too large") result = send_message() results.append((idx_offset, result)) if ordered and "writeErrors" in result: diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index dc887620f..b0e0f07c6 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -56,8 +56,8 @@ from pymongo.cursor_manager import CursorManager from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, + DocumentTooLarge, DuplicateKeyError, - InvalidDocument, InvalidURI, OperationFailure) from pymongo.member import Member @@ -1049,11 +1049,11 @@ class MongoClient(common.BaseObject): if len(message) == 3: (request_id, data, max_doc_size) = message if max_doc_size > self.max_bson_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (max_doc_size, self.max_bson_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (max_doc_size, self.max_bson_size)) return (request_id, data) else: # get_more and kill_cursors messages diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 6fde0b2fc..6d04f1a59 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -55,8 +55,8 @@ from pymongo.read_preferences import ( from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, + DocumentTooLarge, DuplicateKeyError, - InvalidDocument, OperationFailure, InvalidOperation) from pymongo.thread_util import DummyLock @@ -1433,11 +1433,11 @@ class MongoReplicaSetClient(common.BaseObject): if len(msg) == 3: request_id, data, max_doc_size = msg if max_doc_size > max_size: - raise InvalidDocument("BSON document too large (%d bytes)" - " - the connected server supports" - " BSON document sizes up to %d" - " bytes." % - (max_doc_size, max_size)) + raise DocumentTooLarge("BSON document too large (%d bytes)" + " - the connected server supports" + " BSON document sizes up to %d" + " bytes." % + (max_doc_size, max_size)) return (request_id, data) # get_more and kill_cursors messages # don't include BSON documents. diff --git a/test/test_bulk.py b/test/test_bulk.py index a68800b7c..58f36fccd 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -17,15 +17,12 @@ import sys import unittest -from nose.plugins.skip import SkipTest - sys.path[0:0] = [""] from pymongo.errors import (BulkWriteError, InvalidOperation, OperationFailure) from test.test_client import get_client from test.utils import server_started_with_option -from test import version class TestBulk(unittest.TestCase): @@ -359,13 +356,11 @@ class TestBulk(unittest.TestCase): self.coll.drop_index([('a', 1)]) def test_large_inserts_ordered(self): - client = self.coll.database.connection - if not version.at_least(client, (2, 5, 4)): - raise SkipTest('Legacy server...') big = 'x' * self.coll.database.connection.max_bson_size batch = self.coll.initialize_ordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) try: batch.execute() @@ -375,6 +370,40 @@ class TestBulk(unittest.TestCase): else: self.fail("Error not raised") + self.assertEqual(1, result['nInserted']) + + self.coll.remove() + + big = 'x' * (1024 * 1024 * 4) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1, 'big': big}) + batch.insert({'a': 2, 'big': big}) + batch.insert({'a': 3, 'big': big}) + batch.insert({'a': 4, 'big': big}) + batch.insert({'a': 5, 'big': big}) + batch.insert({'a': 6, 'big': big}) + result = batch.execute() + + self.assertEqual(6, result['nInserted']) + self.assertEqual(6, self.coll.count()) + + def test_large_inserts_unordered(self): + big = 'x' * self.coll.database.connection.max_bson_size + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) + + try: + batch.execute() + except BulkWriteError, exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.coll.remove() big = 'x' * (1024 * 1024 * 4) diff --git a/test/test_collection.py b/test/test_collection.py index 1cc12c4b1..44e41217f 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -43,7 +43,8 @@ from pymongo import message as message_module from pymongo.collection import Collection from pymongo.cursor import Cursor from pymongo.son_manipulator import SONManipulator -from pymongo.errors import (DuplicateKeyError, +from pymongo.errors import (DocumentTooLarge, + DuplicateKeyError, InvalidDocument, InvalidName, InvalidOperation, @@ -1752,7 +1753,7 @@ class TestCollection(unittest.TestCase): if version.at_least(self.db.connection, (1, 7, 4)): self.assertEqual(max_size, 16777216) - expected = InvalidDocument + expected = DocumentTooLarge if version.at_least(self.client, (2, 5, 4, -1)): # Document too large handled by the server expected = OperationFailure @@ -1767,7 +1768,7 @@ class TestCollection(unittest.TestCase): self.db.test.insert({"bar": "x"}) # Use w=0 here to test legacy doc size checking in all server versions - self.assertRaises(InvalidDocument, self.db.test.update, + self.assertRaises(DocumentTooLarge, self.db.test.update, {"bar": "x"}, {"bar": "x" * (max_size - 14)}, w=0) # This will pass with OP_UPDATE or the update command. self.db.test.update({"bar": "x"}, {"bar": "x" * (max_size - 15)})