# Copyright 2015 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # from octavia_lib.common import constants as lib_consts from oslo_config import cfg from oslo_log import log as logging from oslo_utils import excutils from sqlalchemy.orm import exc as db_exceptions from stevedore import driver as stevedore_driver from taskflow.listeners import logging as tf_logging import tenacity from octavia.amphorae.driver_exceptions import exceptions as driver_exc from octavia.api.drivers import utils as provider_utils from octavia.common import base_taskflow from octavia.common import constants from octavia.common import exceptions from octavia.common import utils from octavia.controller.worker.v2.flows import flow_utils from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver from octavia.db import api as db_apis from octavia.db import repositories as repo CONF = cfg.CONF LOG = logging.getLogger(__name__) # We do not need to log retry exception information. Warning "Could not connect # to instance" will be logged as usual. 
def retryMaskFilter(record):
    """Logging filter that hides AmpConnectionRetry exception records.

    We do not need to log retry exception information. Warning "Could not
    connect to instance" will be logged as usual.
    """
    if record.exc_info is not None and isinstance(
            record.exc_info[1], driver_exc.AmpConnectionRetry):
        return False
    return True


LOG.logger.addFilter(retryMaskFilter)


def _is_provisioning_status_pending_update(lb_obj):
    """Return True while the object has not yet reached PENDING_UPDATE."""
    # Direct '!=' comparison instead of the 'not ... ==' double negative.
    return lb_obj.provisioning_status != constants.PENDING_UPDATE


class ControllerWorker(object):
    """Builds and runs TaskFlow flows for Octavia API operations."""

    def __init__(self):
        self._amphora_repo = repo.AmphoraRepository()
        self._amphora_health_repo = repo.AmphoraHealthRepository()
        self._health_mon_repo = repo.HealthMonitorRepository()
        self._lb_repo = repo.LoadBalancerRepository()
        self._listener_repo = repo.ListenerRepository()
        self._member_repo = repo.MemberRepository()
        self._pool_repo = repo.PoolRepository()
        self._l7policy_repo = repo.L7PolicyRepository()
        self._l7rule_repo = repo.L7RuleRepository()
        self._flavor_repo = repo.FlavorRepository()
        self._az_repo = repo.AvailabilityZoneRepository()

        if CONF.task_flow.jobboard_enabled:
            persistence = tsk_driver.MysqlPersistenceDriver()

            self.jobboard_driver = stevedore_driver.DriverManager(
                namespace='octavia.worker.jobboard_driver',
                name=CONF.task_flow.jobboard_backend_driver,
                invoke_args=(persistence,),
                invoke_on_load=True).driver
        else:
            # Jobboard disabled: run flows in-process with a local engine.
            self.tf_engine = base_taskflow.BaseTaskFlowEngine()

    @tenacity.retry(
        retry=(
            tenacity.retry_if_result(_is_provisioning_status_pending_update) |
            tenacity.retry_if_exception_type()),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def _get_db_obj_until_pending_update(self, repo, id):
        """Fetch a DB object, retrying until it reaches PENDING_UPDATE.

        :param repo: The repository to fetch the object from.
        :param id: The ID of the object to fetch.
        :returns: The DB object, once provisioning_status is PENDING_UPDATE
                  or the retry attempts are exhausted.
        """
        return repo.get(db_apis.get_session(), id=id)

    @property
    def services_controller(self):
        return base_taskflow.TaskFlowServiceController(self.jobboard_driver)

    def run_flow(self, func, *args, **kwargs):
        """Run a flow, either via the jobboard or a local taskflow engine.

        :param func: A flow factory from flow_utils.
        :param args: Positional arguments passed to the flow factory.
        :param kwargs: Engine/store options (e.g. store=, wait=).
        """
        if CONF.task_flow.jobboard_enabled:
            self.services_controller.run_poster(func, *args, **kwargs)
        else:
            tf = self.tf_engine.taskflow_load(
                func(*args), **kwargs)
            with tf_logging.DynamicLoggingListener(tf, log=LOG):
                tf.run()
self.tf_engine.taskflow_load( func(*args), **kwargs) with tf_logging.DynamicLoggingListener(tf, log=LOG): tf.run() def create_amphora(self, availability_zone=None): """Creates an Amphora. This is used to create spare amphora. :returns: uuid """ try: store = {constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_SPARES_POOL_PRIORITY, constants.FLAVOR: None, constants.SERVER_GROUP_ID: None, constants.AVAILABILITY_ZONE: None} if availability_zone: store[constants.AVAILABILITY_ZONE] = ( self._az_repo.get_availability_zone_metadata_dict( db_apis.get_session(), availability_zone)) self.run_flow( flow_utils.get_create_amphora_flow, store=store, wait=True) except Exception as e: LOG.error('Failed to create an amphora due to: %s', str(e)) def delete_amphora(self, amphora_id): """Deletes an existing Amphora. :param amphora_id: ID of the amphora to delete :returns: None :raises AmphoraNotFound: The referenced Amphora was not found """ try: amphora = self._amphora_repo.get(db_apis.get_session(), id=amphora_id) store = {constants.AMPHORA: amphora.to_dict()} self.run_flow( flow_utils.get_delete_amphora_flow, store=store) except Exception as e: LOG.error('Failed to delete a amphora %s due to: %s', amphora_id, str(e)) return LOG.info('Finished deleting amphora %s.', amphora_id) @tenacity.retry( retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), wait=tenacity.wait_incrementing( CONF.haproxy_amphora.api_db_commit_retry_initial_delay, CONF.haproxy_amphora.api_db_commit_retry_backoff, CONF.haproxy_amphora.api_db_commit_retry_max), stop=tenacity.stop_after_attempt( CONF.haproxy_amphora.api_db_commit_retry_attempts)) def create_health_monitor(self, health_monitor): """Creates a health monitor. 
    def delete_health_monitor(self, health_monitor):
        """Deletes a health monitor.

        :param health_monitor: Provider health monitor dict
        :returns: None
        :raises HMNotFound: The referenced health monitor was not found
        """
        db_health_monitor = self._health_mon_repo.get(
            db_apis.get_session(),
            id=health_monitor[constants.HEALTHMONITOR_ID])

        pool = db_health_monitor.pool
        load_balancer = pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))

        store = {constants.HEALTH_MON: health_monitor,
                 constants.POOL_ID: pool.id,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.LOADBALANCER: provider_lb,
                 constants.PROJECT_ID: load_balancer.project_id}
        self.run_flow(
            flow_utils.get_delete_health_monitor_flow,
            store=store)
    def update_health_monitor(self, original_health_monitor,
                              health_monitor_updates):
        """Updates a health monitor.

        :param original_health_monitor: Provider health monitor dict
        :param health_monitor_updates: Dict containing updated health monitor
        :returns: None
        :raises HMNotFound: The referenced health monitor was not found
        """
        try:
            # Wait for the API to flip the object to PENDING_UPDATE before
            # acting on it.
            db_health_monitor = self._get_db_obj_until_pending_update(
                self._health_mon_repo,
                original_health_monitor[constants.HEALTHMONITOR_ID])
        except tenacity.RetryError as e:
            LOG.warning('Health monitor did not go into %s in 60 seconds. '
                        'This either due to an in-progress Octavia upgrade '
                        'or an overloaded and failing database. Assuming '
                        'an upgrade is in progress and continuing.',
                        constants.PENDING_UPDATE)
            # Best effort: use the object from the last retry attempt.
            db_health_monitor = e.last_attempt.result()

        pool = db_health_monitor.pool

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))

        load_balancer = pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        store = {constants.HEALTH_MON: original_health_monitor,
                 constants.POOL_ID: pool.id,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.LOADBALANCER: provider_lb,
                 constants.UPDATE_DICT: health_monitor_updates}
        self.run_flow(
            flow_utils.get_update_health_monitor_flow,
            store=store)
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_listener(self, listener):
        """Creates a listener.

        :param listener: A listener provider dictionary.
        :returns: None
        :raises NoResultFound: Unable to find the object
        """
        db_listener = self._listener_repo.get(
            db_apis.get_session(), id=listener[constants.LISTENER_ID])
        if not db_listener:
            # The API may not have committed the record yet; the decorator
            # above retries on NoResultFound.
            LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
                        '60 seconds.', 'listener',
                        listener[constants.LISTENER_ID])
            raise db_exceptions.NoResultFound

        load_balancer = db_listener.load_balancer
        listeners = load_balancer.listeners
        dict_listeners = []
        for li in listeners:
            dict_listeners.append(
                provider_utils.db_listener_to_provider_listener(li).to_dict())
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        store = {constants.LISTENERS: dict_listeners,
                 constants.LOADBALANCER: provider_lb,
                 constants.LOADBALANCER_ID: load_balancer.id}

        self.run_flow(
            flow_utils.get_create_listener_flow,
            store=store)
    def delete_listener(self, listener):
        """Deletes a listener.

        :param listener: A listener provider dictionary to delete
        :returns: None
        :raises ListenerNotFound: The referenced listener was not found
        """
        store = {constants.LISTENER: listener,
                 constants.LOADBALANCER_ID:
                     listener[constants.LOADBALANCER_ID],
                 constants.PROJECT_ID: listener[constants.PROJECT_ID]}
        self.run_flow(
            flow_utils.get_delete_listener_flow,
            store=store)

    def update_listener(self, listener, listener_updates):
        """Updates a listener.

        :param listener: A listener provider dictionary to update
        :param listener_updates: Dict containing updated listener attributes
        :returns: None
        :raises ListenerNotFound: The referenced listener was not found
        """
        db_lb = self._lb_repo.get(db_apis.get_session(),
                                  id=listener[constants.LOADBALANCER_ID])
        store = {constants.LISTENER: listener,
                 constants.UPDATE_DICT: listener_updates,
                 constants.LOADBALANCER_ID: db_lb.id,
                 constants.LISTENERS: [listener]}
        self.run_flow(
            flow_utils.get_update_listener_flow,
            store=store)
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_load_balancer(self, loadbalancer, flavor=None,
                             availability_zone=None):
        """Creates a load balancer by allocating Amphorae.

        First tries to allocate an existing Amphora in READY state.
        If none are available it will attempt to build one specifically
        for this load balancer.

        :param loadbalancer: The dict of load balancer to create
        :param flavor: Flavor metadata dict for the new load balancer
        :param availability_zone: AZ metadata dict for the new load balancer
        :returns: None
        :raises NoResultFound: Unable to find the object
        """
        lb = self._lb_repo.get(db_apis.get_session(),
                               id=loadbalancer[constants.LOADBALANCER_ID])
        if not lb:
            # The API may not have committed the record yet; the decorator
            # above retries on NoResultFound.
            LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
                        '60 seconds.', 'load_balancer',
                        loadbalancer[constants.LOADBALANCER_ID])
            raise db_exceptions.NoResultFound

        store = {lib_consts.LOADBALANCER_ID:
                 loadbalancer[lib_consts.LOADBALANCER_ID],
                 constants.BUILD_TYPE_PRIORITY:
                 constants.LB_CREATE_NORMAL_PRIORITY,
                 lib_consts.FLAVOR: flavor,
                 lib_consts.AVAILABILITY_ZONE: availability_zone}

        topology = lb.topology

        # Anti-affinity server groups are only used for multi-amphora
        # topologies with the nova option enabled.
        if (not CONF.nova.enable_anti_affinity or
                topology == constants.TOPOLOGY_SINGLE):
            store[constants.SERVER_GROUP_ID] = None

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                lb.listeners)
        )

        store[constants.UPDATE_DICT] = {
            constants.TOPOLOGY: topology
        }
        self.run_flow(
            flow_utils.get_create_load_balancer_flow,
            topology, listeners=listeners_dicts,
            store=store)
    def delete_load_balancer(self, load_balancer, cascade=False):
        """Deletes a load balancer by de-allocating Amphorae.

        :param load_balancer: Dict of the load balancer to delete
        :param cascade: If True, delete the listeners and pools on the load
                        balancer as part of the same flow.
        :returns: None
        :raises LBNotFound: The referenced load balancer was not found
        """
        loadbalancer_id = load_balancer[constants.LOADBALANCER_ID]
        db_lb = self._lb_repo.get(db_apis.get_session(),
                                  id=loadbalancer_id)
        store = {constants.LOADBALANCER: load_balancer,
                 constants.LOADBALANCER_ID: loadbalancer_id,
                 constants.SERVER_GROUP_ID: db_lb.server_group_id,
                 constants.PROJECT_ID: db_lb.project_id}
        if cascade:
            listeners = flow_utils.get_listeners_on_lb(db_lb)
            pools = flow_utils.get_pools_on_lb(db_lb)

            self.run_flow(
                flow_utils.get_cascade_delete_load_balancer_flow,
                load_balancer, listeners, pools, store=store)
        else:
            self.run_flow(
                flow_utils.get_delete_load_balancer_flow,
                load_balancer, store=store)

    def update_load_balancer(self, original_load_balancer,
                             load_balancer_updates):
        """Updates a load balancer.

        :param original_load_balancer: Dict of the load balancer to update
        :param load_balancer_updates: Dict containing updated load balancer
        :returns: None
        :raises LBNotFound: The referenced load balancer was not found
        """
        store = {constants.LOADBALANCER: original_load_balancer,
                 constants.LOADBALANCER_ID:
                     original_load_balancer[constants.LOADBALANCER_ID],
                 constants.UPDATE_DICT: load_balancer_updates}

        self.run_flow(
            flow_utils.get_update_load_balancer_flow,
            store=store)
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_member(self, member):
        """Creates a pool member.

        :param member: A member provider dictionary to create
        :returns: None
        :raises NoSuitablePool: Unable to find the node pool
        """
        pool = self._pool_repo.get(db_apis.get_session(),
                                   id=member[constants.POOL_ID])
        load_balancer = pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))

        store = {
            constants.MEMBER: member,
            constants.LISTENERS: listeners_dicts,
            constants.LOADBALANCER_ID: load_balancer.id,
            constants.LOADBALANCER: provider_lb,
            constants.POOL_ID: pool.id}
        if load_balancer.availability_zone:
            store[constants.AVAILABILITY_ZONE] = (
                self._az_repo.get_availability_zone_metadata_dict(
                    db_apis.get_session(),
                    load_balancer.availability_zone))
        else:
            store[constants.AVAILABILITY_ZONE] = {}

        self.run_flow(
            flow_utils.get_create_member_flow,
            store=store)

    def delete_member(self, member):
        """Deletes a pool member.

        :param member: A member provider dictionary to delete
        :returns: None
        :raises MemberNotFound: The referenced member was not found
        """
        pool = self._pool_repo.get(db_apis.get_session(),
                                   id=member[constants.POOL_ID])

        load_balancer = pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))

        store = {
            constants.MEMBER: member,
            constants.LISTENERS: listeners_dicts,
            constants.LOADBALANCER_ID: load_balancer.id,
            constants.LOADBALANCER: provider_lb,
            constants.POOL_ID: pool.id,
            constants.PROJECT_ID: load_balancer.project_id}
        if load_balancer.availability_zone:
            store[constants.AVAILABILITY_ZONE] = (
                self._az_repo.get_availability_zone_metadata_dict(
                    db_apis.get_session(),
                    load_balancer.availability_zone))
        else:
            store[constants.AVAILABILITY_ZONE] = {}

        self.run_flow(
            flow_utils.get_delete_member_flow,
            store=store)
    def batch_update_members(self, old_members, new_members,
                             updated_members):
        """Batch create/delete/update pool members in a single flow.

        :param old_members: Member dicts to remove
        :param new_members: Member dicts to create
        :param updated_members: Member dicts with updated attributes
        :returns: None
        """
        # Pair each update dict with the provider view of its current DB
        # record.
        updated_members = [
            (provider_utils.db_member_to_provider_member(
                self._member_repo.get(db_apis.get_session(),
                                      id=m.get(constants.ID))).to_dict(),
             m)
            for m in updated_members]
        provider_old_members = [
            provider_utils.db_member_to_provider_member(
                self._member_repo.get(db_apis.get_session(),
                                      id=m.get(constants.ID))).to_dict()
            for m in old_members]
        # All members in a batch belong to the same pool; find it from
        # whichever input list is non-empty.
        if old_members:
            pool = self._pool_repo.get(db_apis.get_session(),
                                       id=old_members[0][constants.POOL_ID])
        elif new_members:
            pool = self._pool_repo.get(db_apis.get_session(),
                                       id=new_members[0][constants.POOL_ID])
        else:
            pool = self._pool_repo.get(
                db_apis.get_session(),
                id=updated_members[0][0][constants.POOL_ID])
        load_balancer = pool.load_balancer

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        store = {
            constants.LISTENERS: listeners_dicts,
            constants.LOADBALANCER_ID: load_balancer.id,
            constants.LOADBALANCER: provider_lb,
            constants.POOL_ID: pool.id,
            constants.PROJECT_ID: load_balancer.project_id}
        if load_balancer.availability_zone:
            store[constants.AVAILABILITY_ZONE] = (
                self._az_repo.get_availability_zone_metadata_dict(
                    db_apis.get_session(),
                    load_balancer.availability_zone))
        else:
            store[constants.AVAILABILITY_ZONE] = {}

        self.run_flow(
            flow_utils.get_batch_update_members_flow,
            provider_old_members, new_members, updated_members,
            store=store)
    def update_member(self, member, member_updates):
        """Updates a pool member.

        :param member: A member provider dictionary to update
        :param member_updates: Dict containing updated member attributes
        :returns: None
        :raises MemberNotFound: The referenced member was not found
        """
        # TODO(ataraday) when other flows will use dicts - revisit this
        pool = self._pool_repo.get(db_apis.get_session(),
                                   id=member[constants.POOL_ID])
        load_balancer = pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                pool.listeners))

        store = {
            constants.MEMBER: member,
            constants.LISTENERS: listeners_dicts,
            constants.LOADBALANCER_ID: load_balancer.id,
            constants.LOADBALANCER: provider_lb,
            constants.POOL_ID: pool.id,
            constants.UPDATE_DICT: member_updates}
        if load_balancer.availability_zone:
            store[constants.AVAILABILITY_ZONE] = (
                self._az_repo.get_availability_zone_metadata_dict(
                    db_apis.get_session(),
                    load_balancer.availability_zone))
        else:
            store[constants.AVAILABILITY_ZONE] = {}

        self.run_flow(
            flow_utils.get_update_member_flow,
            store=store)

    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_pool(self, pool):
        """Creates a node pool.

        :param pool: Provider pool dict to create
        :returns: None
        :raises NoResultFound: Unable to find the object
        """
        # TODO(ataraday) It seems we need to get db pool here anyway to get
        # proper listeners
        db_pool = self._pool_repo.get(db_apis.get_session(),
                                      id=pool[constants.POOL_ID])
        if not db_pool:
            # The API may not have committed the record yet; the decorator
            # above retries on NoResultFound.
            LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
                        '60 seconds.', 'pool', pool[constants.POOL_ID])
            raise db_exceptions.NoResultFound

        load_balancer = db_pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                db_pool.listeners))

        store = {constants.POOL_ID: pool[constants.POOL_ID],
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.LOADBALANCER: provider_lb}
        self.run_flow(
            flow_utils.get_create_pool_flow,
            store=store)
    def delete_pool(self, pool):
        """Deletes a node pool.

        :param pool: Provider pool dict to delete
        :returns: None
        :raises PoolNotFound: The referenced pool was not found
        """
        db_pool = self._pool_repo.get(db_apis.get_session(),
                                      id=pool[constants.POOL_ID])

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                db_pool.listeners))
        load_balancer = db_pool.load_balancer

        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        store = {constants.POOL_ID: pool[constants.POOL_ID],
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER: provider_lb,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.PROJECT_ID: db_pool.project_id}
        self.run_flow(
            flow_utils.get_delete_pool_flow,
            store=store)

    def update_pool(self, origin_pool, pool_updates):
        """Updates a node pool.

        :param origin_pool: Provider pool dict to update
        :param pool_updates: Dict containing updated pool attributes
        :returns: None
        :raises PoolNotFound: The referenced pool was not found
        """
        try:
            # Wait for the API to flip the object to PENDING_UPDATE before
            # acting on it.
            db_pool = self._get_db_obj_until_pending_update(
                self._pool_repo, origin_pool[constants.POOL_ID])
        except tenacity.RetryError as e:
            LOG.warning('Pool did not go into %s in 60 seconds. '
                        'This either due to an in-progress Octavia upgrade '
                        'or an overloaded and failing database. Assuming '
                        'an upgrade is in progress and continuing.',
                        constants.PENDING_UPDATE)
            # Best effort: use the object from the last retry attempt.
            db_pool = e.last_attempt.result()

        load_balancer = db_pool.load_balancer
        provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
            load_balancer).to_dict()

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                db_pool.listeners))

        store = {constants.POOL_ID: db_pool.id,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER: provider_lb,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.UPDATE_DICT: pool_updates}
        self.run_flow(
            flow_utils.get_update_pool_flow,
            store=store)
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_l7policy(self, l7policy):
        """Creates an L7 Policy.

        :param l7policy: Provider dict of the l7policy to create
        :returns: None
        :raises NoResultFound: Unable to find the object
        """
        db_listener = self._listener_repo.get(
            db_apis.get_session(), id=l7policy[constants.LISTENER_ID])

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_listener]))

        store = {constants.L7POLICY: l7policy,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: db_listener.load_balancer.id
                 }
        self.run_flow(
            flow_utils.get_create_l7policy_flow,
            store=store)
    def delete_l7policy(self, l7policy):
        """Deletes an L7 policy.

        :param l7policy: Provider dict of the l7policy to delete
        :returns: None
        :raises L7PolicyNotFound: The referenced l7policy was not found
        """
        db_listener = self._listener_repo.get(
            db_apis.get_session(), id=l7policy[constants.LISTENER_ID])

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_listener]))

        store = {constants.L7POLICY: l7policy,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: db_listener.load_balancer.id
                 }
        self.run_flow(
            flow_utils.get_delete_l7policy_flow,
            store=store)

    def update_l7policy(self, original_l7policy, l7policy_updates):
        """Updates an L7 policy.

        :param original_l7policy: Provider dict of the l7policy to update
        :param l7policy_updates: Dict containing updated l7policy attributes
        :returns: None
        :raises L7PolicyNotFound: The referenced l7policy was not found
        """
        db_listener = self._listener_repo.get(
            db_apis.get_session(),
            id=original_l7policy[constants.LISTENER_ID])

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_listener]))

        store = {constants.L7POLICY: original_l7policy,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: db_listener.load_balancer.id,
                 constants.UPDATE_DICT: l7policy_updates}
        self.run_flow(
            flow_utils.get_update_l7policy_flow,
            store=store)
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
        wait=tenacity.wait_incrementing(
            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
            CONF.haproxy_amphora.api_db_commit_retry_backoff,
            CONF.haproxy_amphora.api_db_commit_retry_max),
        stop=tenacity.stop_after_attempt(
            CONF.haproxy_amphora.api_db_commit_retry_attempts))
    def create_l7rule(self, l7rule):
        """Creates an L7 Rule.

        :param l7rule: Provider dict l7rule
        :returns: None
        :raises NoResultFound: Unable to find the object
        """
        db_l7policy = self._l7policy_repo.get(db_apis.get_session(),
                                              id=l7rule[constants.L7POLICY_ID])

        load_balancer = db_l7policy.listener.load_balancer

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_l7policy.listener]))
        l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
            db_l7policy)

        store = {constants.L7RULE: l7rule,
                 constants.L7POLICY: l7policy_dict.to_dict(),
                 constants.L7POLICY_ID: db_l7policy.id,
                 constants.LISTENERS: listeners_dicts,
                 constants.LOADBALANCER_ID: load_balancer.id
                 }
        self.run_flow(
            flow_utils.get_create_l7rule_flow,
            store=store)

    def delete_l7rule(self, l7rule):
        """Deletes an L7 rule.

        :param l7rule: Provider dict of the l7rule to delete
        :returns: None
        :raises L7RuleNotFound: The referenced l7rule was not found
        """
        db_l7policy = self._l7policy_repo.get(db_apis.get_session(),
                                              id=l7rule[constants.L7POLICY_ID])
        l7policy = provider_utils.db_l7policy_to_provider_l7policy(db_l7policy)
        load_balancer = db_l7policy.listener.load_balancer

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_l7policy.listener]))

        store = {constants.L7RULE: l7rule,
                 constants.L7POLICY: l7policy.to_dict(),
                 constants.LISTENERS: listeners_dicts,
                 constants.L7POLICY_ID: db_l7policy.id,
                 constants.LOADBALANCER_ID: load_balancer.id
                 }
        self.run_flow(
            flow_utils.get_delete_l7rule_flow,
            store=store)
    def update_l7rule(self, original_l7rule, l7rule_updates):
        """Updates an L7 rule.

        :param original_l7rule: Origin dict of the l7rule to update
        :param l7rule_updates: Dict containing updated l7rule attributes
        :returns: None
        :raises L7RuleNotFound: The referenced l7rule was not found
        """
        db_l7policy = self._l7policy_repo.get(
            db_apis.get_session(),
            id=original_l7rule[constants.L7POLICY_ID])
        load_balancer = db_l7policy.listener.load_balancer

        listeners_dicts = (
            provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
                [db_l7policy.listener]))
        l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
            db_l7policy)

        store = {constants.L7RULE: original_l7rule,
                 constants.L7POLICY: l7policy_dict.to_dict(),
                 constants.LISTENERS: listeners_dicts,
                 constants.L7POLICY_ID: db_l7policy.id,
                 constants.LOADBALANCER_ID: load_balancer.id,
                 constants.UPDATE_DICT: l7rule_updates}
        self.run_flow(
            flow_utils.get_update_l7rule_flow,
            store=store)

    def failover_amphora(self, amphora_id, reraise=False):
        """Perform failover operations for an amphora.

        Note: This expects the load balancer to already be in
        provisioning_status=PENDING_UPDATE state.

        :param amphora_id: ID for amphora to failover
        :param reraise: If enabled reraise any caught exception
        :returns: None
        :raises octavia.common.exceptions.NotFound: The referenced amphora was
                                                    not found
        """
        amphora = None
        try:
            amphora = self._amphora_repo.get(db_apis.get_session(),
                                             id=amphora_id)
            if amphora is None:
                LOG.error('Amphora failover for amphora %s failed because '
                          'there is no record of this amphora in the '
                          'database. Check that the [house_keeping] '
                          'amphora_expiry_age configuration setting is not '
                          'too short. Skipping failover.', amphora_id)
                raise exceptions.NotFound(resource=constants.AMPHORA,
                                          id=amphora_id)

            if amphora.status == constants.DELETED:
                LOG.warning('Amphora %s is marked DELETED in the database but '
                            'was submitted for failover. Deleting it from the '
                            'amphora health table to exclude it from health '
                            'checks and skipping the failover.', amphora.id)
                self._amphora_health_repo.delete(db_apis.get_session(),
                                                 amphora_id=amphora.id)
                return

            loadbalancer = None
            if amphora.load_balancer_id:
                loadbalancer = self._lb_repo.get(db_apis.get_session(),
                                                 id=amphora.load_balancer_id)
            lb_amp_count = None
            if loadbalancer:
                if loadbalancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
                    lb_amp_count = 2
                elif loadbalancer.topology == constants.TOPOLOGY_SINGLE:
                    lb_amp_count = 1

            az_metadata = {}
            flavor_dict = {}
            lb_id = None
            vip_dict = {}
            server_group_id = None
            if loadbalancer:
                lb_id = loadbalancer.id
                # Even if the LB doesn't have a flavor, create one and
                # pass through the topology.
                if loadbalancer.flavor_id:
                    flavor_dict = self._flavor_repo.get_flavor_metadata_dict(
                        db_apis.get_session(), loadbalancer.flavor_id)
                    flavor_dict[constants.LOADBALANCER_TOPOLOGY] = (
                        loadbalancer.topology)
                else:
                    flavor_dict = {constants.LOADBALANCER_TOPOLOGY:
                                   loadbalancer.topology}
                if loadbalancer.availability_zone:
                    az_metadata = (
                        self._az_repo.get_availability_zone_metadata_dict(
                            db_apis.get_session(),
                            loadbalancer.availability_zone))
                vip_dict = loadbalancer.vip.to_dict()
                server_group_id = loadbalancer.server_group_id
            provider_lb_dict = (provider_utils.
                                db_loadbalancer_to_provider_loadbalancer)(
                loadbalancer).to_dict() if loadbalancer else loadbalancer

            stored_params = {constants.AVAILABILITY_ZONE: az_metadata,
                             constants.BUILD_TYPE_PRIORITY:
                                 constants.LB_CREATE_FAILOVER_PRIORITY,
                             constants.FLAVOR: flavor_dict,
                             constants.LOADBALANCER: provider_lb_dict,
                             constants.SERVER_GROUP_ID: server_group_id,
                             constants.LOADBALANCER_ID: lb_id,
                             constants.VIP: vip_dict}

            self.run_flow(
                flow_utils.get_failover_amphora_flow,
                amphora.to_dict(), lb_amp_count,
                store=stored_params, wait=True)

            LOG.info("Successfully completed the failover for an amphora: %s",
                     {"id": amphora_id,
                      "load_balancer_id": lb_id,
                      "lb_network_ip": amphora.lb_network_ip,
                      "compute_id": amphora.compute_id,
                      "role": amphora.role})

        except Exception as e:
            with excutils.save_and_reraise_exception(reraise=reraise):
                LOG.exception("Amphora %s failover exception: %s",
                              amphora_id, str(e))
                # Mark the amphora (and its LB, if any) in ERROR so the
                # failure is visible and operators can retry.
                self._amphora_repo.update(db_apis.get_session(),
                                          amphora_id, status=constants.ERROR)
                if amphora and amphora.load_balancer_id:
                    self._lb_repo.update(
                        db_apis.get_session(),
                        amphora.load_balancer_id,
                        provisioning_status=constants.ERROR)
    @staticmethod
    def _get_amphorae_for_failover(load_balancer):
        """Returns an ordered list of amphora to failover.

        :param load_balancer: The load balancer being failed over.
        :returns: An ordered list of amphora to failover,
                  first amp to failover is last in the list
        :raises octavia.common.exceptions.InvalidTopology: LB has an unknown
                                                           topology.
        """
        if load_balancer.topology == constants.TOPOLOGY_SINGLE:
            # In SINGLE topology, amp failover order does not matter
            return [a.to_dict() for a in load_balancer.amphorae
                    if a.status != constants.DELETED]

        if load_balancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
            # In Active/Standby we should preference the standby amp
            # for failover first in case the Active is still able to pass
            # traffic.
            # Note: The active amp can switch at any time and in less than a
            #       second, so this is "best effort".
            amphora_driver = utils.get_amphora_driver()
            timeout_dict = {
                constants.CONN_MAX_RETRIES:
                    CONF.haproxy_amphora.failover_connection_max_retries,
                constants.CONN_RETRY_INTERVAL:
                    CONF.haproxy_amphora.failover_connection_retry_interval}

            amps = []
            selected_amp = None
            for amp in load_balancer.amphorae:
                if amp.status == constants.DELETED:
                    continue
                if selected_amp is None:
                    try:
                        if amphora_driver.get_interface_from_ip(
                                amp, load_balancer.vip.ip_address,
                                timeout_dict):
                            # This is a potential ACTIVE, add it to the list
                            amps.append(amp.to_dict())
                        else:
                            # This one doesn't have the VIP IP, so start
                            # failovers here.
                            selected_amp = amp
                            LOG.debug("Selected amphora %s as the initial "
                                      "failover amphora.", amp.id)
                    except Exception:
                        # This amphora is broken, so start failovers here.
                        selected_amp = amp
                else:
                    # We have already found a STANDBY, so add the rest to the
                    # list without querying them.
                    amps.append(amp.to_dict())
            # Put the selected amphora at the end of the list so it is
            # first to failover.
            if selected_amp:
                amps.append(selected_amp.to_dict())
            return amps

        LOG.error('Unknown load balancer topology found: %s, aborting '
                  'failover.', load_balancer.topology)
        raise exceptions.InvalidTopology(topology=load_balancer.topology)
    def failover_loadbalancer(self, load_balancer_id):
        """Perform failover operations for a load balancer.

        Note: This expects the load balancer to already be in
        provisioning_status=PENDING_UPDATE state.

        :param load_balancer_id: ID for load balancer to failover
        :returns: None
        :raises octavia.common.exceptions.NotFound: The load balancer was not
                                                    found.
        """
        try:
            lb = self._lb_repo.get(db_apis.get_session(),
                                   id=load_balancer_id)
            if lb is None:
                raise exceptions.NotFound(resource=constants.LOADBALANCER,
                                          id=load_balancer_id)

            # Get the ordered list of amphorae to failover for this LB.
            amps = self._get_amphorae_for_failover(lb)

            # Sanity-check the amphora count against the topology; a
            # mismatch is repaired by the failover flow.
            if lb.topology == constants.TOPOLOGY_SINGLE:
                if len(amps) != 1:
                    LOG.warning('%d amphorae found on load balancer %s where '
                                'one should exist. Repairing.', len(amps),
                                load_balancer_id)
            elif lb.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
                if len(amps) != 2:
                    LOG.warning('%d amphorae found on load balancer %s where '
                                'two should exist. Repairing.', len(amps),
                                load_balancer_id)
            else:
                LOG.error('Unknown load balancer topology found: %s, aborting '
                          'failover!', lb.topology)
                raise exceptions.InvalidTopology(topology=lb.topology)

            # We must provide a topology in the flavor definition
            # here for the amphora to be created with the correct
            # configuration.
            if lb.flavor_id:
                flavor = self._flavor_repo.get_flavor_metadata_dict(
                    db_apis.get_session(), lb.flavor_id)
                flavor[constants.LOADBALANCER_TOPOLOGY] = lb.topology
            else:
                flavor = {constants.LOADBALANCER_TOPOLOGY: lb.topology}

            provider_lb_dict = (
                provider_utils.db_loadbalancer_to_provider_loadbalancer(
                    lb).to_dict() if lb else lb)

            provider_lb_dict[constants.FLAVOR] = flavor

            stored_params = {constants.LOADBALANCER: provider_lb_dict,
                             constants.BUILD_TYPE_PRIORITY:
                                 constants.LB_CREATE_FAILOVER_PRIORITY,
                             constants.SERVER_GROUP_ID: lb.server_group_id,
                             constants.LOADBALANCER_ID: lb.id,
                             constants.FLAVOR: flavor}

            if lb.availability_zone:
                stored_params[constants.AVAILABILITY_ZONE] = (
                    self._az_repo.get_availability_zone_metadata_dict(
                        db_apis.get_session(), lb.availability_zone))
            else:
                stored_params[constants.AVAILABILITY_ZONE] = {}

            self.run_flow(
                flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
                store=stored_params, wait=True)

            LOG.info('Failover of load balancer %s completed successfully.',
                     lb.id)

        except Exception as e:
            with excutils.save_and_reraise_exception(reraise=False):
                LOG.exception("LB %(lbid)s failover exception: %(exc)s",
                              {'lbid': load_balancer_id, 'exc': str(e)})
                # Mark the LB in ERROR so the failure is visible.
                self._lb_repo.update(
                    db_apis.get_session(), load_balancer_id,
                    provisioning_status=constants.ERROR)
if lb.flavor_id: flavor = self._flavor_repo.get_flavor_metadata_dict( db_apis.get_session(), lb.flavor_id) flavor[constants.LOADBALANCER_TOPOLOGY] = lb.topology else: flavor = {constants.LOADBALANCER_TOPOLOGY: lb.topology} provider_lb_dict = ( provider_utils.db_loadbalancer_to_provider_loadbalancer( lb).to_dict() if lb else lb) provider_lb_dict[constants.FLAVOR] = flavor stored_params = {constants.LOADBALANCER: provider_lb_dict, constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_FAILOVER_PRIORITY, constants.SERVER_GROUP_ID: lb.server_group_id, constants.LOADBALANCER_ID: lb.id, constants.FLAVOR: flavor} if lb.availability_zone: stored_params[constants.AVAILABILITY_ZONE] = ( self._az_repo.get_availability_zone_metadata_dict( db_apis.get_session(), lb.availability_zone)) else: stored_params[constants.AVAILABILITY_ZONE] = {} self.run_flow( flow_utils.get_failover_LB_flow, amps, provider_lb_dict, store=stored_params, wait=True) LOG.info('Failover of load balancer %s completed successfully.', lb.id) except Exception as e: with excutils.save_and_reraise_exception(reraise=False): LOG.exception("LB %(lbid)s failover exception: %(exc)s", {'lbid': load_balancer_id, 'exc': str(e)}) self._lb_repo.update( db_apis.get_session(), load_balancer_id, provisioning_status=constants.ERROR) def amphora_cert_rotation(self, amphora_id): """Perform cert rotation for an amphora. :param amphora_id: ID for amphora to rotate :returns: None :raises AmphoraNotFound: The referenced amphora was not found """ amp = self._amphora_repo.get(db_apis.get_session(), id=amphora_id) LOG.info("Start amphora cert rotation, amphora's id is: %s", amphora_id) store = {constants.AMPHORA: amp.to_dict(), constants.AMPHORA_ID: amphora_id} self.run_flow( flow_utils.cert_rotate_amphora_flow, store=store) LOG.info("Finished amphora cert rotation, amphora's id was: %s", amphora_id) def update_amphora_agent_config(self, amphora_id): """Update the amphora agent configuration. 
Note: This will update the amphora agent configuration file and update the running configuration for mutatable configuration items. :param amphora_id: ID of the amphora to update. :returns: None """ LOG.info("Start amphora agent configuration update, amphora's id " "is: %s", amphora_id) amp = self._amphora_repo.get(db_apis.get_session(), id=amphora_id) lb = self._amphora_repo.get_lb_for_amphora(db_apis.get_session(), amphora_id) flavor = {} if lb.flavor_id: flavor = self._flavor_repo.get_flavor_metadata_dict( db_apis.get_session(), lb.flavor_id) store = {constants.AMPHORA: amp.to_dict(), constants.FLAVOR: flavor} self.run_flow( flow_utils.update_amphora_config_flow, store=store) LOG.info("Finished amphora agent configuration update, amphora's id " "was: %s", amphora_id)