PYTHON-3110 Remove use of example.com in CSFLE tests (#848)
This commit is contained in:
parent
51691246e9
commit
561ee7cf77
@ -1205,8 +1205,8 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
kms_tls_options=KMS_TLS_OPTS)
|
||||
|
||||
kms_providers_invalid = copy.deepcopy(kms_providers)
|
||||
kms_providers_invalid['azure']['identityPlatformEndpoint'] = 'example.com:443'
|
||||
kms_providers_invalid['gcp']['endpoint'] = 'example.com:443'
|
||||
kms_providers_invalid['azure']['identityPlatformEndpoint'] = 'doesnotexist.invalid:443'
|
||||
kms_providers_invalid['gcp']['endpoint'] = 'doesnotexist.invalid:443'
|
||||
kms_providers_invalid['kmip']['endpoint'] = 'doesnotexist.local:5698'
|
||||
self.client_encryption_invalid = ClientEncryption(
|
||||
kms_providers=kms_providers_invalid,
|
||||
@ -1214,7 +1214,8 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
key_vault_client=client_context.client,
|
||||
codec_options=OPTS,
|
||||
kms_tls_options=KMS_TLS_OPTS)
|
||||
self._kmip_host_error = ''
|
||||
self._kmip_host_error = None
|
||||
self._invalid_host_error = None
|
||||
|
||||
def tearDown(self):
|
||||
self.client_encryption.close()
|
||||
@ -1295,9 +1296,9 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
"region": "us-east-1",
|
||||
"key": ("arn:aws:kms:us-east-1:579766882180:key/"
|
||||
"89fcc2c4-08b0-4bd9-9f25-e30687b580d0"),
|
||||
"endpoint": "example.com"
|
||||
"endpoint": "doesnotexist.invalid"
|
||||
}
|
||||
with self.assertRaisesRegex(EncryptionError, 'parse error'):
|
||||
with self.assertRaisesRegex(EncryptionError, self.invalid_host_error):
|
||||
self.client_encryption.create_data_key(
|
||||
'aws', master_key=master_key)
|
||||
|
||||
@ -1309,8 +1310,8 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
self.run_test_expected_success('azure', master_key)
|
||||
|
||||
# The full error should be something like:
|
||||
# "Invalid JSON in KMS response. HTTP status=404. Error: Got parse error at '<', position 0: 'SPECIAL_EXPECTED'"
|
||||
with self.assertRaisesRegex(EncryptionError, 'parse error'):
|
||||
# "[Errno 8] nodename nor servname provided, or not known"
|
||||
with self.assertRaisesRegex(EncryptionError, self.invalid_host_error):
|
||||
self.client_encryption_invalid.create_data_key(
|
||||
'azure', master_key=master_key)
|
||||
|
||||
@ -1326,8 +1327,8 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
self.run_test_expected_success('gcp', master_key)
|
||||
|
||||
# The full error should be something like:
|
||||
# "Invalid JSON in KMS response. HTTP status=404. Error: Got parse error at '<', position 0: 'SPECIAL_EXPECTED'"
|
||||
with self.assertRaisesRegex(EncryptionError, 'parse error'):
|
||||
# "[Errno 8] nodename nor servname provided, or not known"
|
||||
with self.assertRaisesRegex(EncryptionError, self.invalid_host_error):
|
||||
self.client_encryption_invalid.create_data_key(
|
||||
'gcp', master_key=master_key)
|
||||
|
||||
@ -1339,7 +1340,7 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
"location": "global",
|
||||
"keyRing": "key-ring-csfle",
|
||||
"keyName": "key-name-csfle",
|
||||
"endpoint": "example.com:443"}
|
||||
"endpoint": "doesnotexist.invalid:443"}
|
||||
|
||||
# The full error should be something like:
|
||||
# "Invalid KMS response, no access_token returned. HTTP status=200"
|
||||
@ -1347,22 +1348,30 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
self.client_encryption.create_data_key(
|
||||
'gcp', master_key=master_key)
|
||||
|
||||
def kmip_host_error(self):
|
||||
if self._kmip_host_error:
|
||||
return self._kmip_host_error
|
||||
def dns_error(self, host, port):
|
||||
# The full error should be something like:
|
||||
# "[Errno 8] nodename nor servname provided, or not known"
|
||||
try:
|
||||
socket.getaddrinfo('doesnotexist.local', 5698, socket.AF_INET,
|
||||
socket.SOCK_STREAM)
|
||||
except Exception as exc:
|
||||
self._kmip_host_error = re.escape(str(exc))
|
||||
return self._kmip_host_error
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
|
||||
return re.escape(str(ctx.exception))
|
||||
|
||||
@property
|
||||
def invalid_host_error(self):
|
||||
if self._invalid_host_error is None:
|
||||
self._invalid_host_error = self.dns_error(
|
||||
'doesnotexist.invalid', 443)
|
||||
return self._invalid_host_error
|
||||
|
||||
@property
|
||||
def kmip_host_error(self):
|
||||
if self._kmip_host_error is None:
|
||||
self._kmip_host_error = self.dns_error('doesnotexist.local', 5698)
|
||||
return self._kmip_host_error
|
||||
|
||||
def test_10_kmip_invalid_endpoint(self):
|
||||
key = {'keyId': '1'}
|
||||
self.run_test_expected_success('kmip', key)
|
||||
with self.assertRaisesRegex(EncryptionError, self.kmip_host_error()):
|
||||
with self.assertRaisesRegex(EncryptionError, self.kmip_host_error):
|
||||
self.client_encryption_invalid.create_data_key('kmip', key)
|
||||
|
||||
def test_11_kmip_master_key_endpoint(self):
|
||||
@ -1379,7 +1388,7 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
|
||||
|
||||
def test_12_kmip_master_key_invalid_endpoint(self):
|
||||
key = {'keyId': '1', 'endpoint': 'doesnotexist.local:5698'}
|
||||
with self.assertRaisesRegex(EncryptionError, self.kmip_host_error()):
|
||||
with self.assertRaisesRegex(EncryptionError, self.kmip_host_error):
|
||||
self.client_encryption.create_data_key('kmip', key)
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user