From 6d5f658c2a36de0e4eb79d3b054a81b675b95bab Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Fri, 21 Feb 2014 00:13:45 -0500 Subject: [PATCH] Use maxWriteBatchSize for write command batch splitting, PYTHON-642. --- pymongo/_cmessagemodule.c | 20 +++++++++++++++++++- pymongo/bulk.py | 5 +---- pymongo/collection.py | 6 +++--- pymongo/common.py | 1 + pymongo/master_slave_connection.py | 11 +++++++++++ pymongo/member.py | 2 ++ pymongo/message.py | 5 ++++- pymongo/mongo_client.py | 12 ++++++++++++ pymongo/mongo_replica_set_client.py | 14 ++++++++++++++ test/pymongo_mocks.py | 18 +++++++++++++++--- test/test_bulk.py | 21 +++++++++++++++++++++ test/test_client.py | 26 ++++++++++++++++++++++++++ test/test_collection.py | 8 ++++++++ test/test_replica_set_client.py | 26 ++++++++++++++++++++++++++ 14 files changed, 163 insertions(+), 12 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index ab7840967..cfe0049a0 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -911,6 +911,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { long max_bson_size; long max_cmd_size; + long max_write_batch_size; long idx_offset = 0; int idx = 0; int cmd_len_loc; @@ -919,6 +920,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { int ordered; char *ns = NULL; PyObject* max_bson_size_obj; + PyObject* max_write_batch_size_obj; PyObject* command; PyObject* doc; PyObject* docs; @@ -956,6 +958,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { */ max_cmd_size = max_bson_size + 16382; + max_write_batch_size_obj = PyObject_GetAttrString(client, "max_write_batch_size"); +#if PY_MAJOR_VERSION >= 3 + max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj); +#else + max_write_batch_size = PyInt_AsLong(max_write_batch_size_obj); +#endif + Py_XDECREF(max_write_batch_size_obj); + if (max_write_batch_size == -1) { + PyMem_Free(ns); + return NULL; + } + /* Default to True */ ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False); @@ -1035,6 +1049,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { int sub_doc_begin = buffer_get_position(buffer); int cur_doc_begin; int cur_size; + int enough_data = 0; + int enough_documents = 0; char key[16]; empty = 0; INT2STRING(key, idx); @@ -1052,7 +1068,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) { Py_DECREF(doc); /* We have enough data, maybe send this batch. */ - if (buffer_get_position(buffer) > max_cmd_size) { + enough_data = (buffer_get_position(buffer) > max_cmd_size); + enough_documents = (idx >= max_write_batch_size); + if (enough_data || enough_documents) { buffer_t new_buffer; cur_size = buffer_get_position(buffer) - cur_doc_begin; diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 775cec67d..224aeb847 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -235,7 +235,7 @@ class _Bulk(object): for idx, (op_type, operation) in enumerate(self.ops): if run is None: run = _Run(op_type) - elif run.op_type != op_type or len(run.ops) > 1000: + elif run.op_type != op_type: yield run run = _Run(op_type) run.add(idx, operation) @@ -248,9 +248,6 @@ class _Bulk(object): operations = [_Run(_INSERT), _Run(_UPDATE), _Run(_DELETE)] for idx, (op_type, operation) in enumerate(self.ops): operations[op_type].add(idx, operation) - if len(operations[op_type].ops) > 1000: - yield operations[op_type] - operations[op_type] = _Run(op_type) for run in operations: if run.ops: diff --git a/pymongo/collection.py b/pymongo/collection.py index f07e0031e..8e83cf8a3 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -357,9 +357,9 @@ class Collection(common.BaseObject): .. mongodoc:: insert """ client = self.database.connection - # Batch inserts require us to know the connected master's - # max_bson_size and max_message_size. We have to be connected - # to a master to know that. + # Batch inserts require us to know the connected primary's + # max_bson_size, max_message_size, and max_write_batch_size. + # We have to be connected to the primary to know that. client._ensure_connected(True) docs = doc_or_docs diff --git a/pymongo/common.py b/pymongo/common.py index 15006641c..f246339c7 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -41,6 +41,7 @@ MAX_BSON_SIZE = 16 * (1024 ** 2) MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE MIN_WIRE_VERSION = 0 MAX_WIRE_VERSION = 0 +MAX_WRITE_BATCH_SIZE = 1000 # What this version of PyMongo supports. MIN_SUPPORTED_WIRE_VERSION = 0 diff --git a/pymongo/master_slave_connection.py b/pymongo/master_slave_connection.py index 3699b9598..dbee7897c 100644 --- a/pymongo/master_slave_connection.py +++ b/pymongo/master_slave_connection.py @@ -155,6 +155,17 @@ class MasterSlaveConnection(BaseObject): """ return self.master.max_wire_version + @property + def max_write_batch_size(self): + """The maxWriteBatchSize reported by the server. + + Returns a default value when connected to server versions prior to + MongoDB 2.6. + + .. versionadded:: 2.7 + """ + return self.master.max_write_batch_size + def disconnect(self): """Disconnect from MongoDB. diff --git a/pymongo/member.py b/pymongo/member.py index 576928f3e..dbdafdf0f 100644 --- a/pymongo/member.py +++ b/pymongo/member.py @@ -64,6 +64,8 @@ class Member(object): 'minWireVersion', common.MIN_WIRE_VERSION) self.max_wire_version = ismaster_response.get( 'maxWireVersion', common.MAX_WIRE_VERSION) + self.max_write_batch_size = ismaster_response.get( + 'maxWriteBatchSize', common.MAX_WRITE_BATCH_SIZE) # self.min/max_wire_version is the server's wire protocol. # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports. diff --git a/pymongo/message.py b/pymongo/message.py index 4f005ca99..57059034c 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -281,6 +281,7 @@ def _do_batched_write_command(namespace, operation, command, """Execute a batch of insert, update, or delete commands. """ max_bson_size = client.max_bson_size + max_write_batch_size = client.max_write_batch_size # Max BSON object size + 16k - 2 bytes for ending NUL bytes # XXX: This should come from the server - SERVER-10643 max_cmd_size = max_bson_size + 16382 @@ -354,7 +355,9 @@ def _do_batched_write_command(namespace, operation, command, key = b(str(idx)) value = bson.BSON.encode(doc, check_keys, uuid_subtype) # Send a batch? - if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size: + enough_data = (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size + enough_documents = (idx >= max_write_batch_size) + if enough_data or enough_documents: if not idx: if operation == _INSERT: raise DocumentTooLarge("BSON document too large (%d bytes)" diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 2ad4768a4..213436425 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -655,6 +655,18 @@ class MongoClient(common.BaseObject): return self.__member_property( 'max_wire_version', common.MAX_WIRE_VERSION) + @property + def max_write_batch_size(self): + """The maxWriteBatchSize reported by the server. + + Returns a default value when connected to server versions prior to + MongoDB 2.6. + + .. versionadded:: 2.7 + """ + return self.__member_property( + 'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE) + def __simple_command(self, sock_info, dbname, spec): """Send a command to the server. """ diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index a3e578fc2..0f501ffd0 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -986,6 +986,20 @@ class MongoReplicaSetClient(common.BaseObject): return rs_state.primary_member.max_wire_version return common.MAX_WIRE_VERSION + @property + def max_write_batch_size(self): + """The maxWriteBatchSize reported by the server. + + Returns a default value when connected to server versions prior to + MongoDB 2.6. + + .. versionadded:: 2.7 + """ + rs_state = self.__rs_state + if rs_state.primary_member: + return rs_state.primary_member.max_write_batch_size + return common.MAX_WRITE_BATCH_SIZE + @property def auto_start_request(self): """Is auto_start_request enabled? diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index d42555861..af60a20ee 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -81,6 +81,9 @@ class MockClientBase(object): # Hostname -> (min wire version, max wire version) self.mock_wire_versions = {} + # Hostname -> max write batch size + self.mock_max_write_batch_sizes = {} + def kill_host(self, host): """Host is like 'a:1'.""" self.mock_down_hosts.append(host) @@ -92,11 +95,17 @@ class MockClientBase(object): def set_wire_version_range(self, host, min_version, max_version): self.mock_wire_versions[host] = (min_version, max_version) + def set_max_write_batch_size(self, host, size): + self.mock_max_write_batch_sizes[host] = size + def mock_is_master(self, host): min_wire_version, max_wire_version = self.mock_wire_versions.get( host, (common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION)) + max_write_batch_size = self.mock_max_write_batch_sizes.get( + host, common.MAX_WRITE_BATCH_SIZE) + # host is like 'a:1'. if host in self.mock_down_hosts: raise socket.timeout('mock timeout') @@ -105,7 +114,8 @@ class MockClientBase(object): return { 'ismaster': True, 'minWireVersion': min_wire_version, - 'maxWireVersion': max_wire_version} + 'maxWireVersion': max_wire_version, + 'maxWriteBatchSize': max_write_batch_size} if host in self.mock_members: ismaster = (host == self.mock_primary) @@ -117,7 +127,8 @@ class MockClientBase(object): 'setName': 'rs', 'hosts': self.mock_ismaster_hosts, 'minWireVersion': min_wire_version, - 'maxWireVersion': max_wire_version} + 'maxWireVersion': max_wire_version, + 'maxWriteBatchSize': max_write_batch_size} if self.mock_primary: response['primary'] = self.mock_primary @@ -129,7 +140,8 @@ class MockClientBase(object): 'ismaster': True, 'minWireVersion': min_wire_version, 'maxWireVersion': max_wire_version, - 'msg': 'isdbgrid'} + 'msg': 'isdbgrid', + 'maxWriteBatchSize': max_write_batch_size} # In test_internal_ips(), we try to connect to a host listed # in ismaster['hosts'] but not publicly accessible. diff --git a/test/test_bulk.py b/test/test_bulk.py index 584a04883..45b09a74a 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -427,6 +427,27 @@ class TestBulk(unittest.TestCase): self.assertEqual(6, result['nInserted']) self.assertEqual(6, self.coll.count()) + def test_numerous_inserts(self): + # Ensure we don't exceed server's 1000-document batch size limit. + n_docs = 2100 + batch = self.coll.initialize_unordered_bulk_op() + for _ in range(n_docs): + batch.insert({}) + + result = batch.execute() + self.assertEqual(n_docs, result['nInserted']) + self.assertEqual(n_docs, self.coll.count()) + + # Same with ordered bulk. + self.coll.remove() + batch = self.coll.initialize_ordered_bulk_op() + for _ in range(n_docs): + batch.insert({}) + + result = batch.execute() + self.assertEqual(n_docs, result['nInserted']) + self.assertEqual(n_docs, self.coll.count()) + def test_multiple_execution(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({}) diff --git a/test/test_client.py b/test/test_client.py index 196b41dff..c1d98ba3d 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -104,6 +104,7 @@ class TestClient(unittest.TestCase, TestRequestMixin): self.assertIsInstance(c.max_bson_size, int) self.assertIsInstance(c.min_wire_version, int) self.assertIsInstance(c.max_wire_version, int) + self.assertIsInstance(c.max_write_batch_size, int) self.assertEqual(None, c.host) self.assertEqual(None, c.port) @@ -916,6 +917,31 @@ with client.start_request() as request: c.disconnect() self.assertRaises(ConfigurationError, c.db.collection.find_one) + def test_max_wire_version(self): + c = MockClient( + standalones=[], + members=['a:1', 'b:2', 'c:3'], + mongoses=[], + host='b:2', # Pass a secondary. + replicaSet='rs', + _connect=False) + + c.set_max_write_batch_size('a:1', 1) + c.set_max_write_batch_size('b:2', 2) + + # Starts with default max batch size. + self.assertEqual(1000, c.max_write_batch_size) + c.db.collection.find_one() # Connect. + # Uses primary's max batch size. + self.assertEqual(c.max_write_batch_size, 1) + + # b becomes primary. + c.mock_primary = 'b:2' + c.disconnect() + self.assertEqual(1000, c.max_write_batch_size) + c.db.collection.find_one() # Connect. + self.assertEqual(c.max_write_batch_size, 2) + def test_wire_version_mongos_ha(self): c = MockClient( standalones=[], diff --git a/test/test_collection.py b/test/test_collection.py index c5b59d8ec..d300ec5ba 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1893,6 +1893,14 @@ class TestCollection(unittest.TestCase): finally: self.client.end_request() + def test_numerous_inserts(self): + # Ensure we don't exceed server's 1000-document batch size limit. + self.db.test.remove() + n_docs = 2100 + self.db.test.insert({} for _ in range(n_docs)) + self.assertEqual(n_docs, self.db.test.count()) + self.db.test.remove() + # Starting in PyMongo 2.6 we no longer use message.insert for inserts, but # message.insert is part of the public API. Do minimal testing here; there # isn't really a better place. diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 8c701ec42..cf1bfee6b 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -1212,5 +1212,31 @@ class TestReplicaSetClientInternalIPs(unittest.TestCase): replicaSet='rs') +class TestReplicaSetClientMaxWriteBatchSize(unittest.TestCase): + def test_max_write_batch_size(self): + c = MockReplicaSetClient( + standalones=[], + members=['a:1', 'b:2'], + mongoses=[], + host='a:1', + replicaSet='rs', + _connect=False) + + c.set_max_write_batch_size('a:1', 1) + c.set_max_write_batch_size('b:2', 2) + + # Starts with default max batch size. + self.assertEqual(1000, c.max_write_batch_size) + c.db.collection.find_one() # Connect. + + # Uses primary's max batch size. + self.assertEqual(c.max_write_batch_size, 1) + + # b becomes primary. + c.mock_primary = 'b:2' + c.refresh() + self.assertEqual(c.max_write_batch_size, 2) + + if __name__ == "__main__": unittest.main()