Clean up write commands support PYTHON-554

A few changes:

- Eliminate the ordered parameter - no reason this needs to be a
separate param.
- Use integers to represent the operation instead of strings - this
will make part of the bulk API implementation a bit cleaner and
avoid a bunch of string comparisons.
- Clean up namespace and collection name handling in collection.py - No
need to create a bunch of new strings or do unnecessary string copies.
- Pass client.disconnect to _check_command_response to deal with
"not master".
This commit is contained in:
Bernie Hackett 2013-12-01 08:59:07 -05:00
parent caf83b4e02
commit 681205ab5a
3 changed files with 49 additions and 50 deletions

View File

@ -926,6 +926,10 @@ _command_buffer_new(char* ns, int ns_len) {
return buffer;
}
#define _INSERT 0
#define _UPDATE 1
#define _DELETE 2
static PyObject*
_cbson_do_batched_write_command(PyObject* self, PyObject* args) {
struct module_state *state = GETSTATE(self);
@ -937,7 +941,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
int cmd_len_loc;
int lst_len_loc;
int ns_len;
char *ns = NULL, *cmd = NULL;
int ordered;
char *ns = NULL;
PyObject* max_bson_size_obj;
PyObject* command;
PyObject* doc;
@ -946,16 +951,16 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
PyObject* iterator;
PyObject* result;
PyObject* results;
unsigned char op;
unsigned char check_keys;
unsigned char ordered;
unsigned char uuid_subtype;
unsigned char empty = 1;
unsigned char errors = 0;
buffer_t buffer;
if (!PyArg_ParseTuple(args, "et#sOObbbO", "utf-8",
&ns, &ns_len, &cmd, &command, &docs,
&check_keys, &ordered, &uuid_subtype, &client)) {
if (!PyArg_ParseTuple(args, "et#bOObbO", "utf-8",
&ns, &ns_len, &op, &command, &docs,
&check_keys, &uuid_subtype, &client)) {
return NULL;
}
@ -976,6 +981,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
*/
max_cmd_size = max_bson_size + 16382;
/* Default to True */
ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False);
if (!(results = PyList_New(0))) {
PyMem_Free(ns);
return NULL;
@ -998,14 +1006,14 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
/* Write type byte for array */
*(buffer_get_buffer(buffer) + (buffer_get_position(buffer) - 1)) = 0x4;
switch (*cmd) {
case 'i':
switch (op) {
case _INSERT:
{
if (!buffer_write_bytes(buffer, "documents\x00", 10))
goto cmdfail;
break;
}
case 'u':
case _UPDATE:
{
/* MongoDB does key validation for update. */
check_keys = 0;
@ -1013,7 +1021,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
goto cmdfail;
break;
}
case 'd':
case _DELETE:
{
/* Never check keys in a delete command. */
check_keys = 0;
@ -1025,17 +1033,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
{
PyObject* InvalidOperation = _error("InvalidOperation");
if (InvalidOperation) {
#if PY_MAJOR_VERSION >= 3
PyObject* error = PyUnicode_FromFormat("Unknown command: %s",
cmd);
#else
PyObject* error = PyString_FromFormat("Unknown command: %s",
cmd);
#endif
if (error) {
PyErr_SetObject(InvalidOperation, error);
Py_DECREF(error);
}
PyErr_SetString(InvalidOperation, "Unknown command");
Py_DECREF(InvalidOperation);
}
goto cmdfail;
@ -1085,9 +1083,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
/* This single document is too large for the command. */
if (!idx) {
if (*cmd == 'i') { /* Insert */
if (op == _INSERT) {
_set_document_too_large(cur_size, max_bson_size);
} else { /* Update and delete */
} else {
PyObject* InvalidDocument = _error("InvalidDocument");
if (InvalidDocument) {
/*

View File

@ -25,6 +25,7 @@ from pymongo import (common,
from pymongo.cursor import Cursor
from pymongo.errors import InvalidName
from pymongo.helpers import _check_command_response
from pymongo.message import _INSERT, _UPDATE, _DELETE
try:
@ -359,15 +360,13 @@ class Collection(common.BaseObject):
if client.max_wire_version > 1 and safe:
# Insert command
dbname, collname = self.__full_name.split('.', 1)
namespace = '%s.%s' % (dbname, '$cmd')
command = SON([('insert', collname),
command = SON([('insert', self.name),
('ordered', not continue_on_error),
('writeConcern', options)])
results = message._do_batched_write_command(
namespace, 'insert', command, gen(), check_keys,
not continue_on_error, self.uuid_subtype, client)
self.database.name + ".$cmd", _INSERT, command,
gen(), check_keys, self.uuid_subtype, client)
errors = [result for result in results if not result[1]['ok']]
if errors:
@ -383,7 +382,7 @@ class Collection(common.BaseObject):
# but we have to add the 'ok' field if we're passing it
# a subdocument from errDetails.
error['ok'] = 0
_check_command_response(error, None)
_check_command_response(error, client.disconnect)
else:
# Legacy batched OP_INSERT
message._do_batched_insert(self.__full_name, gen(), check_keys,
@ -512,19 +511,17 @@ class Collection(common.BaseObject):
client = self.database.connection
if client.max_wire_version > 1 and safe:
# Update command
dbname, collname = self.__full_name.split('.', 1)
namespace = '%s.%s' % (dbname, '$cmd')
command = SON([('update', collname),
command = SON([('update', self.name),
('writeConcern', options)])
docs = [SON([('q', spec), ('u', document),
('multi', multi), ('upsert', upsert)])]
_, result = message._do_batched_write_command(
namespace, 'update', command, docs,
check_keys, True, self.uuid_subtype, client)[0]
self.database.name + '.$cmd', _UPDATE, command,
docs, check_keys, self.uuid_subtype, client)[0]
if not result['ok']:
_check_command_response(result, None)
_check_command_response(result, client.disconnect)
# Add the updatedExisting field for compatibility
if result.get('n') and 'upserted' not in result:
@ -625,18 +622,16 @@ class Collection(common.BaseObject):
client = self.database.connection
if client.max_wire_version > 1 and safe:
# Delete command
dbname, collname = self.__full_name.split('.', 1)
namespace = '%s.%s' % (dbname, '$cmd')
command = SON([('delete', collname),
command = SON([('delete', self.name),
('writeConcern', options)])
docs = [SON([('q', spec_or_id), ('limit', 0)])]
_, result = message._do_batched_write_command(
namespace, 'delete', command, docs,
False, True, self.uuid_subtype, client)[0]
self.database.name + '.$cmd', _DELETE, command,
docs, False, self.uuid_subtype, client)[0]
if not result['ok']:
_check_command_response(result, None)
_check_command_response(result, client.disconnect)
return result

View File

@ -40,6 +40,10 @@ from pymongo.errors import InvalidDocument, InvalidOperation, OperationFailure
MAX_INT32 = 2147483647
MIN_INT32 = -2147483648
_INSERT = 0
_UPDATE = 1
_DELETE = 2
_EMPTY = b('')
_BSONOBJ = b('\x03')
_ZERO_8 = b('\x00')
@ -47,10 +51,10 @@ _ZERO_16 = b('\x00\x00')
_ZERO_32 = b('\x00\x00\x00\x00')
_ZERO_64 = b('\x00\x00\x00\x00\x00\x00\x00\x00')
_SKIPLIM = b('\x00\x00\x00\x00\xff\xff\xff\xff')
_CMD_MAP = {
'insert': b('\x04documents\x00\x00\x00\x00\x00'),
'update': b('\x04updates\x00\x00\x00\x00\x00'),
'delete': b('\x04deletes\x00\x00\x00\x00\x00'),
_OP_MAP = {
_INSERT: b('\x04documents\x00\x00\x00\x00\x00'),
_UPDATE: b('\x04updates\x00\x00\x00\x00\x00'),
_DELETE: b('\x04deletes\x00\x00\x00\x00\x00'),
}
@ -266,8 +270,8 @@ if _use_c:
_do_batched_insert = _cmessage._do_batched_insert
def _do_batched_write_command(namespace, name, command, docs,
check_keys, ordered, uuid_subtype, client):
def _do_batched_write_command(namespace, operation, command,
docs, check_keys, uuid_subtype, client):
"""Execute a batch of insert, update, or delete commands.
"""
max_bson_size = client.max_bson_size
@ -275,6 +279,8 @@ def _do_batched_write_command(namespace, name, command, docs,
# XXX: This should come from the server - SERVER-10643
max_cmd_size = max_bson_size + 16382
ordered = command.get('ordered', True)
buf = StringIO()
# Save space for message length and request id
buf.write(_ZERO_64)
@ -297,11 +303,11 @@ def _do_batched_write_command(namespace, name, command, docs,
# Work around some Jython weirdness.
buf.truncate()
try:
buf.write(_CMD_MAP[name])
buf.write(_OP_MAP[operation])
except KeyError:
raise InvalidOperation('Unknown command: %s' % (name,))
raise InvalidOperation('Unknown command')
if name in ('update', 'delete'):
if operation in (_UPDATE, _DELETE):
check_keys = False
# Where to write list document length
@ -359,7 +365,7 @@ def _do_batched_write_command(namespace, name, command, docs,
# Send a batch?
if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size:
if not idx:
if name == 'insert':
if operation == _INSERT:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"