PYTHON-3385 Add prose test for RewrapManyDataKey (#1034)

This commit is contained in:
Steven Silvester 2022-08-12 13:53:07 -05:00 committed by GitHub
parent 3204290e93
commit c0dadcb6ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -24,7 +24,7 @@ import sys
import textwrap
import traceback
import uuid
from typing import Any, Dict
from typing import Any, Dict, Mapping
from pymongo.collection import Collection
@ -2202,6 +2202,86 @@ class TestExplicitQueryableEncryption(EncryptionIntegrationTest):
self.assertEqual(decrypted, val)
# https://github.com/mongodb/specifications/blob/072601/source/client-side-encryption/tests/README.rst#rewrap
class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest):
MASTER_KEYS: Mapping[str, Mapping[str, Any]] = {
"aws": {
"region": "us-east-1",
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
},
"azure": {
"keyVaultEndpoint": "key-vault-csfle.vault.azure.net",
"keyName": "key-name-csfle",
},
"gcp": {
"projectId": "devprod-drivers",
"location": "global",
"keyRing": "key-ring-csfle",
"keyName": "key-name-csfle",
},
"kmip": {},
"local": {},
}
def test_rewrap(self):
for src_provider in self.MASTER_KEYS:
for dst_provider in self.MASTER_KEYS:
with self.subTest(src_provider=src_provider, dst_provider=dst_provider):
self.run_test(src_provider, dst_provider)
def run_test(self, src_provider, dst_provider):
# Step 1. Drop the collection ``keyvault.datakeys``.
self.client.keyvault.drop_collection("datakeys")
# Step 2. Create a ``ClientEncryption`` object named ``client_encryption1``
client_encryption1 = ClientEncryption(
key_vault_client=self.client,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
codec_options=OPTS,
)
self.addCleanup(client_encryption1.close)
# Step 3. Call ``client_encryption1.create_data_key`` with ``src_provider``.
key_id = client_encryption1.create_data_key(
master_key=self.MASTER_KEYS[src_provider], kms_provider=src_provider
)
# Step 4. Call ``client_encryption1.encrypt`` with the value "test"
cipher_text = client_encryption1.encrypt(
"test", key_id=key_id, algorithm=Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic
)
# Step 5. Create a ``ClientEncryption`` object named ``client_encryption2``
client2 = MongoClient()
self.addCleanup(client2.close)
client_encryption2 = ClientEncryption(
key_vault_client=client2,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
codec_options=OPTS,
)
self.addCleanup(client_encryption1.close)
# Step 6. Call ``client_encryption2.rewrap_many_data_key`` with an empty ``filter``.
rewrap_many_data_key_result = client_encryption2.rewrap_many_data_key(
{}, provider=dst_provider, master_key=self.MASTER_KEYS[dst_provider]
)
self.assertEqual(rewrap_many_data_key_result.bulk_write_result.modified_count, 1)
# 7. Call ``client_encryption1.decrypt`` with the ``cipher_text``. Assert the return value is "test".
decrypt_result1 = client_encryption1.decrypt(cipher_text)
self.assertEqual(decrypt_result1, "test")
# 8. Call ``client_encryption2.decrypt`` with the ``cipher_text``. Assert the return value is "test".
decrypt_result2 = client_encryption2.decrypt(cipher_text)
self.assertEqual(decrypt_result2, "test")
class TestQueryableEncryptionDocsExample(EncryptionIntegrationTest):
# Queryable Encryption is not supported on Standalone topology.
@client_context.require_no_standalone