Add CertManager to store CA and client certificate

To implement TLS support, we should store CA and client cert for each
bay. This patch adds common library to store cert to Barbican.
Magnum uses service admin privilege to store the cert, this means that
end user can't retrieve CA cert and private key from Barbican
directly.

This patch is copied from neutron-lbaas project.
*  I435189b2637e32803a13ebd4951e61fac4ab234d

Change-Id: I519228d9749ad610db3e0c698caa1144813f9d52
Partial-Implements: blueprint magnum-as-a-ca
This commit is contained in:
OTSUKA, Yuanying 2015-08-13 16:41:37 +09:00
parent 04691e75ce
commit 1727c1728f
12 changed files with 1106 additions and 0 deletions

View File

@ -330,6 +330,20 @@
#bay_create_timeout = <None>
[certificates]
#
# From magnum
#
# Certificate Manager plugin. Defaults to barbican. (string value)
#cert_manager_type = barbican
# Absolute path of the certificate storage directory. Defaults to
# /var/lib/magnum/certificates/. (string value)
#storage_path = /var/lib/magnum/certificates/
[conductor]
#

View File

@ -0,0 +1,40 @@
# Copyright 2015 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from stevedore import driver
CONF = cfg.CONF
DEFAULT_CERT_MANAGER = 'barbican'
cert_manager_opts = [
cfg.StrOpt('cert_manager_type',
default=DEFAULT_CERT_MANAGER,
help='Certificate Manager plugin. '
'Defaults to {0}.'.format(DEFAULT_CERT_MANAGER))
]
CONF.register_opts(cert_manager_opts, group='certificates')
_CERT_MANAGER_PLUGIN = None
def get_backend():
global _CERT_MANAGER_PLUGIN
if not _CERT_MANAGER_PLUGIN:
_CERT_MANAGER_PLUGIN = driver.DriverManager(
"magnum.cert_manager.backend",
cfg.CONF.certificates.cert_manager_type).driver
return _CERT_MANAGER_PLUGIN

View File

@ -0,0 +1,243 @@
# Copyright 2014, 2015 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from barbicanclient import client as barbican_client
from oslo_log import log as logging
from oslo_utils import excutils
from magnum.common.cert_manager import cert_manager
from magnum.common import clients
from magnum.common import context
from magnum.i18n import _
from magnum.i18n import _LE
from magnum.i18n import _LI
from magnum.i18n import _LW
LOG = logging.getLogger(__name__)
class Cert(cert_manager.Cert):
"""Representation of a Cert based on the Barbican CertificateContainer."""
def __init__(self, cert_container):
if not isinstance(cert_container,
barbican_client.containers.CertificateContainer):
raise TypeError(_(
"Retrieved Barbican Container is not of the correct type "
"(certificate)."))
self._cert_container = cert_container
# Container secrets are accessed upon query and can return as None,
# don't return the payload if the secret is not available.
def get_certificate(self):
if self._cert_container.certificate:
return self._cert_container.certificate.payload
def get_intermediates(self):
if self._cert_container.intermediates:
return self._cert_container.intermediates.payload
def get_private_key(self):
if self._cert_container.private_key:
return self._cert_container.private_key.payload
def get_private_key_passphrase(self):
if self._cert_container.private_key_passphrase:
return self._cert_container.private_key_passphrase.payload
_ADMIN_OSC = None
def get_admin_clients():
global _ADMIN_OSC
if not _ADMIN_OSC:
_ADMIN_OSC = clients.OpenStackClients(
context.RequestContext(is_admin=True))
return _ADMIN_OSC
class CertManager(cert_manager.CertManager):
"""Certificate Manager that wraps the Barbican client API."""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name='Magnum TLS Cert', **kwargs):
"""Stores a certificate in the certificate manager.
:param certificate: PEM encoded TLS certificate
:param private_key: private key for the supplied certificate
:param intermediates: ordered and concatenated intermediate certs
:param private_key_passphrase: optional passphrase for the supplied key
:param expiration: the expiration time of the cert in ISO 8601 format
:param name: a friendly name for the cert
:returns: the container_ref of the stored cert
:raises Exception: if certificate storage fails
"""
connection = get_admin_clients().barbican()
LOG.info(_LI(
"Storing certificate container '{0}' in Barbican."
).format(name))
certificate_secret = None
private_key_secret = None
intermediates_secret = None
pkp_secret = None
try:
certificate_secret = connection.secrets.create(
payload=certificate,
expiration=expiration,
name="Certificate"
)
private_key_secret = connection.secrets.create(
payload=private_key,
expiration=expiration,
name="Private Key"
)
certificate_container = connection.containers.create_certificate(
name=name,
certificate=certificate_secret,
private_key=private_key_secret
)
if intermediates:
intermediates_secret = connection.secrets.create(
payload=intermediates,
expiration=expiration,
name="Intermediates"
)
certificate_container.intermediates = intermediates_secret
if private_key_passphrase:
pkp_secret = connection.secrets.create(
payload=private_key_passphrase,
expiration=expiration,
name="Private Key Passphrase"
)
certificate_container.private_key_passphrase = pkp_secret
certificate_container.store()
return certificate_container.container_ref
# Barbican (because of Keystone-middleware) sometimes masks
# exceptions strangely -- this will catch anything that it raises and
# reraise the original exception, while also providing useful
# feedback in the logs for debugging
except Exception:
for secret in [certificate_secret, private_key_secret,
intermediates_secret, pkp_secret]:
if secret and secret.secret_ref:
old_ref = secret.secret_ref
try:
secret.delete()
LOG.info(_LI(
"Deleted secret {0} ({1}) during rollback."
).format(secret.name, old_ref))
except Exception:
LOG.warning(_LW(
"Failed to delete {0} ({1}) during rollback. This "
"is probably not a problem."
).format(secret.name, old_ref))
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error storing certificate data"))
@staticmethod
def get_cert(cert_ref, service_name='Magnum', resource_ref=None,
check_only=False, **kwargs):
"""Retrieves the specified cert and registers as a consumer.
:param cert_ref: the UUID of the cert to retrieve
:param service_name: Friendly name for the consuming service
:param resource_ref: Full HATEOAS reference to the consuming resource
:param check_only: Read Certificate data without registering
:return: Magnum.certificates.common.Cert representation of the
certificate data
:raises Exception: if certificate retrieval fails
"""
connection = get_admin_clients().barbican()
LOG.info(_LI(
"Loading certificate container {0} from Barbican."
).format(cert_ref))
try:
if check_only:
cert_container = connection.containers.get(
container_ref=cert_ref
)
else:
cert_container = connection.containers.register_consumer(
container_ref=cert_ref,
name=service_name,
url=resource_ref
)
return Cert(cert_container)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error getting {0}").format(cert_ref))
@staticmethod
def delete_cert(cert_ref, service_name='Magnum', resource_ref=None,
**kwargs):
"""Deregister as a consumer for the specified cert.
:param cert_ref: the UUID of the cert to retrieve
:param service_name: Friendly name for the consuming service
:param resource_ref: Full HATEOAS reference to the consuming resource
:raises Exception: if deregistration fails
"""
connection = get_admin_clients().barbican()
LOG.info(_LI(
"Deregistering as a consumer of {0} in Barbican."
).format(cert_ref))
try:
connection.containers.remove_consumer(
container_ref=cert_ref,
name=service_name,
url=resource_ref
)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE(
"Error deregistering as a consumer of {0}"
).format(cert_ref))
@staticmethod
def _actually_delete_cert(cert_ref):
"""Deletes the specified cert. Very dangerous. Do not recommend.
:param cert_ref: the UUID of the cert to delete
:raises Exception: if certificate deletion fails
"""
connection = get_admin_clients().barbican()
LOG.info(_LI(
"Recursively deleting certificate container {0} from Barbican."
).format(cert_ref))
try:
certificate_container = connection.containers.get(cert_ref)
certificate_container.certificate.delete()
if certificate_container.intermediates:
certificate_container.intermediates.delete()
if certificate_container.private_key_passphrase:
certificate_container.private_key_passphrase.delete()
certificate_container.private_key.delete()
certificate_container.delete()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE(
"Error recursively deleting certificate container {0}"
).format(cert_ref))

View File

@ -0,0 +1,84 @@
# Copyright 2014, 2015 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Certificate manager API
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Cert(object):
"""Base class to represent all certificates."""
@abc.abstractmethod
def get_certificate(self):
"""Returns the certificate."""
pass
@abc.abstractmethod
def get_intermediates(self):
"""Returns the intermediate certificates."""
pass
@abc.abstractmethod
def get_private_key(self):
"""Returns the private key for the certificate."""
pass
@abc.abstractmethod
def get_private_key_passphrase(self):
"""Returns the passphrase for the private key."""
pass
@six.add_metaclass(abc.ABCMeta)
class CertManager(object):
"""Base Cert Manager Interface
A Cert Manager is responsible for managing certificates for TLS.
"""
@abc.abstractmethod
def store_cert(self, certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name='Magnum TLS Cert', **kwargs):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert and returns its UUID that
identifies it within the cert manager.
If storage of the certificate data fails, a CertificateStorageException
should be raised.
"""
pass
@abc.abstractmethod
def get_cert(self, cert_uuid, check_only=False, **kwargs):
"""Retrieves the specified cert.
If check_only is True, don't perform any sort of registration.
If the specified cert does not exist, a CertificateStorageException
should be raised.
"""
pass
@abc.abstractmethod
def delete_cert(self, cert_uuid, **kwargs):
"""Deletes the specified cert.
If the specified cert does not exist, a CertificateStorageException
should be raised.
"""
pass

View File

@ -0,0 +1,209 @@
# Copyright 2014, 2015 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import uuid
from oslo_config import cfg
from oslo_log import log as logging
from magnum.common.cert_manager import cert_manager
from magnum.common import exception
from magnum.i18n import _
from magnum.i18n import _LE
from magnum.i18n import _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
TLS_STORAGE_DEFAULT = '/var/lib/magnum/certificates/'
local_cert_manager_opts = [
cfg.StrOpt('storage_path',
default=TLS_STORAGE_DEFAULT,
help='Absolute path of the certificate storage directory. '
'Defaults to /var/lib/magnum/certificates/.')
]
CONF.register_opts(local_cert_manager_opts, group='certificates')
class Cert(cert_manager.Cert):
"""Representation of a Cert for local storage."""
def __init__(self, certificate, private_key, intermediates=None,
private_key_passphrase=None):
self.certificate = certificate
self.intermediates = intermediates
self.private_key = private_key
self.private_key_passphrase = private_key_passphrase
def get_certificate(self):
return self.certificate
def get_intermediates(self):
return self.intermediates
def get_private_key(self):
return self.private_key
def get_private_key_passphrase(self):
return self.private_key_passphrase
class CertManager(cert_manager.CertManager):
"""Cert Manager Interface that stores data locally.
This Cert Manager should be used for testing purpose.
"""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert to the filesystem and returns
a UUID that can be used to retrieve it.
:param certificate: PEM encoded TLS certificate
:param private_key: private key for the supplied certificate
:param intermediates: ordered and concatenated intermediate certs
:param private_key_passphrase: optional passphrase for the supplied key
:returns: the UUID of the stored cert
:raises CertificateStorageException: if certificate storage fails
"""
cert_ref = str(uuid.uuid4())
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
LOG.warn(_LW(
"Storing certificate data on the local filesystem. "
"CertManager type 'local' should be used for testing purpose."
))
try:
filename_certificate = "{0}.crt".format(filename_base)
with open(filename_certificate, 'w') as cert_file:
cert_file.write(certificate)
filename_private_key = "{0}.key".format(filename_base)
with open(filename_private_key, 'w') as key_file:
key_file.write(private_key)
if intermediates:
filename_intermediates = "{0}.int".format(filename_base)
with open(filename_intermediates, 'w') as int_file:
int_file.write(intermediates)
if private_key_passphrase:
filename_pkp = "{0}.pass".format(filename_base)
with open(filename_pkp, 'w') as pass_file:
pass_file.write(private_key_passphrase)
except IOError as ioe:
LOG.error(_LE("Failed to store certificate."))
raise exception.CertificateStorageException(msg=ioe.message)
return cert_ref
@staticmethod
def get_cert(cert_ref, **kwargs):
"""Retrieves the specified cert.
:param cert_ref: the UUID of the cert to retrieve
:return: magnum.common.cert_manager.cert_manager.Cert
representation of the certificate data
:raises CertificateStorageException: if certificate retrieval fails
"""
LOG.warn(_LW(
"Loading certificate {0} from the local filesystem. "
"CertManager type 'local' should be used for testing purpose."
).format(cert_ref))
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
cert_data = dict()
try:
with open(filename_certificate, 'r') as cert_file:
cert_data['certificate'] = cert_file.read()
except IOError:
LOG.error(_LE(
"Failed to read certificate for {0}."
).format(cert_ref))
raise exception.CertificateStorageException(
msg=_("Certificate could not be read.")
)
try:
with open(filename_private_key, 'r') as key_file:
cert_data['private_key'] = key_file.read()
except IOError:
LOG.error(_LE(
"Failed to read private key for {0}."
).format(cert_ref))
raise exception.CertificateStorageException(
msg=_("Private Key could not be read.")
)
try:
with open(filename_intermediates, 'r') as int_file:
cert_data['intermediates'] = int_file.read()
except IOError as ioe:
LOG.error(_LE("Failed to read certificate."))
raise exception.CertificateStorageException(msg=ioe.message)
try:
with open(filename_pkp, 'r') as pass_file:
cert_data['private_key_passphrase'] = pass_file.read()
except IOError as ioe:
LOG.error(_LE("Failed to read certificate."))
raise exception.CertificateStorageException(msg=ioe.message)
return Cert(**cert_data)
@staticmethod
def delete_cert(cert_ref, **kwargs):
"""Deletes the specified cert.
:param cert_ref: the UUID of the cert to delete
:raises CertificateStorageException: if certificate deletion fails
"""
LOG.warn(_LW(
"Deleting certificate {0} from the local filesystem. "
"CertManager type 'local' should be used for testing purpose."
).format(cert_ref))
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
try:
os.remove(filename_certificate)
os.remove(filename_private_key)
os.remove(filename_intermediates)
os.remove(filename_pkp)
except IOError as ioe:
LOG.error(_LE(
"Failed to delete certificate {0}."
).format(cert_ref))
raise exception.CertificateStorageException(msg=ioe.message)

View File

@ -475,3 +475,7 @@ class X509KeyPairNotFound(ResourceNotFound):
class X509KeyPairAlreadyExists(Conflict):
message = _("A key pair with UUID %(uuid)s already exists.")
class CertificateStorageException(MagnumException):
message = _("Could not store certificate: %(msg)s")

View File

@ -17,6 +17,8 @@ import itertools
import magnum.api.app
import magnum.api.auth
import magnum.common.cert_manager
from magnum.common.cert_manager import local_cert_manager
import magnum.common.clients
import magnum.common.exception
import magnum.common.magnum_keystoneclient
@ -49,6 +51,10 @@ def list_opts():
('glance_client', magnum.common.clients.glance_client_opts),
('barbican_client', magnum.common.clients.barbican_client_opts),
('bay_heat', magnum.conductor.handlers.bay_conductor.bay_heat_opts),
('certificates',
itertools.chain(magnum.common.cert_manager.cert_manager_opts,
local_cert_manager.local_cert_manager_opts,
)),
('kubernetes',
magnum.conductor.k8s_api.kubernetes_opts),
]

View File

@ -0,0 +1,314 @@
# Copyright 2014, 2015 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from barbicanclient import client as barbican_client
from barbicanclient import containers
from barbicanclient import secrets
import mock
from mock import patch
from magnum.common.cert_manager import barbican_cert_manager as bcm
from magnum.common.cert_manager import cert_manager
from magnum.tests import base
class TestBarbicanCert(base.BaseTestCase):
def setUp(self):
# Certificate data
self.certificate = "My Certificate"
self.intermediates = "My Intermediates"
self.private_key = "My Private Key"
self.private_key_passphrase = "My Private Key Passphrase"
self.certificate_secret = barbican_client.secrets.Secret(
api=mock.MagicMock(),
payload=self.certificate
)
self.intermediates_secret = barbican_client.secrets.Secret(
api=mock.MagicMock(),
payload=self.intermediates
)
self.private_key_secret = barbican_client.secrets.Secret(
api=mock.MagicMock(),
payload=self.private_key
)
self.private_key_passphrase_secret = barbican_client.secrets.Secret(
api=mock.MagicMock(),
payload=self.private_key_passphrase
)
super(TestBarbicanCert, self).setUp()
def test_barbican_cert(self):
container = barbican_client.containers.CertificateContainer(
api=mock.MagicMock(),
certificate=self.certificate_secret,
intermediates=self.intermediates_secret,
private_key=self.private_key_secret,
private_key_passphrase=self.private_key_passphrase_secret
)
# Create a cert
cert = bcm.Cert(
cert_container=container
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(), self.certificate)
self.assertEqual(cert.get_intermediates(), self.intermediates)
self.assertEqual(cert.get_private_key(), self.private_key)
self.assertEqual(cert.get_private_key_passphrase(),
self.private_key_passphrase)
def test_barbican_cert_none_values(self):
container = barbican_client.containers.CertificateContainer(
api=mock.MagicMock(),
certificate=None,
intermediates=None,
private_key=None,
private_key_passphrase=None
)
# Create a cert
cert = bcm.Cert(
cert_container=container
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(), None)
self.assertEqual(cert.get_intermediates(), None)
self.assertEqual(cert.get_private_key(), None)
self.assertEqual(cert.get_private_key_passphrase(), None)
class TestBarbicanManager(base.BaseTestCase):
def setUp(self):
# Make a fake Container and contents
self.barbican_endpoint = 'http://localhost:9311/v1'
self.container_uuid = uuid.uuid4()
self.container_ref = '{0}/containers/{1}'.format(
self.barbican_endpoint, self.container_uuid
)
self.name = 'My Fancy Cert'
self.private_key = mock.Mock(spec=secrets.Secret)
self.certificate = mock.Mock(spec=secrets.Secret)
self.intermediates = mock.Mock(spec=secrets.Secret)
self.private_key_passphrase = mock.Mock(spec=secrets.Secret)
container = mock.Mock(spec=containers.CertificateContainer)
container.container_ref = self.container_ref
container.name = self.name
container.private_key = self.private_key
container.certificate = self.certificate
container.intermediates = self.intermediates
container.private_key_passphrase = self.private_key_passphrase
self.container = container
self.empty_container = mock.Mock(spec=containers.CertificateContainer)
self.secret1 = mock.Mock(spec=secrets.Secret)
self.secret2 = mock.Mock(spec=secrets.Secret)
self.secret3 = mock.Mock(spec=secrets.Secret)
self.secret4 = mock.Mock(spec=secrets.Secret)
super(TestBarbicanManager, self).setUp()
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_store_cert(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
mock_barbican.return_value = bc
# Attempt to store a cert
bcm.CertManager.store_cert(
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
private_key_passphrase=self.private_key_passphrase,
name=self.name
)
# create_secret should be called four times with our data
calls = [
mock.call(payload=self.certificate, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key, expiration=None,
name=mock.ANY),
mock.call(payload=self.intermediates, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(bc.containers.create_certificate.call_count, 1)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_store_cert_failure(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
test_secrets = [
self.secret1,
self.secret2,
self.secret3,
self.secret4
]
bc.secrets.create.side_effect = test_secrets
self.empty_container.store.side_effect = ValueError()
mock_barbican.return_value = bc
# Attempt to store a cert
self.assertRaises(
ValueError,
bcm.CertManager.store_cert,
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
private_key_passphrase=self.private_key_passphrase,
name=self.name
)
# create_secret should be called four times with our data
calls = [
mock.call(payload=self.certificate, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key, expiration=None,
name=mock.ANY),
mock.call(payload=self.intermediates, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(bc.containers.create_certificate.call_count, 1)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
# All secrets should be deleted (or at least an attempt made)
for s in test_secrets:
s.delete.assert_called_once_with()
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_get_cert(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
bc.containers.register_consumer.return_value = self.container
mock_barbican.return_value = bc
# Get the container data
data = bcm.CertManager.get_cert(
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Magnum'
)
# 'register_consumer' should be called once with the container_ref
bc.containers.register_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Magnum'
)
# The returned data should be a Cert object with the correct values
self.assertIsInstance(data, cert_manager.Cert)
self.assertEqual(data.get_private_key(),
self.private_key.payload)
self.assertEqual(data.get_certificate(),
self.certificate.payload)
self.assertEqual(data.get_intermediates(),
self.intermediates.payload)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_get_cert_no_registration(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
mock_barbican.return_value = bc
# Get the container data
data = bcm.CertManager.get_cert(
cert_ref=self.container_ref, check_only=True
)
# 'get' should be called once with the container_ref
bc.containers.get.assert_called_once_with(
container_ref=self.container_ref
)
# The returned data should be a Cert object with the correct values
self.assertIsInstance(data, cert_manager.Cert)
self.assertEqual(data.get_private_key(),
self.private_key.payload)
self.assertEqual(data.get_certificate(),
self.certificate.payload)
self.assertEqual(data.get_intermediates(),
self.intermediates.payload)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_delete_cert(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
mock_barbican.return_value = bc
# Attempt to deregister as a consumer
bcm.CertManager.delete_cert(
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Magnum'
)
# remove_consumer should be called once with the container_ref
bc.containers.remove_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Magnum'
)
@patch('magnum.common.clients.OpenStackClients.barbican')
def test_actually_delete_cert(self, mock_barbican):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
mock_barbican.return_value = bc
# Attempt to store a cert
bcm.CertManager._actually_delete_cert(
cert_ref=self.container_ref
)
# All secrets should be deleted
self.container.certificate.delete.assert_called_once_with()
self.container.private_key.delete.assert_called_once_with()
self.container.intermediates.delete.assert_called_once_with()
self.container.private_key_passphrase.delete.assert_called_once_with()
# Container should be deleted once
self.container.delete.assert_called_once_with()

View File

@ -0,0 +1,40 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture
from magnum.common import cert_manager
from magnum.common.cert_manager import barbican_cert_manager as bcm
from magnum.common.cert_manager import get_backend
from magnum.common.cert_manager import local_cert_manager as lcm
from magnum.tests import base
class TestCertManager(base.BaseTestCase):
def setUp(self):
cert_manager._CERT_MANAGER_PLUGIN = None
super(TestCertManager, self).setUp()
def test_barbican_cert_manager(self):
fixture.Config().config(group='certificates',
cert_manager_type='barbican')
self.assertEqual(get_backend().CertManager,
bcm.CertManager)
def test_local_cert_manager(self):
fixture.Config().config(group='certificates',
cert_manager_type='local')
self.assertEqual(get_backend().CertManager,
lcm.CertManager)

View File

@ -0,0 +1,148 @@
# Copyright 2014 Rackspace US, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from magnum.common.cert_manager import cert_manager
from magnum.common.cert_manager import local_cert_manager
from magnum.tests import base
class TestLocalCert(base.BaseTestCase):
def setUp(self):
self.certificate = "My Certificate"
self.intermediates = "My Intermediates"
self.private_key = "My Private Key"
self.private_key_passphrase = "My Private Key Passphrase"
super(TestLocalCert, self).setUp()
def test_local_cert(self):
# Create a cert
cert = local_cert_manager.Cert(
certificate=self.certificate,
intermediates=self.intermediates,
private_key=self.private_key,
private_key_passphrase=self.private_key_passphrase
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(), self.certificate)
self.assertEqual(cert.get_intermediates(), self.intermediates)
self.assertEqual(cert.get_private_key(), self.private_key)
self.assertEqual(cert.get_private_key_passphrase(),
self.private_key_passphrase)
class TestLocalManager(base.BaseTestCase):
def setUp(self):
self.certificate = "My Certificate"
self.intermediates = "My Intermediates"
self.private_key = "My Private Key"
self.private_key_passphrase = "My Private Key Passphrase"
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="certificates", storage_path="/tmp/")
super(TestLocalManager, self).setUp()
def _store_cert(self):
file_mock = mock.mock_open()
# Attempt to store the cert
with mock.patch('__builtin__.open', file_mock, create=True):
cert_id = local_cert_manager.CertManager.store_cert(
certificate=self.certificate,
intermediates=self.intermediates,
private_key=self.private_key,
private_key_passphrase=self.private_key_passphrase
)
# Check that something came back
self.assertIsNotNone(cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'w')
], any_order=True)
# Verify the writes were made
file_mock().write.assert_has_calls([
mock.call(self.certificate),
mock.call(self.intermediates),
mock.call(self.private_key),
mock.call(self.private_key_passphrase)
], any_order=True)
return cert_id
def _get_cert(self, cert_id):
file_mock = mock.mock_open()
# Attempt to retrieve the cert
with mock.patch('__builtin__.open', file_mock, create=True):
data = local_cert_manager.CertManager.get_cert(cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'r')
], any_order=True)
# The returned data should be a Cert object
self.assertIsInstance(data, cert_manager.Cert)
return data
def _delete_cert(self, cert_id):
remove_mock = mock.Mock()
# Delete the cert
with mock.patch('os.remove', remove_mock):
local_cert_manager.CertManager.delete_cert(cert_id)
# Verify the correct files were removed
remove_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)))
], any_order=True)
def test_store_cert(self):
self._store_cert()
def test_get_cert(self):
# Store a cert
cert_id = self._store_cert()
# Get the cert
self._get_cert(cert_id)
def test_delete_cert(self):
# Store a cert
cert_id = self._store_cert()
# Verify the cert exists
self._get_cert(cert_id)
# Delete the cert
self._delete_cert(cert_id)

View File

@ -62,5 +62,9 @@ magnum.template_definitions =
magnum.database.migration_backend =
sqlalchemy = magnum.db.sqlalchemy.migration
magnum.cert_manager.backend =
barbican = magnum.common.cert_manager.barbican_cert_manager
local = magnum.common.cert_manager.local_cert_manager
[wheel]
universal = 1