@ -51,6 +51,7 @@ class ConductorManager(object):
self . _periodics_worker = None
self . _zeroconf = None
self . _shutting_down = semaphore . Semaphore ( )
self . coordinator = None
def init_host ( self ) :
""" Initialize Worker host
@ -70,6 +71,24 @@ class ConductorManager(object):
db . init ( )
self . coordinator = None
try :
self . coordinator = coordination . get_coordinator ( prefix = ' conductor ' )
self . coordinator . start ( heartbeat = True )
self . coordinator . join_group ( )
except Exception as exc :
if CONF . standalone :
LOG . info ( ' Coordination backend cannot be started, assuming '
' no other instances are running. Error: %s ' , exc )
self . coordinator = None
else :
with excutils . save_and_reraise_exception ( ) :
LOG . critical ( ' Failure when connecting to coordination '
' backend ' , exc_info = True )
self . del_host ( )
else :
LOG . info ( ' Successfully connected to coordination backend. ' )
try :
hooks = plugins_base . validate_processing_hooks ( )
except Exception as exc :
@ -91,11 +110,20 @@ class ConductorManager(object):
) ( sync_with_ironic )
callables = [ ( periodic_clean_up_ , None , None ) ,
( sync_with_ironic_ , None , None ) ]
( sync_with_ironic_ , ( self , ) , None ) ]
driver_task = driver . get_periodic_sync_task ( )
if driver_task is not None :
callables . append ( ( driver_task , None , None ) )
# run elections periodically if we have a coordinator
# that we were able to start
if ( self . coordinator and self . coordinator . started ) :
periodic_leader_election_ = periodics . periodic (
spacing = CONF . leader_election_interval
) ( periodic_leader_election )
callables . append ( ( periodic_leader_election_ , ( self , ) , None ) )
self . _periodics_worker = periodics . PeriodicWorker (
callables = callables ,
executor_factory = periodics . ExistingExecutor ( utils . executor ( ) ) ,
@ -109,28 +137,14 @@ class ConductorManager(object):
self . _zeroconf . register_service ( ' baremetal-introspection ' ,
endpoint )
if not CONF . standalone :
try :
coordinator = coordination . get_coordinator ( prefix = ' conductor ' )
coordinator . start ( heartbeat = True )
coordinator . join_group ( )
except tooz . ToozError :
with excutils . save_and_reraise_exception ( ) :
LOG . critical ( ' Failed when connecting to coordination '
' backend. ' )
self . del_host ( )
else :
LOG . info ( ' Successfully connected to coordination backend. ' )
def del_host ( self ) :
""" Shutdown the ironic inspector conductor service. """
if not CONF . standal one:
if self . coordinator is not None :
try :
coordinator = coordination . get_coordinator ( prefix = ' conductor ' )
if coordinator . started :
coordinator . leave_group ( )
coordinator . stop ( )
if self . coordinator . started :
self . coordinator . leave_group ( )
self . coordinator . stop ( )
except tooz . ToozError :
LOG . exception ( ' Failed to stop coordinator ' )
@ -201,9 +215,22 @@ def periodic_clean_up(): # pragma: no cover
pxe_filter . driver ( ) . sync ( ir_utils . get_client ( ) )
def sync_with_ironic ( ) :
def sync_with_ironic ( conductor ) :
if ( conductor . coordinator is not None
and not conductor . coordinator . is_leader ) :
LOG . debug ( ' The conductor is not a leader, skipping syncing '
' with ironic ' )
return
LOG . debug ( ' Syncing with ironic ' )
ironic = ir_utils . get_client ( )
# TODO(yuikotakada): pagination
ironic_nodes = ironic . nodes ( fields = [ " uuid " ] , limit = None )
ironic_node_uuids = { node . id for node in ironic_nodes }
node_cache . delete_nodes_not_in_list ( ironic_node_uuids )
def periodic_leader_election ( conductor ) :
if conductor . coordinator is not None :
conductor . coordinator . run_elect_coordinator ( )
return