# Copyright 2011-2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for SSL support."""

import os
import socket
import sys
import unittest

# An empty entry means "the current directory"; it has to be at the front
# of the import path before the driver and test-package imports below run.
sys.path.insert(0, "")

from urllib import quote_plus

from nose.plugins.skip import SkipTest

from pymongo import MongoClient, MongoReplicaSetClient
from pymongo.common import HAS_SSL
from pymongo.errors import (
    ConfigurationError,
    ConnectionFailure,
    OperationFailure)
from test import host, port, pair, version
from test.utils import get_command_line

# Certificate fixtures live in a 'certificates' directory beside this file.
CERT_PATH = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'certificates')
CLIENT_PEM = os.path.join(CERT_PATH, 'client.pem')
CA_PEM = os.path.join(CERT_PATH, 'ca.pem')

# Capability flags. All start out False and are only switched on further
# down in this module once the server has been probed.
SIMPLE_SSL = False
CERT_SSL = False
SERVER_IS_RESOLVABLE = False

# User name for the MONGODB-X509 test; None when the variable is unset.
MONGODB_X509_USERNAME = os.environ.get('MONGODB_X509_USERNAME')

# To fully test this start a mongod instance (built with SSL support) like so:
# mongod --dbpath /path/to/data/directory --sslOnNormalPorts \
# --sslPEMKeyFile /path/to/mongo/jstests/libs/server.pem \
# --sslCAFile /path/to/mongo/jstests/libs/ca.pem \
# --sslCRLFile /path/to/mongo/jstests/libs/crl.pem \
# --sslWeakCertificateValidation
# Also, make sure you have 'server' as an alias for localhost in /etc/hosts
#
# Note: For all tests to pass with MongoReplicaSetClient the replica
# set configuration must use 'server' for the hostname of all hosts.
def is_server_resolvable():
    """Return True if the hostname 'server' can be resolved.

    The process-wide default socket timeout is set to one second for the
    duration of the lookup and is restored to its previous value before
    returning, whether or not the lookup succeeded.
    """
    previous_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(1)
    # The except clause is nested inside a separate try/finally rather than
    # combined into a single statement, matching the form the rest of this
    # file sticks to.
    try:
        try:
            socket.gethostbyname('server')
            return True
        except socket.error:
            return False
    finally:
        socket.setdefaulttimeout(previous_timeout)


if HAS_SSL:
    import ssl

    # Check this all once instead of before every test method below.

    # Is MongoDB configured for SSL?
    try:
        MongoClient(host, port, connectTimeoutMS=100, ssl=True)
        SIMPLE_SSL = True
    except ConnectionFailure:
        pass

    if SIMPLE_SSL:
        # Is MongoDB configured with server.pem, ca.pem, and crl.pem from
        # mongodb jstests/libs?
        try:
            MongoClient(host, port, connectTimeoutMS=100, ssl=True,
                        ssl_certfile=CLIENT_PEM)
            CERT_SSL = True
        except ConnectionFailure:
            pass

        if CERT_SSL:
            SERVER_IS_RESOLVABLE = is_server_resolvable()


class TestClientSSL(unittest.TestCase):
    # Option-validation tests: every assertion here expects the client
    # constructor itself to raise.

    def test_no_ssl_module(self):
        # Test that ConfigurationError is raised if the ssl
        # module isn't available.
        if HAS_SSL:
            raise SkipTest(
                "The ssl module is available, can't test what happens "
                "without it."
            )

        # Explicit
        self.assertRaises(ConfigurationError,
                          MongoClient, ssl=True)
        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient, ssl=True)

        # Implied
        self.assertRaises(ConfigurationError,
                          MongoClient, ssl_certfile=CLIENT_PEM)
        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient, ssl_certfile=CLIENT_PEM)

    def test_config_ssl(self):
        """Tests various ssl configurations"""
        self.assertRaises(ConfigurationError, MongoClient, ssl='foo')
        self.assertRaises(ConfigurationError,
                          MongoClient,
                          ssl=False,
                          ssl_certfile=CLIENT_PEM)
        self.assertRaises(TypeError, MongoClient, ssl=0)
        self.assertRaises(TypeError, MongoClient, ssl=5.5)
        self.assertRaises(TypeError, MongoClient, ssl=[])

        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient, ssl='foo')
        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient,
                          ssl=False,
                          ssl_certfile=CLIENT_PEM)
        self.assertRaises(TypeError, MongoReplicaSetClient, ssl=0)
        self.assertRaises(TypeError, MongoReplicaSetClient, ssl=5.5)
        self.assertRaises(TypeError, MongoReplicaSetClient, ssl=[])

        self.assertRaises(IOError, MongoClient, ssl_certfile="NoSuchFile")
        self.assertRaises(TypeError, MongoClient, ssl_certfile=True)
        self.assertRaises(TypeError, MongoClient, ssl_certfile=[])
        self.assertRaises(IOError, MongoClient, ssl_keyfile="NoSuchFile")
        self.assertRaises(TypeError, MongoClient, ssl_keyfile=True)
        self.assertRaises(TypeError, MongoClient, ssl_keyfile=[])

        self.assertRaises(IOError,
                          MongoReplicaSetClient, ssl_keyfile="NoSuchFile")
        self.assertRaises(IOError,
                          MongoReplicaSetClient, ssl_certfile="NoSuchFile")
        self.assertRaises(TypeError,
                          MongoReplicaSetClient, ssl_certfile=True)

        # Test invalid combinations
        self.assertRaises(ConfigurationError,
                          MongoClient,
                          ssl=False,
                          ssl_keyfile=CLIENT_PEM)
        self.assertRaises(ConfigurationError,
                          MongoClient,
                          ssl=False,
                          ssl_certfile=CLIENT_PEM)
        self.assertRaises(ConfigurationError,
                          MongoClient,
                          ssl=False,
                          ssl_keyfile=CLIENT_PEM,
                          ssl_certfile=CLIENT_PEM)

        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient,
                          ssl=False,
                          ssl_keyfile=CLIENT_PEM)
        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient,
                          ssl=False,
                          ssl_certfile=CLIENT_PEM)
        self.assertRaises(ConfigurationError,
                          MongoReplicaSetClient,
                          ssl=False,
                          ssl_keyfile=CLIENT_PEM,
                          ssl_certfile=CLIENT_PEM)


class TestSSL(unittest.TestCase):
    # Tests that talk to a running mongod over SSL. Each one is skipped
    # unless the module-level probing found a suitably configured server.

    def setUp(self):
        if not HAS_SSL:
            raise SkipTest("The ssl module is not available.")

        if sys.version.startswith('3.0'):
            raise SkipTest("Python 3.0.x has problems "
                           "with SSL and socket timeouts.")

        if not SIMPLE_SSL:
            raise SkipTest("No simple mongod available over SSL")

    def _assert_ssl_round_trip(self, client):
        """Write and read back one document through `client`.

        Uses the 'pymongo_ssl_test' database: the 'test' collection is
        dropped first, one document is inserted and read back, and the
        whole database is dropped afterwards. The leading underscore
        keeps test runners from collecting this helper as a test.
        """
        db = client.pymongo_ssl_test
        db.test.drop()
        self.assertTrue(db.test.insert({'ssl': True}))
        self.assertTrue(db.test.find_one()['ssl'])
        client.drop_database('pymongo_ssl_test')

    def test_simple_ssl(self):
        # Expects the server to be running with ssl and with
        # no --sslPEMKeyFile or with --sslWeakCertificateValidation
        client = MongoClient(host, port, ssl=True)
        response = client.admin.command('ismaster')
        if 'setName' in response:
            client = MongoReplicaSetClient(pair,
                                           replicaSet=response['setName'],
                                           w=len(response['hosts']),
                                           ssl=True)

        self._assert_ssl_round_trip(client)

    def test_cert_ssl(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests eg:
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        #
        # Also requires an /etc/hosts entry where "server" is resolvable
        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        client = MongoClient(host, port, ssl=True, ssl_certfile=CLIENT_PEM)
        response = client.admin.command('ismaster')
        if 'setName' in response:
            client = MongoReplicaSetClient(pair,
                                           replicaSet=response['setName'],
                                           w=len(response['hosts']),
                                           ssl=True,
                                           ssl_certfile=CLIENT_PEM)

        self._assert_ssl_round_trip(client)

    def test_cert_ssl_implicitly_set(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests eg:
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        #
        # Also requires an /etc/hosts entry where "server" is resolvable
        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        # No ssl=True here: passing ssl_certfile alone is what is under
        # test.
        client = MongoClient(host, port, ssl_certfile=CLIENT_PEM)
        response = client.admin.command('ismaster')
        if 'setName' in response:
            client = MongoReplicaSetClient(pair,
                                           replicaSet=response['setName'],
                                           w=len(response['hosts']),
                                           ssl_certfile=CLIENT_PEM)

        self._assert_ssl_round_trip(client)

    def test_cert_ssl_validation(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests eg:
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        #
        # Also requires an /etc/hosts entry where "server" is resolvable
        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        if not SERVER_IS_RESOLVABLE:
            raise SkipTest("No hosts entry for 'server'. Cannot validate "
                           "hostname in the certificate")

        client = MongoClient('server',
                             ssl=True,
                             ssl_certfile=CLIENT_PEM,
                             ssl_cert_reqs=ssl.CERT_REQUIRED,
                             ssl_ca_certs=CA_PEM)
        response = client.admin.command('ismaster')
        if 'setName' in response:
            if response['primary'].split(":")[0] != 'server':
                raise SkipTest("No hosts in the replicaset for 'server'. "
                               "Cannot validate hostname in the certificate")
            client = MongoReplicaSetClient('server',
                                           replicaSet=response['setName'],
                                           w=len(response['hosts']),
                                           ssl=True,
                                           ssl_certfile=CLIENT_PEM,
                                           ssl_cert_reqs=ssl.CERT_REQUIRED,
                                           ssl_ca_certs=CA_PEM)

        self._assert_ssl_round_trip(client)

    def test_cert_ssl_validation_optional(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests eg:
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        #
        # Also requires an /etc/hosts entry where "server" is resolvable
        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        if not SERVER_IS_RESOLVABLE:
            raise SkipTest("No hosts entry for 'server'. Cannot validate "
                           "hostname in the certificate")

        client = MongoClient('server',
                             ssl=True,
                             ssl_certfile=CLIENT_PEM,
                             ssl_cert_reqs=ssl.CERT_OPTIONAL,
                             ssl_ca_certs=CA_PEM)
        response = client.admin.command('ismaster')
        if 'setName' in response:
            if response['primary'].split(":")[0] != 'server':
                raise SkipTest("No hosts in the replicaset for 'server'. "
                               "Cannot validate hostname in the certificate")
            client = MongoReplicaSetClient('server',
                                           replicaSet=response['setName'],
                                           w=len(response['hosts']),
                                           ssl=True,
                                           ssl_certfile=CLIENT_PEM,
                                           ssl_cert_reqs=ssl.CERT_OPTIONAL,
                                           ssl_ca_certs=CA_PEM)

        self._assert_ssl_round_trip(client)

    def test_cert_ssl_validation_hostname_fail(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests eg:
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        # Probe the same host/port that CERT_SSL was established against.
        client = MongoClient(host, port, ssl=True, ssl_certfile=CLIENT_PEM)
        response = client.admin.command('ismaster')
        single_server = 'setName' not in response

        # self.fail() must stay outside the guarded region: it raises
        # AssertionError, which a handler wrapped around it would swallow
        # and thereby make this test unable to fail. The exception type is
        # deliberately not pinned; any Exception from the constructor is
        # accepted as the expected rejection.
        if single_server:
            try:
                MongoClient(pair,
                            ssl=True,
                            ssl_certfile=CLIENT_PEM,
                            ssl_cert_reqs=ssl.CERT_REQUIRED,
                            ssl_ca_certs=CA_PEM)
            except Exception:
                pass
            else:
                self.fail("Invalid hostname should have failed")
        else:
            try:
                MongoReplicaSetClient(pair,
                                      replicaSet=response['setName'],
                                      w=len(response['hosts']),
                                      ssl=True,
                                      ssl_certfile=CLIENT_PEM,
                                      ssl_cert_reqs=ssl.CERT_OPTIONAL,
                                      ssl_ca_certs=CA_PEM)
            except Exception:
                pass
            else:
                self.fail("Invalid hostname should have failed")

    def test_mongodb_x509_auth(self):
        # Expects the server to be running with the server.pem, ca.pem
        # and crl.pem provided in mongodb and the server tests as well as
        # --auth
        #
        # --sslPEMKeyFile=jstests/libs/server.pem
        # --sslCAFile=jstests/libs/ca.pem
        # --sslCRLFile=jstests/libs/crl.pem
        # --auth
        if not MONGODB_X509_USERNAME:
            raise SkipTest("MONGODB_X509_USERNAME "
                           "must be set to test MONGODB-X509")

        if not CERT_SSL:
            raise SkipTest("No mongod available over SSL with certs")

        client = MongoClient(host, port, ssl=True, ssl_certfile=CLIENT_PEM)
        if not version.at_least(client, (2, 5, 1)):
            raise SkipTest("MONGODB-X509 requires MongoDB 2.5.1 or newer")
        argv = get_command_line(client)
        if '--auth' not in argv:
            raise SkipTest("Mongo must be started with "
                           "--auth to test MONGODB-X509")

        # Give admin all necessary privileges.
        client.admin.add_user(MONGODB_X509_USERNAME,
                              userSource='$external',
                              roles=['readWriteAnyDatabase',
                                     'userAdminAnyDatabase',
                                     'dbAdminAnyDatabase'])

        # Start over with a fresh client for the authentication checks.
        client = MongoClient(host, port, ssl=True, ssl_certfile=CLIENT_PEM)
        coll = client.pymongo_test.test
        self.assertRaises(OperationFailure, coll.count)
        self.assertTrue(client.admin.authenticate(MONGODB_X509_USERNAME,
                                                  mechanism='MONGODB-X509'))
        self.assertEqual(0, coll.count())

        uri = ('mongodb://%s@%s:%d/?authMechanism='
               'MONGODB-X509' % (quote_plus(MONGODB_X509_USERNAME),
                                 host, port))
        # SSL options aren't supported in the URI...
        self.assertTrue(MongoClient(uri, ssl=True, ssl_certfile=CLIENT_PEM))

        # Cleanup
        client.admin.system.users.remove()
        client['$external'].logout()


if __name__ == "__main__":
    unittest.main()