From 1e866f3ba23d4da83b36c3999584107cb35ae848 Mon Sep 17 00:00:00 2001 From: Adam Harwell Date: Tue, 21 Oct 2014 18:25:29 -0500 Subject: [PATCH] Local development implementation for Certificates A basic local filesystem implementation of CertManager and a local pyOpenSSL implementation of CertGenerator. Change-Id: I0eb0476afaad8a1bbb2eaaf90564eb63f7872546 Partially-implements: blueprint tls-data-security --- etc/octavia.conf | 11 +- octavia/certificates/common/local.py | 87 +++++++++ octavia/certificates/generator/__init__.py | 2 +- octavia/certificates/generator/local.py | 108 +++++++++++ octavia/certificates/manager/__init__.py | 2 +- octavia/certificates/manager/local.py | 167 ++++++++++++++++++ .../unit/certificates/common/test_local.py | 42 +++++ .../unit/certificates/generator/test_local.py | 90 ++++++++++ .../unit/certificates/manager/test_local.py | 121 +++++++++++++ requirements.txt | 2 + 10 files changed, 628 insertions(+), 4 deletions(-) create mode 100644 octavia/certificates/common/local.py create mode 100644 octavia/certificates/generator/local.py create mode 100644 octavia/certificates/manager/local.py create mode 100644 octavia/tests/unit/certificates/common/test_local.py create mode 100644 octavia/tests/unit/certificates/generator/test_local.py create mode 100644 octavia/tests/unit/certificates/manager/test_local.py diff --git a/etc/octavia.conf b/etc/octavia.conf index a6788d4c3e..c6dfe5e120 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -25,5 +25,12 @@ # admin_project_id = service [certificates] -# cert_generator_class = -# cert_manager_class = +# cert_generator_class = octavia.certificates.generator.LocalCertGenerator +# cert_manager_class = octavia.certificates.manager.LocalCertManager + +# For local certificate signing (development only): +# ca_certificate = /etc/ssl/certs/ssl-cert-snakeoil.pem +# ca_private_key = /etc/ssl/private/ssl-cert-snakeoil.key +# ca_private_key_passphrase = +# signing_digest = sha265 +# storage_path = /var/lib/octavia/certificates/ diff --git a/octavia/certificates/common/local.py b/octavia/certificates/common/local.py new file mode 100644 index 0000000000..7b45d134c1 --- /dev/null +++ b/octavia/certificates/common/local.py @@ -0,0 +1,87 @@ +# Copyright (c) 2014 Rackspace US, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Common classes for local filesystem certificate handling +""" +import os + +from oslo.config import cfg + +from octavia.certificates.common import cert + +TLS_CERT_DEFAULT = os.environ.get( + 'OS_OCTAVIA_TLS_CA_CERT', '/etc/ssl/certs/ssl-cert-snakeoil.pem' +) +TLS_KEY_DEFAULT = os.environ.get( + 'OS_OCTAVIA_TLS_CA_KEY', '/etc/ssl/private/ssl-cert-snakeoil.key' +) +TLS_PKP_DEFAULT = os.environ.get('OS_OCTAVIA_CA_KEY_PASS') +TLS_DIGEST_DEFAULT = os.environ.get('OS_OCTAVIA_CA_SIGNING_DIGEST', 'sha256') +TLS_STORAGE_DEFAULT = os.environ.get( + 'OS_OCTAVIA_TLS_STORAGE', '/var/lib/octavia/certificates/' +) + +certgen_opts = [ + cfg.StrOpt('ca_certificate', + default=TLS_CERT_DEFAULT, + help='Absolute path to the CA Certificate for signing. Defaults' + ' to env[OS_OCTAVIA_TLS_CA_CERT].'), + cfg.StrOpt('ca_private_key', + default=TLS_KEY_DEFAULT, + help='Absolute path to the Private Key for signing. Defaults' + ' to env[OS_OCTAVIA_TLS_CA_KEY].'), + cfg.StrOpt('ca_private_key_passphrase', + default=TLS_PKP_DEFAULT, + help='Passphrase for the Private Key. Defaults' + ' to env[OS_OCTAVIA_CA_KEY_PASS] or None.'), + cfg.StrOpt('signing_digest', + default=TLS_DIGEST_DEFAULT, + help='Certificate signing digest. Defaults' + ' to env[OS_OCTAVIA_CA_SIGNING_DIGEST] or "sha256".') +] + +certmgr_opts = [ + cfg.StrOpt('storage_path', + default=TLS_STORAGE_DEFAULT, + help='Absolute path to the certificate storage directory. ' + 'Defaults to env[OS_OCTAVIA_TLS_STORAGE].') +] + +CONF = cfg.CONF +CONF.register_opts(certgen_opts, group='certificates') +CONF.register_opts(certmgr_opts, group='certificates') + + +class LocalCert(cert.Cert): + """Representation of a Cert for local storage.""" + def __init__(self, certificate, private_key, intermediates=None, + private_key_passphrase=None): + self.certificate = certificate + self.intermediates = intermediates + self.private_key = private_key + self.private_key_passphrase = private_key_passphrase + + def get_certificate(self): + return self.certificate + + def get_intermediates(self): + return self.intermediates + + def get_private_key(self): + return self.private_key + + def get_private_key_passphrase(self): + return self.private_key_passphrase diff --git a/octavia/certificates/generator/__init__.py b/octavia/certificates/generator/__init__.py index b8bcaea3eb..acfd50a5fb 100644 --- a/octavia/certificates/generator/__init__.py +++ b/octavia/certificates/generator/__init__.py @@ -19,7 +19,7 @@ from octavia.openstack.common import importutils certgen_opts = [ cfg.StrOpt('cert_generator_class', - default='octavia.certificates.barbican.BarbicanCertGenerator', + default='octavia.certificates.generator.LocalCertGenerator', help='The full class name of the cert generator API class'), ] diff --git a/octavia/certificates/generator/local.py b/octavia/certificates/generator/local.py new file mode 100644 index 0000000000..b8470afb81 --- /dev/null +++ b/octavia/certificates/generator/local.py @@ -0,0 +1,108 @@ +# Copyright (c) 2014 Rackspace US, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import binascii +import os + +from OpenSSL import crypto +from oslo.config import cfg + +from octavia.certificates.generator import cert_gen +from octavia.common import exceptions +from octavia.openstack.common import gettextutils +from octavia.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class LocalCertGenerator(cert_gen.CertGenerator): + """Cert Generator Interface that signs certs locally.""" + + @staticmethod + def _new_serial(): + return int(binascii.hexlify(os.urandom(20)), 16) + + @staticmethod + def sign_cert(csr, validity, ca_cert=None, ca_key=None, ca_key_pass=None, + ca_digest=None): + """Signs a certificate using our private CA based on the specified CSR + + The signed certificate will be valid from now until seconds + from now. + + :param csr: A Certificate Signing Request + :param validity: Valid for seconds from the current time + :param ca_cert: Signing Certificate (default: config) + :param ca_key: Signing Certificate Key (default: config) + :param ca_key_pass: Signing Certificate Key Pass (default: config) + :param ca_digest: Digest method to use for signing (default: config) + + :return: Signed certificate + :raises Exception: if certificate signing fails + """ + LOG.info(gettextutils._LI( + "Signing a certificate request using pyOpenSSL locally." + )) + if not ca_cert: + LOG.info(gettextutils._LI("Using CA Certificate from config.")) + try: + ca_cert = open(CONF.certificates.ca_certificate).read() + except IOError: + raise exceptions.CertificateGenerationException( + "Failed to load {0}." + .format(CONF.certificates.ca_certificate) + ) + if not ca_key: + LOG.info(gettextutils._LI("Using CA Private Key from config.")) + try: + ca_key = open(CONF.certificates.ca_private_key).read() + except IOError: + raise exceptions.CertificateGenerationException( + "Failed to load {0}." + .format(CONF.certificates.ca_certificate) + ) + if not ca_key_pass: + ca_key_pass = CONF.certificates.ca_private_key_passphrase + if ca_key_pass: + LOG.info(gettextutils._LI( + "Using CA Private Key Passphrase from config." + )) + else: + LOG.info(gettextutils._LI( + "No Passphrase found for CA Private Key, not using one." + )) + if not ca_digest: + ca_digest = CONF.certificates.signing_digest + + try: + lo_cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca_cert) + lo_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key, + passphrase=ca_key_pass) + lo_req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr) + + new_cert = crypto.X509() + new_cert.set_serial_number(LocalCertGenerator._new_serial()) + new_cert.gmtime_adj_notBefore(0) + new_cert.gmtime_adj_notAfter(validity) + new_cert.set_issuer(lo_cert.get_subject()) + new_cert.set_subject(lo_req.get_subject()) + new_cert.set_pubkey(lo_req.get_pubkey()) + new_cert.sign(lo_key, ca_digest) + + return crypto.dump_certificate(crypto.FILETYPE_PEM, new_cert) + except Exception as e: + LOG.error(gettextutils._LE("Unable to sign certificate.")) + raise exceptions.CertificateGenerationException(e) \ No newline at end of file diff --git a/octavia/certificates/manager/__init__.py b/octavia/certificates/manager/__init__.py index 598ea925e0..aaa54171af 100644 --- a/octavia/certificates/manager/__init__.py +++ b/octavia/certificates/manager/__init__.py @@ -19,7 +19,7 @@ from octavia.openstack.common import importutils certmgr_opts = [ cfg.StrOpt('cert_manager_class', - default='octavia.certificates.barbican.BarbicanCertManager', + default='octavia.certificates.manager.LocalCertManager', help='The full class name of the cert manager API class'), ] diff --git a/octavia/certificates/manager/local.py b/octavia/certificates/manager/local.py new file mode 100644 index 0000000000..7206266edd --- /dev/null +++ b/octavia/certificates/manager/local.py @@ -0,0 +1,167 @@ +# Copyright (c) 2014 Rackspace US, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os +import uuid + +from oslo.config import cfg + +from octavia.certificates.common import local as local_common +from octavia.certificates.manager import cert_mgr +from octavia.common import exceptions +from octavia.openstack.common import gettextutils +from octavia.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class LocalCertManager(cert_mgr.CertManager): + """Cert Manager Interface that stores data locally.""" + + @staticmethod + def store_cert(certificate, private_key, intermediates=None, + private_key_passphrase=None, **kwargs): + """Stores (i.e., registers) a cert with the cert manager. + + This method stores the specified cert to the filesystem and returns + a UUID that can be used to retrieve it. + + :param certificate: PEM encoded TLS certificate + :param private_key: private key for the supplied certificate + :param intermediates: ordered and concatenated intermediate certs + :param private_key_passphrase: optional passphrase for the supplied key + + :returns: the UUID of the stored cert + :raises CertificateStorageException: if certificate storage fails + """ + cert_ref = str(uuid.uuid4()) + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + LOG.info(gettextutils._LI( + "Storing certificate data on the local filesystem." + )) + try: + filename_certificate = "{0}.crt".format(filename_base, cert_ref) + with open(filename_certificate, 'w') as cert_file: + cert_file.write(certificate) + + filename_private_key = "{0}.key".format(filename_base, cert_ref) + with open(filename_private_key, 'w') as key_file: + key_file.write(private_key) + + if intermediates: + filename_intermediates = "{0}.int".format(filename_base, + cert_ref) + with open(filename_intermediates, 'w') as int_file: + int_file.write(intermediates) + + if private_key_passphrase: + filename_pkp = "{0}.pass".format(filename_base, cert_ref) + with open(filename_pkp, 'w') as pass_file: + pass_file.write(private_key_passphrase) + except IOError as ioe: + LOG.error(gettextutils._LE("Failed to store certificate.")) + raise exceptions.CertificateStorageException(message=ioe.message) + + return cert_ref + + @staticmethod + def get_cert(cert_ref, **kwargs): + """Retrieves the specified cert. + + :param cert_ref: the UUID of the cert to retrieve + + :return: octavia.certificates.common.Cert representation of the + certificate data + :raises CertificateStorageException: if certificate retrieval fails + """ + LOG.info(gettextutils._LI( + "Loading certificate {0} from the local filesystem." + ).format(cert_ref)) + + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + filename_certificate = "{0}.crt".format(filename_base, cert_ref) + filename_private_key = "{0}.key".format(filename_base, cert_ref) + filename_intermediates = "{0}.int".format(filename_base, cert_ref) + filename_pkp = "{0}.pass".format(filename_base, cert_ref) + + cert_data = dict() + + try: + with open(filename_certificate, 'r') as cert_file: + cert_data['certificate'] = cert_file.read() + except IOError: + LOG.error(gettextutils._LE( + "Failed to read certificate for {0}." + ).format(cert_ref)) + raise exceptions.CertificateStorageException( + msg="Certificate could not be read." + ) + try: + with open(filename_private_key, 'r') as key_file: + cert_data['private_key'] = key_file.read() + except IOError: + LOG.error(gettextutils._LE( + "Failed to read private key for {0}." + ).format(cert_ref)) + raise exceptions.CertificateStorageException( + msg="Private Key could not be read." + ) + + try: + with open(filename_intermediates, 'r') as int_file: + cert_data['intermediates'] = int_file.read() + except IOError: + pass + + try: + with open(filename_pkp, 'r') as pass_file: + cert_data['private_key_passphrase'] = pass_file.read() + except IOError: + pass + + return local_common.LocalCert(**cert_data) + + @staticmethod + def delete_cert(cert_ref, **kwargs): + """Deletes the specified cert. + + :param cert_ref: the UUID of the cert to delete + + :raises CertificateStorageException: if certificate deletion fails + """ + LOG.info(gettextutils._LI( + "Deleting certificate {0} from the local filesystem." + ).format(cert_ref)) + + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + filename_certificate = "{0}.crt".format(filename_base, cert_ref) + filename_private_key = "{0}.key".format(filename_base, cert_ref) + filename_intermediates = "{0}.int".format(filename_base, cert_ref) + filename_pkp = "{0}.pass".format(filename_base, cert_ref) + + try: + os.remove(filename_certificate) + os.remove(filename_private_key) + os.remove(filename_intermediates) + os.remove(filename_pkp) + except IOError as ioe: + LOG.error(gettextutils._LE( + "Failed to delete certificate {0}." + ).format(cert_ref)) + raise exceptions.CertificateStorageException(message=ioe.message) diff --git a/octavia/tests/unit/certificates/common/test_local.py b/octavia/tests/unit/certificates/common/test_local.py new file mode 100644 index 0000000000..742b7f4138 --- /dev/null +++ b/octavia/tests/unit/certificates/common/test_local.py @@ -0,0 +1,42 @@ +# Copyright 2014 Rackspace US, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import octavia.certificates.common.local as local_cert +import octavia.tests.unit.base as base + + +class TestLocalCommon(base.TestCase): + + def setUp(self): + self.certificate = "My Certificate" + self.intermediates = "My Intermediates" + self.private_key = "My Private Key" + self.private_key_passphrase = "My Private Key Passphrase" + + super(TestLocalCommon, self).setUp() + + def test_local_cert(self): + # Create a cert + cert = local_cert.LocalCert( + certificate=self.certificate, + intermediates=self.intermediates, + private_key=self.private_key, + private_key_passphrase=self.private_key_passphrase + ) + + # Validate the cert functions + self.assertEqual(cert.get_certificate(), self.certificate) + self.assertEqual(cert.get_intermediates(), self.intermediates) + self.assertEqual(cert.get_private_key(), self.private_key) + self.assertEqual(cert.get_private_key_passphrase(), + self.private_key_passphrase) diff --git a/octavia/tests/unit/certificates/generator/test_local.py b/octavia/tests/unit/certificates/generator/test_local.py new file mode 100644 index 0000000000..d14acaaa41 --- /dev/null +++ b/octavia/tests/unit/certificates/generator/test_local.py @@ -0,0 +1,90 @@ +# Copyright 2014 Rackspace US, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime + +from OpenSSL import crypto + +import octavia.certificates.generator.local as local_cert_gen +import octavia.tests.unit.base as base + + +class TestLocalGenerator(base.TestCase): + + def setUp(self): + self.signing_digest = "sha256" + + # Set up CSR data + csr_key = crypto.PKey() + csr_key.generate_key(crypto.TYPE_RSA, 2048) + csr = crypto.X509Req() + csr.set_pubkey(csr_key) + self.certificate_signing_request = crypto.dump_certificate_request( + crypto.FILETYPE_PEM, csr + ) + + # Set up CA data + ca_key = crypto.PKey() + ca_key.generate_key(crypto.TYPE_RSA, 2048) + + self.ca_private_key_passphrase = "Testing" + self.ca_private_key = crypto.dump_privatekey( + crypto.FILETYPE_PEM, + ca_key, + 'aes-256-cbc', + self.ca_private_key_passphrase + ) + + ca_cert = crypto.X509() + ca_subject = ca_cert.get_subject() + ca_subject.C = "US" + ca_subject.ST = "Oregon" + ca_subject.L = "Springfield" + ca_subject.O = "Springfield Nuclear Power Plant" + ca_subject.OU = "Section 7-G" + ca_subject.CN = "maggie1" + ca_cert.set_issuer(ca_cert.get_subject()) + ca_cert.set_pubkey(ca_key) + ca_cert.gmtime_adj_notBefore(0) + ca_cert.gmtime_adj_notAfter(2 * 365 * 24 * 60 * 60) + ca_cert.sign(ca_key, self.signing_digest) + + self.ca_certificate = crypto.dump_certificate( + crypto.FILETYPE_PEM, ca_cert + ) + + super(TestLocalGenerator, self).setUp() + + def test_sign_cert(self): + # Attempt sign a cert + signed_cert = local_cert_gen.LocalCertGenerator.sign_cert( + csr=self.certificate_signing_request, + validity=2 * 365 * 24 * 60 * 60, + ca_cert=self.ca_certificate, + ca_key=self.ca_private_key, + ca_key_pass=self.ca_private_key_passphrase, + ca_digest=self.signing_digest + ) + + self.assertIn("-----BEGIN CERTIFICATE-----", signed_cert) + + # Load the cert for specific tests + cert = crypto.load_certificate(crypto.FILETYPE_PEM, signed_cert) + + # Make sure expiry time is accurate + expires = datetime.datetime.strptime(cert.get_notAfter(), + '%Y%m%d%H%M%SZ') + should_expire = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60)) + diff = should_expire - expires + self.assertTrue(diff < datetime.timedelta(seconds=10)) diff --git a/octavia/tests/unit/certificates/manager/test_local.py b/octavia/tests/unit/certificates/manager/test_local.py new file mode 100644 index 0000000000..6d832e5290 --- /dev/null +++ b/octavia/tests/unit/certificates/manager/test_local.py @@ -0,0 +1,121 @@ +# Copyright 2014 Rackspace US, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os + +import mock +from oslo.config import cfg +from oslo.config import fixture as oslo_fixture + +import octavia.certificates.common.cert as cert +import octavia.certificates.manager.local as local_cert_mgr +import octavia.tests.unit.base as base + + +class TestLocalGenerator(base.TestCase): + + def setUp(self): + self.certificate = "My Certificate" + self.intermediates = "My Intermediates" + self.private_key = "My Private Key" + self.private_key_passphrase = "My Private Key Passphrase" + + conf = oslo_fixture.Config(cfg.CONF) + conf.config(group="certificates", storage_path="/tmp/") + + super(TestLocalGenerator, self).setUp() + + def _store_cert(self): + file_mock = mock.mock_open() + # Attempt to store the cert + with mock.patch('__builtin__.open', file_mock, create=True): + cert_id = local_cert_mgr.LocalCertManager.store_cert( + certificate=self.certificate, + intermediates=self.intermediates, + private_key=self.private_key, + private_key_passphrase=self.private_key_passphrase + ) + + # Check that something came back + self.assertIsNotNone(cert_id) + + # Verify the correct files were opened + file_mock.assert_has_calls([ + mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'w'), + mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'w'), + mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'w'), + mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'w') + ], any_order=True) + + # Verify the writes were made + file_mock().write.assert_has_calls([ + mock.call(self.certificate), + mock.call(self.intermediates), + mock.call(self.private_key), + mock.call(self.private_key_passphrase) + ], any_order=True) + + return cert_id + + def _get_cert(self, cert_id): + file_mock = mock.mock_open() + # Attempt to retrieve the cert + with mock.patch('__builtin__.open', file_mock, create=True): + data = local_cert_mgr.LocalCertManager.get_cert(cert_id) + + # Verify the correct files were opened + file_mock.assert_has_calls([ + mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'r'), + mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'r'), + mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'r'), + mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'r') + ], any_order=True) + + # The returned data should be a Cert object + self.assertIsInstance(data, cert.Cert) + + return data + + def _delete_cert(self, cert_id): + remove_mock = mock.Mock() + # Delete the cert + with mock.patch('os.remove', remove_mock): + local_cert_mgr.LocalCertManager.delete_cert(cert_id) + + # Verify the correct files were removed + remove_mock.assert_has_calls([ + mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id))) + ], any_order=True) + + def test_store_cert(self): + self._store_cert() + + def test_get_cert(self): + # Store a cert + cert_id = self._store_cert() + + # Get the cert + self._get_cert(cert_id) + + def test_delete_cert(self): + # Store a cert + cert_id = self._store_cert() + + # Verify the cert exists + self._get_cert(cert_id) + + # Delete the cert + self._delete_cert(cert_id) diff --git a/requirements.txt b/requirements.txt index d31af4d899..e55f71b480 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,5 @@ oslo.rootwrap>=1.3.0.0a1 python-keystoneclient>=0.11.1 python-novaclient>=2.17.0 posix_ipc + +pyOpenSSL>=0.14