PYTHON-2879 Fix get_ssl_context for CSFLE and ocsptest.py (#713)

This commit is contained in:
Shane Harvey 2021-08-24 13:36:37 -04:00 committed by GitHub
parent fa9531b4bf
commit 2eb0df812c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 32 additions and 56 deletions

View File

@ -109,8 +109,8 @@ def _parse_ssl_options(options):
certfile,
passphrase,
ca_certs,
allow_invalid_certificates,
crlfile,
allow_invalid_certificates,
allow_invalid_hostnames,
disable_ocsp_endpoint_check)
return ctx, allow_invalid_hostnames

View File

@ -49,15 +49,11 @@ from pymongo.errors import (ConfigurationError,
from pymongo.mongo_client import MongoClient
from pymongo.pool import _configured_socket, PoolOptions
from pymongo.read_concern import ReadConcern
from pymongo.ssl_support import get_ssl_context, HAVE_SSL
from pymongo.ssl_support import get_ssl_context
from pymongo.uri_parser import parse_host
from pymongo.write_concern import WriteConcern
from pymongo.daemon import _spawn_daemon
if HAVE_SSL:
from ssl import CERT_REQUIRED
else:
CERT_REQUIRED = None
_HTTPS_PORT = 443
_KMS_CONNECT_TIMEOUT = 10 # TODO: CDRIVER-3262 will define this value.
@ -114,14 +110,13 @@ class _EncryptionIO(MongoCryptCallback):
# Enable strict certificate verification, OCSP, match hostname, and
# SNI using the system default CA certificates.
ctx = get_ssl_context(
None, # certfile
None, # keyfile
None, # passphrase
None, # ca_certs
CERT_REQUIRED, # cert_reqs
None, # crlfile
True, # match_hostname
True) # check_ocsp_endpoint
None, # certfile
None, # passphrase
None, # ca_certs
None, # crlfile
False, # allow_invalid_certificates
False, # allow_invalid_hostnames
False) # disable_ocsp_endpoint_check
opts = PoolOptions(connect_timeout=_KMS_CONNECT_TIMEOUT,
socket_timeout=_KMS_CONNECT_TIMEOUT,
ssl_context=ctx)

View File

@ -401,7 +401,7 @@ class PoolOptions(object):
@property
def tls_allow_invalid_hostnames(self):
"""Call ssl.match_hostname if cert_reqs is not ssl.CERT_NONE.
"""If True skip ssl.match_hostname.
"""
return self.__tls_allow_invalid_hostnames

View File

@ -35,20 +35,15 @@ if HAVE_SSL:
# at a high level. This is legacy behavior, but requires us to
# import the ssl module even if we're only using it for this purpose.
import ssl as _stdlibssl
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from ssl import CERT_NONE, CERT_REQUIRED
HAS_SNI = _ssl.HAS_SNI
IPADDR_SAFE = _ssl.IS_PYOPENSSL or sys.version_info[:2] >= (3, 7)
SSLError = _ssl.SSLError
def get_ssl_context(*args):
def get_ssl_context(certfile, passphrase, ca_certs, crlfile,
allow_invalid_certificates, allow_invalid_hostnames,
disable_ocsp_endpoint_check):
"""Create and return an SSLContext object."""
(certfile,
passphrase,
ca_certs,
allow_invalid_certificates,
crlfile,
allow_invalid_hostnames,
disable_ocsp_endpoint_check) = args
verify_mode = CERT_NONE if allow_invalid_certificates else CERT_REQUIRED
ctx = _ssl.SSLContext(_ssl.PROTOCOL_SSLv23)
# SSLContext.check_hostname was added in CPython 3.4.

View File

@ -115,7 +115,7 @@ class ClientUnitTest(unittest.TestCase):
read_preference=ReadPreference.PRIMARY,
ssl=False,
tlsCertificateKeyFile=None,
tlsAllowInvalidCertificates=True, # ssl.CERT_NONE
tlsAllowInvalidCertificates=True,
tlsCAFile=None,
connect=False,
serverSelectionTimeoutMS=12000)

View File

@ -291,17 +291,13 @@ class TestSSL(IntegrationTest):
#
# --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
# --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
ctx = get_ssl_context(
None, None, None, True, None, True, False)
ctx = get_ssl_context(None, None, None, None, True, True, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, True, None, False, False)
ctx = get_ssl_context(None, None, None, None, True, False, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, False, None, True, False)
ctx = get_ssl_context(None, None, None, None, False, True, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, False, None, False, False)
ctx = get_ssl_context(None, None, None, None, False, False, False)
if _PY37PLUS or _HAVE_PYOPENSSL:
self.assertTrue(ctx.check_hostname)
else:
@ -424,8 +420,7 @@ class TestSSL(IntegrationTest):
**self.credentials))
def test_system_certs_config_error(self):
ctx = get_ssl_context(
None, None, None, ssl.CERT_NONE, None, True, False)
ctx = get_ssl_context(None, None, None, None, True, True, False)
if ((sys.platform != "win32"
and hasattr(ctx, "set_default_verify_paths"))
or hasattr(ctx, "load_default_certs")):
@ -457,12 +452,12 @@ class TestSSL(IntegrationTest):
# Force the test on Windows, regardless of environment.
ssl_support.HAVE_WINCERTSTORE = False
try:
ctx = get_ssl_context(
None, None, None, CA_PEM, ssl.CERT_REQUIRED, None, True, True)
ctx = get_ssl_context(None, None, CA_PEM, None, False, False,
False)
ssl_sock = ctx.wrap_socket(socket.socket())
self.assertEqual(ssl_sock.ca_certs, CA_PEM)
ctx = get_ssl_context(None, None, None, None, None, None, True, True)
ctx = get_ssl_context(None, None, None, None, False, False, False)
ssl_sock = ctx.wrap_socket(socket.socket())
self.assertEqual(ssl_sock.ca_certs, ssl_support.certifi.where())
finally:
@ -479,12 +474,11 @@ class TestSSL(IntegrationTest):
if not ssl_support.HAVE_WINCERTSTORE:
raise SkipTest("Need wincertstore to test wincertstore.")
ctx = get_ssl_context(
None, None, None, CA_PEM, ssl.CERT_REQUIRED, None, True, True)
ctx = get_ssl_context(None, None, CA_PEM, None, False, False, False)
ssl_sock = ctx.wrap_socket(socket.socket())
self.assertEqual(ssl_sock.ca_certs, CA_PEM)
ctx = get_ssl_context(None, None, None, None, None, None, True, True)
ctx = get_ssl_context(None, None, None, None, False, False, False)
ssl_sock = ctx.wrap_socket(socket.socket())
self.assertEqual(ssl_sock.ca_certs, ssl_support._WINCERTS.name)

View File

@ -18,11 +18,6 @@ import copy
import sys
import warnings
try:
from ssl import CERT_NONE
except ImportError:
CERT_NONE = 0
sys.path[0:0] = [""]
from bson.binary import JAVA_LEGACY

View File

@ -16,8 +16,6 @@ import argparse
import logging
import socket
from ssl import CERT_REQUIRED
from pymongo.pyopenssl_context import SSLContext
from pymongo.ssl_support import get_ssl_context
@ -28,14 +26,13 @@ logging.basicConfig(format=FORMAT, level=logging.DEBUG)
def check_ocsp(host, port, capath):
ctx = get_ssl_context(
None, # certfile
None, # keyfile
None, # passphrase
capath,
CERT_REQUIRED,
None, # crlfile
True, # match_hostname
True) # check_ocsp_endpoint
None, # certfile
None, # passphrase
capath, # ca_certs
None, # crlfile
False, # allow_invalid_certificates
False, # allow_invalid_hostnames
False) # disable_ocsp_endpoint_check
# Ensure we're using pyOpenSSL.
assert isinstance(ctx, SSLContext)