diff --git a/test/test_encryption.py b/test/test_encryption.py index 94a588bd6..00f76b7c9 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -24,7 +24,7 @@ import sys import textwrap import traceback import uuid -from typing import Any, Dict +from typing import Any, Dict, Mapping from pymongo.collection import Collection @@ -2202,6 +2202,86 @@ class TestExplicitQueryableEncryption(EncryptionIntegrationTest): self.assertEqual(decrypted, val) +# https://github.com/mongodb/specifications/blob/072601/source/client-side-encryption/tests/README.rst#rewrap +class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest): + + MASTER_KEYS: Mapping[str, Mapping[str, Any]] = { + "aws": { + "region": "us-east-1", + "key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0", + }, + "azure": { + "keyVaultEndpoint": "key-vault-csfle.vault.azure.net", + "keyName": "key-name-csfle", + }, + "gcp": { + "projectId": "devprod-drivers", + "location": "global", + "keyRing": "key-ring-csfle", + "keyName": "key-name-csfle", + }, + "kmip": {}, + "local": {}, + } + + def test_rewrap(self): + for src_provider in self.MASTER_KEYS: + for dst_provider in self.MASTER_KEYS: + with self.subTest(src_provider=src_provider, dst_provider=dst_provider): + self.run_test(src_provider, dst_provider) + + def run_test(self, src_provider, dst_provider): + # Step 1. Drop the collection ``keyvault.datakeys``. + self.client.keyvault.drop_collection("datakeys") + + # Step 2. Create a ``ClientEncryption`` object named ``client_encryption1`` + client_encryption1 = ClientEncryption( + key_vault_client=self.client, + key_vault_namespace="keyvault.datakeys", + kms_providers=ALL_KMS_PROVIDERS, + kms_tls_options=KMS_TLS_OPTS, + codec_options=OPTS, + ) + self.addCleanup(client_encryption1.close) + + # Step 3. Call ``client_encryption1.create_data_key`` with ``src_provider``. + key_id = client_encryption1.create_data_key( + master_key=self.MASTER_KEYS[src_provider], kms_provider=src_provider + ) + + # Step 4. Call ``client_encryption1.encrypt`` with the value "test" + cipher_text = client_encryption1.encrypt( + "test", key_id=key_id, algorithm=Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic + ) + + # Step 5. Create a ``ClientEncryption`` object named ``client_encryption2`` + client2 = MongoClient() + self.addCleanup(client2.close) + client_encryption2 = ClientEncryption( + key_vault_client=client2, + key_vault_namespace="keyvault.datakeys", + kms_providers=ALL_KMS_PROVIDERS, + kms_tls_options=KMS_TLS_OPTS, + codec_options=OPTS, + ) + self.addCleanup(client_encryption1.close) + + # Step 6. Call ``client_encryption2.rewrap_many_data_key`` with an empty ``filter``. + rewrap_many_data_key_result = client_encryption2.rewrap_many_data_key( + {}, provider=dst_provider, master_key=self.MASTER_KEYS[dst_provider] + ) + + self.assertEqual(rewrap_many_data_key_result.bulk_write_result.modified_count, 1) + + # 7. Call ``client_encryption1.decrypt`` with the ``cipher_text``. Assert the return value is "test". + decrypt_result1 = client_encryption1.decrypt(cipher_text) + self.assertEqual(decrypt_result1, "test") + + # 8. Call ``client_encryption2.decrypt`` with the ``cipher_text``. Assert the return value is "test". + decrypt_result2 = client_encryption2.decrypt(cipher_text) + self.assertEqual(decrypt_result2, "test") + + class TestQueryableEncryptionDocsExample(EncryptionIntegrationTest): # Queryable Encryption is not supported on Standalone topology. @client_context.require_no_standalone