PYTHON-2162 Remove support for ssl* URI options (#706)

This commit is contained in:
Prashant Mital 2021-08-19 14:58:31 -07:00 committed by GitHub
parent f9bfd11290
commit b3118e034e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 245 additions and 346 deletions

View File

@ -79,6 +79,40 @@ Removed the ``socketKeepAlive`` keyword argument to
keepalive. For more information see:
https://docs.mongodb.com/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments
Renamed URI options
...................
Several deprecated URI options have been renamed to the standardized
option names defined in the
`URI options specification <https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst>`_.
The old option names and their renamed equivalents are summarized in the table
below. Some renamed options have different semantics from the option being
replaced as noted in the 'Migration Notes' column.
+--------------------+-------------------------------+-------------------------------------------------------------+
| Old URI Option | Renamed URI Option | Migration Notes |
+====================+===============================+=============================================================+
| ssl_pem_passphrase | tlsCertificateKeyFilePassword | - |
+--------------------+-------------------------------+-------------------------------------------------------------+
| ssl_ca_certs | tlsCAFile | - |
+--------------------+-------------------------------+-------------------------------------------------------------+
| ssl_crlfile | tlsCRLFile | - |
+--------------------+-------------------------------+-------------------------------------------------------------+
| ssl_match_hostname | tlsAllowInvalidHostnames | ``ssl_match_hostname=True`` is equivalent to |
| | | ``tlsAllowInvalidHostnames=False`` and vice-versa. |
+--------------------+-------------------------------+-------------------------------------------------------------+
| ssl_cert_reqs | tlsAllowInvalidCertificates | Instead of ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` |
| | | and ``ssl.CERT_REQUIRED``, ``tlsAllowInvalidCertificates`` |
| | | expects a boolean value - ``True`` is equivalent to |
| | | ``ssl.CERT_NONE``, while ``False`` is equivalent to |
| | | ``ssl.CERT_REQUIRED``. |
+--------------------+-------------------------------+-------------------------------------------------------------+
| ssl_certfile | tlsCertificateKeyFile | Instead of using ``ssl_certfile`` and ``ssl_keyfile`` |
| | | to specify the certificate and private key files, |
+--------------------+ | ``tlsCertificateKeyFile`` expects a single file containing |
| ssl_keyfile | | both the client certificate and the private key. |
+--------------------+-------------------------------+-------------------------------------------------------------+
MongoClient.fsync is removed
............................

View File

@ -69,43 +69,52 @@ def _parse_read_concern(options):
def _parse_ssl_options(options):
"""Parse ssl options."""
use_ssl = options.get('ssl')
if use_ssl is not None:
validate_boolean('ssl', use_ssl)
use_tls = options.get('tls')
if use_tls is not None:
validate_boolean('tls', use_tls)
certfile = options.get('ssl_certfile')
keyfile = options.get('ssl_keyfile')
passphrase = options.get('ssl_pem_passphrase')
ca_certs = options.get('ssl_ca_certs')
cert_reqs = options.get('ssl_cert_reqs')
match_hostname = options.get('ssl_match_hostname', True)
crlfile = options.get('ssl_crlfile')
check_ocsp_endpoint = options.get('ssl_check_ocsp_endpoint', True)
certfile = options.get('tlscertificatekeyfile')
passphrase = options.get('tlscertificatekeyfilepassword')
ca_certs = options.get('tlscafile')
crlfile = options.get('tlscrlfile')
allow_invalid_certificates = options.get('tlsallowinvalidcertificates', False)
allow_invalid_hostnames = options.get('tlsallowinvalidhostnames', False)
disable_ocsp_endpoint_check = options.get('tlsdisableocspendpointcheck', False)
ssl_kwarg_keys = [k for k in options
if k.startswith('ssl_') and options[k]]
if use_ssl is False and ssl_kwarg_keys:
raise ConfigurationError("ssl has not been enabled but the "
"following ssl parameters have been set: "
"%s. Please set `ssl=True` or remove."
% ', '.join(ssl_kwarg_keys))
enabled_tls_opts = []
for opt in ('tlscertificatekeyfile', 'tlscertificatekeyfilepassword',
'tlscafile', 'tlscrlfile'):
# Any non-null value of these options implies tls=True.
if opt in options and options[opt]:
enabled_tls_opts.append(opt)
for opt in ('tlsallowinvalidcertificates', 'tlsallowinvalidhostnames',
'tlsdisableocspendpointcheck'):
# A value of False for these options implies tls=True.
if opt in options and not options[opt]:
enabled_tls_opts.append(opt)
if ssl_kwarg_keys and use_ssl is None:
# ssl options imply ssl = True
use_ssl = True
if enabled_tls_opts:
if use_tls is None:
# Implicitly enable TLS when one of the tls* options is set.
use_tls = True
elif not use_tls:
# Error since tls is explicitly disabled but a tls option is set.
raise ConfigurationError("TLS has not been enabled but the "
"following tls parameters have been set: "
"%s. Please set `tls=True` or remove."
% ', '.join(enabled_tls_opts))
if use_ssl is True:
if use_tls:
ctx = get_ssl_context(
certfile,
keyfile,
passphrase,
ca_certs,
cert_reqs,
allow_invalid_certificates,
crlfile,
match_hostname,
check_ocsp_endpoint)
return ctx, match_hostname
return None, match_hostname
allow_invalid_hostnames,
disable_ocsp_endpoint_check)
return ctx, allow_invalid_hostnames
return None, allow_invalid_hostnames
def _parse_pool_options(options):
@ -127,14 +136,14 @@ def _parse_pool_options(options):
compression_settings = CompressionSettings(
options.get('compressors', []),
options.get('zlibcompressionlevel', -1))
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
ssl_context, tls_allow_invalid_hostnames = _parse_ssl_options(options)
load_balanced = options.get('loadbalanced')
return PoolOptions(max_pool_size,
min_pool_size,
max_idle_time_seconds,
connect_timeout, socket_timeout,
wait_queue_timeout,
ssl_context, ssl_match_hostname,
ssl_context, tls_allow_invalid_hostnames,
_EventListeners(event_listeners),
appname,
driver,

View File

@ -35,8 +35,6 @@ from pymongo.errors import ConfigurationError
from pymongo.monitoring import _validate_event_listeners
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import _MONGOS_MODES, _ServerMode
from pymongo.ssl_support import (validate_cert_reqs,
validate_allow_invalid_certs)
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern
ORDERED_TYPES = (SON, OrderedDict)
@ -585,18 +583,11 @@ def validate_tzinfo(dummy, value):
# Dictionary where keys are the names of public URI options, and values
# are lists of aliases for that option. Aliases of option names are assumed
# to have been deprecated.
# are lists of aliases for that option.
URI_OPTIONS_ALIAS_MAP = {
'journal': ['j'],
'wtimeoutms': ['wtimeout'],
'tls': ['ssl'],
'tlsallowinvalidcertificates': ['ssl_cert_reqs'],
'tlsallowinvalidhostnames': ['ssl_match_hostname'],
'tlscrlfile': ['ssl_crlfile'],
'tlscafile': ['ssl_ca_certs'],
'tlscertificatekeyfile': ['ssl_certfile'],
'tlscertificatekeyfilepassword': ['ssl_pem_passphrase'],
}
# Dictionary where keys are the names of URI options, and values
@ -626,18 +617,13 @@ URI_OPTIONS_VALIDATOR_MAP = {
'loadbalanced': validate_boolean_or_string,
'serverselectiontimeoutms': validate_timeout_or_zero,
'sockettimeoutms': validate_timeout_or_none_or_zero,
'ssl_keyfile': validate_readable,
'tls': validate_boolean_or_string,
'tlsallowinvalidcertificates': validate_allow_invalid_certs,
'ssl_cert_reqs': validate_cert_reqs,
# Normalized to ssl_match_hostname which is the logical inverse of tlsallowinvalidhostnames
'tlsallowinvalidhostnames': lambda *x: not validate_boolean_or_string(*x),
'ssl_match_hostname': validate_boolean_or_string,
'tlsallowinvalidcertificates': validate_boolean_or_string,
'tlsallowinvalidhostnames': validate_boolean_or_string,
'tlscafile': validate_readable,
'tlscertificatekeyfile': validate_readable,
'tlscertificatekeyfilepassword': validate_string_or_none,
# Normalized to ssl_check_ocsp_endpoint which is the logical inverse of tlsdisableocspendpointcheck
'tlsdisableocspendpointcheck': lambda *x: not validate_boolean_or_string(*x),
'tlsdisableocspendpointcheck': validate_boolean_or_string,
'tlsinsecure': validate_boolean_or_string,
'w': validate_non_negative_int_or_basestring,
'wtimeoutms': validate_non_negative_integer,
@ -682,14 +668,7 @@ KW_VALIDATORS = {
INTERNAL_URI_OPTION_NAME_MAP = {
'j': 'journal',
'wtimeout': 'wtimeoutms',
'tls': 'ssl',
'tlsallowinvalidcertificates': 'ssl_cert_reqs',
'tlsallowinvalidhostnames': 'ssl_match_hostname',
'tlscrlfile': 'ssl_crlfile',
'tlscafile': 'ssl_ca_certs',
'tlscertificatekeyfile': 'ssl_certfile',
'tlscertificatekeyfilepassword': 'ssl_pem_passphrase',
'tlsdisableocspendpointcheck': 'ssl_check_ocsp_endpoint',
'ssl': 'tls',
}
# Map from deprecated URI option names to a tuple indicating the method of
@ -704,19 +683,6 @@ URI_OPTIONS_DEPRECATION_MAP = {
# option and/or recommend remedial action.
'j': ('renamed', 'journal'),
'wtimeout': ('renamed', 'wTimeoutMS'),
'ssl_cert_reqs': ('renamed', 'tlsAllowInvalidCertificates'),
'ssl_match_hostname': ('renamed', 'tlsAllowInvalidHostnames'),
'ssl_crlfile': ('renamed', 'tlsCRLFile'),
'ssl_ca_certs': ('renamed', 'tlsCAFile'),
'ssl_certfile': ('removed', (
'Instead of using ssl_certfile to specify the certificate file, '
'use tlsCertificateKeyFile to pass a single file containing both '
'the client certificate and the private key')),
'ssl_keyfile': ('removed', (
'Instead of using ssl_keyfile to specify the private keyfile, '
'use tlsCertificateKeyFile to pass a single file containing both '
'the client certificate and the private key')),
'ssl_pem_passphrase': ('renamed', 'tlsCertificateKeyFilePassword'),
}
# Augment the option validator map with pymongo-specific option information.

View File

@ -456,7 +456,9 @@ class MongoClient(common.BaseObject):
python 2.7.9+ (pypy 2.5.1+) and 3.3+. Defaults to ``None``.
- `tlsDisableOCSPEndpointCheck`: (boolean) If ``True``, disables
certificate revocation status checking via the OCSP responder
specified on the server certificate. Defaults to ``False``.
specified on the server certificate.
``tlsDisableOCSPEndpointCheck=False`` implies ``tls=True``.
Defaults to ``False``.
- `ssl`: (boolean) Alias for ``tls``.
| **Read Concern options:**

View File

@ -88,7 +88,7 @@ def _get_issuer_cert(cert, chain, trusted_ca_certs):
# Depending on the server's TLS library, the peer's cert chain may not
# include the self signed root CA. In this case we check the user
# provided tlsCAFile (ssl_ca_certs) for the issuer.
# provided tlsCAFile for the issuer.
# Remove once we use the verified peer cert chain in PYTHON-2147.
if trusted_ca_certs:
for candidate in trusted_ca_certs:

View File

@ -261,7 +261,7 @@ class PoolOptions(object):
'__max_idle_time_seconds',
'__connect_timeout', '__socket_timeout',
'__wait_queue_timeout',
'__ssl_context', '__ssl_match_hostname',
'__ssl_context', '__tls_allow_invalid_hostnames',
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings', '__max_connecting',
'__pause_enabled', '__server_api', '__load_balanced')
@ -271,7 +271,7 @@ class PoolOptions(object):
max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None,
socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT,
ssl_context=None,
ssl_match_hostname=True,
tls_allow_invalid_hostnames=False,
event_listeners=None, appname=None, driver=None,
compression_settings=None, max_connecting=MAX_CONNECTING,
pause_enabled=True, server_api=None, load_balanced=None):
@ -282,7 +282,7 @@ class PoolOptions(object):
self.__socket_timeout = socket_timeout
self.__wait_queue_timeout = wait_queue_timeout
self.__ssl_context = ssl_context
self.__ssl_match_hostname = ssl_match_hostname
self.__tls_allow_invalid_hostnames = tls_allow_invalid_hostnames
self.__event_listeners = event_listeners
self.__appname = appname
self.__driver = driver
@ -400,10 +400,10 @@ class PoolOptions(object):
return self.__ssl_context
@property
def ssl_match_hostname(self):
def tls_allow_invalid_hostnames(self):
"""Call ssl.match_hostname if cert_reqs is not ssl.CERT_NONE.
"""
return self.__ssl_match_hostname
return self.__tls_allow_invalid_hostnames
@property
def event_listeners(self):
@ -1047,7 +1047,7 @@ def _configured_socket(address, options):
_raise_connection_failure(address, exc, "SSL handshake failed: ")
if (ssl_context.verify_mode and not
getattr(ssl_context, "check_hostname", False) and
options.ssl_match_hostname):
not options.tls_allow_invalid_hostnames):
try:
ssl.match_hostname(sock.getpeercert(), hostname=host)
except _CertificateError:

View File

@ -39,51 +39,26 @@ if HAVE_SSL:
HAS_SNI = _ssl.HAS_SNI
IPADDR_SAFE = _ssl.IS_PYOPENSSL or sys.version_info[:2] >= (3, 7)
SSLError = _ssl.SSLError
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the
three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
``ssl.CERT_REQUIRED``.
"""
if value is None:
return value
if isinstance(value, str) and hasattr(_stdlibssl, value):
value = getattr(_stdlibssl, value)
if value in (CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED):
return value
raise ValueError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED`" % (option,))
def validate_allow_invalid_certs(option, value):
"""Validate the option to allow invalid certificates is valid."""
# Avoid circular import.
from pymongo.common import validate_boolean_or_string
boolean_cert_reqs = validate_boolean_or_string(option, value)
if boolean_cert_reqs:
return CERT_NONE
return CERT_REQUIRED
def get_ssl_context(*args):
"""Create and return an SSLContext object."""
(certfile,
keyfile,
passphrase,
ca_certs,
cert_reqs,
allow_invalid_certificates,
crlfile,
match_hostname,
check_ocsp_endpoint) = args
verify_mode = CERT_REQUIRED if cert_reqs is None else cert_reqs
allow_invalid_hostnames,
disable_ocsp_endpoint_check) = args
verify_mode = CERT_NONE if allow_invalid_certificates else CERT_REQUIRED
ctx = _ssl.SSLContext(_ssl.PROTOCOL_SSLv23)
# SSLContext.check_hostname was added in CPython 3.4.
if hasattr(ctx, "check_hostname"):
if _ssl.CHECK_HOSTNAME_SAFE and verify_mode != CERT_NONE:
ctx.check_hostname = match_hostname
ctx.check_hostname = not allow_invalid_hostnames
else:
ctx.check_hostname = False
if hasattr(ctx, "check_ocsp_endpoint"):
ctx.check_ocsp_endpoint = check_ocsp_endpoint
ctx.check_ocsp_endpoint = not disable_ocsp_endpoint_check
if hasattr(ctx, "options"):
# Explicitly disable SSLv2, SSLv3 and TLS compression. Note that
# up to date versions of MongoDB 2.4 and above already disable
@ -95,20 +70,20 @@ if HAVE_SSL:
ctx.options |= _ssl.OP_NO_RENEGOTIATION
if certfile is not None:
try:
ctx.load_cert_chain(certfile, keyfile, passphrase)
ctx.load_cert_chain(certfile, None, passphrase)
except _ssl.SSLError as exc:
raise ConfigurationError(
"Private key doesn't match certificate: %s" % (exc,))
if crlfile is not None:
if _ssl.IS_PYOPENSSL:
raise ConfigurationError(
"ssl_crlfile cannot be used with PyOpenSSL")
"tlsCRLFile cannot be used with PyOpenSSL")
# Match the server's behavior.
ctx.verify_flags = getattr(_ssl, "VERIFY_CRL_CHECK_LEAF", 0)
ctx.load_verify_locations(crlfile)
if ca_certs is not None:
ctx.load_verify_locations(ca_certs)
elif cert_reqs != CERT_NONE:
elif verify_mode != CERT_NONE:
ctx.load_default_certs()
ctx.verify_mode = verify_mode
return ctx
@ -117,15 +92,6 @@ else:
pass
HAS_SNI = False
IPADDR_SAFE = False
def validate_cert_reqs(option, dummy):
"""No ssl module, raise ConfigurationError."""
raise ConfigurationError("The value of %s is set but can't be "
"validated. The ssl module is not available"
% (option,))
def validate_allow_invalid_certs(option, dummy):
"""No ssl module, raise ConfigurationError."""
return validate_cert_reqs(option, dummy)
def get_ssl_context(*dummy):
"""No ssl module, raise ConfigurationError."""

View File

@ -721,7 +721,7 @@ class Topology(object):
connect_timeout=options.connect_timeout,
socket_timeout=options.connect_timeout,
ssl_context=options.ssl_context,
ssl_match_hostname=options.ssl_match_hostname,
tls_allow_invalid_hostnames=options.tls_allow_invalid_hostnames,
event_listeners=options.event_listeners,
appname=options.appname,
driver=options.driver,

View File

@ -119,12 +119,7 @@ def parse_host(entity, default_port=DEFAULT_PORT):
_IMPLICIT_TLSINSECURE_OPTS = {
"tlsallowinvalidcertificates",
"tlsallowinvalidhostnames",
"tlsdisableocspendpointcheck",}
# Options that cannot be specified when tlsInsecure is also specified.
_TLSINSECURE_EXCLUDE_OPTS = (
{k for k in _IMPLICIT_TLSINSECURE_OPTS} |
{INTERNAL_URI_OPTION_NAME_MAP[k] for k in _IMPLICIT_TLSINSECURE_OPTS})
"tlsdisableocspendpointcheck"}
def _parse_options(opts, delim):
@ -156,22 +151,18 @@ def _handle_security_options(options):
- `options`: Instance of _CaseInsensitiveDictionary containing
MongoDB URI options.
"""
# Implicitly defined options must not be explicitly specified.
tlsinsecure = options.get('tlsinsecure')
if tlsinsecure is not None:
for opt in _TLSINSECURE_EXCLUDE_OPTS:
for opt in _IMPLICIT_TLSINSECURE_OPTS:
if opt in options:
err_msg = ("URI options %s and %s cannot be specified "
"simultaneously.")
raise InvalidURI(err_msg % (
options.cased_key('tlsinsecure'), options.cased_key(opt)))
# Convenience function to retrieve option values based on public or private names.
def _getopt(opt):
return (options.get(opt) or
options.get(INTERNAL_URI_OPTION_NAME_MAP[opt]))
# Handle co-occurence of OCSP & tlsAllowInvalidCertificates options.
tlsallowinvalidcerts = _getopt('tlsallowinvalidcertificates')
tlsallowinvalidcerts = options.get('tlsallowinvalidcertificates')
if tlsallowinvalidcerts is not None:
if 'tlsdisableocspendpointcheck' in options:
err_msg = ("URI options %s and %s cannot be specified "
@ -183,7 +174,7 @@ def _handle_security_options(options):
options['tlsdisableocspendpointcheck'] = True
# Handle co-occurence of CRL and OCSP-related options.
tlscrlfile = _getopt('tlscrlfile')
tlscrlfile = options.get('tlscrlfile')
if tlscrlfile is not None:
for opt in ('tlsinsecure', 'tlsallowinvalidcertificates',
'tlsdisableocspendpointcheck'):
@ -201,7 +192,7 @@ def _handle_security_options(options):
return val
if truth_value(options.get('ssl')) != truth_value(options.get('tls')):
err_msg = ("Can not specify conflicting values for URI options %s "
"and %s.")
"and %s.")
raise InvalidURI(err_msg % (
options.cased_key('ssl'), options.cased_key('tls')))
@ -246,18 +237,18 @@ def _handle_option_deprecations(options):
def _normalize_options(options):
"""Normalizes option names in the options dictionary by converting them to
their internally-used names. Also handles use of the tlsInsecure option.
their internally-used names.
:Parameters:
- `options`: Instance of _CaseInsensitiveDictionary containing
MongoDB URI options.
"""
# Expand the tlsInsecure option.
tlsinsecure = options.get('tlsinsecure')
if tlsinsecure is not None:
for opt in _IMPLICIT_TLSINSECURE_OPTS:
intname = INTERNAL_URI_OPTION_NAME_MAP[opt]
# Internal options are logical inverse of public options.
options[intname] = not tlsinsecure
# Implicit options are logically the same as tlsInsecure.
options[opt] = tlsinsecure
for optname in list(options):
intname = INTERNAL_URI_OPTION_NAME_MAP.get(optname, None)
@ -316,15 +307,15 @@ def split_options(opts, validate=True, warn=False, normalize=True):
options = _handle_option_deprecations(options)
if normalize:
options = _normalize_options(options)
if validate:
options = validate_options(options, warn)
if options.get('authsource') == '':
raise InvalidURI(
"the authSource database cannot be an empty string")
if normalize:
options = _normalize_options(options)
return options
@ -521,8 +512,8 @@ def parse_uri(uri, default_port=DEFAULT_PORT, validate=True, warn=False,
for opt, val in parsed_dns_options.items():
if opt not in options:
options[opt] = val
if "ssl" not in options:
options["ssl"] = True if validate else 'true'
if "tls" not in options and "ssl" not in options:
options["tls"] = True if validate else 'true'
else:
nodes = split_hosts(hosts, default_port=default_port)

View File

@ -241,7 +241,7 @@ class ClientContext(object):
self.is_rs = False
self.has_ipv6 = False
self.tls = False
self.ssl_certfile = False
self.tlsCertificateKeyFile = False
self.server_is_resolvable = is_server_resolvable()
self.default_client_options = {}
self.sessions_enabled = False
@ -323,7 +323,7 @@ class ClientContext(object):
if self.client:
self.tls = True
self.default_client_options.update(TLS_OPTIONS)
self.ssl_certfile = True
self.tlsCertificateKeyFile = True
if self.client:
self.connected = True
@ -793,10 +793,10 @@ class ClientContext(object):
"Must be able to connect without TLS",
func=func)
def require_ssl_certfile(self, func):
"""Run a test only if the client can connect with ssl_certfile."""
return self._require(lambda: self.ssl_certfile,
"Must be able to connect with ssl_certfile",
def require_tlsCertificateKeyFile(self, func):
"""Run a test only if the client can connect with tlsCertificateKeyFile."""
return self._require(lambda: self.tlsCertificateKeyFile,
"Must be able to connect with tlsCertificateKeyFile",
func=func)
def require_server_resolvable(self, func):

View File

@ -114,10 +114,9 @@ class ClientUnitTest(unittest.TestCase):
replicaSet=None,
read_preference=ReadPreference.PRIMARY,
ssl=False,
ssl_keyfile=None,
ssl_certfile=None,
ssl_cert_reqs=0, # ssl.CERT_NONE
ssl_ca_certs=None,
tlsCertificateKeyFile=None,
tlsAllowInvalidCertificates=True, # ssl.CERT_NONE
tlsCAFile=None,
connect=False,
serverSelectionTimeoutMS=12000)
@ -394,7 +393,7 @@ class ClientUnitTest(unittest.TestCase):
clopts = c._MongoClient__options
opts = clopts._options
self.assertEqual(opts['ssl'], False)
self.assertEqual(opts['tls'], False)
self.assertEqual(clopts.replica_set_name, "newname")
self.assertEqual(
clopts.read_preference, ReadPreference.SECONDARY_PREFERRED)
@ -445,7 +444,7 @@ class ClientUnitTest(unittest.TestCase):
# Matching SSL and TLS options should not cause errors.
c = MongoClient('mongodb://localhost/?ssl=false', tls=False,
connect=False)
self.assertEqual(c._MongoClient__options._options['ssl'], False)
self.assertEqual(c._MongoClient__options._options['tls'], False)
# Conflicting tlsInsecure options should raise an error.
with self.assertRaises(InvalidURI):
@ -455,12 +454,13 @@ class ClientUnitTest(unittest.TestCase):
# Conflicting legacy tlsInsecure options should also raise an error.
with self.assertRaises(InvalidURI):
MongoClient('mongodb://localhost/?tlsInsecure=true',
connect=False, ssl_cert_reqs=True)
connect=False, tlsAllowInvalidCertificates=False)
# Conflicting kwargs should raise InvalidURI
with self.assertRaises(InvalidURI):
MongoClient(ssl=True, tls=False)
class TestClient(IntegrationTest):
def test_max_idle_time_reaper_default(self):

View File

@ -58,7 +58,9 @@ def create_test(test_case):
uri = test_case['uri']
seeds = test_case['seeds']
hosts = test_case['hosts']
options = test_case.get('options')
options = test_case.get('options', {})
if 'ssl' in options:
options['tls'] = options.pop('ssl')
parsed_options = test_case.get('parsed_options')
# See DRIVERS-1324, unless tls is explicitly set to False we need TLS.
needs_tls = not (options and (options.get('ssl') == False or

View File

@ -176,7 +176,7 @@ class _TestPoolingBase(IntegrationTest):
# Start the pool with the correct ssl options.
pool_options = client_context.client._topology_settings.pool_options
kwargs['ssl_context'] = pool_options.ssl_context
kwargs['ssl_match_hostname'] = pool_options.ssl_match_hostname
kwargs['tls_allow_invalid_hostnames'] = pool_options.tls_allow_invalid_hostnames
kwargs['server_api'] = pool_options.server_api
pool = Pool(pair, PoolOptions(*args, **kwargs))
pool.ready()

View File

@ -26,7 +26,7 @@ from pymongo import MongoClient, ssl_support
from pymongo.errors import (ConfigurationError,
ConnectionFailure,
OperationFailure)
from pymongo.ssl_support import HAVE_SSL, get_ssl_context, validate_cert_reqs, _ssl
from pymongo.ssl_support import HAVE_SSL, get_ssl_context, _ssl
from pymongo.write_concern import WriteConcern
from test import (IntegrationTest,
client_context,
@ -91,7 +91,7 @@ class TestClientSSL(unittest.TestCase):
# Implied
self.assertRaises(ConfigurationError,
MongoClient, ssl_certfile=CLIENT_PEM)
MongoClient, tlsCertificateKeyFile=CLIENT_PEM)
@unittest.skipUnless(HAVE_SSL, "The ssl module is not available.")
@ignore_deprecations
@ -100,63 +100,41 @@ class TestClientSSL(unittest.TestCase):
self.assertRaises(ValueError, MongoClient, ssl='foo')
self.assertRaises(ConfigurationError,
MongoClient,
ssl=False,
ssl_certfile=CLIENT_PEM)
tls=False,
tlsCertificateKeyFile=CLIENT_PEM)
self.assertRaises(TypeError, MongoClient, ssl=0)
self.assertRaises(TypeError, MongoClient, ssl=5.5)
self.assertRaises(TypeError, MongoClient, ssl=[])
self.assertRaises(IOError, MongoClient, ssl_certfile="NoSuchFile")
self.assertRaises(TypeError, MongoClient, ssl_certfile=True)
self.assertRaises(TypeError, MongoClient, ssl_certfile=[])
self.assertRaises(IOError, MongoClient, ssl_keyfile="NoSuchFile")
self.assertRaises(TypeError, MongoClient, ssl_keyfile=True)
self.assertRaises(TypeError, MongoClient, ssl_keyfile=[])
self.assertRaises(IOError, MongoClient, tlsCertificateKeyFile="NoSuchFile")
self.assertRaises(TypeError, MongoClient, tlsCertificateKeyFile=True)
self.assertRaises(TypeError, MongoClient, tlsCertificateKeyFile=[])
# Test invalid combinations
self.assertRaises(ConfigurationError,
MongoClient,
ssl=False,
ssl_keyfile=CLIENT_PEM)
tls=False,
tlsCertificateKeyFile=CLIENT_PEM)
self.assertRaises(ConfigurationError,
MongoClient,
ssl=False,
ssl_certfile=CLIENT_PEM)
tls=False,
tlsCAFile=CA_PEM)
self.assertRaises(ConfigurationError,
MongoClient,
ssl=False,
ssl_keyfile=CLIENT_PEM,
ssl_certfile=CLIENT_PEM)
self.assertRaises(
ValueError, validate_cert_reqs, 'ssl_cert_reqs', 3)
self.assertRaises(
ValueError, validate_cert_reqs, 'ssl_cert_reqs', -1)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', None), None)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', ssl.CERT_NONE),
ssl.CERT_NONE)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', ssl.CERT_OPTIONAL),
ssl.CERT_OPTIONAL)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', ssl.CERT_REQUIRED),
ssl.CERT_REQUIRED)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 0), ssl.CERT_NONE)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 1), ssl.CERT_OPTIONAL)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 2), ssl.CERT_REQUIRED)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 'CERT_NONE'), ssl.CERT_NONE)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 'CERT_OPTIONAL'),
ssl.CERT_OPTIONAL)
self.assertEqual(
validate_cert_reqs('ssl_cert_reqs', 'CERT_REQUIRED'),
ssl.CERT_REQUIRED)
tls=False,
tlsCRLFile=CRL_PEM)
self.assertRaises(ConfigurationError,
MongoClient,
tls=False,
tlsAllowInvalidCertificates=False)
self.assertRaises(ConfigurationError,
MongoClient,
tls=False,
tlsAllowInvalidHostnames=False)
self.assertRaises(ConfigurationError,
MongoClient,
tls=False,
tlsDisableOCSPEndpointCheck=False)
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
def test_use_pyopenssl_when_available(self):
@ -197,9 +175,9 @@ class TestSSL(IntegrationTest):
# no --sslPEMKeyFile or with --sslWeakCertificateValidation
self.assertClientWorks(self.client)
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@ignore_deprecations
def test_ssl_pem_passphrase(self):
def test_tlsCertificateKeyFilePassword(self):
# Expects the server to be running with server.pem and ca.pem
#
# --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
@ -210,26 +188,26 @@ class TestSSL(IntegrationTest):
MongoClient,
'localhost',
ssl=True,
ssl_certfile=CLIENT_ENCRYPTED_PEM,
ssl_pem_passphrase="qwerty",
ssl_ca_certs=CA_PEM,
tlsCertificateKeyFile=CLIENT_ENCRYPTED_PEM,
tlsCertificateKeyFilePassword="qwerty",
tlsCAFile=CA_PEM,
serverSelectionTimeoutMS=100)
else:
connected(MongoClient('localhost',
ssl=True,
ssl_certfile=CLIENT_ENCRYPTED_PEM,
ssl_pem_passphrase="qwerty",
ssl_ca_certs=CA_PEM,
tlsCertificateKeyFile=CLIENT_ENCRYPTED_PEM,
tlsCertificateKeyFilePassword="qwerty",
tlsCAFile=CA_PEM,
serverSelectionTimeoutMS=5000,
**self.credentials))
uri_fmt = ("mongodb://localhost/?ssl=true"
"&ssl_certfile=%s&ssl_pem_passphrase=qwerty"
"&ssl_ca_certs=%s&serverSelectionTimeoutMS=5000")
"&tlsCertificateKeyFile=%s&tlsCertificateKeyFilePassword=qwerty"
"&tlsCAFile=%s&serverSelectionTimeoutMS=5000")
connected(MongoClient(uri_fmt % (CLIENT_ENCRYPTED_PEM, CA_PEM),
**self.credentials))
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@client_context.require_no_auth
@ignore_deprecations
def test_cert_ssl_implicitly_set(self):
@ -239,21 +217,21 @@ class TestSSL(IntegrationTest):
# --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
#
# test that setting ssl_certfile causes ssl to be set to True
# test that setting tlsCertificateKeyFile causes ssl to be set to True
client = MongoClient(client_context.host, client_context.port,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
response = client.admin.command('ismaster')
if 'setName' in response:
client = MongoClient(client_context.pair,
replicaSet=response['setName'],
w=len(response['hosts']),
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
self.assertClientWorks(client)
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@client_context.require_no_auth
@ignore_deprecations
def test_cert_ssl_validation(self):
@ -264,9 +242,9 @@ class TestSSL(IntegrationTest):
#
client = MongoClient('localhost',
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM)
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM)
response = client.admin.command('ismaster')
if 'setName' in response:
if response['primary'].split(":")[0] != 'localhost':
@ -277,21 +255,21 @@ class TestSSL(IntegrationTest):
replicaSet=response['setName'],
w=len(response['hosts']),
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM)
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM)
self.assertClientWorks(client)
if HAVE_IPADDRESS:
client = MongoClient('127.0.0.1',
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM)
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM)
self.assertClientWorks(client)
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@client_context.require_no_auth
@ignore_deprecations
def test_cert_ssl_uri_support(self):
@ -300,43 +278,12 @@ class TestSSL(IntegrationTest):
# --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
# --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
#
uri_fmt = ("mongodb://localhost/?ssl=true&ssl_certfile=%s&ssl_cert_reqs"
"=%s&ssl_ca_certs=%s&ssl_match_hostname=true")
client = MongoClient(uri_fmt % (CLIENT_PEM, 'CERT_REQUIRED', CA_PEM))
uri_fmt = ("mongodb://localhost/?ssl=true&tlsCertificateKeyFile=%s&tlsAllowInvalidCertificates"
"=%s&tlsCAFile=%s&tlsAllowInvalidHostnames=false")
client = MongoClient(uri_fmt % (CLIENT_PEM, 'true', CA_PEM))
self.assertClientWorks(client)
@client_context.require_ssl_certfile
@client_context.require_no_auth
@ignore_deprecations
def test_cert_ssl_validation_optional(self):
# Expects the server to be running with server.pem and ca.pem
#
# --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
# --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
#
client = MongoClient('localhost',
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_OPTIONAL,
ssl_ca_certs=CA_PEM)
response = client.admin.command('ismaster')
if 'setName' in response:
if response['primary'].split(":")[0] != 'localhost':
raise SkipTest("No hosts in the replicaset for 'localhost'. "
"Cannot validate hostname in the certificate")
client = MongoClient('localhost',
replicaSet=response['setName'],
w=len(response['hosts']),
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_OPTIONAL,
ssl_ca_certs=CA_PEM)
self.assertClientWorks(client)
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@client_context.require_server_resolvable
@ignore_deprecations
def test_cert_ssl_validation_hostname_matching(self):
@ -345,16 +292,16 @@ class TestSSL(IntegrationTest):
# --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
# --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
ctx = get_ssl_context(
None, None, None, None, ssl.CERT_NONE, None, False, True)
None, None, None, True, None, True, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, None, ssl.CERT_NONE, None, True, True)
None, None, None, True, None, False, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, None, ssl.CERT_REQUIRED, None, False, True)
None, None, None, False, None, True, False)
self.assertFalse(ctx.check_hostname)
ctx = get_ssl_context(
None, None, None, None, ssl.CERT_REQUIRED, None, True, True)
None, None, None, False, None, False, False)
if _PY37PLUS or _HAVE_PYOPENSSL:
self.assertTrue(ctx.check_hostname)
else:
@ -365,18 +312,18 @@ class TestSSL(IntegrationTest):
with self.assertRaises(ConnectionFailure):
connected(MongoClient('server',
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM,
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM,
serverSelectionTimeoutMS=500,
**self.credentials))
connected(MongoClient('server',
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM,
ssl_match_hostname=False,
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM,
tlsAllowInvalidHostnames=True,
serverSelectionTimeoutMS=500,
**self.credentials))
@ -385,61 +332,61 @@ class TestSSL(IntegrationTest):
connected(MongoClient('server',
replicaSet=response['setName'],
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM,
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM,
serverSelectionTimeoutMS=500,
**self.credentials))
connected(MongoClient('server',
replicaSet=response['setName'],
ssl=True,
ssl_certfile=CLIENT_PEM,
ssl_cert_reqs=ssl.CERT_REQUIRED,
ssl_ca_certs=CA_PEM,
ssl_match_hostname=False,
tlsCertificateKeyFile=CLIENT_PEM,
tlsAllowInvalidCertificates=False,
tlsCAFile=CA_PEM,
tlsAllowInvalidHostnames=True,
serverSelectionTimeoutMS=500,
**self.credentials))
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@ignore_deprecations
def test_ssl_crlfile_support(self):
def test_tlsCRLFile_support(self):
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF') or _ssl.IS_PYOPENSSL:
self.assertRaises(
ConfigurationError,
MongoClient,
'localhost',
ssl=True,
ssl_ca_certs=CA_PEM,
ssl_crlfile=CRL_PEM,
tlsCAFile=CA_PEM,
tlsCRLFile=CRL_PEM,
serverSelectionTimeoutMS=100)
else:
connected(MongoClient('localhost',
ssl=True,
ssl_ca_certs=CA_PEM,
tlsCAFile=CA_PEM,
serverSelectionTimeoutMS=100,
**self.credentials))
with self.assertRaises(ConnectionFailure):
connected(MongoClient('localhost',
ssl=True,
ssl_ca_certs=CA_PEM,
ssl_crlfile=CRL_PEM,
tlsCAFile=CA_PEM,
tlsCRLFile=CRL_PEM,
serverSelectionTimeoutMS=100,
**self.credentials))
uri_fmt = ("mongodb://localhost/?ssl=true&"
"ssl_ca_certs=%s&serverSelectionTimeoutMS=100")
"tlsCAFile=%s&serverSelectionTimeoutMS=100")
connected(MongoClient(uri_fmt % (CA_PEM,),
**self.credentials))
uri_fmt = ("mongodb://localhost/?ssl=true&ssl_crlfile=%s"
"&ssl_ca_certs=%s&serverSelectionTimeoutMS=100")
uri_fmt = ("mongodb://localhost/?ssl=true&tlsCRLFile=%s"
"&tlsCAFile=%s&serverSelectionTimeoutMS=100")
with self.assertRaises(ConnectionFailure):
connected(MongoClient(uri_fmt % (CRL_PEM, CA_PEM),
**self.credentials))
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@client_context.require_server_resolvable
@ignore_deprecations
def test_validation_with_system_ca_certs(self):
@ -460,7 +407,7 @@ class TestSSL(IntegrationTest):
# Server cert is verified. Disable hostname matching.
connected(MongoClient('server',
ssl=True,
ssl_match_hostname=False,
tlsAllowInvalidHostnames=True,
serverSelectionTimeoutMS=100,
**self.credentials))
@ -478,7 +425,7 @@ class TestSSL(IntegrationTest):
def test_system_certs_config_error(self):
ctx = get_ssl_context(
None, None, None, None, ssl.CERT_NONE, None, False, True)
None, None, None, ssl.CERT_NONE, None, True, False)
if ((sys.platform != "win32"
and hasattr(ctx, "set_default_verify_paths"))
or hasattr(ctx, "load_default_certs")):
@ -542,7 +489,7 @@ class TestSSL(IntegrationTest):
self.assertEqual(ssl_sock.ca_certs, ssl_support._WINCERTS.name)
@client_context.require_auth
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@ignore_deprecations
def test_mongodb_x509_auth(self):
host, port = client_context.host, client_context.port
@ -556,8 +503,8 @@ class TestSSL(IntegrationTest):
noauth = MongoClient(
client_context.pair,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
with self.assertRaises(OperationFailure):
noauth.pymongo_test.test.find_one()
@ -567,8 +514,8 @@ class TestSSL(IntegrationTest):
client_context.pair,
authMechanism='MONGODB-X509',
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM,
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM,
event_listeners=[listener])
if client_context.version.at_least(3, 3, 12):
@ -590,16 +537,16 @@ class TestSSL(IntegrationTest):
quote_plus(MONGODB_X509_USERNAME), host, port))
client = MongoClient(uri,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
# No error
client.pymongo_test.test.find_one()
uri = 'mongodb://%s:%d/?authMechanism=MONGODB-X509' % (host, port)
client = MongoClient(uri,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
if client_context.version.at_least(3, 3, 12):
# No error
client.pymongo_test.test.find_one()
@ -614,7 +561,7 @@ class TestSSL(IntegrationTest):
quote_plus("not the username"), host, port))
bad_client = MongoClient(
uri, ssl=True, ssl_cert_reqs="CERT_NONE", ssl_certfile=CLIENT_PEM)
uri, ssl=True, tlsAllowInvalidCertificates=True, tlsCertificateKeyFile=CLIENT_PEM)
with self.assertRaises(OperationFailure):
bad_client.pymongo_test.test.find_one()
@ -624,8 +571,8 @@ class TestSSL(IntegrationTest):
username="not the username",
authMechanism='MONGODB-X509',
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CLIENT_PEM)
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CLIENT_PEM)
with self.assertRaises(OperationFailure):
bad_client.pymongo_test.test.find_one()
@ -637,15 +584,15 @@ class TestSSL(IntegrationTest):
try:
connected(MongoClient(uri,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE,
ssl_certfile=CA_PEM,
tlsAllowInvalidCertificates=True,
tlsCertificateKeyFile=CA_PEM,
serverSelectionTimeoutMS=100))
except (ConnectionFailure, ConfigurationError):
pass
else:
self.fail("Invalid certificate accepted.")
@client_context.require_ssl_certfile
@client_context.require_tlsCertificateKeyFile
@ignore_deprecations
def test_connect_with_ca_bundle(self):
def remove(path):

View File

@ -92,7 +92,7 @@ class TestURI(unittest.TestCase):
self.assertRaises(ConfigurationError, split_options, 'foo=bar;foo')
self.assertTrue(split_options('ssl=true'))
self.assertTrue(split_options('connect=true'))
self.assertTrue(split_options('ssl_match_hostname=true'))
self.assertTrue(split_options('tlsAllowInvalidHostnames=false'))
# Test Invalid URI options that should throw warnings.
with warnings.catch_warnings():
@ -116,7 +116,7 @@ class TestURI(unittest.TestCase):
self.assertRaises(Warning, split_options,
'connect=foo', warn=True)
self.assertRaises(Warning, split_options,
'ssl_match_hostname=foo', warn=True)
'tlsAllowInvalidHostnames=foo', warn=True)
self.assertRaises(Warning, split_options,
'connectTimeoutMS=inf', warn=True)
self.assertRaises(Warning, split_options,
@ -145,7 +145,7 @@ class TestURI(unittest.TestCase):
'connectTimeoutMS=-1e100000')
self.assertRaises(ValueError, split_options, 'ssl=foo')
self.assertRaises(ValueError, split_options, 'connect=foo')
self.assertRaises(ValueError, split_options, 'ssl_match_hostname=foo')
self.assertRaises(ValueError, split_options, 'tlsAllowInvalidHostnames=foo')
self.assertRaises(ValueError, split_options, 'connectTimeoutMS=inf')
self.assertRaises(ValueError, split_options, 'connectTimeoutMS=-inf')
self.assertRaises(ValueError, split_options, 'wtimeoutms=foo')
@ -444,58 +444,40 @@ class TestURI(unittest.TestCase):
{'collection': None,
'database': None,
'nodelist': [('/MongoDB.sock', None)],
'options': {'ssl_certfile': '/a/b'},
'options': {'tlsCertificateKeyFile': '/a/b'},
'password': 'foo/bar',
'username': 'jesse',
'fqdn': None},
parse_uri(
'mongodb://jesse:foo%2Fbar@%2FMongoDB.sock/?ssl_certfile=/a/b',
'mongodb://jesse:foo%2Fbar@%2FMongoDB.sock/?tlsCertificateKeyFile=/a/b',
validate=False))
self.assertEqual(
{'collection': None,
'database': None,
'nodelist': [('/MongoDB.sock', None)],
'options': {'ssl_certfile': 'a/b'},
'options': {'tlsCertificateKeyFile': 'a/b'},
'password': 'foo/bar',
'username': 'jesse',
'fqdn': None},
parse_uri(
'mongodb://jesse:foo%2Fbar@%2FMongoDB.sock/?ssl_certfile=a/b',
'mongodb://jesse:foo%2Fbar@%2FMongoDB.sock/?tlsCertificateKeyFile=a/b',
validate=False))
def test_tlsinsecure_simple(self):
# check that tlsInsecure is expanded correctly.
self.maxDiff = None
uri = "mongodb://example.com/?tlsInsecure=true"
res = {
"ssl_match_hostname": False, "ssl_cert_reqs": CERT_NONE,
"tlsinsecure": True, 'ssl_check_ocsp_endpoint': False}
self.assertEqual(res, parse_uri(uri)["options"])
def test_tlsinsecure_legacy_conflict(self):
# must not allow use of tlsinsecure alongside legacy TLS options.
# same check for modern TLS options is performed in the spec-tests.
uri = "mongodb://srv.com/?tlsInsecure=true&ssl_match_hostname=true"
with self.assertRaises(InvalidURI):
parse_uri(uri, validate=False, warn=False, normalize=False)
def test_tlsDisableOCSPEndpointCheck(self):
# check that tlsDisableOCSPEndpointCheck is handled correctly.
uri = "mongodb://example.com/?tlsDisableOCSPEndpointCheck=true"
res = {'ssl_check_ocsp_endpoint': False}
self.assertEqual(res, parse_uri(uri)["options"])
uri = "mongodb://example.com/?tlsDisableOCSPEndpointCheck=false"
res = {'ssl_check_ocsp_endpoint': True}
"tlsAllowInvalidHostnames": True,
"tlsAllowInvalidCertificates": True,
"tlsInsecure": True, 'tlsDisableOCSPEndpointCheck': True}
self.assertEqual(res, parse_uri(uri)["options"])
def test_normalize_options(self):
# check that options are converted to their internal names correctly.
uri = ("mongodb://example.com/?tls=true&appname=myapp&maxPoolSize=10&"
"fsync=true&wtimeout=10")
res = {
"ssl": True, "appname": "myapp", "maxpoolsize": 10,
"fsync": True, "wtimeoutms": 10}
uri = ("mongodb://example.com/?ssl=true&appname=myapp&wtimeout=10")
res = {"tls": True, "appname": "myapp", "wtimeoutms": 10}
self.assertEqual(res, parse_uri(uri)["options"])
def test_unquote_after_parsing(self):