@ -13,26 +13,18 @@
# under the License.
#
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from sqlalchemy . orm import exc as db_exceptions
from taskflow . listeners import logging as tf_logging
from stevedore import driver as stevedore_driver
import tenacity
from octavia . amphorae . driver_exceptions import exceptions
from octavia . api . drivers import utils as provider_utils
from octavia . common import base_taskflow
from octavia . common import constants
from octavia . controller . worker . v2 . flows import amphora_flows
from octavia . controller . worker . v2 . flows import health_monitor_flows
from octavia . controller . worker . v2 . flows import l7policy_flows
from octavia . controller . worker . v2 . flows import l7rule_flows
from octavia . controller . worker . v2 . flows import listener_flows
from octavia . controller . worker . v2 . flows import load_balancer_flows
from octavia . controller . worker . v2 . flows import member_flows
from octavia . controller . worker . v2 . flows import pool_flows
from octavia . controller . worker . v2 . flows import flow_utils
from octavia . controller . worker . v2 import taskflow_jobboard_driver as tsk_driver
from octavia . db import api as db_apis
from octavia . db import repositories as repo
@ -45,35 +37,14 @@ RETRY_BACKOFF = 1
RETRY_MAX = 5
# We do not need to log retry exception information. Warning "Could not connect
# to instance" will be logged as usual.
def retryMaskFilter ( record ) :
if record . exc_info is not None and isinstance (
record . exc_info [ 1 ] , exceptions . AmpConnectionRetry ) :
return False
return True
LOG . logger . addFilter ( retryMaskFilter )
def _is_provisioning_status_pending_update ( lb_obj ) :
return not lb_obj . provisioning_status == constants . PENDING_UPDATE
class ControllerWorker ( base_taskflow . BaseTaskFlowEngine ) :
class ControllerWorker ( object ) :
def __init__ ( self ) :
self . _amphora_flows = amphora_flows . AmphoraFlows ( )
self . _health_monitor_flows = health_monitor_flows . HealthMonitorFlows ( )
self . _lb_flows = load_balancer_flows . LoadBalancerFlows ( )
self . _listener_flows = listener_flows . ListenerFlows ( )
self . _member_flows = member_flows . MemberFlows ( )
self . _pool_flows = pool_flows . PoolFlows ( )
self . _l7policy_flows = l7policy_flows . L7PolicyFlows ( )
self . _l7rule_flows = l7rule_flows . L7RuleFlows ( )
self . _amphora_repo = repo . AmphoraRepository ( )
self . _amphora_health_repo = repo . AmphoraHealthRepository ( )
self . _health_mon_repo = repo . HealthMonitorRepository ( )
@ -86,7 +57,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self . _flavor_repo = repo . FlavorRepository ( )
self . _az_repo = repo . AvailabilityZoneRepository ( )
super ( ControllerWorker , self ) . __init__ ( )
persistence = tsk_driver . MysqlPersistenceDriver ( )
self . jobboard_driver = stevedore_driver . DriverManager (
namespace = ' octavia.worker.jobboard_driver ' ,
name = CONF . task_flow . jobboard_backend_driver ,
invoke_args = ( persistence , ) ,
invoke_on_load = True ) . driver
@tenacity . retry (
retry = (
@ -99,12 +76,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
return repo . get ( db_apis . get_session ( ) , id = id )
@property
def services_controller ( self ) :
return base_taskflow . TaskFlowServiceController ( self . jobboard_driver )
def create_amphora ( self , availability_zone = None ) :
""" Creates an Amphora.
This is used to create spare amphora .
: returns : amphora_ id
: returns : uu id
"""
try :
store = { constants . BUILD_TYPE_PRIORITY :
@ -115,13 +96,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store [ constants . AVAILABILITY_ZONE ] = (
self . _az_repo . get_availability_zone_metadata_dict (
db_apis . get_session ( ) , availability_zone ) )
create_amp_tf = self . _taskflow_load (
self . _amphora_flows . get_create_amphora_flow ( ) ,
store = store )
with tf_logging . DynamicLoggingListener ( create_amp_tf , log = LOG ) :
create_amp_tf . run ( )
job_id = self . services_controller . run_poster (
flow_utils . get_create_amphora_flow ,
store = store , wait = True )
return create_amp_tf . storage . fetch ( ' amphora ' )
return job_id
except Exception as e :
LOG . error ( ' Failed to create an amphora due to: {} ' . format ( str ( e ) ) )
@ -134,13 +113,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
amphora = self . _amphora_repo . get ( db_apis . get_session ( ) ,
id = amphora_id )
delete_amp_tf = self . _taskflow_load (
self . _amphora_flows . get_delete_amphora_flow ( ) ,
store = { constants . AMPHORA : amphora . to_dict ( ) } )
with tf_logging . DynamicLoggingListener ( delete_amp_tf ,
log = LOG ) :
delete_amp_tf . run ( )
store = { constants . AMPHORA : amphora . to_dict ( ) }
self . services_controller . run_poster (
flow_utils . get_delete_amphora_flow ,
store = store )
@tenacity . retry (
retry = tenacity . retry_if_exception_type ( db_exceptions . NoResultFound ) ,
wait = tenacity . wait_incrementing (
RETRY_INITIAL_DELAY , RETRY_BACKOFF , RETRY_MAX ) ,
stop = tenacity . stop_after_attempt ( RETRY_ATTEMPTS ) )
def create_health_monitor ( self , health_monitor ) :
""" Creates a health monitor.
@ -162,16 +144,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
pool . listeners ) )
create_hm_tf = self . _taskflow_load (
self . _health_monitor_flows . get_create_health_monitor_flow ( ) ,
store = { constants . HEALTH_MON : health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb } )
with tf_logging . DynamicLoggingListener ( create_hm_tf ,
log = LOG ) :
create_hm_tf . run ( )
store = { constants . HEALTH_MON : health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb }
self . services_controller . run_poster (
flow_utils . get_create_health_monitor_flow ,
store = store )
def delete_health_monitor ( self , health_monitor ) :
""" Deletes a health monitor.
@ -193,17 +173,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
pool . listeners ) )
delete_hm_tf = self . _taskflow_load (
self . _health_monitor_flows . get_delete_health_monitor_flow ( ) ,
store = { constants . HEALTH_MON : health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb ,
constants . PROJECT_ID : load_balancer . project_id } )
with tf_logging . DynamicLoggingListener ( delete_hm_tf ,
log = LOG ) :
delete_hm_tf . run ( )
store = { constants . HEALTH_MON : health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb ,
constants . PROJECT_ID : load_balancer . project_id }
self . services_controller . run_poster (
flow_utils . get_delete_health_monitor_flow ,
store = store )
def update_health_monitor ( self , original_health_monitor ,
health_monitor_updates ) :
@ -236,18 +214,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils . db_loadbalancer_to_provider_loadbalancer (
load_balancer ) . to_dict ( )
update_hm_tf = self . _taskflow_load (
self . _health_monitor_flows . get_update_health_monitor_flow ( ) ,
store = { constants . HEALTH_MON : original_health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb ,
constants . UPDATE_DICT : health_monitor_updates } )
with tf_logging . DynamicLoggingListener ( update_hm_tf ,
log = LOG ) :
update_hm_tf . run ( )
store = { constants . HEALTH_MON : original_health_monitor ,
constants . POOL_ID : pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb ,
constants . UPDATE_DICT : health_monitor_updates }
self . services_controller . run_poster (
flow_utils . get_update_health_monitor_flow ,
store = store )
@tenacity . retry (
retry = tenacity . retry_if_exception_type ( db_exceptions . NoResultFound ) ,
wait = tenacity . wait_incrementing (
RETRY_INITIAL_DELAY , RETRY_BACKOFF , RETRY_MAX ) ,
stop = tenacity . stop_after_attempt ( RETRY_ATTEMPTS ) )
def create_listener ( self , listener ) :
""" Creates a listener.
@ -272,14 +253,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils . db_loadbalancer_to_provider_loadbalancer (
load_balancer ) . to_dict ( )
create_listener_tf = self . _taskflow_load (
self . _listener_flows . get_create_listener_flow ( ) ,
store = { constants . LISTENERS : dict_listeners ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id } )
with tf_logging . DynamicLoggingListener ( create_listener_tf ,
log = LOG ) :
create_listener_tf . run ( )
store = { constants . LISTENERS : dict_listeners ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id }
self . services_controller . run_poster (
flow_utils . get_create_listener_flow ,
store = store )
def delete_listener ( self , listener ) :
""" Deletes a listener.
@ -292,15 +272,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# the project ID
lb = self . _lb_repo . get ( db_apis . get_session ( ) ,
id = listener [ constants . LOADBALANCER_ID ] )
delete_listener_tf = self . _taskflow_load (
self . _listener_flows . get_delete_listener_flow ( ) ,
store = { constants . LISTENER : listener ,
constants . LOADBALANCER_ID :
listener [ constants . LOADBALANCER_ID ] ,
constants . PROJECT_ID : lb . project_id } )
with tf_logging . DynamicLoggingListener ( delete_listener_tf ,
log = LOG ) :
delete_listener_tf . run ( )
store = { constants . LISTENER : listener ,
constants . LOADBALANCER_ID :
listener [ constants . LOADBALANCER_ID ] ,
constants . PROJECT_ID : lb . project_id }
self . services_controller . run_poster (
flow_utils . get_delete_listener_flow ,
store = store )
def update_listener ( self , listener , listener_updates ) :
""" Updates a listener.
@ -312,14 +290,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
db_lb = self . _lb_repo . get ( db_apis . get_session ( ) ,
id = listener [ constants . LOADBALANCER_ID ] )
update_listener_tf = self . _taskflow_load (
self . _listener_flows . get_update_listener_flow ( ) ,
store = { constants . LISTENER : listener ,
constants . UPDATE_DICT : listener_updates ,
constants . LOADBALANCER_ID : db_lb . id ,
constants . LISTENERS : [ listener ] } )
with tf_logging . DynamicLoggingListener ( update_listener_tf , log = LOG ) :
update_listener_tf . run ( )
store = { constants . LISTENER : listener ,
constants . UPDATE_DICT : listener_updates ,
constants . LOADBALANCER_ID : db_lb . id ,
constants . LISTENERS : [ listener ] }
self . services_controller . run_poster (
flow_utils . get_update_listener_flow ,
store = store )
@tenacity . retry (
retry = tenacity . retry_if_exception_type ( db_exceptions . NoResultFound ) ,
@ -364,13 +341,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store [ constants . UPDATE_DICT ] = {
constants . TOPOLOGY : topology
}
create_lb_flow = self . _lb_flows . get_create_load_balancer_flow (
topology = topology , listeners = listeners_dicts )
create_lb_tf = self . _taskflow_load ( create_lb_flow , store = store )
with tf_logging . DynamicLoggingListener ( create_lb_tf , log = LOG ) :
create_lb_tf . run ( )
self . services_controller . run_poster (
flow_utils . get_create_load_balancer_flow ,
topology , listeners = listeners_dicts ,
store = store )
def delete_load_balancer ( self , load_balancer , cascade = False ) :
""" Deletes a load balancer by de-allocating Amphorae.
@ -381,25 +355,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
db_lb = self . _lb_repo . get ( db_apis . get_session ( ) ,
id = load_balancer [ constants . LOADBALANCER_ID ] )
store = { }
store = { constants . LOADBALANCER : load_balancer ,
constants . SERVER_GROUP_ID : db_lb . server_group_id ,
constants . PROJECT_ID : db_lb . project_id }
if cascade :
flow = self . _lb_flows . get_cascade_delete_load_balancer_flow (
load_balancer )
store . update ( self . _lb_flows . get_delete_pools_store ( db_lb ) )
store . update ( self . _lb_flows . get_delete_listeners_store ( db_lb ) )
store . update ( flow_utils . get_delete_pools_store ( db_lb ) )
store . update ( flow_utils . get_delete_listeners_store ( db_lb ) )
self . services_controller . run_poster (
flow_utils . get_cascade_delete_load_balancer_flow ,
load_balancer , store = store )
else :
flow = self . _lb_flows . get_delete_load_balancer_flow (
load_balancer )
store . update ( { constants . LOADBALANCER : load_balancer ,
constants . SERVER_GROUP_ID : db_lb . server_group_id ,
constants . PROJECT_ID : db_lb . project_id } )
delete_lb_tf = self . _taskflow_load ( flow , store = store )
with tf_logging . DynamicLoggingListener ( delete_lb_tf ,
log = LOG ) :
delete_lb_tf . run ( )
self . services_controller . run_poster (
flow_utils . get_delete_load_balancer_flow ,
load_balancer , store = store )
def update_load_balancer ( self , original_load_balancer ,
load_balancer_updates ) :
@ -410,17 +378,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
: returns : None
: raises LBNotFound : The referenced load balancer was not found
"""
store = { constants . LOADBALANCER : original_load_balancer ,
constants . LOADBALANCER_ID :
original_load_balancer [ constants . LOADBALANCER_ID ] ,
constants . UPDATE_DICT : load_balancer_updates }
update_lb_tf = self . _taskflow_load (
self . _lb_flows . get_update_load_balancer_flow ( ) ,
store = { constants . LOADBALANCER : original_load_balancer ,
constants . LOADBALANCER_ID :
original_load_balancer [ constants . LOADBALANCER_ID ] ,
constants . UPDATE_DICT : load_balancer_updates } )
with tf_logging . DynamicLoggingListener ( update_lb_tf ,
log = LOG ) :
update_lb_tf . run ( )
self . services_controller . run_poster (
flow_utils . get_update_load_balancer_flow ,
store = store )
def create_member ( self , member ) :
""" Creates a pool member.
@ -452,12 +417,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else :
store [ constants . AVAILABILITY_ZONE ] = { }
create_member_tf = self . _taskflow_load (
self . _member_flow s. get_create_member_flow ( ) ,
self . services_controller . run_poster (
flow_util s. get_create_member_flow ,
store = store )
with tf_logging . DynamicLoggingListener ( create_member_tf ,
log = LOG ) :
create_member_tf . run ( )
def delete_member ( self , member ) :
""" Deletes a pool member.
@ -491,13 +453,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else :
store [ constants . AVAILABILITY_ZONE ] = { }
delete_member_tf = self . _taskflow_load (
self . _member_flows . get_delete_member_flow ( ) ,
store = store
)
with tf_logging . DynamicLoggingListener ( delete_member_tf ,
log = LOG ) :
delete_member_tf . run ( )
self . services_controller . run_poster (
flow_utils . get_delete_member_flow ,
store = store )
def batch_update_members ( self , old_members , new_members ,
updated_members ) :
@ -543,13 +501,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else :
store [ constants . AVAILABILITY_ZONE ] = { }
batch_update_members_tf = self . _taskflow_load (
self . _member_flow s. get_batch_update_members_flow (
provider_old_members , new_members , updated_members ) ,
self . services_controller . run_poster (
flow_util s. get_batch_update_members_flow ,
provider_old_members , new_members , updated_members ,
store = store )
with tf_logging . DynamicLoggingListener ( batch_update_members_tf ,
log = LOG ) :
batch_update_members_tf . run ( )
def update_member ( self , member , member_updates ) :
""" Updates a pool member.
@ -584,13 +539,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else :
store [ constants . AVAILABILITY_ZONE ] = { }
update_member_tf = self . _taskflow_load (
self . _member_flow s. get_update_member_flow ( ) ,
self . services_controller . run_poster (
flow_util s. get_update_member_flow ,
store = store )
with tf_logging . DynamicLoggingListener ( update_member_tf ,
log = LOG ) :
update_member_tf . run ( )
@tenacity . retry (
retry = tenacity . retry_if_exception_type ( db_exceptions . NoResultFound ) ,
wait = tenacity . wait_incrementing (
RETRY_INITIAL_DELAY , RETRY_BACKOFF , RETRY_MAX ) ,
stop = tenacity . stop_after_attempt ( RETRY_ATTEMPTS ) )
def create_pool ( self , pool ) :
""" Creates a node pool.
@ -616,15 +573,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
db_pool . listeners ) )
create_pool_tf = self . _taskflow_load (
self . _pool_flows . get_create_pool_flow ( ) ,
store = { constants . POOL_ID : pool [ constants . POOL_ID ] ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb } )
with tf_logging . DynamicLoggingListener ( create_pool_tf ,
log = LOG ) :
create_pool_tf . run ( )
store = { constants . POOL_ID : pool [ constants . POOL_ID ] ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . LOADBALANCER : provider_lb }
self . services_controller . run_poster (
flow_utils . get_create_pool_flow ,
store = store )
def delete_pool ( self , pool ) :
""" Deletes a node pool.
@ -644,16 +599,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils . db_loadbalancer_to_provider_loadbalancer (
load_balancer ) . to_dict ( )
delete_pool_tf = self . _taskflow_load (
self . _pool_flows . get_delete_pool_flow ( ) ,
store = { constants . POOL_ID : pool [ constants . POOL_ID ] ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . PROJECT_ID : db_pool . project_id } )
with tf_logging . DynamicLoggingListener ( delete_pool_tf ,
log = LOG ) :
delete_pool_tf . run ( )
store = { constants . POOL_ID : pool [ constants . POOL_ID ] ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . PROJECT_ID : db_pool . project_id }
self . services_controller . run_poster (
flow_utils . get_delete_pool_flow ,
store = store )
def update_pool ( self , origin_pool , pool_updates ) :
""" Updates a node pool.
@ -682,16 +635,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
db_pool . listeners ) )
update_pool_tf = self . _taskflow_load (
self . _pool_flows . get_update_pool_flow ( ) ,
store = { constants . POOL_ID : db_pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . UPDATE_DICT : pool_updates } )
with tf_logging . DynamicLoggingListener ( update_pool_tf ,
log = LOG ) :
update_pool_tf . run ( )
store = { constants . POOL_ID : db_pool . id ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER : provider_lb ,
constants . LOADBALANCER_ID : load_balancer . id ,
constants . UPDATE_DICT : pool_updates }
self . services_controller . run_poster (
flow_utils . get_update_pool_flow ,
store = store )
def create_l7policy ( self , l7policy ) :
""" Creates an L7 Policy.
@ -707,15 +658,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
[ db_listener ] ) )
create_l7policy_tf = self . _taskflow_load (
self . _l7policy_flows . get_create_l7policy_flow ( ) ,
store = { constants . L7POLICY : l7policy ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : db_listener . load_balancer . id
} )
with tf_logging . DynamicLoggingListener ( create_l7policy_tf ,
log = LOG ) :
create_l7policy_tf . run ( )
store = { constants . L7POLICY : l7policy ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : db_listener . load_balancer . id
}
self . services_controller . run_poster (
flow_utils . get_create_l7policy_flow ,
store = store )
def delete_l7policy ( self , l7policy ) :
""" Deletes an L7 policy.
@ -730,15 +679,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
[ db_listener ] ) )
delete_l7policy_tf = self . _taskflow_load (
self . _l7policy_flows . get_delete_l7policy_flow ( ) ,
store = { constants . L7POLICY : l7policy ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : db_listener . load_balancer . id
} )
with tf_logging . DynamicLoggingListener ( delete_l7policy_tf ,
log = LOG ) :
delete_l7policy_tf . run ( )
store = { constants . L7POLICY : l7policy ,
constants . LISTENERS : listeners_dicts ,
constants . LOADBALANCER_ID : db_listener . load_balancer . id
}
self . services_controller . run_poster (
flow_utils . get_delete_l7policy_flow ,
store = store )
def update_l7policy ( self , original_l7policy , l7policy_updates ) :
""" Updates an L7 policy.
@ -755,15 +702,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils . db_listeners_to_provider_dicts_list_of_dicts (
[ db_listener ] ) )
update_l7policy_tf = self . _taskflow_load (
self . _l7policy_flows . get_update_l7policy_flow ( ) ,
store = { constants . L7POLICY : original_l7policy ,