Merge "Registering Barbican consumers"

This commit is contained in:
Jenkins 2015-08-30 03:23:50 +00:00 committed by Gerrit Code Review
commit bf04bce4bf
8 changed files with 255 additions and 368 deletions

View File

@ -43,6 +43,16 @@
# quota_health_monitor.
# quota_healthmonitor = -1
[service_auth]
# auth_url = http://127.0.0.1:5000/v2.0
# admin_tenant_name = %SERVICE_TENANT_NAME%
# admin_user = %SERVICE_USER%
# admin_password = %SERVICE_PASSWORD%
# admin_user_domain = %SERVICE_USER_DOMAIN%
# admin_project_domain = %SERVICE_PROJECT_DOMAIN%
# region = %REGION%
# service_name = lbaas
# auth_version = 2
[service_providers]
# Must be in form:

View File

@ -13,20 +13,17 @@
# under the License.
from barbicanclient import client as barbican_client
from keystoneclient import session
from keystoneclient.v2_0 import client as v2_client
from keystoneclient.v3 import client as v3_client
from neutron.i18n import _LI, _LW, _LE
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from neutron_lbaas.common.cert_manager import cert_manager
from neutron_lbaas.common import keystone
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
class Cert(cert_manager.Cert):
@ -63,40 +60,6 @@ class BarbicanKeystoneAuth(object):
_keystone_session = None
_barbican_client = None
@classmethod
def _get_keystone_session(cls):
"""Initializes a Keystone session.
:return: a Keystone Session object
:raises Exception: if the session cannot be established
"""
if not cls._keystone_session:
try:
if CONF.keystone_authtoken.auth_version.lower() == 'v2':
kc = v2_client.Client(
username=CONF.keystone_authtoken.admin_user,
password=CONF.keystone_authtoken.admin_password,
tenant_name=CONF.keystone_authtoken.admin_tenant_name,
auth_url=CONF.keystone_authtoken.auth_uri
)
elif CONF.keystone_authtoken.auth_version.lower() == 'v3':
kc = v3_client.Client(
username=CONF.keystone_authtoken.admin_user,
password=CONF.keystone_authtoken.admin_password,
tenant_name=CONF.keystone_authtoken.admin_tenant_name,
auth_url=CONF.keystone_authtoken.auth_uri
)
else:
raise Exception('Unknown authentication version')
cls._keystone_session = session.Session(auth=kc)
except Exception:
# Keystone sometimes masks exceptions strangely -- this will
# reraise the original exception, while also providing useful
# feedback in the logs for debugging
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Keystone session"))
return cls._keystone_session
@classmethod
def get_barbican_client(cls):
"""Creates a Barbican client object.
@ -106,8 +69,9 @@ class BarbicanKeystoneAuth(object):
"""
if not cls._barbican_client:
try:
cls._keystone_session = keystone.get_session()
cls._barbican_client = barbican_client.Client(
session=cls._get_keystone_session()
session=cls._keystone_session
)
except Exception:
# Barbican (because of Keystone-middleware) sometimes masks
@ -204,7 +168,8 @@ class CertManager(cert_manager.CertManager):
LOG.exception(_LE("Error storing certificate data"))
@staticmethod
def get_cert(cert_ref, service_name='Octavia', resource_ref=None,
def get_cert(cert_ref, service_name='lbaas',
resource_ref=None,
check_only=False, **kwargs):
"""Retrieves the specified cert and registers as a consumer.
@ -239,8 +204,7 @@ class CertManager(cert_manager.CertManager):
LOG.exception(_LE("Error getting {0}").format(cert_ref))
@staticmethod
def delete_cert(cert_ref, service_name='Octavia', resource_ref=None,
**kwargs):
def delete_cert(cert_ref, resource_ref, service_name='lbaas', **kwargs):
"""Deregister as a consumer for the specified cert.
:param cert_ref: the UUID of the cert to retrieve

View File

@ -65,7 +65,8 @@ class CertManager(object):
pass
@abc.abstractmethod
def get_cert(self, cert_ref, check_only=False, **kwargs):
def get_cert(self, cert_ref, check_only=False,
resource_ref=None, **kwargs):
"""Retrieves the specified cert.
If check_only is True, don't perform any sort of registration.
@ -75,7 +76,7 @@ class CertManager(object):
pass
@abc.abstractmethod
def delete_cert(self, cert_ref, **kwargs):
def delete_cert(self, cert_ref, resource_ref, **kwargs):
"""Deletes the specified cert.
If the specified cert does not exist, a CertificateStorageException

View File

@ -0,0 +1,113 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient.auth.identity import v2 as v2_client
from keystoneclient.auth.identity import v3 as v3_client
from keystoneclient import session
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.i18n import _LE
LOG = logging.getLogger(__name__)
_SESSION = None
OPTS = [
cfg.StrOpt(
'auth_url',
default='http://127.0.0.1:5000/v2.0',
help=_('Authentication endpoint'),
),
cfg.StrOpt(
'admin_user',
default='admin',
help=_('The service admin user name'),
),
cfg.StrOpt(
'admin_tenant_name',
default='admin',
help=_('The service admin tenant name'),
),
cfg.StrOpt(
'admin_password',
default='password',
help=_('The service admin password'),
),
cfg.StrOpt(
'admin_user_domain',
default='admin',
help=_('The admin user domain name'),
),
cfg.StrOpt(
'admin_project_domain',
default='admin',
help=_('The admin project domain name'),
),
cfg.StrOpt(
'region',
default='RegionOne',
help=_('The deployment region'),
),
cfg.StrOpt(
'service_name',
default='lbaas',
help=_('The name of the service'),
),
cfg.StrOpt(
'auth_version',
default='2',
help=_('The auth version used to authenticate'),
)
]
cfg.CONF.register_opts(OPTS, 'service_auth')
def get_session():
"""Initializes a Keystone session.
:return: a Keystone Session object
:raises Exception: if the session cannot be established
"""
global _SESSION
if not _SESSION:
auth_url = cfg.CONF.service_auth.auth_url
kwargs = {'auth_url': auth_url,
'username': cfg.CONF.service_auth.admin_user,
'password': cfg.CONF.service_auth.admin_password}
if cfg.CONF.service_auth.auth_version == '2':
client = v2_client
kwargs['tenant_name'] = cfg.CONF.service_auth.admin_tenant_name
elif cfg.CONF.service_auth.auth_version == '3':
client = v3_client
kwargs['project_name'] = cfg.CONF.service_auth.admin_tenant_name
kwargs['user_domain_name'] = (cfg.CONF.service_auth.
admin_user_domain)
kwargs['project_domain_name'] = (cfg.CONF.service_auth.
admin_project_domain)
else:
raise Exception('Unknown keystone version!')
try:
kc = client.Password(**kwargs)
_SESSION = session.Session(auth=kc)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Keystone session."))
return _SESSION

View File

@ -120,6 +120,10 @@ class TLSContainerInvalid(nexception.NeutronException):
message = _("TLS container %(container_id)s is invalid. %(reason)s")
class CertManagerError(nexception.NeutronException):
message = _("Could not process TLS container %(ref)s, %(reason)s")
RESOURCE_ATTRIBUTE_MAP = {
'loadbalancers': {
'id': {'allow_post': False, 'allow_put': False,

View File

@ -573,10 +573,17 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
cert_container = None
try:
cert_container = CERT_MANAGER_PLUGIN.CertManager.get_cert(
container_ref, check_only=True)
except Exception:
raise loadbalancerv2.TLSContainerNotFound(
container_id=container_ref)
container_ref,
resource_ref=self._get_service_url(listener))
except Exception as e:
if hasattr(e, 'status_code') and e.status_code == 404:
raise loadbalancerv2.TLSContainerNotFound(
container_id=container_ref)
else:
# Could be a keystone configuration error...
raise loadbalancerv2.CertManagerError(
ref=container_ref, reason=e.message
)
try:
cert_parser.validate_cert(
@ -586,6 +593,8 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
cert_container.get_private_key_passphrase()),
intermediates=cert_container.get_intermediates())
except Exception as e:
CERT_MANAGER_PLUGIN.CertManager.delete_cert(
container_ref, self._get_service_url(listener))
raise loadbalancerv2.TLSContainerInvalid(
container_id=container_ref, reason=str(e))
@ -620,6 +629,14 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
return len(to_validate) > 0
def _get_service_url(self, listener):
# Format: <servicename>://<region>/<resource>/<object_id>
return "{0}://{1}/{2}/{3}".format(
cfg.CONF.service_auth.service_name,
cfg.CONF.service_auth.region,
constants.LOADBALANCER,
listener.get('loadbalancer_id'))
def create_listener(self, context, listener):
listener = listener.get('listener')
lb_id = listener.get('loadbalancer_id')

View File

@ -1,4 +1,4 @@
# Copyright 2014, 2015 Rackspace US, Inc.
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,109 +12,43 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from barbicanclient import client as barbican_client
from barbicanclient import containers
from barbicanclient import secrets
from keystoneclient import session
import mock
from oslo_config import fixture
from neutron_lbaas.common.cert_manager import barbican_cert_manager as bcm
from neutron_lbaas.common.cert_manager import cert_manager
from neutron_lbaas.tests import base
import neutron_lbaas.common.cert_manager.barbican_cert_manager as bbq_common
from neutron_lbaas.common import keystone
import neutron_lbaas.tests.base as base
class TestBarbicanAuth(base.BaseTestCase):
def setUp(self):
# Reset the session and client
bcm.BarbicanKeystoneAuth._keystone_session = None
bcm.BarbicanKeystoneAuth._barbican_client = None
# Reset the client
bbq_common.BarbicanKeystoneAuth._barbican_client = None
keystone._SESSION = None
super(TestBarbicanAuth, self).setUp()
def test_get_keystone_client_v2(self):
# self.skip("Temporarily skipping test until a workaround is found.")
bcm.v2_client = mock.MagicMock()
fixture.Config().config(group='keystone_authtoken', auth_version='v2')
# There should be no existing session
self.assertIsNone(
bcm.BarbicanKeystoneAuth._keystone_session
)
# Get us a session
ks1 = bcm.BarbicanKeystoneAuth._get_keystone_session()
# Our returned session should also be the saved session
self.assertIsInstance(
bcm.BarbicanKeystoneAuth._keystone_session,
session.Session
)
self.assertIs(
bcm.BarbicanKeystoneAuth._keystone_session,
ks1
)
# Getting the session again should return the same object
ks2 = bcm.BarbicanKeystoneAuth._get_keystone_session()
self.assertIs(ks1, ks2)
def test_get_keystone_client_v3(self):
# self.skip("Temporarily skipping test until a workaround is found.")
bcm.v3_client = mock.MagicMock()
fixture.Config().config(group='keystone_authtoken', auth_version='v3')
# There should be no existing session
self.assertIsNone(
bcm.BarbicanKeystoneAuth._keystone_session
)
# Get us a session
ks1 = bcm.BarbicanKeystoneAuth._get_keystone_session()
# Our returned session should also be the saved session
self.assertIsInstance(
bcm.BarbicanKeystoneAuth._keystone_session,
session.Session
)
self.assertIs(
bcm.BarbicanKeystoneAuth._keystone_session,
ks1
)
# Getting the session again should return the same object
ks2 = bcm.BarbicanKeystoneAuth._get_keystone_session()
self.assertIs(ks1, ks2)
def test_get_barbican_client(self):
# There should be no existing client
self.assertIsNone(
bcm.BarbicanKeystoneAuth._barbican_client
)
self.assertIsNone(keystone._SESSION)
# Mock out the keystone session and get the client
bcm.BarbicanKeystoneAuth._keystone_session = (
mock.MagicMock()
)
bc1 = bcm.BarbicanKeystoneAuth.get_barbican_client()
keystone._SESSION = mock.MagicMock()
bc1 = bbq_common.BarbicanKeystoneAuth.get_barbican_client()
# Our returned client should also be the saved client
self.assertIsInstance(
bcm.BarbicanKeystoneAuth._barbican_client,
bbq_common.BarbicanKeystoneAuth._barbican_client,
barbican_client.Client
)
self.assertIs(
bcm.BarbicanKeystoneAuth._barbican_client,
bbq_common.BarbicanKeystoneAuth._barbican_client,
bc1
)
# Getting the session again should return the same object
bc2 = bcm.BarbicanKeystoneAuth.get_barbican_client()
bc2 = bbq_common.BarbicanKeystoneAuth.get_barbican_client()
self.assertIs(bc1, bc2)
@ -155,7 +89,7 @@ class TestBarbicanCert(base.BaseTestCase):
private_key_passphrase=self.private_key_passphrase_secret
)
# Create a cert
cert = bcm.Cert(
cert = bbq_common.Cert(
cert_container=container
)
@ -165,237 +99,3 @@ class TestBarbicanCert(base.BaseTestCase):
self.assertEqual(cert.get_private_key(), self.private_key)
self.assertEqual(cert.get_private_key_passphrase(),
self.private_key_passphrase)
def test_barbican_cert_none_values(self):
container = barbican_client.containers.CertificateContainer(
api=mock.MagicMock(),
certificate=None,
intermediates=None,
private_key=None,
private_key_passphrase=None
)
# Create a cert
cert = bcm.Cert(
cert_container=container
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(), None)
self.assertEqual(cert.get_intermediates(), None)
self.assertEqual(cert.get_private_key(), None)
self.assertEqual(cert.get_private_key_passphrase(), None)
class TestBarbicanManager(base.BaseTestCase):
def setUp(self):
# Make a fake Container and contents
self.barbican_endpoint = 'http://localhost:9311/v1'
self.container_uuid = uuid.uuid4()
self.container_ref = '{0}/containers/{1}'.format(
self.barbican_endpoint, self.container_uuid
)
self.name = 'My Fancy Cert'
self.private_key = mock.Mock(spec=secrets.Secret)
self.certificate = mock.Mock(spec=secrets.Secret)
self.intermediates = mock.Mock(spec=secrets.Secret)
self.private_key_passphrase = mock.Mock(spec=secrets.Secret)
container = mock.Mock(spec=containers.CertificateContainer)
container.container_ref = self.container_ref
container.name = self.name
container.private_key = self.private_key
container.certificate = self.certificate
container.intermediates = self.intermediates
container.private_key_passphrase = self.private_key_passphrase
self.container = container
self.empty_container = mock.Mock(spec=containers.CertificateContainer)
self.secret1 = mock.Mock(spec=secrets.Secret)
self.secret2 = mock.Mock(spec=secrets.Secret)
self.secret3 = mock.Mock(spec=secrets.Secret)
self.secret4 = mock.Mock(spec=secrets.Secret)
super(TestBarbicanManager, self).setUp()
def test_store_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Attempt to store a cert
bcm.CertManager.store_cert(
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
private_key_passphrase=self.private_key_passphrase,
name=self.name
)
# create_secret should be called four times with our data
calls = [
mock.call(payload=self.certificate, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key, expiration=None,
name=mock.ANY),
mock.call(payload=self.intermediates, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(bc.containers.create_certificate.call_count, 1)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
def test_store_cert_failure(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.create_certificate.return_value = self.empty_container
test_secrets = [
self.secret1,
self.secret2,
self.secret3,
self.secret4
]
bc.secrets.create.side_effect = test_secrets
self.empty_container.store.side_effect = ValueError()
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Attempt to store a cert
self.assertRaises(
ValueError,
bcm.CertManager.store_cert,
certificate=self.certificate,
private_key=self.private_key,
intermediates=self.intermediates,
private_key_passphrase=self.private_key_passphrase,
name=self.name
)
# create_secret should be called four times with our data
calls = [
mock.call(payload=self.certificate, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key, expiration=None,
name=mock.ANY),
mock.call(payload=self.intermediates, expiration=None,
name=mock.ANY),
mock.call(payload=self.private_key_passphrase, expiration=None,
name=mock.ANY)
]
bc.secrets.create.assert_has_calls(calls, any_order=True)
# create_certificate should be called once
self.assertEqual(bc.containers.create_certificate.call_count, 1)
# Container should be stored once
self.empty_container.store.assert_called_once_with()
# All secrets should be deleted (or at least an attempt made)
for s in test_secrets:
s.delete.assert_called_once_with()
def test_get_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.register_consumer.return_value = self.container
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Get the container data
data = bcm.CertManager.get_cert(
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Octavia'
)
# 'register_consumer' should be called once with the container_ref
bc.containers.register_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Octavia'
)
# The returned data should be a Cert object with the correct values
self.assertIsInstance(data, cert_manager.Cert)
self.assertEqual(data.get_private_key(),
self.private_key.payload)
self.assertEqual(data.get_certificate(),
self.certificate.payload)
self.assertEqual(data.get_intermediates(),
self.intermediates.payload)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
def test_get_cert_no_registration(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Get the container data
data = bcm.CertManager.get_cert(
cert_ref=self.container_ref, check_only=True
)
# 'get' should be called once with the container_ref
bc.containers.get.assert_called_once_with(
container_ref=self.container_ref
)
# The returned data should be a Cert object with the correct values
self.assertIsInstance(data, cert_manager.Cert)
self.assertEqual(data.get_private_key(),
self.private_key.payload)
self.assertEqual(data.get_certificate(),
self.certificate.payload)
self.assertEqual(data.get_intermediates(),
self.intermediates.payload)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
def test_delete_cert(self):
# Mock out the client
bc = mock.MagicMock()
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Attempt to deregister as a consumer
bcm.CertManager.delete_cert(
cert_ref=self.container_ref,
resource_ref=self.container_ref,
service_name='Octavia'
)
# remove_consumer should be called once with the container_ref
bc.containers.remove_consumer.assert_called_once_with(
container_ref=self.container_ref,
url=self.container_ref,
name='Octavia'
)
def test_actually_delete_cert(self):
# Mock out the client
bc = mock.MagicMock()
bc.containers.get.return_value = self.container
bcm.BarbicanKeystoneAuth._barbican_client = bc
# Attempt to store a cert
bcm.CertManager._actually_delete_cert(
cert_ref=self.container_ref
)
# All secrets should be deleted
self.container.certificate.delete.assert_called_once_with()
self.container.private_key.delete.assert_called_once_with()
self.container.intermediates.delete.assert_called_once_with()
self.container.private_key_passphrase.delete.assert_called_once_with()
# Container should be deleted once
self.container.delete.assert_called_once_with()

View File

@ -15,6 +15,7 @@
import contextlib
import copy
import exceptions as ex
import mock
import six
@ -27,6 +28,7 @@ from neutron import context
import neutron.db.l3_db # noqa
from neutron.plugins.common import constants
from neutron.tests.unit.db import test_db_base_plugin_v2
from oslo_config import cfg
from oslo_utils import uuidutils
import testtools
import webob.exc
@ -780,6 +782,12 @@ class CertMock(cert_manager.Cert):
return "mock"
class Exceptions(object):
def __iter__(self):
return self
pass
class LbaasListenerTests(ListenerTestBase):
def test_create_listener(self, **extras):
@ -831,6 +839,19 @@ class LbaasListenerTests(ListenerTestBase):
def test_create_listener_with_tls_missing_container(self, **extras):
default_tls_container_ref = uuidutils.generate_uuid()
class ReplaceClass(ex.Exception):
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
pass
cfg.CONF.set_override('service_name',
'lbaas',
'service_auth')
cfg.CONF.set_override('region',
'RegionOne',
'service_auth')
listener_data = {
'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS,
'default_tls_container_ref': default_tls_container_ref,
@ -842,18 +863,70 @@ class LbaasListenerTests(ListenerTestBase):
}
listener_data.update(extras)
with mock.patch(
'neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.get_cert') as get_cert_mock:
get_cert_mock.side_effect = LookupError
with contextlib.nested(
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.get_cert'),
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.delete_cert')
) as (get_cert_mock, rm_consumer_mock):
ex.Exception = ReplaceClass(status_code=404,
message='Cert Not Found')
get_cert_mock.side_effect = ex.Exception
self.assertRaises(loadbalancerv2.TLSContainerNotFound,
self.plugin.create_listener,
context.get_admin_context(),
{'listener': listener_data})
def test_create_listener_with_tls_invalid_service_acct(self, **extras):
default_tls_container_ref = uuidutils.generate_uuid()
listener_data = {
'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS,
'default_tls_container_ref': default_tls_container_ref,
'sni_container_refs': [],
'protocol_port': 443,
'admin_state_up': True,
'tenant_id': self._tenant_id,
'loadbalancer_id': self.lb_id
}
listener_data.update(extras)
with contextlib.nested(
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.get_cert'),
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.delete_cert')
) as (get_cert_mock, rm_consumer_mock):
get_cert_mock.side_effect = Exception('RandomFailure')
self.assertRaises(loadbalancerv2.CertManagerError,
self.plugin.create_listener,
context.get_admin_context(),
{'listener': listener_data})
def test_get_service_url(self):
# Format: <servicename>://<region>/<resource>/<object_id>
cfg.CONF.set_override('service_name',
'lbaas',
'service_auth')
cfg.CONF.set_override('region',
'RegionOne',
'service_auth')
listner = {
'loadbalancer_id': self.lb_id
}
self.assertEqual(
'lbaas://RegionOne/LOADBALANCER/{0}'.format(self.lb_id),
self.plugin._get_service_url(listner))
def test_create_listener_with_tls_invalid_container(self, **extras):
default_tls_container_ref = uuidutils.generate_uuid()
cfg.CONF.set_override('service_name',
'lbaas',
'service_auth')
cfg.CONF.set_override('region',
'RegionOne',
'service_auth')
listener_data = {
'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS,
'default_tls_container_ref': default_tls_container_ref,
@ -869,8 +942,10 @@ class LbaasListenerTests(ListenerTestBase):
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'cert_parser.validate_cert'),
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.get_cert')
) as (validate_cert_mock, get_cert_mock):
'CERT_MANAGER_PLUGIN.CertManager.get_cert'),
mock.patch('neutron_lbaas.services.loadbalancer.plugin.'
'CERT_MANAGER_PLUGIN.CertManager.delete_cert')
) as (validate_cert_mock, get_cert_mock, rm_consumer_mock):
get_cert_mock.start().return_value = CertMock(
'mock_cert')
validate_cert_mock.side_effect = exceptions.MisMatchedKey
@ -879,6 +954,9 @@ class LbaasListenerTests(ListenerTestBase):
self.plugin.create_listener,
context.get_admin_context(),
{'listener': listener_data})
rm_consumer_mock.assert_called_once_with(
listener_data['default_tls_container_ref'],
'lbaas://RegionOne/LOADBALANCER/{0}'.format(self.lb_id))
def test_create_listener_with_tls(self, **extras):
default_tls_container_ref = uuidutils.generate_uuid()