PYTHON-952 - Bulk write operations monitoring

This change adds monitoring of bulk write operations (i.e.
Collection.bulk_write, Collection.insert_many, Collection.insert with multiple
documents, Bulk.execute, etc.). It also fixes bugs in conversion of legacy
write results to write command result format and conversion of legacy queries
to find command documents. Finally, it adds an operation_id attribute to the
published events to tie related events together.
This commit is contained in:
Bernie Hackett 2015-09-10 09:10:19 -07:00
parent c4a82814f4
commit b80fa6d632
8 changed files with 763 additions and 314 deletions

View File

@ -580,10 +580,10 @@ _set_document_too_large(int size, long max) {
}
static PyObject*
_send_insert(PyObject* self, PyObject* sock_info,
_send_insert(PyObject* self, PyObject* ctx,
PyObject* gle_args, buffer_t buffer,
char* coll_name, int coll_len, int request_id, int safe,
codec_options_t* options) {
codec_options_t* options, PyObject* to_publish) {
if (safe) {
if (!add_last_error(self, buffer, request_id,
@ -594,13 +594,14 @@ _send_insert(PyObject* self, PyObject* sock_info,
/* The max_doc_size parameter for legacy_write is the max size of any
* document in buffer. We enforced max size already, pass 0 here. */
return PyObject_CallMethod(sock_info, "legacy_write",
"i" BYTES_FORMAT_STRING "iN",
return PyObject_CallMethod(ctx, "legacy_write",
"i" BYTES_FORMAT_STRING "iNO",
request_id,
buffer_get_buffer(buffer),
buffer_get_position(buffer),
0,
PyBool_FromLong((long)safe));
PyBool_FromLong((long)safe),
to_publish);
}
static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
@ -615,11 +616,12 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
PyObject* docs;
PyObject* doc;
PyObject* iterator;
PyObject* sock_info;
PyObject* ctx;
PyObject* last_error_args;
PyObject* result;
PyObject* max_bson_size_obj;
PyObject* max_message_size_obj;
PyObject* to_publish = NULL;
unsigned char check_keys;
unsigned char safe;
unsigned char continue_on_error;
@ -638,7 +640,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
&last_error_args,
&continue_on_error,
convert_codec_options, &options,
&sock_info)) {
&ctx)) {
return NULL;
}
if (continue_on_error) {
@ -649,7 +651,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
* is True it's pointless (and slower) to send GLE.
*/
send_safe = (safe || !continue_on_error);
max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size");
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
#if PY_MAJOR_VERSION >= 3
max_bson_size = PyLong_AsLong(max_bson_size_obj);
#else
@ -662,7 +664,7 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
return NULL;
}
max_message_size_obj = PyObject_GetAttrString(sock_info, "max_message_size");
max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size");
#if PY_MAJOR_VERSION >= 3
max_message_size = PyLong_AsLong(max_message_size_obj);
#else
@ -692,6 +694,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
goto insertfail;
}
if (!(to_publish = PyList_New(0))) {
goto insertfail;
}
iterator = PyObject_GetIter(docs);
if (iterator == NULL) {
PyObject* InvalidOperation = _error("InvalidOperation");
@ -706,10 +712,8 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
int cur_size;
if (!write_dict(state->_cbson, buffer, doc, check_keys,
&options, 1)) {
Py_DECREF(doc);
goto iterfail;
}
Py_DECREF(doc);
cur_size = buffer_get_position(buffer) - before;
if (cur_size > max_bson_size) {
@ -719,9 +723,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
message_length = buffer_get_position(buffer) - length_location;
memcpy(buffer_get_buffer(buffer) + length_location,
&message_length, 4);
result = _send_insert(self, sock_info, last_error_args, buffer,
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe, &options);
request_id, send_safe, &options,
to_publish);
if (!result)
goto iterfail;
Py_DECREF(result);
@ -762,15 +767,20 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
message_length = buffer_get_position(buffer) - length_location;
memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);
result = _send_insert(self, sock_info, last_error_args, buffer,
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe, &options);
request_id, send_safe, &options, to_publish);
buffer_free(buffer);
buffer = new_buffer;
request_id = new_request_id;
length_location = message_start;
Py_DECREF(to_publish);
if (!(to_publish = PyList_New(0))) {
goto insertfail;
}
if (!result) {
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject* OperationFailure;
@ -786,7 +796,9 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
Py_DECREF(etype);
Py_XDECREF(evalue);
Py_XDECREF(etrace);
Py_DECREF(to_publish);
Py_DECREF(iterator);
Py_DECREF(doc);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_RETURN_NONE;
@ -799,6 +811,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
exc_type = etype;
exc_value = evalue;
exc_trace = etrace;
if (PyList_Append(to_publish, doc) < 0) {
goto iterfail;
}
Py_CLEAR(doc);
continue;
}
}
@ -813,6 +829,10 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
Py_DECREF(result);
}
}
if (PyList_Append(to_publish, doc) < 0) {
goto iterfail;
}
Py_CLEAR(doc);
}
Py_DECREF(iterator);
@ -833,10 +853,11 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);
/* Send the last (or only) batch */
result = _send_insert(self, sock_info, last_error_args, buffer,
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, safe, &options);
request_id, safe, &options, to_publish);
Py_DECREF(to_publish);
PyMem_Free(collection_name);
buffer_free(buffer);
@ -859,19 +880,22 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
Py_RETURN_NONE;
iterfail:
Py_XDECREF(doc);
Py_DECREF(iterator);
insertfail:
Py_XDECREF(exc_type);
Py_XDECREF(exc_value);
Py_XDECREF(exc_trace);
Py_XDECREF(to_publish);
buffer_free(buffer);
PyMem_Free(collection_name);
return NULL;
}
static PyObject*
_send_write_command(PyObject* sock_info, buffer_t buffer,
int lst_len_loc, int cmd_len_loc, unsigned char* errors) {
_send_write_command(PyObject* ctx, buffer_t buffer, int lst_len_loc,
int cmd_len_loc, unsigned char* errors,
PyObject* to_publish) {
PyObject* result;
@ -885,11 +909,12 @@ _send_write_command(PyObject* sock_info, buffer_t buffer,
memcpy(buffer_get_buffer(buffer) + 4, &request_id, 4);
/* Send the current batch */
result = PyObject_CallMethod(sock_info, "write_command",
"i" BYTES_FORMAT_STRING,
result = PyObject_CallMethod(ctx, "write_command",
"i" BYTES_FORMAT_STRING "O",
request_id,
buffer_get_buffer(buffer),
buffer_get_position(buffer));
buffer_get_position(buffer),
to_publish);
if (result && PyDict_GetItemString(result, "writeErrors"))
*errors = 1;
return result;
@ -948,10 +973,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
PyObject* command;
PyObject* doc;
PyObject* docs;
PyObject* sock_info;
PyObject* ctx;
PyObject* iterator;
PyObject* result;
PyObject* results;
PyObject* to_publish = NULL;
unsigned char op;
unsigned char check_keys;
codec_options_t options;
@ -962,11 +988,11 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8",
&ns, &ns_len, &op, &command, &docs, &check_keys,
convert_codec_options, &options,
&sock_info)) {
&ctx)) {
return NULL;
}
max_bson_size_obj = PyObject_GetAttrString(sock_info, "max_bson_size");
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
#if PY_MAJOR_VERSION >= 3
max_bson_size = PyLong_AsLong(max_bson_size_obj);
#else
@ -984,7 +1010,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
*/
max_cmd_size = max_bson_size + 16382;
max_write_batch_size_obj = PyObject_GetAttrString(sock_info, "max_write_batch_size");
max_write_batch_size_obj = PyObject_GetAttrString(ctx, "max_write_batch_size");
#if PY_MAJOR_VERSION >= 3
max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj);
#else
@ -1006,10 +1032,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
return NULL;
}
if (!(to_publish = PyList_New(0))) {
destroy_codec_options(&options);
PyMem_Free(ns);
Py_DECREF(results);
return NULL;
}
if (!(buffer = _command_buffer_new(ns, ns_len))) {
destroy_codec_options(&options);
PyMem_Free(ns);
Py_DECREF(results);
Py_DECREF(to_publish);
return NULL;
}
@ -1086,16 +1120,13 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
INT2STRING(key, idx);
if (!buffer_write_bytes(buffer, "\x03", 1) ||
!buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) {
Py_DECREF(doc);
goto cmditerfail;
}
cur_doc_begin = buffer_get_position(buffer);
if (!write_dict(state->_cbson, buffer, doc,
check_keys, &options, 1)) {
Py_DECREF(doc);
goto cmditerfail;
}
Py_DECREF(doc);
/* We have enough data, maybe send this batch. */
enough_data = (buffer_get_position(buffer) > max_cmd_size);
@ -1147,8 +1178,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
goto cmditerfail;
}
result = _send_write_command(sock_info, buffer,
lst_len_loc, cmd_len_loc, &errors);
result = _send_write_command(ctx, buffer, lst_len_loc,
cmd_len_loc, &errors, to_publish);
buffer_free(buffer);
buffer = new_buffer;
@ -1166,7 +1197,10 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
if (!result)
goto cmditerfail;
PyList_Append(results, result);
if (PyList_Append(results, result) < 0) {
Py_DECREF(result);
goto cmditerfail;
}
Py_DECREF(result);
if (errors && ordered) {
@ -1177,7 +1211,15 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
}
idx_offset += idx;
idx = 0;
Py_DECREF(to_publish);
if (!(to_publish = PyList_New(0))) {
goto cmditerfail;
}
}
if (PyList_Append(to_publish, doc) < 0) {
goto cmditerfail;
}
Py_CLEAR(doc);
idx += 1;
}
Py_DECREF(iterator);
@ -1198,8 +1240,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
if (!buffer_write_bytes(buffer, "\x00\x00", 2))
goto cmdfail;
result = _send_write_command(sock_info, buffer,
lst_len_loc, cmd_len_loc, &errors);
result = _send_write_command(ctx, buffer, lst_len_loc,
cmd_len_loc, &errors, to_publish);
if (!result)
goto cmdfail;
@ -1213,16 +1255,22 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
buffer_free(buffer);
PyList_Append(results, result);
if (PyList_Append(results, result) < 0) {
Py_DECREF(result);
goto cmdfail;
}
Py_DECREF(result);
Py_DECREF(to_publish);
destroy_codec_options(&options);
return results;
cmditerfail:
Py_XDECREF(doc);
Py_DECREF(iterator);
cmdfail:
destroy_codec_options(&options);
Py_DECREF(results);
Py_XDECREF(to_publish);
buffer_free(buffer);
return NULL;
}

View File

@ -29,7 +29,9 @@ from pymongo.errors import (BulkWriteError,
InvalidOperation,
OperationFailure)
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
_do_batched_write_command)
_do_batched_write_command,
_randint,
_BulkWriteContext)
from pymongo.write_concern import WriteConcern
@ -181,11 +183,13 @@ def _merge_command(run, full_result, results):
write_errors = result.get("writeErrors")
if write_errors:
for doc in write_errors:
# Leave the server response intact for APM.
replacement = doc.copy()
idx = doc["index"] + offset
doc["index"] = run.index(idx)
replacement["index"] = run.index(idx)
# Add the failed operation to the error document.
doc[_UOP] = run.ops[idx]
full_result["writeErrors"].extend(write_errors)
replacement[_UOP] = run.ops[idx]
full_result["writeErrors"].append(replacement)
wc_error = result.get("writeConcernError")
if wc_error:
@ -276,15 +280,19 @@ class _Bulk(object):
"nRemoved": 0,
"upserted": [],
}
op_id = _randint()
db_name = self.collection.database.name
for run in generator:
cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
('ordered', self.ordered)])
if write_concern.document:
cmd['writeConcern'] = write_concern.document
bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id)
results = _do_batched_write_command(
self.namespace, run.op_type, cmd,
run.ops, True, self.collection.codec_options, sock_info)
run.ops, True, self.collection.codec_options, bwc)
_merge_command(run, full_result, results)
# We're supposed to continue if errors are
@ -306,6 +314,7 @@ class _Bulk(object):
# If ordered is True we have to send GLE or use write
# commands so we can abort on the first error.
write_concern = WriteConcern(w=int(self.ordered))
op_id = _randint()
for run in generator:
try:
@ -313,7 +322,8 @@ class _Bulk(object):
coll._insert(sock_info,
run.ops,
self.ordered,
write_concern=write_concern)
write_concern=write_concern,
op_id=op_id)
elif run.op_type == _UPDATE:
for operation in run.ops:
doc = operation['u']
@ -326,13 +336,15 @@ class _Bulk(object):
operation['upsert'],
check_keys,
operation['multi'],
write_concern=write_concern)
write_concern=write_concern,
op_id=op_id)
else:
for operation in run.ops:
coll._delete(sock_info,
operation['q'],
not operation['limit'],
write_concern)
write_concern,
op_id)
except OperationFailure:
if self.ordered:
break
@ -350,6 +362,7 @@ class _Bulk(object):
"nRemoved": 0,
"upserted": [],
}
op_id = _randint()
stop = False
for run in generator:
for idx, operation in enumerate(run.ops):
@ -360,7 +373,8 @@ class _Bulk(object):
if run.op_type == _INSERT:
coll._insert(sock_info,
operation,
write_concern=write_concern)
write_concern=write_concern,
op_id=op_id)
result = {}
elif run.op_type == _UPDATE:
doc = operation['u']
@ -373,12 +387,14 @@ class _Bulk(object):
operation['upsert'],
check_keys,
operation['multi'],
write_concern=write_concern)
write_concern=write_concern,
op_id=op_id)
else:
result = coll._delete(sock_info,
operation['q'],
not operation['limit'],
write_concern)
write_concern,
op_id)
_merge_legacy(run, full_result, result, idx)
except DocumentTooLarge as exc:
# MongoDB 2.6 uses error code 2 for "too large".

View File

@ -35,7 +35,6 @@ from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.errors import ConfigurationError, InvalidName, OperationFailure
from pymongo.helpers import _check_write_command_response
from pymongo.message import _INSERT
from pymongo.operations import _WriteOp, IndexModel
from pymongo.read_preferences import ReadPreference
from pymongo.results import (BulkWriteResult,
@ -383,7 +382,7 @@ class Collection(common.BaseObject):
return BulkWriteResult({}, False)
def _legacy_write(
self, sock_info, name, command, acknowledged, func, *args):
self, sock_info, name, cmd, acknowledged, op_id, func, *args):
"""Internal legacy write helper."""
publish = monitoring.enabled()
if publish:
@ -392,39 +391,37 @@ class Collection(common.BaseObject):
if publish:
duration = datetime.datetime.now() - start
monitoring.publish_command_start(
command, self.__database.name, rqst_id, sock_info.address)
cmd, self.__database.name, rqst_id, sock_info.address, op_id)
start = datetime.datetime.now()
try:
result = sock_info.legacy_write(
rqst_id, msg, max_size, acknowledged)
except OperationFailure as exc:
if publish:
duration = (datetime.datetime.now() - start) + duration
dur = (datetime.datetime.now() - start) + duration
details = exc.details
# Succeed if GLE was successful and this is a write error.
# XXX: Is checking if "n" is in details the best way to
# differentiate write errors from something else?
if details.get("ok") and "n" in details:
reply = helpers._upconvert_write_result(
name, command, details)
reply = message._convert_write_result(
name, cmd, details)
monitoring.publish_command_success(
duration, reply, name, rqst_id, sock_info.address)
dur, reply, name, rqst_id, sock_info.address, op_id)
else:
monitoring.publish_command_failure(
duration, details, name, rqst_id, sock_info.address)
dur, details, name, rqst_id, sock_info.address, op_id)
raise
if publish:
# No result for w=0
reply = None
# Comply with APM spec.
reply = {'ok': 1}
if result:
reply = helpers._upconvert_write_result(name, command, result)
reply = message._convert_write_result(name, cmd, result)
duration = (datetime.datetime.now() - start) + duration
monitoring.publish_command_success(
duration, reply, name, rqst_id, sock_info.address)
duration, reply, name, rqst_id, sock_info.address, op_id)
return result
def _insert_one(
self, sock_info, doc, check_keys, manipulate, write_concern):
self, sock_info, doc, check_keys, manipulate, write_concern, op_id):
"""Internal helper for inserting a single document."""
if manipulate:
doc = self.__database._apply_incoming_manipulators(doc, self)
@ -435,9 +432,9 @@ class Collection(common.BaseObject):
concern = (write_concern or self.write_concern).document
acknowledged = concern.get("w") != 0
command = SON([('insert', self.name),
('documents', [doc]),
('ordered', True)])
if acknowledged and concern:
('ordered', True),
('documents', [doc])])
if concern:
command['writeConcern'] = concern
if sock_info.max_wire_version > 1 and acknowledged:
@ -450,17 +447,17 @@ class Collection(common.BaseObject):
else:
# Legacy OP_INSERT.
self._legacy_write(
sock_info, 'insert', command, acknowledged,
sock_info, 'insert', command, acknowledged, op_id,
message.insert, self.__full_name, [doc], check_keys,
acknowledged, concern, False, self.codec_options)
return doc.get('_id')
def _insert(self, sock_info, docs, ordered=True,
check_keys=True, manipulate=False, write_concern=None):
def _insert(self, sock_info, docs, ordered=True, check_keys=True,
manipulate=False, write_concern=None, op_id=None):
"""Internal insert helper."""
if isinstance(docs, collections.MutableMapping):
return self._insert_one(
sock_info, docs, check_keys, manipulate, write_concern)
sock_info, docs, check_keys, manipulate, write_concern, op_id)
ids = []
@ -489,25 +486,27 @@ class Collection(common.BaseObject):
yield doc
concern = (write_concern or self.write_concern).document
safe = concern.get("w") != 0
acknowledged = concern.get("w") != 0
if sock_info.max_wire_version > 1 and safe:
command = SON([('insert', self.name),
('ordered', ordered)])
if concern:
command['writeConcern'] = concern
if op_id is None:
op_id = message._randint()
bwc = message._BulkWriteContext(
self.database.name, command, sock_info, op_id)
if sock_info.max_wire_version > 1 and acknowledged:
# Batched insert command.
command = SON([('insert', self.name),
('ordered', ordered)])
if concern:
command['writeConcern'] = concern
results = message._do_batched_write_command(
self.database.name + ".$cmd", _INSERT, command,
gen(), check_keys, self.codec_options, sock_info)
self.database.name + ".$cmd", message._INSERT, command,
gen(), check_keys, self.codec_options, bwc)
_check_write_command_response(results)
else:
# Legacy batched OP_INSERT.
message._do_batched_insert(self.__full_name, gen(), check_keys,
safe, concern, not ordered,
self.codec_options, sock_info)
acknowledged, concern, not ordered,
self.codec_options, bwc)
return ids
def insert_one(self, document):
@ -572,7 +571,7 @@ class Collection(common.BaseObject):
if "_id" not in document:
document["_id"] = ObjectId()
inserted_ids.append(document["_id"])
yield (_INSERT, document)
yield (message._INSERT, document)
blk = _Bulk(self, ordered)
blk.ops = [doc for doc in gen()]
@ -581,7 +580,7 @@ class Collection(common.BaseObject):
def _update(self, sock_info, criteria, document, upsert=False,
check_keys=True, multi=False, manipulate=False,
write_concern=None):
write_concern=None, op_id=None):
"""Internal update / replace helper."""
common.validate_boolean("upsert", upsert)
if manipulate:
@ -589,12 +588,12 @@ class Collection(common.BaseObject):
concern = (write_concern or self.write_concern).document
acknowledged = concern.get("w") != 0
command = SON([('update', self.name),
('ordered', True),
('updates', [SON([('q', criteria),
('u', document),
('multi', multi),
('upsert', upsert)])]),
('ordered', True)])
if acknowledged and concern:
('upsert', upsert)])])])
if concern:
command['writeConcern'] = concern
if sock_info.max_wire_version > 1 and acknowledged:
# Update command.
@ -619,9 +618,9 @@ class Collection(common.BaseObject):
else:
# Legacy OP_UPDATE.
return self._legacy_write(
sock_info, 'update', command, acknowledged, message.update,
self.__full_name, upsert, multi, criteria, document,
acknowledged, concern, check_keys, self.codec_options)
sock_info, 'update', command, acknowledged, op_id,
message.update, self.__full_name, upsert, multi, criteria,
document, acknowledged, concern, check_keys, self.codec_options)
def replace_one(self, filter, replacement, upsert=False):
"""Replace a single document matching the filter.
@ -758,16 +757,17 @@ class Collection(common.BaseObject):
"""
self.__database.drop_collection(self.__name)
def _delete(self, sock_info, criteria, multi, write_concern=None):
def _delete(
self, sock_info, criteria, multi, write_concern=None, op_id=None):
"""Internal delete helper."""
common.validate_is_mapping("filter", criteria)
concern = (write_concern or self.write_concern).document
acknowledged = concern.get("w") != 0
command = SON([('delete', self.name),
('ordered', True),
('deletes', [SON([('q', criteria),
('limit', int(not multi))])]),
('ordered', True)])
if acknowledged and concern:
('limit', int(not multi))])])])
if concern:
command['writeConcern'] = concern
if sock_info.max_wire_version > 1 and acknowledged:
@ -780,9 +780,9 @@ class Collection(common.BaseObject):
else:
# Legacy OP_DELETE.
return self._legacy_write(
sock_info, 'delete', command, acknowledged, message.delete,
self.__full_name, criteria, acknowledged, concern,
self.codec_options, int(not multi))
sock_info, 'delete', command, acknowledged, op_id,
message.delete, self.__full_name, criteria, acknowledged,
concern, self.codec_options, int(not multi))
def delete_one(self, filter):
"""Delete a single document matching the filter.

View File

@ -108,7 +108,7 @@ def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok" : 0, "errmsg" : msg, "code" : 43}
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
@ -274,47 +274,6 @@ def _check_write_command_response(results):
error.get("errmsg"), error.get("code"), error)
def _upconvert_write_result(operation, command, result):
"""Convert a legacy write result to write commmand format."""
# Based on _merge_legacy from bulk.py
affected = result.get("n", 0)
res = {"ok": 1, "n": affected}
errmsg = result.get("errmsg", result.get("err", ""))
if errmsg:
# The write was successful on at least the primary so don't return.
if result.get("wtimeout"):
res["writeConcernError"] = {"errmsg": errmsg,
"code": 64,
"errInfo": {"wtimeout": True}}
else:
# The write failed.
error = {"index": 0,
"code": result.get("code", 8),
"errmsg": errmsg}
if "errInfo" in result:
error["errInfo"] = result["errInfo"]
res["writeErrors"] = [error]
return res
if operation == "insert":
# GLE result for insert is always 0 in most MongoDB versions.
res["n"] = 1
elif operation == "update":
res["nModified"] = 0
if "upserted" in result:
res["upserted"] = [{"index": 0, "_id": result["upserted"]}]
# Versions of MongoDB before 2.6 don't return the _id for an
# upsert if _id is not an ObjectId.
elif result.get("updatedExisting") is False and affected == 1:
# If _id is in both the update document *and* the query spec
# the update document _id takes precedence.
_id = command["u"].get("_id", command["q"].get("_id"))
res["upserted"] = [{"index": 0, "_id": _id}]
else:
res["nModified"] = affected
return res
def _fields_list_to_dict(fields, option_name):
"""Takes a sequence of field names and returns a matching dictionary.

View File

@ -20,6 +20,7 @@ MongoDB.
application developers.
"""
import datetime
import random
import struct
@ -32,6 +33,7 @@ try:
_use_c = True
except ImportError:
_use_c = False
from pymongo import monitoring
from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure
from pymongo.read_preferences import ReadPreference
@ -57,6 +59,11 @@ _OP_MAP = {
}
def _randint():
"""Generate a pseudo random 32 bit integer."""
return random.randint(MIN_INT32, MAX_INT32)
def _maybe_add_read_preference(spec, read_preference):
"""Add $readPreference to spec when appropriate."""
mode = read_preference.mode
@ -75,6 +82,44 @@ def _maybe_add_read_preference(spec, read_preference):
return spec
def _convert_write_result(operation, command, result):
"""Convert a legacy write result to write commmand format."""
# Based on _merge_legacy from bulk.py
affected = result.get("n", 0)
res = {"ok": 1, "n": affected}
errmsg = result.get("errmsg", result.get("err", ""))
if errmsg:
# The write was successful on at least the primary so don't return.
if result.get("wtimeout"):
res["writeConcernError"] = {"errmsg": errmsg,
"code": 64,
"errInfo": {"wtimeout": True}}
else:
# The write failed.
error = {"index": 0,
"code": result.get("code", 8),
"errmsg": errmsg}
if "errInfo" in result:
error["errInfo"] = result["errInfo"]
res["writeErrors"] = [error]
return res
if operation == "insert":
# GLE result for insert is always 0 in most MongoDB versions.
res["n"] = len(command['documents'])
elif operation == "update":
if "upserted" in result:
res["upserted"] = [{"index": 0, "_id": result["upserted"]}]
# Versions of MongoDB before 2.6 don't return the _id for an
# upsert if _id is not an ObjectId.
elif result.get("updatedExisting") is False and affected == 1:
# If _id is in both the update document *and* the query spec
# the update document _id takes precedence.
_id = command["u"].get("_id", command["q"].get("_id"))
res["upserted"] = [{"index": 0, "_id": _id}]
return res
_OPTIONS = SON([
('tailable', 2),
('oplogReplay', 8),
@ -90,7 +135,6 @@ _MODIFIERS = SON([
('$comment', 'comment'),
('$maxScan', 'maxScan'),
('$maxTimeMS', 'maxTimeMS'),
('$readPreference', 'readPreference'),
('$max', 'max'),
('$min', 'min'),
('$returnKey', 'returnKey'),
@ -121,12 +165,11 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options):
if skip:
cmd['skip'] = skip
if limit:
cmd['limit'] = limit
cmd['limit'] = abs(limit)
if limit < 0:
cmd['singleBatch'] = True
if batch_size:
cmd['batchSize'] = batch_size
# XXX: Should the check for 1 be here?
if limit < 0 or limit == 1:
cmd['singleBatch'] = True
if options:
cmd.update([(opt, True)
@ -262,7 +305,7 @@ def __pack_message(operation, data):
Returns the resultant message string.
"""
request_id = random.randint(MIN_INT32, MAX_INT32)
request_id = _randint()
message = struct.pack("<i", 16 + len(data))
message += struct.pack("<i", request_id)
message += _ZERO_32 # responseTo
@ -390,9 +433,119 @@ def kill_cursors(cursor_ids):
return __pack_message(2007, data)
_FIELD_MAP = {
'insert': 'documents',
'update': 'updates',
'delete': 'deletes'
}
class _BulkWriteContext(object):
"""A wrapper around SocketInfo for use with write splitting functions."""
__slots__ = ('db_name', 'command', 'sock_info',
'op_id', 'name', 'field', 'publish', 'start_time')
def __init__(self, database_name, command, sock_info, operation_id):
self.db_name = database_name
self.command = command
self.sock_info = sock_info
self.op_id = operation_id
self.name = next(iter(command))
self.field = _FIELD_MAP[self.name]
self.publish = monitoring.enabled()
self.start_time = datetime.datetime.now() if self.publish else None
@property
def max_bson_size(self):
"""A proxy for SockInfo.max_bson_size."""
return self.sock_info.max_bson_size
@property
def max_message_size(self):
"""A proxy for SockInfo.max_message_size."""
return self.sock_info.max_message_size
@property
def max_write_batch_size(self):
"""A proxy for SockInfo.max_write_batch_size."""
return self.sock_info.max_write_batch_size
def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
"""A proxy for SocketInfo.legacy_write that handles event publishing.
"""
if self.publish:
duration = datetime.datetime.now() - self.start_time
cmd = self._start(request_id, docs)
start = datetime.datetime.now()
try:
reply = self.sock_info.legacy_write(
request_id, msg, max_doc_size, acknowledged)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(
request_id,
_convert_write_result(self.name, cmd, reply),
duration)
except OperationFailure as exc:
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._fail(
request_id,
_convert_write_result(
self.name, cmd, exc.details),
duration)
raise
finally:
self.start_time = datetime.datetime.now()
return reply
def write_command(self, request_id, msg, docs):
"""A proxy for SocketInfo.write_command that handles event publishing.
"""
if self.publish:
duration = datetime.datetime.now() - self.start_time
self._start(request_id, docs)
start = datetime.datetime.now()
try:
reply = self.sock_info.write_command(request_id, msg)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(request_id, reply, duration)
except OperationFailure as exc:
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._fail(request_id, exc.details, duration)
raise
finally:
self.start_time = datetime.datetime.now()
return reply
def _start(self, request_id, docs):
"""Publish a CommandStartedEvent."""
cmd = self.command.copy()
cmd[self.field] = docs
monitoring.publish_command_start(
cmd, self.db_name,
request_id, self.sock_info.address, self.op_id)
return cmd
def _succeed(self, request_id, reply, duration):
"""Publish a CommandSucceededEvent."""
monitoring.publish_command_success(
duration, reply, self.name,
request_id, self.sock_info.address, self.op_id)
def _fail(self, request_id, failure, duration):
"""Publish a CommandFailedEvent."""
monitoring.publish_command_failure(
duration, failure, self.name,
request_id, self.sock_info.address, self.op_id)
def _do_batched_insert(collection_name, docs, check_keys,
safe, last_error_args, continue_on_error, opts,
sock_info):
ctx):
"""Insert `docs` using multiple batches.
"""
def _insert_message(insert_message, send_safe):
@ -412,14 +565,16 @@ def _do_batched_insert(collection_name, docs, check_keys,
data.write(bson._make_c_string(collection_name))
message_length = begin_loc = data.tell()
has_docs = False
to_send = []
for doc in docs:
encoded = bson.BSON.encode(doc, check_keys, opts)
encoded_length = len(encoded)
too_large = (encoded_length > sock_info.max_bson_size)
too_large = (encoded_length > ctx.max_bson_size)
message_length += encoded_length
if message_length < sock_info.max_message_size and not too_large:
if message_length < ctx.max_message_size and not too_large:
data.write(encoded)
to_send.append(doc)
has_docs = True
continue
@ -427,7 +582,7 @@ def _do_batched_insert(collection_name, docs, check_keys,
# We have enough data, send this message.
try:
request_id, msg = _insert_message(data.getvalue(), send_safe)
sock_info.legacy_write(request_id, msg, 0, send_safe)
ctx.legacy_write(request_id, msg, 0, send_safe, to_send)
# Exception type could be OperationFailure or a subtype
# (e.g. DuplicateKeyError)
except OperationFailure as exc:
@ -447,18 +602,19 @@ def _do_batched_insert(collection_name, docs, check_keys,
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(encoded_length, sock_info.max_bson_size))
(encoded_length, ctx.max_bson_size))
message_length = begin_loc + encoded_length
data.seek(begin_loc)
data.truncate()
data.write(encoded)
to_send = [doc]
if not has_docs:
raise InvalidOperation("cannot do an empty bulk insert")
request_id, msg = _insert_message(data.getvalue(), safe)
sock_info.legacy_write(request_id, msg, 0, safe)
ctx.legacy_write(request_id, msg, 0, safe, to_send)
# Re-raise any exception stored due to continue_on_error
if last_error is not None:
@ -468,11 +624,11 @@ if _use_c:
def _do_batched_write_command(namespace, operation, command,
docs, check_keys, opts, sock_info):
docs, check_keys, opts, ctx):
"""Execute a batch of insert, update, or delete commands.
"""
max_bson_size = sock_info.max_bson_size
max_write_batch_size = sock_info.max_write_batch_size
max_bson_size = ctx.max_bson_size
max_write_batch_size = ctx.max_write_batch_size
# Max BSON object size + 16k - 2 bytes for ending NUL bytes.
# Server guarantees there is enough room: SERVER-10643.
max_cmd_size = max_bson_size + 16382
@ -511,6 +667,8 @@ def _do_batched_write_command(namespace, operation, command,
# Where to write list document length
list_start = buf.tell() - 4
to_send = []
def send_message():
"""Finalize and send the current OP_QUERY message.
"""
@ -524,11 +682,11 @@ def _do_batched_write_command(namespace, operation, command,
buf.seek(command_start)
buf.write(struct.pack('<i', length - command_start))
buf.seek(4)
request_id = random.randint(MIN_INT32, MAX_INT32)
request_id = _randint()
buf.write(struct.pack('<i', request_id))
buf.seek(0)
buf.write(struct.pack('<i', length))
return sock_info.write_command(request_id, buf.getvalue())
return ctx.write_command(request_id, buf.getvalue(), to_send)
# If there are multiple batches we'll
# merge results in the caller.
@ -567,10 +725,12 @@ def _do_batched_write_command(namespace, operation, command,
idx_offset += idx
idx = 0
key = b'0'
to_send = []
buf.write(_BSONOBJ)
buf.write(key)
buf.write(_ZERO_8)
buf.write(value)
to_send.append(doc)
idx += 1
if not has_docs:

View File

@ -145,7 +145,8 @@ def _handle_exception():
del einfo
def publish_command_start(command, database_name, request_id, connection_id):
def publish_command_start(
command, database_name, request_id, connection_id, op_id=None):
"""Publish a CommandStartedEvent to all command event subscribers.
:Parameters:
@ -154,9 +155,12 @@ def publish_command_start(command, database_name, request_id, connection_id):
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `op_id`: The (optional) operation id for this operation.
"""
if op_id is None:
op_id = request_id
event = CommandStartedEvent(
command, database_name, request_id, connection_id)
command, database_name, request_id, connection_id, op_id)
for subscriber in get_subscribers(COMMAND):
try:
subscriber.started(event)
@ -165,7 +169,7 @@ def publish_command_start(command, database_name, request_id, connection_id):
def publish_command_success(
duration, reply, command_name, request_id, connection_id):
duration, reply, command_name, request_id, connection_id, op_id=None):
"""Publish a CommandSucceededEvent to all command event subscribers.
:Parameters:
@ -175,9 +179,12 @@ def publish_command_success(
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `op_id`: The (optional) operation id for this operation.
"""
if op_id is None:
op_id = request_id
event = CommandSucceededEvent(
duration, reply, command_name, request_id, connection_id)
duration, reply, command_name, request_id, connection_id, op_id)
for subscriber in get_subscribers(COMMAND):
try:
subscriber.succeeded(event)
@ -186,7 +193,7 @@ def publish_command_success(
def publish_command_failure(
duration, failure, command_name, request_id, connection_id):
duration, failure, command_name, request_id, connection_id, op_id=None):
"""Publish a CommandFailedEvent to all command event subscribers.
:Parameters:
@ -196,9 +203,12 @@ def publish_command_failure(
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `op_id`: The (optional) operation id for this operation.
"""
if op_id is None:
op_id = request_id
event = CommandFailedEvent(
duration, failure, command_name, request_id, connection_id)
duration, failure, command_name, request_id, connection_id, op_id)
for subscriber in get_subscribers(COMMAND):
try:
subscriber.failed(event)
@ -209,12 +219,13 @@ def publish_command_failure(
class _CommandEvent(object):
"""Base class for command events."""
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id")
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id")
def __init__(self, command_name, request_id, connection_id):
def __init__(self, command_name, request_id, connection_id, operation_id):
self.__cmd_name = command_name
self.__rqst_id = request_id
self.__conn_id = connection_id
self.__op_id = operation_id
@property
def command_name(self):
@ -231,6 +242,11 @@ class _CommandEvent(object):
"""The address (host, port) of the server this command was sent to."""
return self.__conn_id
@property
def operation_id(self):
"""An id for this series of events or None."""
return self.__op_id
class CommandStartedEvent(_CommandEvent):
"""Event published when a command starts.
@ -241,17 +257,16 @@ class CommandStartedEvent(_CommandEvent):
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
"""
__slots__ = ("__cmd", "__db")
def __init__(self, command, database_name, request_id, connection_id):
def __init__(self, command, database_name, *args):
if not command:
raise ValueError("%r is not a valid command" % (command,))
# Command name must be first key.
command_name = next(iter(command))
super(CommandStartedEvent, self).__init__(command_name,
request_id,
connection_id)
super(CommandStartedEvent, self).__init__(command_name, *args)
self.__cmd = command
self.__db = database_name
@ -276,6 +291,7 @@ class CommandSucceededEvent(_CommandEvent):
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
"""
__slots__ = ("__duration_micros", "__reply")
@ -305,6 +321,7 @@ class CommandFailedEvent(_CommandEvent):
- `request_id`: The request id for this operation.
- `connection_id`: The address (host, port) of the server this command
was sent to.
- `operation_id`: An optional identifier for a series of related events.
"""
__slots__ = ("__duration_micros", "__failure")

View File

@ -58,6 +58,8 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
name = next(iter(spec))
ns = dbname + '.$cmd'
flags = 4 if slave_ok else 0
# Publish the original command document.
orig = spec
if is_mongos:
spec = message._maybe_add_read_preference(spec, read_preference)
@ -70,7 +72,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
if publish:
encoding_duration = datetime.datetime.now() - start
monitoring.publish_command_start(spec, dbname, request_id, address)
monitoring.publish_command_start(orig, dbname, request_id, address)
start = datetime.datetime.now()
sock.sendall(msg)

View File

@ -16,14 +16,17 @@ import sys
import time
import warnings
from collections import defaultdict
sys.path[0:0] = [""]
from bson.objectid import ObjectId
from bson.py3compat import text_type
from bson.son import SON
from pymongo import CursorType, monitoring
from pymongo import CursorType, monitoring, InsertOne, UpdateOne, DeleteOne
from pymongo.command_cursor import CommandCursor
from pymongo.errors import NotMasterError, OperationFailure
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import unittest, IntegrationTest, client_context, client_knobs
from test.utils import single_client
@ -32,16 +35,16 @@ from test.utils import single_client
class EventListener(monitoring.Subscriber):
def __init__(self):
self.results = {}
self.results = defaultdict(list)
def started(self, event):
self.results['started'] = event
self.results['started'].append(event)
def succeeded(self, event):
self.results['succeeded'] = event
self.results['succeeded'].append(event)
def failed(self, event):
self.results['failed'] = event
self.results['failed'].append(event)
class TestCommandMonitoring(IntegrationTest):
@ -58,14 +61,14 @@ class TestCommandMonitoring(IntegrationTest):
monitoring._SUBSCRIBERS = cls.saved_subscribers
def tearDown(self):
self.listener.results = {}
self.listener.results.clear()
def test_started_simple(self):
self.client.pymongo_test.command('ismaster')
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
@ -79,9 +82,9 @@ class TestCommandMonitoring(IntegrationTest):
def test_succeeded_simple(self):
self.client.pymongo_test.command('ismaster')
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
@ -98,9 +101,9 @@ class TestCommandMonitoring(IntegrationTest):
except OperationFailure:
pass
results = self.listener.results
started = results.get('started')
failed = results.get('failed')
self.assertIsNone(results.get('succeeded'))
started = results['started'][0]
failed = results['failed'][0]
self.assertEqual(0, len(results['succeeded']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
@ -114,9 +117,9 @@ class TestCommandMonitoring(IntegrationTest):
def test_find_one(self):
self.client.pymongo_test.test.find_one()
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
@ -124,7 +127,7 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(
SON([('find', 'test'),
('filter', {}),
('limit', -1),
('limit', 1),
('singleBatch', True)]),
started.command)
self.assertEqual('find', started.command_name)
@ -135,7 +138,7 @@ class TestCommandMonitoring(IntegrationTest):
def test_find_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
self.listener.results = {}
self.listener.results.clear()
cursor = self.client.pymongo_test.test.find(
projection={'_id': False},
batch_size=4)
@ -143,9 +146,9 @@ class TestCommandMonitoring(IntegrationTest):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -171,14 +174,14 @@ class TestCommandMonitoring(IntegrationTest):
'ok': 1}
self.assertEqual(expected_result, succeeded.reply)
self.listener.results = {}
self.listener.results.clear()
# Next batch. Exhausting the cursor could cause a getMore
# that returns id of 0 and no results.
next(cursor)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -204,20 +207,24 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(expected_result, succeeded.reply)
def test_find_with_explain(self):
cmd = SON([('explain', SON([('find', 'test'),
('filter', {})]))])
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_one({})
self.listener.results = {}
res = self.client.pymongo_test.test.find().explain()
self.listener.results.clear()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos and client_context.version.at_least(2, 4, 0):
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
res = coll.find().explain()
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
SON([('explain', SON([('find', 'test'),
('filter', {})]))]),
started.command)
self.assertEqual(cmd, started.command)
self.assertEqual('explain', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
@ -230,21 +237,77 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(self.client.address, succeeded.connection_id)
self.assertEqual(res, succeeded.reply)
def test_find_options(self):
cmd = SON([('find', 'test'),
('filter', {}),
('comment', 'this is a test'),
('sort', SON([('_id', 1)])),
('projection', {'x': False}),
('skip', 1),
('limit', 2),
('batchSize', 2),
('noCursorTimeout', True),
('allowPartialResults', True)])
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{'x': i} for i in range(5)])
self.listener.results.clear()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos and client_context.version.at_least(2, 4, 0):
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
cursor = coll.find(
filter={},
projection={'x': False},
skip=1,
limit=2,
no_cursor_timeout=True,
sort=[('_id', 1)],
allow_partial_results=True,
modifiers=SON([('$comment', 'this is a test')]),
batch_size=2)
next(cursor)
try:
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(cmd, started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('find', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(self.client.address, succeeded.connection_id)
finally:
cursor.close()
@client_context.require_version_min(2, 6, 0)
def test_command_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many(
[{'x': 1} for _ in range(10)])
self.listener.results = {}
cursor = self.client.pymongo_test.test.aggregate(
self.listener.results.clear()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos and client_context.version.at_least(2, 4, 0):
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
cursor = coll.aggregate(
[{'$project': {'_id': False, 'x': 1}}], batchSize=4)
for _ in range(4):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -267,12 +330,12 @@ class TestCommandMonitoring(IntegrationTest):
'firstBatch': [{'x': 1} for _ in range(4)]}
self.assertEqual(expected_cursor, succeeded.reply.get('cursor'))
self.listener.results = {}
self.listener.results.clear()
next(cursor)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -307,9 +370,9 @@ class TestCommandMonitoring(IntegrationTest):
except Exception:
pass
results = self.listener.results
started = results.get('started')
self.assertIsNone(results.get('succeeded'))
failed = results.get('failed')
started = results['started'][0]
self.assertEqual(0, len(results['succeeded']))
failed = results['failed'][0]
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -334,16 +397,16 @@ class TestCommandMonitoring(IntegrationTest):
client = single_client(*address)
# Clear authentication command results from the listener.
client.admin.command('ismaster')
self.listener.results = {}
self.listener.results.clear()
error = None
try:
client.pymongo_test.test.find_one_and_delete({})
except NotMasterError as exc:
error = exc.errors
results = self.listener.results
started = results.get('started')
failed = results.get('failed')
self.assertIsNone(results.get('succeeded'))
started = results['started'][0]
failed = results['failed'][0]
self.assertEqual(0, len(results['succeeded']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
@ -359,7 +422,7 @@ class TestCommandMonitoring(IntegrationTest):
def test_exhaust(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
self.listener.results = {}
self.listener.results.clear()
cursor = self.client.pymongo_test.test.find(
projection={'_id': False},
batch_size=5,
@ -367,9 +430,9 @@ class TestCommandMonitoring(IntegrationTest):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -395,13 +458,13 @@ class TestCommandMonitoring(IntegrationTest):
'ok': 1}
self.assertEqual(expected_result, succeeded.reply)
self.listener.results = {}
self.listener.results.clear()
for _ in cursor:
pass
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
@ -433,13 +496,13 @@ class TestCommandMonitoring(IntegrationTest):
cursor = self.client.pymongo_test.test.find().batch_size(5)
next(cursor)
cursor_id = cursor.cursor_id
self.listener.results = {}
self.listener.results.clear()
cursor.close()
time.sleep(2)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
# There could be more than one cursor_id here depending on
@ -462,17 +525,18 @@ class TestCommandMonitoring(IntegrationTest):
def test_non_bulk_writes(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Implied write concern insert_one
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('ordered', True)])
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}])])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
@ -488,16 +552,18 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get('n'))
# Unacknowledged insert_one
self.listener.results.clear()
coll = coll.with_options(write_concern=WriteConcern(w=0))
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('ordered', True)])
('writeConcern', {'w': 0})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
@ -508,20 +574,20 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
# The reply document is supposed to be None.
self.assertIsNone(succeeded.reply)
self.assertEqual(succeeded.reply, {'ok': 1})
# Explicit write concern insert_one
self.listener.results.clear()
coll = coll.with_options(write_concern=WriteConcern(w=1))
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -538,16 +604,17 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get('n'))
# delete_many
self.listener.results.clear()
res = coll.delete_many({'x': 1})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -564,19 +631,20 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(3, reply.get('n'))
# replace_one
self.listener.results.clear()
oid = ObjectId()
res = coll.replace_one({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': oid}),
('u', {'_id': oid, 'x': 1}),
('multi', False),
('upsert', True)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -591,22 +659,22 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(0, reply.get('nModified'))
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
# update_one
self.listener.results.clear()
res = coll.update_one({'x': 1}, {'$inc': {'x': 1}})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 1}),
('u', {'$inc': {'x': 1}}),
('multi', False),
('upsert', False)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -621,21 +689,21 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(1, reply.get('nModified'))
# update_many
self.listener.results.clear()
res = coll.update_many({'x': 2}, {'$inc': {'x': 1}})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 2}),
('u', {'$inc': {'x': 1}}),
('multi', True),
('upsert', False)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -650,19 +718,19 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(1, reply.get('nModified'))
# delete_one
self.listener.results.clear()
res = coll.delete_one({'x': 3})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -683,17 +751,18 @@ class TestCommandMonitoring(IntegrationTest):
# write errors
coll.insert_one({'_id': 1})
try:
self.listener.results.clear()
coll.insert_one({'_id': 1})
except OperationFailure:
pass
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('documents', [{'_id': 1}]),
('ordered', True),
('documents', [{'_id': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -721,17 +790,18 @@ class TestCommandMonitoring(IntegrationTest):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Implied write concern insert
_id = coll.insert({'x': 1})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('documents', [{'_id': _id, 'x': 1}]),
('ordered', True)])
('ordered', True),
('documents', [{'_id': _id, 'x': 1}])])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
@ -747,15 +817,17 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get('n'))
# Unacknowledged insert
self.listener.results.clear()
_id = coll.insert({'x': 1}, w=0)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('ordered', True)])
('writeConcern', {'w': 0})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
@ -766,19 +838,19 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
# The reply document is supposed to be None.
self.assertIsNone(succeeded.reply)
self.assertEqual(succeeded.reply, {'ok': 1})
# Explicit write concern insert
self.listener.results.clear()
_id = coll.insert({'x': 1}, w=1)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('documents', [{'_id': _id, 'x': 1}]),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -795,16 +867,17 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(1, reply.get('n'))
# remove all
self.listener.results.clear()
coll.remove({'x': 1}, w=1)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -821,19 +894,20 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(3, reply.get('n'))
# upsert
self.listener.results.clear()
oid = ObjectId()
coll.update({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True, w=1)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': oid}),
('u', {'_id': oid, 'x': 1}),
('multi', False),
('upsert', True)])]),
('ordered', True),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
@ -848,22 +922,22 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(0, reply.get('nModified'))
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
# update one
self.listener.results.clear()
coll.update({'x': 1}, {'$inc': {'x': 1}})
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 1}),
('u', {'$inc': {'x': 1}}),
('multi', False),
('upsert', False)])]),
('ordered', True)])
('upsert', False)])])])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
@ -877,21 +951,21 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(1, reply.get('nModified'))
# update many
self.listener.results.clear()
coll.update({'x': 2}, {'$inc': {'x': 1}}, multi=True)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 2}),
('u', {'$inc': {'x': 1}}),
('multi', True),
('upsert', False)])]),
('ordered', True)])
('upsert', False)])])])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
@ -905,19 +979,19 @@ class TestCommandMonitoring(IntegrationTest):
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(1, reply.get('nModified'))
# remove one
self.listener.results.clear()
coll.remove({'x': 3}, multi=False)
results = self.listener.results
started = results.get('started')
succeeded = results.get('succeeded')
self.assertIsNone(results.get('failed'))
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])]),
('ordered', True)])
('limit', 1)])])])
self.assertEqual(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
@ -934,6 +1008,179 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(0, coll.count())
def test_insert_many(self):
# This always uses the bulk API.
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
big = 'x' * (1024 * 1024 * 4)
docs = [{'_id': i, 'big': big} for i in range(6)]
coll.insert_many(docs)
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
documents = []
count = 0
operation_id = started[0].operation_id
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = start.command
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
self.assertIs(True, cmd['ordered'])
documents.extend(cmd['documents'])
self.assertEqual('pymongo_test', start.database_name)
self.assertEqual('insert', start.command_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
reply = succeed.reply
self.assertEqual(1, reply.get('ok'))
count += reply.get('n', 0)
self.assertEqual(documents, docs)
self.assertEqual(6, count)
def test_legacy_insert_many(self):
# On legacy servers this uses bulk OP_INSERT.
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Force two batches on legacy servers.
big = 'x' * (1024 * 1024 * 12)
docs = [{'_id': i, 'big': big} for i in range(6)]
coll.insert(docs)
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
documents = []
count = 0
operation_id = started[0].operation_id
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = start.command
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
self.assertIs(True, cmd['ordered'])
documents.extend(cmd['documents'])
self.assertEqual('pymongo_test', start.database_name)
self.assertEqual('insert', start.command_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
reply = succeed.reply
self.assertEqual(1, reply.get('ok'))
count += reply.get('n', 0)
self.assertEqual(documents, docs)
self.assertEqual(6, count)
def test_bulk_write(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
coll.bulk_write([InsertOne({'_id': 1}),
UpdateOne({'_id': 1}, {'$set': {'x': 1}}),
DeleteOne({'_id': 1})])
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
self.assertEqual(3, len(pairs))
for start, succeed in pairs:
self.assertIsInstance(start, monitoring.CommandStartedEvent)
self.assertEqual('pymongo_test', start.database_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': 1}])])
self.assertEqual(expected, started[0].command)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': 1}),
('u', {'$set': {'x': 1}}),
('multi', False),
('upsert', False)])])])
self.assertEqual(expected, started[1].command)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'_id': 1}),
('limit', 1)])])])
self.assertEqual(expected, started[2].command)
def test_write_errors(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
try:
coll.bulk_write([InsertOne({'_id': 1}),
InsertOne({'_id': 1}),
InsertOne({'_id': 1}),
DeleteOne({'_id': 1})],
ordered=False)
except OperationFailure:
pass
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
errors = []
for start, succeed in pairs:
self.assertIsInstance(start, monitoring.CommandStartedEvent)
self.assertEqual('pymongo_test', start.database_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
if 'writeErrors' in succeed.reply:
errors.extend(succeed.reply['writeErrors'])
self.assertEqual(2, len(errors))
fields = set(['index', 'code', 'errmsg'])
for error in errors:
self.assertEqual(fields, set(error))
if __name__ == "__main__":
unittest.main()