@ -14,12 +14,15 @@
import abc
import cotyledon
import multiprocessing
import os
from oslo_concurrency import processutils as ps
from oslo_log import log
import oslo_messaging
from oslo_utils import uuidutils
from vitrage . api_handler . apis . operational import OperationalApis
from vitrage . entity_graph . graph_persistency import GraphPersistency
from vitrage . api_handler . apis . alarm import AlarmApis
from vitrage . api_handler . apis . event import EventApis
from vitrage . api_handler . apis . rca import RcaApis
@ -34,6 +37,7 @@ from vitrage.entity_graph import EVALUATOR_TOPIC
from vitrage . evaluator . actions . base import ActionMode
from vitrage . evaluator . scenario_evaluator import ScenarioEvaluator
from vitrage . evaluator . scenario_repository import ScenarioRepository
from vitrage . graph . driver . networkx_graph import NXGraph
from vitrage import messaging
from vitrage import rpc as vitrage_rpc
from vitrage import storage
@ -41,6 +45,8 @@ from vitrage import storage
LOG = log . getLogger ( __name__ )
# Supported message types
WAIT_FOR_WORKER_START = ' wait_for_worker_start '
READ_DB_GRAPH = ' read_db_graph '
GRAPH_UPDATE = ' graph_update '
ENABLE_EVALUATION = ' enable_evaluation '
START_EVALUATION = ' start_evaluation '
@ -58,16 +64,15 @@ class GraphWorkersManager(cotyledon.ServiceManager):
- the queues used to communicate with these workers
- methods interface to submit tasks to workers
"""
def __init__ ( self , conf , entity_graph , db ):
def __init__ ( self , conf ):
super ( GraphWorkersManager , self ) . __init__ ( )
self . _conf = conf
self . _entity_graph = entity_graph
self . _db = db
self . _db = None
self . _evaluator_queues = [ ]
self . _template_queues = [ ]
self . _api_queues = [ ]
self . _all_queues = [ ]
self . register_hooks ( on_terminate = self . _ stop)
self . register_hooks ( on_terminate = self . _ force_ stop)
self . add_evaluator_workers ( )
self . add_api_workers ( )
@ -88,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
workers = self . _conf . evaluator . workers or ps . get_worker_count ( )
queues = [ multiprocessing . JoinableQueue ( ) for i in range ( workers ) ]
self . add ( EvaluatorWorker ,
args = ( self . _conf , queues , self . _entity_graph , workers ) ,
args = ( self . _conf , queues , workers ) ,
workers = workers )
self . _evaluator_queues = queues
self . _all_queues . extend ( queues )
@ -105,9 +110,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
raise VitrageError ( ' add_api_workers called more than once ' )
workers = self . _conf . api . workers
queues = [ multiprocessing . JoinableQueue ( ) for i in range ( workers ) ]
self . add ( ApiWorker ,
args = ( self . _conf , queues , self . _entity_graph ) ,
workers = workers )
self . add ( ApiWorker , args = ( self . _conf , queues ) , workers = workers )
self . _api_queues = queues
self . _all_queues . extend ( queues )
@ -143,6 +146,24 @@ class GraphWorkersManager(cotyledon.ServiceManager):
"""
self . _submit_and_wait ( self . _evaluator_queues , ( RELOAD_TEMPLATES , ) )
def submit_read_db_graph ( self ) :
""" Initialize the worker graph from database snapshot
So that new / deleted templates are added / removed
"""
LOG . info ( " Worker processes - loading graph... " )
self . _submit_and_wait ( self . _all_queues , ( READ_DB_GRAPH , ) )
LOG . info ( " Worker processes - graph is ready " )
def wait_for_worker_start ( self ) :
""" Wait for responses from all workers
So that new / deleted templates are added / removed
"""
LOG . info ( " Worker processes - starting... " )
self . _submit_and_wait ( self . _all_queues , ( WAIT_FOR_WORKER_START , ) )
LOG . info ( " Worker processes - ready! " )
def submit_template_event ( self , event ) :
""" Template worker to load the new/deleted template
@ -150,6 +171,9 @@ class GraphWorkersManager(cotyledon.ServiceManager):
"""
template_action = event . get ( TEMPLATE_ACTION )
if not self . _db :
self . _db = storage . get_connection_from_config ( self . _conf )
if template_action == ADD :
templates = self . _db . templates . query ( status = TStatus . LOADING )
new_status = TStatus . ACTIVE
@ -182,20 +206,19 @@ class GraphWorkersManager(cotyledon.ServiceManager):
q . join ( )
@staticmethod
def _ stop( ) :
raise SystemE xit( 0 )
def _ force_ stop( ) :
os . _e xit( 0 )
class GraphCloneWorkerBase ( cotyledon . Service ) :
def __init__ ( self ,
worker_id ,
conf ,
task_queues ,
entity_graph ) :
task_queues ) :
super ( GraphCloneWorkerBase , self ) . __init__ ( worker_id )
self . _conf = conf
self . _task_queue = task_queues [ worker_id ]
self . _entity_graph = entity_graph
self . _entity_graph = NXGraph( )
name = ' GraphCloneWorkerBase '
@ -205,9 +228,14 @@ class GraphCloneWorkerBase(cotyledon.Service):
raise NotImplementedError
def run ( self ) :
LOG . info ( " %s - Starting %s " , self . __class__ . __name__ , self . worker_id )
self . _entity_graph . notifier . _subscriptions = [ ] # Quick n dirty
self . _init_instance ( )
if self . _entity_graph . num_vertices ( ) :
LOG . info ( " %s - Started %s ( %s vertices) " , self . __class__ . __name__ ,
self . worker_id , self . _entity_graph . num_vertices ( ) )
else :
LOG . info ( " %s - Started empty %s " , self . __class__ . __name__ ,
self . worker_id )
self . _read_queue ( )
def _read_queue ( self ) :
@ -226,6 +254,11 @@ class GraphCloneWorkerBase(cotyledon.Service):
if action == GRAPH_UPDATE :
( action , before , current , is_vertex ) = task
self . _graph_update ( before , current , is_vertex )
elif action == READ_DB_GRAPH :
self . _read_db_graph ( )
elif action == WAIT_FOR_WORKER_START :
# Nothing to do, manager is just verifying this worker is alive
pass
def _graph_update ( self , before , current , is_vertex ) :
if current :
@ -239,16 +272,23 @@ class GraphCloneWorkerBase(cotyledon.Service):
else :
self . _entity_graph . remove_edge ( before )
def _read_db_graph ( self ) :
db = storage . get_connection_from_config ( self . _conf )
graph_snapshot = db . graph_snapshots . query ( )
NXGraph . read_gpickle ( graph_snapshot . graph_snapshot , self . _entity_graph )
GraphPersistency . do_replay_events ( db , self . _entity_graph ,
graph_snapshot . event_id )
self . _entity_graph . ready = True
class EvaluatorWorker ( GraphCloneWorkerBase ) :
def __init__ ( self ,
worker_id ,
conf ,
task_queues ,
e_graph ,
workers_num ) :
super ( EvaluatorWorker , self ) . __init__ (
worker_id , conf , task_queues , e_graph )
worker_id , conf , task_queues )
self . _workers_num = workers_num
self . _evaluator = None
@ -312,7 +352,6 @@ class ApiWorker(GraphCloneWorkerBase):
def _init_instance ( self ) :
conf = self . _conf
LOG . info ( " Vitrage Api Handler Service - Starting... " )
notifier = messaging . VitrageNotifier ( conf , " vitrage.api " ,
[ EVALUATOR_TOPIC ] )
db = storage . get_connection_from_config ( conf )
@ -326,10 +365,9 @@ class ApiWorker(GraphCloneWorkerBase):
TemplateApis ( notifier , db ) ,
EventApis ( conf ) ,
ResourceApis ( self . _entity_graph , conf ) ,
WebhookApis ( conf ) ]
WebhookApis ( conf ) ,
OperationalApis ( conf , self . _entity_graph ) ]
server = vitrage_rpc . get_server ( target , endpoints , transport )
server . start ( )
LOG . info ( " Vitrage Api Handler Service - Started! " )