PYTHON-1899 Implement encryption corpus test

Validate type to decrypt is Binary subtype 6.
Skip tests for deprecated symbol and dbPointer types.
This commit is contained in:
Shane Harvey 2019-08-07 12:26:37 -07:00
parent 59e3bcb66a
commit 72c777257a
7 changed files with 9263 additions and 20 deletions

View File

@ -30,7 +30,7 @@ except ImportError:
MongoCryptCallback = object
from bson import _bson_to_dict, _dict_to_bson, decode, encode
from bson.binary import STANDARD
from bson.binary import STANDARD, Binary
from bson.codec_options import CodecOptions
from bson.raw_bson import (DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument,
@ -401,7 +401,7 @@ class ClientEncryption(object):
The encrypted value, a :class:`~bson.binary.Binary` with subtype 6.
"""
# TODO: Add a required codec_options argument for encoding?
doc = encode({'v': value})
doc = encode({'v': value}, codec_options=_DATA_KEY_OPTS)
if isinstance(key_id, uuid.UUID):
raw_key_id = key_id.bytes
else:
@ -420,10 +420,13 @@ class ClientEncryption(object):
:Returns:
The decrypted BSON value.
"""
if not (isinstance(value, Binary) and value.subtype == 6):
raise TypeError(
'value to decrypt must be a bson.binary.Binary with subtype 6')
doc = encode({'v': value})
decrypted_doc = self._encryption.decrypt(doc)
# TODO: Add a required codec_options argument for decoding?
return decode(decrypted_doc)['v']
return decode(decrypted_doc, codec_options=_DATA_KEY_OPTS)['v']
def close(self):
"""Release resources."""

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,33 @@
{
"status": {
"$numberInt": "1"
},
"_id": {
"$binary": {
"base64": "AWSAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
},
"masterKey": {
"region": "us-east-1",
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
"provider": "aws"
},
"updateDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyMaterial": {
"$binary": {
"base64": "AQICAHhQNmWG2CzOm1dq3kWLM+iDUZhEqnhJwH9wZVpuZ94A8gEqnsxXlR51T5EbEVezUqqKAAAAwjCBvwYJKoZIhvcNAQcGoIGxMIGuAgEAMIGoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDHa4jo6yp0Z18KgbUgIBEIB74sKxWtV8/YHje5lv5THTl0HIbhSwM6EqRlmBiFFatmEWaeMk4tO4xBX65eq670I5TWPSLMzpp8ncGHMmvHqRajNBnmFtbYxN3E3/WjxmdbOOe+OXpnGJPcGsftc7cB2shRfA4lICPnE26+oVNXT6p0Lo20nY5XC7jyCO",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyAltNames": ["aws"]
}

View File

@ -0,0 +1,31 @@
{
"status": {
"$numberInt": "1"
},
"_id": {
"$binary": {
"base64": "LOCALAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
},
"masterKey": {
"provider": "local"
},
"updateDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyMaterial": {
"$binary": {
"base64": "Ce9HSz/HKKGkIt4uyy+jDuKGA+rLC2cycykMo6vc8jXxqa1UVDYHWq1r+vZKbnnSRBfB981akzRKZCFpC05CTyFqDhXv6OnMjpG97OZEREGIsHEYiJkBW0jJJvfLLgeLsEpBzsro9FztGGXASxyxFRZFhXvHxyiLOKrdWfs7X1O/iK3pEoHMx6uSNSfUOgbebLfIqW7TO++iQS5g1xovXA==",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1557827033449"
}
},
"keyAltNames": [ "local" ]
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -14,15 +14,18 @@
"""Test client side encryption spec."""
import base64
import copy
import os
import socket
import traceback
import sys
import uuid
sys.path[0:0] = [""]
from bson import BSON, json_util
from bson.binary import STANDARD, Binary
from bson.binary import STANDARD, Binary, UUID_SUBTYPE
from bson.codec_options import CodecOptions
from bson.json_util import JSONOptions
from bson.raw_bson import RawBSONDocument
@ -34,7 +37,10 @@ from pymongo.mongo_client import MongoClient
from pymongo.write_concern import WriteConcern
from test import unittest, IntegrationTest, PyMongoTestCase, client_context
from test.utils import TestCreator, camel_to_snake_args, wait_until
from test.utils import (TestCreator,
camel_to_snake_args,
rs_or_single_client,
wait_until)
from test.utils_spec_runner import SpecRunner
@ -133,42 +139,48 @@ class EncryptionIntegrationTest(IntegrationTest):
def setUpClass(cls):
super(EncryptionIntegrationTest, cls).setUpClass()
def assertEncrypted(self, val):
self.assertIsInstance(val, Binary)
self.assertEqual(val.subtype, 6)
# Location of JSON test files.
BASE = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'client-side-encryption')
CUSTOM_PATH = os.path.join(BASE, 'custom')
SPEC_PATH = os.path.join(BASE, 'spec')
OPTS = CodecOptions(uuid_representation=STANDARD)
# Use SON to preserve the order of fields while parsing json.
JSON_OPTS = JSONOptions(document_class=SON, uuid_representation=STANDARD)
# Use SON to preserve the order of fields while parsing json. Use tz_aware
# =False to match how CodecOptions decodes dates.
JSON_OPTS = JSONOptions(document_class=SON, uuid_representation=STANDARD,
tz_aware=False)
def read(filename):
with open(os.path.join(CUSTOM_PATH, filename)) as fp:
def read(*paths):
with open(os.path.join(BASE, *paths)) as fp:
return fp.read()
def json_data(filename):
return json_util.loads(read(filename), json_options=JSON_OPTS)
def json_data(*paths):
return json_util.loads(read(*paths), json_options=JSON_OPTS)
def bson_data(filename):
return BSON.encode(json_data(filename), codec_options=OPTS)
def bson_data(*paths):
return BSON.encode(json_data(*paths), codec_options=OPTS)
class TestClientSimple(EncryptionIntegrationTest):
def _test_auto_encrypt(self, opts):
client = MongoClient(auto_encryption_opts=opts)
client = rs_or_single_client(auto_encryption_opts=opts)
self.addCleanup(client.close)
# Create the encrypted field's data key.
key_vault = self.client.admin.get_collection(
'datakeys', codec_options=OPTS)
data_key = RawBSONDocument(bson_data('key-document-local.json'))
data_key = RawBSONDocument(
bson_data('custom', 'key-document-local.json'))
key_vault.insert_one(data_key)
self.addCleanup(key_vault.drop)
@ -217,12 +229,11 @@ class TestClientSimple(EncryptionIntegrationTest):
# Make sure the field is actually encrypted.
for encrypted_doc in self.db.test.find():
self.assertIsInstance(encrypted_doc['_id'], int)
self.assertIsInstance(encrypted_doc['ssn'], Binary)
self.assertEqual(encrypted_doc['ssn'].subtype, 6)
self.assertEncrypted(encrypted_doc['ssn'])
def test_auto_encrypt(self):
# Configure the encrypted field via jsonSchema.
json_schema = json_data('schema.json')
json_schema = json_data('custom', 'schema.json')
coll = self.db.create_collection(
'test', validator={'$jsonSchema': json_schema}, codec_options=OPTS)
self.addCleanup(coll.drop)
@ -232,7 +243,7 @@ class TestClientSimple(EncryptionIntegrationTest):
def test_auto_encrypt_local_schema_map(self):
# Configure the encrypted field via the local schema_map option.
schemas = {'pymongo_test.test': json_data('schema.json')}
schemas = {'pymongo_test.test': json_data('custom', 'schema.json')}
opts = AutoEncryptionOpts(
KMS_PROVIDERS, 'admin.datakeys', schema_map=schemas)
@ -276,6 +287,17 @@ class TestExplicitSimple(EncryptionIntegrationTest):
decrypted_ssn = client_encryption.decrypt(encrypted_ssn)
self.assertEqual(decrypted_ssn, doc['ssn'])
def test_validation(self):
client_encryption = ClientEncryption(
KMS_PROVIDERS, 'admin.datakeys', client_context.client)
self.addCleanup(client_encryption.close)
msg = 'value to decrypt must be a bson.binary.Binary with subtype 6'
with self.assertRaisesRegex(TypeError, msg):
client_encryption.decrypt('str')
with self.assertRaisesRegex(TypeError, msg):
client_encryption.decrypt(Binary(b'123'))
# Spec tests
AWS_CREDS = {
@ -375,5 +397,172 @@ test_creator = TestCreator(create_test, TestSpec, SPEC_PATH)
test_creator.create_tests()
# Prose Tests
LOCAL_MASTER_KEY = base64.b64decode(
b'Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ'
b'5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk')
LOCAL_KEY_ID = Binary(
base64.b64decode(b'LOCALAAAAAAAAAAAAAAAAA=='), UUID_SUBTYPE)
AWS_KEY_ID = Binary(
base64.b64decode(b'AWSAAAAAAAAAAAAAAAAAAA=='), UUID_SUBTYPE)
def create_with_schema(coll, json_schema):
"""Create and return a Collection with a jsonSchema."""
coll.drop()
return coll.database.create_collection(
coll.name, validator={'$jsonSchema': json_schema}, codec_options=OPTS)
def create_key_vault(vault, *data_keys):
"""Create the key vault collection with optional data keys."""
vault = vault.with_options(
write_concern=WriteConcern(w='majority'),
codec_options=OPTS)
vault.drop()
if data_keys:
vault.insert_many(data_keys)
return vault
class TestCorpus(EncryptionIntegrationTest):
@classmethod
@unittest.skipUnless(all(AWS_CREDS.values()),
'AWS environment credentials are not set')
def setUpClass(cls):
super(TestCorpus, cls).setUpClass()
@staticmethod
def kms_providers():
return {'aws': AWS_CREDS, 'local': {'key': LOCAL_MASTER_KEY}}
@staticmethod
def fix_up_schema(json_schema):
"""Remove deprecated symbol/dbPointer types from json schema."""
for key in json_schema['properties'].keys():
if '_symbol_' in key or '_dbPointer_' in key:
del json_schema['properties'][key]
return json_schema
@staticmethod
def fix_up_curpus(corpus):
"""Disallow deprecated symbol/dbPointer types from corpus test."""
for key in corpus:
if '_symbol_' in key or '_dbPointer_' in key:
corpus[key]['allowed'] = False
return corpus
@staticmethod
def fix_up_curpus_encrypted(corpus_encrypted, corpus):
"""Fix the expected values for deprecated symbol/dbPointer types."""
for key in corpus_encrypted:
if '_symbol_' in key or '_dbPointer_' in key:
corpus_encrypted[key] = copy.deepcopy(corpus[key])
return corpus_encrypted
def _test_corpus(self, opts):
# Drop and create the collection 'db.coll' with jsonSchema.
coll = create_with_schema(
self.client.db.coll,
self.fix_up_schema(json_data('corpus', 'corpus-schema.json')))
self.addCleanup(coll.drop)
vault = create_key_vault(
self.client.admin.datakeys,
json_data('corpus', 'corpus-key-local.json'),
json_data('corpus', 'corpus-key-aws.json'))
self.addCleanup(vault.drop)
client_encrypted = rs_or_single_client(
auto_encryption_opts=opts, uuidRepresentation='standard')
self.addCleanup(client_encrypted.close)
client_encryption = ClientEncryption(
self.kms_providers(), 'admin.datakeys', client_context.client)
self.addCleanup(client_encryption.close)
corpus = self.fix_up_curpus(json_data('corpus', 'corpus.json'))
corpus_copied = SON()
for key, value in corpus.items():
corpus_copied[key] = copy.deepcopy(value)
if key in ('_id', 'altname_aws', 'altname_local'):
continue
if value['method'] == 'auto':
continue
if value['method'] == 'explicit':
identifier = value['identifier']
self.assertIn(identifier, ('id', 'altname'))
kms = value['kms']
self.assertIn(kms, ('local', 'aws'))
if identifier == 'id':
if kms == 'local':
kwargs = dict(key_id=LOCAL_KEY_ID)
else:
kwargs = dict(key_id=AWS_KEY_ID)
else:
kwargs = dict(key_alt_name=kms)
self.assertIn(value['algo'], ('det', 'rand'))
if value['algo'] == 'det':
algo = Algorithm.Deterministic
else:
algo = Algorithm.Random
try:
encrypted_val = client_encryption.encrypt(
value['value'], algo, **kwargs)
if not value['allowed']:
self.fail('encrypt should have failed: %r: %r' % (
key, value))
corpus_copied[key]['value'] = encrypted_val
except Exception:
if value['allowed']:
tb = traceback.format_exc()
self.fail('encrypt failed: %r: %r, traceback: %s' % (
key, value, tb))
client_encrypted.db.coll.insert_one(corpus_copied)
corpus_decrypted = client_encrypted.db.coll.find_one()
self.assertEqual(corpus_decrypted, corpus)
corpus_encrypted_expected = self.fix_up_curpus_encrypted(json_data(
'corpus', 'corpus-encrypted.json'), corpus)
corpus_encrypted_actual = coll.find_one()
for key, value in corpus_encrypted_actual.items():
if key in ('_id', 'altname_aws', 'altname_local'):
continue
if value['algo'] == 'det':
self.assertEqual(
value['value'], corpus_encrypted_expected[key]['value'],
key)
elif value['algo'] == 'rand' and value['allowed']:
self.assertNotEqual(
value['value'], corpus_encrypted_expected[key]['value'],
key)
if value['allowed']:
decrypt_actual = client_encryption.decrypt(value['value'])
decrypt_expected = client_encryption.decrypt(
corpus_encrypted_expected[key]['value'])
self.assertEqual(decrypt_actual, decrypt_expected, key)
else:
self.assertEqual(value['value'], corpus[key]['value'], key)
def test_corpus(self):
opts = AutoEncryptionOpts(self.kms_providers(), 'admin.datakeys')
self._test_corpus(opts)
def test_corpus_local_schema(self):
# Configure the encrypted field via the local schema_map option.
schemas = {'db.coll': self.fix_up_schema(
json_data('corpus', 'corpus-schema.json'))}
opts = AutoEncryptionOpts(
self.kms_providers(), 'admin.datakeys', schema_map=schemas)
self._test_corpus(opts)
if __name__ == "__main__":
unittest.main()