diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index 7e60f9701..8de378e0f 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -926,6 +926,10 @@ _command_buffer_new(char* ns, int ns_len) { return buffer; } +#define _INSERT 0 +#define _UPDATE 1 +#define _DELETE 2 + static PyObject* _cbson_do_batched_write_command(PyObject* self, PyObject* args) { struct module_state *state = GETSTATE(self); @@ -937,7 +941,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { int cmd_len_loc; int lst_len_loc; int ns_len; - char *ns = NULL, *cmd = NULL; + int ordered; + char *ns = NULL; PyObject* max_bson_size_obj; PyObject* command; PyObject* doc; @@ -946,16 +951,16 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { PyObject* iterator; PyObject* result; PyObject* results; + unsigned char op; unsigned char check_keys; - unsigned char ordered; unsigned char uuid_subtype; unsigned char empty = 1; unsigned char errors = 0; buffer_t buffer; - if (!PyArg_ParseTuple(args, "et#sOObbbO", "utf-8", - &ns, &ns_len, &cmd, &command, &docs, - &check_keys, &ordered, &uuid_subtype, &client)) { + if (!PyArg_ParseTuple(args, "et#bOObbO", "utf-8", + &ns, &ns_len, &op, &command, &docs, + &check_keys, &uuid_subtype, &client)) { return NULL; } @@ -976,6 +981,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { */ max_cmd_size = max_bson_size + 16382; + /* Default to True */ + ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False); + if (!(results = PyList_New(0))) { PyMem_Free(ns); return NULL; @@ -998,14 +1006,14 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { /* Write type byte for array */ *(buffer_get_buffer(buffer) + (buffer_get_position(buffer) - 1)) = 0x4; - switch (*cmd) { - case 'i': + switch (op) { + case _INSERT: { if (!buffer_write_bytes(buffer, "documents\x00", 10)) goto cmdfail; break; } - case 'u': + case _UPDATE: { /* MongoDB does key validation for update. */ check_keys = 0; @@ -1013,7 +1021,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { goto cmdfail; break; } - case 'd': + case _DELETE: { /* Never check keys in a delete command. */ check_keys = 0; @@ -1025,17 +1033,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { { PyObject* InvalidOperation = _error("InvalidOperation"); if (InvalidOperation) { -#if PY_MAJOR_VERSION >= 3 - PyObject* error = PyUnicode_FromFormat("Unknown command: %s", - cmd); -#else - PyObject* error = PyString_FromFormat("Unknown command: %s", - cmd); -#endif - if (error) { - PyErr_SetObject(InvalidOperation, error); - Py_DECREF(error); - } + PyErr_SetString(InvalidOperation, "Unknown command"); Py_DECREF(InvalidOperation); } goto cmdfail; @@ -1085,9 +1083,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { /* This single document is too large for the command. */ if (!idx) { - if (*cmd == 'i') { /* Insert */ + if (op == _INSERT) { _set_document_too_large(cur_size, max_bson_size); - } else { /* Update and delete */ + } else { PyObject* InvalidDocument = _error("InvalidDocument"); if (InvalidDocument) { /* diff --git a/pymongo/collection.py b/pymongo/collection.py index c98224439..52568faca 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -25,6 +25,7 @@ from pymongo import (common, from pymongo.cursor import Cursor from pymongo.errors import InvalidName from pymongo.helpers import _check_command_response +from pymongo.message import _INSERT, _UPDATE, _DELETE try: @@ -359,15 +360,13 @@ class Collection(common.BaseObject): if client.max_wire_version > 1 and safe: # Insert command - dbname, collname = self.__full_name.split('.', 1) - namespace = '%s.%s' % (dbname, '$cmd') - command = SON([('insert', collname), + command = SON([('insert', self.name), ('ordered', not continue_on_error), ('writeConcern', options)]) results = message._do_batched_write_command( - namespace, 'insert', command, gen(), check_keys, - not continue_on_error, self.uuid_subtype, client) + self.database.name + ".$cmd", _INSERT, command, + gen(), check_keys, self.uuid_subtype, client) errors = [result for result in results if not result[1]['ok']] if errors: @@ -383,7 +382,7 @@ class Collection(common.BaseObject): # but we have to add the 'ok' field if we're passing it # a subdocument from errDetails. error['ok'] = 0 - _check_command_response(error, None) + _check_command_response(error, client.disconnect) else: # Legacy batched OP_INSERT message._do_batched_insert(self.__full_name, gen(), check_keys, @@ -512,19 +511,17 @@ class Collection(common.BaseObject): client = self.database.connection if client.max_wire_version > 1 and safe: # Update command - dbname, collname = self.__full_name.split('.', 1) - namespace = '%s.%s' % (dbname, '$cmd') - command = SON([('update', collname), + command = SON([('update', self.name), ('writeConcern', options)]) docs = [SON([('q', spec), ('u', document), ('multi', multi), ('upsert', upsert)])] _, result = message._do_batched_write_command( - namespace, 'update', command, docs, - check_keys, True, self.uuid_subtype, client)[0] + self.database.name + '.$cmd', _UPDATE, command, + docs, check_keys, self.uuid_subtype, client)[0] if not result['ok']: - _check_command_response(result, None) + _check_command_response(result, client.disconnect) # Add the updatedExisting field for compatibility if result.get('n') and 'upserted' not in result: @@ -625,18 +622,16 @@ class Collection(common.BaseObject): client = self.database.connection if client.max_wire_version > 1 and safe: # Delete command - dbname, collname = self.__full_name.split('.', 1) - namespace = '%s.%s' % (dbname, '$cmd') - command = SON([('delete', collname), + command = SON([('delete', self.name), ('writeConcern', options)]) docs = [SON([('q', spec_or_id), ('limit', 0)])] _, result = message._do_batched_write_command( - namespace, 'delete', command, docs, - False, True, self.uuid_subtype, client)[0] + self.database.name + '.$cmd', _DELETE, command, + docs, False, self.uuid_subtype, client)[0] if not result['ok']: - _check_command_response(result, None) + _check_command_response(result, client.disconnect) return result diff --git a/pymongo/message.py b/pymongo/message.py index 74b667107..282a40d17 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -40,6 +40,10 @@ from pymongo.errors import InvalidDocument, InvalidOperation, OperationFailure MAX_INT32 = 2147483647 MIN_INT32 = -2147483648 +_INSERT = 0 +_UPDATE = 1 +_DELETE = 2 + _EMPTY = b('') _BSONOBJ = b('\x03') _ZERO_8 = b('\x00') @@ -47,10 +51,10 @@ _ZERO_16 = b('\x00\x00') _ZERO_32 = b('\x00\x00\x00\x00') _ZERO_64 = b('\x00\x00\x00\x00\x00\x00\x00\x00') _SKIPLIM = b('\x00\x00\x00\x00\xff\xff\xff\xff') -_CMD_MAP = { - 'insert': b('\x04documents\x00\x00\x00\x00\x00'), - 'update': b('\x04updates\x00\x00\x00\x00\x00'), - 'delete': b('\x04deletes\x00\x00\x00\x00\x00'), +_OP_MAP = { + _INSERT: b('\x04documents\x00\x00\x00\x00\x00'), + _UPDATE: b('\x04updates\x00\x00\x00\x00\x00'), + _DELETE: b('\x04deletes\x00\x00\x00\x00\x00'), } @@ -266,8 +270,8 @@ if _use_c: _do_batched_insert = _cmessage._do_batched_insert -def _do_batched_write_command(namespace, name, command, docs, - check_keys, ordered, uuid_subtype, client): +def _do_batched_write_command(namespace, operation, command, + docs, check_keys, uuid_subtype, client): """Execute a batch of insert, update, or delete commands. """ max_bson_size = client.max_bson_size @@ -275,6 +279,8 @@ def _do_batched_write_command(namespace, name, command, docs, # XXX: This should come from the server - SERVER-10643 max_cmd_size = max_bson_size + 16382 + ordered = command.get('ordered', True) + buf = StringIO() # Save space for message length and request id buf.write(_ZERO_64) @@ -297,11 +303,11 @@ def _do_batched_write_command(namespace, name, command, docs, # Work around some Jython weirdness. buf.truncate() try: - buf.write(_CMD_MAP[name]) + buf.write(_OP_MAP[operation]) except KeyError: - raise InvalidOperation('Unknown command: %s' % (name,)) + raise InvalidOperation('Unknown command') - if name in ('update', 'delete'): + if operation in (_UPDATE, _DELETE): check_keys = False # Where to write list document length @@ -359,7 +365,7 @@ def _do_batched_write_command(namespace, name, command, docs, # Send a batch? if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size: if not idx: - if name == 'insert': + if operation == _INSERT: raise InvalidDocument("BSON document too large (%d bytes)" " - the connected server supports" " BSON document sizes up to %d"