From 4d786fd0cdc061320a9c7e3c32aae95635652c72 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 21 Jun 2018 16:03:20 -0700 Subject: [PATCH] PYTHON-1329 - OP_MSG bulk writes --- pymongo/_cmessagemodule.c | 317 +++++++++++++++++++++++++++++++++- pymongo/bulk.py | 12 +- pymongo/message.py | 182 +++++++++++++++++-- test/test_retryable_writes.py | 9 +- 4 files changed, 495 insertions(+), 25 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index e374f6420..5d32e3375 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -1117,6 +1117,315 @@ insertfail: #define _UPDATE 1 #define _DELETE 2 +/* OP_MSG ----------------------------------------------- */ + +static int +_batched_op_msg( + unsigned char op, unsigned char check_keys, unsigned char ack, + PyObject* command, PyObject* docs, PyObject* ctx, + PyObject* to_publish, codec_options_t options, + buffer_t buffer, struct module_state *state) { + + long max_bson_size; + long max_write_batch_size; + long max_message_size; + int idx = 0; + int size_location; + int position; + int length; + PyObject* max_bson_size_obj; + PyObject* max_write_batch_size_obj; + PyObject* max_message_size_obj; + PyObject* doc; + PyObject* iterator; + char* flags = ack ? "\x00\x00\x00\x00" : "\x02\x00\x00\x00"; + + max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); +#if PY_MAJOR_VERSION >= 3 + max_bson_size = PyLong_AsLong(max_bson_size_obj); +#else + max_bson_size = PyInt_AsLong(max_bson_size_obj); +#endif + Py_XDECREF(max_bson_size_obj); + if (max_bson_size == -1) { + return 0; + } + + max_write_batch_size_obj = PyObject_GetAttrString(ctx, "max_write_batch_size"); +#if PY_MAJOR_VERSION >= 3 + max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj); +#else + max_write_batch_size = PyInt_AsLong(max_write_batch_size_obj); +#endif + Py_XDECREF(max_write_batch_size_obj); + if (max_write_batch_size == -1) { + return 0; + } + + max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size"); +#if PY_MAJOR_VERSION >= 3 + max_message_size = PyLong_AsLong(max_message_size_obj); +#else + max_message_size = PyInt_AsLong(max_message_size_obj); +#endif + Py_XDECREF(max_message_size_obj); + if (max_message_size == -1) { + return 0; + } + + if (!buffer_write_bytes(buffer, flags, 4)) { + return 0; + } + /* Type 0 Section */ + if (!buffer_write_bytes(buffer, "\x00", 1)) { + return 0; + } + if (!write_dict(state->_cbson, buffer, command, 0, + &options, 0)) { + return 0; + } + + /* Type 1 Section */ + if (!buffer_write_bytes(buffer, "\x01", 1)) { + return 0; + } + /* Save space for size */ + size_location = buffer_save_space(buffer, 4); + + switch (op) { + case _INSERT: + { + if (!buffer_write_bytes(buffer, "documents\x00", 10)) + goto cmdfail; + break; + } + case _UPDATE: + { + /* MongoDB does key validation for update. */ + check_keys = 0; + if (!buffer_write_bytes(buffer, "updates\x00", 8)) + goto cmdfail; + break; + } + case _DELETE: + { + /* Never check keys in a delete command. */ + check_keys = 0; + if (!buffer_write_bytes(buffer, "deletes\x00", 8)) + goto cmdfail; + break; + } + default: + { + PyObject* InvalidOperation = _error("InvalidOperation"); + if (InvalidOperation) { + PyErr_SetString(InvalidOperation, "Unknown command"); + Py_DECREF(InvalidOperation); + } + return 0; + } + } + + iterator = PyObject_GetIter(docs); + if (iterator == NULL) { + PyObject* InvalidOperation = _error("InvalidOperation"); + if (InvalidOperation) { + PyErr_SetString(InvalidOperation, "input is not iterable"); + Py_DECREF(InvalidOperation); + } + return 0; + } + while ((doc = PyIter_Next(iterator)) != NULL) { + int cur_doc_begin = buffer_get_position(buffer); + int cur_size; + int enough_data = 0; + int enough_documents = 0; + if (!write_dict(state->_cbson, buffer, doc, check_keys, + &options, 1)) { + goto cmditerfail; + } + /* We have enough data, return this batch. */ + enough_data = (buffer_get_position(buffer) > max_message_size); + enough_documents = (idx >= max_write_batch_size); + if (enough_data || enough_documents) { + cur_size = buffer_get_position(buffer) - cur_doc_begin; + + /* This single document is too large for the message. */ + if (!idx) { + if (op == _INSERT) { + _set_document_too_large(cur_size, max_bson_size); + } else { + PyObject* DocumentTooLarge = _error("DocumentTooLarge"); + if (DocumentTooLarge) { + /* + * There's nothing intelligent we can say + * about size for update and remove. + */ + PyErr_SetString(DocumentTooLarge, + "operation document too large"); + Py_DECREF(DocumentTooLarge); + } + } + goto cmditerfail; + } + /* + * Roll the existing buffer back to the beginning + * of the last document encoded. + */ + buffer_update_position(buffer, cur_doc_begin); + break; + } + if (PyList_Append(to_publish, doc) < 0) { + goto cmditerfail; + } + Py_CLEAR(doc); + idx += 1; + } + Py_DECREF(iterator); + + if (PyErr_Occurred()) { + goto cmdfail; + } + + position = buffer_get_position(buffer); + length = position - size_location; + buffer_write_int32_at_position(buffer, size_location, (int32_t)length); + return 1; + +cmditerfail: + Py_XDECREF(doc); + Py_DECREF(iterator); +cmdfail: + return 0; +} + +static PyObject* +_cbson_encode_batched_op_msg(PyObject* self, PyObject* args) { + unsigned char op; + unsigned char check_keys; + unsigned char ack; + PyObject* command; + PyObject* docs; + PyObject* ctx = NULL; + PyObject* to_publish = NULL; + PyObject* result = NULL; + codec_options_t options; + buffer_t buffer; + struct module_state *state = GETSTATE(self); + + if (!PyArg_ParseTuple(args, "bOObbO&O", + &op, &command, &docs, &check_keys, &ack, + convert_codec_options, &options, + &ctx)) { + return NULL; + } + if (!(buffer = buffer_new())) { + PyErr_NoMemory(); + destroy_codec_options(&options); + return NULL; + } + if (!(to_publish = PyList_New(0))) { + goto fail; + } + + if (!_batched_op_msg( + op, + check_keys, + ack, + command, + docs, + ctx, + to_publish, + options, + buffer, + state)) { + goto fail; + } + + result = Py_BuildValue(BYTES_FORMAT_STRING "O", + buffer_get_buffer(buffer), + buffer_get_position(buffer), + to_publish); +fail: + destroy_codec_options(&options); + buffer_free(buffer); + Py_XDECREF(to_publish); + return result; +} + +static PyObject* +_cbson_batched_op_msg(PyObject* self, PyObject* args) { + unsigned char op; + unsigned char check_keys; + unsigned char ack; + int request_id; + int position; + PyObject* command; + PyObject* docs; + PyObject* ctx = NULL; + PyObject* to_publish = NULL; + PyObject* result = NULL; + codec_options_t options; + buffer_t buffer; + struct module_state *state = GETSTATE(self); + + if (!PyArg_ParseTuple(args, "bOObbO&O", + &op, &command, &docs, &check_keys, &ack, + convert_codec_options, &options, + &ctx)) { + return NULL; + } + if (!(buffer = buffer_new())) { + PyErr_NoMemory(); + destroy_codec_options(&options); + return NULL; + } + /* Save space for message length and request id */ + if ((buffer_save_space(buffer, 8)) == -1) { + PyErr_NoMemory(); + goto fail; + } + if (!buffer_write_bytes(buffer, + "\x00\x00\x00\x00" /* responseTo */ + "\xdd\x07\x00\x00", /* opcode */ + 8)) { + goto fail; + } + if (!(to_publish = PyList_New(0))) { + goto fail; + } + + if (!_batched_op_msg( + op, + check_keys, + ack, + command, + docs, + ctx, + to_publish, + options, + buffer, + state)) { + goto fail; + } + + request_id = rand(); + position = buffer_get_position(buffer); + buffer_write_int32_at_position(buffer, 0, (int32_t)position); + buffer_write_int32_at_position(buffer, 4, (int32_t)request_id); + result = Py_BuildValue("i" BYTES_FORMAT_STRING "O", request_id, + buffer_get_buffer(buffer), + buffer_get_position(buffer), + to_publish); +fail: + destroy_codec_options(&options); + buffer_free(buffer); + Py_XDECREF(to_publish); + return result; +} + +/* End OP_MSG -------------------------------------------- */ + static int _batched_write_command( char* ns, int ns_len, unsigned char op, int check_keys, @@ -1376,7 +1685,7 @@ fail: } static PyObject* -_cbson_do_batched_write_command(PyObject* self, PyObject* args) { +_cbson_batched_write_command(PyObject* self, PyObject* args) { char *ns = NULL; unsigned char op; unsigned char check_keys; @@ -1463,10 +1772,14 @@ static PyMethodDef _CMessageMethods[] = { "create an OP_MSG message to be sent to MongoDB"}, {"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS, "insert a batch of documents, splitting the batch as needed"}, - {"_do_batched_write_command", _cbson_do_batched_write_command, METH_VARARGS, + {"_batched_write_command", _cbson_batched_write_command, METH_VARARGS, "Create the next batched insert, update, or delete command"}, {"_encode_batched_write_command", _cbson_encode_batched_write_command, METH_VARARGS, "Encode the next batched insert, update, or delete command"}, + {"_batched_op_msg", _cbson_batched_op_msg, METH_VARARGS, + "Create the next batched insert, update, or delete using OP_MSG"}, + {"_encode_batched_op_msg", _cbson_encode_batched_op_msg, METH_VARARGS, + "Encode the next batched insert, update, or delete using OP_MSG"}, {NULL, NULL, 0, NULL} }; diff --git a/pymongo/bulk.py b/pymongo/bulk.py index ec74a84ce..20158cd16 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -27,7 +27,7 @@ from pymongo.common import (validate_is_mapping, validate_is_document_type, validate_ok_for_replace, validate_ok_for_update) -from pymongo.helpers import _RETRYABLE_ERROR_CODES, _raise_write_concern_error +from pymongo.helpers import _RETRYABLE_ERROR_CODES from pymongo.collation import validate_collation_or_none from pymongo.errors import (BulkWriteError, ConfigurationError, @@ -35,8 +35,7 @@ from pymongo.errors import (BulkWriteError, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, _do_batched_insert, - _do_batched_write_command, - _do_batched_write_command_compressed, + _do_bulk_write_command, _randint, _BulkWriteContext) from pymongo.read_preferences import ReadPreference @@ -260,11 +259,6 @@ class _Bulk(object): self.current_run = next(generator) run = self.current_run - if sock_info.compression_context: - do_writes = _do_batched_write_command_compressed - else: - do_writes = _do_batched_write_command - # sock_info.command validates the session, but we use # sock_info.write_command. sock_info.validate_session(client, session) @@ -285,7 +279,7 @@ class _Bulk(object): check_keys = run.op_type == _INSERT ops = islice(run.ops, run.idx_offset, None) # Run as many ops as possible. - request_id, msg, to_send = do_writes( + request_id, msg, to_send = _do_bulk_write_command( self.namespace, run.op_type, cmd, ops, check_keys, self.collection.codec_options, bwc) if not to_send: diff --git a/pymongo/message.py b/pymongo/message.py index 5083a69a9..9c6d7c5cb 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -25,7 +25,10 @@ import random import struct import bson -from bson import CodecOptions, _make_c_string, _dict_to_bson +from bson import (CodecOptions, + _bson_to_dict, + _dict_to_bson, + _make_c_string) from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.py3compat import b, StringIO from bson.son import SON @@ -1043,8 +1046,145 @@ def _do_batched_insert(collection_name, docs, check_keys, if _use_c: _do_batched_insert = _cmessage._do_batched_insert +# OP_MSG ------------------------------------------------------------- -def _do_batched_write_command_compressed( + +_OP_MSG_MAP = { + _INSERT: b'documents\x00', + _UPDATE: b'updates\x00', + _DELETE: b'deletes\x00', +} + + +def _batched_op_msg_impl( + operation, command, docs, check_keys, ack, opts, ctx, buf): + """Create a batched OP_MSG write.""" + max_bson_size = ctx.max_bson_size + max_write_batch_size = ctx.max_write_batch_size + max_message_size = ctx.max_message_size + + flags = b"\x00\x00\x00\x00" if ack else b"\x02\x00\x00\x00" + # Flags + buf.write(flags) + + # Type 0 Section + buf.write(b"\x00") + buf.write(_dict_to_bson(command, False, opts)) + + # Type 1 Section + buf.write(b"\x01") + size_location = buf.tell() + # Save space for size + buf.write(b"\x00\x00\x00\x00") + try: + buf.write(_OP_MSG_MAP[operation]) + except KeyError: + raise InvalidOperation('Unknown command') + + if operation in (_UPDATE, _DELETE): + check_keys = False + + to_send = [] + idx = 0 + for doc in docs: + # Encode the current operation + value = _dict_to_bson(doc, check_keys, opts) + # Is there enough room to add this document? + enough_data = (buf.tell() + len(value)) >= max_message_size + enough_documents = (idx >= max_write_batch_size) + if enough_data or enough_documents: + if not idx: + write_op = "insert" if operation == _INSERT else None + _raise_document_too_large( + write_op, len(value), max_bson_size) + break + buf.write(value) + to_send.append(doc) + idx += 1 + + # Write type 1 section size + length = buf.tell() + buf.seek(size_location) + buf.write(_pack_int(length - size_location)) + + return to_send, length + + +def _encode_batched_op_msg( + operation, command, docs, check_keys, ack, opts, ctx): + """Encode the next batched insert, update, or delete operation + as OP_MSG. + """ + buf = StringIO() + + to_send, _ = _batched_op_msg_impl( + operation, command, docs, check_keys, ack, opts, ctx, buf) + return buf.getvalue(), to_send +if _use_c: + _encode_batched_op_message = _cmessage._encode_batched_op_msg + + +def _batched_op_msg_compressed( + operation, command, docs, check_keys, ack, opts, ctx): + """Create the next batched insert, update, or delete operation + with OP_MSG, compressed. + """ + data, to_send = _encode_batched_op_msg( + operation, command, docs, check_keys, ack, opts, ctx) + + request_id, msg = _compress( + 2013, + data, + ctx.sock_info.compression_context) + return request_id, msg, to_send + + +def _batched_op_msg( + operation, command, docs, check_keys, ack, opts, ctx): + """OP_MSG implementation entry point.""" + buf = StringIO() + + # Save space for message length and request id + buf.write(_ZERO_64) + # responseTo, opCode + buf.write(b"\x00\x00\x00\x00\xdd\x07\x00\x00") + + to_send, length = _batched_op_msg_impl( + operation, command, docs, check_keys, ack, opts, ctx, buf) + + # Header - request id and message length + buf.seek(4) + request_id = _randint() + buf.write(_pack_int(request_id)) + buf.seek(0) + buf.write(_pack_int(length)) + + return request_id, buf.getvalue(), to_send +if _use_c: + _batched_op_msg = _cmessage._batched_op_msg + + +def _do_batched_op_msg( + namespace, operation, command, docs, check_keys, opts, ctx): + """Create the next batched insert, update, or delete operation + using OP_MSG. + """ + command['$db'] = namespace.split('.', 1)[0] + if 'writeConcern' in command: + ack = bool(command['writeConcern'].get('w', 1)) + else: + ack = True + if ctx.sock_info.compression_context: + return _batched_op_msg_compressed( + operation, command, docs, check_keys, ack, opts, ctx) + return _batched_op_msg( + operation, command, docs, check_keys, ack, opts, ctx) + + +# End OP_MSG ----------------------------------------------------- + + +def _batched_write_command_compressed( namespace, operation, command, docs, check_keys, opts, ctx): """Create the next batched insert, update, or delete command, compressed. """ @@ -1064,14 +1204,14 @@ def _encode_batched_write_command( """ buf = StringIO() - to_send, _ = _batched_write_command( + to_send, _ = _batched_write_command_impl( namespace, operation, command, docs, check_keys, opts, ctx, buf) return buf.getvalue(), to_send if _use_c: _encode_batched_write_command = _cmessage._encode_batched_write_command -def _do_batched_write_command( +def _batched_write_command( namespace, operation, command, docs, check_keys, opts, ctx): """Create the next batched insert, update, or delete command. """ @@ -1083,22 +1223,42 @@ def _do_batched_write_command( buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00") # Write OP_QUERY write command - to_send, length = _batched_write_command( + to_send, length = _batched_write_command_impl( namespace, operation, command, docs, check_keys, opts, ctx, buf) # Header - request id and message length buf.seek(4) request_id = _randint() - buf.write(struct.pack(' 5: + return _do_batched_op_msg( + namespace, operation, command, docs, check_keys, opts, ctx) + return _do_batched_write_command( + namespace, operation, command, docs, check_keys, opts, ctx) + + +def _batched_write_command_impl( namespace, operation, command, docs, check_keys, opts, ctx, buf): """Create a batched OP_QUERY write command.""" max_bson_size = ctx.max_bson_size @@ -1163,9 +1323,9 @@ def _batched_write_command( # Write document lengths and request id length = buf.tell() buf.seek(list_start) - buf.write(struct.pack('