Merge "Fix multi-listener LB with missing certificate"

This commit is contained in:
Zuul 2019-12-09 23:05:37 +00:00 committed by Gerrit Code Review
commit 951a6bace2
10 changed files with 178 additions and 212 deletions

View File

@ -37,6 +37,9 @@ import octavia.common.jinja.haproxy.split_listeners.jinja_cfg as jinja_split
from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg
from octavia.common.tls_utils import cert_parser
from octavia.common import utils
from octavia.db import api as db_apis
from octavia.db import repositories as repo
LOG = logging.getLogger(__name__)
API_VERSION = consts.API_VERSION
@ -147,6 +150,11 @@ class HaproxyAmphoraLoadBalancerDriver(
'process mode.', amphora.id, loadbalancer.id)
has_tcp = False
certs = {}
client_ca_filename = None
crl_filename = None
pool_tls_certs = dict()
listeners_to_update = []
for listener in loadbalancer.listeners:
LOG.debug("%s updating listener %s on amphora %s",
self.__class__.__name__, listener.id, amphora.id)
@ -164,42 +172,63 @@ class HaproxyAmphoraLoadBalancerDriver(
else:
obj_id = loadbalancer.id
self._process_tls_certificates(listener, amphora, obj_id)
try:
certs.update({
listener.tls_certificate_id:
self._process_tls_certificates(
listener, amphora, obj_id)['tls_cert']})
client_ca_filename = self._process_secret(
listener, listener.client_ca_tls_certificate_id,
amphora, obj_id)
crl_filename = self._process_secret(
listener, listener.client_crl_container_id,
amphora, obj_id)
pool_tls_certs = self._process_listener_pool_certs(
listener, amphora, obj_id)
client_ca_filename = self._process_secret(
listener, listener.client_ca_tls_certificate_id,
amphora, obj_id)
crl_filename = self._process_secret(
listener, listener.client_crl_container_id,
amphora, obj_id)
pool_tls_certs = self._process_listener_pool_certs(
listener, amphora, obj_id)
if split_config:
config = self.jinja_split.build_config(
host_amphora=amphora, listener=listener,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, listener.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, listener.id, timeout_dict=timeout_dict)
if split_config:
config = self.jinja_split.build_config(
host_amphora=amphora, listener=listener,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, listener.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, listener.id, timeout_dict=timeout_dict)
else:
listeners_to_update.append(listener)
except Exception as e:
LOG.error('Unable to update listener {0} due to "{1}". '
'Skipping this listener.'.format(
listener.id, str(e)))
listener_repo = repo.ListenerRepository()
listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=consts.ERROR,
operating_status=consts.ERROR)
if has_tcp and not split_config:
# Generate HaProxy configuration from listener object
config = self.jinja_combo.build_config(
host_amphora=amphora, listeners=loadbalancer.listeners,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, loadbalancer.id, config, timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, loadbalancer.id, timeout_dict=timeout_dict)
if listeners_to_update:
# Generate HaProxy configuration from listener object
config = self.jinja_combo.build_config(
host_amphora=amphora, listeners=listeners_to_update,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, loadbalancer.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, loadbalancer.id, timeout_dict=timeout_dict)
else:
# If we aren't updating any listeners, make sure there are
# no listeners hanging around. For example if this update
# was called from a listener delete.
self.clients[amphora.api_version].delete_listener(
amphora, loadbalancer.id)
def _udp_update(self, listener, vip):
LOG.debug("Amphora %s keepalivedlvs, updating "

View File

@ -258,20 +258,26 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
potential_offline_pools = {}
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb['enabled'] is False:
if db_lb[constants.ENABLED] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
health_msg_version = health.get('ver', 0)
for listener_id in db_lb.get('listeners', {}):
db_op_status = db_lb['listeners'][listener_id]['operating_status']
for listener_id in db_lb.get(constants.LISTENERS, {}):
db_listener = db_lb[constants.LISTENERS][listener_id]
db_op_status = db_listener[constants.OPERATING_STATUS]
listener_status = None
listener = None
if listener_id not in listeners:
listener_status = constants.OFFLINE
if (db_listener[constants.ENABLED] and
db_lb[constants.PROVISIONING_STATUS] ==
constants.ACTIVE):
listener_status = constants.ERROR
else:
listener_status = constants.OFFLINE
else:
listener = listeners[listener_id]

View File

@ -1176,14 +1176,14 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
"""
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and listener ids: %s", loadbalancer.id,
"and updating status for listener ids: %s", loadbalancer.id,
', '.join([l.id for l in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
loadbalancer.id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(
db_apis.get_session(), listener.id)
def revert(self, loadbalancer, listeners, *args, **kwargs):
"""Mark the load balancer and listeners as broken.
@ -1202,35 +1202,6 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerActiveInDB(BaseDatabaseTask):
"""Mark the listener active in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
"""Mark the listener as active in DB
:param listener: The listener to be marked active
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for listener id: %s ", listener.id)
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
def revert(self, listener, *args, **kwargs):
"""Mark the listener ERROR since the delete couldn't happen
:param listener: The listener that couldn't be updated
:returns: None
"""
LOG.warning("Reverting mark listener active in DB "
"for listener id %s", listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerDeletedInDB(BaseDatabaseTask):
"""Mark the listener deleted in the DB.

View File

@ -1224,15 +1224,13 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
if lb_id:
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and listener ids: %s", lb_id,
"and updating status for listener ids: %s", lb_id,
', '.join([l[constants.LISTENER_ID] for l in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
lb_id,
self.loadbalancer_repo.update(db_apis.get_session(), lb_id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
self.listener_repo.update(
db_apis.get_session(), listener[constants.LISTENER_ID],
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(
db_apis.get_session(), listener[constants.LISTENER_ID])
def revert(self, loadbalancer_id, listeners, *args, **kwargs):
"""Mark the load balancer and listeners as broken.
@ -1260,35 +1258,6 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
listener[constants.LISTENER_ID])
class MarkListenerActiveInDB(BaseDatabaseTask):
"""Mark the listener active in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
"""Mark the listener as active in DB
:param listener: The listener to be marked active
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for listener id: %s ", listener.id)
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
def revert(self, listener, *args, **kwargs):
"""Mark the listener ERROR since the delete couldn't happen
:param listener: The listener that couldn't be updated
:returns: None
"""
LOG.warning("Reverting mark listener active in DB "
"for listener id %s", listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerDeletedInDB(BaseDatabaseTask):
"""Mark the listener deleted in the DB.

View File

@ -1107,6 +1107,16 @@ class ListenerRepository(BaseRepository):
session.add(model)
return model.to_data_model()
def prov_status_active_if_not_error(self, session, listener_id):
"""Update provisioning_status to ACTIVE if not already in ERROR."""
with session.begin(subtransactions=True):
(session.query(self.model_class).filter_by(id=listener_id).
# Don't mark ERROR or already ACTIVE as ACTIVE
filter(~self.model_class.provisioning_status.in_(
[consts.ERROR, consts.ACTIVE])).
update({self.model_class.provisioning_status: consts.ACTIVE},
synchronize_session='fetch'))
class ListenerStatisticsRepository(BaseRepository):
model_class = models.ListenerStatistics

View File

@ -2236,15 +2236,16 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
operating_status=constants.ONLINE, enabled=True,
server_group_id=self.FAKE_UUID_1)
def create_listener(self, listener_id, port, default_pool_id=None):
def create_listener(self, listener_id, port, default_pool_id=None,
provisioning_status=constants.ACTIVE):
listener = self.listener_repo.create(
self.session, id=listener_id, project_id=self.FAKE_UUID_2,
name="listener_name", description="listener_description",
protocol=constants.PROTOCOL_HTTP, protocol_port=port,
connection_limit=1, load_balancer_id=self.load_balancer.id,
default_pool_id=default_pool_id, operating_status=constants.ONLINE,
provisioning_status=constants.ACTIVE, enabled=True, peer_port=1025,
tags=['test_tag'])
provisioning_status=provisioning_status, enabled=True,
peer_port=1025, tags=['test_tag'])
return listener
def create_amphora(self, amphora_id, loadbalancer_id):
@ -2471,6 +2472,40 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertIsNone(new_listener.default_pool)
def test_prov_status_active_if_not_error_active(self):
listener = self.create_listener(self.FAKE_UUID_1, 80,
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ACTIVE, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_error(self):
listener = self.create_listener(self.FAKE_UUID_1, 80,
provisioning_status=constants.ERROR)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ERROR, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_pending_update(self):
listener = self.create_listener(
self.FAKE_UUID_1, 80, provisioning_status=constants.PENDING_UPDATE)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ACTIVE, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_bogus_listener(self):
listener = self.create_listener(
self.FAKE_UUID_1, 80, provisioning_status=constants.PENDING_UPDATE)
# Should not raise an exception nor change any status
self.listener_repo.prov_status_active_if_not_error(self.session,
'bogus_id')
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.PENDING_UPDATE,
new_listener.provisioning_status)
class ListenerStatisticsRepositoryTest(BaseRepositoryTest):

View File

@ -123,16 +123,6 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
# mock_listener = mock.MagicMock()
# mock_listener.id = 'mock_listener_id'
# mock_listener.protocol = constants.PROTOCOL_HTTP
# mock_listener.connection_limit = constants.DEFAULT_CONNECTION_LIMIT
# mock_listener.tls_certificate_id = None
# mock_loadbalancer = mock.MagicMock()
# mock_loadbalancer.id = 'mock_lb_id'
# mock_loadbalancer.project_id = 'mock_lb_project'
# mock_loadbalancer.listeners = [mock_listener]
# mock_listener.load_balancer = mock_loadbalancer
mock_secret.return_value = 'filename.pem'
mock_load_cert.return_value = {
'tls_cert': self.sl.default_tls_container, 'sni_certs': [],
@ -168,6 +158,26 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.clients[API_VERSION].upload_config.assert_not_called()
self.driver.clients[API_VERSION].reload_listener.assert_not_called()
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.ListenerRepository.update')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners_bad_cert(
self, mock_load_cert, mock_list_update, mock_get_session):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
mock_get_session.return_value = 'fake_session'
mock_load_cert.side_effect = [Exception]
self.driver.update_amphora_listeners(self.lb,
mock_amphora, self.timeout_dict)
mock_list_update.assert_called_once_with(
'fake_session', self.lb.listeners[0].id,
provisioning_status=constants.ERROR,
operating_status=constants.ERROR)
self.driver.jinja_split.build_config.assert_not_called()
self.driver.clients[API_VERSION].delete_listener.assert_not_called()
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.'
'HaproxyAmphoraLoadBalancerDriver._process_secret')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')

View File

@ -123,16 +123,6 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
# mock_listener = mock.MagicMock()
# mock_listener.id = 'mock_listener_id'
# mock_listener.protocol = constants.PROTOCOL_HTTP
# mock_listener.connection_limit = constants.DEFAULT_CONNECTION_LIMIT
# mock_listener.tls_certificate_id = None
# mock_loadbalancer = mock.MagicMock()
# mock_loadbalancer.id = 'mock_lb_id'
# mock_loadbalancer.project_id = 'mock_lb_project'
# mock_loadbalancer.listeners = [mock_listener]
# mock_listener.load_balancer = mock_loadbalancer
mock_secret.return_value = 'filename.pem'
mock_load_cert.return_value = {
'tls_cert': self.sl.default_tls_container, 'sni_certs': [],
@ -168,6 +158,27 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.clients[API_VERSION].upload_config.assert_not_called()
self.driver.clients[API_VERSION].reload_listener.assert_not_called()
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.ListenerRepository.update')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners_bad_cert(
self, mock_load_cert, mock_list_update, mock_get_session):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
mock_get_session.return_value = 'fake_session'
mock_load_cert.side_effect = [Exception]
self.driver.update_amphora_listeners(self.lb,
mock_amphora, self.timeout_dict)
mock_list_update.assert_called_once_with(
'fake_session', self.lb.listeners[0].id,
provisioning_status=constants.ERROR,
operating_status=constants.ERROR)
self.driver.jinja_combo.build_config.assert_not_called()
(self.driver.clients[API_VERSION].delete_listener.
assert_called_once_with)(mock_amphora, self.lb.id)
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.'
'HaproxyAmphoraLoadBalancerDriver._process_secret')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')

View File

@ -882,42 +882,6 @@ class TestDatabaseTasks(base.TestCase):
'TEST',
id=AMP_ID)
def test_mark_listener_active_in_db(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_active = database_tasks.MarkListenerActiveInDB()
mark_listener_active.execute(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_listener_repo_update.reset_mock()
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
# Test the revert
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
def test_mark_listener_deleted_in_db(self,
mock_generate_uuid,
mock_LOG,
@ -991,7 +955,10 @@ class TestDatabaseTasks(base.TestCase):
id=LISTENER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.ListenerRepository.'
'prov_status_active_if_not_error')
def test_mark_lb_and_listeners_active_in_db(self,
mock_list_not_error,
mock_generate_uuid,
mock_LOG,
mock_get_session,
@ -1005,10 +972,7 @@ class TestDatabaseTasks(base.TestCase):
mark_lb_and_listeners_active.execute(self.loadbalancer_mock,
[self.listener_mock])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
mock_list_not_error.assert_called_once_with('TEST', LISTENER_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,

View File

@ -893,42 +893,6 @@ class TestDatabaseTasks(base.TestCase):
'TEST',
id=AMP_ID)
def test_mark_listener_active_in_db(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_active = database_tasks.MarkListenerActiveInDB()
mark_listener_active.execute(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_listener_repo_update.reset_mock()
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
# Test the revert
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
def test_mark_listener_deleted_in_db(self,
mock_generate_uuid,
mock_LOG,
@ -1002,7 +966,10 @@ class TestDatabaseTasks(base.TestCase):
id=LISTENER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.ListenerRepository.'
'prov_status_active_if_not_error')
def test_mark_lb_and_listeners_active_in_db(self,
mock_list_not_error,
mock_generate_uuid,
mock_LOG,
mock_get_session,
@ -1017,10 +984,7 @@ class TestDatabaseTasks(base.TestCase):
MarkLBAndListenersActiveInDB())
mark_lb_and_listeners_active.execute(LB_ID, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
mock_list_not_error.assert_called_once_with('TEST', LISTENER_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,
@ -1028,7 +992,7 @@ class TestDatabaseTasks(base.TestCase):
# Test with LB_ID from listeners
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
mock_list_not_error.reset_mock()
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
@ -1036,10 +1000,7 @@ class TestDatabaseTasks(base.TestCase):
MarkLBAndListenersActiveInDB())
mark_lb_and_listeners_active.execute(None, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
mock_list_not_error.assert_called_once_with('TEST', LISTENER_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,