Refactor BarbicanAuth to allow for configurable auth method

There is now a new configuration option "barbican_auth" in the
certificates section, to specify which auth plugin to use when
communicating with Barbican. This is because the default option (using
ACLs inside Barbican to control access) should be ok as a default
workflow, but it might be required to use other methods depending on
your deployment. For example, another possible auth method would be
BarbicanTrustAuth, utilizing Keystone Trusts.

Some deployers may need custom auth methods that do not exist in
upstream Keystone, and will need their own Auth plugin. This should be in line
with the way Octavia's network and compute drivers work already.

While we're in this file, prune the unused (and really bad) method that
would *actually* delete certs from Barbican (not in our scope).
Also do the tenant_id -> project_id rename.

Change-Id: Ic9aef68924bb5c216734afd25403e59476c576e7
This commit is contained in:
Adam Harwell 2015-08-24 03:07:18 -05:00
parent 969f811bd4
commit 52351a5698
18 changed files with 225 additions and 181 deletions

View File

@ -72,6 +72,9 @@
# Certificate Manager options are local_cert_manager
# barbican_cert_manager
# cert_manager=local_cert_manager
# For Barbican authentication (if using any Barbican based cert class)
# barbican_auth = barbican_acl_auth
[anchor]
# Use OpenStack anchor to sign the amphora REST API certificates

View File

@ -0,0 +1,44 @@
# Copyright (c) 2014 Rackspace US, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Barbican ACL auth class for Barbican certificate handling
"""
from barbicanclient import client as barbican_client
from oslo_log import log as logging
from oslo_utils import excutils
from octavia.certificates.common import barbican as barbican_common
from octavia.common import keystone
from octavia.i18n import _LE
LOG = logging.getLogger(__name__)
class BarbicanACLAuth(barbican_common.BarbicanAuth):
_barbican_client = None
@classmethod
def get_barbican_client(cls, project_id=None):
if not cls._barbican_client:
try:
cls._barbican_client = barbican_client.Client(
session=keystone.get_session()
)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Barbican client"))
return cls._barbican_client

View File

@ -17,13 +17,14 @@
Common classes for Barbican certificate handling
"""
import abc
from barbicanclient import client as barbican_client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import six
from octavia.certificates.common import cert
from octavia.common import keystone
from octavia.i18n import _LE
@ -59,22 +60,13 @@ class BarbicanCert(cert.Cert):
return self._cert_container.private_key_passphrase.payload
@six.add_metaclass(abc.ABCMeta)
class BarbicanAuth(object):
_barbican_client = None
@classmethod
def get_barbican_client(cls):
@abc.abstractmethod
def get_barbican_client(self, project_id):
"""Creates a Barbican client object.
:param project_id: Project ID that the request will be used for
:return: a Barbican Client object
:raises Exception: if the client cannot be created
"""
if not cls._barbican_client:
try:
cls._barbican_client = barbican_client.Client(
session=keystone.get_session()
)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Barbican client"))
return cls._barbican_client

View File

@ -19,9 +19,10 @@ Cert generator implementation for Barbican
import random
import time
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
from octavia.certificates.common import barbican as barbican_common
from octavia.certificates.generator import cert_gen
@ -33,8 +34,15 @@ MAX_ATTEMPTS = 10
class BarbicanCertGenerator(cert_gen.CertGenerator):
"""Certificate Generator that wraps the Barbican client API."""
@classmethod
def sign_cert(cls, csr, validity):
def __init__(self):
super(BarbicanCertGenerator, self).__init__()
self.auth = stevedore_driver.DriverManager(
namespace='octavia.barbican_auth',
name=cfg.CONF.certificates.barbican_auth,
invoke_on_load=True,
).driver
def sign_cert(self, csr, validity):
"""Signs a certificate using our private CA based on the specified CSR.
:param csr: A Certificate Signing Request
@ -45,8 +53,7 @@ class BarbicanCertGenerator(cert_gen.CertGenerator):
"""
raise NotImplementedError("Barbican does not yet support signing.")
@classmethod
def _generate_private_key(cls, bit_length=2048, passphrase=None,
def _generate_private_key(self, bit_length=2048, passphrase=None,
create_only=False):
"""Generates a private key
@ -56,7 +63,8 @@ class BarbicanCertGenerator(cert_gen.CertGenerator):
:return: PEM encoded private key
:raises Exception: If private key generation fails
"""
connection = barbican_common.BarbicanAuth.get_barbican_client()
connection = self.auth.get_barbican_client(
cfg.CONF.keystone_authtoken.admin_user)
order = connection.orders.create_asymmetric(
bit_length=bit_length,
algorithm='rsa',
@ -92,20 +100,18 @@ class BarbicanCertGenerator(cert_gen.CertGenerator):
return secret.secret_ref
@classmethod
def _sign_cert_from_stored_key(cls, cn, pk_ref, validity):
def _sign_cert_from_stored_key(self, cn, pk_ref, validity):
raise NotImplementedError("Barbican does not yet support signing.")
@classmethod
def generate_cert_key_pair(cls, cn, validity, bit_length=2048,
def generate_cert_key_pair(self, cn, validity, bit_length=2048,
passphrase=None):
# This code will essentially work once Barbican enables CertOrders
# pk = cls._generate_private_key(
# pk = self._generate_private_key(
# bit_length=bit_length,
# passphrase=passphrase,
# create_only=True
# )
# cert_container = cls._sign_cert_from_stored_key(cn=cn, pk_ref=pk,
# cert_container = self._sign_cert_from_stored_key(cn=cn, pk_ref=pk,
# validity=validity)
# return barbican_common.BarbicanCert(cert_container)
raise NotImplementedError("Barbican does not yet support signing.")

View File

@ -19,6 +19,7 @@ Cert manager implementation for Barbican
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver
from octavia.certificates.common import barbican as barbican_common
from octavia.certificates.manager import cert_mgr
@ -27,15 +28,21 @@ from octavia.i18n import _LE, _LI, _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class BarbicanCertManager(cert_mgr.CertManager):
"""Certificate Manager that wraps the Barbican client API."""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name='Octavia TLS Cert'):
def __init__(self):
super(BarbicanCertManager, self).__init__()
self.auth = stevedore_driver.DriverManager(
namespace='octavia.barbican_auth',
name=cfg.CONF.certificates.barbican_auth,
invoke_on_load=True,
).driver
def store_cert(self, project_id, certificate, private_key,
intermediates=None, private_key_passphrase=None,
expiration=None, name='Octavia TLS Cert'):
"""Stores a certificate in the certificate manager.
:param certificate: PEM encoded TLS certificate
@ -48,7 +55,7 @@ class BarbicanCertManager(cert_mgr.CertManager):
:returns: the container_ref of the stored cert
:raises Exception: if certificate storage fails
"""
connection = barbican_common.BarbicanAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Storing certificate container '{0}' in Barbican."
@ -112,9 +119,8 @@ class BarbicanCertManager(cert_mgr.CertManager):
"Error storing certificate data: {0}"
).format(str(e)))
@staticmethod
def get_cert(cert_ref, resource_ref=None, check_only=False,
service_name='Octavia'):
def get_cert(self, project_id, cert_ref, resource_ref=None,
check_only=False, service_name='Octavia'):
"""Retrieves the specified cert and registers as a consumer.
:param cert_ref: the UUID of the cert to retrieve
@ -126,7 +132,7 @@ class BarbicanCertManager(cert_mgr.CertManager):
certificate data
:raises Exception: if certificate retrieval fails
"""
connection = barbican_common.BarbicanAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Loading certificate container {0} from Barbican."
@ -149,8 +155,8 @@ class BarbicanCertManager(cert_mgr.CertManager):
"Error getting {0}: {1}"
).format(cert_ref, str(e)))
@staticmethod
def delete_cert(cert_ref, resource_ref=None, service_name='Octavia'):
def delete_cert(self, project_id, cert_ref, resource_ref=None,
service_name='Octavia'):
"""Deregister as a consumer for the specified cert.
:param cert_ref: the UUID of the cert to retrieve
@ -159,7 +165,7 @@ class BarbicanCertManager(cert_mgr.CertManager):
:raises Exception: if deregistration fails
"""
connection = barbican_common.BarbicanAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Deregistering as a consumer of {0} in Barbican."
@ -175,30 +181,3 @@ class BarbicanCertManager(cert_mgr.CertManager):
LOG.error(_LE(
"Error deregistering as a consumer of {0}: {1}"
).format(cert_ref, str(e)))
@staticmethod
def _actually_delete_cert(cert_ref):
"""Deletes the specified cert. Very dangerous. Do not recommend.
:param cert_ref: the UUID of the cert to delete
:raises Exception: if certificate deletion fails
"""
connection = barbican_common.BarbicanAuth.get_barbican_client()
LOG.info(_LI(
"Recursively deleting certificate container {0} from Barbican."
).format(cert_ref))
try:
certificate_container = connection.containers.get(cert_ref)
certificate_container.certificate.delete()
if certificate_container.intermediates:
certificate_container.intermediates.delete()
if certificate_container.private_key_passphrase:
certificate_container.private_key_passphrase.delete()
certificate_container.private_key.delete()
certificate_container.delete()
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE(
"Error recursively deleting container {0}: {1}"
).format(cert_ref, str(e)))

View File

@ -29,9 +29,9 @@ class CertManager(object):
"""
@abc.abstractmethod
def store_cert(self, certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name=None):
def store_cert(self, project_id, certificate, private_key,
intermediates=None, private_key_passphrase=None,
expiration=None, name=None):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert and returns its UUID that
@ -42,8 +42,8 @@ class CertManager(object):
pass
@abc.abstractmethod
def get_cert(self, cert_ref, resource_ref=None, check_only=False,
service_name=None):
def get_cert(self, project_id, cert_ref, resource_ref=None,
check_only=False, service_name=None):
"""Retrieves the specified cert.
If check_only is True, don't perform any sort of registration.
@ -53,7 +53,8 @@ class CertManager(object):
pass
@abc.abstractmethod
def delete_cert(self, cert_ref, resource_ref, service_name=None):
def delete_cert(self, project_id, cert_ref, resource_ref,
service_name=None):
"""Deletes the specified cert.
If the specified cert does not exist, a CertificateStorageException

View File

@ -32,13 +32,14 @@ class LocalCertManager(cert_mgr.CertManager):
"""Cert Manager Interface that stores data locally."""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
def store_cert(project_id, certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert to the filesystem and returns
a UUID that can be used to retrieve it.
:param project_id: Ignored in this implementation
:param certificate: PEM encoded TLS certificate
:param private_key: private key for the supplied certificate
:param intermediates: ordered and concatenated intermediate certs
@ -54,22 +55,21 @@ class LocalCertManager(cert_mgr.CertManager):
"Storing certificate data on the local filesystem."
))
try:
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
with open(filename_certificate, 'w') as cert_file:
cert_file.write(certificate)
filename_private_key = "{0}.key".format(filename_base, cert_ref)
filename_private_key = "{0}.key".format(filename_base)
with open(filename_private_key, 'w') as key_file:
key_file.write(private_key)
if intermediates:
filename_intermediates = "{0}.int".format(filename_base,
cert_ref)
filename_intermediates = "{0}.int".format(filename_base)
with open(filename_intermediates, 'w') as int_file:
int_file.write(intermediates)
if private_key_passphrase:
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
filename_pkp = "{0}.pass".format(filename_base)
with open(filename_pkp, 'w') as pass_file:
pass_file.write(private_key_passphrase)
except IOError as ioe:
@ -79,9 +79,10 @@ class LocalCertManager(cert_mgr.CertManager):
return cert_ref
@staticmethod
def get_cert(cert_ref, **kwargs):
def get_cert(project_id, cert_ref, **kwargs):
"""Retrieves the specified cert.
:param project_id: Ignored in this implementation
:param cert_ref: the UUID of the cert to retrieve
:return: octavia.certificates.common.Cert representation of the
@ -93,10 +94,10 @@ class LocalCertManager(cert_mgr.CertManager):
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
filename_private_key = "{0}.key".format(filename_base, cert_ref)
filename_intermediates = "{0}.int".format(filename_base, cert_ref)
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
cert_data = dict()
@ -132,9 +133,10 @@ class LocalCertManager(cert_mgr.CertManager):
return local_common.LocalCert(**cert_data)
@staticmethod
def delete_cert(cert_ref, **kwargs):
def delete_cert(project_id, cert_ref, **kwargs):
"""Deletes the specified cert.
:param project_id: Ignored in this implementation
:param cert_ref: the UUID of the cert to delete
:raises CertificateStorageException: if certificate deletion fails
@ -144,10 +146,10 @@ class LocalCertManager(cert_mgr.CertManager):
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
filename_private_key = "{0}.key".format(filename_base, cert_ref)
filename_intermediates = "{0}.int".format(filename_base, cert_ref)
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
try:
os.remove(filename_certificate)

View File

@ -269,6 +269,9 @@ certificate_opts = [
cfg.StrOpt('cert_generator',
default='local_cert_generator',
help='Name of the cert generator to use'),
cfg.StrOpt('barbican_auth',
default='barbican_acl_auth',
help='Name of the Barbican authentication method to use')
]
house_keeping_opts = [

View File

@ -188,12 +188,14 @@ def load_certificates_data(cert_mngr, listener):
if listener.tls_certificate_id:
tls_cert = _map_cert_tls_container(
cert_mngr.get_cert(listener.tls_certificate_id,
cert_mngr.get_cert(listener.project_id,
listener.tls_certificate_id,
check_only=True))
if listener.sni_containers:
for sni_cont in listener.sni_containers:
cert_container = _map_cert_tls_container(
cert_mngr.get_cert(sni_cont.tls_container.id,
cert_mngr.get_cert(listener.project_id,
sni_cont.tls_container.id,
check_only=True))
sni_certs.append(cert_container)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}

View File

@ -0,0 +1,54 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from barbicanclient import client as barbican_client
import mock
import octavia.certificates.common.auth.barbican_acl as barbican_acl
import octavia.certificates.manager.barbican as barbican_cert_mgr
from octavia.common import keystone
import octavia.tests.unit.base as base
class TestBarbicanACLAuth(base.TestCase):
def setUp(self):
# Reset the client
keystone._SESSION = None
super(TestBarbicanACLAuth, self).setUp()
def test_get_barbican_client(self):
# There should be no existing client
self.assertIsNone(keystone._SESSION)
# Mock out the keystone session and get the client
keystone._SESSION = mock.MagicMock()
acl_auth_object = barbican_acl.BarbicanACLAuth()
bc1 = acl_auth_object.get_barbican_client()
# Our returned client should be an instance of barbican_client.Client
self.assertIsInstance(
bc1,
barbican_client.Client
)
# Getting the session again with new class should get the same object
acl_auth_object2 = barbican_acl.BarbicanACLAuth()
bc2 = acl_auth_object2.get_barbican_client()
self.assertIs(bc1, bc2)
def test_load_auth_driver(self):
bcm = barbican_cert_mgr.BarbicanCertManager()
self.assertTrue(isinstance(bcm.auth, barbican_acl.BarbicanACLAuth))

View File

@ -16,42 +16,9 @@ from barbicanclient import client as barbican_client
import mock
import octavia.certificates.common.barbican as barbican_common
from octavia.common import keystone
import octavia.tests.unit.base as base
class TestBarbicanAuth(base.TestCase):
def setUp(self):
# Reset the client
barbican_common.BarbicanAuth._barbican_client = None
keystone._SESSION = None
super(TestBarbicanAuth, self).setUp()
def test_get_barbican_client(self):
# There should be no existing client
self.assertIsNone(keystone._SESSION)
# Mock out the keystone session and get the client
keystone._SESSION = mock.MagicMock()
bc1 = barbican_common.BarbicanAuth.get_barbican_client()
# Our returned client should also be the saved client
self.assertIsInstance(
barbican_common.BarbicanAuth._barbican_client,
barbican_client.Client
)
self.assertIs(
barbican_common.BarbicanAuth._barbican_client,
bc1
)
# Getting the session again should return the same object
bc2 = barbican_common.BarbicanAuth.get_barbican_client()
self.assertIs(bc1, bc2)
class TestBarbicanCert(base.TestCase):
def setUp(self):

View File

@ -24,6 +24,9 @@ import octavia.certificates.manager.barbican as barbican_cert_mgr
import octavia.tests.unit.base as base
PROJECT_ID = "12345"
class TestBarbicanManager(base.TestCase):
def setUp(self):
@ -57,16 +60,24 @@ class TestBarbicanManager(base.TestCase):
self.secret3 = mock.Mock(spec=secrets.Secret)
self.secret4 = mock.Mock(spec=secrets.Secret)
# Mock out the client
self.bc = mock.Mock()
barbican_auth = mock.Mock(spec=barbican_common.BarbicanAuth)
barbican_auth.get_barbican_client.return_value = self.bc
self.cert_manager = barbican_cert_mgr.BarbicanCertManager()
self.cert_manager.auth = barbican_auth
super(TestBarbicanManager, self).setUp()
def test_store_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
barbican_common.BarbicanAuth._barbican_client = bc
self.bc.containers.create_certificate.return_value = (
self.empty_container)
# Attempt to store a cert
barbican_cert_mgr.BarbicanCertManager.store_cert(
container_ref = self.cert_manager.store_cert(
project_id=PROJECT_ID,
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
@ -74,6 +85,8 @@ class TestBarbicanManager(base.TestCase):
name=self.name
)
self.assertEqual(self.empty_container.container_ref, container_ref)
# create_secret should be called four times with our data
calls = [
mock.call(payload=self.certificate, expiration=None,
@ -85,32 +98,32 @@ class TestBarbicanManager(base.TestCase):
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
self.bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(1, bc.containers.create_certificate.call_count)
self.assertEqual(1, self.bc.containers.create_certificate.call_count)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
def test_store_cert_failure(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
self.bc.containers.create_certificate.return_value = (
self.empty_container)
test_secrets = [
self.secret1,
self.secret2,
self.secret3,
self.secret4
]
bc.secrets.create.side_effect = test_secrets
self.bc.secrets.create.side_effect = test_secrets
self.empty_container.store.side_effect = ValueError()
barbican_common.BarbicanAuth._barbican_client = bc
# Attempt to store a cert
self.assertRaises(
ValueError,
barbican_cert_mgr.BarbicanCertManager.store_cert,
self.cert_manager.store_cert,
project_id=PROJECT_ID,
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
@ -129,10 +142,10 @@ class TestBarbicanManager(base.TestCase):
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
self.bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(1, bc.containers.create_certificate.call_count)
self.assertEqual(1, self.bc.containers.create_certificate.call_count)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
@ -143,19 +156,18 @@ class TestBarbicanManager(base.TestCase):
def test_get_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.register_consumer.return_value = self.container
barbican_common.BarbicanAuth._barbican_client = bc
self.bc.containers.register_consumer.return_value = self.container
# Get the container data
data = barbican_cert_mgr.BarbicanCertManager.get_cert(
data = self.cert_manager.get_cert(
project_id=PROJECT_ID,
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Octavia'
)
# 'register_consumer' should be called once with the container_ref
bc.containers.register_consumer.assert_called_once_with(
self.bc.containers.register_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Octavia'
@ -173,18 +185,16 @@ class TestBarbicanManager(base.TestCase):
self.private_key_passphrase.payload)
def test_get_cert_no_registration(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
barbican_common.BarbicanAuth._barbican_client = bc
self.bc.containers.get.return_value = self.container
# Get the container data
data = barbican_cert_mgr.BarbicanCertManager.get_cert(
data = self.cert_manager.get_cert(
project_id=PROJECT_ID,
cert_ref=self.container_ref, check_only=True
)
# 'get' should be called once with the container_ref
bc.containers.get.assert_called_once_with(
self.bc.containers.get.assert_called_once_with(
container_ref=self.container_ref
)
@ -200,40 +210,17 @@ class TestBarbicanManager(base.TestCase):
self.private_key_passphrase.payload)
def test_delete_cert(self):
# Mock out the client
bc = mock.MagicMock()
barbican_common.BarbicanAuth._barbican_client = bc
# Attempt to deregister as a consumer
barbican_cert_mgr.BarbicanCertManager.delete_cert(
self.cert_manager.delete_cert(
project_id=PROJECT_ID,
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Octavia'
)
# remove_consumer should be called once with the container_ref
bc.containers.remove_consumer.assert_called_once_with(
self.bc.containers.remove_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Octavia'
)
def test_actually_delete_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
barbican_common.BarbicanAuth._barbican_client = bc
# Attempt to store a cert
barbican_cert_mgr.BarbicanCertManager._actually_delete_cert(
cert_ref=self.container_ref
)
# All secrets should be deleted
self.container.certificate.delete.assert_called_once_with()
self.container.private_key.delete.assert_called_once_with()
self.container.intermediates.delete.assert_called_once_with()
self.container.private_key_passphrase.delete.assert_called_once_with()
# Container should be deleted once
self.container.delete.assert_called_once_with()

View File

@ -47,6 +47,7 @@ class TestLocalManager(base.TestCase):
# Attempt to store the cert
with mock.patch.object(builtins, 'open', file_mock):
cert_id = local_cert_mgr.LocalCertManager.store_cert(
None,
certificate=self.certificate,
intermediates=self.intermediates,
private_key=self.private_key,
@ -78,7 +79,7 @@ class TestLocalManager(base.TestCase):
file_mock = mock.mock_open()
# Attempt to retrieve the cert
with mock.patch.object(builtins, 'open', file_mock):
data = local_cert_mgr.LocalCertManager.get_cert(cert_id)
data = local_cert_mgr.LocalCertManager.get_cert(None, cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
@ -97,7 +98,7 @@ class TestLocalManager(base.TestCase):
remove_mock = mock.Mock()
# Delete the cert
with mock.patch('os.remove', remove_mock):
local_cert_mgr.LocalCertManager.delete_cert(cert_id)
local_cert_mgr.LocalCertManager.delete_cert(None, cert_id)
# Verify the correct files were removed
remove_mock.assert_has_calls([

View File

@ -194,12 +194,13 @@ def sample_listener_tuple(proto=None, monitor=True, persistence=True,
port = '443' if proto is 'HTTPS' or proto is 'TERMINATED_HTTPS' else '80'
peer_port = 1024 if peer_port is None else peer_port
in_listener = collections.namedtuple(
'listener', 'id, protocol_port, protocol, default_pool, '
'listener', 'id, project_id, protocol_port, protocol, default_pool, '
'connection_limit, tls_certificate_id, '
'sni_container_ids, default_tls_container, '
'sni_containers, load_balancer, peer_port')
return in_listener(
id='sample_listener_id_1',
project_id='12345',
protocol_port=port,
protocol=proto,
load_balancer=sample_listener_loadbalancer_tuple(proto=proto,

View File

@ -272,9 +272,9 @@ class TestTLSParseUtils(base.TestCase):
# Ensure upload_cert is called three times
calls_cert_mngr = [
mock.call.get_cert('cont_id_1', check_only=True),
mock.call.get_cert('cont_id_2', check_only=True),
mock.call.get_cert('cont_id_3', check_only=True)
mock.call.get_cert('12345', 'cont_id_1', check_only=True),
mock.call.get_cert('12345', 'cont_id_2', check_only=True),
mock.call.get_cert('12345', 'cont_id_3', check_only=True)
]
client.assert_has_calls(calls_cert_mngr)

View File

@ -64,6 +64,8 @@ octavia.cert_generator =
octavia.cert_manager =
local_cert_manager = octavia.certificates.manager.local:LocalCertManager
barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager
octavia.barbican_auth =
barbican_acl_auth = octavia.certificates.common.auth.barbican_acl:BarbicanACLAuth
octavia.plugins =
hot_plug_plugin = octavia.controller.worker.controller_worker:ControllerWorker
oslo.config.opts =