PYTHON-2012 Update FLE to support commands larger than 6MiB
Bulk write command are batched at 2MiB when auto encryption is enabled.
This commit is contained in:
parent
c65367b8f0
commit
a3556c4447
@ -1441,6 +1441,7 @@ _batched_write_command(
|
||||
long max_bson_size;
|
||||
long max_cmd_size;
|
||||
long max_write_batch_size;
|
||||
long max_split_size;
|
||||
int idx = 0;
|
||||
int cmd_len_loc;
|
||||
int lst_len_loc;
|
||||
@ -1448,6 +1449,7 @@ _batched_write_command(
|
||||
int length;
|
||||
PyObject* max_bson_size_obj = NULL;
|
||||
PyObject* max_write_batch_size_obj = NULL;
|
||||
PyObject* max_split_size_obj = NULL;
|
||||
PyObject* doc = NULL;
|
||||
PyObject* iterator = NULL;
|
||||
|
||||
@ -1478,6 +1480,20 @@ _batched_write_command(
|
||||
return 0;
|
||||
}
|
||||
|
||||
// max_split_size is the size at which to perform a batch split.
|
||||
// Normally this this value is equal to max_bson_size (16MiB). However,
|
||||
// when auto encryption is enabled max_split_size is reduced to 2MiB.
|
||||
max_split_size_obj = PyObject_GetAttrString(ctx, "max_split_size");
|
||||
#if PY_MAJOR_VERSION >= 3
|
||||
max_split_size = PyLong_AsLong(max_split_size_obj);
|
||||
#else
|
||||
max_split_size = PyInt_AsLong(max_split_size_obj);
|
||||
#endif
|
||||
Py_XDECREF(max_split_size_obj);
|
||||
if (max_split_size == -1) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!buffer_write_bytes(buffer,
|
||||
"\x00\x00\x00\x00", /* flags */
|
||||
4) ||
|
||||
@ -1570,7 +1586,6 @@ _batched_write_command(
|
||||
* max_cmd_size accounts for the two trailing null bytes.
|
||||
*/
|
||||
cur_size = buffer_get_position(buffer) - cur_doc_begin;
|
||||
enough_data = (buffer_get_position(buffer) > max_cmd_size);
|
||||
/* This single document is too large for the command. */
|
||||
if (cur_size > max_cmd_size) {
|
||||
if (op == _INSERT) {
|
||||
@ -1591,6 +1606,8 @@ _batched_write_command(
|
||||
}
|
||||
goto fail;
|
||||
}
|
||||
enough_data = (idx >= 1 &&
|
||||
(buffer_get_position(buffer) > max_split_size));
|
||||
if (enough_data) {
|
||||
/*
|
||||
* Roll the existing buffer back to the beginning
|
||||
|
||||
@ -50,9 +50,6 @@ from pymongo.errors import (ConfigurationError,
|
||||
EncryptionError,
|
||||
InvalidOperation,
|
||||
ServerSelectionTimeoutError)
|
||||
from pymongo.message import (_COMMAND_OVERHEAD,
|
||||
_MAX_ENC_BSON_SIZE,
|
||||
_raise_document_too_large)
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.pool import _configured_socket, PoolOptions
|
||||
from pymongo.read_concern import ReadConcern
|
||||
@ -277,10 +274,6 @@ class _Encrypter(object):
|
||||
# check_keys.
|
||||
cluster_time = check_keys and cmd.pop('$clusterTime', None)
|
||||
encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
|
||||
max_cmd_size = _MAX_ENC_BSON_SIZE + _COMMAND_OVERHEAD
|
||||
if len(encoded_cmd) > max_cmd_size:
|
||||
raise _raise_document_too_large(
|
||||
next(iter(cmd)), len(encoded_cmd), max_cmd_size)
|
||||
with _wrap_encryption_errors():
|
||||
encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd)
|
||||
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
|
||||
|
||||
@ -929,6 +929,11 @@ class _BulkWriteContext(object):
|
||||
"""A proxy for SockInfo.max_write_batch_size."""
|
||||
return self.sock_info.max_write_batch_size
|
||||
|
||||
@property
|
||||
def max_split_size(self):
|
||||
"""The maximum size of a BSON command before batch splitting."""
|
||||
return self.max_bson_size
|
||||
|
||||
def legacy_bulk_insert(
|
||||
self, request_id, msg, max_doc_size, acknowledged, docs, compress):
|
||||
if compress:
|
||||
@ -1011,10 +1016,11 @@ class _BulkWriteContext(object):
|
||||
request_id, self.sock_info.address, self.op_id)
|
||||
|
||||
|
||||
# 2MiB
|
||||
_MAX_ENC_BSON_SIZE = 2 * (1024 * 1024)
|
||||
# 6MB
|
||||
_MAX_ENC_MESSAGE_SIZE = 6 * (1000 * 1000)
|
||||
# From the Client Side Encryption spec:
|
||||
# Because automatic encryption increases the size of commands, the driver
|
||||
# MUST split bulk writes at a reduced size limit before undergoing automatic
|
||||
# encryption. The write payload MUST be split at 2MiB (2097152).
|
||||
_MAX_SPLIT_SIZE_ENC = 2097152
|
||||
|
||||
|
||||
class _EncryptedBulkWriteContext(_BulkWriteContext):
|
||||
@ -1049,14 +1055,9 @@ class _EncryptedBulkWriteContext(_BulkWriteContext):
|
||||
return to_send
|
||||
|
||||
@property
|
||||
def max_bson_size(self):
|
||||
"""A proxy for SockInfo.max_bson_size."""
|
||||
return min(self.sock_info.max_bson_size, _MAX_ENC_BSON_SIZE)
|
||||
|
||||
@property
|
||||
def max_message_size(self):
|
||||
"""A proxy for SockInfo.max_message_size."""
|
||||
return min(self.sock_info.max_message_size, _MAX_ENC_MESSAGE_SIZE)
|
||||
def max_split_size(self):
|
||||
"""Reduce the batch splitting size."""
|
||||
return _MAX_SPLIT_SIZE_ENC
|
||||
|
||||
|
||||
def _raise_document_too_large(operation, doc_size, max_size):
|
||||
@ -1388,6 +1389,7 @@ def _batched_write_command_impl(
|
||||
# Max BSON object size + 16k - 2 bytes for ending NUL bytes.
|
||||
# Server guarantees there is enough room: SERVER-10643.
|
||||
max_cmd_size = max_bson_size + _COMMAND_OVERHEAD
|
||||
max_split_size = ctx.max_split_size
|
||||
|
||||
# No options
|
||||
buf.write(_ZERO_32)
|
||||
@ -1424,12 +1426,13 @@ def _batched_write_command_impl(
|
||||
# Is there enough room to add this document? max_cmd_size accounts for
|
||||
# the two trailing null bytes.
|
||||
doc_too_large = len(value) > max_cmd_size
|
||||
enough_data = (buf.tell() + len(key) + len(value)) >= max_cmd_size
|
||||
enough_documents = (idx >= max_write_batch_size)
|
||||
if doc_too_large:
|
||||
write_op = list(_FIELD_MAP.keys())[operation]
|
||||
_raise_document_too_large(
|
||||
write_op, len(value), max_bson_size)
|
||||
enough_data = (idx >= 1 and
|
||||
(buf.tell() + len(key) + len(value)) >= max_split_size)
|
||||
enough_documents = (idx >= max_write_batch_size)
|
||||
if enough_data or enough_documents:
|
||||
break
|
||||
buf.write(_BSONOBJ)
|
||||
|
||||
@ -35,15 +35,15 @@ from bson.json_util import JSONOptions
|
||||
from bson.son import SON
|
||||
|
||||
from pymongo.cursor import CursorType
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
EncryptionError,
|
||||
InvalidOperation,
|
||||
OperationFailure)
|
||||
from pymongo.encryption import (Algorithm,
|
||||
ClientEncryption)
|
||||
from pymongo.errors import ConfigurationError, DocumentTooLarge
|
||||
from pymongo.encryption_options import AutoEncryptionOpts, _HAVE_PYMONGOCRYPT
|
||||
from pymongo.message import _COMMAND_OVERHEAD
|
||||
from pymongo.errors import (BulkWriteError,
|
||||
ConfigurationError,
|
||||
EncryptionError,
|
||||
InvalidOperation,
|
||||
OperationFailure,
|
||||
WriteError)
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.operations import InsertOne
|
||||
from pymongo.write_concern import WriteConcern
|
||||
@ -918,6 +918,10 @@ class TestCorpus(EncryptionIntegrationTest):
|
||||
self._test_corpus(opts)
|
||||
|
||||
|
||||
_2_MiB = 2097152
|
||||
_16_MiB = 16777216
|
||||
|
||||
|
||||
class TestBsonSizeBatches(EncryptionIntegrationTest):
|
||||
"""Prose tests for BSON size limits and batch splitting."""
|
||||
|
||||
@ -955,27 +959,14 @@ class TestBsonSizeBatches(EncryptionIntegrationTest):
|
||||
super(TestBsonSizeBatches, cls).tearDownClass()
|
||||
|
||||
def test_01_insert_succeeds_under_2MiB(self):
|
||||
doc = {'_id': 'no_encryption_under_2mib',
|
||||
'unencrypted': 'a' * ((2**21) - 1000)}
|
||||
doc = {'_id': 'over_2mib_under_16mib', 'unencrypted': 'a' * _2_MiB}
|
||||
self.coll_encrypted.insert_one(doc)
|
||||
|
||||
# Same with bulk_write.
|
||||
doc = {'_id': 'no_encryption_under_2mib_bulk',
|
||||
'unencrypted': 'a' * ((2**21) - 1000)}
|
||||
doc['_id'] = 'over_2mib_under_16mib_bulk'
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc)])
|
||||
|
||||
def test_02_insert_fails_over_2MiB(self):
|
||||
doc = {'_id': 'no_encryption_over_2mib',
|
||||
'unencrypted': 'a' * (2**21 + _COMMAND_OVERHEAD)}
|
||||
|
||||
with self.assertRaises(DocumentTooLarge):
|
||||
self.coll_encrypted.insert_one(doc)
|
||||
with self.assertRaises(DocumentTooLarge):
|
||||
self.coll_encrypted.insert_many([doc])
|
||||
with self.assertRaises(DocumentTooLarge):
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc)])
|
||||
|
||||
def test_03_insert_succeeds_over_2MiB_post_encryption(self):
|
||||
def test_02_insert_succeeds_over_2MiB_post_encryption(self):
|
||||
doc = {'_id': 'encryption_exceeds_2mib',
|
||||
'unencrypted': 'a' * ((2**21) - 2000)}
|
||||
doc.update(json_data('limits', 'limits-doc.json'))
|
||||
@ -985,29 +976,53 @@ class TestBsonSizeBatches(EncryptionIntegrationTest):
|
||||
doc['_id'] = 'encryption_exceeds_2mib_bulk'
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc)])
|
||||
|
||||
def test_04_bulk_batch_split(self):
|
||||
doc1 = {'_id': 'no_encryption_under_2mib_1',
|
||||
'unencrypted': 'a' * ((2**21) - 1000)}
|
||||
doc2 = {'_id': 'no_encryption_under_2mib_2',
|
||||
'unencrypted': 'a' * ((2**21) - 1000)}
|
||||
def test_03_bulk_batch_split(self):
|
||||
doc1 = {'_id': 'over_2mib_1', 'unencrypted': 'a' * _2_MiB}
|
||||
doc2 = {'_id': 'over_2mib_2', 'unencrypted': 'a' * _2_MiB}
|
||||
self.listener.reset()
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)])
|
||||
self.assertEqual(
|
||||
self.listener.started_command_names(), ['insert', 'insert'])
|
||||
|
||||
def test_05_bulk_batch_split(self):
|
||||
def test_04_bulk_batch_split(self):
|
||||
limits_doc = json_data('limits', 'limits-doc.json')
|
||||
doc1 = {'_id': 'encryption_exceeds_2mib_1',
|
||||
'unencrypted': 'a' * ((2**21) - 2000)}
|
||||
'unencrypted': 'a' * (_2_MiB - 2000)}
|
||||
doc1.update(limits_doc)
|
||||
doc2 = {'_id': 'encryption_exceeds_2mib_2',
|
||||
'unencrypted': 'a' * ((2**21) - 2000)}
|
||||
'unencrypted': 'a' * (_2_MiB - 2000)}
|
||||
doc2.update(limits_doc)
|
||||
self.listener.reset()
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)])
|
||||
self.assertEqual(
|
||||
self.listener.started_command_names(), ['insert', 'insert'])
|
||||
|
||||
def test_05_insert_succeeds_just_under_16MiB(self):
|
||||
doc = {'_id': 'under_16mib', 'unencrypted': 'a' * (_16_MiB - 2000)}
|
||||
self.coll_encrypted.insert_one(doc)
|
||||
|
||||
# Same with bulk_write.
|
||||
doc['_id'] = 'under_16mib_bulk'
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc)])
|
||||
|
||||
def test_06_insert_fails_over_16MiB(self):
|
||||
limits_doc = json_data('limits', 'limits-doc.json')
|
||||
doc = {'_id': 'encryption_exceeds_16mib',
|
||||
'unencrypted': 'a' * (_16_MiB - 2000)}
|
||||
doc.update(limits_doc)
|
||||
|
||||
with self.assertRaisesRegex(WriteError, 'object to insert too large'):
|
||||
self.coll_encrypted.insert_one(doc)
|
||||
|
||||
# Same with bulk_write.
|
||||
doc['_id'] = 'encryption_exceeds_16mib_bulk'
|
||||
with self.assertRaises(BulkWriteError) as ctx:
|
||||
self.coll_encrypted.bulk_write([InsertOne(doc)])
|
||||
err = ctx.exception.details['writeErrors'][0]
|
||||
self.assertEqual(2, err['code'])
|
||||
self.assertIn('object to insert too large', err['errmsg'])
|
||||
|
||||
|
||||
|
||||
class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
"""Prose tests for creating data keys with a custom endpoint."""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user