PYTHON-2763 Remove outdated check_keys and $clusterTime logic (#817)
This commit is contained in:
parent
c94a3ad1df
commit
797197e73b
@ -67,7 +67,6 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
struct module_state *state = GETSTATE(self);
|
||||
|
||||
int request_id = rand();
|
||||
PyObject* cluster_time = NULL;
|
||||
unsigned int flags;
|
||||
char* collection_name = NULL;
|
||||
Py_ssize_t collection_name_length;
|
||||
@ -79,18 +78,16 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
codec_options_t options;
|
||||
buffer_t buffer = NULL;
|
||||
int length_location, message_length;
|
||||
unsigned char check_keys = 0;
|
||||
PyObject* result = NULL;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "Iet#iiOOO&|b",
|
||||
if (!PyArg_ParseTuple(args, "Iet#iiOOO&",
|
||||
&flags,
|
||||
"utf-8",
|
||||
&collection_name,
|
||||
&collection_name_length,
|
||||
&num_to_skip, &num_to_return,
|
||||
&query, &field_selector,
|
||||
convert_codec_options, &options,
|
||||
&check_keys)) {
|
||||
convert_codec_options, &options)) {
|
||||
return NULL;
|
||||
}
|
||||
buffer = buffer_new();
|
||||
@ -104,29 +101,6 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Pop $clusterTime from dict and write it at the end, avoiding an error
|
||||
* from the $-prefix and check_keys.
|
||||
*
|
||||
* If "dict" is a defaultdict we don't want to call PyMapping_GetItemString
|
||||
* on it. That would **create** an _id where one didn't previously exist
|
||||
* (PYTHON-871).
|
||||
*/
|
||||
if (PyDict_Check(query)) {
|
||||
cluster_time = PyDict_GetItemString(query, "$clusterTime");
|
||||
if (cluster_time) {
|
||||
/* PyDict_GetItemString returns a borrowed reference. */
|
||||
Py_INCREF(cluster_time);
|
||||
if (-1 == PyMapping_DelItemString(query, "$clusterTime")) {
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
} else if (PyMapping_HasKeyString(query, "$clusterTime")) {
|
||||
cluster_time = PyMapping_GetItemString(query, "$clusterTime");
|
||||
if (!cluster_time
|
||||
|| -1 == PyMapping_DelItemString(query, "$clusterTime")) {
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
|
||||
!buffer_write_bytes(buffer, "\x00\x00\x00\x00\xd4\x07\x00\x00", 8) ||
|
||||
!buffer_write_int32(buffer, (int32_t)flags) ||
|
||||
@ -138,37 +112,10 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
}
|
||||
|
||||
begin = buffer_get_position(buffer);
|
||||
if (!write_dict(state->_cbson, buffer, query, check_keys, &options, 1)) {
|
||||
if (!write_dict(state->_cbson, buffer, query, 0, &options, 1)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* back up a byte and write $clusterTime */
|
||||
if (cluster_time) {
|
||||
int length;
|
||||
char zero = 0;
|
||||
|
||||
buffer_update_position(buffer, buffer_get_position(buffer) - 1);
|
||||
if (!write_pair(state->_cbson, buffer, "$clusterTime", 12, cluster_time,
|
||||
0, &options, 1)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (!buffer_write_bytes(buffer, &zero, 1)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
length = buffer_get_position(buffer) - begin;
|
||||
buffer_write_int32_at_position(buffer, begin, (int32_t)length);
|
||||
|
||||
/* undo popping $clusterTime */
|
||||
if (-1 == PyMapping_SetItemString(
|
||||
query, "$clusterTime", cluster_time)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
Py_CLEAR(cluster_time);
|
||||
}
|
||||
|
||||
max_size = buffer_get_position(buffer) - begin;
|
||||
|
||||
if (field_selector != Py_None) {
|
||||
@ -196,7 +143,6 @@ fail:
|
||||
if (buffer) {
|
||||
buffer_free(buffer);
|
||||
}
|
||||
Py_XDECREF(cluster_time);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -274,7 +220,6 @@ static PyObject* _cbson_op_msg(PyObject* self, PyObject* args) {
|
||||
Py_ssize_t identifier_length = 0;
|
||||
PyObject* docs;
|
||||
PyObject* doc;
|
||||
unsigned char check_keys = 0;
|
||||
codec_options_t options;
|
||||
buffer_t buffer = NULL;
|
||||
int length_location, message_length;
|
||||
@ -283,15 +228,14 @@ static PyObject* _cbson_op_msg(PyObject* self, PyObject* args) {
|
||||
PyObject* result = NULL;
|
||||
PyObject* iterator = NULL;
|
||||
|
||||
/*flags, command, identifier, docs, check_keys, opts*/
|
||||
if (!PyArg_ParseTuple(args, "IOet#ObO&",
|
||||
/*flags, command, identifier, docs, opts*/
|
||||
if (!PyArg_ParseTuple(args, "IOet#OO&",
|
||||
&flags,
|
||||
&command,
|
||||
"utf-8",
|
||||
&identifier,
|
||||
&identifier_length,
|
||||
&docs,
|
||||
&check_keys,
|
||||
convert_codec_options, &options)) {
|
||||
return NULL;
|
||||
}
|
||||
@ -340,8 +284,7 @@ static PyObject* _cbson_op_msg(PyObject* self, PyObject* args) {
|
||||
}
|
||||
while ((doc = PyIter_Next(iterator)) != NULL) {
|
||||
int encoded_doc_size = write_dict(
|
||||
state->_cbson, buffer, doc, check_keys,
|
||||
&options, 1);
|
||||
state->_cbson, buffer, doc, 0, &options, 1);
|
||||
if (!encoded_doc_size) {
|
||||
Py_CLEAR(doc);
|
||||
goto fail;
|
||||
@ -400,7 +343,7 @@ _set_document_too_large(int size, long max) {
|
||||
|
||||
static int
|
||||
_batched_op_msg(
|
||||
unsigned char op, unsigned char check_keys, unsigned char ack,
|
||||
unsigned char op, unsigned char ack,
|
||||
PyObject* command, PyObject* docs, PyObject* ctx,
|
||||
PyObject* to_publish, codec_options_t options,
|
||||
buffer_t buffer, struct module_state *state) {
|
||||
@ -471,16 +414,12 @@ _batched_op_msg(
|
||||
}
|
||||
case _UPDATE:
|
||||
{
|
||||
/* MongoDB does key validation for update. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "updates\x00", 8))
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _DELETE:
|
||||
{
|
||||
/* Never check keys in a delete command. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "deletes\x00", 8))
|
||||
goto fail;
|
||||
break;
|
||||
@ -510,8 +449,7 @@ _batched_op_msg(
|
||||
int cur_size;
|
||||
int doc_too_large = 0;
|
||||
int unacked_doc_too_large = 0;
|
||||
if (!write_dict(state->_cbson, buffer, doc, check_keys,
|
||||
&options, 1)) {
|
||||
if (!write_dict(state->_cbson, buffer, doc, 0, &options, 1)) {
|
||||
goto fail;
|
||||
}
|
||||
cur_size = buffer_get_position(buffer) - cur_doc_begin;
|
||||
@ -584,7 +522,6 @@ fail:
|
||||
static PyObject*
|
||||
_cbson_encode_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
unsigned char op;
|
||||
unsigned char check_keys;
|
||||
unsigned char ack;
|
||||
PyObject* command;
|
||||
PyObject* docs;
|
||||
@ -595,8 +532,8 @@ _cbson_encode_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
buffer_t buffer;
|
||||
struct module_state *state = GETSTATE(self);
|
||||
|
||||
if (!PyArg_ParseTuple(args, "bOObbO&O",
|
||||
&op, &command, &docs, &check_keys, &ack,
|
||||
if (!PyArg_ParseTuple(args, "bOObO&O",
|
||||
&op, &command, &docs, &ack,
|
||||
convert_codec_options, &options,
|
||||
&ctx)) {
|
||||
return NULL;
|
||||
@ -611,7 +548,6 @@ _cbson_encode_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
|
||||
if (!_batched_op_msg(
|
||||
op,
|
||||
check_keys,
|
||||
ack,
|
||||
command,
|
||||
docs,
|
||||
@ -637,7 +573,6 @@ fail:
|
||||
static PyObject*
|
||||
_cbson_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
unsigned char op;
|
||||
unsigned char check_keys;
|
||||
unsigned char ack;
|
||||
int request_id;
|
||||
int position;
|
||||
@ -650,8 +585,8 @@ _cbson_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
buffer_t buffer;
|
||||
struct module_state *state = GETSTATE(self);
|
||||
|
||||
if (!PyArg_ParseTuple(args, "bOObbO&O",
|
||||
&op, &command, &docs, &check_keys, &ack,
|
||||
if (!PyArg_ParseTuple(args, "bOObO&O",
|
||||
&op, &command, &docs, &ack,
|
||||
convert_codec_options, &options,
|
||||
&ctx)) {
|
||||
return NULL;
|
||||
@ -676,7 +611,6 @@ _cbson_batched_op_msg(PyObject* self, PyObject* args) {
|
||||
|
||||
if (!_batched_op_msg(
|
||||
op,
|
||||
check_keys,
|
||||
ack,
|
||||
command,
|
||||
docs,
|
||||
@ -707,7 +641,7 @@ fail:
|
||||
|
||||
static int
|
||||
_batched_write_command(
|
||||
char* ns, Py_ssize_t ns_len, unsigned char op, int check_keys,
|
||||
char* ns, Py_ssize_t ns_len, unsigned char op,
|
||||
PyObject* command, PyObject* docs, PyObject* ctx,
|
||||
PyObject* to_publish, codec_options_t options,
|
||||
buffer_t buffer, struct module_state *state) {
|
||||
@ -786,16 +720,12 @@ _batched_write_command(
|
||||
}
|
||||
case _UPDATE:
|
||||
{
|
||||
/* MongoDB does key validation for update. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "updates\x00", 8))
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _DELETE:
|
||||
{
|
||||
/* Never check keys in a delete command. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "deletes\x00", 8))
|
||||
goto fail;
|
||||
break;
|
||||
@ -838,8 +768,7 @@ _batched_write_command(
|
||||
goto fail;
|
||||
}
|
||||
cur_doc_begin = buffer_get_position(buffer);
|
||||
if (!write_dict(state->_cbson, buffer, doc,
|
||||
check_keys, &options, 1)) {
|
||||
if (!write_dict(state->_cbson, buffer, doc, 0, &options, 1)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
@ -915,7 +844,6 @@ static PyObject*
|
||||
_cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
|
||||
char *ns = NULL;
|
||||
unsigned char op;
|
||||
unsigned char check_keys;
|
||||
Py_ssize_t ns_len;
|
||||
PyObject* command;
|
||||
PyObject* docs;
|
||||
@ -926,8 +854,8 @@ _cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
|
||||
buffer_t buffer;
|
||||
struct module_state *state = GETSTATE(self);
|
||||
|
||||
if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8",
|
||||
&ns, &ns_len, &op, &command, &docs, &check_keys,
|
||||
if (!PyArg_ParseTuple(args, "et#bOOO&O", "utf-8",
|
||||
&ns, &ns_len, &op, &command, &docs,
|
||||
convert_codec_options, &options,
|
||||
&ctx)) {
|
||||
return NULL;
|
||||
@ -945,7 +873,6 @@ _cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
|
||||
ns,
|
||||
ns_len,
|
||||
op,
|
||||
check_keys,
|
||||
command,
|
||||
docs,
|
||||
ctx,
|
||||
|
||||
@ -441,9 +441,7 @@ class Collection(common.BaseObject):
|
||||
return BulkWriteResult({}, False)
|
||||
|
||||
def _insert_one(
|
||||
self, doc, ordered,
|
||||
check_keys, write_concern, op_id, bypass_doc_val,
|
||||
session):
|
||||
self, doc, ordered, write_concern, op_id, bypass_doc_val, session):
|
||||
"""Internal helper for inserting a single document."""
|
||||
write_concern = write_concern or self.write_concern
|
||||
acknowledged = write_concern.acknowledged
|
||||
@ -462,7 +460,6 @@ class Collection(common.BaseObject):
|
||||
command,
|
||||
write_concern=write_concern,
|
||||
codec_options=self.__write_response_codec_options,
|
||||
check_keys=check_keys,
|
||||
session=session,
|
||||
client=self.__database.client,
|
||||
retryable_write=retryable_write)
|
||||
@ -520,7 +517,7 @@ class Collection(common.BaseObject):
|
||||
write_concern = self._write_concern_for(session)
|
||||
return InsertOneResult(
|
||||
self._insert_one(
|
||||
document, ordered=True, check_keys=False,
|
||||
document, ordered=True,
|
||||
write_concern=write_concern, op_id=None,
|
||||
bypass_doc_val=bypass_document_validation, session=session),
|
||||
write_concern.acknowledged)
|
||||
@ -588,8 +585,7 @@ class Collection(common.BaseObject):
|
||||
return InsertManyResult(inserted_ids, write_concern.acknowledged)
|
||||
|
||||
def _update(self, sock_info, criteria, document, upsert=False,
|
||||
check_keys=False, multi=False,
|
||||
write_concern=None, op_id=None, ordered=True,
|
||||
multi=False, write_concern=None, op_id=None, ordered=True,
|
||||
bypass_doc_val=False, collation=None, array_filters=None,
|
||||
hint=None, session=None, retryable_write=False, let=None):
|
||||
"""Internal update / replace helper."""
|
||||
@ -660,16 +656,14 @@ class Collection(common.BaseObject):
|
||||
return result
|
||||
|
||||
def _update_retryable(
|
||||
self, criteria, document, upsert=False,
|
||||
check_keys=False, multi=False,
|
||||
self, criteria, document, upsert=False, multi=False,
|
||||
write_concern=None, op_id=None, ordered=True,
|
||||
bypass_doc_val=False, collation=None, array_filters=None,
|
||||
hint=None, session=None, let=None):
|
||||
"""Internal update / replace helper."""
|
||||
def _update(session, sock_info, retryable_write):
|
||||
return self._update(
|
||||
sock_info, criteria, document, upsert=upsert,
|
||||
check_keys=check_keys, multi=multi,
|
||||
sock_info, criteria, document, upsert=upsert, multi=multi,
|
||||
write_concern=write_concern, op_id=op_id, ordered=ordered,
|
||||
bypass_doc_val=bypass_doc_val, collation=collation,
|
||||
array_filters=array_filters, hint=hint, session=session,
|
||||
@ -830,7 +824,7 @@ class Collection(common.BaseObject):
|
||||
write_concern = self._write_concern_for(session)
|
||||
return UpdateResult(
|
||||
self._update_retryable(
|
||||
filter, update, upsert, check_keys=False,
|
||||
filter, update, upsert,
|
||||
write_concern=write_concern,
|
||||
bypass_doc_val=bypass_document_validation,
|
||||
collation=collation, array_filters=array_filters,
|
||||
@ -910,7 +904,7 @@ class Collection(common.BaseObject):
|
||||
write_concern = self._write_concern_for(session)
|
||||
return UpdateResult(
|
||||
self._update_retryable(
|
||||
filter, update, upsert, check_keys=False, multi=True,
|
||||
filter, update, upsert, multi=True,
|
||||
write_concern=write_concern,
|
||||
bypass_doc_val=bypass_document_validation,
|
||||
collation=collation, array_filters=array_filters,
|
||||
|
||||
@ -301,30 +301,24 @@ class _Encrypter(object):
|
||||
opts._kms_providers, schema_map))
|
||||
self._closed = False
|
||||
|
||||
def encrypt(self, database, cmd, check_keys, codec_options):
|
||||
def encrypt(self, database, cmd, codec_options):
|
||||
"""Encrypt a MongoDB command.
|
||||
|
||||
:Parameters:
|
||||
- `database`: The database for this command.
|
||||
- `cmd`: A command document.
|
||||
- `check_keys`: If True, check `cmd` for invalid keys.
|
||||
- `codec_options`: The CodecOptions to use while encoding `cmd`.
|
||||
|
||||
:Returns:
|
||||
The encrypted command to execute.
|
||||
"""
|
||||
self._check_closed()
|
||||
# Workaround for $clusterTime which is incompatible with
|
||||
# check_keys.
|
||||
cluster_time = check_keys and cmd.pop('$clusterTime', None)
|
||||
encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
|
||||
encoded_cmd = _dict_to_bson(cmd, False, codec_options)
|
||||
with _wrap_encryption_errors():
|
||||
encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd)
|
||||
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
|
||||
encrypt_cmd = _inflate_bson(
|
||||
encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS)
|
||||
if cluster_time:
|
||||
encrypt_cmd['$clusterTime'] = cluster_time
|
||||
return encrypt_cmd
|
||||
|
||||
def decrypt(self, response):
|
||||
|
||||
@ -331,7 +331,7 @@ class _Query(object):
|
||||
spec = self.as_command(sock_info)[0]
|
||||
request_id, msg, size, _ = _op_msg(
|
||||
0, spec, self.db, self.read_preference,
|
||||
set_secondary_ok, False, self.codec_options,
|
||||
set_secondary_ok, self.codec_options,
|
||||
ctx=sock_info.compression_context)
|
||||
return request_id, msg, size
|
||||
|
||||
@ -430,7 +430,7 @@ class _GetMore(object):
|
||||
flags = 0
|
||||
request_id, msg, size, _ = _op_msg(
|
||||
flags, spec, self.db, None,
|
||||
False, False, self.codec_options,
|
||||
False, self.codec_options,
|
||||
ctx=sock_info.compression_context)
|
||||
return request_id, msg, size
|
||||
|
||||
@ -526,7 +526,7 @@ _pack_op_msg_flags_type = struct.Struct("<IB").pack
|
||||
_pack_byte = struct.Struct("<B").pack
|
||||
|
||||
|
||||
def _op_msg_no_header(flags, command, identifier, docs, check_keys, opts):
|
||||
def _op_msg_no_header(flags, command, identifier, docs, opts):
|
||||
"""Get a OP_MSG message.
|
||||
|
||||
Note: this method handles multiple documents in a type one payload but
|
||||
@ -541,7 +541,7 @@ def _op_msg_no_header(flags, command, identifier, docs, check_keys, opts):
|
||||
if identifier:
|
||||
type_one = _pack_byte(1)
|
||||
cstring = _make_c_string(identifier)
|
||||
encoded_docs = [_dict_to_bson(doc, check_keys, opts) for doc in docs]
|
||||
encoded_docs = [_dict_to_bson(doc, False, opts) for doc in docs]
|
||||
size = len(cstring) + sum(len(doc) for doc in encoded_docs) + 4
|
||||
encoded_size = _pack_int(size)
|
||||
total_size += size
|
||||
@ -553,26 +553,26 @@ def _op_msg_no_header(flags, command, identifier, docs, check_keys, opts):
|
||||
return b''.join(data), total_size, max_doc_size
|
||||
|
||||
|
||||
def _op_msg_compressed(flags, command, identifier, docs, check_keys, opts,
|
||||
def _op_msg_compressed(flags, command, identifier, docs, opts,
|
||||
ctx):
|
||||
"""Internal OP_MSG message helper."""
|
||||
msg, total_size, max_bson_size = _op_msg_no_header(
|
||||
flags, command, identifier, docs, check_keys, opts)
|
||||
flags, command, identifier, docs, opts)
|
||||
rid, msg = _compress(2013, msg, ctx)
|
||||
return rid, msg, total_size, max_bson_size
|
||||
|
||||
|
||||
def _op_msg_uncompressed(flags, command, identifier, docs, check_keys, opts):
|
||||
def _op_msg_uncompressed(flags, command, identifier, docs, opts):
|
||||
"""Internal compressed OP_MSG message helper."""
|
||||
data, total_size, max_bson_size = _op_msg_no_header(
|
||||
flags, command, identifier, docs, check_keys, opts)
|
||||
flags, command, identifier, docs, opts)
|
||||
request_id, op_message = __pack_message(2013, data)
|
||||
return request_id, op_message, total_size, max_bson_size
|
||||
if _use_c:
|
||||
_op_msg_uncompressed = _cmessage._op_msg
|
||||
|
||||
|
||||
def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys,
|
||||
def _op_msg(flags, command, dbname, read_preference, secondary_ok,
|
||||
opts, ctx=None):
|
||||
"""Get a OP_MSG message."""
|
||||
command['$db'] = dbname
|
||||
@ -593,9 +593,9 @@ def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys,
|
||||
try:
|
||||
if ctx:
|
||||
return _op_msg_compressed(
|
||||
flags, command, identifier, docs, check_keys, opts, ctx)
|
||||
flags, command, identifier, docs, opts, ctx)
|
||||
return _op_msg_uncompressed(
|
||||
flags, command, identifier, docs, check_keys, opts)
|
||||
flags, command, identifier, docs, opts)
|
||||
finally:
|
||||
# Add the field back to the command.
|
||||
if identifier:
|
||||
@ -603,9 +603,9 @@ def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys,
|
||||
|
||||
|
||||
def _query_impl(options, collection_name, num_to_skip, num_to_return,
|
||||
query, field_selector, opts, check_keys):
|
||||
query, field_selector, opts):
|
||||
"""Get an OP_QUERY message."""
|
||||
encoded = _dict_to_bson(query, check_keys, opts)
|
||||
encoded = _dict_to_bson(query, False, opts)
|
||||
if field_selector:
|
||||
efs = _dict_to_bson(field_selector, False, opts)
|
||||
else:
|
||||
@ -622,7 +622,7 @@ def _query_impl(options, collection_name, num_to_skip, num_to_return,
|
||||
|
||||
def _query_compressed(options, collection_name, num_to_skip,
|
||||
num_to_return, query, field_selector,
|
||||
opts, check_keys=False, ctx=None):
|
||||
opts, ctx=None):
|
||||
"""Internal compressed query message helper."""
|
||||
op_query, max_bson_size = _query_impl(
|
||||
options,
|
||||
@ -631,14 +631,13 @@ def _query_compressed(options, collection_name, num_to_skip,
|
||||
num_to_return,
|
||||
query,
|
||||
field_selector,
|
||||
opts,
|
||||
check_keys)
|
||||
opts)
|
||||
rid, msg = _compress(2004, op_query, ctx)
|
||||
return rid, msg, max_bson_size
|
||||
|
||||
|
||||
def _query_uncompressed(options, collection_name, num_to_skip, num_to_return,
|
||||
query, field_selector, opts, check_keys=False):
|
||||
query, field_selector, opts):
|
||||
"""Internal query message helper."""
|
||||
op_query, max_bson_size = _query_impl(
|
||||
options,
|
||||
@ -647,8 +646,7 @@ def _query_uncompressed(options, collection_name, num_to_skip, num_to_return,
|
||||
num_to_return,
|
||||
query,
|
||||
field_selector,
|
||||
opts,
|
||||
check_keys)
|
||||
opts)
|
||||
rid, msg = __pack_message(2004, op_query)
|
||||
return rid, msg, max_bson_size
|
||||
if _use_c:
|
||||
@ -656,15 +654,14 @@ if _use_c:
|
||||
|
||||
|
||||
def _query(options, collection_name, num_to_skip, num_to_return,
|
||||
query, field_selector, opts, check_keys=False, ctx=None):
|
||||
query, field_selector, opts, ctx=None):
|
||||
"""Get a **query** message."""
|
||||
if ctx:
|
||||
return _query_compressed(options, collection_name, num_to_skip,
|
||||
num_to_return, query, field_selector,
|
||||
opts, check_keys, ctx)
|
||||
opts, ctx)
|
||||
return _query_uncompressed(options, collection_name, num_to_skip,
|
||||
num_to_return, query, field_selector, opts,
|
||||
check_keys)
|
||||
num_to_return, query, field_selector, opts)
|
||||
|
||||
|
||||
_pack_long_long = struct.Struct("<q").pack
|
||||
@ -726,8 +723,7 @@ class _BulkWriteContext(object):
|
||||
def _batch_command(self, cmd, docs):
|
||||
namespace = self.db_name + '.$cmd'
|
||||
request_id, msg, to_send = _do_batched_op_msg(
|
||||
namespace, self.op_type, cmd, docs, self.check_keys,
|
||||
self.codec, self)
|
||||
namespace, self.op_type, cmd, docs, self.codec, self)
|
||||
if not to_send:
|
||||
raise InvalidOperation("cannot do an empty bulk write")
|
||||
return request_id, msg, to_send
|
||||
@ -748,11 +744,6 @@ class _BulkWriteContext(object):
|
||||
self.unack_write(cmd, request_id, msg, 0, to_send)
|
||||
return to_send
|
||||
|
||||
@property
|
||||
def check_keys(self):
|
||||
"""Should we check keys for this operation type?"""
|
||||
return False
|
||||
|
||||
@property
|
||||
def max_bson_size(self):
|
||||
"""A proxy for SockInfo.max_bson_size."""
|
||||
@ -871,8 +862,7 @@ class _EncryptedBulkWriteContext(_BulkWriteContext):
|
||||
def _batch_command(self, cmd, docs):
|
||||
namespace = self.db_name + '.$cmd'
|
||||
msg, to_send = _encode_batched_write_command(
|
||||
namespace, self.op_type, cmd, docs, self.check_keys,
|
||||
self.codec, self)
|
||||
namespace, self.op_type, cmd, docs, self.codec, self)
|
||||
if not to_send:
|
||||
raise InvalidOperation("cannot do an empty bulk write")
|
||||
|
||||
@ -926,7 +916,7 @@ _OP_MSG_MAP = {
|
||||
|
||||
|
||||
def _batched_op_msg_impl(
|
||||
operation, command, docs, check_keys, ack, opts, ctx, buf):
|
||||
operation, command, docs, ack, opts, ctx, buf):
|
||||
"""Create a batched OP_MSG write."""
|
||||
max_bson_size = ctx.max_bson_size
|
||||
max_write_batch_size = ctx.max_write_batch_size
|
||||
@ -950,14 +940,11 @@ def _batched_op_msg_impl(
|
||||
except KeyError:
|
||||
raise InvalidOperation('Unknown command')
|
||||
|
||||
if operation in (_UPDATE, _DELETE):
|
||||
check_keys = False
|
||||
|
||||
to_send = []
|
||||
idx = 0
|
||||
for doc in docs:
|
||||
# Encode the current operation
|
||||
value = _dict_to_bson(doc, check_keys, opts)
|
||||
value = _dict_to_bson(doc, False, opts)
|
||||
doc_length = len(value)
|
||||
new_message_size = buf.tell() + doc_length
|
||||
# Does first document exceed max_message_size?
|
||||
@ -991,26 +978,26 @@ def _batched_op_msg_impl(
|
||||
|
||||
|
||||
def _encode_batched_op_msg(
|
||||
operation, command, docs, check_keys, ack, opts, ctx):
|
||||
operation, command, docs, ack, opts, ctx):
|
||||
"""Encode the next batched insert, update, or delete operation
|
||||
as OP_MSG.
|
||||
"""
|
||||
buf = _BytesIO()
|
||||
|
||||
to_send, _ = _batched_op_msg_impl(
|
||||
operation, command, docs, check_keys, ack, opts, ctx, buf)
|
||||
operation, command, docs, ack, opts, ctx, buf)
|
||||
return buf.getvalue(), to_send
|
||||
if _use_c:
|
||||
_encode_batched_op_msg = _cmessage._encode_batched_op_msg
|
||||
|
||||
|
||||
def _batched_op_msg_compressed(
|
||||
operation, command, docs, check_keys, ack, opts, ctx):
|
||||
operation, command, docs, ack, opts, ctx):
|
||||
"""Create the next batched insert, update, or delete operation
|
||||
with OP_MSG, compressed.
|
||||
"""
|
||||
data, to_send = _encode_batched_op_msg(
|
||||
operation, command, docs, check_keys, ack, opts, ctx)
|
||||
operation, command, docs, ack, opts, ctx)
|
||||
|
||||
request_id, msg = _compress(
|
||||
2013,
|
||||
@ -1020,7 +1007,7 @@ def _batched_op_msg_compressed(
|
||||
|
||||
|
||||
def _batched_op_msg(
|
||||
operation, command, docs, check_keys, ack, opts, ctx):
|
||||
operation, command, docs, ack, opts, ctx):
|
||||
"""OP_MSG implementation entry point."""
|
||||
buf = _BytesIO()
|
||||
|
||||
@ -1030,7 +1017,7 @@ def _batched_op_msg(
|
||||
buf.write(b"\x00\x00\x00\x00\xdd\x07\x00\x00")
|
||||
|
||||
to_send, length = _batched_op_msg_impl(
|
||||
operation, command, docs, check_keys, ack, opts, ctx, buf)
|
||||
operation, command, docs, ack, opts, ctx, buf)
|
||||
|
||||
# Header - request id and message length
|
||||
buf.seek(4)
|
||||
@ -1045,7 +1032,7 @@ if _use_c:
|
||||
|
||||
|
||||
def _do_batched_op_msg(
|
||||
namespace, operation, command, docs, check_keys, opts, ctx):
|
||||
namespace, operation, command, docs, opts, ctx):
|
||||
"""Create the next batched insert, update, or delete operation
|
||||
using OP_MSG.
|
||||
"""
|
||||
@ -1056,29 +1043,29 @@ def _do_batched_op_msg(
|
||||
ack = True
|
||||
if ctx.sock_info.compression_context:
|
||||
return _batched_op_msg_compressed(
|
||||
operation, command, docs, check_keys, ack, opts, ctx)
|
||||
operation, command, docs, ack, opts, ctx)
|
||||
return _batched_op_msg(
|
||||
operation, command, docs, check_keys, ack, opts, ctx)
|
||||
operation, command, docs, ack, opts, ctx)
|
||||
|
||||
|
||||
# End OP_MSG -----------------------------------------------------
|
||||
|
||||
|
||||
def _encode_batched_write_command(
|
||||
namespace, operation, command, docs, check_keys, opts, ctx):
|
||||
namespace, operation, command, docs, opts, ctx):
|
||||
"""Encode the next batched insert, update, or delete command.
|
||||
"""
|
||||
buf = _BytesIO()
|
||||
|
||||
to_send, _ = _batched_write_command_impl(
|
||||
namespace, operation, command, docs, check_keys, opts, ctx, buf)
|
||||
namespace, operation, command, docs, opts, ctx, buf)
|
||||
return buf.getvalue(), to_send
|
||||
if _use_c:
|
||||
_encode_batched_write_command = _cmessage._encode_batched_write_command
|
||||
|
||||
|
||||
def _batched_write_command_impl(
|
||||
namespace, operation, command, docs, check_keys, opts, ctx, buf):
|
||||
namespace, operation, command, docs, opts, ctx, buf):
|
||||
"""Create a batched OP_QUERY write command."""
|
||||
max_bson_size = ctx.max_bson_size
|
||||
max_write_batch_size = ctx.max_write_batch_size
|
||||
@ -1108,9 +1095,6 @@ def _batched_write_command_impl(
|
||||
except KeyError:
|
||||
raise InvalidOperation('Unknown command')
|
||||
|
||||
if operation in (_UPDATE, _DELETE):
|
||||
check_keys = False
|
||||
|
||||
# Where to write list document length
|
||||
list_start = buf.tell() - 4
|
||||
to_send = []
|
||||
@ -1118,7 +1102,7 @@ def _batched_write_command_impl(
|
||||
for doc in docs:
|
||||
# Encode the current operation
|
||||
key = str(idx).encode('utf8')
|
||||
value = encode(doc, check_keys, opts)
|
||||
value = _dict_to_bson(doc, False, opts)
|
||||
# Is there enough room to add this document? max_cmd_size accounts for
|
||||
# the two trailing null bytes.
|
||||
doc_too_large = len(value) > max_cmd_size
|
||||
|
||||
@ -41,7 +41,7 @@ _UNPACK_HEADER = struct.Struct("<iiii").unpack
|
||||
def command(sock_info, dbname, spec, secondary_ok, is_mongos,
|
||||
read_preference, codec_options, session, client, check=True,
|
||||
allowable_errors=None, address=None,
|
||||
check_keys=False, listeners=None, max_bson_size=None,
|
||||
listeners=None, max_bson_size=None,
|
||||
read_concern=None,
|
||||
parse_write_concern_error=False,
|
||||
collation=None,
|
||||
@ -65,7 +65,6 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
|
||||
- `check`: raise OperationFailure if there are errors
|
||||
- `allowable_errors`: errors to ignore if `check` is True
|
||||
- `address`: the (host, port) of `sock`
|
||||
- `check_keys`: if True, check `spec` for invalid keys
|
||||
- `listeners`: An instance of :class:`~pymongo.monitoring.EventListeners`
|
||||
- `max_bson_size`: The maximum encoded bson size for this server
|
||||
- `read_concern`: The read concern for this command.
|
||||
@ -107,16 +106,13 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
|
||||
|
||||
if (client and client._encrypter and
|
||||
not client._encrypter._bypass_auto_encryption):
|
||||
spec = orig = client._encrypter.encrypt(
|
||||
dbname, spec, check_keys, codec_options)
|
||||
# We already checked the keys, no need to do it again.
|
||||
check_keys = False
|
||||
spec = orig = client._encrypter.encrypt(dbname, spec, codec_options)
|
||||
|
||||
if use_op_msg:
|
||||
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
|
||||
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
|
||||
request_id, msg, size, max_doc_size = message._op_msg(
|
||||
flags, spec, dbname, read_preference, secondary_ok, check_keys,
|
||||
flags, spec, dbname, read_preference, secondary_ok,
|
||||
codec_options, ctx=compression_ctx)
|
||||
# If this is an unacknowledged write then make sure the encoded doc(s)
|
||||
# are small enough, otherwise rely on the server to return an error.
|
||||
@ -125,7 +121,7 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
|
||||
message._raise_document_too_large(name, size, max_bson_size)
|
||||
else:
|
||||
request_id, msg, size = message._query(
|
||||
flags, ns, 0, -1, spec, None, codec_options, check_keys,
|
||||
flags, ns, 0, -1, spec, None, codec_options,
|
||||
compression_ctx)
|
||||
|
||||
if (max_bson_size is not None
|
||||
|
||||
@ -644,7 +644,7 @@ class SocketInfo(object):
|
||||
def command(self, dbname, spec, secondary_ok=False,
|
||||
read_preference=ReadPreference.PRIMARY,
|
||||
codec_options=DEFAULT_CODEC_OPTIONS, check=True,
|
||||
allowable_errors=None, check_keys=False,
|
||||
allowable_errors=None,
|
||||
read_concern=None,
|
||||
write_concern=None,
|
||||
parse_write_concern_error=False,
|
||||
@ -665,7 +665,6 @@ class SocketInfo(object):
|
||||
- `codec_options`: a CodecOptions instance
|
||||
- `check`: raise OperationFailure if there are errors
|
||||
- `allowable_errors`: errors to ignore if `check` is True
|
||||
- `check_keys`: if True, check `spec` for invalid keys
|
||||
- `read_concern`: The read concern for this command.
|
||||
- `write_concern`: The write concern for this command.
|
||||
- `parse_write_concern_error`: Whether to parse the
|
||||
@ -707,7 +706,7 @@ class SocketInfo(object):
|
||||
return command(self, dbname, spec, secondary_ok,
|
||||
self.is_mongos, read_preference, codec_options,
|
||||
session, client, check, allowable_errors,
|
||||
self.address, check_keys, listeners,
|
||||
self.address, listeners,
|
||||
self.max_bson_size, read_concern,
|
||||
parse_write_concern_error=parse_write_concern_error,
|
||||
collation=collation,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user