PYTHON-2899 Remove code for MongoDB <= 3.4 (#729)

Remove unneeded memoryview to bytes conversion.
This commit is contained in:
Shane Harvey 2021-09-16 16:53:50 -07:00 committed by GitHub
parent 88e744d506
commit 11752ed594
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 224 additions and 1961 deletions

View File

@ -62,302 +62,6 @@ static int buffer_write_bytes_ssize_t(buffer_t buffer, const char* data, Py_ssiz
return buffer_write_bytes(buffer, data, downsize);
}
/* add a lastError message on the end of the buffer.
* returns 0 on failure */
static int add_last_error(PyObject* self, buffer_t buffer,
int request_id, char* ns, Py_ssize_t nslen,
codec_options_t* options, PyObject* args) {
struct module_state *state = GETSTATE(self);
int message_start;
int document_start;
int message_length;
int document_length;
PyObject* key = NULL;
PyObject* value = NULL;
Py_ssize_t pos = 0;
PyObject* one;
char *p = strchr(ns, '.');
/* Length of the database portion of ns. */
nslen = p ? (int)(p - ns) : nslen;
message_start = buffer_save_space(buffer, 4);
if (message_start == -1) {
return 0;
}
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
!buffer_write_bytes(buffer,
"\x00\x00\x00\x00" /* responseTo */
"\xd4\x07\x00\x00" /* opcode */
"\x00\x00\x00\x00", /* options */
12) ||
!buffer_write_bytes_ssize_t(buffer, ns, nslen) || /* database */
!buffer_write_bytes(buffer,
".$cmd\x00" /* collection name */
"\x00\x00\x00\x00" /* skip */
"\xFF\xFF\xFF\xFF", /* limit (-1) */
14)) {
return 0;
}
/* save space for length */
document_start = buffer_save_space(buffer, 4);
if (document_start == -1) {
return 0;
}
/* getlasterror: 1 */
if (!(one = PyLong_FromLong(1)))
return 0;
if (!write_pair(state->_cbson, buffer, "getlasterror", 12, one, 0,
options, 1)) {
Py_DECREF(one);
return 0;
}
Py_DECREF(one);
/* getlasterror options */
while (PyDict_Next(args, &pos, &key, &value)) {
if (!decode_and_write_pair(state->_cbson, buffer, key, value, 0,
options, 0)) {
return 0;
}
}
/* EOD */
if (!buffer_write_bytes(buffer, "\x00", 1)) {
return 0;
}
message_length = buffer_get_position(buffer) - message_start;
document_length = buffer_get_position(buffer) - document_start;
buffer_write_int32_at_position(
buffer, message_start, (int32_t)message_length);
buffer_write_int32_at_position(
buffer, document_start, (int32_t)document_length);
return 1;
}
static int init_insert_buffer(buffer_t buffer, int request_id, int options,
const char* coll_name, Py_ssize_t coll_name_len,
int compress) {
int length_location = 0;
if (!compress) {
/* Save space for message length */
int length_location = buffer_save_space(buffer, 4);
if (length_location == -1) {
return length_location;
}
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
!buffer_write_bytes(buffer,
"\x00\x00\x00\x00"
"\xd2\x07\x00\x00",
8)) {
return -1;
}
}
if (!buffer_write_int32(buffer, (int32_t)options) ||
!buffer_write_bytes_ssize_t(buffer,
coll_name,
coll_name_len + 1)) {
return -1;
}
return length_location;
}
static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
/* Used by the Bulk API to insert into pre-2.6 servers. Collection.insert
* uses _cbson_do_batched_insert. */
struct module_state *state = GETSTATE(self);
/* NOTE just using a random number as the request_id */
int request_id = rand();
char* collection_name = NULL;
Py_ssize_t collection_name_length;
PyObject* docs;
PyObject* doc;
PyObject* iterator;
int before, cur_size, max_size = 0;
int flags = 0;
unsigned char check_keys;
unsigned char continue_on_error;
codec_options_t options;
buffer_t buffer = NULL;
int length_location, message_length;
PyObject* result = NULL;
if (!PyArg_ParseTuple(args, "et#ObbO&",
"utf-8",
&collection_name,
&collection_name_length,
&docs, &check_keys,
&continue_on_error,
convert_codec_options, &options)) {
return NULL;
}
if (continue_on_error) {
flags += 1;
}
buffer = buffer_new();
if (!buffer) {
goto fail;
}
length_location = init_insert_buffer(buffer,
request_id,
flags,
collection_name,
collection_name_length,
0);
if (length_location == -1) {
goto fail;
}
iterator = PyObject_GetIter(docs);
if (iterator == NULL) {
PyObject* InvalidOperation = _error("InvalidOperation");
if (InvalidOperation) {
PyErr_SetString(InvalidOperation, "input is not iterable");
Py_DECREF(InvalidOperation);
}
goto fail;
}
while ((doc = PyIter_Next(iterator)) != NULL) {
before = buffer_get_position(buffer);
if (!write_dict(state->_cbson, buffer, doc, check_keys,
&options, 1)) {
Py_DECREF(doc);
Py_DECREF(iterator);
goto fail;
}
Py_DECREF(doc);
cur_size = buffer_get_position(buffer) - before;
max_size = (cur_size > max_size) ? cur_size : max_size;
}
Py_DECREF(iterator);
if (PyErr_Occurred()) {
goto fail;
}
if (!max_size) {
PyObject* InvalidOperation = _error("InvalidOperation");
if (InvalidOperation) {
PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
Py_DECREF(InvalidOperation);
}
goto fail;
}
message_length = buffer_get_position(buffer) - length_location;
buffer_write_int32_at_position(
buffer, length_location, (int32_t)message_length);
/* objectify buffer */
result = Py_BuildValue("iy#i", request_id,
buffer_get_buffer(buffer),
(Py_ssize_t)buffer_get_position(buffer),
max_size);
fail:
PyMem_Free(collection_name);
destroy_codec_options(&options);
if (buffer) {
buffer_free(buffer);
}
return result;
}
static PyObject* _cbson_update_message(PyObject* self, PyObject* args) {
/* NOTE just using a random number as the request_id */
struct module_state *state = GETSTATE(self);
int request_id = rand();
char* collection_name = NULL;
Py_ssize_t collection_name_length;
int before, cur_size, max_size = 0;
PyObject* doc;
PyObject* spec;
unsigned char multi;
unsigned char upsert;
unsigned char check_keys;
codec_options_t options;
int flags;
buffer_t buffer = NULL;
int length_location, message_length;
PyObject* result = NULL;
if (!PyArg_ParseTuple(args, "et#bbOObO&",
"utf-8",
&collection_name,
&collection_name_length,
&upsert, &multi, &spec, &doc, &check_keys,
convert_codec_options, &options)) {
return NULL;
}
flags = 0;
if (upsert) {
flags += 1;
}
if (multi) {
flags += 2;
}
buffer = buffer_new();
if (!buffer) {
goto fail;
}
// save space for message length
length_location = buffer_save_space(buffer, 4);
if (length_location == -1) {
goto fail;
}
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
!buffer_write_bytes(buffer,
"\x00\x00\x00\x00"
"\xd1\x07\x00\x00"
"\x00\x00\x00\x00",
12) ||
!buffer_write_bytes_ssize_t(buffer,
collection_name,
collection_name_length + 1) ||
!buffer_write_int32(buffer, (int32_t)flags)) {
goto fail;
}
before = buffer_get_position(buffer);
if (!write_dict(state->_cbson, buffer, spec, 0, &options, 1)) {
goto fail;
}
max_size = buffer_get_position(buffer) - before;
before = buffer_get_position(buffer);
if (!write_dict(state->_cbson, buffer, doc, check_keys,
&options, 1)) {
goto fail;
}
cur_size = buffer_get_position(buffer) - before;
max_size = (cur_size > max_size) ? cur_size : max_size;
message_length = buffer_get_position(buffer) - length_location;
buffer_write_int32_at_position(
buffer, length_location, (int32_t)message_length);
/* objectify buffer */
result = Py_BuildValue("iy#i", request_id,
buffer_get_buffer(buffer),
(Py_ssize_t)buffer_get_position(buffer),
max_size);
fail:
PyMem_Free(collection_name);
destroy_codec_options(&options);
if (buffer) {
buffer_free(buffer);
}
return result;
}
static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
/* NOTE just using a random number as the request_id */
struct module_state *state = GETSTATE(self);
@ -688,333 +392,6 @@ _set_document_too_large(int size, long max) {
}
}
static PyObject*
_send_insert(PyObject* self, PyObject* ctx,
PyObject* gle_args, buffer_t buffer,
char* coll_name, Py_ssize_t coll_len, int request_id, int safe,
codec_options_t* options, PyObject* to_publish, int compress) {
if (safe) {
if (!add_last_error(self, buffer, request_id,
coll_name, coll_len, options, gle_args)) {
return NULL;
}
}
/* The max_doc_size parameter for legacy_bulk_insert is the max size of
* any document in buffer. We enforced max size already, pass 0 here. */
return PyObject_CallMethod(ctx, "legacy_bulk_insert",
"iy#iNOi",
request_id,
buffer_get_buffer(buffer),
(Py_ssize_t)buffer_get_position(buffer),
0,
PyBool_FromLong((long)safe),
to_publish, compress);
}
static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
struct module_state *state = GETSTATE(self);
/* NOTE just using a random number as the request_id */
int request_id = rand();
int send_safe, flags = 0;
int length_location, message_length;
Py_ssize_t collection_name_length;
int compress;
char* collection_name = NULL;
PyObject* docs;
PyObject* doc;
PyObject* iterator;
PyObject* ctx;
PyObject* last_error_args;
PyObject* result;
PyObject* max_bson_size_obj;
PyObject* max_message_size_obj;
PyObject* compress_obj;
PyObject* to_publish = NULL;
unsigned char check_keys;
unsigned char safe;
unsigned char continue_on_error;
codec_options_t options;
unsigned char empty = 1;
long max_bson_size;
long max_message_size;
buffer_t buffer;
PyObject *exc_type = NULL, *exc_value = NULL, *exc_trace = NULL;
if (!PyArg_ParseTuple(args, "et#ObbObO&O",
"utf-8",
&collection_name,
&collection_name_length,
&docs, &check_keys, &safe,
&last_error_args,
&continue_on_error,
convert_codec_options, &options,
&ctx)) {
return NULL;
}
if (continue_on_error) {
flags += 1;
}
/*
* If we are doing unacknowledged writes *and* continue_on_error
* is True it's pointless (and slower) to send GLE.
*/
send_safe = (safe || !continue_on_error);
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
max_bson_size = PyLong_AsLong(max_bson_size_obj);
Py_XDECREF(max_bson_size_obj);
if (max_bson_size == -1) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size");
max_message_size = PyLong_AsLong(max_message_size_obj);
Py_XDECREF(max_message_size_obj);
if (max_message_size == -1) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
compress_obj = PyObject_GetAttrString(ctx, "compress");
compress = PyObject_IsTrue(compress_obj);
Py_XDECREF(compress_obj);
if (compress == -1) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
compress = compress && !(safe || send_safe);
buffer = buffer_new();
if (!buffer) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
length_location = init_insert_buffer(buffer,
request_id,
flags,
collection_name,
collection_name_length,
compress);
if (length_location == -1) {
goto insertfail;
}
if (!(to_publish = PyList_New(0))) {
goto insertfail;
}
iterator = PyObject_GetIter(docs);
if (iterator == NULL) {
PyObject* InvalidOperation = _error("InvalidOperation");
if (InvalidOperation) {
PyErr_SetString(InvalidOperation, "input is not iterable");
Py_DECREF(InvalidOperation);
}
goto insertfail;
}
while ((doc = PyIter_Next(iterator)) != NULL) {
int before = buffer_get_position(buffer);
int cur_size;
if (!write_dict(state->_cbson, buffer, doc, check_keys,
&options, 1)) {
goto iterfail;
}
cur_size = buffer_get_position(buffer) - before;
if (cur_size > max_bson_size) {
/* If we've encoded anything send it before raising. */
if (!empty) {
buffer_update_position(buffer, before);
if (!compress) {
message_length = buffer_get_position(buffer) - length_location;
buffer_write_int32_at_position(
buffer, length_location, (int32_t)message_length);
}
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe, &options,
to_publish, compress);
if (!result)
goto iterfail;
Py_DECREF(result);
}
_set_document_too_large(cur_size, max_bson_size);
goto iterfail;
}
empty = 0;
/* We have enough data, send this batch. */
if (buffer_get_position(buffer) > max_message_size) {
int new_request_id = rand();
int message_start;
buffer_t new_buffer = buffer_new();
if (!new_buffer) {
goto iterfail;
}
message_start = init_insert_buffer(new_buffer,
new_request_id,
flags,
collection_name,
collection_name_length,
compress);
if (message_start == -1) {
buffer_free(new_buffer);
goto iterfail;
}
/* Copy the overflow encoded document into the new buffer. */
if (!buffer_write_bytes(new_buffer,
(const char*)buffer_get_buffer(buffer) + before, cur_size)) {
buffer_free(new_buffer);
goto iterfail;
}
/* Roll back to the beginning of this document. */
buffer_update_position(buffer, before);
if (!compress) {
message_length = buffer_get_position(buffer) - length_location;
buffer_write_int32_at_position(
buffer, length_location, (int32_t)message_length);
}
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, send_safe, &options, to_publish,
compress);
buffer_free(buffer);
buffer = new_buffer;
request_id = new_request_id;
length_location = message_start;
Py_DECREF(to_publish);
if (!(to_publish = PyList_New(0))) {
goto insertfail;
}
if (!result) {
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject* OperationFailure;
PyErr_Fetch(&etype, &evalue, &etrace);
OperationFailure = _error("OperationFailure");
if (OperationFailure) {
if (PyErr_GivenExceptionMatches(etype, OperationFailure)) {
if (!safe || continue_on_error) {
Py_DECREF(OperationFailure);
if (!safe) {
/* We're doing unacknowledged writes and
* continue_on_error is False. Just return. */
Py_DECREF(etype);
Py_XDECREF(evalue);
Py_XDECREF(etrace);
Py_DECREF(to_publish);
Py_DECREF(iterator);
Py_DECREF(doc);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_RETURN_NONE;
}
/* continue_on_error is True, store the error
* details to re-raise after the final batch */
Py_XDECREF(exc_type);
Py_XDECREF(exc_value);
Py_XDECREF(exc_trace);
exc_type = etype;
exc_value = evalue;
exc_trace = etrace;
if (PyList_Append(to_publish, doc) < 0) {
goto iterfail;
}
Py_CLEAR(doc);
continue;
}
}
Py_DECREF(OperationFailure);
}
/* This isn't OperationFailure, we couldn't
* import OperationFailure, or we are doing
* acknowledged writes. Re-raise immediately. */
PyErr_Restore(etype, evalue, etrace);
goto iterfail;
} else {
Py_DECREF(result);
}
}
if (PyList_Append(to_publish, doc) < 0) {
goto iterfail;
}
Py_CLEAR(doc);
}
Py_DECREF(iterator);
if (PyErr_Occurred()) {
goto insertfail;
}
if (empty) {
PyObject* InvalidOperation = _error("InvalidOperation");
if (InvalidOperation) {
PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
Py_DECREF(InvalidOperation);
}
goto insertfail;
}
if (!compress) {
message_length = buffer_get_position(buffer) - length_location;
buffer_write_int32_at_position(
buffer, length_location, (int32_t)message_length);
}
/* Send the last (or only) batch */
result = _send_insert(self, ctx, last_error_args, buffer,
collection_name, collection_name_length,
request_id, safe, &options, to_publish, compress);
Py_DECREF(to_publish);
PyMem_Free(collection_name);
buffer_free(buffer);
if (!result) {
Py_XDECREF(exc_type);
Py_XDECREF(exc_value);
Py_XDECREF(exc_trace);
return NULL;
} else {
Py_DECREF(result);
}
if (exc_type) {
/* Re-raise any previously stored exception
* due to continue_on_error being True */
PyErr_Restore(exc_type, exc_value, exc_trace);
return NULL;
}
Py_RETURN_NONE;
iterfail:
Py_XDECREF(doc);
Py_DECREF(iterator);
insertfail:
Py_XDECREF(exc_type);
Py_XDECREF(exc_value);
Py_XDECREF(exc_trace);
Py_XDECREF(to_publish);
buffer_free(buffer);
PyMem_Free(collection_name);
return NULL;
}
#define _INSERT 0
#define _UPDATE 1
#define _DELETE 2
@ -1591,94 +968,13 @@ fail:
return result;
}
static PyObject*
_cbson_batched_write_command(PyObject* self, PyObject* args) {
char *ns = NULL;
unsigned char op;
unsigned char check_keys;
Py_ssize_t ns_len;
int request_id;
int position;
PyObject* command;
PyObject* docs;
PyObject* ctx = NULL;
PyObject* to_publish = NULL;
PyObject* result = NULL;
codec_options_t options;
buffer_t buffer;
struct module_state *state = GETSTATE(self);
if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8",
&ns, &ns_len, &op, &command, &docs, &check_keys,
convert_codec_options, &options,
&ctx)) {
return NULL;
}
if (!(buffer = buffer_new())) {
PyMem_Free(ns);
destroy_codec_options(&options);
return NULL;
}
/* Save space for message length and request id */
if ((buffer_save_space(buffer, 8)) == -1) {
goto fail;
}
if (!buffer_write_bytes(buffer,
"\x00\x00\x00\x00" /* responseTo */
"\xd4\x07\x00\x00", /* opcode */
8)) {
goto fail;
}
if (!(to_publish = PyList_New(0))) {
goto fail;
}
if (!_batched_write_command(
ns,
ns_len,
op,
check_keys,
command,
docs,
ctx,
to_publish,
options,
buffer,
state)) {
goto fail;
}
request_id = rand();
position = buffer_get_position(buffer);
buffer_write_int32_at_position(buffer, 0, (int32_t)position);
buffer_write_int32_at_position(buffer, 4, (int32_t)request_id);
result = Py_BuildValue("iy#O", request_id,
buffer_get_buffer(buffer),
(Py_ssize_t)buffer_get_position(buffer),
to_publish);
fail:
PyMem_Free(ns);
destroy_codec_options(&options);
buffer_free(buffer);
Py_XDECREF(to_publish);
return result;
}
static PyMethodDef _CMessageMethods[] = {
{"_insert_message", _cbson_insert_message, METH_VARARGS,
"Create an insert message to be sent to MongoDB"},
{"_update_message", _cbson_update_message, METH_VARARGS,
"create an update message to be sent to MongoDB"},
{"_query_message", _cbson_query_message, METH_VARARGS,
"create a query message to be sent to MongoDB"},
{"_get_more_message", _cbson_get_more_message, METH_VARARGS,
"create a get more message to be sent to MongoDB"},
{"_op_msg", _cbson_op_msg, METH_VARARGS,
"create an OP_MSG message to be sent to MongoDB"},
{"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS,
"insert a batch of documents, splitting the batch as needed"},
{"_batched_write_command", _cbson_batched_write_command, METH_VARARGS,
"Create the next batched insert, update, or delete command"},
{"_encode_batched_write_command", _cbson_encode_batched_write_command, METH_VARARGS,
"Encode the next batched insert, update, or delete command"},
{"_batched_op_msg", _cbson_batched_op_msg, METH_VARARGS,

View File

@ -88,11 +88,6 @@ class _AggregationCommand(object):
"""The database against which the aggregation command is run."""
raise NotImplementedError
@staticmethod
def _check_compat(sock_info):
"""Check whether the server version in-use supports aggregation."""
pass
def _process_result(self, result, session, server, sock_info, secondary_ok):
if self._result_processor:
self._result_processor(
@ -104,9 +99,6 @@ class _AggregationCommand(object):
return self._target._read_preference_for(session)
def get_cursor(self, session, server, sock_info, secondary_ok):
# Ensure command compatibility.
self._check_compat(sock_info)
# Serialize command.
cmd = SON([("aggregate", self._aggregation_target),
("pipeline", self._pipeline)])
@ -117,8 +109,7 @@ class _AggregationCommand(object):
# - server version is >= 4.2 or
# - server version is >= 3.2 and pipeline doesn't use $out
if (('readConcern' not in cmd) and
((sock_info.max_wire_version >= 4 and
not self._performs_write) or
(not self._performs_write or
(sock_info.max_wire_version >= 8))):
read_concern = self._target.read_concern
else:
@ -218,11 +209,3 @@ class _DatabaseAggregationCommand(_AggregationCommand):
# aggregate too by defaulting to the <db>.$cmd.aggregate namespace.
_, collname = cursor.get("ns", self._cursor_namespace).split(".", 1)
return self._database[collname]
@staticmethod
def _check_compat(sock_info):
# Older server version don't raise a descriptive error, so we raise
# one instead.
if not sock_info.max_wire_version >= 6:
err_msg = "Database.aggregate() is only supported on MongoDB 3.6+."
raise ConfigurationError(err_msg)

View File

@ -455,10 +455,6 @@ def _authenticate_x509(credentials, sock_info):
return
cmd = _X509Context(credentials).speculate_command()
if credentials.username is None and sock_info.max_wire_version < 5:
raise ConfigurationError(
"A username is required for MONGODB-X509 authentication "
"when connected to MongoDB versions older than 3.4.")
sock_info.command('$external', cmd)
@ -496,10 +492,8 @@ def _authenticate_default(credentials, sock_info):
return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-256')
else:
return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
elif sock_info.max_wire_version >= 3:
return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
else:
return _authenticate_mongo_cr(credentials, sock_info)
return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
_AUTH_MAP = {

View File

@ -35,7 +35,6 @@ from pymongo.errors import (BulkWriteError,
InvalidOperation,
OperationFailure)
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
_do_batched_insert,
_randint,
_BulkWriteContext,
_EncryptedBulkWriteContext)
@ -256,17 +255,6 @@ class _Bulk(object):
def _execute_command(self, generator, write_concern, session,
sock_info, op_id, retryable, full_result):
if sock_info.max_wire_version < 5:
if self.uses_collation:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
if self.uses_hint:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use hint.')
if sock_info.max_wire_version < 6 and self.uses_array_filters:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use arrayFilters.')
db_name = self.collection.database.name
client = self.collection.database.client
listeners = client._event_listeners
@ -283,7 +271,7 @@ class _Bulk(object):
('ordered', self.ordered)])
if not write_concern.is_server_default:
cmd['writeConcern'] = write_concern.document
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
if self.bypass_doc_val:
cmd['bypassDocumentValidation'] = True
bwc = self.bulk_ctx_class(
db_name, cmd, sock_info, op_id, listeners, session,
@ -358,24 +346,6 @@ class _Bulk(object):
_raise_bulk_write_error(full_result)
return full_result
def execute_insert_no_results(self, sock_info, run, op_id, acknowledged):
"""Execute insert, returning no results.
"""
command = SON([('insert', self.collection.name),
('ordered', self.ordered)])
concern = {'w': int(self.ordered)}
command['writeConcern'] = concern
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
command['bypassDocumentValidation'] = True
db = self.collection.database
bwc = _BulkWriteContext(
db.name, command, sock_info, op_id, db.client._event_listeners,
None, _INSERT, self.collection.codec_options)
# Legacy batched OP_INSERT.
_do_batched_insert(
self.collection.full_name, run.ops, True, acknowledged, concern,
not self.ordered, self.collection.codec_options, bwc)
def execute_op_msg_no_results(self, sock_info, generator):
"""Execute write commands with OP_MSG and w=0 writeConcern, unordered.
"""
@ -441,62 +411,13 @@ class _Bulk(object):
raise ConfigurationError(
'hint is unsupported for unacknowledged writes.')
# Cannot have both unacknowledged writes and bypass document validation.
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
if self.bypass_doc_val:
raise OperationFailure("Cannot set bypass_document_validation with"
" unacknowledged write concern")
# OP_MSG
if sock_info.max_wire_version > 5:
if self.ordered:
return self.execute_command_no_results(sock_info, generator)
return self.execute_op_msg_no_results(sock_info, generator)
coll = self.collection
# If ordered is True we have to send GLE or use write
# commands so we can abort on the first error.
write_concern = WriteConcern(w=int(self.ordered))
op_id = _randint()
next_run = next(generator)
while next_run:
# An ordered bulk write needs to send acknowledged writes to short
# circuit the next run. However, the final message on the final
# run can be unacknowledged.
run = next_run
next_run = next(generator, None)
needs_ack = self.ordered and next_run is not None
try:
if run.op_type == _INSERT:
self.execute_insert_no_results(
sock_info, run, op_id, needs_ack)
elif run.op_type == _UPDATE:
for operation in run.ops:
doc = operation['u']
check_keys = True
if doc and next(iter(doc)).startswith('$'):
check_keys = False
coll._update(
sock_info,
operation['q'],
doc,
upsert=operation['upsert'],
check_keys=check_keys,
multi=operation['multi'],
write_concern=write_concern,
op_id=op_id,
ordered=self.ordered,
bypass_doc_val=self.bypass_doc_val)
else:
for operation in run.ops:
coll._delete(sock_info,
operation['q'],
not operation['limit'],
write_concern,
op_id,
self.ordered)
except OperationFailure:
if self.ordered:
break
if self.ordered:
return self.execute_command_no_results(sock_info, generator)
return self.execute_op_msg_no_results(sock_info, generator)
def execute(self, write_concern, session):
"""Execute operations.

View File

@ -435,59 +435,6 @@ class Collection(common.BaseObject):
return BulkWriteResult(bulk_api_result, True)
return BulkWriteResult({}, False)
def _legacy_write(self, sock_info, name, cmd, op_id,
bypass_doc_val, func, *args):
"""Internal legacy unacknowledged write helper."""
# Cannot have both unacknowledged write and bypass document validation.
if bypass_doc_val and sock_info.max_wire_version >= 4:
raise OperationFailure("Cannot set bypass_document_validation with"
" unacknowledged write concern")
listeners = self.database.client._event_listeners
publish = listeners.enabled_for_commands
if publish:
start = datetime.datetime.now()
args = args + (sock_info.compression_context,)
rqst_id, msg, max_size = func(*args)
if publish:
duration = datetime.datetime.now() - start
listeners.publish_command_start(
cmd, self.__database.name, rqst_id, sock_info.address, op_id,
sock_info.service_id)
start = datetime.datetime.now()
try:
result = sock_info.legacy_write(rqst_id, msg, max_size, False)
except Exception as exc:
if publish:
dur = (datetime.datetime.now() - start) + duration
if isinstance(exc, OperationFailure):
details = exc.details
# Succeed if GLE was successful and this is a write error.
if details.get("ok") and "n" in details:
reply = message._convert_write_result(
name, cmd, details)
listeners.publish_command_success(
dur, reply, name, rqst_id, sock_info.address,
op_id, sock_info.service_id)
raise
else:
details = message._convert_exception(exc)
listeners.publish_command_failure(
dur, details, name, rqst_id, sock_info.address, op_id,
sock_info.service_id)
raise
if publish:
if result is not None:
reply = message._convert_write_result(name, cmd, result)
else:
# Comply with APM spec.
reply = {'ok': 1}
duration = (datetime.datetime.now() - start) + duration
listeners.publish_command_success(
duration, reply, name, rqst_id, sock_info.address, op_id,
sock_info.service_id)
return result
def _insert_one(
self, doc, ordered,
check_keys, write_concern, op_id, bypass_doc_val,
@ -502,15 +449,7 @@ class Collection(common.BaseObject):
command['writeConcern'] = write_concern.document
def _insert_command(session, sock_info, retryable_write):
if not sock_info.op_msg_enabled and not acknowledged:
# Legacy OP_INSERT.
return self._legacy_write(
sock_info, 'insert', command, op_id,
bypass_doc_val, message._insert, self.__full_name,
[doc], check_keys, False,
self.__write_response_codec_options)
if bypass_doc_val and sock_info.max_wire_version >= 4:
if bypass_doc_val:
command['bypassDocumentValidation'] = True
result = sock_info.command(
@ -658,28 +597,19 @@ class Collection(common.BaseObject):
('multi', multi),
('upsert', upsert)])
if collation is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use collations.')
elif not acknowledged:
if not acknowledged:
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
else:
update_doc['collation'] = collation
if array_filters is not None:
if sock_info.max_wire_version < 6:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use array_filters.')
elif not acknowledged:
if not acknowledged:
raise ConfigurationError(
'arrayFilters is unsupported for unacknowledged writes.')
else:
update_doc['arrayFilters'] = array_filters
if hint is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use hint.')
elif not acknowledged:
if not acknowledged:
raise ConfigurationError(
'hint is unsupported for unacknowledged writes.')
if not isinstance(hint, str):
@ -692,16 +622,8 @@ class Collection(common.BaseObject):
if not write_concern.is_server_default:
command['writeConcern'] = write_concern.document
if not sock_info.op_msg_enabled and not acknowledged:
# Legacy OP_UPDATE.
return self._legacy_write(
sock_info, 'update', command, op_id,
bypass_doc_val, message._update, self.__full_name, upsert,
multi, criteria, document, check_keys,
self.__write_response_codec_options)
# Update command.
if bypass_doc_val and sock_info.max_wire_version >= 4:
if bypass_doc_val:
command['bypassDocumentValidation'] = True
# The command result has to be published for APM unmodified
@ -1018,19 +940,13 @@ class Collection(common.BaseObject):
('limit', int(not multi))])
collation = validate_collation_or_none(collation)
if collation is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use collations.')
elif not acknowledged:
if not acknowledged:
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
else:
delete_doc['collation'] = collation
if hint is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use hint.')
elif not acknowledged:
if not acknowledged:
raise ConfigurationError(
'hint is unsupported for unacknowledged writes.')
if not isinstance(hint, str):
@ -1042,13 +958,6 @@ class Collection(common.BaseObject):
if not write_concern.is_server_default:
command['writeConcern'] = write_concern.document
if not sock_info.op_msg_enabled and not acknowledged:
# Legacy OP_DELETE.
return self._legacy_write(
sock_info, 'delete', command, op_id,
False, message._delete, self.__full_name, criteria,
self.__write_response_codec_options,
int(not multi))
# Delete command.
result = sock_info.command(
self.__database.name,
@ -1635,7 +1544,6 @@ class Collection(common.BaseObject):
"""
names = []
with self._socket_for_writes(session) as sock_info:
supports_collations = sock_info.max_wire_version >= 5
supports_quorum = sock_info.max_wire_version >= 9
def gen_indexes():
@ -1645,10 +1553,6 @@ class Collection(common.BaseObject):
"%r is not an instance of "
"pymongo.operations.IndexModel" % (index,))
document = index.document
if "collation" in document and not supports_collations:
raise ConfigurationError(
"Must be connected to MongoDB "
"3.4+ to use collations.")
names.append(document["name"])
yield document
@ -1880,32 +1784,21 @@ class Collection(common.BaseObject):
def _cmd(session, server, sock_info, secondary_ok):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
if sock_info.max_wire_version > 2:
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, secondary_ok,
read_pref,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
# Ignore NamespaceNotFound errors to match the behavior
# of reading from *.system.indexes.
if exc.code != 26:
raise
cursor = {'id': 0, 'firstBatch': []}
cmd_cursor = CommandCursor(
coll, cursor, sock_info.address, session=s,
explicit_session=session is not None)
else:
res = message._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, secondary_ok, codec_options,
read_pref, cmd,
self.database.client._event_listeners)
cursor = res["cursor"]
# Note that a collection can only have 64 indexes, so there
# will never be a getMore call.
cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, secondary_ok,
read_pref,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
# Ignore NamespaceNotFound errors to match the behavior
# of reading from *.system.indexes.
if exc.code != 26:
raise
cursor = {'id': 0, 'firstBatch': []}
cmd_cursor = CommandCursor(
coll, cursor, sock_info.address, session=s,
explicit_session=session is not None)
cmd_cursor._maybe_pin_connection(sock_info)
return cmd_cursor
@ -2356,10 +2249,6 @@ class Collection(common.BaseObject):
def _find_and_modify(session, sock_info, retryable_write):
if array_filters is not None:
if sock_info.max_wire_version < 6:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use '
'arrayFilters.')
if not write_concern.acknowledged:
raise ConfigurationError(
'arrayFilters is unsupported for unacknowledged '
@ -2373,8 +2262,7 @@ class Collection(common.BaseObject):
raise ConfigurationError(
'hint is unsupported for unacknowledged writes.')
cmd['hint'] = hint
if (sock_info.max_wire_version >= 4 and
not write_concern.is_server_default):
if not write_concern.is_server_default:
cmd['writeConcern'] = write_concern.document
out = self._command(sock_info, cmd,
read_preference=ReadPreference.PRIMARY,

View File

@ -14,25 +14,19 @@
"""Database level operations."""
import warnings
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.dbref import DBRef
from bson.son import SON
from pymongo import auth, common
from pymongo import common
from pymongo.aggregation import _DatabaseAggregationCommand
from pymongo.change_stream import DatabaseChangeStream
from pymongo.collection import Collection
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (CollectionInvalid,
InvalidName)
from pymongo.message import _first_batch
from pymongo.read_preferences import ReadPreference
_INDEX_REGEX = {"name": {"$regex": r"^(?!.*\$)"}}
def _check_name(name):
"""Check if a database name is valid.
"""
@ -618,37 +612,21 @@ class Database(common.BaseObject):
coll = self.get_collection(
"$cmd", read_preference=read_preference)
if sock_info.max_wire_version > 2:
cmd = SON([("listCollections", 1),
("cursor", {})])
cmd.update(kwargs)
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, secondary_okay,
read_preference=read_preference,
session=tmp_session)["cursor"]
cmd_cursor = CommandCursor(
coll,
cursor,
sock_info.address,
session=tmp_session,
explicit_session=session is not None)
else:
match = _INDEX_REGEX
if "filter" in kwargs:
match = {"$and": [_INDEX_REGEX, kwargs["filter"]]}
dblen = len(self.name.encode("utf8") + b".")
pipeline = [
{"$project": {"name": {"$substr": ["$name", dblen, -1]},
"options": 1}},
{"$match": match}
]
cmd = SON([("aggregate", "system.namespaces"),
("pipeline", pipeline),
("cursor", kwargs.get("cursor", {}))])
cursor = self._command(sock_info, cmd, secondary_okay)["cursor"]
cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
cmd = SON([("listCollections", 1),
("cursor", {})])
cmd.update(kwargs)
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, secondary_okay,
read_preference=read_preference,
session=tmp_session)["cursor"]
cmd_cursor = CommandCursor(
coll,
cursor,
sock_info.address,
session=tmp_session,
explicit_session=session is not None)
cmd_cursor._maybe_pin_connection(sock_info)
return cmd_cursor

View File

@ -168,41 +168,6 @@ def _check_command_response(response, max_wire_version,
raise OperationFailure(errmsg, code, response, max_wire_version)
def _check_gle_response(result, max_wire_version):
"""Return getlasterror response as a dict, or raise OperationFailure."""
# Did getlasterror itself fail?
_check_command_response(result, max_wire_version)
if result.get("wtimeout", False):
# MongoDB versions before 1.8.0 return the error message in an "errmsg"
# field. If "errmsg" exists "err" will also exist set to None, so we
# have to check for "errmsg" first.
raise WTimeoutError(result.get("errmsg", result.get("err")),
result.get("code"),
result)
error_msg = result.get("err", "")
if error_msg is None:
return result
if error_msg.startswith(HelloCompat.LEGACY_ERROR):
raise NotPrimaryError(error_msg, result)
details = result
# mongos returns the error code in an error object for some errors.
if "errObjects" in result:
for errobj in result["errObjects"]:
if errobj.get("err") == error_msg:
details = errobj
break
code = details.get("code")
if code in (11000, 11001, 12582):
raise DuplicateKeyError(details["err"], code, result)
raise OperationFailure(details["err"], code, result)
def _raise_last_write_error(write_errors):
# If the last batch had multiple errors only report
# the last error to emulate continue_on_error.

View File

@ -32,7 +32,6 @@ from bson import (CodecOptions,
_decode_selective,
_dict_to_bson,
_make_c_string)
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.int64 import Int64
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument)
@ -52,7 +51,6 @@ from pymongo.errors import (ConfigurationError,
OperationFailure,
ProtocolError)
from pymongo.hello import HelloCompat
from pymongo.read_concern import DEFAULT_READ_CONCERN
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
@ -271,7 +269,7 @@ class _Query(object):
def use_command(self, sock_info):
use_find_cmd = False
if sock_info.max_wire_version >= 4 and not self.exhaust:
if not self.exhaust:
use_find_cmd = True
elif sock_info.max_wire_version >= 8:
# OP_MSG supports exhaust on MongoDB 4.2+
@ -283,18 +281,7 @@ class _Query(object):
% (self.read_concern.level,
sock_info.max_wire_version))
if sock_info.max_wire_version < 5 and self.collation is not None:
raise ConfigurationError(
'Specifying a collation is unsupported with a max wire '
'version of %d.' % (sock_info.max_wire_version,))
if sock_info.max_wire_version < 4 and self.allow_disk_use is not None:
raise ConfigurationError(
'Specifying allowDiskUse is unsupported with a max wire '
'version of %d.' % (sock_info.max_wire_version,))
sock_info.validate_session(self.client, self.session)
return use_find_cmd
def as_command(self, sock_info):
@ -342,24 +329,21 @@ class _Query(object):
if use_cmd:
spec = self.as_command(sock_info)[0]
if sock_info.op_msg_enabled:
request_id, msg, size, _ = _op_msg(
0, spec, self.db, self.read_preference,
set_secondary_ok, False, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
ns = "%s.%s" % (self.db, "$cmd")
ntoreturn = -1 # All DB commands return 1 document
else:
# OP_QUERY treats ntoreturn of -1 and 1 the same, return
# one document and close the cursor. We have to use 2 for
# batch size if 1 is specified.
ntoreturn = self.batch_size == 1 and 2 or self.batch_size
if self.limit:
if ntoreturn:
ntoreturn = min(self.limit, ntoreturn)
else:
ntoreturn = self.limit
request_id, msg, size, _ = _op_msg(
0, spec, self.db, self.read_preference,
set_secondary_ok, False, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
# OP_QUERY treats ntoreturn of -1 and 1 the same, return
# one document and close the cursor. We have to use 2 for
# batch size if 1 is specified.
ntoreturn = self.batch_size == 1 and 2 or self.batch_size
if self.limit:
if ntoreturn:
ntoreturn = min(self.limit, ntoreturn)
else:
ntoreturn = self.limit
if sock_info.is_mongos:
spec = _maybe_add_read_preference(spec,
@ -400,7 +384,7 @@ class _GetMore(object):
def use_command(self, sock_info):
use_cmd = False
if sock_info.max_wire_version >= 4 and not self.exhaust:
if not self.exhaust:
use_cmd = True
elif sock_info.max_wire_version >= 8:
# OP_MSG supports exhaust on MongoDB 4.2+
@ -440,19 +424,15 @@ class _GetMore(object):
if use_cmd:
spec = self.as_command(sock_info)[0]
if sock_info.op_msg_enabled:
if self.sock_mgr:
flags = _OpMsg.EXHAUST_ALLOWED
else:
flags = 0
request_id, msg, size, _ = _op_msg(
flags, spec, self.db, None,
False, False, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
ns = "%s.%s" % (self.db, "$cmd")
return _query(0, ns, 0, -1, spec, None, self.codec_options,
ctx=ctx)
if self.sock_mgr:
flags = _OpMsg.EXHAUST_ALLOWED
else:
flags = 0
request_id, msg, size, _ = _op_msg(
flags, spec, self.db, None,
False, False, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
return _get_more(ns, self.ntoreturn, self.cursor_id, ctx)
@ -464,7 +444,7 @@ class _RawBatchQuery(_Query):
if sock_info.max_wire_version >= 8:
# MongoDB 4.2+ supports exhaust over OP_MSG
return True
elif sock_info.op_msg_enabled and not self.exhaust:
elif not self.exhaust:
return True
return False
@ -476,7 +456,7 @@ class _RawBatchGetMore(_GetMore):
if sock_info.max_wire_version >= 8:
# MongoDB 4.2+ supports exhaust over OP_MSG
return True
elif sock_info.op_msg_enabled and not self.exhaust:
elif not self.exhaust:
return True
return False
@ -528,16 +508,6 @@ def _compress(operation, data, ctx):
return request_id, header + compressed
def __last_error(namespace, args):
"""Data to send to do a lastError.
"""
cmd = SON([("getlasterror", 1)])
cmd.update(args)
splitns = namespace.split('.', 1)
return _query(0, splitns[0] + '.$cmd', 0, -1, cmd,
None, DEFAULT_CODEC_OPTIONS)
_pack_header = struct.Struct("<iiii").pack
@ -552,104 +522,6 @@ def __pack_message(operation, data):
_pack_int = struct.Struct("<i").pack
def _insert_impl(collection_name, docs, check_keys, flags, opts):
"""Get an OP_INSERT message"""
encode = _dict_to_bson # Make local. Uses extensions.
if len(docs) == 1:
encoded = encode(docs[0], check_keys, opts)
return b"".join([
b"\x00\x00\x00\x00", # Flags don't matter for one doc.
_make_c_string(collection_name),
encoded]), len(encoded)
encoded = [encode(doc, check_keys, opts) for doc in docs]
if not encoded:
raise InvalidOperation("cannot do an empty bulk insert")
return b"".join([
_pack_int(flags),
_make_c_string(collection_name),
b"".join(encoded)]), max(map(len, encoded))
def _insert_compressed(
collection_name, docs, check_keys, continue_on_error, opts, ctx):
"""Internal compressed unacknowledged insert message helper."""
op_insert, max_bson_size = _insert_impl(
collection_name, docs, check_keys, continue_on_error, opts)
rid, msg = _compress(2002, op_insert, ctx)
return rid, msg, max_bson_size
def _insert_uncompressed(collection_name, docs, check_keys, continue_on_error,
opts):
"""Internal insert message helper."""
op_insert, max_bson_size = _insert_impl(
collection_name, docs, check_keys, continue_on_error, opts)
rid, msg = __pack_message(2002, op_insert)
return rid, msg, max_bson_size
if _use_c:
_insert_uncompressed = _cmessage._insert_message
def _insert(collection_name, docs, check_keys, continue_on_error, opts,
ctx=None):
"""Get an **insert** message."""
if ctx:
return _insert_compressed(
collection_name, docs, check_keys, continue_on_error, opts, ctx)
return _insert_uncompressed(collection_name, docs, check_keys,
continue_on_error, opts)
def _update_impl(collection_name, upsert, multi, spec, doc, check_keys, opts):
"""Get an OP_UPDATE message."""
flags = 0
if upsert:
flags += 1
if multi:
flags += 2
encode = _dict_to_bson # Make local. Uses extensions.
encoded_update = encode(doc, check_keys, opts)
return b"".join([
_ZERO_32,
_make_c_string(collection_name),
_pack_int(flags),
encode(spec, False, opts),
encoded_update]), len(encoded_update)
def _update_compressed(
collection_name, upsert, multi, spec, doc, check_keys, opts, ctx):
"""Internal compressed unacknowledged update message helper."""
op_update, max_bson_size = _update_impl(
collection_name, upsert, multi, spec, doc, check_keys, opts)
rid, msg = _compress(2001, op_update, ctx)
return rid, msg, max_bson_size
def _update_uncompressed(collection_name, upsert, multi, spec, doc,
check_keys, opts):
"""Internal update message helper."""
op_update, max_bson_size = _update_impl(
collection_name, upsert, multi, spec, doc, check_keys, opts)
rid, msg = __pack_message(2001, op_update)
return rid, msg, max_bson_size
if _use_c:
_update_uncompressed = _cmessage._update_message
def _update(collection_name, upsert, multi, spec, doc, check_keys, opts,
ctx=None):
"""Get an **update** message."""
if ctx:
return _update_compressed(
collection_name, upsert, multi, spec, doc, check_keys, opts, ctx)
return _update_uncompressed(collection_name, upsert, multi, spec,
doc, check_keys, opts)
_pack_op_msg_flags_type = struct.Struct("<IB").pack
_pack_byte = struct.Struct("<B").pack
@ -829,52 +701,6 @@ def _get_more(collection_name, num_to_return, cursor_id, ctx=None):
return _get_more_uncompressed(collection_name, num_to_return, cursor_id)
def _delete_impl(collection_name, spec, opts, flags):
"""Get an OP_DELETE message."""
encoded = _dict_to_bson(spec, False, opts) # Uses extensions.
return b"".join([
_ZERO_32,
_make_c_string(collection_name),
_pack_int(flags),
encoded]), len(encoded)
def _delete_compressed(collection_name, spec, opts, flags, ctx):
"""Internal compressed unacknowledged delete message helper."""
op_delete, max_bson_size = _delete_impl(collection_name, spec, opts, flags)
rid, msg = _compress(2006, op_delete, ctx)
return rid, msg, max_bson_size
def _delete_uncompressed(collection_name, spec, opts, flags=0):
"""Internal delete message helper."""
op_delete, max_bson_size = _delete_impl(collection_name, spec, opts, flags)
rid, msg = __pack_message(2006, op_delete)
return rid, msg, max_bson_size
def _delete(collection_name, spec, opts, flags=0, ctx=None):
"""Get a **delete** message.
`opts` is a CodecOptions. `flags` is a bit vector that may contain
the SingleRemove flag or not:
http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-delete
"""
if ctx:
return _delete_compressed(collection_name, spec, opts, flags, ctx)
return _delete_uncompressed(collection_name, spec, opts, flags)
def _kill_cursors(cursor_ids):
"""Get a **killCursors** message.
"""
num_cursors = len(cursor_ids)
pack = struct.Struct("<ii" + ("q" * num_cursors)).pack
op_kill_cursors = pack(0, num_cursors, *cursor_ids)
return __pack_message(2007, op_kill_cursors)
class _BulkWriteContext(object):
"""A wrapper around SocketInfo for use with write splitting functions."""
@ -901,7 +727,7 @@ class _BulkWriteContext(object):
def _batch_command(self, docs):
namespace = self.db_name + '.$cmd'
request_id, msg, to_send = _do_bulk_write_command(
request_id, msg, to_send = _do_batched_op_msg(
namespace, self.op_type, self.command, docs, self.check_keys,
self.codec, self)
if not to_send:
@ -921,7 +747,7 @@ class _BulkWriteContext(object):
# without receiving a result. Send 0 for max_doc_size
# to disable size checking. Size checking is handled while
# the documents are encoded to BSON.
self.legacy_write(request_id, msg, 0, False, to_send)
self.unack_write(request_id, msg, 0, to_send)
return to_send
@property
@ -952,24 +778,15 @@ class _BulkWriteContext(object):
"""The maximum size of a BSON command before batch splitting."""
return self.max_bson_size
def legacy_bulk_insert(
self, request_id, msg, max_doc_size, acknowledged, docs, compress):
if compress:
request_id, msg = _compress(
2002, msg, self.sock_info.compression_context)
return self.legacy_write(
request_id, msg, max_doc_size, acknowledged, docs)
def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
"""A proxy for SocketInfo.legacy_write that handles event publishing.
def unack_write(self, request_id, msg, max_doc_size, docs):
"""A proxy for SocketInfo.unack_write that handles event publishing.
"""
if self.publish:
duration = datetime.datetime.now() - self.start_time
cmd = self._start(request_id, docs)
start = datetime.datetime.now()
try:
result = self.sock_info.legacy_write(
request_id, msg, max_doc_size, acknowledged)
result = self.sock_info.unack_write(msg, max_doc_size)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
if result is not None:
@ -1100,92 +917,6 @@ def _raise_document_too_large(operation, doc_size, max_size):
# about size for update and delete
raise DocumentTooLarge("%r command document too large" % (operation,))
def _do_batched_insert(collection_name, docs, check_keys,
safe, last_error_args, continue_on_error, opts,
ctx):
"""Insert `docs` using multiple batches.
"""
def _insert_message(insert_message, send_safe):
"""Build the insert message with header and GLE.
"""
request_id, final_message = __pack_message(2002, insert_message)
if send_safe:
request_id, error_message, _ = __last_error(collection_name,
last_error_args)
final_message += error_message
return request_id, final_message
send_safe = safe or not continue_on_error
last_error = None
data = _BytesIO()
data.write(struct.pack("<i", int(continue_on_error)))
data.write(_make_c_string(collection_name))
message_length = begin_loc = data.tell()
has_docs = False
to_send = []
encode = _dict_to_bson # Make local
compress = ctx.compress and not (safe or send_safe)
for doc in docs:
encoded = encode(doc, check_keys, opts)
encoded_length = len(encoded)
too_large = (encoded_length > ctx.max_bson_size)
message_length += encoded_length
if message_length < ctx.max_message_size and not too_large:
data.write(encoded)
to_send.append(doc)
has_docs = True
continue
if has_docs:
# We have enough data, send this message.
try:
if compress:
rid, msg = None, data.getvalue()
else:
rid, msg = _insert_message(data.getvalue(), send_safe)
ctx.legacy_bulk_insert(
rid, msg, 0, send_safe, to_send, compress)
# Exception type could be OperationFailure or a subtype
# (e.g. DuplicateKeyError)
except OperationFailure as exc:
# Like it says, continue on error...
if continue_on_error:
# Store exception details to re-raise after the final batch.
last_error = exc
# With unacknowledged writes just return at the first error.
elif not safe:
return
# With acknowledged writes raise immediately.
else:
raise
if too_large:
_raise_document_too_large(
"insert", encoded_length, ctx.max_bson_size)
message_length = begin_loc + encoded_length
data.seek(begin_loc)
data.truncate()
data.write(encoded)
to_send = [doc]
if not has_docs:
raise InvalidOperation("cannot do an empty bulk insert")
if compress:
request_id, msg = None, data.getvalue()
else:
request_id, msg = _insert_message(data.getvalue(), safe)
ctx.legacy_bulk_insert(request_id, msg, 0, safe, to_send, compress)
# Re-raise any exception stored due to continue_on_error
if last_error is not None:
raise last_error
if _use_c:
_do_batched_insert = _cmessage._do_batched_insert
# OP_MSG -------------------------------------------------------------
@ -1335,20 +1066,6 @@ def _do_batched_op_msg(
# End OP_MSG -----------------------------------------------------
def _batched_write_command_compressed(
namespace, operation, command, docs, check_keys, opts, ctx):
"""Create the next batched insert, update, or delete command, compressed.
"""
data, to_send = _encode_batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx)
request_id, msg = _compress(
2004,
data,
ctx.sock_info.compression_context)
return request_id, msg, to_send
def _encode_batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx):
"""Encode the next batched insert, update, or delete command.
@ -1362,53 +1079,6 @@ if _use_c:
_encode_batched_write_command = _cmessage._encode_batched_write_command
def _batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx):
"""Create the next batched insert, update, or delete command.
"""
buf = _BytesIO()
# Save space for message length and request id
buf.write(_ZERO_64)
# responseTo, opCode
buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00")
# Write OP_QUERY write command
to_send, length = _batched_write_command_impl(
namespace, operation, command, docs, check_keys, opts, ctx, buf)
# Header - request id and message length
buf.seek(4)
request_id = _randint()
buf.write(_pack_int(request_id))
buf.seek(0)
buf.write(_pack_int(length))
return request_id, buf.getvalue(), to_send
if _use_c:
_batched_write_command = _cmessage._batched_write_command
def _do_batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx):
"""Batched write commands entry point."""
if ctx.sock_info.compression_context:
return _batched_write_command_compressed(
namespace, operation, command, docs, check_keys, opts, ctx)
return _batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx)
def _do_bulk_write_command(
namespace, operation, command, docs, check_keys, opts, ctx):
"""Bulk write commands entry point."""
if ctx.sock_info.max_wire_version > 5:
return _do_batched_op_msg(
namespace, operation, command, docs, check_keys, opts, ctx)
return _do_batched_write_command(
namespace, operation, command, docs, check_keys, opts, ctx)
def _batched_write_command_impl(
namespace, operation, command, docs, check_keys, opts, ctx, buf):
"""Create a batched OP_QUERY write command."""
@ -1585,7 +1255,7 @@ class _OpReply(object):
# PYTHON-945: ignore starting_from field.
flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg)
documents = bytes(msg[20:])
documents = msg[20:]
return cls(flags, cursor_id, number_returned, documents)
@ -1665,7 +1335,7 @@ class _OpMsg(object):
if len(msg) != first_payload_size + 5:
raise ProtocolError("Unsupported OP_MSG reply: >1 section")
payload_document = bytes(msg[5:])
payload_document = msg[5:]
return cls(flags, payload_document)
@ -1673,63 +1343,3 @@ _UNPACK_REPLY = {
_OpReply.OP_CODE: _OpReply.unpack,
_OpMsg.OP_CODE: _OpMsg.unpack,
}
def _first_batch(sock_info, db, coll, query, ntoreturn,
secondary_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, query, None, codec_options,
read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, None,
None, None, False)
name = next(iter(cmd))
publish = listeners.enabled_for_commands
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(secondary_ok, sock_info)
if publish:
encoding_duration = datetime.datetime.now() - start
listeners.publish_command_start(
cmd, db, request_id, sock_info.address,
service_id=sock_info.service_id)
start = datetime.datetime.now()
sock_info.send_message(msg, max_doc_size)
reply = sock_info.receive_message(request_id)
try:
docs = reply.unpack_response(None, codec_options)
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure = exc.details
else:
failure = _convert_exception(exc)
listeners.publish_command_failure(
duration, failure, name, request_id, sock_info.address,
service_id=sock_info.service_id)
raise
# listIndexes
if 'cursor' in cmd:
result = {
'cursor': {
'firstBatch': docs,
'id': reply.cursor_id,
'ns': '%s.%s' % (db, coll)
},
'ok': 1.0
}
# fsyncUnlock, currentOp
else:
result = docs[0] if docs else {}
result['ok'] = 1.0
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
listeners.publish_command_success(
duration, result, name, request_id, sock_info.address,
service_id=sock_info.service_id)
return result

View File

@ -32,9 +32,7 @@ access:
"""
import contextlib
import datetime
import threading
import warnings
import weakref
from collections import defaultdict
@ -62,8 +60,7 @@ from pymongo.errors import (AutoReconnect,
ServerSelectionTimeoutError)
from pymongo.pool import ConnectionClosedReason
from pymongo.read_preferences import ReadPreference
from pymongo.server_selectors import (writable_preferred_server_selector,
writable_server_selector)
from pymongo.server_selectors import writable_server_selector
from pymongo.server_type import SERVER_TYPE
from pymongo.topology import (Topology,
_ErrorContext)
@ -1377,8 +1374,6 @@ class MongoClient(common.BaseObject):
try:
server = self._select_server(
read_pref, session, address=address)
if not server.description.retryable_reads_supported:
retryable = False
with self._secondaryok_for_server(read_pref, server, session) as (
sock_info, secondary_ok):
if retrying and not retryable:
@ -1561,51 +1556,10 @@ class MongoClient(common.BaseObject):
self._kill_cursor_impl(cursor_ids, address, session, sock_info)
def _kill_cursor_impl(self, cursor_ids, address, session, sock_info):
listeners = self._event_listeners
publish = listeners.enabled_for_commands
try:
namespace = address.namespace
db, coll = namespace.split('.', 1)
except AttributeError:
namespace = None
db = coll = "OP_KILL_CURSORS"
namespace = address.namespace
db, coll = namespace.split('.', 1)
spec = SON([('killCursors', coll), ('cursors', cursor_ids)])
if sock_info.max_wire_version >= 4 and namespace is not None:
sock_info.command(db, spec, session=session, client=self)
else:
if publish:
start = datetime.datetime.now()
request_id, msg = message._kill_cursors(cursor_ids)
if publish:
duration = datetime.datetime.now() - start
# Here and below, address could be a tuple or
# _CursorAddress. We always want to publish a
# tuple to match the rest of the monitoring
# API.
listeners.publish_command_start(
spec, db, request_id, tuple(address),
service_id=sock_info.service_id)
start = datetime.datetime.now()
try:
sock_info.send_message(msg, 0)
except Exception as exc:
if publish:
dur = ((datetime.datetime.now() - start) + duration)
listeners.publish_command_failure(
dur, message._convert_exception(exc),
'killCursors', request_id,
tuple(address), service_id=sock_info.service_id)
raise
if publish:
duration = ((datetime.datetime.now() - start) + duration)
# OP_KILL_CURSORS returns no reply, fake one.
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
listeners.publish_command_success(
duration, reply, 'killCursors', request_id,
tuple(address), service_id=sock_info.service_id)
sock_info.command(db, spec, session=session, client=self)
def _process_kill_cursors(self):
"""Process any pending kill cursors requests."""

View File

@ -580,7 +580,7 @@ class SocketInfo(object):
self.sock.settimeout(
self.opts.connect_timeout + heartbeat_frequency)
if self.max_wire_version >= 6 and cluster_time is not None:
if not performing_handshake and cluster_time is not None:
cmd['$clusterTime'] = cluster_time
# XXX: Simplify in PyMongo 4.0 when all_credentials is always a single
@ -615,7 +615,7 @@ class SocketInfo(object):
hello.compressors)
self.compression_context = ctx
self.op_msg_enabled = hello.max_wire_version >= 6
self.op_msg_enabled = True
if creds:
self.negotiated_mechanisms[creds] = hello.sasl_supported_mechs
if auth_ctx:
@ -687,23 +687,13 @@ class SocketInfo(object):
if not isinstance(spec, ORDERED_TYPES):
spec = SON(spec)
if (read_concern and self.max_wire_version < 4
and not read_concern.ok_for_legacy):
raise ConfigurationError(
'read concern level of %s is not valid '
'with a max wire version of %d.'
% (read_concern.level, self.max_wire_version))
if not (write_concern is None or write_concern.acknowledged or
collation is None):
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
if (self.max_wire_version >= 5 and
write_concern and
if (write_concern and
not write_concern.is_server_default):
spec['writeConcern'] = write_concern.document
elif self.max_wire_version < 5 and collation is not None:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
self.add_server_api(spec)
if session:
@ -769,25 +759,17 @@ class SocketInfo(object):
raise NotPrimaryError("not primary", {
"ok": 0, "errmsg": "not primary", "code": 10107})
def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
"""Send OP_INSERT, etc., optionally returning response as a dict.
def unack_write(self, msg, max_doc_size):
"""Send unack OP_MSG.
Can raise ConnectionFailure or OperationFailure.
Can raise ConnectionFailure or InvalidDocument.
:Parameters:
- `request_id`: an int.
- `msg`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message,
perhaps with a getlasterror command appended.
- `msg`: bytes, an OP_MSG message.
- `max_doc_size`: size in bytes of the largest document in `msg`.
- `with_last_error`: True if a getlasterror command is appended.
"""
self._raise_if_not_writable(not with_last_error)
self._raise_if_not_writable(True)
self.send_message(msg, max_doc_size)
if with_last_error:
reply = self.receive_message(request_id)
return helpers._check_gle_response(reply.command_response(),
self.max_wire_version)
def write_command(self, request_id, msg):
"""Send "insert" etc. command, returning response as a dict.
@ -881,7 +863,7 @@ class SocketInfo(object):
def send_cluster_time(self, command, session, client):
"""Add cluster time for MongoDB >= 3.6."""
if self.max_wire_version >= 6 and client:
if client:
client._send_cluster_time(command, session)
def add_server_api(self, command):

View File

@ -605,10 +605,9 @@ class ClientContext(object):
def require_auth(self, func):
"""Run a test only if the server is running with auth enabled."""
return self.check_auth_with_sharding(
self._require(lambda: self.auth_enabled,
"Authentication is not enabled on the server",
func=func))
return self._require(lambda: self.auth_enabled,
"Authentication is not enabled on the server",
func=func)
def require_no_auth(self, func):
"""Run a test only if the server is running without auth enabled."""
@ -706,14 +705,6 @@ class ClientContext(object):
"Must not be connected to a load balancer",
func=func)
def check_auth_with_sharding(self, func):
"""Skip a test when connected to mongos < 2.0 and running with auth."""
condition = lambda: not (self.auth_enabled and
self.is_mongos and self.version < (2,))
return self._require(condition,
"Auth with sharding requires MongoDB >= 2.0.0",
func=func)
def is_topology_type(self, topologies):
unknown = set(topologies) - {'single', 'replicaset', 'sharded',
'sharded-replicaset', 'load-balanced'}
@ -818,9 +809,7 @@ class ClientContext(object):
return False
if not self.sessions_enabled:
return False
if self.version.at_least(3, 6):
return self.is_mongos or self.is_rs
return False
return self.is_mongos or self.is_rs
def require_retryable_writes(self, func):
"""Run a test only if the deployment supports retryable writes."""

View File

@ -307,17 +307,8 @@ class TestSASLPlain(unittest.TestCase):
class TestSCRAMSHA1(IntegrationTest):
@client_context.require_auth
@client_context.require_version_min(2, 7, 2)
def setUp(self):
super(TestSCRAMSHA1, self).setUp()
# Before 2.7.7, SCRAM-SHA-1 had to be enabled from the command line.
if client_context.version < Version(2, 7, 7):
cmd_line = client_context.cmd_line
if 'SCRAM-SHA-1' not in cmd_line.get(
'parsed', {}).get('setParameter',
{}).get('authenticationMechanisms', ''):
raise SkipTest('SCRAM-SHA-1 mechanism not enabled')
client_context.create_user(
'pymongo_test', 'user', 'pass', roles=['userAdmin', 'readWrite'])

View File

@ -164,18 +164,6 @@ class TestBulk(BulkTestBase):
def test_update_many_pipeline(self):
self._test_update_many([{'$set': {'foo': 'bar'}}])
@client_context.require_version_max(3, 5, 5)
def test_array_filters_unsupported(self):
requests = [
UpdateMany(
{}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]),
UpdateOne(
{}, {'$set': {"y.$[i].b": 2}}, array_filters=[{'i.b': 3}])
]
for bulk_op in requests:
self.assertRaises(
ConfigurationError, self.coll.bulk_write, [bulk_op])
def test_array_filters_validation(self):
self.assertRaises(TypeError, UpdateMany, {}, {}, array_filters={})
self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={})
@ -307,7 +295,6 @@ class TestBulk(BulkTestBase):
self.assertEqual(n_docs, result.inserted_count)
self.assertEqual(n_docs, self.coll.count_documents({}))
@client_context.require_version_min(3, 6)
def test_bulk_max_message_size(self):
self.coll.delete_many({})
self.addCleanup(self.coll.delete_many, {})
@ -781,38 +768,29 @@ class TestBulkWriteConcern(BulkTestBase):
cls.secondary = single_client(*partition_node(member))
break
# We tested wtimeout errors by specifying a write concern greater than
# the number of members, but in MongoDB 2.7.8+ this causes a different
# sort of error, "Not enough data-bearing nodes". In recent servers we
# use a failpoint to pause replication on a secondary.
cls.need_replication_stopped = client_context.version.at_least(2, 7, 8)
@classmethod
def tearDownClass(cls):
if cls.secondary:
cls.secondary.close()
def cause_wtimeout(self, requests, ordered):
if self.need_replication_stopped:
if not client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled.")
if not client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled.")
# Use the rsSyncApplyStop failpoint to pause replication on a
# secondary which will cause a wtimeout error.
self.secondary.admin.command('configureFailPoint',
'rsSyncApplyStop',
mode='alwaysOn')
try:
coll = self.coll.with_options(
write_concern=WriteConcern(w=self.w, wtimeout=1))
return coll.bulk_write(requests, ordered=ordered)
finally:
self.secondary.admin.command('configureFailPoint',
'rsSyncApplyStop',
mode='alwaysOn')
try:
coll = self.coll.with_options(
write_concern=WriteConcern(w=self.w, wtimeout=1))
return coll.bulk_write(requests, ordered=ordered)
finally:
self.secondary.admin.command('configureFailPoint',
'rsSyncApplyStop',
mode='off')
else:
coll = self.coll.with_options(
write_concern=WriteConcern(w=self.w + 1, wtimeout=1))
return coll.bulk_write(requests, ordered=ordered)
mode='off')
@client_context.require_replica_set
@client_context.require_secondaries_count(1)

View File

@ -729,17 +729,15 @@ class TestClient(IntegrationTest):
for doc in client.list_databases():
self.assertIs(type(doc), dict)
if client_context.version.at_least(3, 4, 2):
self.client.pymongo_test.test.insert_one({})
cursor = self.client.list_databases(filter={"name": "admin"})
docs = list(cursor)
self.assertEqual(1, len(docs))
self.assertEqual(docs[0]["name"], "admin")
self.client.pymongo_test.test.insert_one({})
cursor = self.client.list_databases(filter={"name": "admin"})
docs = list(cursor)
self.assertEqual(1, len(docs))
self.assertEqual(docs[0]["name"], "admin")
if client_context.version.at_least(3, 4, 3):
cursor = self.client.list_databases(nameOnly=True)
for doc in cursor:
self.assertEqual(["name"], list(doc))
cursor = self.client.list_databases(nameOnly=True)
for doc in cursor:
self.assertEqual(["name"], list(doc))
def test_list_database_names(self):
self.client.pymongo_test.test.insert_one({"dummy": "object"})
@ -763,15 +761,12 @@ class TestClient(IntegrationTest):
self.assertIn("pymongo_test2", dbs)
self.client.drop_database("pymongo_test")
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
if client_context.is_rs:
wc_client = rs_or_single_client(w=len(client_context.nodes) + 1)
with self.assertRaises(WriteConcernError):
wc_client.drop_database('pymongo_test2')
self.client.drop_database(self.client.pymongo_test2)
raise SkipTest("This test often fails due to SERVER-2329")
dbs = self.client.list_database_names()
self.assertNotIn("pymongo_test", dbs)
self.assertNotIn("pymongo_test2", dbs)

View File

@ -77,17 +77,6 @@ class TestCollationObject(unittest.TestCase):
}, Collation('en_US', backwards=True).document)
def raisesConfigurationErrorForOldMongoDB(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if client_context.version.at_least(3, 3, 9):
return func(self, *args, **kwargs)
else:
with self.assertRaises(ConfigurationError):
return func(self, *args, **kwargs)
return wrapper
class TestCollation(IntegrationTest):
@classmethod
@client_context.require_connection
@ -120,7 +109,6 @@ class TestCollation(IntegrationTest):
self.collation.document,
self.last_command_started()['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_create_collection(self):
self.db.test.drop()
self.db.create_collection('test', collation=self.collation)
@ -136,7 +124,6 @@ class TestCollation(IntegrationTest):
model = IndexModel([('a', 1), ('b', -1)], collation=self.collation)
self.assertEqual(self.collation.document, model.document['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_create_index(self):
self.db.test.create_index('foo', collation=self.collation)
ci_cmd = self.listener.results['started'][0].command
@ -144,18 +131,15 @@ class TestCollation(IntegrationTest):
self.collation.document,
ci_cmd['indexes'][0]['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_aggregate(self):
self.db.test.aggregate([{'$group': {'_id': 42}}],
collation=self.collation)
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_count_documents(self):
self.db.test.count_documents({}, collation=self.collation)
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_distinct(self):
self.db.test.distinct('foo', collation=self.collation)
self.assertCollationInLastCommand()
@ -164,14 +148,12 @@ class TestCollation(IntegrationTest):
self.db.test.find(collation=self.collation).distinct('foo')
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_find_command(self):
self.db.test.insert_one({'is this thing on?': True})
self.listener.results.clear()
next(self.db.test.find(collation=self.collation))
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_explain_command(self):
self.listener.results.clear()
self.db.test.find(collation=self.collation).explain()
@ -180,7 +162,6 @@ class TestCollation(IntegrationTest):
self.collation.document,
self.last_command_started()['explain']['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_delete(self):
self.db.test.delete_one({'foo': 42}, collation=self.collation)
command = self.listener.results['started'][0].command
@ -195,7 +176,6 @@ class TestCollation(IntegrationTest):
self.collation.document,
command['deletes'][0]['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_update(self):
self.db.test.replace_one({'foo': 42}, {'foo': 43},
collation=self.collation)
@ -220,7 +200,6 @@ class TestCollation(IntegrationTest):
self.collation.document,
command['updates'][0]['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_find_and(self):
self.db.test.find_one_and_delete({'foo': 42}, collation=self.collation)
self.assertCollationInLastCommand()
@ -235,7 +214,6 @@ class TestCollation(IntegrationTest):
collation=self.collation)
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_bulk_write(self):
self.db.test.collection.bulk_write([
DeleteOne({'noCollation': 42}),
@ -266,7 +244,6 @@ class TestCollation(IntegrationTest):
check_ops(delete_cmd['deletes'])
check_ops(update_cmd['updates'])
@raisesConfigurationErrorForOldMongoDB
def test_indexes_same_keys_different_collations(self):
self.db.test.drop()
usa_collation = Collation('en_US')
@ -302,7 +279,6 @@ class TestCollation(IntegrationTest):
with self.assertRaises(ConfigurationError):
collection.bulk_write([update_one])
@raisesConfigurationErrorForOldMongoDB
def test_cursor_collation(self):
self.db.test.insert_one({'hello': 'world'})
next(self.db.test.find().collation(self.collation))

View File

@ -134,7 +134,7 @@ class TestCollection(IntegrationTest):
@contextlib.contextmanager
def write_concern_collection(self):
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
if client_context.is_rs:
with self.assertRaises(WriteConcernError):
# Unsatisfiable write concern.
yield Collection(
@ -153,7 +153,6 @@ class TestCollection(IntegrationTest):
def test_hashable(self):
self.assertIn(self.db.test.mike, {self.db["test.mike"]})
@client_context.require_version_min(3, 3, 9)
def test_create(self):
# No Exception.
db = client_context.client.pymongo_test
@ -322,9 +321,6 @@ class TestCollection(IntegrationTest):
@client_context.require_no_mongos
@client_context.require_test_commands
def test_index_management_max_time_ms(self):
if (client_context.version[:2] == (3, 4) and
client_context.version[2] < 4):
raise unittest.SkipTest("SERVER-27711")
coll = self.db.test
self.client.admin.command("configureFailPoint",
"maxTimeAlwaysTimeOut",
@ -531,21 +527,6 @@ class TestCollection(IntegrationTest):
db.test.insert_one({'i': 2}) # duplicate
db.test.insert_one({'i': 3})
@client_context.require_version_max(2, 6)
def test_index_drop_dups(self):
# Try dropping duplicates
db = self.db
self._drop_dups_setup(db)
# No error, just drop the duplicate
db.test.create_index([('i', ASCENDING)], unique=True, dropDups=True)
# Duplicate was dropped
self.assertEqual(3, db.test.count_documents({}))
# Index was created, plus the index on _id
self.assertEqual(2, len(db.test.index_information()))
def test_index_dont_drop_dups(self):
# Try *not* dropping duplicates
db = self.db
@ -587,7 +568,6 @@ class TestCollection(IntegrationTest):
return stage
return {}
@client_context.require_version_min(3, 1, 9, -1)
def test_index_filter(self):
db = self.db
db.drop_collection("test")
@ -901,10 +881,8 @@ class TestCollection(IntegrationTest):
unack_coll = self.db.test.with_options(write_concern=WriteConcern(w=0))
self.assertRaises(DocumentTooLarge, unack_coll.replace_one,
{"bar": "x"}, {"bar": "x" * (max_size - 14)})
# This will pass with OP_UPDATE or the update command.
self.db.test.replace_one({"bar": "x"}, {"bar": "x" * (max_size - 32)})
@client_context.require_version_min(3, 1, 9, -1)
def test_insert_bypass_document_validation(self):
db = self.db
db.test.drop()
@ -923,14 +901,9 @@ class TestCollection(IntegrationTest):
self.assertTrue(isinstance(result, InsertOneResult))
self.assertEqual(2, result.inserted_id)
if client_context.version < (3, 6):
# Uses OP_INSERT which does not support bypass_document_validation.
self.assertRaises(OperationFailure, db_w0.test.insert_one,
{"y": 1}, bypass_document_validation=True)
else:
db_w0.test.insert_one({"y": 1}, bypass_document_validation=True)
wait_until(lambda: db_w0.test.find_one({"y": 1}),
"find w:0 inserted document")
db_w0.test.insert_one({"y": 1}, bypass_document_validation=True)
wait_until(lambda: db_w0.test.find_one({"y": 1}),
"find w:0 inserted document")
# Test insert_many
docs = [{"_id": i, "x": 100 - i} for i in range(3, 100)]
@ -959,7 +932,6 @@ class TestCollection(IntegrationTest):
[{"x": 1}, {"x": 2}],
bypass_document_validation=True)
@client_context.require_version_min(3, 1, 9, -1)
def test_replace_bypass_document_validation(self):
db = self.db
db.test.drop()
@ -997,18 +969,11 @@ class TestCollection(IntegrationTest):
self.assertEqual(1, db.test.count_documents({"a": 103}))
db.test.insert_one({"y": 1}, bypass_document_validation=True)
if client_context.version < (3, 6):
# Uses OP_UPDATE which does not support bypass_document_validation.
self.assertRaises(OperationFailure, db_w0.test.replace_one,
{"y": 1}, {"x": 1},
bypass_document_validation=True)
else:
db_w0.test.replace_one({"y": 1}, {"x": 1},
bypass_document_validation=True)
wait_until(lambda: db_w0.test.find_one({"x": 1}),
"find w:0 replaced document")
db_w0.test.replace_one({"y": 1}, {"x": 1},
bypass_document_validation=True)
wait_until(lambda: db_w0.test.find_one({"x": 1}),
"find w:0 replaced document")
@client_context.require_version_min(3, 1, 9, -1)
def test_update_bypass_document_validation(self):
db = self.db
db.test.drop()
@ -1048,16 +1013,10 @@ class TestCollection(IntegrationTest):
self.assertEqual(1, db.test.count_documents({"z": 0}))
db.test.insert_one({"y": 1, "x": 0}, bypass_document_validation=True)
if client_context.version < (3, 6):
# Uses OP_UPDATE which does not support bypass_document_validation.
self.assertRaises(OperationFailure, db_w0.test.update_one,
{"y": 1}, {"$inc": {"x": 1}},
db_w0.test.update_one({"y": 1}, {"$inc": {"x": 1}},
bypass_document_validation=True)
else:
db_w0.test.update_one({"y": 1}, {"$inc": {"x": 1}},
bypass_document_validation=True)
wait_until(lambda: db_w0.test.find_one({"y": 1, "x": 1}),
"find w:0 updated document")
wait_until(lambda: db_w0.test.find_one({"y": 1, "x": 1}),
"find w:0 updated document")
# Test update_many
db.test.insert_many([{"z": i} for i in range(3, 101)])
@ -1094,19 +1053,12 @@ class TestCollection(IntegrationTest):
db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True)
db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True)
if client_context.version < (3, 6):
# Uses OP_UPDATE which does not support bypass_document_validation.
self.assertRaises(OperationFailure, db_w0.test.update_many,
{"m": 1}, {"$inc": {"x": 1}},
bypass_document_validation=True)
else:
db_w0.test.update_many({"m": 1}, {"$inc": {"x": 1}},
bypass_document_validation=True)
wait_until(
lambda: db_w0.test.count_documents({"m": 1, "x": 1}) == 2,
"find w:0 updated documents")
db_w0.test.update_many({"m": 1}, {"$inc": {"x": 1}},
bypass_document_validation=True)
wait_until(
lambda: db_w0.test.count_documents({"m": 1, "x": 1}) == 2,
"find w:0 updated documents")
@client_context.require_version_min(3, 1, 9, -1)
def test_bypass_document_validation_bulk_write(self):
db = self.db
db.test.drop()
@ -1499,24 +1451,6 @@ class TestCollection(IntegrationTest):
self.assertRaises(InvalidOperation, lambda: result.upserted_id)
self.assertFalse(result.acknowledged)
# MongoDB >= 3.5.8 allows dotted fields in updates
@client_context.require_version_max(3, 5, 7)
def test_update_with_invalid_keys(self):
self.db.drop_collection("test")
self.assertTrue(self.db.test.insert_one({"hello": "world"}))
doc = self.db.test.find_one()
doc['a.b'] = 'c'
# Replace
self.assertRaises(OperationFailure, self.db.test.replace_one,
{"hello": "world"}, doc)
# Upsert
self.assertRaises(OperationFailure, self.db.test.replace_one,
{"foo": "bar"}, doc, upsert=True)
# Check that the last two ops didn't actually modify anything
self.assertTrue('a.b' not in self.db.test.find_one())
def test_update_check_keys(self):
self.db.drop_collection("test")
self.assertTrue(self.db.test.insert_one({"hello": "world"}))
@ -2041,19 +1975,6 @@ class TestCollection(IntegrationTest):
c.insert_one({'bad': bad})
self.assertEqual('bar', c.find_one()['bad']['foo'])
@client_context.require_version_max(3, 5, 5)
def test_array_filters_unsupported(self):
c = self.db.test
with self.assertRaises(ConfigurationError):
c.update_one(
{}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
with self.assertRaises(ConfigurationError):
c.update_many(
{}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
with self.assertRaises(ConfigurationError):
c.find_one_and_update(
{}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
def test_array_filters_validation(self):
# array_filters must be a list.
c = self.db.test
@ -2136,56 +2057,42 @@ class TestCollection(IntegrationTest):
# Authenticate the client and throw out auth commands from the listener.
db.command('ping')
results.clear()
if client_context.version.at_least(3, 1, 9, -1):
c_w0.find_one_and_update(
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
# Test write concern errors.
if client_context.is_rs:
c_wc_error = db.get_collection(
'test',
write_concern=WriteConcern(
w=len(client_context.nodes) + 1))
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_update,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_replace,
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertEqual(
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_delete,
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
# Test write concern errors.
if client_context.is_rs:
c_wc_error = db.get_collection(
'test',
write_concern=WriteConcern(
w=len(client_context.nodes) + 1))
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_update,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_replace,
{'w': 0}, results['started'][0].command['writeConcern'])
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_delete,
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
else:
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()

View File

@ -122,17 +122,6 @@ def create_test(scenario_def, test):
tuple(coll.find(**args))
except OperationFailure:
pass
# Wait for the killCursors thread to run if necessary.
if 'limit' in args and client_context.version[:2] < (3, 1):
self.client._kill_cursors_executor.wake()
started = self.listener.results['started']
succeeded = self.listener.results['succeeded']
wait_until(
lambda: started[-1].command_name == 'killCursors',
"publish a start event for killCursors.")
wait_until(
lambda: succeeded[-1].command_name == 'killCursors',
"publish a succeeded event for killCursors.")
else:
try:
getattr(coll, name)(**args)

View File

@ -198,7 +198,6 @@ class TestCursor(IntegrationTest):
"maxTimeAlwaysTimeOut",
mode="off")
@client_context.require_version_min(3, 1, 9, -1)
def test_max_await_time_ms(self):
db = self.db
db.pymongo_test.drop()
@ -573,12 +572,8 @@ class TestCursor(IntegrationTest):
cur = db.test.find().batch_size(1)
next(cur)
if client_context.version.at_least(3, 1, 9):
# find command batchSize should be 1
self.assertEqual(0, len(cur._Cursor__data))
else:
# OP_QUERY ntoreturn should be 2
self.assertEqual(1, len(cur._Cursor__data))
# find command batchSize should be 1
self.assertEqual(0, len(cur._Cursor__data))
next(cur)
self.assertEqual(0, len(cur._Cursor__data))
next(cur)
@ -1169,27 +1164,19 @@ class TestCursor(IntegrationTest):
@client_context.require_no_mongos
def test_comment(self):
# MongoDB 3.1.5 changed the ns for commands.
regex = {'$regex': r'pymongo_test.(\$cmd|test)'}
if client_context.version.at_least(3, 5, 8, -1):
query_key = "command.comment"
elif client_context.version.at_least(3, 1, 8, -1):
query_key = "query.comment"
else:
query_key = "query.$comment"
self.client.drop_database(self.db)
self.db.command('profile', 2) # Profile ALL commands.
try:
list(self.db.test.find().comment('foo'))
count = self.db.system.profile.count_documents(
{'ns': 'pymongo_test.test', 'op': 'query', query_key: 'foo'})
{'ns': 'pymongo_test.test', 'op': 'query',
'command.comment': 'foo'})
self.assertEqual(count, 1)
self.db.test.find().comment('foo').distinct('type')
count = self.db.system.profile.count_documents(
{'ns': regex, 'op': 'command', 'command.distinct': 'test',
{'ns': 'pymongo_test.test', 'op': 'command',
'command.distinct': 'test',
'command.comment': 'foo'})
self.assertEqual(count, 1)
finally:
@ -1266,7 +1253,6 @@ class TestCursor(IntegrationTest):
cursor = Cursor.__new__(Cursor) # Skip calling __init__
cursor.__del__() # no error
@client_context.require_version_min(3, 6)
def test_getMore_does_not_send_readPreference(self):
listener = AllowListEventListener('find', 'getMore')
client = rs_or_single_client(
@ -1408,16 +1394,9 @@ class TestRawBatchCursor(IntegrationTest):
with self.assertRaises(InvalidOperation):
self.db.test.find_raw_batches()[0]
@client_context.require_version_min(3, 4)
def test_collation(self):
next(self.db.test.find_raw_batches(collation=Collation('en_US')))
@client_context.require_version_max(3, 2)
def test_collation_error(self):
with self.assertRaises(ConfigurationError):
next(self.db.test.find_raw_batches(collation=Collation('en_US')))
@client_context.require_version_min(3, 2)
@client_context.require_no_mmap # MMAPv1 does not support read concern
def test_read_concern(self):
self.db.get_collection(
@ -1425,12 +1404,6 @@ class TestRawBatchCursor(IntegrationTest):
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
next(c.find_raw_batches())
@client_context.require_version_max(3, 1)
def test_read_concern_error(self):
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
with self.assertRaises(ConfigurationError):
next(c.find_raw_batches())
def test_monitoring(self):
listener = EventListener()
client = rs_or_single_client(event_listeners=[listener])
@ -1588,15 +1561,9 @@ class TestRawBatchCommandCursor(IntegrationTest):
with self.assertRaises(InvalidOperation):
self.db.test.aggregate_raw_batches([])[0]
@client_context.require_version_min(3, 4)
def test_collation(self):
next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US')))
@client_context.require_version_max(3, 2)
def test_collation_error(self):
with self.assertRaises(ConfigurationError):
next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US')))
def test_monitoring(self):
listener = EventListener()
client = rs_or_single_client(event_listeners=[listener])

View File

@ -790,7 +790,6 @@ class ChangeStreamsWCustomTypesTestMixin(object):
class TestCollectionChangeStreamsWCustomTypes(
IntegrationTest, ChangeStreamsWCustomTypesTestMixin):
@classmethod
@client_context.require_version_min(3, 6, 0)
@client_context.require_no_mmap
@client_context.require_no_standalone
def setUpClass(cls):

View File

@ -204,11 +204,8 @@ class TestDatabase(IntegrationTest):
self.assertIn("capped", names)
self.assertIn("non_capped", names)
command = results["started"][0].command
if client_context.version >= (3, 0):
self.assertIn("nameOnly", command)
self.assertTrue(command["nameOnly"])
else:
self.assertNotIn("nameOnly", command)
self.assertIn("nameOnly", command)
self.assertTrue(command["nameOnly"])
def test_list_collections(self):
self.client.drop_database("pymongo_test")
@ -324,7 +321,7 @@ class TestDatabase(IntegrationTest):
db.drop_collection(db.test.doesnotexist)
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
if client_context.is_rs:
db_wc = Database(self.client, 'pymongo_test',
write_concern=IMPOSSIBLE_WRITE_CONCERN)
with self.assertRaises(WriteConcernError):
@ -377,19 +374,15 @@ class TestDatabase(IntegrationTest):
self.assertEqualReply(second, third)
# We use 'aggregate' as our example command, since it's an easy way to
# retrieve a BSON regex from a collection using a command. But until
# MongoDB 2.3.2, aggregation turned regexes into strings: SERVER-6470.
# Note: MongoDB 3.5.2 requires the 'cursor' or 'explain' option for
# aggregate.
@client_context.require_version_max(3, 5, 0)
# retrieve a BSON regex from a collection using a command.
def test_command_with_regex(self):
db = self.client.pymongo_test
db.test.drop()
db.test.insert_one({'r': re.compile('.*')})
db.test.insert_one({'r': Regex('.*')})
result = db.command('aggregate', 'test', pipeline=[])
for doc in result['result']:
result = db.command('aggregate', 'test', pipeline=[], cursor={})
for doc in result['cursor']['firstBatch']:
self.assertTrue(isinstance(doc['r'], Regex))
def test_password_digest(self):
@ -647,13 +640,11 @@ class TestDatabaseAggregation(IntegrationTest):
self.result = {"dummy": "dummy field"}
self.admin = self.client.admin
@client_context.require_version_min(3, 6, 0)
def test_database_aggregation(self):
with self.admin.aggregate(self.pipeline) as cursor:
result = next(cursor)
self.assertEqual(result, self.result)
@client_context.require_version_min(3, 6, 0)
@client_context.require_no_mongos
def test_database_aggregation_fake_cursor(self):
coll_name = "test_output"
@ -680,13 +671,6 @@ class TestDatabaseAggregation(IntegrationTest):
result = wait_until(output_coll.find_one, "read unacknowledged write")
self.assertEqual(result["dummy"], self.result["dummy"])
@client_context.require_version_max(3, 6, 0, -1)
def test_database_aggregation_unsupported(self):
err_msg = r"Database.aggregate\(\) is only supported on MongoDB 3.6\+."
with self.assertRaisesRegex(ConfigurationError, err_msg):
with self.admin.aggregate(self.pipeline) as _:
pass
def test_bool(self):
with self.assertRaises(NotImplementedError):
bool(Database(self.client, "test"))

View File

@ -27,10 +27,6 @@ from test import client_context, unittest
class TestDecimal128(unittest.TestCase):
def test_round_trip(self):
if not client_context.version.at_least(3, 3, 6):
raise unittest.SkipTest(
'Round trip test requires MongoDB >= 3.3.6')
coll = client_context.client.pymongo_test.test
coll.drop()

View File

@ -479,7 +479,6 @@ class TestSpec(SpecRunner):
@classmethod
@unittest.skipUnless(_HAVE_PYMONGOCRYPT, 'pymongocrypt is not installed')
@client_context.require_version_min(3, 6) # SpecRunner requires sessions.
def setUpClass(cls):
super(TestSpec, cls).setUpClass()

View File

@ -660,7 +660,6 @@ class TestSampleShellCommands(IntegrationTest):
self.assertEqual(db.inventory.count_documents({}), 0)
@client_context.require_version_min(3, 5, 11)
@client_context.require_replica_set
@client_context.require_no_mmap
def test_change_streams(self):
@ -762,34 +761,31 @@ class TestSampleShellCommands(IntegrationTest):
])
# End Aggregation Example 3
# $lookup was new in 3.2. The let and pipeline options
# were added in 3.6.
if client_context.version.at_least(3, 6, 0):
# Start Aggregation Example 4
db.air_alliances.aggregate([
{"$lookup": {
"from": "air_airlines",
"let": {"constituents": "$airlines"},
"pipeline": [
{"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}}
],
"as": "airlines"
}
},
{"$project": {
"_id": 0,
"name": 1,
"airlines": {
"$filter": {
"input": "$airlines",
"as": "airline",
"cond": {"$eq": ["$$airline.country", "Canada"]}
}
# Start Aggregation Example 4
db.air_alliances.aggregate([
{"$lookup": {
"from": "air_airlines",
"let": {"constituents": "$airlines"},
"pipeline": [
{"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}}
],
"as": "airlines"
}
},
{"$project": {
"_id": 0,
"name": 1,
"airlines": {
"$filter": {
"input": "$airlines",
"as": "airline",
"cond": {"$eq": ["$$airline.country", "Canada"]}
}
}
}
])
# End Aggregation Example 4
}
])
# End Aggregation Example 4
def test_commands(self):
db = self.db
@ -817,7 +813,6 @@ class TestSampleShellCommands(IntegrationTest):
)
# End Index Example 1
@client_context.require_version_min(3, 6, 0)
@client_context.require_replica_set
def test_misc(self):
# Marketing examples
@ -1069,7 +1064,6 @@ class TestTransactionExamples(IntegrationTest):
class TestCausalConsistencyExamples(IntegrationTest):
@client_context.require_version_min(3, 6, 0)
@client_context.require_secondaries_count(1)
@client_context.require_no_mmap
def test_causal_consistency(self):

View File

@ -116,7 +116,6 @@ class TestMaxStaleness(unittest.TestCase):
self.assertEqual(-1, client.read_preference.max_staleness)
self.assertIn("must be a positive integer", str(ctx[0]))
@client_context.require_version_min(3, 3, 6) # SERVER-8858
@client_context.require_replica_set
def test_last_write_date(self):
# From max-staleness-tests.rst, "Parse lastWriteDate".
@ -138,12 +137,6 @@ class TestMaxStaleness(unittest.TestCase):
self.assertGreater(second, first)
self.assertLess(second, first + 10)
@client_context.require_version_max(3, 3)
def test_last_write_date_absent(self):
# From max-staleness-tests.rst, "Absent lastWriteDate".
client = rs_or_single_client()
sd = client._topology.select_server(writable_server_selector)
self.assertIsNone(sd.description.last_write_date)
if __name__ == "__main__":
unittest.main()

View File

@ -860,7 +860,6 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(6, count)
def test_insert_many_unacknowledged(self):
# On legacy servers this uses bulk OP_INSERT.
coll = self.client.pymongo_test.test
coll.drop()
unack_coll = coll.with_options(write_concern=WriteConcern(w=0))
@ -996,29 +995,6 @@ class TestCommandMonitoring(IntegrationTest):
self.assertEqual(event.failure['code'], 10107)
self.assertTrue(event.failure['errmsg'])
@client_context.require_version_max(3, 4, 99)
def test_bulk_write_legacy_network_error(self):
self.listener.results.clear()
# Make the delete operation run on a closed connection.
self.client.admin.command('ping')
pool = get_pool(self.client)
sock_info = pool.sockets[0]
sock_info.sock.close()
# Test legacy unacknowledged write network error.
coll = self.client.pymongo_test.get_collection(
'test', write_concern=WriteConcern(w=0))
with self.assertRaises(AutoReconnect):
coll.bulk_write([InsertOne({'_id': 1})], ordered=False)
failed = self.listener.results['failed']
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, 'insert')
self.assertIsInstance(event.failure, dict)
self.assertEqual(event.failure['errtype'], 'AutoReconnect')
self.assertTrue(event.failure['errmsg'])
def test_write_errors(self):
coll = self.client.pymongo_test.test
coll.drop()

View File

@ -15,7 +15,7 @@
"""Test the read_concern module."""
from bson.son import SON
from pymongo.errors import ConfigurationError
from pymongo.errors import OperationFailure
from pymongo.read_concern import ReadConcern
from test import client_context, IntegrationTest
@ -64,17 +64,13 @@ class TestReadConcern(IntegrationTest):
client = rs_or_single_client(uri, connect=False)
self.assertEqual(ReadConcern('majority'), client.read_concern)
@client_context.require_version_max(3, 1)
def test_invalid_read_concern(self):
coll = self.db.get_collection(
'coll', read_concern=ReadConcern('majority'))
with self.assertRaisesRegex(
ConfigurationError,
'read concern level of majority is not valid '
'with a max wire version of [0-3]'):
'coll', read_concern=ReadConcern('unknown'))
# We rely on the server to validate read concern.
with self.assertRaises(OperationFailure):
coll.find_one()
@client_context.require_version_min(3, 1, 9, -1)
def test_find_command(self):
# readConcern not sent in command if not specified.
coll = self.db.coll
@ -93,7 +89,6 @@ class TestReadConcern(IntegrationTest):
('readConcern', {'level': 'local'})]),
self.listener.results['started'][0].command)
@client_context.require_version_min(3, 1, 9, -1)
def test_command_cursor(self):
# readConcern not sent in command if not specified.
coll = self.db.coll

View File

@ -579,8 +579,6 @@ class TestMongosAndReadPreference(IntegrationTest):
self.assertEqual(
out, SON([("$query", {}), ("$readPreference", pref.document)]))
# Require OP_MSG so that $readPreference is visible in the command event.
@client_context.require_version_min(3, 6)
def test_send_hedge(self):
cases = {
'primaryPreferred': PrimaryPreferred,
@ -697,7 +695,6 @@ class TestMongosAndReadPreference(IntegrationTest):
self.assertEqual(last_id, results[0]["_id"])
@client_context.require_mongos
@client_context.require_version_min(3, 3, 12)
def test_mongos_max_staleness(self):
# Sanity check that we're sending maxStalenessSeconds
coll = client_context.client.pymongo_test.get_collection(

View File

@ -123,8 +123,6 @@ class TestReadWriteConcernSpec(IntegrationTest):
('delete_many', lambda: coll.delete_many({})),
('bulk_write', lambda: coll.bulk_write([InsertOne({})])),
('command', insert_command),
]
ops_require_34 = [
('aggregate', lambda: coll.aggregate([{'$out': 'out'}])),
# SERVER-46668 Delete all the documents in the collection to
# workaround a hang in createIndexes.
@ -136,11 +134,9 @@ class TestReadWriteConcernSpec(IntegrationTest):
('rename', lambda: coll.rename('new')),
('drop', lambda: db.new.drop()),
]
if client_context.version > (3, 4):
ops.extend(ops_require_34)
# SERVER-47194: dropDatabase does not respect wtimeout in 3.6.
if client_context.version[:2] != (3, 6):
ops.append(('drop_database', lambda: client.drop_database(db)))
# SERVER-47194: dropDatabase does not respect wtimeout in 3.6.
if client_context.version[:2] != (3, 6):
ops.append(('drop_database', lambda: client.drop_database(db)))
for name, f in ops:
# Ensure insert_many and bulk_write still raise BulkWriteError.
@ -161,8 +157,6 @@ class TestReadWriteConcernSpec(IntegrationTest):
self.assertWriteOpsRaise(
WriteConcern(w=client_context.w+1, wtimeout=1), WriteConcernError)
# MongoDB 3.2 introduced the stopReplProducer failpoint.
@client_context.require_version_min(3, 2)
@client_context.require_secondaries_count(1)
@client_context.require_test_commands
def test_raise_wtimeout(self):

View File

@ -165,7 +165,6 @@ class TestRetryableWritesMMAPv1(IgnoreDeprecationsTest):
cls.client.close()
super(TestRetryableWritesMMAPv1, cls).tearDownClass()
@client_context.require_version_min(3, 5)
@client_context.require_no_standalone
def test_actionable_error_message(self):
if client_context.storage_engine != 'mmapv1':
@ -202,15 +201,13 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
super(TestRetryableWrites, cls).tearDownClass()
def setUp(self):
if (client_context.version.at_least(3, 5) and client_context.is_rs
and client_context.test_commands_enabled):
if client_context.is_rs and client_context.test_commands_enabled:
self.client.admin.command(SON([
('configureFailPoint', 'onPrimaryTransactionalWrite'),
('mode', 'alwaysOn')]))
def tearDown(self):
if (client_context.version.at_least(3, 5) and client_context.is_rs
and client_context.test_commands_enabled):
if client_context.is_rs and client_context.test_commands_enabled:
self.client.admin.command(SON([
('configureFailPoint', 'onPrimaryTransactionalWrite'),
('mode', 'off')]))
@ -230,7 +227,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
'txnNumber', event.command,
'%s sent txnNumber with %s' % (msg, event.command_name))
@client_context.require_version_min(3, 5)
@client_context.require_no_standalone
def test_supported_single_statement_supported_cluster(self):
for method, args, kwargs in retryable_single_statement_ops(
@ -271,8 +267,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
initial_transaction_id, msg)
def test_supported_single_statement_unsupported_cluster(self):
if client_context.version.at_least(3, 5) and (
client_context.is_rs or client_context.is_mongos):
if client_context.is_rs or client_context.is_mongos:
raise SkipTest('This cluster supports retryable writes')
for method, args, kwargs in retryable_single_statement_ops(
@ -319,7 +314,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
method(*args, **kwargs)
self.assertEqual(len(listener.results['started']), 0, msg)
@client_context.require_version_min(3, 5)
@client_context.require_replica_set
@client_context.require_test_commands
def test_retry_timeout_raises_original_error(self):
@ -352,7 +346,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
method(*args, **kwargs)
self.assertEqual(len(listener.results['started']), 1, msg)
@client_context.require_version_min(3, 5)
@client_context.require_replica_set
@client_context.require_test_commands
def test_batch_splitting(self):
@ -388,7 +381,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
}
self.assertEqual(bulk_result.bulk_api_result, expected_result)
@client_context.require_version_min(3, 5)
@client_context.require_replica_set
@client_context.require_test_commands
def test_batch_splitting_retry_fails(self):
@ -572,7 +564,6 @@ class TestPoolPausedError(IntegrationTest):
# TODO: Make this a real integration test where we stepdown the primary.
class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest):
@client_context.require_version_min(3, 6)
@client_context.require_replica_set
@client_context.require_no_mmap
def test_increment_transaction_id_without_sending_command(self):

View File

@ -1020,14 +1020,6 @@ class TestCausalConsistency(unittest.TestCase):
self.assertIsNone(after_cluster_time)
class TestSessionsNotSupported(IntegrationTest):
@client_context.require_version_max(3, 5, 10)
def test_sessions_not_supported(self):
with self.assertRaisesRegex(
ConfigurationError, "Sessions are not supported"):
self.client.start_session()
class TestClusterTime(IntegrationTest):
def setUp(self):
super(TestClusterTime, self).setUp()

View File

@ -513,19 +513,14 @@ class TestSSL(IntegrationTest):
tlsCertificateKeyFile=CLIENT_PEM,
event_listeners=[listener])
if client_context.version.at_least(3, 3, 12):
# No error
auth.pymongo_test.test.find_one()
names = listener.started_command_names()
if client_context.version.at_least(4, 4, -1):
# Speculative auth skips the authenticate command.
self.assertEqual(names, ['find'])
else:
self.assertEqual(names, ['authenticate', 'find'])
# No error
auth.pymongo_test.test.find_one()
names = listener.started_command_names()
if client_context.version.at_least(4, 4, -1):
# Speculative auth skips the authenticate command.
self.assertEqual(names, ['find'])
else:
# Should require a username
with self.assertRaises(ConfigurationError):
auth.pymongo_test.test.find_one()
self.assertEqual(names, ['authenticate', 'find'])
uri = ('mongodb://%s@%s:%d/?authMechanism='
'MONGODB-X509' % (
@ -542,14 +537,8 @@ class TestSSL(IntegrationTest):
ssl=True,
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
if client_context.version.at_least(3, 3, 12):
# No error
client.pymongo_test.test.find_one()
else:
# Should require a username
with self.assertRaises(ConfigurationError):
client.pymongo_test.test.find_one()
# No error
client.pymongo_test.test.find_one()
# Auth should fail if username and certificate do not match
uri = ('mongodb://%s@%s:%d/?authMechanism='
'MONGODB-X509' % (

View File

@ -522,10 +522,11 @@ class TestMultiServerTopology(TopologyTest):
'setName': 'rs',
'hosts': ['a'],
'minWireVersion': 1,
'maxWireVersion': 5})
'maxWireVersion': 6})
self.assertEqual(server.description.min_wire_version, 1)
self.assertEqual(server.description.max_wire_version, 5)
self.assertEqual(server.description.max_wire_version, 6)
t.select_servers(any_server_selector)
# Incompatible.
got_hello(t, address, {