diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index b8faa4973..2dc8262b7 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -580,10 +580,10 @@ _set_document_too_large(int size, long max) { } static PyObject* -_send_insert(PyObject* self, PyObject* sock_info, +_send_insert(PyObject* self, PyObject* ctx, PyObject* gle_args, buffer_t buffer, char* coll_name, int coll_len, int request_id, int safe, - codec_options_t* options) { + codec_options_t* options, PyObject* to_publish) { if (safe) { if (!add_last_error(self, buffer, request_id, @@ -594,13 +594,14 @@ _send_insert(PyObject* self, PyObject* sock_info, /* The max_doc_size parameter for legacy_write is the max size of any * document in buffer. We enforced max size already, pass 0 here. */ - return PyObject_CallMethod(sock_info, "legacy_write", - "i" BYTES_FORMAT_STRING "iN", + return PyObject_CallMethod(ctx, "legacy_write", + "i" BYTES_FORMAT_STRING "iNO", request_id, buffer_get_buffer(buffer), buffer_get_position(buffer), 0, - PyBool_FromLong((long)safe)); + PyBool_FromLong((long)safe), + to_publish); } static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { @@ -615,11 +616,12 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyObject* docs; PyObject* doc; PyObject* iterator; - PyObject* sock_info; + PyObject* ctx; PyObject* last_error_args; PyObject* result; PyObject* max_bson_size_obj; PyObject* max_message_size_obj; + PyObject* to_publish = NULL; unsigned char check_keys; unsigned char safe; unsigned char continue_on_error; @@ -638,7 +640,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { &last_error_args, &continue_on_error, convert_codec_options, &options, - &sock_info)) { + &ctx)) { return NULL; } if (continue_on_error) { @@ -649,7 +651,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { * is True it's pointless (and slower) to send GLE. 
*/ send_safe = (safe || !continue_on_error); - max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size"); + max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 max_bson_size = PyLong_AsLong(max_bson_size_obj); #else @@ -662,7 +664,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { return NULL; } - max_message_size_obj = PyObject_GetAttrString(sock_info, "max_message_size"); + max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size"); #if PY_MAJOR_VERSION >= 3 max_message_size = PyLong_AsLong(max_message_size_obj); #else @@ -692,6 +694,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { goto insertfail; } + if (!(to_publish = PyList_New(0))) { + goto insertfail; + } + iterator = PyObject_GetIter(docs); if (iterator == NULL) { PyObject* InvalidOperation = _error("InvalidOperation"); @@ -706,10 +712,8 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { int cur_size; if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - Py_DECREF(doc); goto iterfail; } - Py_DECREF(doc); cur_size = buffer_get_position(buffer) - before; if (cur_size > max_bson_size) { @@ -719,9 +723,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, send_safe, &options); + request_id, send_safe, &options, + to_publish); if (!result) goto iterfail; Py_DECREF(result); @@ -762,15 +767,20 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 
4); - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, send_safe, &options); + request_id, send_safe, &options, to_publish); buffer_free(buffer); buffer = new_buffer; request_id = new_request_id; length_location = message_start; + Py_DECREF(to_publish); + if (!(to_publish = PyList_New(0))) { + goto insertfail; + } + if (!result) { PyObject *etype = NULL, *evalue = NULL, *etrace = NULL; PyObject* OperationFailure; @@ -786,7 +796,9 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_DECREF(etype); Py_XDECREF(evalue); Py_XDECREF(etrace); + Py_DECREF(to_publish); Py_DECREF(iterator); + Py_DECREF(doc); buffer_free(buffer); PyMem_Free(collection_name); Py_RETURN_NONE; @@ -799,6 +811,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { exc_type = etype; exc_value = evalue; exc_trace = etrace; + if (PyList_Append(to_publish, doc) < 0) { + goto iterfail; + } + Py_CLEAR(doc); continue; } } @@ -813,6 +829,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_DECREF(result); } } + if (PyList_Append(to_publish, doc) < 0) { + goto iterfail; + } + Py_CLEAR(doc); } Py_DECREF(iterator); @@ -833,10 +853,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); /* Send the last (or only) batch */ - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, safe, &options); + request_id, safe, &options, to_publish); + Py_DECREF(to_publish); PyMem_Free(collection_name); buffer_free(buffer); @@ -859,19 +880,22 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_RETURN_NONE; iterfail: + Py_XDECREF(doc); Py_DECREF(iterator); 
insertfail: Py_XDECREF(exc_type); Py_XDECREF(exc_value); Py_XDECREF(exc_trace); + Py_XDECREF(to_publish); buffer_free(buffer); PyMem_Free(collection_name); return NULL; } static PyObject* -_send_write_command(PyObject* sock_info, buffer_t buffer, - int lst_len_loc, int cmd_len_loc, unsigned char* errors) { +_send_write_command(PyObject* ctx, buffer_t buffer, int lst_len_loc, + int cmd_len_loc, unsigned char* errors, + PyObject* to_publish) { PyObject* result; @@ -885,11 +909,12 @@ _send_write_command(PyObject* sock_info, buffer_t buffer, memcpy(buffer_get_buffer(buffer) + 4, &request_id, 4); /* Send the current batch */ - result = PyObject_CallMethod(sock_info, "write_command", - "i" BYTES_FORMAT_STRING, + result = PyObject_CallMethod(ctx, "write_command", + "i" BYTES_FORMAT_STRING "O", request_id, buffer_get_buffer(buffer), - buffer_get_position(buffer)); + buffer_get_position(buffer), + to_publish); if (result && PyDict_GetItemString(result, "writeErrors")) *errors = 1; return result; @@ -948,10 +973,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { PyObject* command; PyObject* doc; PyObject* docs; - PyObject* sock_info; + PyObject* ctx; PyObject* iterator; PyObject* result; PyObject* results; + PyObject* to_publish = NULL; unsigned char op; unsigned char check_keys; codec_options_t options; @@ -962,11 +988,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8", &ns, &ns_len, &op, &command, &docs, &check_keys, convert_codec_options, &options, - &sock_info)) { + &ctx)) { return NULL; } - max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size"); + max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 max_bson_size = PyLong_AsLong(max_bson_size_obj); #else @@ -984,7 +1010,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { */ max_cmd_size = max_bson_size + 16382; - max_write_batch_size_obj = 
PyObject_GetAttrString(sock_info, "max_write_batch_size"); + max_write_batch_size_obj = PyObject_GetAttrString(ctx, "max_write_batch_size"); #if PY_MAJOR_VERSION >= 3 max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj); #else @@ -1006,10 +1032,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { return NULL; } + if (!(to_publish = PyList_New(0))) { + destroy_codec_options(&options); + PyMem_Free(ns); + Py_DECREF(results); + return NULL; + } + if (!(buffer = _command_buffer_new(ns, ns_len))) { destroy_codec_options(&options); PyMem_Free(ns); Py_DECREF(results); + Py_DECREF(to_publish); return NULL; } @@ -1086,16 +1120,13 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { INT2STRING(key, idx); if (!buffer_write_bytes(buffer, "\x03", 1) || !buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) { - Py_DECREF(doc); goto cmditerfail; } cur_doc_begin = buffer_get_position(buffer); if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - Py_DECREF(doc); goto cmditerfail; } - Py_DECREF(doc); /* We have enough data, maybe send this batch. 
*/ enough_data = (buffer_get_position(buffer) > max_cmd_size); @@ -1147,8 +1178,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { goto cmditerfail; } - result = _send_write_command(sock_info, buffer, - lst_len_loc, cmd_len_loc, &errors); + result = _send_write_command(ctx, buffer, lst_len_loc, + cmd_len_loc, &errors, to_publish); buffer_free(buffer); buffer = new_buffer; @@ -1166,7 +1197,10 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!result) goto cmditerfail; - PyList_Append(results, result); + if (PyList_Append(results, result) < 0) { + Py_DECREF(result); + goto cmditerfail; + } Py_DECREF(result); if (errors && ordered) { @@ -1177,7 +1211,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { } idx_offset += idx; idx = 0; + Py_DECREF(to_publish); + if (!(to_publish = PyList_New(0))) { + goto cmditerfail; + } } + if (PyList_Append(to_publish, doc) < 0) { + goto cmditerfail; + } + Py_CLEAR(doc); idx += 1; } Py_DECREF(iterator); @@ -1198,8 +1240,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!buffer_write_bytes(buffer, "\x00\x00", 2)) goto cmdfail; - result = _send_write_command(sock_info, buffer, - lst_len_loc, cmd_len_loc, &errors); + result = _send_write_command(ctx, buffer, lst_len_loc, + cmd_len_loc, &errors, to_publish); if (!result) goto cmdfail; @@ -1213,16 +1255,22 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { buffer_free(buffer); - PyList_Append(results, result); + if (PyList_Append(results, result) < 0) { + Py_DECREF(result); + goto cmdfail; + } Py_DECREF(result); + Py_DECREF(to_publish); destroy_codec_options(&options); return results; cmditerfail: + Py_XDECREF(doc); Py_DECREF(iterator); cmdfail: destroy_codec_options(&options); Py_DECREF(results); + Py_XDECREF(to_publish); buffer_free(buffer); return NULL; } diff --git a/pymongo/bulk.py b/pymongo/bulk.py index d7658a209..1bd3ba77c 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ 
-29,7 +29,9 @@ from pymongo.errors import (BulkWriteError, InvalidOperation, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, - _do_batched_write_command) + _do_batched_write_command, + _randint, + _BulkWriteContext) from pymongo.write_concern import WriteConcern @@ -181,11 +183,13 @@ def _merge_command(run, full_result, results): write_errors = result.get("writeErrors") if write_errors: for doc in write_errors: + # Leave the server response intact for APM. + replacement = doc.copy() idx = doc["index"] + offset - doc["index"] = run.index(idx) + replacement["index"] = run.index(idx) # Add the failed operation to the error document. - doc[_UOP] = run.ops[idx] - full_result["writeErrors"].extend(write_errors) + replacement[_UOP] = run.ops[idx] + full_result["writeErrors"].append(replacement) wc_error = result.get("writeConcernError") if wc_error: @@ -276,15 +280,19 @@ class _Bulk(object): "nRemoved": 0, "upserted": [], } + op_id = _randint() + db_name = self.collection.database.name + for run in generator: cmd = SON([(_COMMANDS[run.op_type], self.collection.name), ('ordered', self.ordered)]) if write_concern.document: cmd['writeConcern'] = write_concern.document + bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id) results = _do_batched_write_command( self.namespace, run.op_type, cmd, - run.ops, True, self.collection.codec_options, sock_info) + run.ops, True, self.collection.codec_options, bwc) _merge_command(run, full_result, results) # We're supposed to continue if errors are @@ -306,6 +314,7 @@ class _Bulk(object): # If ordered is True we have to send GLE or use write # commands so we can abort on the first error. 
write_concern = WriteConcern(w=int(self.ordered)) + op_id = _randint() for run in generator: try: @@ -313,7 +322,8 @@ class _Bulk(object): coll._insert(sock_info, run.ops, self.ordered, - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) elif run.op_type == _UPDATE: for operation in run.ops: doc = operation['u'] @@ -326,13 +336,15 @@ class _Bulk(object): operation['upsert'], check_keys, operation['multi'], - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) else: for operation in run.ops: coll._delete(sock_info, operation['q'], not operation['limit'], - write_concern) + write_concern, + op_id) except OperationFailure: if self.ordered: break @@ -350,6 +362,7 @@ class _Bulk(object): "nRemoved": 0, "upserted": [], } + op_id = _randint() stop = False for run in generator: for idx, operation in enumerate(run.ops): @@ -360,7 +373,8 @@ class _Bulk(object): if run.op_type == _INSERT: coll._insert(sock_info, operation, - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) result = {} elif run.op_type == _UPDATE: doc = operation['u'] @@ -373,12 +387,14 @@ class _Bulk(object): operation['upsert'], check_keys, operation['multi'], - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) else: result = coll._delete(sock_info, operation['q'], not operation['limit'], - write_concern) + write_concern, + op_id) _merge_legacy(run, full_result, result, idx) except DocumentTooLarge as exc: # MongoDB 2.6 uses error code 2 for "too large". 
diff --git a/pymongo/collection.py b/pymongo/collection.py index d2db1b02b..5263307e1 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -35,7 +35,6 @@ from pymongo.command_cursor import CommandCursor from pymongo.cursor import Cursor from pymongo.errors import ConfigurationError, InvalidName, OperationFailure from pymongo.helpers import _check_write_command_response -from pymongo.message import _INSERT from pymongo.operations import _WriteOp, IndexModel from pymongo.read_preferences import ReadPreference from pymongo.results import (BulkWriteResult, @@ -383,7 +382,7 @@ class Collection(common.BaseObject): return BulkWriteResult({}, False) def _legacy_write( - self, sock_info, name, command, acknowledged, func, *args): + self, sock_info, name, cmd, acknowledged, op_id, func, *args): """Internal legacy write helper.""" publish = monitoring.enabled() if publish: @@ -392,39 +391,37 @@ class Collection(common.BaseObject): if publish: duration = datetime.datetime.now() - start monitoring.publish_command_start( - command, self.__database.name, rqst_id, sock_info.address) + cmd, self.__database.name, rqst_id, sock_info.address, op_id) start = datetime.datetime.now() try: result = sock_info.legacy_write( rqst_id, msg, max_size, acknowledged) except OperationFailure as exc: if publish: - duration = (datetime.datetime.now() - start) + duration + dur = (datetime.datetime.now() - start) + duration details = exc.details # Succeed if GLE was successful and this is a write error. - # XXX: Is checking if "n" is in details the best way to - # differentiate write errors from something else? 
if details.get("ok") and "n" in details: - reply = helpers._upconvert_write_result( - name, command, details) + reply = message._convert_write_result( + name, cmd, details) monitoring.publish_command_success( - duration, reply, name, rqst_id, sock_info.address) + dur, reply, name, rqst_id, sock_info.address, op_id) else: monitoring.publish_command_failure( - duration, details, name, rqst_id, sock_info.address) + dur, details, name, rqst_id, sock_info.address, op_id) raise if publish: - # No result for w=0 - reply = None + # Comply with APM spec. + reply = {'ok': 1} if result: - reply = helpers._upconvert_write_result(name, command, result) + reply = message._convert_write_result(name, cmd, result) duration = (datetime.datetime.now() - start) + duration monitoring.publish_command_success( - duration, reply, name, rqst_id, sock_info.address) + duration, reply, name, rqst_id, sock_info.address, op_id) return result def _insert_one( - self, sock_info, doc, check_keys, manipulate, write_concern): + self, sock_info, doc, check_keys, manipulate, write_concern, op_id): """Internal helper for inserting a single document.""" if manipulate: doc = self.__database._apply_incoming_manipulators(doc, self) @@ -435,9 +432,9 @@ class Collection(common.BaseObject): concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('insert', self.name), - ('documents', [doc]), - ('ordered', True)]) - if acknowledged and concern: + ('ordered', True), + ('documents', [doc])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: @@ -450,17 +447,17 @@ class Collection(common.BaseObject): else: # Legacy OP_INSERT. 
self._legacy_write( - sock_info, 'insert', command, acknowledged, + sock_info, 'insert', command, acknowledged, op_id, message.insert, self.__full_name, [doc], check_keys, acknowledged, concern, False, self.codec_options) return doc.get('_id') - def _insert(self, sock_info, docs, ordered=True, - check_keys=True, manipulate=False, write_concern=None): + def _insert(self, sock_info, docs, ordered=True, check_keys=True, + manipulate=False, write_concern=None, op_id=None): """Internal insert helper.""" if isinstance(docs, collections.MutableMapping): return self._insert_one( - sock_info, docs, check_keys, manipulate, write_concern) + sock_info, docs, check_keys, manipulate, write_concern, op_id) ids = [] @@ -489,25 +486,27 @@ class Collection(common.BaseObject): yield doc concern = (write_concern or self.write_concern).document - safe = concern.get("w") != 0 + acknowledged = concern.get("w") != 0 - if sock_info.max_wire_version > 1 and safe: + command = SON([('insert', self.name), + ('ordered', ordered)]) + if concern: + command['writeConcern'] = concern + if op_id is None: + op_id = message._randint() + bwc = message._BulkWriteContext( + self.database.name, command, sock_info, op_id) + if sock_info.max_wire_version > 1 and acknowledged: # Batched insert command. - command = SON([('insert', self.name), - ('ordered', ordered)]) - - if concern: - command['writeConcern'] = concern - results = message._do_batched_write_command( - self.database.name + ".$cmd", _INSERT, command, - gen(), check_keys, self.codec_options, sock_info) + self.database.name + ".$cmd", message._INSERT, command, + gen(), check_keys, self.codec_options, bwc) _check_write_command_response(results) else: # Legacy batched OP_INSERT. 
message._do_batched_insert(self.__full_name, gen(), check_keys, - safe, concern, not ordered, - self.codec_options, sock_info) + acknowledged, concern, not ordered, + self.codec_options, bwc) return ids def insert_one(self, document): @@ -572,7 +571,7 @@ class Collection(common.BaseObject): if "_id" not in document: document["_id"] = ObjectId() inserted_ids.append(document["_id"]) - yield (_INSERT, document) + yield (message._INSERT, document) blk = _Bulk(self, ordered) blk.ops = [doc for doc in gen()] @@ -581,7 +580,7 @@ class Collection(common.BaseObject): def _update(self, sock_info, criteria, document, upsert=False, check_keys=True, multi=False, manipulate=False, - write_concern=None): + write_concern=None, op_id=None): """Internal update / replace helper.""" common.validate_boolean("upsert", upsert) if manipulate: @@ -589,12 +588,12 @@ class Collection(common.BaseObject): concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('update', self.name), + ('ordered', True), ('updates', [SON([('q', criteria), ('u', document), ('multi', multi), - ('upsert', upsert)])]), - ('ordered', True)]) - if acknowledged and concern: + ('upsert', upsert)])])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: # Update command. @@ -619,9 +618,9 @@ class Collection(common.BaseObject): else: # Legacy OP_UPDATE. return self._legacy_write( - sock_info, 'update', command, acknowledged, message.update, - self.__full_name, upsert, multi, criteria, document, - acknowledged, concern, check_keys, self.codec_options) + sock_info, 'update', command, acknowledged, op_id, + message.update, self.__full_name, upsert, multi, criteria, + document, acknowledged, concern, check_keys, self.codec_options) def replace_one(self, filter, replacement, upsert=False): """Replace a single document matching the filter. 
@@ -758,16 +757,17 @@ class Collection(common.BaseObject): """ self.__database.drop_collection(self.__name) - def _delete(self, sock_info, criteria, multi, write_concern=None): + def _delete( + self, sock_info, criteria, multi, write_concern=None, op_id=None): """Internal delete helper.""" common.validate_is_mapping("filter", criteria) concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('delete', self.name), + ('ordered', True), ('deletes', [SON([('q', criteria), - ('limit', int(not multi))])]), - ('ordered', True)]) - if acknowledged and concern: + ('limit', int(not multi))])])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: @@ -780,9 +780,9 @@ class Collection(common.BaseObject): else: # Legacy OP_DELETE. return self._legacy_write( - sock_info, 'delete', command, acknowledged, message.delete, - self.__full_name, criteria, acknowledged, concern, - self.codec_options, int(not multi)) + sock_info, 'delete', command, acknowledged, op_id, + message.delete, self.__full_name, criteria, acknowledged, + concern, self.codec_options, int(not multi)) def delete_one(self, filter): """Delete a single document matching the filter. diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 866ede72e..8c79f408b 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -108,7 +108,7 @@ def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()): # Fake a getMore command response. OP_GET_MORE provides no document. 
msg = "Cursor not found, cursor id: %d" % (cursor_id,) - errobj = {"ok" : 0, "errmsg" : msg, "code" : 43} + errobj = {"ok": 0, "errmsg": msg, "code": 43} raise CursorNotFound(msg, 43, errobj) elif response_flag & 2: error_object = bson.BSON(response[20:]).decode() @@ -274,47 +274,6 @@ def _check_write_command_response(results): error.get("errmsg"), error.get("code"), error) -def _upconvert_write_result(operation, command, result): - """Convert a legacy write result to write commmand format.""" - - # Based on _merge_legacy from bulk.py - affected = result.get("n", 0) - res = {"ok": 1, "n": affected} - errmsg = result.get("errmsg", result.get("err", "")) - if errmsg: - # The write was successful on at least the primary so don't return. - if result.get("wtimeout"): - res["writeConcernError"] = {"errmsg": errmsg, - "code": 64, - "errInfo": {"wtimeout": True}} - else: - # The write failed. - error = {"index": 0, - "code": result.get("code", 8), - "errmsg": errmsg} - if "errInfo" in result: - error["errInfo"] = result["errInfo"] - res["writeErrors"] = [error] - return res - if operation == "insert": - # GLE result for insert is always 0 in most MongoDB versions. - res["n"] = 1 - elif operation == "update": - res["nModified"] = 0 - if "upserted" in result: - res["upserted"] = [{"index": 0, "_id": result["upserted"]}] - # Versions of MongoDB before 2.6 don't return the _id for an - # upsert if _id is not an ObjectId. - elif result.get("updatedExisting") is False and affected == 1: - # If _id is in both the update document *and* the query spec - # the update document _id takes precedence. - _id = command["u"].get("_id", command["q"].get("_id")) - res["upserted"] = [{"index": 0, "_id": _id}] - else: - res["nModified"] = affected - return res - - def _fields_list_to_dict(fields, option_name): """Takes a sequence of field names and returns a matching dictionary. 
diff --git a/pymongo/message.py b/pymongo/message.py index 6c51af1d7..b9840c34a 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -20,6 +20,7 @@ MongoDB. application developers. """ +import datetime import random import struct @@ -32,6 +33,7 @@ try: _use_c = True except ImportError: _use_c = False +from pymongo import monitoring from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure from pymongo.read_preferences import ReadPreference @@ -57,6 +59,11 @@ _OP_MAP = { } +def _randint(): + """Generate a pseudo random 32 bit integer.""" + return random.randint(MIN_INT32, MAX_INT32) + + def _maybe_add_read_preference(spec, read_preference): """Add $readPreference to spec when appropriate.""" mode = read_preference.mode @@ -75,6 +82,44 @@ def _maybe_add_read_preference(spec, read_preference): return spec +def _convert_write_result(operation, command, result): + """Convert a legacy write result to write command format.""" + + # Based on _merge_legacy from bulk.py + affected = result.get("n", 0) + res = {"ok": 1, "n": affected} + errmsg = result.get("errmsg", result.get("err", "")) + if errmsg: + # The write was successful on at least the primary so don't return. + if result.get("wtimeout"): + res["writeConcernError"] = {"errmsg": errmsg, + "code": 64, + "errInfo": {"wtimeout": True}} + else: + # The write failed. + error = {"index": 0, + "code": result.get("code", 8), + "errmsg": errmsg} + if "errInfo" in result: + error["errInfo"] = result["errInfo"] + res["writeErrors"] = [error] + return res + if operation == "insert": + # GLE result for insert is always 0 in most MongoDB versions. + res["n"] = len(command['documents']) + elif operation == "update": + if "upserted" in result: + res["upserted"] = [{"index": 0, "_id": result["upserted"]}] + # Versions of MongoDB before 2.6 don't return the _id for an + # upsert if _id is not an ObjectId.
+ elif result.get("updatedExisting") is False and affected == 1: + # If _id is in both the update document *and* the query spec + # the update document _id takes precedence. + _id = command["u"].get("_id", command["q"].get("_id")) + res["upserted"] = [{"index": 0, "_id": _id}] + return res + + _OPTIONS = SON([ ('tailable', 2), ('oplogReplay', 8), @@ -90,7 +135,6 @@ _MODIFIERS = SON([ ('$comment', 'comment'), ('$maxScan', 'maxScan'), ('$maxTimeMS', 'maxTimeMS'), - ('$readPreference', 'readPreference'), ('$max', 'max'), ('$min', 'min'), ('$returnKey', 'returnKey'), @@ -121,12 +165,11 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options): if skip: cmd['skip'] = skip if limit: - cmd['limit'] = limit + cmd['limit'] = abs(limit) + if limit < 0: + cmd['singleBatch'] = True if batch_size: cmd['batchSize'] = batch_size - # XXX: Should the check for 1 be here? - if limit < 0 or limit == 1: - cmd['singleBatch'] = True if options: cmd.update([(opt, True) @@ -262,7 +305,7 @@ def __pack_message(operation, data): Returns the resultant message string. """ - request_id = random.randint(MIN_INT32, MAX_INT32) + request_id = _randint() message = struct.pack(" sock_info.max_bson_size) + too_large = (encoded_length > ctx.max_bson_size) message_length += encoded_length - if message_length < sock_info.max_message_size and not too_large: + if message_length < ctx.max_message_size and not too_large: data.write(encoded) + to_send.append(doc) has_docs = True continue @@ -427,7 +582,7 @@ def _do_batched_insert(collection_name, docs, check_keys, # We have enough data, send this message. try: request_id, msg = _insert_message(data.getvalue(), send_safe) - sock_info.legacy_write(request_id, msg, 0, send_safe) + ctx.legacy_write(request_id, msg, 0, send_safe, to_send) # Exception type could be OperationFailure or a subtype # (e.g. 
DuplicateKeyError) except OperationFailure as exc: @@ -447,18 +602,19 @@ def _do_batched_insert(collection_name, docs, check_keys, " - the connected server supports" " BSON document sizes up to %d" " bytes." % - (encoded_length, sock_info.max_bson_size)) + (encoded_length, ctx.max_bson_size)) message_length = begin_loc + encoded_length data.seek(begin_loc) data.truncate() data.write(encoded) + to_send = [doc] if not has_docs: raise InvalidOperation("cannot do an empty bulk insert") request_id, msg = _insert_message(data.getvalue(), safe) - sock_info.legacy_write(request_id, msg, 0, safe) + ctx.legacy_write(request_id, msg, 0, safe, to_send) # Re-raise any exception stored due to continue_on_error if last_error is not None: @@ -468,11 +624,11 @@ if _use_c: def _do_batched_write_command(namespace, operation, command, - docs, check_keys, opts, sock_info): + docs, check_keys, opts, ctx): """Execute a batch of insert, update, or delete commands. """ - max_bson_size = sock_info.max_bson_size - max_write_batch_size = sock_info.max_write_batch_size + max_bson_size = ctx.max_bson_size + max_write_batch_size = ctx.max_write_batch_size # Max BSON object size + 16k - 2 bytes for ending NUL bytes. # Server guarantees there is enough room: SERVER-10643. max_cmd_size = max_bson_size + 16382 @@ -511,6 +667,8 @@ def _do_batched_write_command(namespace, operation, command, # Where to write list document length list_start = buf.tell() - 4 + to_send = [] + def send_message(): """Finalize and send the current OP_QUERY message. """ @@ -524,11 +682,11 @@ def _do_batched_write_command(namespace, operation, command, buf.seek(command_start) buf.write(struct.pack('