PYTHON-1791 Fix reference counting leaks
Fix batched op_msg/op_query reference leak of overflow doc. Fix theoretically possible (but practically impossible) reference leak of $clusterTime in op_query. Optimization: Don't encode document past the batch size in batched op_query.
This commit is contained in:
parent
92ddc09b7e
commit
cd787dbb2c
@ -346,9 +346,9 @@ static int _load_object(PyObject** object, char* module_name, char* object_name)
|
||||
*
|
||||
* Returns non-zero on failure. */
|
||||
static int _load_python_objects(PyObject* module) {
|
||||
PyObject* empty_string;
|
||||
PyObject* re_compile;
|
||||
PyObject* compiled;
|
||||
PyObject* empty_string = NULL;
|
||||
PyObject* re_compile = NULL;
|
||||
PyObject* compiled = NULL;
|
||||
struct module_state *state = GETSTATE(module);
|
||||
|
||||
if (_load_object(&state->Binary, "bson.binary", "Binary") ||
|
||||
|
||||
@ -87,6 +87,8 @@ Changes in Version 3.8.0.dev0
|
||||
- Changes can now be requested from a ``ChangeStream`` cursor without blocking
|
||||
indefinitely using the new
|
||||
:meth:`pymongo.change_stream.ChangeStream.try_next` method.
|
||||
- Fixed a reference leak bug when splitting a batched write command based on
|
||||
maxWriteBatchSize or the max message size.
|
||||
|
||||
Issues Resolved
|
||||
...............
|
||||
|
||||
@ -434,7 +434,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
buffer_t buffer;
|
||||
int length_location, message_length;
|
||||
unsigned char check_keys = 0;
|
||||
PyObject* result;
|
||||
PyObject* result = NULL;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "Iet#iiOOO&|b",
|
||||
&flags,
|
||||
@ -477,18 +477,14 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
/* PyDict_GetItemString returns a borrowed reference. */
|
||||
Py_INCREF(cluster_time);
|
||||
if (-1 == PyMapping_DelItemString(query, "$clusterTime")) {
|
||||
destroy_codec_options(&options);
|
||||
PyMem_Free(collection_name);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
} else if (PyMapping_HasKeyString(query, "$clusterTime")) {
|
||||
cluster_time = PyMapping_GetItemString(query, "$clusterTime");
|
||||
if (!cluster_time
|
||||
|| -1 == PyMapping_DelItemString(query, "$clusterTime")) {
|
||||
destroy_codec_options(&options);
|
||||
PyMem_Free(collection_name);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
|
||||
@ -498,20 +494,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
collection_name_length + 1) ||
|
||||
!buffer_write_int32(buffer, (int32_t)num_to_skip) ||
|
||||
!buffer_write_int32(buffer, (int32_t)num_to_return)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
Py_XDECREF(cluster_time);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
begin = buffer_get_position(buffer);
|
||||
if (!write_dict(state->_cbson, buffer, query, check_keys, &options, 1)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
Py_XDECREF(cluster_time);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* back up a byte and write $clusterTime */
|
||||
@ -522,19 +510,11 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
buffer_update_position(buffer, buffer_get_position(buffer) - 1);
|
||||
if (!write_pair(state->_cbson, buffer, "$clusterTime", 12, cluster_time,
|
||||
0, &options, 1)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
Py_DECREF(cluster_time);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (!buffer_write_bytes(buffer, &zero, 1)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
Py_DECREF(cluster_time);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
length = buffer_get_position(buffer) - begin;
|
||||
@ -543,14 +523,10 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
/* undo popping $clusterTime */
|
||||
if (-1 == PyMapping_SetItemString(
|
||||
query, "$clusterTime", cluster_time)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
Py_DECREF(cluster_time);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
Py_DECREF(cluster_time);
|
||||
Py_CLEAR(cluster_time);
|
||||
}
|
||||
|
||||
max_size = buffer_get_position(buffer) - begin;
|
||||
@ -559,17 +535,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
begin = buffer_get_position(buffer);
|
||||
if (!write_dict(state->_cbson, buffer, field_selector, 0,
|
||||
&options, 1)) {
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
PyMem_Free(collection_name);
|
||||
return NULL;
|
||||
goto fail;
|
||||
}
|
||||
cur_size = buffer_get_position(buffer) - begin;
|
||||
max_size = (cur_size > max_size) ? cur_size : max_size;
|
||||
}
|
||||
|
||||
PyMem_Free(collection_name);
|
||||
|
||||
message_length = buffer_get_position(buffer) - length_location;
|
||||
buffer_write_int32_at_position(
|
||||
buffer, length_location, (int32_t)message_length);
|
||||
@ -579,8 +550,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
|
||||
buffer_get_buffer(buffer),
|
||||
buffer_get_position(buffer),
|
||||
max_size);
|
||||
|
||||
fail:
|
||||
PyMem_Free(collection_name);
|
||||
destroy_codec_options(&options);
|
||||
buffer_free(buffer);
|
||||
Py_XDECREF(cluster_time);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -1142,11 +1117,11 @@ _batched_op_msg(
|
||||
int size_location;
|
||||
int position;
|
||||
int length;
|
||||
PyObject* max_bson_size_obj;
|
||||
PyObject* max_write_batch_size_obj;
|
||||
PyObject* max_message_size_obj;
|
||||
PyObject* doc;
|
||||
PyObject* iterator;
|
||||
PyObject* max_bson_size_obj = NULL;
|
||||
PyObject* max_write_batch_size_obj = NULL;
|
||||
PyObject* max_message_size_obj = NULL;
|
||||
PyObject* doc = NULL;
|
||||
PyObject* iterator = NULL;
|
||||
char* flags = ack ? "\x00\x00\x00\x00" : "\x02\x00\x00\x00";
|
||||
|
||||
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
|
||||
@ -1209,7 +1184,7 @@ _batched_op_msg(
|
||||
case _INSERT:
|
||||
{
|
||||
if (!buffer_write_bytes(buffer, "documents\x00", 10))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _UPDATE:
|
||||
@ -1217,7 +1192,7 @@ _batched_op_msg(
|
||||
/* MongoDB does key validation for update. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "updates\x00", 8))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _DELETE:
|
||||
@ -1225,7 +1200,7 @@ _batched_op_msg(
|
||||
/* Never check keys in a delete command. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "deletes\x00", 8))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@ -1255,7 +1230,7 @@ _batched_op_msg(
|
||||
int unacked_doc_too_large = 0;
|
||||
if (!write_dict(state->_cbson, buffer, doc, check_keys,
|
||||
&options, 1)) {
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
cur_size = buffer_get_position(buffer) - cur_doc_begin;
|
||||
|
||||
@ -1285,7 +1260,7 @@ _batched_op_msg(
|
||||
Py_DECREF(DocumentTooLarge);
|
||||
}
|
||||
}
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
/* We have enough data, return this batch. */
|
||||
if (buffer_get_position(buffer) > max_message_size) {
|
||||
@ -1294,10 +1269,11 @@ _batched_op_msg(
|
||||
* of the last document encoded.
|
||||
*/
|
||||
buffer_update_position(buffer, cur_doc_begin);
|
||||
Py_CLEAR(doc);
|
||||
break;
|
||||
}
|
||||
if (PyList_Append(to_publish, doc) < 0) {
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
Py_CLEAR(doc);
|
||||
idx += 1;
|
||||
@ -1306,10 +1282,10 @@ _batched_op_msg(
|
||||
break;
|
||||
}
|
||||
}
|
||||
Py_DECREF(iterator);
|
||||
Py_CLEAR(iterator);
|
||||
|
||||
if (PyErr_Occurred()) {
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
position = buffer_get_position(buffer);
|
||||
@ -1317,10 +1293,9 @@ _batched_op_msg(
|
||||
buffer_write_int32_at_position(buffer, size_location, (int32_t)length);
|
||||
return 1;
|
||||
|
||||
cmditerfail:
|
||||
fail:
|
||||
Py_XDECREF(doc);
|
||||
Py_DECREF(iterator);
|
||||
cmdfail:
|
||||
Py_XDECREF(iterator);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1466,10 +1441,10 @@ _batched_write_command(
|
||||
int lst_len_loc;
|
||||
int position;
|
||||
int length;
|
||||
PyObject* max_bson_size_obj;
|
||||
PyObject* max_write_batch_size_obj;
|
||||
PyObject* doc;
|
||||
PyObject* iterator;
|
||||
PyObject* max_bson_size_obj = NULL;
|
||||
PyObject* max_write_batch_size_obj = NULL;
|
||||
PyObject* doc = NULL;
|
||||
PyObject* iterator = NULL;
|
||||
|
||||
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
|
||||
#if PY_MAJOR_VERSION >= 3
|
||||
@ -1524,7 +1499,7 @@ _batched_write_command(
|
||||
case _INSERT:
|
||||
{
|
||||
if (!buffer_write_bytes(buffer, "documents\x00", 10))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _UPDATE:
|
||||
@ -1532,7 +1507,7 @@ _batched_write_command(
|
||||
/* MongoDB does key validation for update. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "updates\x00", 8))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
case _DELETE:
|
||||
@ -1540,7 +1515,7 @@ _batched_write_command(
|
||||
/* Never check keys in a delete command. */
|
||||
check_keys = 0;
|
||||
if (!buffer_write_bytes(buffer, "deletes\x00", 8))
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@ -1575,25 +1550,23 @@ _batched_write_command(
|
||||
int cur_doc_begin;
|
||||
int cur_size;
|
||||
int enough_data = 0;
|
||||
int enough_documents = 0;
|
||||
char key[16];
|
||||
INT2STRING(key, idx);
|
||||
if (!buffer_write_bytes(buffer, "\x03", 1) ||
|
||||
!buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) {
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
cur_doc_begin = buffer_get_position(buffer);
|
||||
if (!write_dict(state->_cbson, buffer, doc,
|
||||
check_keys, &options, 1)) {
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* We have enough data, return this batch.
|
||||
* max_cmd_size accounts for the two trailing null bytes.
|
||||
*/
|
||||
enough_data = (buffer_get_position(buffer) > max_cmd_size);
|
||||
enough_documents = (idx >= max_write_batch_size);
|
||||
if (enough_data || enough_documents) {
|
||||
if (enough_data) {
|
||||
cur_size = buffer_get_position(buffer) - cur_doc_begin;
|
||||
|
||||
/* This single document is too large for the command. */
|
||||
@ -1614,30 +1587,35 @@ _batched_write_command(
|
||||
Py_DECREF(DocumentTooLarge);
|
||||
}
|
||||
}
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
/*
|
||||
* Roll the existing buffer back to the beginning
|
||||
* of the last document encoded.
|
||||
*/
|
||||
buffer_update_position(buffer, sub_doc_begin);
|
||||
Py_CLEAR(doc);
|
||||
break;
|
||||
}
|
||||
if (PyList_Append(to_publish, doc) < 0) {
|
||||
goto cmditerfail;
|
||||
goto fail;
|
||||
}
|
||||
Py_CLEAR(doc);
|
||||
idx += 1;
|
||||
/* We have enough documents, return this batch. */
|
||||
if (idx == max_write_batch_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Py_DECREF(iterator);
|
||||
Py_CLEAR(iterator);
|
||||
|
||||
if (PyErr_Occurred()) {
|
||||
goto cmdfail;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (!buffer_write_bytes(buffer, "\x00\x00", 2))
|
||||
goto cmdfail;
|
||||
|
||||
if (!buffer_write_bytes(buffer, "\x00\x00", 2)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
position = buffer_get_position(buffer);
|
||||
length = position - lst_len_loc - 1;
|
||||
@ -1646,10 +1624,9 @@ _batched_write_command(
|
||||
buffer_write_int32_at_position(buffer, cmd_len_loc, (int32_t)length);
|
||||
return 1;
|
||||
|
||||
cmditerfail:
|
||||
fail:
|
||||
Py_XDECREF(doc);
|
||||
Py_DECREF(iterator);
|
||||
cmdfail:
|
||||
Py_XDECREF(iterator);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user