Use maxWriteBatchSize for write command batch splitting, PYTHON-642.

This commit is contained in:
A. Jesse Jiryu Davis 2014-02-21 00:13:45 -05:00
parent 6c6375d516
commit 6d5f658c2a
14 changed files with 163 additions and 12 deletions

View File

@ -911,6 +911,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
long max_bson_size;
long max_cmd_size;
long max_write_batch_size;
long idx_offset = 0;
int idx = 0;
int cmd_len_loc;
@ -919,6 +920,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
int ordered;
char *ns = NULL;
PyObject* max_bson_size_obj;
PyObject* max_write_batch_size_obj;
PyObject* command;
PyObject* doc;
PyObject* docs;
@ -956,6 +958,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
*/
max_cmd_size = max_bson_size + 16382;
max_write_batch_size_obj = PyObject_GetAttrString(client, "max_write_batch_size");
#if PY_MAJOR_VERSION >= 3
max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj);
#else
max_write_batch_size = PyInt_AsLong(max_write_batch_size_obj);
#endif
Py_XDECREF(max_write_batch_size_obj);
if (max_write_batch_size == -1) {
PyMem_Free(ns);
return NULL;
}
/* Default to True */
ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False);
@ -1035,6 +1049,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
int sub_doc_begin = buffer_get_position(buffer);
int cur_doc_begin;
int cur_size;
int enough_data = 0;
int enough_documents = 0;
char key[16];
empty = 0;
INT2STRING(key, idx);
@ -1052,7 +1068,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
Py_DECREF(doc);
/* We have enough data, maybe send this batch. */
if (buffer_get_position(buffer) > max_cmd_size) {
enough_data = (buffer_get_position(buffer) > max_cmd_size);
enough_documents = (idx >= max_write_batch_size);
if (enough_data || enough_documents) {
buffer_t new_buffer;
cur_size = buffer_get_position(buffer) - cur_doc_begin;

View File

@ -235,7 +235,7 @@ class _Bulk(object):
for idx, (op_type, operation) in enumerate(self.ops):
if run is None:
run = _Run(op_type)
elif run.op_type != op_type or len(run.ops) > 1000:
elif run.op_type != op_type:
yield run
run = _Run(op_type)
run.add(idx, operation)
@ -248,9 +248,6 @@ class _Bulk(object):
operations = [_Run(_INSERT), _Run(_UPDATE), _Run(_DELETE)]
for idx, (op_type, operation) in enumerate(self.ops):
operations[op_type].add(idx, operation)
if len(operations[op_type].ops) > 1000:
yield operations[op_type]
operations[op_type] = _Run(op_type)
for run in operations:
if run.ops:

View File

@ -357,9 +357,9 @@ class Collection(common.BaseObject):
.. mongodoc:: insert
"""
client = self.database.connection
# Batch inserts require us to know the connected master's
# max_bson_size and max_message_size. We have to be connected
# to a master to know that.
# Batch inserts require us to know the connected primary's
# max_bson_size, max_message_size, and max_write_batch_size.
# We have to be connected to the primary to know that.
client._ensure_connected(True)
docs = doc_or_docs

View File

@ -41,6 +41,7 @@ MAX_BSON_SIZE = 16 * (1024 ** 2)
MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE
MIN_WIRE_VERSION = 0
MAX_WIRE_VERSION = 0
MAX_WRITE_BATCH_SIZE = 1000
# What this version of PyMongo supports.
MIN_SUPPORTED_WIRE_VERSION = 0

View File

@ -155,6 +155,17 @@ class MasterSlaveConnection(BaseObject):
"""
return self.master.max_wire_version
@property
def max_write_batch_size(self):
    """Return the master's maxWriteBatchSize.

    The value is read straight from the master connection. A default
    value is returned when connected to server versions prior to
    MongoDB 2.6.

    .. versionadded:: 2.7
    """
    master = self.master
    return master.max_write_batch_size
def disconnect(self):
"""Disconnect from MongoDB.

View File

@ -64,6 +64,8 @@ class Member(object):
'minWireVersion', common.MIN_WIRE_VERSION)
self.max_wire_version = ismaster_response.get(
'maxWireVersion', common.MAX_WIRE_VERSION)
self.max_write_batch_size = ismaster_response.get(
'maxWriteBatchSize', common.MAX_WRITE_BATCH_SIZE)
# self.min/max_wire_version is the server's wire protocol.
# MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.

View File

@ -281,6 +281,7 @@ def _do_batched_write_command(namespace, operation, command,
"""Execute a batch of insert, update, or delete commands.
"""
max_bson_size = client.max_bson_size
max_write_batch_size = client.max_write_batch_size
# Max BSON object size + 16k - 2 bytes for ending NUL bytes
# XXX: This should come from the server - SERVER-10643
max_cmd_size = max_bson_size + 16382
@ -354,7 +355,9 @@ def _do_batched_write_command(namespace, operation, command,
key = b(str(idx))
value = bson.BSON.encode(doc, check_keys, uuid_subtype)
# Send a batch?
if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size:
enough_data = (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size
enough_documents = (idx >= max_write_batch_size)
if enough_data or enough_documents:
if not idx:
if operation == _INSERT:
raise DocumentTooLarge("BSON document too large (%d bytes)"

View File

@ -655,6 +655,18 @@ class MongoClient(common.BaseObject):
return self.__member_property(
'max_wire_version', common.MAX_WIRE_VERSION)
@property
def max_write_batch_size(self):
    """Return the maxWriteBatchSize reported by the server.

    The lookup goes through the client's member-property helper, with
    ``common.MAX_WRITE_BATCH_SIZE`` handed over as the fallback. A
    default value is returned when connected to server versions prior
    to MongoDB 2.6.

    .. versionadded:: 2.7
    """
    lookup = self.__member_property
    return lookup('max_write_batch_size', common.MAX_WRITE_BATCH_SIZE)
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
"""

View File

@ -986,6 +986,20 @@ class MongoReplicaSetClient(common.BaseObject):
return rs_state.primary_member.max_wire_version
return common.MAX_WIRE_VERSION
@property
def max_write_batch_size(self):
    """Return the maxWriteBatchSize reported by the primary.

    When the current replica set state has no primary member,
    ``common.MAX_WRITE_BATCH_SIZE`` is returned instead. A default value
    is also returned when connected to server versions prior to
    MongoDB 2.6.

    .. versionadded:: 2.7
    """
    # Work from one snapshot of the state so both reads below agree.
    state = self.__rs_state
    return (state.primary_member.max_write_batch_size
            if state.primary_member
            else common.MAX_WRITE_BATCH_SIZE)
@property
def auto_start_request(self):
"""Is auto_start_request enabled?

View File

@ -81,6 +81,9 @@ class MockClientBase(object):
# Hostname -> (min wire version, max wire version)
self.mock_wire_versions = {}
# Hostname -> max write batch size
self.mock_max_write_batch_sizes = {}
def kill_host(self, host):
"""Host is like 'a:1'."""
self.mock_down_hosts.append(host)
@ -92,11 +95,17 @@ class MockClientBase(object):
def set_wire_version_range(self, host, min_version, max_version):
    """Store ``(min_version, max_version)`` as the mocked range for ``host``.

    Any range previously stored for the same host is replaced.
    """
    version_range = (min_version, max_version)
    self.mock_wire_versions[host] = version_range
def set_max_write_batch_size(self, host, size):
    """Store ``size`` as the mocked max write batch size for ``host``.

    Any size previously stored for the same host is replaced.
    """
    sizes_by_host = self.mock_max_write_batch_sizes
    sizes_by_host[host] = size
def mock_is_master(self, host):
min_wire_version, max_wire_version = self.mock_wire_versions.get(
host,
(common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION))
max_write_batch_size = self.mock_max_write_batch_sizes.get(
host, common.MAX_WRITE_BATCH_SIZE)
# host is like 'a:1'.
if host in self.mock_down_hosts:
raise socket.timeout('mock timeout')
@ -105,7 +114,8 @@ class MockClientBase(object):
return {
'ismaster': True,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version}
'maxWireVersion': max_wire_version,
'maxWriteBatchSize': max_write_batch_size}
if host in self.mock_members:
ismaster = (host == self.mock_primary)
@ -117,7 +127,8 @@ class MockClientBase(object):
'setName': 'rs',
'hosts': self.mock_ismaster_hosts,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version}
'maxWireVersion': max_wire_version,
'maxWriteBatchSize': max_write_batch_size}
if self.mock_primary:
response['primary'] = self.mock_primary
@ -129,7 +140,8 @@ class MockClientBase(object):
'ismaster': True,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version,
'msg': 'isdbgrid'}
'msg': 'isdbgrid',
'maxWriteBatchSize': max_write_batch_size}
# In test_internal_ips(), we try to connect to a host listed
# in ismaster['hosts'] but not publicly accessible.

View File

@ -427,6 +427,27 @@ class TestBulk(unittest.TestCase):
self.assertEqual(6, result['nInserted'])
self.assertEqual(6, self.coll.count())
def test_numerous_inserts(self):
    """Bulk-insert 2100 empty documents, unordered and then ordered.

    Each run must report 2100 inserted documents and leave exactly that
    many in the collection.
    """
    # Ensure we don't exceed server's 1000-document batch size limit.
    expected = 2100

    def fill_and_check(bulk):
        # Queue the inserts, execute, then verify both the reported
        # count and what the collection actually holds.
        for _ in range(expected):
            bulk.insert({})

        outcome = bulk.execute()
        self.assertEqual(expected, outcome['nInserted'])
        self.assertEqual(expected, self.coll.count())

    fill_and_check(self.coll.initialize_unordered_bulk_op())

    # Same with ordered bulk, starting from an empty collection.
    self.coll.remove()
    fill_and_check(self.coll.initialize_ordered_bulk_op())
def test_multiple_execution(self):
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({})

View File

@ -104,6 +104,7 @@ class TestClient(unittest.TestCase, TestRequestMixin):
self.assertIsInstance(c.max_bson_size, int)
self.assertIsInstance(c.min_wire_version, int)
self.assertIsInstance(c.max_wire_version, int)
self.assertIsInstance(c.max_write_batch_size, int)
self.assertEqual(None, c.host)
self.assertEqual(None, c.port)
@ -916,6 +917,31 @@ with client.start_request() as request:
c.disconnect()
self.assertRaises(ConfigurationError, c.db.collection.find_one)
def test_max_wire_version(self):
    """Check that ``max_write_batch_size`` tracks the current primary.

    NOTE(review): despite its name, this test never inspects the wire
    version; every assertion is about ``max_write_batch_size``.
    """
    client = MockClient(
        standalones=[],
        members=['a:1', 'b:2', 'c:3'],
        mongoses=[],
        host='b:2',  # Pass a secondary.
        replicaSet='rs',
        _connect=False)

    # Give two members distinct limits so they can be told apart.
    for member, limit in (('a:1', 1), ('b:2', 2)):
        client.set_max_write_batch_size(member, limit)

    # Starts with default max batch size.
    self.assertEqual(1000, client.max_write_batch_size)
    client.db.collection.find_one()  # Connect.

    # Uses primary's max batch size.
    self.assertEqual(client.max_write_batch_size, 1)

    # b becomes primary.
    client.mock_primary = 'b:2'
    client.disconnect()
    # Disconnected again, so the default is reported once more.
    self.assertEqual(1000, client.max_write_batch_size)
    client.db.collection.find_one()  # Connect.
    self.assertEqual(client.max_write_batch_size, 2)
def test_wire_version_mongos_ha(self):
c = MockClient(
standalones=[],

View File

@ -1893,6 +1893,14 @@ class TestCollection(unittest.TestCase):
finally:
self.client.end_request()
def test_numerous_inserts(self):
    """Insert 2100 empty documents in one ``insert`` call and count them.

    The ``test`` collection is emptied before and after the check.
    """
    # Ensure we don't exceed server's 1000-document batch size limit.
    self.db.test.remove()
    n_docs = 2100

    # Hand over a generator, not a list, exactly as a lazy caller would.
    empty_docs = ({} for _ in range(n_docs))
    self.db.test.insert(empty_docs)

    self.assertEqual(n_docs, self.db.test.count())
    self.db.test.remove()
# Starting in PyMongo 2.6 we no longer use message.insert for inserts, but
# message.insert is part of the public API. Do minimal testing here; there
# isn't really a better place.

View File

@ -1212,5 +1212,31 @@ class TestReplicaSetClientInternalIPs(unittest.TestCase):
replicaSet='rs')
class TestReplicaSetClientMaxWriteBatchSize(unittest.TestCase):
    """Tests for ``max_write_batch_size`` on a mocked replica set client."""

    def test_max_write_batch_size(self):
        """The property starts at the default, then follows the primary."""
        client = MockReplicaSetClient(
            standalones=[],
            members=['a:1', 'b:2'],
            mongoses=[],
            host='a:1',
            replicaSet='rs',
            _connect=False)

        # Give each member a distinct limit so they can be told apart.
        for member, limit in (('a:1', 1), ('b:2', 2)):
            client.set_max_write_batch_size(member, limit)

        # Starts with default max batch size.
        self.assertEqual(1000, client.max_write_batch_size)
        client.db.collection.find_one()  # Connect.

        # Uses primary's max batch size.
        self.assertEqual(client.max_write_batch_size, 1)

        # b becomes primary.
        client.mock_primary = 'b:2'
        client.refresh()
        self.assertEqual(client.max_write_batch_size, 2)
if __name__ == "__main__":
unittest.main()