PYTHON-1939 Add prose tests for BSON size limits and batch splitting

This commit is contained in:
Shane Harvey 2019-08-03 21:43:48 -07:00
parent 94e2b103f1
commit 55c8bdd346
8 changed files with 1672 additions and 29 deletions

View File

@ -1565,30 +1565,29 @@ _batched_write_command(
/* We have enough data, return this batch.
* max_cmd_size accounts for the two trailing null bytes.
*/
cur_size = buffer_get_position(buffer) - cur_doc_begin;
enough_data = (buffer_get_position(buffer) > max_cmd_size);
if (enough_data) {
cur_size = buffer_get_position(buffer) - cur_doc_begin;
/* This single document is too large for the command. */
if (!idx) {
if (op == _INSERT) {
_set_document_too_large(cur_size, max_bson_size);
} else {
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
if (DocumentTooLarge) {
/*
* There's nothing intelligent we can say
* about size for update and delete.
*/
PyErr_Format(
DocumentTooLarge,
"%s command document too large",
(op == _UPDATE) ? "update": "delete");
Py_DECREF(DocumentTooLarge);
}
/* This single document is too large for the command. */
if (cur_size > max_cmd_size) {
if (op == _INSERT) {
_set_document_too_large(cur_size, max_bson_size);
} else {
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
if (DocumentTooLarge) {
/*
* There's nothing intelligent we can say
* about size for update and delete.
*/
PyErr_Format(
DocumentTooLarge,
"%s command document too large",
(op == _UPDATE) ? "update": "delete");
Py_DECREF(DocumentTooLarge);
}
goto fail;
}
goto fail;
}
if (enough_data) {
/*
* Roll the existing buffer back to the beginning
* of the last document encoded.

View File

@ -49,6 +49,9 @@ from pymongo.errors import (ConfigurationError,
EncryptionError,
InvalidOperation,
ServerSelectionTimeoutError)
from pymongo.message import (_COMMAND_OVERHEAD,
_MAX_ENC_BSON_SIZE,
_raise_document_too_large)
from pymongo.mongo_client import MongoClient
from pymongo.pool import _configured_socket, PoolOptions
from pymongo.read_concern import ReadConcern
@ -265,11 +268,15 @@ class _Encrypter(object):
The encrypted command to execute.
"""
self._check_closed()
# Workaround for $clusterTime which is incompatible with
# check_keys.
cluster_time = check_keys and cmd.pop('$clusterTime', None)
encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
max_cmd_size = _MAX_ENC_BSON_SIZE + _COMMAND_OVERHEAD
if len(encoded_cmd) > max_cmd_size:
raise _raise_document_too_large(
next(iter(cmd)), len(encoded_cmd), max_cmd_size)
with _wrap_encryption_errors():
# Workaround for $clusterTime which is incompatible with
# check_keys.
cluster_time = check_keys and cmd.pop('$clusterTime', None)
encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd)
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
encrypt_cmd = _inflate_bson(

View File

@ -1421,13 +1421,14 @@ def _batched_write_command_impl(
value = bson.BSON.encode(doc, check_keys, opts)
# Is there enough room to add this document? max_cmd_size accounts for
# the two trailing null bytes.
doc_too_large = len(value) > max_cmd_size
enough_data = (buf.tell() + len(key) + len(value)) >= max_cmd_size
enough_documents = (idx >= max_write_batch_size)
if doc_too_large:
write_op = list(_FIELD_MAP.keys())[operation]
_raise_document_too_large(
write_op, len(value), max_bson_size)
if enough_data or enough_documents:
if not idx:
write_op = list(_FIELD_MAP.keys())[operation]
_raise_document_too_large(
write_op, len(value), max_bson_size)
break
buf.write(_BSONOBJ)
buf.write(key)

View File

@ -0,0 +1,102 @@
{
"00": "a",
"01": "a",
"02": "a",
"03": "a",
"04": "a",
"05": "a",
"06": "a",
"07": "a",
"08": "a",
"09": "a",
"10": "a",
"11": "a",
"12": "a",
"13": "a",
"14": "a",
"15": "a",
"16": "a",
"17": "a",
"18": "a",
"19": "a",
"20": "a",
"21": "a",
"22": "a",
"23": "a",
"24": "a",
"25": "a",
"26": "a",
"27": "a",
"28": "a",
"29": "a",
"30": "a",
"31": "a",
"32": "a",
"33": "a",
"34": "a",
"35": "a",
"36": "a",
"37": "a",
"38": "a",
"39": "a",
"40": "a",
"41": "a",
"42": "a",
"43": "a",
"44": "a",
"45": "a",
"46": "a",
"47": "a",
"48": "a",
"49": "a",
"50": "a",
"51": "a",
"52": "a",
"53": "a",
"54": "a",
"55": "a",
"56": "a",
"57": "a",
"58": "a",
"59": "a",
"60": "a",
"61": "a",
"62": "a",
"63": "a",
"64": "a",
"65": "a",
"66": "a",
"67": "a",
"68": "a",
"69": "a",
"70": "a",
"71": "a",
"72": "a",
"73": "a",
"74": "a",
"75": "a",
"76": "a",
"77": "a",
"78": "a",
"79": "a",
"80": "a",
"81": "a",
"82": "a",
"83": "a",
"84": "a",
"85": "a",
"86": "a",
"87": "a",
"88": "a",
"89": "a",
"90": "a",
"91": "a",
"92": "a",
"93": "a",
"94": "a",
"95": "a",
"96": "a",
"97": "a",
"98": "a",
"99": "a"
}

View File

@ -0,0 +1,31 @@
{
"status": {
"$numberInt": "1"
},
"_id": {
"$binary": {
"base64": "LOCALAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
},
"masterKey": {
"provider": "local"
},
"updateDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyMaterial": {
"$binary": {
"base64": "Ce9HSz/HKKGkIt4uyy+jDuKGA+rLC2cycykMo6vc8jXxqa1UVDYHWq1r+vZKbnnSRBfB981akzRKZCFpC05CTyFqDhXv6OnMjpG97OZEREGIsHEYiJkBW0jJJvfLLgeLsEpBzsro9FztGGXASxyxFRZFhXvHxyiLOKrdWfs7X1O/iK3pEoHMx6uSNSfUOgbebLfIqW7TO++iQS5g1xovXA==",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyAltNames": [ "local" ]
}

File diff suppressed because it is too large Load Diff

View File

@ -40,7 +40,9 @@ from pymongo.errors import (ConfigurationError,
OperationFailure)
from pymongo.encryption import (Algorithm,
ClientEncryption)
from pymongo.errors import ConfigurationError, DocumentTooLarge
from pymongo.encryption_options import AutoEncryptionOpts, _HAVE_PYMONGOCRYPT
from pymongo.message import _COMMAND_OVERHEAD
from pymongo.mongo_client import MongoClient
from pymongo.operations import InsertOne
from pymongo.write_concern import WriteConcern
@ -48,6 +50,7 @@ from pymongo.write_concern import WriteConcern
from test import unittest, IntegrationTest, PyMongoTestCase, client_context
from test.utils import (TestCreator,
camel_to_snake_args,
OvertCommandListener,
rs_or_single_client,
wait_until)
from test.utils_spec_runner import SpecRunner
@ -904,5 +907,96 @@ class TestCorpus(EncryptionIntegrationTest):
self._test_corpus(opts)
class TestBsonSizeBatches(EncryptionIntegrationTest):
    """Prose tests for BSON size limits and batch splitting.

    Every test writes to 'db.coll' through an auto-encrypting client whose
    commands are recorded by an OvertCommandListener.
    """

    @classmethod
    def setUpClass(cls):
        """Recreate 'db.coll' and the key vault, then open the client."""
        super(TestBsonSizeBatches, cls).setUpClass()
        database = client_context.client.db
        cls.coll = database.coll
        cls.coll.drop()

        # Configure the encrypted 'db.coll' collection via jsonSchema.
        schema = json_data('limits', 'limits-schema.json')
        database.create_collection(
            'coll',
            validator={'$jsonSchema': schema},
            codec_options=OPTS,
            write_concern=WriteConcern(w='majority'))

        # Create the key vault.
        admin_db = client_context.client.get_database(
            'admin',
            write_concern=WriteConcern(w='majority'),
            codec_options=OPTS)
        key_vault = admin_db['datakeys']
        key_vault.drop()
        key_vault.insert_one(json_data('limits', 'limits-key.json'))

        kms_providers = {'local': {'key': LOCAL_MASTER_KEY}}
        encryption_opts = AutoEncryptionOpts(kms_providers, 'admin.datakeys')
        cls.listener = OvertCommandListener()
        cls.client_encrypted = rs_or_single_client(
            auto_encryption_opts=encryption_opts,
            event_listeners=[cls.listener])
        cls.coll_encrypted = cls.client_encrypted.db.coll

    @classmethod
    def tearDownClass(cls):
        """Drop the test collection and close the encrypted client."""
        cls.coll_encrypted.drop()
        cls.client_encrypted.close()
        super(TestBsonSizeBatches, cls).tearDownClass()

    @staticmethod
    def _padded_doc(doc_id, padding_len):
        """Build a document whose 'unencrypted' field is padding_len 'a's."""
        return {'_id': doc_id, 'unencrypted': 'a' * padding_len}

    def test_01_insert_succeeds_under_2MiB(self):
        """Insert a document padded to 1000 characters below 2**21."""
        padding_len = (2**21) - 1000
        target = self.coll_encrypted
        target.insert_one(
            self._padded_doc('no_encryption_under_2mib', padding_len))
        # Same with bulk_write.
        bulk_doc = self._padded_doc(
            'no_encryption_under_2mib_bulk', padding_len)
        target.bulk_write([InsertOne(bulk_doc)])

    def test_02_insert_fails_over_2MiB(self):
        """Each insert helper must raise DocumentTooLarge when oversized."""
        oversized = self._padded_doc(
            'no_encryption_over_2mib', 2**21 + _COMMAND_OVERHEAD)
        target = self.coll_encrypted
        # The same document is offered to every insert entry point in turn.
        attempts = (
            lambda: target.insert_one(oversized),
            lambda: target.insert_many([oversized]),
            lambda: target.bulk_write([InsertOne(oversized)]),
        )
        for attempt in attempts:
            with self.assertRaises(DocumentTooLarge):
                attempt()

    def test_03_insert_succeeds_over_2MiB_post_encryption(self):
        """Insert a padded document merged with the 'limits-doc.json' fields.

        Going by the test name, the document is expected to exceed 2 MiB
        only after encryption.
        """
        payload = self._padded_doc('encryption_exceeds_2mib', (2**21) - 2000)
        payload.update(json_data('limits', 'limits-doc.json'))
        self.coll_encrypted.insert_one(payload)
        # Same with bulk_write, reusing the document under a new _id.
        payload['_id'] = 'encryption_exceeds_2mib_bulk'
        self.coll_encrypted.bulk_write([InsertOne(payload)])

    def test_04_bulk_batch_split(self):
        """Two padded documents must go out as two insert commands."""
        padding_len = (2**21) - 1000
        first = self._padded_doc('no_encryption_under_2mib_1', padding_len)
        second = self._padded_doc('no_encryption_under_2mib_2', padding_len)
        self.listener.reset()
        self.coll_encrypted.bulk_write([InsertOne(first), InsertOne(second)])
        started = self.listener.started_command_names()
        self.assertEqual(started, ['insert', 'insert'])

    def test_05_bulk_batch_split(self):
        """As test_04, with each document merged with 'limits-doc.json'."""
        extra_fields = json_data('limits', 'limits-doc.json')
        padding_len = (2**21) - 2000
        first = self._padded_doc('encryption_exceeds_2mib_1', padding_len)
        first.update(extra_fields)
        second = self._padded_doc('encryption_exceeds_2mib_2', padding_len)
        second.update(extra_fields)
        self.listener.reset()
        self.coll_encrypted.bulk_write([InsertOne(first), InsertOne(second)])
        started = self.listener.started_command_names()
        self.assertEqual(started, ['insert', 'insert'])
# Run this module's tests via unittest when it is executed as a script.
if __name__ == "__main__":
unittest.main()

View File

@ -132,6 +132,10 @@ class EventListener(monitoring.CommandListener):
"""Return list of command names started."""
return [event.command_name for event in self.results['started']]
def reset(self):
    """Discard every event recorded so far.

    Empties ``self.results`` in place, so the same container object
    remains attached to this listener afterwards.
    """
    recorded = self.results
    recorded.clear()
class OvertCommandListener(EventListener):
"""A CommandListener that ignores sensitive commands."""