diff --git a/bson/_cbsonmodule.c b/bson/_cbsonmodule.c index ca4b13108..d83b3dc0c 100644 --- a/bson/_cbsonmodule.c +++ b/bson/_cbsonmodule.c @@ -346,9 +346,9 @@ static int _load_object(PyObject** object, char* module_name, char* object_name) * * Returns non-zero on failure. */ static int _load_python_objects(PyObject* module) { - PyObject* empty_string; - PyObject* re_compile; - PyObject* compiled; + PyObject* empty_string = NULL; + PyObject* re_compile = NULL; + PyObject* compiled = NULL; struct module_state *state = GETSTATE(module); if (_load_object(&state->Binary, "bson.binary", "Binary") || diff --git a/doc/changelog.rst b/doc/changelog.rst index ef2c0a201..cabb3232e 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -87,6 +87,8 @@ Changes in Version 3.8.0.dev0 - Changes can now be requested from a ``ChangeStream`` cursor without blocking indefinitely using the new :meth:`pymongo.change_stream.ChangeStream.try_next` method. +- Fixed a reference leak bug when splitting a batched write command based on + maxWriteBatchSize or the max message size. Issues Resolved ............... diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index 2ae6d7948..cb6c65533 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -434,7 +434,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { buffer_t buffer; int length_location, message_length; unsigned char check_keys = 0; - PyObject* result; + PyObject* result = NULL; if (!PyArg_ParseTuple(args, "Iet#iiOOO&|b", &flags, @@ -477,18 +477,14 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { /* PyDict_GetItemString returns a borrowed reference. */ Py_INCREF(cluster_time); if (-1 == PyMapping_DelItemString(query, "$clusterTime")) { - destroy_codec_options(&options); - PyMem_Free(collection_name); - return NULL; + goto fail; } } } else if (PyMapping_HasKeyString(query, "$clusterTime")) { cluster_time = PyMapping_GetItemString(query, "$clusterTime"); if (!cluster_time || -1 == PyMapping_DelItemString(query, "$clusterTime")) { - destroy_codec_options(&options); - PyMem_Free(collection_name); - return NULL; + goto fail; } } if (!buffer_write_int32(buffer, (int32_t)request_id) || @@ -498,20 +494,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { collection_name_length + 1) || !buffer_write_int32(buffer, (int32_t)num_to_skip) || !buffer_write_int32(buffer, (int32_t)num_to_return)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - Py_XDECREF(cluster_time); - return NULL; + goto fail; } begin = buffer_get_position(buffer); if (!write_dict(state->_cbson, buffer, query, check_keys, &options, 1)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - Py_XDECREF(cluster_time); - return NULL; + goto fail; } /* back up a byte and write $clusterTime */ @@ -522,19 +510,11 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { buffer_update_position(buffer, buffer_get_position(buffer) - 1); if (!write_pair(state->_cbson, buffer, "$clusterTime", 12, cluster_time, 0, &options, 1)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - Py_DECREF(cluster_time); - return NULL; + goto fail; } if (!buffer_write_bytes(buffer, &zero, 1)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - Py_DECREF(cluster_time); - return NULL; + goto fail; } length = buffer_get_position(buffer) - begin; @@ -543,14 +523,10 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { /* undo popping $clusterTime */ if (-1 == PyMapping_SetItemString( query, "$clusterTime", cluster_time)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - Py_DECREF(cluster_time); - return NULL; + goto fail; } - Py_DECREF(cluster_time); + Py_CLEAR(cluster_time); } max_size = buffer_get_position(buffer) - begin; @@ -559,17 +535,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { begin = buffer_get_position(buffer); if (!write_dict(state->_cbson, buffer, field_selector, 0, &options, 1)) { - destroy_codec_options(&options); - buffer_free(buffer); - PyMem_Free(collection_name); - return NULL; + goto fail; } cur_size = buffer_get_position(buffer) - begin; max_size = (cur_size > max_size) ? cur_size : max_size; } - PyMem_Free(collection_name); - message_length = buffer_get_position(buffer) - length_location; buffer_write_int32_at_position( buffer, length_location, (int32_t)message_length); @@ -579,8 +550,12 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { buffer_get_buffer(buffer), buffer_get_position(buffer), max_size); + +fail: + PyMem_Free(collection_name); destroy_codec_options(&options); buffer_free(buffer); + Py_XDECREF(cluster_time); return result; } @@ -1142,11 +1117,11 @@ _batched_op_msg( int size_location; int position; int length; - PyObject* max_bson_size_obj; - PyObject* max_write_batch_size_obj; - PyObject* max_message_size_obj; - PyObject* doc; - PyObject* iterator; + PyObject* max_bson_size_obj = NULL; + PyObject* max_write_batch_size_obj = NULL; + PyObject* max_message_size_obj = NULL; + PyObject* doc = NULL; + PyObject* iterator = NULL; char* flags = ack ? "\x00\x00\x00\x00" : "\x02\x00\x00\x00"; max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); @@ -1209,7 +1184,7 @@ _batched_op_msg( case _INSERT: { if (!buffer_write_bytes(buffer, "documents\x00", 10)) - goto cmdfail; + goto fail; break; } case _UPDATE: @@ -1217,7 +1192,7 @@ _batched_op_msg( /* MongoDB does key validation for update. */ check_keys = 0; if (!buffer_write_bytes(buffer, "updates\x00", 8)) - goto cmdfail; + goto fail; break; } case _DELETE: @@ -1225,7 +1200,7 @@ _batched_op_msg( /* Never check keys in a delete command. */ check_keys = 0; if (!buffer_write_bytes(buffer, "deletes\x00", 8)) - goto cmdfail; + goto fail; break; } default: @@ -1255,7 +1230,7 @@ _batched_op_msg( int unacked_doc_too_large = 0; if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - goto cmditerfail; + goto fail; } cur_size = buffer_get_position(buffer) - cur_doc_begin; @@ -1285,7 +1260,7 @@ _batched_op_msg( Py_DECREF(DocumentTooLarge); } } - goto cmditerfail; + goto fail; } /* We have enough data, return this batch. */ if (buffer_get_position(buffer) > max_message_size) { @@ -1294,10 +1269,11 @@ _batched_op_msg( * of the last document encoded. */ buffer_update_position(buffer, cur_doc_begin); + Py_CLEAR(doc); break; } if (PyList_Append(to_publish, doc) < 0) { - goto cmditerfail; + goto fail; } Py_CLEAR(doc); idx += 1; @@ -1306,10 +1282,10 @@ _batched_op_msg( break; } } - Py_DECREF(iterator); + Py_CLEAR(iterator); if (PyErr_Occurred()) { - goto cmdfail; + goto fail; } position = buffer_get_position(buffer); @@ -1317,10 +1293,9 @@ _batched_op_msg( buffer_write_int32_at_position(buffer, size_location, (int32_t)length); return 1; -cmditerfail: +fail: Py_XDECREF(doc); - Py_DECREF(iterator); -cmdfail: + Py_XDECREF(iterator); return 0; } @@ -1466,10 +1441,10 @@ _batched_write_command( int lst_len_loc; int position; int length; - PyObject* max_bson_size_obj; - PyObject* max_write_batch_size_obj; - PyObject* doc; - PyObject* iterator; + PyObject* max_bson_size_obj = NULL; + PyObject* max_write_batch_size_obj = NULL; + PyObject* doc = NULL; + PyObject* iterator = NULL; max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size"); #if PY_MAJOR_VERSION >= 3 @@ -1524,7 +1499,7 @@ _batched_write_command( case _INSERT: { if (!buffer_write_bytes(buffer, "documents\x00", 10)) - goto cmdfail; + goto fail; break; } case _UPDATE: @@ -1532,7 +1507,7 @@ _batched_write_command( /* MongoDB does key validation for update. */ check_keys = 0; if (!buffer_write_bytes(buffer, "updates\x00", 8)) - goto cmdfail; + goto fail; break; } case _DELETE: @@ -1540,7 +1515,7 @@ _batched_write_command( /* Never check keys in a delete command. */ check_keys = 0; if (!buffer_write_bytes(buffer, "deletes\x00", 8)) - goto cmdfail; + goto fail; break; } default: @@ -1575,25 +1550,23 @@ _batched_write_command( int cur_doc_begin; int cur_size; int enough_data = 0; - int enough_documents = 0; char key[16]; INT2STRING(key, idx); if (!buffer_write_bytes(buffer, "\x03", 1) || !buffer_write_bytes(buffer, key, (int)strlen(key) + 1)) { - goto cmditerfail; + goto fail; } cur_doc_begin = buffer_get_position(buffer); if (!write_dict(state->_cbson, buffer, doc, check_keys, &options, 1)) { - goto cmditerfail; + goto fail; } /* We have enough data, return this batch. * max_cmd_size accounts for the two trailing null bytes. */ enough_data = (buffer_get_position(buffer) > max_cmd_size); - enough_documents = (idx >= max_write_batch_size); - if (enough_data || enough_documents) { + if (enough_data) { cur_size = buffer_get_position(buffer) - cur_doc_begin; /* This single document is too large for the command. */ @@ -1614,30 +1587,35 @@ _batched_write_command( Py_DECREF(DocumentTooLarge); } } - goto cmditerfail; + goto fail; } /* * Roll the existing buffer back to the beginning * of the last document encoded. */ buffer_update_position(buffer, sub_doc_begin); + Py_CLEAR(doc); break; } if (PyList_Append(to_publish, doc) < 0) { - goto cmditerfail; + goto fail; } Py_CLEAR(doc); idx += 1; + /* We have enough documents, return this batch. */ + if (idx == max_write_batch_size) { + break; + } } - Py_DECREF(iterator); + Py_CLEAR(iterator); if (PyErr_Occurred()) { - goto cmdfail; + goto fail; } - if (!buffer_write_bytes(buffer, "\x00\x00", 2)) - goto cmdfail; - + if (!buffer_write_bytes(buffer, "\x00\x00", 2)) { + goto fail; + } position = buffer_get_position(buffer); length = position - lst_len_loc - 1; @@ -1646,10 +1624,9 @@ _batched_write_command( buffer_write_int32_at_position(buffer, cmd_len_loc, (int32_t)length); return 1; -cmditerfail: +fail: Py_XDECREF(doc); - Py_DECREF(iterator); -cmdfail: + Py_XDECREF(iterator); return 0; }