From a3556c44472b25c213fc0d29122e80fea1989860 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 31 Oct 2019 15:45:59 -0700 Subject: [PATCH] PYTHON-2012 Update FLE to support commands larger than 6MiB Bulk write command are batched at 2MiB when auto encryption is enabled. --- pymongo/_cmessagemodule.c | 19 +++++++++- pymongo/encryption.py | 7 ---- pymongo/message.py | 31 ++++++++-------- test/test_encryption.py | 75 +++++++++++++++++++++++---------------- 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index b3a82d631..7c4a517c5 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -1441,6 +1441,7 @@ _batched_write_command( long max_bson_size; long max_cmd_size; long max_write_batch_size; + long max_split_size; int idx = 0; int cmd_len_loc; int lst_len_loc; @@ -1448,6 +1449,7 @@ _batched_write_command( int length; PyObject* max_bson_size_obj = NULL; PyObject* max_write_batch_size_obj = NULL; + PyObject* max_split_size_obj = NULL; PyObject* doc = NULL; PyObject* iterator = NULL; @@ -1478,6 +1480,20 @@ _batched_write_command( return 0; } + // max_split_size is the size at which to perform a batch split. + // Normally this this value is equal to max_bson_size (16MiB). However, + // when auto encryption is enabled max_split_size is reduced to 2MiB. + max_split_size_obj = PyObject_GetAttrString(ctx, "max_split_size"); +#if PY_MAJOR_VERSION >= 3 + max_split_size = PyLong_AsLong(max_split_size_obj); +#else + max_split_size = PyInt_AsLong(max_split_size_obj); +#endif + Py_XDECREF(max_split_size_obj); + if (max_split_size == -1) { + return 0; + } + if (!buffer_write_bytes(buffer, "\x00\x00\x00\x00", /* flags */ 4) || @@ -1570,7 +1586,6 @@ _batched_write_command( * max_cmd_size accounts for the two trailing null bytes. */ cur_size = buffer_get_position(buffer) - cur_doc_begin; - enough_data = (buffer_get_position(buffer) > max_cmd_size); /* This single document is too large for the command. */ if (cur_size > max_cmd_size) { if (op == _INSERT) { @@ -1591,6 +1606,8 @@ _batched_write_command( } goto fail; } + enough_data = (idx >= 1 && + (buffer_get_position(buffer) > max_split_size)); if (enough_data) { /* * Roll the existing buffer back to the beginning diff --git a/pymongo/encryption.py b/pymongo/encryption.py index b61937cb6..bdbd4fc72 100644 --- a/pymongo/encryption.py +++ b/pymongo/encryption.py @@ -50,9 +50,6 @@ from pymongo.errors import (ConfigurationError, EncryptionError, InvalidOperation, ServerSelectionTimeoutError) -from pymongo.message import (_COMMAND_OVERHEAD, - _MAX_ENC_BSON_SIZE, - _raise_document_too_large) from pymongo.mongo_client import MongoClient from pymongo.pool import _configured_socket, PoolOptions from pymongo.read_concern import ReadConcern @@ -277,10 +274,6 @@ class _Encrypter(object): # check_keys. cluster_time = check_keys and cmd.pop('$clusterTime', None) encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options) - max_cmd_size = _MAX_ENC_BSON_SIZE + _COMMAND_OVERHEAD - if len(encoded_cmd) > max_cmd_size: - raise _raise_document_too_large( - next(iter(cmd)), len(encoded_cmd), max_cmd_size) with _wrap_encryption_errors(): encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd) # TODO: PYTHON-1922 avoid decoding the encrypted_cmd. diff --git a/pymongo/message.py b/pymongo/message.py index 2b8ff1042..6e8c59695 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -929,6 +929,11 @@ class _BulkWriteContext(object): """A proxy for SockInfo.max_write_batch_size.""" return self.sock_info.max_write_batch_size + @property + def max_split_size(self): + """The maximum size of a BSON command before batch splitting.""" + return self.max_bson_size + def legacy_bulk_insert( self, request_id, msg, max_doc_size, acknowledged, docs, compress): if compress: @@ -1011,10 +1016,11 @@ class _BulkWriteContext(object): request_id, self.sock_info.address, self.op_id) -# 2MiB -_MAX_ENC_BSON_SIZE = 2 * (1024 * 1024) -# 6MB -_MAX_ENC_MESSAGE_SIZE = 6 * (1000 * 1000) +# From the Client Side Encryption spec: +# Because automatic encryption increases the size of commands, the driver +# MUST split bulk writes at a reduced size limit before undergoing automatic +# encryption. The write payload MUST be split at 2MiB (2097152). +_MAX_SPLIT_SIZE_ENC = 2097152 class _EncryptedBulkWriteContext(_BulkWriteContext): @@ -1049,14 +1055,9 @@ class _EncryptedBulkWriteContext(_BulkWriteContext): return to_send @property - def max_bson_size(self): - """A proxy for SockInfo.max_bson_size.""" - return min(self.sock_info.max_bson_size, _MAX_ENC_BSON_SIZE) - - @property - def max_message_size(self): - """A proxy for SockInfo.max_message_size.""" - return min(self.sock_info.max_message_size, _MAX_ENC_MESSAGE_SIZE) + def max_split_size(self): + """Reduce the batch splitting size.""" + return _MAX_SPLIT_SIZE_ENC def _raise_document_too_large(operation, doc_size, max_size): @@ -1388,6 +1389,7 @@ def _batched_write_command_impl( # Max BSON object size + 16k - 2 bytes for ending NUL bytes. # Server guarantees there is enough room: SERVER-10643. max_cmd_size = max_bson_size + _COMMAND_OVERHEAD + max_split_size = ctx.max_split_size # No options buf.write(_ZERO_32) @@ -1424,12 +1426,13 @@ def _batched_write_command_impl( # Is there enough room to add this document? max_cmd_size accounts for # the two trailing null bytes. doc_too_large = len(value) > max_cmd_size - enough_data = (buf.tell() + len(key) + len(value)) >= max_cmd_size - enough_documents = (idx >= max_write_batch_size) if doc_too_large: write_op = list(_FIELD_MAP.keys())[operation] _raise_document_too_large( write_op, len(value), max_bson_size) + enough_data = (idx >= 1 and + (buf.tell() + len(key) + len(value)) >= max_split_size) + enough_documents = (idx >= max_write_batch_size) if enough_data or enough_documents: break buf.write(_BSONOBJ) diff --git a/test/test_encryption.py b/test/test_encryption.py index fbd6c7010..91018d74b 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -35,15 +35,15 @@ from bson.json_util import JSONOptions from bson.son import SON from pymongo.cursor import CursorType -from pymongo.errors import (ConfigurationError, - EncryptionError, - InvalidOperation, - OperationFailure) from pymongo.encryption import (Algorithm, ClientEncryption) -from pymongo.errors import ConfigurationError, DocumentTooLarge from pymongo.encryption_options import AutoEncryptionOpts, _HAVE_PYMONGOCRYPT -from pymongo.message import _COMMAND_OVERHEAD +from pymongo.errors import (BulkWriteError, + ConfigurationError, + EncryptionError, + InvalidOperation, + OperationFailure, + WriteError) from pymongo.mongo_client import MongoClient from pymongo.operations import InsertOne from pymongo.write_concern import WriteConcern @@ -918,6 +918,10 @@ class TestCorpus(EncryptionIntegrationTest): self._test_corpus(opts) +_2_MiB = 2097152 +_16_MiB = 16777216 + + class TestBsonSizeBatches(EncryptionIntegrationTest): """Prose tests for BSON size limits and batch splitting.""" @@ -955,27 +959,14 @@ class TestBsonSizeBatches(EncryptionIntegrationTest): super(TestBsonSizeBatches, cls).tearDownClass() def test_01_insert_succeeds_under_2MiB(self): - doc = {'_id': 'no_encryption_under_2mib', - 'unencrypted': 'a' * ((2**21) - 1000)} + doc = {'_id': 'over_2mib_under_16mib', 'unencrypted': 'a' * _2_MiB} self.coll_encrypted.insert_one(doc) # Same with bulk_write. - doc = {'_id': 'no_encryption_under_2mib_bulk', - 'unencrypted': 'a' * ((2**21) - 1000)} + doc['_id'] = 'over_2mib_under_16mib_bulk' self.coll_encrypted.bulk_write([InsertOne(doc)]) - def test_02_insert_fails_over_2MiB(self): - doc = {'_id': 'no_encryption_over_2mib', - 'unencrypted': 'a' * (2**21 + _COMMAND_OVERHEAD)} - - with self.assertRaises(DocumentTooLarge): - self.coll_encrypted.insert_one(doc) - with self.assertRaises(DocumentTooLarge): - self.coll_encrypted.insert_many([doc]) - with self.assertRaises(DocumentTooLarge): - self.coll_encrypted.bulk_write([InsertOne(doc)]) - - def test_03_insert_succeeds_over_2MiB_post_encryption(self): + def test_02_insert_succeeds_over_2MiB_post_encryption(self): doc = {'_id': 'encryption_exceeds_2mib', 'unencrypted': 'a' * ((2**21) - 2000)} doc.update(json_data('limits', 'limits-doc.json')) @@ -985,29 +976,53 @@ class TestBsonSizeBatches(EncryptionIntegrationTest): doc['_id'] = 'encryption_exceeds_2mib_bulk' self.coll_encrypted.bulk_write([InsertOne(doc)]) - def test_04_bulk_batch_split(self): - doc1 = {'_id': 'no_encryption_under_2mib_1', - 'unencrypted': 'a' * ((2**21) - 1000)} - doc2 = {'_id': 'no_encryption_under_2mib_2', - 'unencrypted': 'a' * ((2**21) - 1000)} + def test_03_bulk_batch_split(self): + doc1 = {'_id': 'over_2mib_1', 'unencrypted': 'a' * _2_MiB} + doc2 = {'_id': 'over_2mib_2', 'unencrypted': 'a' * _2_MiB} self.listener.reset() self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)]) self.assertEqual( self.listener.started_command_names(), ['insert', 'insert']) - def test_05_bulk_batch_split(self): + def test_04_bulk_batch_split(self): limits_doc = json_data('limits', 'limits-doc.json') doc1 = {'_id': 'encryption_exceeds_2mib_1', - 'unencrypted': 'a' * ((2**21) - 2000)} + 'unencrypted': 'a' * (_2_MiB - 2000)} doc1.update(limits_doc) doc2 = {'_id': 'encryption_exceeds_2mib_2', - 'unencrypted': 'a' * ((2**21) - 2000)} + 'unencrypted': 'a' * (_2_MiB - 2000)} doc2.update(limits_doc) self.listener.reset() self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)]) self.assertEqual( self.listener.started_command_names(), ['insert', 'insert']) + def test_05_insert_succeeds_just_under_16MiB(self): + doc = {'_id': 'under_16mib', 'unencrypted': 'a' * (_16_MiB - 2000)} + self.coll_encrypted.insert_one(doc) + + # Same with bulk_write. + doc['_id'] = 'under_16mib_bulk' + self.coll_encrypted.bulk_write([InsertOne(doc)]) + + def test_06_insert_fails_over_16MiB(self): + limits_doc = json_data('limits', 'limits-doc.json') + doc = {'_id': 'encryption_exceeds_16mib', + 'unencrypted': 'a' * (_16_MiB - 2000)} + doc.update(limits_doc) + + with self.assertRaisesRegex(WriteError, 'object to insert too large'): + self.coll_encrypted.insert_one(doc) + + # Same with bulk_write. + doc['_id'] = 'encryption_exceeds_16mib_bulk' + with self.assertRaises(BulkWriteError) as ctx: + self.coll_encrypted.bulk_write([InsertOne(doc)]) + err = ctx.exception.details['writeErrors'][0] + self.assertEqual(2, err['code']) + self.assertIn('object to insert too large', err['errmsg']) + + class TestCustomEndpoint(EncryptionIntegrationTest): """Prose tests for creating data keys with a custom endpoint."""