@ -53,13 +53,23 @@ AGENT_SCHEDULER_OPTS = [
cfg . CONF . register_opts ( AGENT_SCHEDULER_OPTS )
# topic name for this particular agent implementation
TOPIC_PROCESS_ON_HOST = ' q-lbaas-process-on-host '
TOPIC_LOADBALANCER_AGENT = ' lbaas_process_on_host_agent '
TOPIC_LOADBALANCER_PLUGIN = ' n-lbaas-plugin '
TOPIC_LOADBALANCER_AGENT = ' n-lbaas_agent '
class DriverNotSpecified ( q_exc . NeutronException ) :
message = _ ( " Device driver for agent should be specified "
" in plugin driver. " )
class LoadBalancerCallbacks ( object ) :
RPC_API_VERSION = ' 1.0 '
RPC_API_VERSION = ' 2.0 '
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed;
# - pool_deployed() and update_status() methods added;
def __init__ ( self , plugin ) :
self . plugin = plugin
@ -70,67 +80,47 @@ class LoadBalancerCallbacks(object):
def get_ready_devices ( self , context , host = None ) :
with context . session . begin ( subtransactions = True ) :
qry = ( context . session . query ( loadbalancer_db . Pool . id ) .
join ( loadbalancer_db . Vip ) )
qry = qry . filter ( loadbalancer_db . Vip . status . in_ ( ACTIVE_PENDING ) )
qry = qry . filter ( loadbalancer_db . Pool . status . in_ ( ACTIVE_PENDING ) )
up = True # makes pep8 and sqlalchemy happy
qry = qry . filter ( loadbalancer_db . Vip . admin_state_up == up )
qry = qry . filter ( loadbalancer_db . Pool . admin_state_up == up )
agents = self . plugin . get_lbaas_agents ( context ,
filters = { ' host ' : [ host ] } )
if not agents :
return [ ]
elif len ( agents ) > 1 :
LOG . warning ( _ ( ' Multiple lbaas agents found on host %s ' ) , host )
pools = self . plugin . list_pools_on_lbaas_agent ( context ,
agents [ 0 ] . id )
pool_ids = [ pool [ ' id ' ] for pool in pools [ ' pools ' ] ]
qry = context . session . query ( loadbalancer_db . Pool . id )
qry = qry . filter ( loadbalancer_db . Pool . id . in_ ( pool_ids ) )
qry = qry . filter ( loadbalancer_db . Pool . status . in_ ( ACTIVE_PENDING ) )
up = True # makes pep8 and sqlalchemy happy
qry = qry . filter ( loadbalancer_db . Pool . admin_state_up == up )
return [ id for id , in qry ]
def get_logical_device ( self , context , pool_id = None , activate = True ,
* * kwargs ) :
def get_logical_device ( self , context , pool_id = None ) :
with context . session . begin ( subtransactions = True ) :
qry = context . session . query ( loadbalancer_db . Pool )
qry = qry . filter_by ( id = pool_id )
pool = qry . one ( )
if activate :
# set all resources to active
if pool . status in ACTIVE_PENDING :
pool . status = constants . ACTIVE
if pool . vip . status in ACTIVE_PENDING :
pool . vip . status = constants . ACTIVE
for m in pool . members :
if m . status in ACTIVE_PENDING :
m . status = constants . ACTIVE
for hm in pool . monitors :
if hm . status in ACTIVE_PENDING :
hm . status = constants . ACTIVE
if ( pool . status != constants . ACTIVE
or pool . vip . status != constants . ACTIVE ) :
raise q_exc . Invalid ( _ ( ' Expected active pool and vip ' ) )
if pool . status != constants . ACTIVE :
raise q_exc . Invalid ( _ ( ' Expected active pool ' ) )
retval = { }
retval [ ' pool ' ] = self . plugin . _make_pool_dict ( pool )
retval [ ' vip ' ] = self . plugin . _make_vip_dict ( pool . vip )
retval [ ' vip ' ] [ ' port ' ] = (
self . plugin . _core_plugin . _make_port_dict ( pool . vip . port )
)
for fixed_ip in retval [ ' vip ' ] [ ' port ' ] [ ' fixed_ips ' ] :
fixed_ip [ ' subnet ' ] = (
self . plugin . _core_plugin . get_subnet (
context ,
fixed_ip [ ' subnet_id ' ]
)
if pool . vip :
retval [ ' vip ' ] = self . plugin . _make_vip_dict ( pool . vip )
retval [ ' vip ' ] [ ' port ' ] = (
self . plugin . _core_plugin . _make_port_dict ( pool . vip . port )
)
for fixed_ip in retval [ ' vip ' ] [ ' port ' ] [ ' fixed_ips ' ] :
fixed_ip [ ' subnet ' ] = (
self . plugin . _core_plugin . get_subnet (
context ,
fixed_ip [ ' subnet_id ' ]
)
)
retval [ ' members ' ] = [
self . plugin . _make_member_dict ( m )
for m in pool . members if m . status in ( constants . ACTIVE ,
@ -141,10 +131,49 @@ class LoadBalancerCallbacks(object):
for hm in pool . monitors
if hm . status == constants . ACTIVE
]
retval [ ' driver ' ] = (
self . plugin . drivers [ pool . provider . provider_name ] . device_driver )
return retval
def pool_destroyed ( self , context , pool_id = None , host = None ) :
def pool_deployed ( self , context , pool_id ) :
with context . session . begin ( subtransactions = True ) :
qry = context . session . query ( loadbalancer_db . Pool )
qry = qry . filter_by ( id = pool_id )
pool = qry . one ( )
# set all resources to active
if pool . status in ACTIVE_PENDING :
pool . status = constants . ACTIVE
if pool . vip and pool . vip . status in ACTIVE_PENDING :
pool . vip . status = constants . ACTIVE
for m in pool . members :
if m . status in ACTIVE_PENDING :
m . status = constants . ACTIVE
for hm in pool . monitors :
if hm . status in ACTIVE_PENDING :
hm . status = constants . ACTIVE
def update_status ( self , context , obj_type , obj_id , status ) :
model_mapping = {
' pool ' : loadbalancer_db . Pool ,
' vip ' : loadbalancer_db . Vip ,
' member ' : loadbalancer_db . Member ,
' health_monitor ' : loadbalancer_db . PoolMonitorAssociation
}
if obj_type not in model_mapping :
raise q_exc . Invalid ( _ ( ' Unknown object type: %s ' ) % obj_type )
elif obj_type == ' health_monitor ' :
self . plugin . update_pool_health_monitor (
context , obj_id [ ' monitor_id ' ] , obj_id [ ' pool_id ' ] , status )
else :
self . plugin . update_status (
context , model_mapping [ obj_type ] , obj_id , status )
def pool_destroyed ( self , context , pool_id = None ) :
""" Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
@ -214,65 +243,116 @@ class LoadBalancerCallbacks(object):
class LoadBalancerAgentApi ( proxy . RpcProxy ) :
""" Plugin side of plugin to agent RPC API. """
BASE_RPC_API_VERSION = ' 1 .0'
BASE_RPC_API_VERSION = ' 2 .0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__ ( self , topic ) :
super ( LoadBalancerAgentApi , self ) . __init__ (
topic , default_version = self . BASE_RPC_API_VERSION )
def reload_pool( self , context , pool_id , host ) :
def _cast( self , context , method_name , method_args , host , version = None ) :
return self . cast (
context ,
self . make_msg ( ' reload_pool ' , pool_id = pool_id , host = host ) ,
topic = ' %s . %s ' % ( self . topic , host )
self . make_msg ( method_name , * * method_args ) ,
topic = ' %s . %s ' % ( self . topic , host ) ,
version = version
)
def destroy_pool ( self , context , pool_id , host ) :
return self . cast (
context ,
self . make_msg ( ' destroy_pool ' , pool_id = pool_id , host = host ) ,
topic = ' %s . %s ' % ( self . topic , host )
)
def create_vip ( self , context , vip , host ) :
return self . _cast ( context , ' create_vip ' , { ' vip ' : vip } , host )
def modify_pool ( self , context , pool_id , host ) :
return self . cast (
context ,
self . make_msg ( ' modify_pool ' , pool_id = pool_id , host = host ) ,
topic = ' %s . %s ' % ( self . topic , host )
)
def update_vip ( self , context , old_vip , vip , host ) :
return self . _cast ( context , ' update_vip ' ,
{ ' old_vip ' : old_vip , ' vip ' : vip } , host )
def delete_vip ( self , context , vip , host ) :
return self . _cast ( context , ' delete_vip ' , { ' vip ' : vip } , host )
def create_pool ( self , context , pool , host , driver_name ) :
return self . _cast ( context , ' create_pool ' ,
{ ' pool ' : pool , ' driver_name ' : driver_name } , host )
def update_pool ( self , context , old_pool , pool , host ) :
return self . _cast ( context , ' update_pool ' ,
{ ' old_pool ' : old_pool , ' pool ' : pool } , host )
def delete_pool ( self , context , pool , host ) :
return self . _cast ( context , ' delete_pool ' , { ' pool ' : pool } , host )
def create_member ( self , context , member , host ) :
return self . _cast ( context , ' create_member ' , { ' member ' : member } , host )
def update_member ( self , context , old_member , member , host ) :
return self . _cast ( context , ' update_member ' ,
{ ' old_member ' : old_member , ' member ' : member } , host )
def delete_member ( self , context , member , host ) :
return self . _cast ( context , ' delete_member ' , { ' member ' : member } , host )
def create_pool_health_monitor ( self , context , health_monitor , pool_id ,
host ) :
return self . _cast ( context , ' create_pool_health_monitor ' ,
{ ' health_monitor ' : health_monitor ,
' pool_id ' : pool_id } , host )
def update_pool_health_monitor ( self , context , old_health_monitor ,
health_monitor , pool_id , host ) :
return self . _cast ( context , ' update_pool_health_monitor ' ,
{ ' old_health_monitor ' : old_health_monitor ,
' health_monitor ' : health_monitor ,
' pool_id ' : pool_id } , host )
def delete_pool_health_monitor ( self , context , health_monitor , pool_id ,
host ) :
return self . _cast ( context , ' delete_pool_health_monitor ' ,
{ ' health_monitor ' : health_monitor ,
' pool_id ' : pool_id } , host )
def agent_updated ( self , context , admin_state_up , host ) :
return self . cast (
context ,
self . make_msg ( ' agent_updated ' ,
payload = { ' admin_state_up ' : admin_state_up } ) ,
topic = ' %s . %s ' % ( self . topic , host ) ,
version = ' 1.1 '
)
return self . _cast ( context , ' agent_updated ' ,
{ ' payload ' : { ' admin_state_up ' : admin_state_up } } ,
host )
class HaproxyOnHostPluginDriver ( abstract_driver . LoadBalancerAbstractDriver ) :
class AgentBasedPluginDriver ( abstract_driver . LoadBalancerAbstractDriver ) :
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
def __init__ ( self , plugin ) :
if not self . device_driver :
raise DriverNotSpecified ( )
self . agent_rpc = LoadBalancerAgentApi ( TOPIC_LOADBALANCER_AGENT )
self . callbacks = LoadBalancerCallbacks ( plugin )
self . conn = rpc . create_connection ( new = True )
self . conn . create_consumer (
TOPIC_PROCESS_ON_HOST ,
self . callbacks . create_rpc_dispatcher ( ) ,
fanout = False )
self . conn . consume_in_thread ( )
self . plugin = plugin
self . _set_callbacks_on_plugin ( )
self . plugin . agent_notifiers . update (
{ q_const . AGENT_TYPE_LOADBALANCER : self . agent_rpc } )
self . pool_scheduler = importutils . import_object (
cfg . CONF . loadbalancer_pool_scheduler_driver )
def _set_callbacks_on_plugin ( self ) :
# other agent based plugin driver might already set callbacks on plugin
if hasattr ( self . plugin , ' agent_callbacks ' ) :
return
self . plugin . agent_callbacks = LoadBalancerCallbacks ( self . plugin )
self . plugin . conn = rpc . create_connection ( new = True )
self . plugin . conn . create_consumer (
TOPIC_LOADBALANCER_PLUGIN ,
self . plugin . agent_callbacks . create_rpc_dispatcher ( ) ,
fanout = False )
self . plugin . conn . consume_in_thread ( )
def get_pool_agent ( self , context , pool_id ) :
agent = self . plugin . get_lbaas_agent_hosting_pool ( context , pool_id )
if not agent :
@ -281,80 +361,95 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
def create_vip ( self , context , vip ) :
agent = self . get_pool_agent ( context , vip [ ' pool_id ' ] )
self . agent_rpc . reload_pool( context , vip [ ' pool_id ' ] , agent [ ' host ' ] )
self . agent_rpc . create_vip( context , vip , agent [ ' host ' ] )
def update_vip ( self , context , old_vip , vip ) :
agent = self . get_pool_agent ( context , vip [ ' pool_id ' ] )
if vip [ ' status ' ] in ACTIVE_PENDING :
self . agent_rpc . reload_pool( context , vip [ ' pool_id ' ] , agent [ ' host ' ] )
self . agent_rpc . update_vip( context , old_vip , vip , agent [ ' host ' ] )
else :
self . agent_rpc . de stroy_pool( context , vip [ ' pool_id ' ] , agent [ ' host ' ] )
self . agent_rpc . de lete_vip( context , vip , agent [ ' host ' ] )
def delete_vip ( self , context , vip ) :
self . plugin . _delete_db_vip ( context , vip [ ' id ' ] )
agent = self . get_pool_agent ( context , vip [ ' pool_id ' ] )
self . agent_rpc . de stroy_pool( context , vip [ ' pool_id ' ] , agent [ ' host ' ] )
self . agent_rpc . de lete_vip( context , vip , agent [ ' host ' ] )
def create_pool ( self , context , pool ) :
if not self . pool_scheduler . schedule ( self . plugin , context , pool ) :
agent = self . pool_scheduler . schedule ( self . plugin , context , pool ,
self . device_driver )
if not agent :
raise lbaas_agentscheduler . NoEligibleLbaasAgent ( pool_id = pool [ ' id ' ] )
# don't notify here because a pool needs a vip to be useful
self . agent_rpc . create_pool ( context , pool , agent [ ' host ' ] ,
self . device_driver )
def update_pool ( self , context , old_pool , pool ) :
agent = self . get_pool_agent ( context , pool [ ' id ' ] )
if pool [ ' status ' ] in ACTIVE_PENDING :
if pool [ ' vip_id ' ] is not None :
self . agent_rpc . reload_pool ( context , pool [ ' id ' ] , agent [ ' host ' ] )
self . agent_rpc . update_pool ( context , old_pool , pool ,
agent [ ' host ' ] )
else :
self . agent_rpc . de stroy _pool( context , pool [ ' id ' ] , agent [ ' host ' ] )
self . agent_rpc . de lete _pool( context , pool , agent [ ' host ' ] )
def delete_pool ( self , context , pool ) :
# get agent first to know host as binding will be deleted
# after pool is deleted from db
agent = self . plugin . get_lbaas_agent_hosting_pool ( context , pool [ ' id ' ] )
if agent :
self . agent_rpc . destroy_pool ( context , pool [ ' id ' ] ,
agent [ ' agent ' ] [ ' host ' ] )
self . plugin . _delete_db_pool ( context , pool [ ' id ' ] )
if agent :
self . agent_rpc . delete_pool ( context , pool , agent [ ' agent ' ] [ ' host ' ] )
def create_member ( self , context , member ) :
agent = self . get_pool_agent ( context , member [ ' pool_id ' ] )
self . agent_rpc . modify_pool( context , member [ ' pool_id ' ] , agent [ ' host ' ] )
self . agent_rpc . create_member( context , member , agent [ ' host ' ] )
def update_member ( self , context , old_member , member ) :
agent = self . get_pool_agent ( context , member [ ' pool_id ' ] )
# member may change pool id
if member [ ' pool_id ' ] != old_member [ ' pool_id ' ] :
agent = self . plugin . get_lbaas_agent_hosting_pool (
old_pool_ agent = self . plugin . get_lbaas_agent_hosting_pool (
context , old_member [ ' pool_id ' ] )
if agent :
self . agent_rpc . modify_pool ( context ,
old_member [ ' pool_id ' ] ,
agent [ ' agent ' ] [ ' host ' ] )
agent = self . get_pool_agent ( context , member [ ' pool_id ' ] )
self . agent_rpc . modify_pool ( context , member [ ' pool_id ' ] , agent [ ' host ' ] )
if old_pool_agent :
self . agent_rpc . delete_member ( context , old_member ,
old_pool_agent [ ' agent ' ] [ ' host ' ] )
self . agent_rpc . create_member ( context , member , agent [ ' host ' ] )
else :
self . agent_rpc . update_member ( context , old_member , member ,
agent [ ' host ' ] )
def delete_member ( self , context , member ) :
self . plugin . _delete_db_member ( context , member [ ' id ' ] )
agent = self . get_pool_agent ( context , member [ ' pool_id ' ] )
self . agent_rpc . modify_pool ( context , member [ ' pool_id ' ] , agent [ ' host ' ] )
def update_health_monitor ( self , context , old_health_monitor ,
health_monitor , pool_id ) :
# monitors are unused here because agent will fetch what is necessary
agent = self . get_pool_agent ( context , pool_id )
self . agent_rpc . modify_pool ( context , pool_id , agent [ ' host ' ] )
self . agent_rpc . delete_member ( context , member , agent [ ' host ' ] )
def create_pool_health_monitor ( self , context , healthmon , pool_id ) :
# healthmon is not used here
agent = self . get_pool_agent ( context , pool_id )
self . agent_rpc . modify_pool ( context , pool_id , agent [ ' host ' ] )
self . agent_rpc . create_pool_health_monitor ( context , healthmon ,
pool_id , agent [ ' host ' ] )
def update_pool_health_monitor ( self , context , old_health_monitor ,
health_monitor , pool_id ) :
agent = self . get_pool_agent ( context , pool_id )
self . agent_rpc . update_pool_health_monitor ( context , old_health_monitor ,
health_monitor , pool_id ,
agent [ ' host ' ] )
def delete_pool_health_monitor ( self , context , health_monitor , pool_id ) :
self . plugin . _delete_db_pool_health_monitor (
context , health_monitor [ ' id ' ] , pool_id
)
# healthmon_id is not used here
agent = self . get_