From b80fa6d632069ae7abf811481fb4b28cc6835953 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 10 Sep 2015 09:10:19 -0700 Subject: [PATCH] PYTHON-952 - Bulk write operations monitoring This change adds monitoring of bulk write operations (i.e. Collection.bulk_write, Collection.insert_many, Collection.insert with multiple documents, Bulk.execute, etc.). It also fixes bugs in conversion of legacy write results to write command result format and conversion of legacy queries to find command documents. Finally, it adds an operation_id attribute to the published events to tie related events together. --- pymongo/_cmessagemodule.c | 118 ++++++--- pymongo/bulk.py | 38 ++- pymongo/collection.py | 98 +++---- pymongo/helpers.py | 43 +-- pymongo/message.py | 194 ++++++++++++-- pymongo/monitoring.py | 41 ++- pymongo/network.py | 4 +- test/test_monitoring.py | 541 +++++++++++++++++++++++++++----------- 8 files changed, 763 insertions(+), 314 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index b8faa4973..2dc8262b7 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -580,10 +580,10 @@ _set_document_too_large(int size, long max) { } static PyObject* -_send_insert(PyObject* self, PyObject* sock_info, +_send_insert(PyObject* self, PyObject* ctx, PyObject* gle_args, buffer_t buffer, char* coll_name, int coll_len, int request_id, int safe, - codec_options_t* options) { + codec_options_t* options, PyObject* to_publish) { if (safe) { if (!add_last_error(self, buffer, request_id, @@ -594,13 +594,14 @@ _send_insert(PyObject* self, PyObject* sock_info, /* The max_doc_size parameter for legacy_write is the max size of any * document in buffer. We enforced max size already, pass 0 here. */ - return PyObject_CallMethod(sock_info, "legacy_write", - "i" BYTES_FORMAT_STRING "iN", + return PyObject_CallMethod(ctx, "legacy_write", + "i" BYTES_FORMAT_STRING "iNO", request_id, buffer_get_buffer(buffer), buffer_get_position(buffer), 0, - PyBool_FromLong((long)safe)); + PyBool_FromLong((long)safe), + to_publish); } static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { @@ -615,11 +616,12 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { PyObject* docs; PyObject* doc; PyObject* iterator; - PyObject* sock_info; + PyObject* ctx; PyObject* last_error_args; PyObject* result; PyObject* max_bson_size_obj; PyObject* max_message_size_obj; + PyObject* to_publish = NULL; unsigned char check_keys; unsigned char safe; unsigned char continue_on_error; @@ -638,7 +640,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { &last_error_args, &continue_on_error, convert_codec_options, &options, - &sock_info)) { + &ctx)) { return NULL; } if (continue_on_error) { @@ -649,7 +651,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { * is True it's pointless (and slower) to send GLE. */ send_safe = (safe || !continue_on_error); - max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size"); + max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 max_bson_size = PyLong_AsLong(max_bson_size_obj); #else @@ -662,7 +664,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { return NULL; } - max_message_size_obj = PyObject_GetAttrString(sock_info, "max_message_size"); + max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size"); #if PY_MAJOR_VERSION >= 3 max_message_size = PyLong_AsLong(max_message_size_obj); #else @@ -692,6 +694,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { goto insertfail; } + if (!(to_publish = PyList_New(0))) { + goto insertfail; + } + iterator = PyObject_GetIter(docs); if (iterator == NULL) { PyObject* InvalidOperation = _error("InvalidOperation"); @@ -706,10 +712,8 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { int cur_size; if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - Py_DECREF(doc); goto iterfail; } - Py_DECREF(doc); cur_size = buffer_get_position(buffer) - before; if (cur_size > max_bson_size) { @@ -719,9 +723,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, send_safe, &options); + request_id, send_safe, &options, + to_publish); if (!result) goto iterfail; Py_DECREF(result); @@ -762,15 +767,20 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { message_length = buffer_get_position(buffer) - length_location; memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, send_safe, &options); + request_id, send_safe, &options, to_publish); buffer_free(buffer); buffer = new_buffer; request_id = new_request_id; length_location = message_start; + Py_DECREF(to_publish); + if (!(to_publish = PyList_New(0))) { + goto insertfail; + } + if (!result) { PyObject *etype = NULL, *evalue = NULL, *etrace = NULL; PyObject* OperationFailure; @@ -786,7 +796,9 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_DECREF(etype); Py_XDECREF(evalue); Py_XDECREF(etrace); + Py_DECREF(to_publish); Py_DECREF(iterator); + Py_DECREF(doc); buffer_free(buffer); PyMem_Free(collection_name); Py_RETURN_NONE; @@ -799,6 +811,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { exc_type = etype; exc_value = evalue; exc_trace = etrace; + if (PyList_Append(to_publish, doc) < 0) { + goto iterfail; + } + Py_CLEAR(doc); continue; } } @@ -813,6 +829,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_DECREF(result); } } + if (PyList_Append(to_publish, doc) < 0) { + goto iterfail; + } + Py_CLEAR(doc); } Py_DECREF(iterator); @@ -833,10 +853,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4); /* Send the last (or only) batch */ - result = _send_insert(self, sock_info, last_error_args, buffer, + result = _send_insert(self, ctx, last_error_args, buffer, collection_name, collection_name_length, - request_id, safe, &options); + request_id, safe, &options, to_publish); + Py_DECREF(to_publish); PyMem_Free(collection_name); buffer_free(buffer); @@ -859,19 +880,22 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) { Py_RETURN_NONE; iterfail: + Py_XDECREF(doc); Py_DECREF(iterator); insertfail: Py_XDECREF(exc_type); Py_XDECREF(exc_value); Py_XDECREF(exc_trace); + Py_XDECREF(to_publish); buffer_free(buffer); PyMem_Free(collection_name); return NULL; } static PyObject* -_send_write_command(PyObject* sock_info, buffer_t buffer, - int lst_len_loc, int cmd_len_loc, unsigned char* errors) { +_send_write_command(PyObject* ctx, buffer_t buffer, int lst_len_loc, + int cmd_len_loc, unsigned char* errors, + PyObject* to_publish) { PyObject* result; @@ -885,11 +909,12 @@ _send_write_command(PyObject* sock_info, buffer_t buffer, memcpy(buffer_get_buffer(buffer) + 4, &request_id, 4); /* Send the current batch */ - result = PyObject_CallMethod(sock_info, "write_command", - "i" BYTES_FORMAT_STRING, + result = PyObject_CallMethod(ctx, "write_command", + "i" BYTES_FORMAT_STRING "O", request_id, buffer_get_buffer(buffer), - buffer_get_position(buffer)); + buffer_get_position(buffer), + to_publish); if (result && PyDict_GetItemString(result, "writeErrors")) *errors = 1; return result; @@ -948,10 +973,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { PyObject* command; PyObject* doc; PyObject* docs; - PyObject* sock_info; + PyObject* ctx; PyObject* iterator; PyObject* result; PyObject* results; + PyObject* to_publish = NULL; unsigned char op; unsigned char check_keys; codec_options_t options; @@ -962,11 +988,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8", &ns, &ns_len, &op, &command, &docs, &check_keys, convert_codec_options, &options, - &sock_info)) { + &ctx)) { return NULL; } - max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size"); + max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 max_bson_size = PyLong_AsLong(max_bson_size_obj); #else @@ -984,7 +1010,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { */ max_cmd_size = max_bson_size + 16382; - max_write_batch_size_obj = PyObject_GetAttrString(sock_info, "max_write_batch_size"); + max_write_batch_size_obj = PyObject_GetAttrString(ctx, "max_write_batch_size"); #if PY_MAJOR_VERSION >= 3 max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj); #else @@ -1006,10 +1032,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { return NULL; } + if (!(to_publish = PyList_New(0))) { + destroy_codec_options(&options); + PyMem_Free(ns); + Py_DECREF(results); + return NULL; + } + if (!(buffer = _command_buffer_new(ns, ns_len))) { destroy_codec_options(&options); PyMem_Free(ns); Py_DECREF(results); + Py_DECREF(to_publish); return NULL; } @@ -1086,16 +1120,13 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { INT2STRING(key, idx); if (!buffer_write_bytes(buffer, "\x03", 1) || !buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) { - Py_DECREF(doc); goto cmditerfail; } cur_doc_begin = buffer_get_position(buffer); if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - Py_DECREF(doc); goto cmditerfail; } - Py_DECREF(doc); /* We have enough data, maybe send this batch. */ enough_data = (buffer_get_position(buffer) > max_cmd_size); @@ -1147,8 +1178,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { goto cmditerfail; } - result = _send_write_command(sock_info, buffer, - lst_len_loc, cmd_len_loc, &errors); + result = _send_write_command(ctx, buffer, lst_len_loc, + cmd_len_loc, &errors, to_publish); buffer_free(buffer); buffer = new_buffer; @@ -1166,7 +1197,10 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!result) goto cmditerfail; - PyList_Append(results, result); + if (PyList_Append(results, result) < 0) { + Py_DECREF(result); + goto cmditerfail; + } Py_DECREF(result); if (errors && ordered) { @@ -1177,7 +1211,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { } idx_offset += idx; idx = 0; + Py_DECREF(to_publish); + if (!(to_publish = PyList_New(0))) { + goto cmditerfail; + } } + if (PyList_Append(to_publish, doc) < 0) { + goto cmditerfail; + } + Py_CLEAR(doc); idx += 1; } Py_DECREF(iterator); @@ -1198,8 +1240,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { if (!buffer_write_bytes(buffer, "\x00\x00", 2)) goto cmdfail; - result = _send_write_command(sock_info, buffer, - lst_len_loc, cmd_len_loc, &errors); + result = _send_write_command(ctx, buffer, lst_len_loc, + cmd_len_loc, &errors, to_publish); if (!result) goto cmdfail; @@ -1213,16 +1255,22 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { buffer_free(buffer); - PyList_Append(results, result); + if (PyList_Append(results, result) < 0) { + Py_DECREF(result); + goto cmdfail; + } Py_DECREF(result); + Py_DECREF(to_publish); destroy_codec_options(&options); return results; cmditerfail: + Py_XDECREF(doc); Py_DECREF(iterator); cmdfail: destroy_codec_options(&options); Py_DECREF(results); + Py_XDECREF(to_publish); buffer_free(buffer); return NULL; } diff --git a/pymongo/bulk.py b/pymongo/bulk.py index d7658a209..1bd3ba77c 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -29,7 +29,9 @@ from pymongo.errors import (BulkWriteError, InvalidOperation, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, - _do_batched_write_command) + _do_batched_write_command, + _randint, + _BulkWriteContext) from pymongo.write_concern import WriteConcern @@ -181,11 +183,13 @@ def _merge_command(run, full_result, results): write_errors = result.get("writeErrors") if write_errors: for doc in write_errors: + # Leave the server response intact for APM. + replacement = doc.copy() idx = doc["index"] + offset - doc["index"] = run.index(idx) + replacement["index"] = run.index(idx) # Add the failed operation to the error document. - doc[_UOP] = run.ops[idx] - full_result["writeErrors"].extend(write_errors) + replacement[_UOP] = run.ops[idx] + full_result["writeErrors"].append(replacement) wc_error = result.get("writeConcernError") if wc_error: @@ -276,15 +280,19 @@ class _Bulk(object): "nRemoved": 0, "upserted": [], } + op_id = _randint() + db_name = self.collection.database.name + for run in generator: cmd = SON([(_COMMANDS[run.op_type], self.collection.name), ('ordered', self.ordered)]) if write_concern.document: cmd['writeConcern'] = write_concern.document + bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id) results = _do_batched_write_command( self.namespace, run.op_type, cmd, - run.ops, True, self.collection.codec_options, sock_info) + run.ops, True, self.collection.codec_options, bwc) _merge_command(run, full_result, results) # We're supposed to continue if errors are @@ -306,6 +314,7 @@ class _Bulk(object): # If ordered is True we have to send GLE or use write # commands so we can abort on the first error. write_concern = WriteConcern(w=int(self.ordered)) + op_id = _randint() for run in generator: try: @@ -313,7 +322,8 @@ class _Bulk(object): coll._insert(sock_info, run.ops, self.ordered, - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) elif run.op_type == _UPDATE: for operation in run.ops: doc = operation['u'] @@ -326,13 +336,15 @@ class _Bulk(object): operation['upsert'], check_keys, operation['multi'], - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) else: for operation in run.ops: coll._delete(sock_info, operation['q'], not operation['limit'], - write_concern) + write_concern, + op_id) except OperationFailure: if self.ordered: break @@ -350,6 +362,7 @@ class _Bulk(object): "nRemoved": 0, "upserted": [], } + op_id = _randint() stop = False for run in generator: for idx, operation in enumerate(run.ops): @@ -360,7 +373,8 @@ class _Bulk(object): if run.op_type == _INSERT: coll._insert(sock_info, operation, - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) result = {} elif run.op_type == _UPDATE: doc = operation['u'] @@ -373,12 +387,14 @@ class _Bulk(object): operation['upsert'], check_keys, operation['multi'], - write_concern=write_concern) + write_concern=write_concern, + op_id=op_id) else: result = coll._delete(sock_info, operation['q'], not operation['limit'], - write_concern) + write_concern, + op_id) _merge_legacy(run, full_result, result, idx) except DocumentTooLarge as exc: # MongoDB 2.6 uses error code 2 for "too large". diff --git a/pymongo/collection.py b/pymongo/collection.py index d2db1b02b..5263307e1 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -35,7 +35,6 @@ from pymongo.command_cursor import CommandCursor from pymongo.cursor import Cursor from pymongo.errors import ConfigurationError, InvalidName, OperationFailure from pymongo.helpers import _check_write_command_response -from pymongo.message import _INSERT from pymongo.operations import _WriteOp, IndexModel from pymongo.read_preferences import ReadPreference from pymongo.results import (BulkWriteResult, @@ -383,7 +382,7 @@ class Collection(common.BaseObject): return BulkWriteResult({}, False) def _legacy_write( - self, sock_info, name, command, acknowledged, func, *args): + self, sock_info, name, cmd, acknowledged, op_id, func, *args): """Internal legacy write helper.""" publish = monitoring.enabled() if publish: @@ -392,39 +391,37 @@ class Collection(common.BaseObject): if publish: duration = datetime.datetime.now() - start monitoring.publish_command_start( - command, self.__database.name, rqst_id, sock_info.address) + cmd, self.__database.name, rqst_id, sock_info.address, op_id) start = datetime.datetime.now() try: result = sock_info.legacy_write( rqst_id, msg, max_size, acknowledged) except OperationFailure as exc: if publish: - duration = (datetime.datetime.now() - start) + duration + dur = (datetime.datetime.now() - start) + duration details = exc.details # Succeed if GLE was successful and this is a write error. - # XXX: Is checking if "n" is in details the best way to - # differentiate write errors from something else? if details.get("ok") and "n" in details: - reply = helpers._upconvert_write_result( - name, command, details) + reply = message._convert_write_result( + name, cmd, details) monitoring.publish_command_success( - duration, reply, name, rqst_id, sock_info.address) + dur, reply, name, rqst_id, sock_info.address, op_id) else: monitoring.publish_command_failure( - duration, details, name, rqst_id, sock_info.address) + dur, details, name, rqst_id, sock_info.address, op_id) raise if publish: - # No result for w=0 - reply = None + # Comply with APM spec. + reply = {'ok': 1} if result: - reply = helpers._upconvert_write_result(name, command, result) + reply = message._convert_write_result(name, cmd, result) duration = (datetime.datetime.now() - start) + duration monitoring.publish_command_success( - duration, reply, name, rqst_id, sock_info.address) + duration, reply, name, rqst_id, sock_info.address, op_id) return result def _insert_one( - self, sock_info, doc, check_keys, manipulate, write_concern): + self, sock_info, doc, check_keys, manipulate, write_concern, op_id): """Internal helper for inserting a single document.""" if manipulate: doc = self.__database._apply_incoming_manipulators(doc, self) @@ -435,9 +432,9 @@ class Collection(common.BaseObject): concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('insert', self.name), - ('documents', [doc]), - ('ordered', True)]) - if acknowledged and concern: + ('ordered', True), + ('documents', [doc])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: @@ -450,17 +447,17 @@ class Collection(common.BaseObject): else: # Legacy OP_INSERT. self._legacy_write( - sock_info, 'insert', command, acknowledged, + sock_info, 'insert', command, acknowledged, op_id, message.insert, self.__full_name, [doc], check_keys, acknowledged, concern, False, self.codec_options) return doc.get('_id') - def _insert(self, sock_info, docs, ordered=True, - check_keys=True, manipulate=False, write_concern=None): + def _insert(self, sock_info, docs, ordered=True, check_keys=True, + manipulate=False, write_concern=None, op_id=None): """Internal insert helper.""" if isinstance(docs, collections.MutableMapping): return self._insert_one( - sock_info, docs, check_keys, manipulate, write_concern) + sock_info, docs, check_keys, manipulate, write_concern, op_id) ids = [] @@ -489,25 +486,27 @@ class Collection(common.BaseObject): yield doc concern = (write_concern or self.write_concern).document - safe = concern.get("w") != 0 + acknowledged = concern.get("w") != 0 - if sock_info.max_wire_version > 1 and safe: + command = SON([('insert', self.name), + ('ordered', ordered)]) + if concern: + command['writeConcern'] = concern + if op_id is None: + op_id = message._randint() + bwc = message._BulkWriteContext( + self.database.name, command, sock_info, op_id) + if sock_info.max_wire_version > 1 and acknowledged: # Batched insert command. - command = SON([('insert', self.name), - ('ordered', ordered)]) - - if concern: - command['writeConcern'] = concern - results = message._do_batched_write_command( - self.database.name + ".$cmd", _INSERT, command, - gen(), check_keys, self.codec_options, sock_info) + self.database.name + ".$cmd", message._INSERT, command, + gen(), check_keys, self.codec_options, bwc) _check_write_command_response(results) else: # Legacy batched OP_INSERT. message._do_batched_insert(self.__full_name, gen(), check_keys, - safe, concern, not ordered, - self.codec_options, sock_info) + acknowledged, concern, not ordered, + self.codec_options, bwc) return ids def insert_one(self, document): @@ -572,7 +571,7 @@ class Collection(common.BaseObject): if "_id" not in document: document["_id"] = ObjectId() inserted_ids.append(document["_id"]) - yield (_INSERT, document) + yield (message._INSERT, document) blk = _Bulk(self, ordered) blk.ops = [doc for doc in gen()] @@ -581,7 +580,7 @@ class Collection(common.BaseObject): def _update(self, sock_info, criteria, document, upsert=False, check_keys=True, multi=False, manipulate=False, - write_concern=None): + write_concern=None, op_id=None): """Internal update / replace helper.""" common.validate_boolean("upsert", upsert) if manipulate: @@ -589,12 +588,12 @@ class Collection(common.BaseObject): concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('update', self.name), + ('ordered', True), ('updates', [SON([('q', criteria), ('u', document), ('multi', multi), - ('upsert', upsert)])]), - ('ordered', True)]) - if acknowledged and concern: + ('upsert', upsert)])])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: # Update command. @@ -619,9 +618,9 @@ class Collection(common.BaseObject): else: # Legacy OP_UPDATE. return self._legacy_write( - sock_info, 'update', command, acknowledged, message.update, - self.__full_name, upsert, multi, criteria, document, - acknowledged, concern, check_keys, self.codec_options) + sock_info, 'update', command, acknowledged, op_id, + message.update, self.__full_name, upsert, multi, criteria, + document, acknowledged, concern, check_keys, self.codec_options) def replace_one(self, filter, replacement, upsert=False): """Replace a single document matching the filter. @@ -758,16 +757,17 @@ class Collection(common.BaseObject): """ self.__database.drop_collection(self.__name) - def _delete(self, sock_info, criteria, multi, write_concern=None): + def _delete( + self, sock_info, criteria, multi, write_concern=None, op_id=None): """Internal delete helper.""" common.validate_is_mapping("filter", criteria) concern = (write_concern or self.write_concern).document acknowledged = concern.get("w") != 0 command = SON([('delete', self.name), + ('ordered', True), ('deletes', [SON([('q', criteria), - ('limit', int(not multi))])]), - ('ordered', True)]) - if acknowledged and concern: + ('limit', int(not multi))])])]) + if concern: command['writeConcern'] = concern if sock_info.max_wire_version > 1 and acknowledged: @@ -780,9 +780,9 @@ class Collection(common.BaseObject): else: # Legacy OP_DELETE. return self._legacy_write( - sock_info, 'delete', command, acknowledged, message.delete, - self.__full_name, criteria, acknowledged, concern, - self.codec_options, int(not multi)) + sock_info, 'delete', command, acknowledged, op_id, + message.delete, self.__full_name, criteria, acknowledged, + concern, self.codec_options, int(not multi)) def delete_one(self, filter): """Delete a single document matching the filter. diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 866ede72e..8c79f408b 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -108,7 +108,7 @@ def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()): # Fake a getMore command response. OP_GET_MORE provides no document. msg = "Cursor not found, cursor id: %d" % (cursor_id,) - errobj = {"ok" : 0, "errmsg" : msg, "code" : 43} + errobj = {"ok": 0, "errmsg": msg, "code": 43} raise CursorNotFound(msg, 43, errobj) elif response_flag & 2: error_object = bson.BSON(response[20:]).decode() @@ -274,47 +274,6 @@ def _check_write_command_response(results): error.get("errmsg"), error.get("code"), error) -def _upconvert_write_result(operation, command, result): - """Convert a legacy write result to write commmand format.""" - - # Based on _merge_legacy from bulk.py - affected = result.get("n", 0) - res = {"ok": 1, "n": affected} - errmsg = result.get("errmsg", result.get("err", "")) - if errmsg: - # The write was successful on at least the primary so don't return. - if result.get("wtimeout"): - res["writeConcernError"] = {"errmsg": errmsg, - "code": 64, - "errInfo": {"wtimeout": True}} - else: - # The write failed. - error = {"index": 0, - "code": result.get("code", 8), - "errmsg": errmsg} - if "errInfo" in result: - error["errInfo"] = result["errInfo"] - res["writeErrors"] = [error] - return res - if operation == "insert": - # GLE result for insert is always 0 in most MongoDB versions. - res["n"] = 1 - elif operation == "update": - res["nModified"] = 0 - if "upserted" in result: - res["upserted"] = [{"index": 0, "_id": result["upserted"]}] - # Versions of MongoDB before 2.6 don't return the _id for an - # upsert if _id is not an ObjectId. - elif result.get("updatedExisting") is False and affected == 1: - # If _id is in both the update document *and* the query spec - # the update document _id takes precedence. - _id = command["u"].get("_id", command["q"].get("_id")) - res["upserted"] = [{"index": 0, "_id": _id}] - else: - res["nModified"] = affected - return res - - def _fields_list_to_dict(fields, option_name): """Takes a sequence of field names and returns a matching dictionary. diff --git a/pymongo/message.py b/pymongo/message.py index 6c51af1d7..b9840c34a 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -20,6 +20,7 @@ MongoDB. application developers. """ +import datetime import random import struct @@ -32,6 +33,7 @@ try: _use_c = True except ImportError: _use_c = False +from pymongo import monitoring from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure from pymongo.read_preferences import ReadPreference @@ -57,6 +59,11 @@ _OP_MAP = { } +def _randint(): + """Generate a pseudo random 32 bit integer.""" + return random.randint(MIN_INT32, MAX_INT32) + + def _maybe_add_read_preference(spec, read_preference): """Add $readPreference to spec when appropriate.""" mode = read_preference.mode @@ -75,6 +82,44 @@ def _maybe_add_read_preference(spec, read_preference): return spec +def _convert_write_result(operation, command, result): + """Convert a legacy write result to write commmand format.""" + + # Based on _merge_legacy from bulk.py + affected = result.get("n", 0) + res = {"ok": 1, "n": affected} + errmsg = result.get("errmsg", result.get("err", "")) + if errmsg: + # The write was successful on at least the primary so don't return. + if result.get("wtimeout"): + res["writeConcernError"] = {"errmsg": errmsg, + "code": 64, + "errInfo": {"wtimeout": True}} + else: + # The write failed. + error = {"index": 0, + "code": result.get("code", 8), + "errmsg": errmsg} + if "errInfo" in result: + error["errInfo"] = result["errInfo"] + res["writeErrors"] = [error] + return res + if operation == "insert": + # GLE result for insert is always 0 in most MongoDB versions. + res["n"] = len(command['documents']) + elif operation == "update": + if "upserted" in result: + res["upserted"] = [{"index": 0, "_id": result["upserted"]}] + # Versions of MongoDB before 2.6 don't return the _id for an + # upsert if _id is not an ObjectId. + elif result.get("updatedExisting") is False and affected == 1: + # If _id is in both the update document *and* the query spec + # the update document _id takes precedence. + _id = command["u"].get("_id", command["q"].get("_id")) + res["upserted"] = [{"index": 0, "_id": _id}] + return res + + _OPTIONS = SON([ ('tailable', 2), ('oplogReplay', 8), @@ -90,7 +135,6 @@ _MODIFIERS = SON([ ('$comment', 'comment'), ('$maxScan', 'maxScan'), ('$maxTimeMS', 'maxTimeMS'), - ('$readPreference', 'readPreference'), ('$max', 'max'), ('$min', 'min'), ('$returnKey', 'returnKey'), @@ -121,12 +165,11 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options): if skip: cmd['skip'] = skip if limit: - cmd['limit'] = limit + cmd['limit'] = abs(limit) + if limit < 0: + cmd['singleBatch'] = True if batch_size: cmd['batchSize'] = batch_size - # XXX: Should the check for 1 be here? - if limit < 0 or limit == 1: - cmd['singleBatch'] = True if options: cmd.update([(opt, True) @@ -262,7 +305,7 @@ def __pack_message(operation, data): Returns the resultant message string. """ - request_id = random.randint(MIN_INT32, MAX_INT32) + request_id = _randint() message = struct.pack(" sock_info.max_bson_size) + too_large = (encoded_length > ctx.max_bson_size) message_length += encoded_length - if message_length < sock_info.max_message_size and not too_large: + if message_length < ctx.max_message_size and not too_large: data.write(encoded) + to_send.append(doc) has_docs = True continue @@ -427,7 +582,7 @@ def _do_batched_insert(collection_name, docs, check_keys, # We have enough data, send this message. try: request_id, msg = _insert_message(data.getvalue(), send_safe) - sock_info.legacy_write(request_id, msg, 0, send_safe) + ctx.legacy_write(request_id, msg, 0, send_safe, to_send) # Exception type could be OperationFailure or a subtype # (e.g. DuplicateKeyError) except OperationFailure as exc: @@ -447,18 +602,19 @@ def _do_batched_insert(collection_name, docs, check_keys, " - the connected server supports" " BSON document sizes up to %d" " bytes." % - (encoded_length, sock_info.max_bson_size)) + (encoded_length, ctx.max_bson_size)) message_length = begin_loc + encoded_length data.seek(begin_loc) data.truncate() data.write(encoded) + to_send = [doc] if not has_docs: raise InvalidOperation("cannot do an empty bulk insert") request_id, msg = _insert_message(data.getvalue(), safe) - sock_info.legacy_write(request_id, msg, 0, safe) + ctx.legacy_write(request_id, msg, 0, safe, to_send) # Re-raise any exception stored due to continue_on_error if last_error is not None: @@ -468,11 +624,11 @@ if _use_c: def _do_batched_write_command(namespace, operation, command, - docs, check_keys, opts, sock_info): + docs, check_keys, opts, ctx): """Execute a batch of insert, update, or delete commands. """ - max_bson_size = sock_info.max_bson_size - max_write_batch_size = sock_info.max_write_batch_size + max_bson_size = ctx.max_bson_size + max_write_batch_size = ctx.max_write_batch_size # Max BSON object size + 16k - 2 bytes for ending NUL bytes. # Server guarantees there is enough room: SERVER-10643. max_cmd_size = max_bson_size + 16382 @@ -511,6 +667,8 @@ def _do_batched_write_command(namespace, operation, command, # Where to write list document length list_start = buf.tell() - 4 + to_send = [] + def send_message(): """Finalize and send the current OP_QUERY message. """ @@ -524,11 +682,11 @@ def _do_batched_write_command(namespace, operation, command, buf.seek(command_start) buf.write(struct.pack('