diff --git a/vitrage/common/constants.py b/vitrage/common/constants.py index c84e396ab..435808445 100644 --- a/vitrage/common/constants.py +++ b/vitrage/common/constants.py @@ -114,6 +114,7 @@ class NotifierEventTypes(object): DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate' ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate' DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate' + EXECUTE_EXTERNAL_ACTION = 'vitrage.execute_external_action' class TemplateTopologyFields(object): diff --git a/vitrage/evaluator/actions/action_executor.py b/vitrage/evaluator/actions/action_executor.py index 685f0c0cd..4edf3abb5 100644 --- a/vitrage/evaluator/actions/action_executor.py +++ b/vitrage/evaluator/actions/action_executor.py @@ -14,6 +14,7 @@ import copy +from oslo_log import log from oslo_utils import importutils from vitrage.common.constants import DatasourceAction as AType @@ -23,9 +24,11 @@ from vitrage.evaluator.actions.base import ActionMode from vitrage.evaluator.actions.base import ActionType from vitrage.evaluator.actions.evaluator_event_transformer \ import VITRAGE_DATASOURCE +from vitrage.evaluator.actions.notifier import EvaluatorNotifier from vitrage.evaluator.actions.recipes.action_steps import ADD_EDGE from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX from vitrage.evaluator.actions.recipes.action_steps import EXECUTE_EXTERNAL +from vitrage.evaluator.actions.recipes.action_steps import EXECUTION_ENGINE from vitrage.evaluator.actions.recipes.action_steps import REMOVE_EDGE from vitrage.evaluator.actions.recipes.action_steps import REMOVE_VERTEX from vitrage.evaluator.actions.recipes.action_steps import UPDATE_VERTEX @@ -38,11 +41,14 @@ from vitrage.evaluator.actions.recipes.raise_alarm import RaiseAlarm from vitrage.evaluator.actions.recipes.set_state import SetState from vitrage.utils import datetime as datetime_utils +LOG = log.getLogger(__name__) + class ActionExecutor(object): - def __init__(self, event_queue): + def __init__(self, conf, event_queue): self.event_queue = event_queue + self.notifier = EvaluatorNotifier(conf) self.action_recipes = ActionExecutor._register_action_recipes() self.action_step_defs = { @@ -106,9 +112,12 @@ class ActionExecutor(object): def _execute_external(self, params): - # TODO(ifat_afek): send to a dedicated queue - # external_engine = params[EXECUTION_ENGINE] - pass + # Send a notification to the external engine + external_engine = params[EXECUTION_ENGINE] + LOG.debug('Notifying external engine %s. Properties: %s', + external_engine, + str(params)) + self.notifier.notify(external_engine, params) @staticmethod def _add_default_properties(event): diff --git a/vitrage/evaluator/actions/notifier.py b/vitrage/evaluator/actions/notifier.py new file mode 100644 index 000000000..304d47637 --- /dev/null +++ b/vitrage/evaluator/actions/notifier.py @@ -0,0 +1,75 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log +import oslo_messaging + +from vitrage.common.constants import NotifierEventTypes +from vitrage.messaging import get_transport + + +LOG = log.getLogger(__name__) + + +class EvaluatorNotifier(object): + """Allows writing to message bus""" + def __init__(self, conf): + self.oslo_notifiers = {} + try: + notifier_plugins = conf.notifiers + + LOG.debug('notifier_plugins: %s', notifier_plugins) + + if not notifier_plugins: + LOG.info('Evaluator Notifier is disabled') + return + + for notifier in notifier_plugins: + LOG.debug('Adding evaluator notifier %s', notifier) + + self.oslo_notifiers[notifier] = oslo_messaging.Notifier( + get_transport(conf), + driver='messagingv2', + publisher_id='vitrage.evaluator', + topics=[notifier]) + + except Exception as e: + LOG.info('Evaluator Notifier - missing configuration %s' % str(e)) + + @property + def enabled(self): + return len(self.oslo_notifiers) > 0 + + def notify(self, external_engine, properties): + """Send a message to the wanted notifier + + :param external_engine: the external engine that should handle the + notification and execute an action + :param properties: Properties to be processed by the external engine + """ + + LOG.debug('external_engine: %s, properties: %s', + external_engine, + str(properties)) + + try: + if external_engine in self.oslo_notifiers: + LOG.debug('Notifying %s', external_engine) + self.oslo_notifiers[external_engine].info( + {}, + NotifierEventTypes.EXECUTE_EXTERNAL_ACTION, + properties) + except Exception as e: + LOG.exception('Cannot notify - %s - %s', + NotifierEventTypes.EXECUTE_EXTERNAL_ACTION, + e) diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index d0da932c4..0bc333e36 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -55,7 +55,7 @@ class ScenarioEvaluator(object): self.conf = conf self._scenario_repo = scenario_repo self._entity_graph = entity_graph - self._action_executor = ActionExecutor(event_queue) + self._action_executor = ActionExecutor(conf, event_queue) self._entity_graph.subscribe(self.process_event) self._action_tracker = ActionTracker(DatasourceInfoMapper(self.conf)) self.enabled = enabled diff --git a/vitrage/notifier/plugins/base.py b/vitrage/notifier/plugins/base.py index 3e031c99b..6706265aa 100644 --- a/vitrage/notifier/plugins/base.py +++ b/vitrage/notifier/plugins/base.py @@ -30,3 +30,12 @@ class NotifierBase(object): @abc.abstractmethod def get_notifier_name(): pass + + @staticmethod + def use_private_topic(): + return False + + @staticmethod + def info(self, ctxt, publisher_id, event_type, payload, metadata): + """An endpoint for notifiers that use a private topic""" + pass diff --git a/vitrage/notifier/service.py b/vitrage/notifier/service.py index ea6c9d466..f1cbf6d07 100644 --- a/vitrage/notifier/service.py +++ b/vitrage/notifier/service.py @@ -29,25 +29,24 @@ class VitrageNotifierService(os_service.Service): super(VitrageNotifierService, self).__init__() self.conf = conf self.notifiers = self.get_notifier_plugins(conf) - transport = messaging.get_transport(conf) - target = oslo_messaging.Target(topic=conf.entity_graph.notifier_topic) - self.listener = messaging.get_notification_listener( - transport, [target], - [VitrageEventEndpoint(self.notifiers)]) + self._init_listeners(self.conf) def start(self): LOG.info("Vitrage Notifier Service - Starting...") super(VitrageNotifierService, self).start() - self.listener.start() + for listener in self.listeners: + listener.start() LOG.info("Vitrage Notifier Service - Started!") def stop(self, graceful=False): LOG.info("Vitrage Notifier Service - Stopping...") - self.listener.stop() - self.listener.wait() + for listener in self.listeners: + listener.stop() + listener.wait() + super(VitrageNotifierService, self).stop(graceful) LOG.info("Vitrage Notifier Service - Stopped!") @@ -67,8 +66,29 @@ class VitrageNotifierService(os_service.Service): conf)) return notifiers + def _init_listeners(self, conf): + self.listeners = [] + transport = messaging.get_transport(conf) -class VitrageEventEndpoint(object): + self._init_notifier(transport=transport, + topic=conf.entity_graph.notifier_topic, + endpoint=VitrageDefaultEventEndpoint( + self.notifiers)) + + for notifier in self.notifiers: + if notifier.use_private_topic(): + self._init_notifier(transport=transport, + topic=notifier.get_notifier_name(), + endpoint=notifier) + + def _init_notifier(self, transport, topic, endpoint): + LOG.debug('Initializing notifier with topic %s', topic) + + self.listeners.append(messaging.get_notification_listener( + transport, [oslo_messaging.Target(topic=topic)], [endpoint])) + + +class VitrageDefaultEventEndpoint(object): def __init__(self, notifiers): self.notifiers = notifiers diff --git a/vitrage/tests/functional/evaluator/test_action_executor.py b/vitrage/tests/functional/evaluator/test_action_executor.py index f656abefc..62accfeef 100644 --- a/vitrage/tests/functional/evaluator/test_action_executor.py +++ b/vitrage/tests/functional/evaluator/test_action_executor.py @@ -71,7 +71,7 @@ class TestActionExecutor(TestFunctionalBase): action_spec = ActionSpecs(ActionType.SET_STATE, targets, props) event_queue = queue.Queue() - action_executor = ActionExecutor(event_queue) + action_executor = ActionExecutor(self.conf, event_queue) # Test Action - do action_executor.execute(action_spec, ActionMode.DO) @@ -122,7 +122,7 @@ class TestActionExecutor(TestFunctionalBase): action_spec = ActionSpecs(ActionType.MARK_DOWN, targets, props) event_queue = queue.Queue() - action_executor = ActionExecutor(event_queue) + action_executor = ActionExecutor(self.conf, event_queue) # Test Action - do action_executor.execute(action_spec, ActionMode.DO) @@ -178,7 +178,7 @@ class TestActionExecutor(TestFunctionalBase): {}) event_queue = queue.Queue() - action_executor = ActionExecutor(event_queue) + action_executor = ActionExecutor(self.conf, event_queue) before_edge = processor.entity_graph.get_edge(alarm2.vertex_id, alarm1.vertex_id, @@ -221,7 +221,7 @@ class TestActionExecutor(TestFunctionalBase): before_alarms = processor.entity_graph.get_vertices( vertex_attr_filter=alarm_vertex_attrs) event_queue = queue.Queue() - action_executor = ActionExecutor(event_queue) + action_executor = ActionExecutor(self.conf, event_queue) # Test Action action_executor.execute(action_spec, ActionMode.DO) @@ -285,7 +285,7 @@ class TestActionExecutor(TestFunctionalBase): vertex_attr_filter=alarm_vertex_attrs) event_queue = queue.Queue() - action_executor = ActionExecutor(event_queue) + action_executor = ActionExecutor(self.conf, event_queue) # Test Action - undo action_executor.execute(action_spec, ActionMode.UNDO)