Merge "Refactor BarbicanAuth to allow for configurable auth method"

This commit is contained in:
Jenkins 2016-01-20 00:49:49 +00:00 committed by Gerrit Code Review
commit 860d7b73b3
20 changed files with 271 additions and 171 deletions

View File

@ -23,7 +23,10 @@ cert_manager_opts = [
cfg.StrOpt('cert_manager_type',
default=CERT_MANAGER_DEFAULT,
help='Certificate Manager plugin. '
'Defaults to {0}.'.format(CERT_MANAGER_DEFAULT))
'Defaults to {0}.'.format(CERT_MANAGER_DEFAULT)),
cfg.StrOpt('barbican_auth',
default='barbican_acl_auth',
help='Name of the Barbican authentication method to use')
]
CONF.register_opts(cert_manager_opts, group='certificates')

View File

@ -0,0 +1,43 @@
# Copyright (c) 2014-2016 Rackspace US, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Barbican ACL auth class for Barbican certificate handling
"""
from barbicanclient import client as barbican_client
from oslo_log import log as logging
from oslo_utils import excutils
from neutron_lbaas._i18n import _LE
from neutron_lbaas.common.cert_manager.barbican_auth import common
from neutron_lbaas.common import keystone
LOG = logging.getLogger(__name__)
class BarbicanACLAuth(common.BarbicanAuth):
_barbican_client = None
@classmethod
def get_barbican_client(cls, project_id=None):
if not cls._barbican_client:
try:
cls._barbican_client = barbican_client.Client(
session=keystone.get_session()
)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Barbican client"))
return cls._barbican_client

View File

@ -0,0 +1,28 @@
# Copyright 2014-2016 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BarbicanAuth(object):
@abc.abstractmethod
def get_barbican_client(self, project_id):
"""Creates a Barbican client object.
:param project_id: Project ID that the request will be used for
:return: a Barbican Client object
:raises Exception: if the client cannot be created
"""

View File

@ -13,14 +13,13 @@
# under the License.
from barbicanclient import client as barbican_client
from neutron.plugins.common import constants
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver
from neutron_lbaas._i18n import _LI, _LW, _LE
from neutron_lbaas.common.cert_manager import cert_manager
from neutron_lbaas.common import keystone
LOG = logging.getLogger(__name__)
@ -29,6 +28,7 @@ CONF = cfg.CONF
class Cert(cert_manager.Cert):
"""Representation of a Cert based on the Barbican CertificateContainer."""
def __init__(self, cert_container):
if not isinstance(cert_container,
barbican_client.containers.CertificateContainer):
@ -57,39 +57,20 @@ class Cert(cert_manager.Cert):
return self._cert_container.private_key_passphrase.payload
class BarbicanKeystoneAuth(object):
_keystone_session = None
_barbican_client = None
@classmethod
def get_barbican_client(cls):
"""Creates a Barbican client object.
:returns: a Barbican Client object
:raises Exception: if the client cannot be created
"""
if not cls._barbican_client:
try:
cls._keystone_session = keystone.get_session()
cls._barbican_client = barbican_client.Client(
session=cls._keystone_session
)
except Exception:
# Barbican (because of Keystone-middleware) sometimes masks
# exceptions strangely -- this will reraise the original
# exception, while also providiung useful feedback in the
# logs for debugging
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Barbican client"))
return cls._barbican_client
class CertManager(cert_manager.CertManager):
"""Certificate Manager that wraps the Barbican client API."""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name='Octavia TLS Cert', **kwargs):
def __init__(self):
super(CertManager, self).__init__()
self.auth = stevedore_driver.DriverManager(
namespace='neutron_lbaas.cert_manager.barbican_auth',
name=cfg.CONF.certificates.barbican_auth,
invoke_on_load=True,
).driver
def store_cert(self, project_id, certificate, private_key,
intermediates=None, private_key_passphrase=None,
expiration=None, name='LBaaS TLS Cert'):
"""Stores a certificate in the certificate manager.
:param certificate: PEM encoded TLS certificate
@ -102,7 +83,8 @@ class CertManager(cert_manager.CertManager):
:returns: the container_ref of the stored cert
:raises Exception: if certificate storage fails
"""
connection = BarbicanKeystoneAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Storing certificate container '{0}' in Barbican."
@ -152,7 +134,7 @@ class CertManager(cert_manager.CertManager):
# feedback in the logs for debugging
except Exception:
for secret in [certificate_secret, private_key_secret,
intermediates_secret, pkp_secret]:
intermediates_secret, pkp_secret]:
if secret and secret.secret_ref:
old_ref = secret.secret_ref
try:
@ -168,22 +150,20 @@ class CertManager(cert_manager.CertManager):
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error storing certificate data"))
@staticmethod
def get_cert(cert_ref, service_name='lbaas',
lb_id=None,
check_only=False, **kwargs):
def get_cert(self, project_id, cert_ref, resource_ref,
check_only=False, service_name='lbaas'):
"""Retrieves the specified cert and registers as a consumer.
:param cert_ref: the UUID of the cert to retrieve
:param service_name: Friendly name for the consuming service
:param lb_id: Loadbalancer id for building resource consumer URL
:param resource_ref: Full HATEOAS reference to the consuming resource
:param check_only: Read Certificate data without registering
:param service_name: Friendly name for the consuming service
:returns: octavia.certificates.common.Cert representation of the
certificate data
:raises Exception: if certificate retrieval fails
"""
connection = BarbicanKeystoneAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Loading certificate container {0} from Barbican."
@ -197,15 +177,15 @@ class CertManager(cert_manager.CertManager):
cert_container = connection.containers.register_consumer(
container_ref=cert_ref,
name=service_name,
url=CertManager._get_service_url(lb_id)
url=resource_ref
)
return Cert(cert_container)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error getting {0}").format(cert_ref))
@staticmethod
def delete_cert(cert_ref, lb_id, service_name='lbaas', **kwargs):
def delete_cert(self, project_id, cert_ref, resource_ref,
service_name='lbaas'):
"""Deregister as a consumer for the specified cert.
:param cert_ref: the UUID of the cert to retrieve
@ -214,7 +194,7 @@ class CertManager(cert_manager.CertManager):
:raises Exception: if deregistration fails
"""
connection = BarbicanKeystoneAuth.get_barbican_client()
connection = self.auth.get_barbican_client(project_id)
LOG.info(_LI(
"Deregistering as a consumer of {0} in Barbican."
@ -223,46 +203,10 @@ class CertManager(cert_manager.CertManager):
connection.containers.remove_consumer(
container_ref=cert_ref,
name=service_name,
url=CertManager._get_service_url(lb_id)
url=resource_ref
)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE(
"Error deregistering as a consumer of {0}"
).format(cert_ref))
@staticmethod
def _actually_delete_cert(cert_ref):
"""Deletes the specified cert. Very dangerous. Do not recommend.
:param cert_ref: the UUID of the cert to delete
:raises Exception: if certificate deletion fails
"""
connection = BarbicanKeystoneAuth.get_barbican_client()
LOG.info(_LI(
"Recursively deleting certificate container {0} from Barbican."
).format(cert_ref))
try:
certificate_container = connection.containers.get(cert_ref)
certificate_container.certificate.delete()
if certificate_container.intermediates:
certificate_container.intermediates.delete()
if certificate_container.private_key_passphrase:
certificate_container.private_key_passphrase.delete()
certificate_container.private_key.delete()
certificate_container.delete()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE(
"Error recursively deleting certificate container {0}"
).format(cert_ref))
@staticmethod
def _get_service_url(lb_id):
# Format: <servicename>://<region>/<resource>/<object_id>
return "{0}://{1}/{2}/{3}".format(
cfg.CONF.service_auth.service_name,
cfg.CONF.service_auth.region,
constants.LOADBALANCER,
lb_id)

View File

@ -17,6 +17,7 @@ Certificate manager API
"""
import abc
from oslo_config import cfg
import six
@ -53,8 +54,9 @@ class CertManager(object):
"""
@abc.abstractmethod
def store_cert(self, certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
def store_cert(self, project_id, certificate, private_key,
intermediates=None, private_key_passphrase=None,
expiration=None, name=None):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert and returns its UUID that
@ -65,8 +67,8 @@ class CertManager(object):
pass
@abc.abstractmethod
def get_cert(self, cert_ref, check_only=False,
resource_ref=None, **kwargs):
def get_cert(self, project_id, cert_ref, resource_ref,
check_only=False, service_name=None):
"""Retrieves the specified cert.
If check_only is True, don't perform any sort of registration.
@ -76,10 +78,21 @@ class CertManager(object):
pass
@abc.abstractmethod
def delete_cert(self, cert_ref, resource_ref, **kwargs):
def delete_cert(self, project_id, cert_ref, resource_ref,
service_name=None):
"""Deletes the specified cert.
If the specified cert does not exist, a CertificateStorageException
should be raised.
"""
pass
@classmethod
def get_service_url(cls, loadbalancer_id):
# Format: <servicename>://<region>/<resource>/<object_id>
return "{0}://{1}/{2}/{3}".format(
cfg.CONF.service_auth.service_name,
cfg.CONF.service_auth.region,
"loadbalancer",
loadbalancer_id
)

View File

@ -42,6 +42,7 @@ CONF.register_opts(local_cert_manager_opts, group='certificates')
class Cert(cert_manager.Cert):
"""Representation of a Cert for local storage."""
def __init__(self, certificate, private_key, intermediates=None,
private_key_passphrase=None):
self.certificate = certificate
@ -65,14 +66,14 @@ class Cert(cert_manager.Cert):
class CertManager(cert_manager.CertManager):
"""Cert Manager Interface that stores data locally."""
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
def store_cert(self, project_id, certificate, private_key,
intermediates=None, private_key_passphrase=None, **kwargs):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert to the filesystem and returns
a UUID that can be used to retrieve it.
:param project_id: Project ID for the owner of the certificate
:param certificate: PEM encoded TLS certificate
:param private_key: private key for the supplied certificate
:param intermediates: ordered and concatenated intermediate certs
@ -111,11 +112,12 @@ class CertManager(cert_manager.CertManager):
return cert_ref
@staticmethod
def get_cert(cert_ref, **kwargs):
def get_cert(self, project_id, cert_ref, resource_ref, **kwargs):
"""Retrieves the specified cert.
:param project_id: Project ID for the owner of the certificate
:param cert_ref: the UUID of the cert to retrieve
:param resource_ref: Full HATEOAS reference to the consuming resource
:returns: neutron_lbaas.common.cert_manager.cert_manager.Cert
representation of the certificate data
@ -169,11 +171,12 @@ class CertManager(cert_manager.CertManager):
return Cert(**cert_data)
@staticmethod
def delete_cert(cert_ref, **kwargs):
def delete_cert(self, project_id, cert_ref, resource_ref, **kwargs):
"""Deletes the specified cert.
:param project_id: Project ID for the owner of the certificate
:param cert_ref: the UUID of the cert to delete
:param resource_ref: Full HATEOAS reference to the consuming resource
:raises CertificateStorageException: if certificate deletion fails
"""

View File

@ -334,9 +334,14 @@ class RadwareLBaaSV2Driver(base_v2_driver.RadwareLBaaSBaseV2Driver):
listener_dict[prop] = getattr(
listener, prop, PROPERTY_DEFAULTS.get(prop))
cert_mgr = CERT_MANAGER_PLUGIN.CertManager()
if listener.default_tls_container_id:
default_cert = CERT_MANAGER_PLUGIN.CertManager.get_cert(
listener.default_tls_container_id,
default_cert = cert_mgr.get_cert(
project_id=listener.tenant_id,
cert_ref=listener.default_tls_container_id,
resource_ref=cert_mgr.get_service_url(
listener.loadbalancer_id),
service_name='Neutron LBaaS v2 Radware provider')
cert_dict = {
'id': listener.default_tls_container_id,
@ -349,8 +354,11 @@ class RadwareLBaaSV2Driver(base_v2_driver.RadwareLBaaSBaseV2Driver):
if listener.sni_containers:
listener_dict['sni_tls_certificates'] = []
for sni_container in listener.sni_containers:
sni_cert = CERT_MANAGER_PLUGIN.CertManager.get_cert(
sni_container.tls_container_id,
sni_cert = cert_mgr.get_cert(
project_id=listener.tenant_id,
cert_ref=sni_container.tls_container_id,
resource_ref=cert_mgr.get_service_url(
listener.loadbalancer_id),
service_name='Neutron LBaaS v2 Radware provider')
listener_dict['sni_tls_certificates'].append(
{'id': sni_container.tls_container_id,

View File

@ -70,8 +70,12 @@ class EdgeListenerManager(driver_base.BaseListenerManager,
if listener.default_tls_container_id:
cert_backend = cert_manager.get_backend()
if cert_backend:
return cert_backend.CertManager.get_cert(
listener.default_tls_container_id)
return cert_backend.CertManager().get_cert(
project_id=listener.tenant_id,
cert_ref=listener.default_tls_container_id,
resource_ref=cert_backend.CertManager.get_service_url(
listener.loadbalancer_id)
)
@call_log
def create(self, context, listener):

View File

@ -155,13 +155,25 @@ def _process_tls_certificates(listener):
if listener.default_tls_container_id:
tls_cert = _map_cert_tls_container(
cert_mgr.get_cert(
listener.default_tls_container_id,
check_only=True))
project_id=listener.tenant_id,
cert_ref=listener.default_tls_container_id,
resource_ref=cert_mgr.get_service_url(
listener.loadbalancer_id),
check_only=True
)
)
if listener.sni_containers:
# Retrieve, map and store SNI certificates
for sni_cont in listener.sni_containers:
cert_container = _map_cert_tls_container(
cert_mgr.get_cert(sni_cont.tls_container_id, check_only=True))
cert_mgr.get_cert(
project_id=listener.tenant_id,
cert_ref=sni_cont.tls_container_id,
resource_ref=cert_mgr.get_service_url(
listener.loadbalancer_id),
check_only=True
)
)
sni_certs.append(cert_container)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}

View File

@ -621,18 +621,20 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
def _validate_tls(self, listener, curr_listener=None):
def validate_tls_container(container_ref):
cert_container = None
lb_id = None
cert_mgr = CERT_MANAGER_PLUGIN.CertManager()
if curr_listener:
lb_id = curr_listener['loadbalancer_id']
tenant_id = curr_listener['tenant_id']
else:
lb_id = listener.get('loadbalancer_id')
tenant_id = listener.get('tenant_id')
try:
cert_container = CERT_MANAGER_PLUGIN.CertManager.get_cert(
container_ref,
lb_id=lb_id)
cert_container = cert_mgr.get_cert(
project_id=tenant_id,
cert_ref=container_ref,
resource_ref=cert_mgr.get_service_url(lb_id))
except Exception as e:
if hasattr(e, 'status_code') and e.status_code == 404:
raise loadbalancerv2.TLSContainerNotFound(
@ -651,8 +653,10 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
cert_container.get_private_key_passphrase()),
intermediates=cert_container.get_intermediates())
except Exception as e:
CERT_MANAGER_PLUGIN.CertManager.delete_cert(
container_ref, lb_id)
cert_mgr.delete_cert(
project_id=tenant_id,
cert_ref=container_ref,
resource_ref=cert_mgr.get_service_url(lb_id))
raise loadbalancerv2.TLSContainerInvalid(
container_id=container_ref, reason=str(e))

View File

@ -0,0 +1,53 @@
# Copyright 2014-2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from barbicanclient import client as barbican_client
import mock
from neutron_lbaas.common.cert_manager.barbican_auth import barbican_acl
from neutron_lbaas.common.cert_manager import barbican_cert_manager
from neutron_lbaas.common import keystone
import neutron_lbaas.tests.base as base
class TestBarbicanACLAuth(base.BaseTestCase):
def setUp(self):
# Reset the client
keystone._SESSION = None
super(TestBarbicanACLAuth, self).setUp()
def test_get_barbican_client(self):
# There should be no existing client
self.assertIsNone(keystone._SESSION)
# Mock out the keystone session and get the client
keystone._SESSION = mock.MagicMock()
acl_auth_object = barbican_acl.BarbicanACLAuth()
bc1 = acl_auth_object.get_barbican_client()
# Our returned client should be an instance of barbican_client.Client
self.assertIsInstance(
bc1,
barbican_client.Client
)
# Getting the session again with new class should get the same object
acl_auth_object2 = barbican_acl.BarbicanACLAuth()
bc2 = acl_auth_object2.get_barbican_client()
self.assertIs(bc1, bc2)
def test_load_auth_driver(self):
bcm = barbican_cert_manager.CertManager()
self.assertTrue(isinstance(bcm.auth, barbican_acl.BarbicanACLAuth))

View File

@ -16,57 +16,10 @@ from barbicanclient import client as barbican_client
import mock
import neutron_lbaas.common.cert_manager.barbican_cert_manager as bbq_common
from neutron_lbaas.common import keystone
import neutron_lbaas.tests.base as base
from oslo_config import cfg
class TestBarbicanAuth(base.BaseTestCase):
def setUp(self):
# Reset the client
bbq_common.BarbicanKeystoneAuth._barbican_client = None
keystone._SESSION = None
super(TestBarbicanAuth, self).setUp()
def test_get_barbican_client(self):
# There should be no existing client
self.assertIsNone(keystone._SESSION)
# Mock out the keystone session and get the client
keystone._SESSION = mock.MagicMock()
bc1 = bbq_common.BarbicanKeystoneAuth.get_barbican_client()
# Our returned client should also be the saved client
self.assertIsInstance(
bbq_common.BarbicanKeystoneAuth._barbican_client,
barbican_client.Client
)
self.assertIs(
bbq_common.BarbicanKeystoneAuth._barbican_client,
bc1
)
# Getting the session again should return the same object
bc2 = bbq_common.BarbicanKeystoneAuth.get_barbican_client()
self.assertIs(bc1, bc2)
def test_get_service_url(self):
# Format: <servicename>://<region>/<resource>/<object_id>
cfg.CONF.set_override('service_name',
'lbaas',
'service_auth')
cfg.CONF.set_override('region',
'RegionOne',
'service_auth')
self.assertEqual(
'lbaas://RegionOne/LOADBALANCER/LB-ID',
bbq_common.CertManager._get_service_url('LB-ID'))
class TestBarbicanCert(base.BaseTestCase):
def setUp(self):
# Certificate data
self.certificate = "My Certificate"

View File

@ -16,7 +16,7 @@ from oslo_config import cfg
from neutron_lbaas.common import cert_manager
from neutron_lbaas.common.cert_manager import barbican_cert_manager as bcm
from neutron_lbaas.common.cert_manager import get_backend
from neutron_lbaas.common.cert_manager import cert_manager as cmi
from neutron_lbaas.common.cert_manager import local_cert_manager as lcm
from neutron_lbaas.tests import base
@ -27,12 +27,26 @@ class TestCertManager(base.BaseTestCase):
cert_manager._CERT_MANAGER_PLUGIN = None
super(TestCertManager, self).setUp()
def test_get_service_url(self):
# Format: <servicename>://<region>/<resource>/<object_id>
cfg.CONF.set_override('service_name',
'lbaas',
'service_auth',
enforce_type=True)
cfg.CONF.set_override('region',
'RegionOne',
'service_auth',
enforce_type=True)
self.assertEqual(
'lbaas://RegionOne/loadbalancer/LB-ID',
cmi.CertManager.get_service_url('LB-ID'))
def test_barbican_cert_manager(self):
cfg.CONF.set_override(
'cert_manager_type',
'barbican',
group='certificates')
self.assertEqual(get_backend().CertManager,
self.assertEqual(cert_manager.get_backend().CertManager,
bcm.CertManager)
def test_local_cert_manager(self):
@ -40,5 +54,5 @@ class TestCertManager(base.BaseTestCase):
'cert_manager_type',
'local',
group='certificates')
self.assertEqual(get_backend().CertManager,
self.assertEqual(cert_manager.get_backend().CertManager,
lcm.CertManager)

View File

@ -52,6 +52,7 @@ class TestLocalCert(base.BaseTestCase):
class TestLocalManager(base.BaseTestCase):
def setUp(self):
self.project_id = "12345"
self.certificate = "My Certificate"
self.intermediates = "My Intermediates"
self.private_key = "My Private Key"
@ -60,13 +61,16 @@ class TestLocalManager(base.BaseTestCase):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="certificates", storage_path="/tmp/")
self.cert_manager = local_cert_manager.CertManager()
super(TestLocalManager, self).setUp()
def _store_cert(self):
file_mock = mock.mock_open()
# Attempt to store the cert
with mock.patch('six.moves.builtins.open', file_mock, create=True):
cert_id = local_cert_manager.CertManager.store_cert(
cert_id = self.cert_manager.store_cert(
project_id=self.project_id,
certificate=self.certificate,
intermediates=self.intermediates,
private_key=self.private_key,
@ -98,7 +102,11 @@ class TestLocalManager(base.BaseTestCase):
file_mock = mock.mock_open()
# Attempt to retrieve the cert
with mock.patch('six.moves.builtins.open', file_mock, create=True):
data = local_cert_manager.CertManager.get_cert(cert_id)
data = self.cert_manager.get_cert(
project_id=self.project_id,
cert_ref=cert_id,
resource_ref=None
)
# Verify the correct files were opened
file_mock.assert_has_calls([
@ -117,7 +125,11 @@ class TestLocalManager(base.BaseTestCase):
remove_mock = mock.Mock()
# Delete the cert
with mock.patch('os.remove', remove_mock):
local_cert_manager.CertManager.delete_cert(cert_id)
self.cert_manager.delete_cert(
project_id=self.project_id,
cert_ref=cert_id,
resource_ref=None
)
# Verify the correct files were removed
remove_mock.assert_has_calls([

View File

@ -962,8 +962,10 @@ class LbaasListenerTests(ListenerTestBase):
context.get_admin_context(),
{'listener': listener_data})
rm_consumer_mock.assert_called_once_with(
listener_data['default_tls_container_ref'],
self.lb_id)
cert_ref=listener_data['default_tls_container_ref'],
project_id=self._tenant_id,
resource_ref=cert_manager.CertManager.get_service_url(
self.lb_id))
def test_create_listener_with_tls(self, **extras):
default_tls_container_ref = uuidutils.generate_uuid()

View File

@ -556,7 +556,7 @@ class TestLBaaSDriver(TestLBaaSDriverBase):
cert_mock.get_private_key.return_value = 'private_key'
cert_mock.get_private_key_passphrase.return_value = \
'private_key_passphrase'
cert_manager_mock.get_cert.return_value = cert_mock
cert_manager_mock().get_cert.return_value = cert_mock
cert_parser_mock.validate_cert.return_value = True
with self.listener(

View File

@ -146,12 +146,13 @@ def sample_listener_tuple(proto=None, monitor=True, persistence=True,
proto = 'HTTP' if proto is None else proto
port = '443' if proto is 'HTTPS' or proto is 'TERMINATED_HTTPS' else '80'
in_listener = collections.namedtuple(
'listener', 'id, protocol_port, protocol, default_pool, '
'listener', 'id, tenant_id, protocol_port, protocol, default_pool, '
'connection_limit, default_tls_container_id, '
'sni_container_ids, default_tls_container, '
'sni_containers')
'sni_containers, loadbalancer_id')
return in_listener(
id='sample_listener_id_1',
tenant_id='sample_tenant_id',
protocol_port=port,
protocol=proto,
default_pool=sample_pool_tuple(
@ -181,7 +182,8 @@ def sample_listener_tuple(proto=None, monitor=True, persistence=True,
private_key='--imakey3--\n', intermediates=[
'--imainter3--\n', '--imainter3too--\n'],
primary_cn='fakeCN2'))]
if sni else []
if sni else [],
loadbalancer_id='sample_loadbalancer_id_1'
)

View File

@ -54,6 +54,8 @@ oslo.config.opts =
neutron.lbaas = neutron_lbaas.opts:list_opts
neutron.lbaas.agent = neutron_lbaas.opts:list_agent_opts
neutron.lbaas.service = neutron_lbaas.opts:list_service_opts
neutron_lbaas.cert_manager.barbican_auth =
barbican_acl_auth = neutron_lbaas.common.cert_manager.barbican_auth.barbican_acl:BarbicanACLAuth
[build_sphinx]
all_files = 1