From 75c4361a11aaf11e61378d947f3f388fc105a1f7 Mon Sep 17 00:00:00 2001 From: Adam Harwell Date: Tue, 9 May 2017 15:45:39 -0400 Subject: [PATCH] Reintroducing local certificate manager, as it is useful for testing Change-Id: I6616d815135ed00662c3b921796f5e988208bdf8 --- octavia/certificates/manager/local.py | 162 ++++++++++++++++++ .../unit/certificates/manager/test_local.py | 135 +++++++++++++++ setup.cfg | 1 + 3 files changed, 298 insertions(+) create mode 100644 octavia/certificates/manager/local.py create mode 100644 octavia/tests/unit/certificates/manager/test_local.py diff --git a/octavia/certificates/manager/local.py b/octavia/certificates/manager/local.py new file mode 100644 index 0000000000..03b7ff8428 --- /dev/null +++ b/octavia/certificates/manager/local.py @@ -0,0 +1,162 @@ +# Copyright (c) 2014 Rackspace US, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os +import stat +import uuid + +from oslo_config import cfg +from oslo_log import log as logging + +from octavia.certificates.common import local as local_common +from octavia.certificates.manager import cert_mgr +from octavia.common import exceptions + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class LocalCertManager(cert_mgr.CertManager): + """Cert Manager Interface that stores data locally.""" + + @staticmethod + def store_cert(project_id, certificate, private_key, intermediates=None, + private_key_passphrase=None, **kwargs): + """Stores (i.e., registers) a cert with the cert manager. + + This method stores the specified cert to the filesystem and returns + a UUID that can be used to retrieve it. + + :param project_id: Ignored in this implementation + :param certificate: PEM encoded TLS certificate + :param private_key: private key for the supplied certificate + :param intermediates: ordered and concatenated intermediate certs + :param private_key_passphrase: optional passphrase for the supplied key + + :returns: the UUID of the stored cert + :raises CertificateStorageException: if certificate storage fails + """ + cert_ref = str(uuid.uuid4()) + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + LOG.info("Storing certificate data on the local filesystem.") + try: + filename_certificate = "{0}.crt".format(filename_base) + flags = os.O_WRONLY | os.O_CREAT + mode = stat.S_IRUSR | stat.S_IWUSR # mode 0600 + with os.fdopen(os.open( + filename_certificate, flags, mode), 'w') as cert_file: + cert_file.write(certificate) + + filename_private_key = "{0}.key".format(filename_base) + with os.fdopen(os.open( + filename_private_key, flags, mode), 'w') as key_file: + key_file.write(private_key) + + if intermediates: + filename_intermediates = "{0}.int".format(filename_base) + with os.fdopen(os.open( + filename_intermediates, flags, mode), 'w') as int_file: + int_file.write(intermediates) + + if private_key_passphrase: + filename_pkp = "{0}.pass".format(filename_base) + with os.fdopen(os.open( + filename_pkp, flags, mode), 'w') as pass_file: + pass_file.write(private_key_passphrase) + except IOError as ioe: + LOG.error("Failed to store certificate.") + raise exceptions.CertificateStorageException(message=ioe.message) + + return cert_ref + + @staticmethod + def get_cert(project_id, cert_ref, **kwargs): + """Retrieves the specified cert. + + :param project_id: Ignored in this implementation + :param cert_ref: the UUID of the cert to retrieve + + :return: octavia.certificates.common.Cert representation of the + certificate data + :raises CertificateStorageException: if certificate retrieval fails + """ + LOG.info("Loading certificate %s from the local filesystem.", cert_ref) + + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + filename_certificate = "{0}.crt".format(filename_base) + filename_private_key = "{0}.key".format(filename_base) + filename_intermediates = "{0}.int".format(filename_base) + filename_pkp = "{0}.pass".format(filename_base) + + cert_data = dict() + + flags = os.O_RDONLY + try: + with os.fdopen(os.open(filename_certificate, flags)) as cert_file: + cert_data['certificate'] = cert_file.read() + except IOError: + LOG.error("Failed to read certificate for %s.", cert_ref) + raise exceptions.CertificateStorageException( + msg="Certificate could not be read.") + try: + with os.fdopen(os.open(filename_private_key, flags)) as key_file: + cert_data['private_key'] = key_file.read() + except IOError: + LOG.error("Failed to read private key for %s", cert_ref) + raise exceptions.CertificateStorageException( + msg="Private Key could not be read.") + + try: + with os.fdopen(os.open(filename_intermediates, flags)) as int_file: + cert_data['intermediates'] = int_file.read() + except IOError: + pass + + try: + with os.fdopen(os.open(filename_pkp, flags)) as pass_file: + cert_data['private_key_passphrase'] = pass_file.read() + except IOError: + pass + + return local_common.LocalCert(**cert_data) + + @staticmethod + def delete_cert(project_id, cert_ref, **kwargs): + """Deletes the specified cert. + + :param project_id: Ignored in this implementation + :param cert_ref: the UUID of the cert to delete + + :raises CertificateStorageException: if certificate deletion fails + """ + LOG.info("Deleting certificate %s from the local filesystem.", + cert_ref) + + filename_base = os.path.join(CONF.certificates.storage_path, cert_ref) + + filename_certificate = "{0}.crt".format(filename_base) + filename_private_key = "{0}.key".format(filename_base) + filename_intermediates = "{0}.int".format(filename_base) + filename_pkp = "{0}.pass".format(filename_base) + + try: + os.remove(filename_certificate) + os.remove(filename_private_key) + os.remove(filename_intermediates) + os.remove(filename_pkp) + except IOError as ioe: + LOG.error("Failed to delete certificate %s", cert_ref) + raise exceptions.CertificateStorageException(message=ioe.message) diff --git a/octavia/tests/unit/certificates/manager/test_local.py b/octavia/tests/unit/certificates/manager/test_local.py new file mode 100644 index 0000000000..6c4f049447 --- /dev/null +++ b/octavia/tests/unit/certificates/manager/test_local.py @@ -0,0 +1,135 @@ +# Copyright 2014 Rackspace US, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import stat + +import mock +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture + +import octavia.certificates.common.cert as cert +import octavia.certificates.manager.local as local_cert_mgr +import octavia.tests.unit.base as base + + +class TestLocalManager(base.TestCase): + + def setUp(self): + self.certificate = "My Certificate" + self.intermediates = "My Intermediates" + self.private_key = "My Private Key" + self.private_key_passphrase = "My Private Key Passphrase" + + conf = oslo_fixture.Config(cfg.CONF) + conf.config(group="certificates", storage_path="/tmp/") + + super(TestLocalManager, self).setUp() + + def _store_cert(self): + fd_mock = mock.mock_open() + open_mock = mock.Mock() + # Attempt to store the cert + with mock.patch('os.open', open_mock), mock.patch.object( + os, 'fdopen', fd_mock): + cert_id = local_cert_mgr.LocalCertManager.store_cert( + None, + certificate=self.certificate, + intermediates=self.intermediates, + private_key=self.private_key, + private_key_passphrase=self.private_key_passphrase + ) + + # Check that something came back + self.assertIsNotNone(cert_id) + + # Verify the correct files were opened + flags = os.O_WRONLY | os.O_CREAT + mode = stat.S_IRUSR | stat.S_IWUSR # mode 0600 + open_mock.assert_has_calls([ + mock.call( + os.path.join('/tmp/{0}.crt'.format(cert_id)), flags, mode), + mock.call( + os.path.join('/tmp/{0}.key'.format(cert_id)), flags, mode), + mock.call( + os.path.join('/tmp/{0}.int'.format(cert_id)), flags, mode), + mock.call( + os.path.join('/tmp/{0}.pass'.format(cert_id)), flags, mode) + ], any_order=True) + + # Verify the writes were made + fd_mock().write.assert_has_calls([ + mock.call(self.certificate), + mock.call(self.intermediates), + mock.call(self.private_key), + mock.call(self.private_key_passphrase) + ], any_order=True) + + return cert_id + + def _get_cert(self, cert_id): + fd_mock = mock.mock_open() + open_mock = mock.Mock() + # Attempt to retrieve the cert + with mock.patch('os.open', open_mock), mock.patch.object( + os, 'fdopen', fd_mock): + data = local_cert_mgr.LocalCertManager.get_cert(None, cert_id) + + # Verify the correct files were opened + flags = os.O_RDONLY + open_mock.assert_has_calls([ + mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), flags), + mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), flags), + mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), flags), + mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), flags) + ], any_order=True) + + # The returned data should be a Cert object + self.assertIsInstance(data, cert.Cert) + + return data + + def _delete_cert(self, cert_id): + remove_mock = mock.Mock() + # Delete the cert + with mock.patch('os.remove', remove_mock): + local_cert_mgr.LocalCertManager.delete_cert(None, cert_id) + + # Verify the correct files were removed + remove_mock.assert_has_calls([ + mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))), + mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id))) + ], any_order=True) + + def test_store_cert(self): + self._store_cert() + + def test_get_cert(self): + # Store a cert + cert_id = self._store_cert() + + # Get the cert + self._get_cert(cert_id) + + def test_delete_cert(self): + # Store a cert + cert_id = self._store_cert() + + # Verify the cert exists + self._get_cert(cert_id) + + # Delete the cert + self._delete_cert(cert_id) diff --git a/setup.cfg b/setup.cfg index fa30fec518..44d6a8be70 100644 --- a/setup.cfg +++ b/setup.cfg @@ -79,6 +79,7 @@ octavia.cert_generator = local_cert_generator = octavia.certificates.generator.local:LocalCertGenerator anchor_cert_generator = octavia.certificates.generator.anchor:AnchorCertGenerator octavia.cert_manager = + local_cert_manager = octavia.certificates.manager.local:LocalCertManager barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager octavia.barbican_auth = barbican_acl_auth = octavia.certificates.common.auth.barbican_acl:BarbicanACLAuth