diff --git a/vitrage/api/controllers/rest.py b/vitrage/api/controllers/rest.py index 3cb9ecd77..f1b31a558 100644 --- a/vitrage/api/controllers/rest.py +++ b/vitrage/api/controllers/rest.py @@ -11,9 +11,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import networkx as nx from networkx.readwrite import json_graph +import oslo_messaging +import pecan from pecan import rest from vitrage.datasources import OPENSTACK_CLUSTER @@ -21,6 +22,22 @@ from vitrage.datasources import OPENSTACK_CLUSTER class RootRestController(rest.RestController): + @pecan.expose() + def _route(self, args, request=None): + """All requests go through here + + We can check the backend status + """ + try: + client = pecan.request.client.prepare(timeout=5) + backend_is_alive = client.call(pecan.request.context, 'is_alive') + if backend_is_alive: + return super(RootRestController, self)._route(args, request) + else: + pecan.abort(503, detail='vitrage-graph is not ready') + except oslo_messaging.MessagingTimeout: + pecan.abort(503, detail='vitrage-graph not available') + @staticmethod def as_tree(graph, root=OPENSTACK_CLUSTER, reverse=False): if nx.__version__ >= '2.0': diff --git a/vitrage/api/hooks.py b/vitrage/api/hooks.py index 389fe092f..aef5bec4e 100644 --- a/vitrage/api/hooks.py +++ b/vitrage/api/hooks.py @@ -46,7 +46,7 @@ class RPCHook(hooks.PecanHook): target = oslo_messaging.Target(topic=conf.rpc_topic) self.client = vitrage_rpc.get_client(transport, target) - def before(self, state): + def on_route(self, state): state.request.client = self.client diff --git a/vitrage/api_handler/apis/operational.py b/vitrage/api_handler/apis/operational.py new file mode 100644 index 000000000..006235ec4 --- /dev/null +++ b/vitrage/api_handler/apis/operational.py @@ -0,0 +1,34 @@ +# Copyright 2018 - Nokia Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log +from vitrage.api_handler.apis.base import EntityGraphApisBase + +LOG = log.getLogger(__name__) + + +class OperationalApis(EntityGraphApisBase): + + def __init__(self, conf, graph): + self.conf = conf + self.graph = graph + + def is_alive(self, ctx): + try: + if self.graph and self.graph.ready: + return True + except Exception: + LOG.exception("is_alive check failed.") + LOG.warning("Api during initialization - graph not ready") + return False diff --git a/vitrage/cli/__init__.py b/vitrage/cli/__init__.py index 555f13790..5aae80094 100644 --- a/vitrage/cli/__init__.py +++ b/vitrage/cli/__init__.py @@ -15,17 +15,12 @@ from vitrage import __version__ VITRAGE_TITLE = r""" - __ __ __ __ -/ | / |/ | / | -$$ | $$ |$$/ _$$ |_ ______ ______ ______ ______ -$$ | $$ |/ |/ $$ | / \ / \ / \ / \ -$$ \ /$$/ $$ |$$$$$$/ /$$$$$$ |$$$$$$ |/$$$$$$ |/$$$$$$ | - $$ /$$/ $$ | $$ | __ $$ | $$/ / $$ |$$ | $$ |$$ $$ | - $$ $$/ $$ | $$ |/ |$$ | /$$$$$$$ |$$ \__$$ |$$$$$$$$/ - $$$/ $$ | $$ $$/ $$ | $$ $$ |$$ $$ |$$ | - $/ $$/ $$$$/ $$/ $$$$$$$/ $$$$$$$ | $$$$$$$/ - / \__$$ | - $$ $$/ - $$$$$$/ + _ __ _ __ +| | / /(_)/ /_ _____ ____ _ ____ _ ___ +| | / // // __// ___// __ `// __ `// _ \ +| |/ // // /_ / / / /_/ // /_/ // __/ +|___//_/ \__//_/ \__,_/ \__, / \___/ + /____/ + Vitrage RCA Service, version %s """ % __version__ diff --git a/vitrage/cli/graph.py b/vitrage/cli/graph.py index 95cd6ddd5..72e5d388a 100644 --- a/vitrage/cli/graph.py +++ b/vitrage/cli/graph.py @@ -12,25 +12,34 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - +from oslo_log import log import sys from vitrage.cli import VITRAGE_TITLE -from vitrage.entity_graph import get_graph_driver +from vitrage.common.utils import spawn from vitrage.entity_graph.graph_init import VitrageGraphInit +from vitrage.entity_graph.workers import GraphWorkersManager from vitrage import service -from vitrage import storage + +LOG = log.getLogger(__name__) def main(): """Main method of vitrage-graph""" - print(VITRAGE_TITLE) conf = service.prepare_service() - e_graph = get_graph_driver(conf)('Entity Graph') - db_connection = storage.get_connection_from_config(conf) - VitrageGraphInit(conf, e_graph, db_connection).run() + LOG.info(VITRAGE_TITLE) + workers = GraphWorkersManager(conf) + spawn(init, conf, workers) + workers.run() + + +def init(conf, workers): + # Because fork duplicates the process memory. + # We should only create master process resources after workers are forked. + workers.wait_for_worker_start() + VitrageGraphInit(conf, workers).run() if __name__ == "__main__": sys.exit(main()) diff --git a/vitrage/entity_graph/consistency/consistency_enforcer.py b/vitrage/entity_graph/consistency/consistency_enforcer.py index 04a50f186..3831d6f6c 100644 --- a/vitrage/entity_graph/consistency/consistency_enforcer.py +++ b/vitrage/entity_graph/consistency/consistency_enforcer.py @@ -157,5 +157,7 @@ class ConsistencyEnforcer(object): if driver_class.should_delete_outdated_entities(): self.datasources_to_mark_deleted.append(driver_name) - LOG.info('Vertices of the following datasources will be deleted if ' - 'they become outdated: %s', self.datasources_to_mark_deleted) + if self.datasources_to_mark_deleted: + LOG.info('Vertices of the following datasources will be deleted if' + 'they become outdated: %s', + self.datasources_to_mark_deleted) diff --git a/vitrage/entity_graph/graph_init.py b/vitrage/entity_graph/graph_init.py index f8f8f6abc..2d1ea9a26 100644 --- a/vitrage/entity_graph/graph_init.py +++ b/vitrage/entity_graph/graph_init.py @@ -21,53 +21,60 @@ from vitrage.common.constants import VertexProperties as VProps from vitrage.common.utils import spawn from vitrage.datasources.transformer_base import TransformerBase from vitrage.entity_graph import driver_exec +from vitrage.entity_graph import get_graph_driver + from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.entity_graph.graph_persistency import GraphPersistency from vitrage.entity_graph.processor.notifier import GraphNotifier from vitrage.entity_graph.processor.notifier import PersistNotifier from vitrage.entity_graph.processor.processor import Processor from vitrage.entity_graph.scheduler import Scheduler -from vitrage.entity_graph.workers import GraphWorkersManager from vitrage.graph.driver.networkx_graph import NXGraph from vitrage import messaging - +from vitrage import storage LOG = log.getLogger(__name__) class VitrageGraphInit(object): - def __init__(self, conf, graph, db_connection): + def __init__(self, conf, workers): self.conf = conf - self.graph = graph - self.db = db_connection - self.workers = GraphWorkersManager(conf, graph, db_connection) + self.graph = get_graph_driver(conf)('Entity Graph') + self.db = db_connection = storage.get_connection_from_config(conf) + self.workers = workers self.events_coordination = EventsCoordination(conf, self.process_event) - self.persist = GraphPersistency(conf, db_connection, graph) + self.persist = GraphPersistency(conf, db_connection, self.graph) self.driver_exec = driver_exec.DriverExec( self.conf, self.events_coordination.handle_multiple_low_priority, self.persist) - self.scheduler = Scheduler(conf, graph, self.driver_exec, self.persist) - self.processor = Processor(conf, graph) + self.scheduler = Scheduler(conf, self.graph, self.driver_exec, + self.persist) + self.processor = Processor(conf, self.graph) def run(self): LOG.info('Init Started') graph_snapshot = self.persist.query_recent_snapshot() if graph_snapshot: + t = spawn(self.workers.submit_read_db_graph) self._restart_from_stored_graph(graph_snapshot) + t.join() + self.workers.submit_enable_evaluations() + else: self._start_from_scratch() - self.workers.run() + self.workers.submit_read_db_graph() + self.workers.submit_start_evaluations() + self._init_finale() def _restart_from_stored_graph(self, graph_snapshot): - LOG.info('Initializing graph from database snapshot (%sKb)', + LOG.info('Main process - loading graph from database snapshot (%sKb)', len(graph_snapshot.graph_snapshot) / 1024) NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self.graph) self.persist.replay_events(self.graph, graph_snapshot.event_id) self._recreate_transformers_id_cache() LOG.info("%s vertices loaded", self.graph.num_vertices()) self.subscribe_presist_notifier() - spawn(self._start_all_workers, is_snapshot=True) def _start_from_scratch(self): LOG.info('Starting for the first time') @@ -78,13 +85,8 @@ class VitrageGraphInit(object): self.subscribe_presist_notifier() self.driver_exec.snapshot_get_all() LOG.info("%s vertices loaded", self.graph.num_vertices()) - spawn(self._start_all_workers, is_snapshot=False) - def _start_all_workers(self, is_snapshot): - if is_snapshot: - self.workers.submit_enable_evaluations() - else: - self.workers.submit_start_evaluations() + def _init_finale(self): self._add_graph_subscriptions() self.scheduler.start_periodic_tasks() LOG.info('Init Finished') diff --git a/vitrage/entity_graph/graph_persistency.py b/vitrage/entity_graph/graph_persistency.py index c6910992f..8bd44e689 100644 --- a/vitrage/entity_graph/graph_persistency.py +++ b/vitrage/entity_graph/graph_persistency.py @@ -48,9 +48,13 @@ class GraphPersistency(object): def replay_events(self, graph, event_id): LOG.info('Getting events from database') - events = self.db.events.get_replay_events( + count = self.do_replay_events(self.db, graph, event_id) + LOG.info('%s database events applied ', count) + + @staticmethod + def do_replay_events(db, graph, event_id): + events = db.events.get_replay_events( event_id=event_id) - LOG.info('Applying %s database events', len(events)) for event in events: if event.is_vertex: @@ -67,6 +71,7 @@ class GraphPersistency(object): del event.payload['label'] e = Edge(source_id, target_id, label, event.payload) graph.update_edge(e) + return len(events) def persist_event(self, before, current, is_vertex, graph, event_id=None): """Callback subscribed to driver.graph updates""" diff --git a/vitrage/entity_graph/workers.py b/vitrage/entity_graph/workers.py index 4f947e8d2..2d9333f2e 100644 --- a/vitrage/entity_graph/workers.py +++ b/vitrage/entity_graph/workers.py @@ -14,12 +14,15 @@ import abc import cotyledon import multiprocessing - +import os from oslo_concurrency import processutils as ps from oslo_log import log import oslo_messaging from oslo_utils import uuidutils +from vitrage.api_handler.apis.operational import OperationalApis +from vitrage.entity_graph.graph_persistency import GraphPersistency + from vitrage.api_handler.apis.alarm import AlarmApis from vitrage.api_handler.apis.event import EventApis from vitrage.api_handler.apis.rca import RcaApis @@ -34,6 +37,7 @@ from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.evaluator.actions.base import ActionMode from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator from vitrage.evaluator.scenario_repository import ScenarioRepository +from vitrage.graph.driver.networkx_graph import NXGraph from vitrage import messaging from vitrage import rpc as vitrage_rpc from vitrage import storage @@ -41,6 +45,8 @@ from vitrage import storage LOG = log.getLogger(__name__) # Supported message types +WAIT_FOR_WORKER_START = 'wait_for_worker_start' +READ_DB_GRAPH = 'read_db_graph' GRAPH_UPDATE = 'graph_update' ENABLE_EVALUATION = 'enable_evaluation' START_EVALUATION = 'start_evaluation' @@ -58,16 +64,15 @@ class GraphWorkersManager(cotyledon.ServiceManager): - the queues used to communicate with these workers - methods interface to submit tasks to workers """ - def __init__(self, conf, entity_graph, db): + def __init__(self, conf): super(GraphWorkersManager, self).__init__() self._conf = conf - self._entity_graph = entity_graph - self._db = db + self._db = None self._evaluator_queues = [] self._template_queues = [] self._api_queues = [] self._all_queues = [] - self.register_hooks(on_terminate=self._stop) + self.register_hooks(on_terminate=self._force_stop) self.add_evaluator_workers() self.add_api_workers() @@ -88,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager): workers = self._conf.evaluator.workers or ps.get_worker_count() queues = [multiprocessing.JoinableQueue() for i in range(workers)] self.add(EvaluatorWorker, - args=(self._conf, queues, self._entity_graph, workers), + args=(self._conf, queues, workers), workers=workers) self._evaluator_queues = queues self._all_queues.extend(queues) @@ -105,9 +110,7 @@ class GraphWorkersManager(cotyledon.ServiceManager): raise VitrageError('add_api_workers called more than once') workers = self._conf.api.workers queues = [multiprocessing.JoinableQueue() for i in range(workers)] - self.add(ApiWorker, - args=(self._conf, queues, self._entity_graph), - workers=workers) + self.add(ApiWorker, args=(self._conf, queues), workers=workers) self._api_queues = queues self._all_queues.extend(queues) @@ -143,6 +146,24 @@ class GraphWorkersManager(cotyledon.ServiceManager): """ self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,)) + def submit_read_db_graph(self): + """Initialize the worker graph from database snapshot + + So that new/deleted templates are added/removed + """ + LOG.info("Worker processes - loading graph...") + self._submit_and_wait(self._all_queues, (READ_DB_GRAPH,)) + LOG.info("Worker processes - graph is ready") + + def wait_for_worker_start(self): + """Wait for responses from all workers + + So that new/deleted templates are added/removed + """ + LOG.info("Worker processes - starting...") + self._submit_and_wait(self._all_queues, (WAIT_FOR_WORKER_START,)) + LOG.info("Worker processes - ready!") + def submit_template_event(self, event): """Template worker to load the new/deleted template @@ -150,6 +171,9 @@ class GraphWorkersManager(cotyledon.ServiceManager): """ template_action = event.get(TEMPLATE_ACTION) + if not self._db: + self._db = storage.get_connection_from_config(self._conf) + if template_action == ADD: templates = self._db.templates.query(status=TStatus.LOADING) new_status = TStatus.ACTIVE @@ -182,20 +206,19 @@ class GraphWorkersManager(cotyledon.ServiceManager): q.join() @staticmethod - def _stop(): - raise SystemExit(0) + def _force_stop(): + os._exit(0) class GraphCloneWorkerBase(cotyledon.Service): def __init__(self, worker_id, conf, - task_queues, - entity_graph): + task_queues): super(GraphCloneWorkerBase, self).__init__(worker_id) self._conf = conf self._task_queue = task_queues[worker_id] - self._entity_graph = entity_graph + self._entity_graph = NXGraph() name = 'GraphCloneWorkerBase' @@ -205,9 +228,14 @@ class GraphCloneWorkerBase(cotyledon.Service): raise NotImplementedError def run(self): - LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id) self._entity_graph.notifier._subscriptions = [] # Quick n dirty self._init_instance() + if self._entity_graph.num_vertices(): + LOG.info("%s - Started %s (%s vertices)", self.__class__.__name__, + self.worker_id, self._entity_graph.num_vertices()) + else: + LOG.info("%s - Started empty %s", self.__class__.__name__, + self.worker_id) self._read_queue() def _read_queue(self): @@ -226,6 +254,11 @@ class GraphCloneWorkerBase(cotyledon.Service): if action == GRAPH_UPDATE: (action, before, current, is_vertex) = task self._graph_update(before, current, is_vertex) + elif action == READ_DB_GRAPH: + self._read_db_graph() + elif action == WAIT_FOR_WORKER_START: + # Nothing to do, manager is just verifying this worker is alive + pass def _graph_update(self, before, current, is_vertex): if current: @@ -239,16 +272,23 @@ class GraphCloneWorkerBase(cotyledon.Service): else: self._entity_graph.remove_edge(before) + def _read_db_graph(self): + db = storage.get_connection_from_config(self._conf) + graph_snapshot = db.graph_snapshots.query() + NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self._entity_graph) + GraphPersistency.do_replay_events(db, self._entity_graph, + graph_snapshot.event_id) + self._entity_graph.ready = True + class EvaluatorWorker(GraphCloneWorkerBase): def __init__(self, worker_id, conf, task_queues, - e_graph, workers_num): super(EvaluatorWorker, self).__init__( - worker_id, conf, task_queues, e_graph) + worker_id, conf, task_queues) self._workers_num = workers_num self._evaluator = None @@ -312,7 +352,6 @@ class ApiWorker(GraphCloneWorkerBase): def _init_instance(self): conf = self._conf - LOG.info("Vitrage Api Handler Service - Starting...") notifier = messaging.VitrageNotifier(conf, "vitrage.api", [EVALUATOR_TOPIC]) db = storage.get_connection_from_config(conf) @@ -326,10 +365,9 @@ class ApiWorker(GraphCloneWorkerBase): TemplateApis(notifier, db), EventApis(conf), ResourceApis(self._entity_graph, conf), - WebhookApis(conf)] + WebhookApis(conf), + OperationalApis(conf, self._entity_graph)] server = vitrage_rpc.get_server(target, endpoints, transport) server.start() - - LOG.info("Vitrage Api Handler Service - Started!") diff --git a/vitrage/evaluator/scenario_repository.py b/vitrage/evaluator/scenario_repository.py index f80593bc8..757b86f91 100644 --- a/vitrage/evaluator/scenario_repository.py +++ b/vitrage/evaluator/scenario_repository.py @@ -217,4 +217,5 @@ class ScenarioRepository(object): def log_enabled_scenarios(self): scenarios = [s for s in self._all_scenarios if s.enabled] - LOG.info("Scenarios:\n%s", sorted([s.id for s in scenarios])) + if scenarios: + LOG.info("Scenarios:\n%s", sorted([s.id for s in scenarios])) diff --git a/vitrage/graph/driver/networkx_graph.py b/vitrage/graph/driver/networkx_graph.py index eabb3ce8b..1d63de691 100644 --- a/vitrage/graph/driver/networkx_graph.py +++ b/vitrage/graph/driver/networkx_graph.py @@ -55,6 +55,7 @@ class NXGraph(Graph): self._g = nx.MultiDiGraph() self.add_vertices(vertices) self.add_edges(edges) + self.ready = False def __len__(self): return len(self._g) diff --git a/vitrage/opts.py b/vitrage/opts.py index 1c6afc62b..5a57b19a1 100644 --- a/vitrage/opts.py +++ b/vitrage/opts.py @@ -102,8 +102,6 @@ def _normalize_path_to_datasource_name(path_list, top=os.getcwd()): def register_opts(conf, package_name, paths): """register opts of package package_name, with base path in paths""" for path in paths: - LOG.info("package name: %s" % package_name) - LOG.info("path: % s" % path) try: opt = importutils.import_module( "%s.%s" % (path, package_name)).OPTS @@ -113,5 +111,6 @@ def register_opts(conf, package_name, paths): ) return except ImportError: - LOG.error("Failed to register config options for %s" % - package_name) + pass + LOG.error("Failed to import config options for %s. Not found in %s", + package_name, str(paths)) diff --git a/vitrage/rpc.py b/vitrage/rpc.py index 86f13d48b..49841a9e2 100644 --- a/vitrage/rpc.py +++ b/vitrage/rpc.py @@ -90,7 +90,7 @@ def get_server(target, endpoints, transport, serializer=None): assert transport is not None if profiler: - LOG.info('profiler enabled for RPC server') + LOG.debug('profiler enabled for RPC server') serializer = ProfilerContextSerializer(serializer=serializer) access_policy = dispatcher.DefaultRPCAccessPolicy