diff --git a/aodh/evaluator/__init__.py b/aodh/evaluator/__init__.py index b9bb819db..f081b7705 100644 --- a/aodh/evaluator/__init__.py +++ b/aodh/evaluator/__init__.py @@ -36,6 +36,7 @@ from aodh import coordination from aodh.i18n import _ from aodh import keystone_client from aodh import messaging +from aodh import queue from aodh import rpc from aodh import storage from aodh.storage import models @@ -60,9 +61,12 @@ OPTS = [ class Evaluator(object): """Base class for alarm rule evaluator plugins.""" - def __init__(self, conf, notifier): + def __init__(self, conf): self.conf = conf - self.notifier = notifier + if conf.ipc_protocol == 'rpc': + self.notifier = rpc.RPCAlarmNotifier(self.conf) + else: + self.notifier = queue.AlarmNotifier(self.conf) self.storage_conn = None self._ks_client = None self._alarm_change_notifier = None @@ -121,8 +125,7 @@ class Evaluator(object): self._storage_conn.update_alarm(alarm) self._record_change(alarm) - if self.notifier: - self.notifier.notify(alarm, previous, reason, reason_data) + self.notifier.notify(alarm, previous, reason, reason_data) except Exception: # retry will occur naturally on the next evaluation # cycle (unless alarm state reverts in the meantime) @@ -196,7 +199,7 @@ class AlarmService(object): self.evaluators = extension.ExtensionManager( namespace=self.EVALUATOR_EXTENSIONS_NAMESPACE, invoke_on_load=True, - invoke_args=(self.conf, rpc.RPCAlarmNotifier(self.conf),) + invoke_args=(self.conf,) ) def _evaluate_assigned_alarms(self): diff --git a/aodh/evaluator/event.py b/aodh/evaluator/event.py index 8a8df7c63..06cb25102 100644 --- a/aodh/evaluator/event.py +++ b/aodh/evaluator/event.py @@ -150,8 +150,8 @@ class Alarm(object): class EventAlarmEvaluator(evaluator.Evaluator): - def __init__(self, conf, notifier): - super(EventAlarmEvaluator, self).__init__(conf, notifier) + def __init__(self, conf): + super(EventAlarmEvaluator, self).__init__(conf) self.caches = {} def evaluate_events(self, events): diff --git a/aodh/evaluator/gnocchi.py b/aodh/evaluator/gnocchi.py index 51e99eed3..a1a064328 100644 --- a/aodh/evaluator/gnocchi.py +++ b/aodh/evaluator/gnocchi.py @@ -33,8 +33,8 @@ OPTS = [ class GnocchiThresholdEvaluator(threshold.ThresholdEvaluator): - def __init__(self, conf, notifier): - super(threshold.ThresholdEvaluator, self).__init__(conf, notifier) + def __init__(self, conf): + super(threshold.ThresholdEvaluator, self).__init__(conf) self.gnocchi_url = conf.gnocchi_url def _get_headers(self, content_type="application/json"): diff --git a/aodh/evaluator/threshold.py b/aodh/evaluator/threshold.py index 91e13ae5c..dce35acb3 100644 --- a/aodh/evaluator/threshold.py +++ b/aodh/evaluator/threshold.py @@ -44,8 +44,8 @@ class ThresholdEvaluator(evaluator.Evaluator): # for reporting/ingestion lag look_back = 1 - def __init__(self, conf, notifier): - super(ThresholdEvaluator, self).__init__(conf, notifier) + def __init__(self, conf): + super(ThresholdEvaluator, self).__init__(conf) auth_config = conf.service_credentials self._client = ceiloclient.get_client( 2, diff --git a/aodh/event.py b/aodh/event.py index 190913c0d..f17696862 100644 --- a/aodh/event.py +++ b/aodh/event.py @@ -19,7 +19,6 @@ from oslo_service import service from aodh.evaluator import event from aodh import messaging -from aodh import rpc from aodh import storage @@ -46,9 +45,7 @@ class EventAlarmEvaluationService(service.Service): super(EventAlarmEvaluationService, self).__init__() self.conf = conf self.storage_conn = storage.get_connection_from_config(self.conf) - self.evaluator = event.EventAlarmEvaluator( - self.conf, - rpc.RPCAlarmNotifier(self.conf)) + self.evaluator = event.EventAlarmEvaluator(self.conf) def start(self): super(EventAlarmEvaluationService, self).start() diff --git a/aodh/notifier/__init__.py b/aodh/notifier/__init__.py index 8f09d1dff..b43e9d363 100644 --- a/aodh/notifier/__init__.py +++ b/aodh/notifier/__init__.py @@ -16,6 +16,8 @@ import abc import logging +from oslo_config import cfg +import oslo_messaging from oslo_service import service as os_service from oslo_utils import netutils import six @@ -24,6 +26,14 @@ from stevedore import extension from aodh.i18n import _ from aodh import messaging +OPTS = [ + cfg.StrOpt('ipc_protocol', + default='queue', + choices=['queue', 'rpc'], + help='The protocol used to communicate between evaluator and ' + 'notifier services.'), +] + LOG = logging.getLogger(__name__) @@ -57,80 +67,115 @@ class AlarmNotifierService(os_service.Service): def __init__(self, conf): super(AlarmNotifierService, self).__init__() transport = messaging.get_transport(conf) - self.rpc_server = messaging.get_rpc_server( - conf, transport, conf.notifier_rpc_topic, self) self.notifiers = extension.ExtensionManager( self.NOTIFIER_EXTENSIONS_NAMESPACE, invoke_on_load=True, invoke_args=(conf,)) + if conf.ipc_protocol == 'rpc': + self.ipc = 'rpc' + self.rpc_server = messaging.get_rpc_server( + conf, transport, conf.notifier_rpc_topic, self) + else: + self.ipc = 'queue' + target = oslo_messaging.Target(topic=conf.notifier_topic) + self.listener = messaging.get_notification_listener( + transport, [target], + [AlarmEndpoint(self.notifiers)]) + def start(self): super(AlarmNotifierService, self).start() - self.rpc_server.start() + if self.ipc == 'rpc': + self.rpc_server.start() + else: + self.listener.start() # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) def stop(self): - self.rpc_server.stop() + if self.ipc == 'rpc': + self.rpc_server.stop() + else: + self.listener.stop() + self.listener.wait() super(AlarmNotifierService, self).stop() - def _handle_action(self, action, alarm_id, alarm_name, severity, - previous, current, reason, reason_data): - try: - action = netutils.urlsplit(action) - except Exception: - LOG.error( - _("Unable to parse action %(action)s for alarm %(alarm_id)s"), - {'action': action, 'alarm_id': alarm_id}) - return - - try: - notifier = self.notifiers[action.scheme].obj - except KeyError: - scheme = action.scheme - LOG.error( - _("Action %(scheme)s for alarm %(alarm_id)s is unknown, " - "cannot notify"), - {'scheme': scheme, 'alarm_id': alarm_id}) - return - - try: - LOG.debug("Notifying alarm %(id)s with action %(act)s", - {'id': alarm_id, 'act': action}) - notifier.notify(action, alarm_id, alarm_name, severity, - previous, current, reason, reason_data) - except Exception: - LOG.exception(_("Unable to notify alarm %s"), alarm_id) - return - def notify_alarm(self, context, data): - """Notify that alarm has been triggered. + process_alarm(self.notifiers, data) - :param context: Request context. - :param data: (dict): - - actions, the URL of the action to run; this is mapped to - extensions automatically - - alarm_id, the ID of the alarm that has been triggered - - alarm_name, the name of the alarm that has been triggered - - severity, the level of the alarm that has been triggered - - previous, the previous state of the alarm - - current, the new state the alarm has transitioned to - - reason, the reason the alarm changed its state - - reason_data, a dict representation of the reason - """ - actions = data.get('actions') - if not actions: - LOG.error(_("Unable to notify for an alarm with no action")) - return +def _handle_action(notifiers, action, alarm_id, alarm_name, severity, + previous, current, reason, reason_data): + """Process action on alarm - for action in actions: - self._handle_action(action, - data.get('alarm_id'), - data.get('alarm_name'), - data.get('severity'), - data.get('previous'), - data.get('current'), - data.get('reason'), - data.get('reason_data')) + :param notifiers: list of possible notifiers. + :param action: The action that is being attended, as a parsed URL. + :param alarm_id: The triggered alarm. + :param alarm_name: The name of triggered alarm. + :param severity: The level of triggered alarm + :param previous: The previous state of the alarm. + :param current: The current state of the alarm. + :param reason: The reason the alarm changed its state. + :param reason_data: A dict representation of the reason. + """ + + try: + action = netutils.urlsplit(action) + except Exception: + LOG.error( + _("Unable to parse action %(action)s for alarm %(alarm_id)s"), + {'action': action, 'alarm_id': alarm_id}) + return + + try: + notifier = notifiers[action.scheme].obj + except KeyError: + scheme = action.scheme + LOG.error( + _("Action %(scheme)s for alarm %(alarm_id)s is unknown, " + "cannot notify"), + {'scheme': scheme, 'alarm_id': alarm_id}) + return + + try: + LOG.debug("Notifying alarm %(id)s with action %(act)s", + {'id': alarm_id, 'act': action}) + notifier.notify(action, alarm_id, alarm_name, severity, + previous, current, reason, reason_data) + except Exception: + LOG.exception(_("Unable to notify alarm %s"), alarm_id) + return + + +def process_alarm(notifiers, data): + """Notify that alarm has been triggered. + + :param notifiers: list of possible notifiers + :param data: (dict): alarm data + """ + + actions = data.get('actions') + if not actions: + LOG.error(_("Unable to notify for an alarm with no action")) + return + + for action in actions: + _handle_action(notifiers, action, + data.get('alarm_id'), + data.get('alarm_name'), + data.get('severity'), + data.get('previous'), + data.get('current'), + data.get('reason'), + data.get('reason_data')) + + +class AlarmEndpoint(object): + + def __init__(self, notifiers): + self.notifiers = notifiers + + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + """Endpoint for alarm notifications""" + process_alarm(self.notifiers, payload) diff --git a/aodh/opts.py b/aodh/opts.py index ec0f395a9..ded7a5c82 100644 --- a/aodh/opts.py +++ b/aodh/opts.py @@ -36,9 +36,11 @@ def list_opts(): aodh.evaluator.event.OPTS, aodh.evaluator.gnocchi.OPTS, aodh.event.OPTS, + aodh.notifier.OPTS, aodh.notifier.rest.OPTS, - aodh.service.OPTS, + aodh.queue.OPTS, aodh.rpc.OPTS, + aodh.service.OPTS, aodh.api.controllers.v2.alarms.ALARM_API_OPTS)), ('api', itertools.chain( diff --git a/aodh/queue.py b/aodh/queue.py new file mode 100644 index 000000000..cbf3d8a25 --- /dev/null +++ b/aodh/queue.py @@ -0,0 +1,60 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_context import context +from oslo_log import log +import oslo_messaging +import six + +from aodh import messaging +from aodh.storage import models + +OPTS = [ + cfg.StrOpt('notifier_topic', + default='alarming', + help='The topic that aodh uses for alarm notifier ' + 'messages.'), +] + +LOG = log.getLogger(__name__) + + +class AlarmNotifier(object): + def __init__(self, conf): + self.ctxt = context.get_admin_context().to_dict() + self.notifier = oslo_messaging.Notifier( + messaging.get_transport(conf), + driver='messagingv2', + publisher_id="alarming.evaluator", + topic=conf.notifier_topic) + + def notify(self, alarm, previous, reason, reason_data): + actions = getattr(alarm, models.Alarm.ALARM_ACTIONS_MAP[alarm.state]) + if not actions: + LOG.debug('alarm %(alarm_id)s has no action configured ' + 'for state transition from %(previous)s to ' + 'state %(state)s, skipping the notification.', + {'alarm_id': alarm.alarm_id, + 'previous': previous, + 'state': alarm.state}) + return + payload = {'actions': actions, + 'alarm_id': alarm.alarm_id, + 'alarm_name': alarm.name, + 'severity': alarm.severity, + 'previous': previous, + 'current': alarm.state, + 'reason': six.text_type(reason), + 'reason_data': reason_data} + self.notifier.sample(self.ctxt, 'alarm.update', payload) diff --git a/aodh/rpc.py b/aodh/rpc.py index 4dc6ea3a9..71cdfb86b 100644 --- a/aodh/rpc.py +++ b/aodh/rpc.py @@ -36,6 +36,7 @@ LOG = log.getLogger(__name__) class RPCAlarmNotifier(object): def __init__(self, conf): + self.ctxt = context.get_admin_context() transport = messaging.get_transport(conf) self.client = messaging.get_rpc_client( transport, topic=conf.notifier_rpc_topic, @@ -51,7 +52,7 @@ class RPCAlarmNotifier(object): 'previous': previous, 'state': alarm.state}) return - self.client.cast(context.get_admin_context(), + self.client.cast(self.ctxt, 'notify_alarm', data={ 'actions': actions, 'alarm_id': alarm.alarm_id, diff --git a/aodh/tests/unit/evaluator/base.py b/aodh/tests/unit/evaluator/base.py index 46c1311eb..05d485bcd 100644 --- a/aodh/tests/unit/evaluator/base.py +++ b/aodh/tests/unit/evaluator/base.py @@ -29,8 +29,9 @@ class TestEvaluatorBase(base.BaseTestCase): self.api_client = mock.Mock() self.useFixture(mockpatch.Patch('ceilometerclient.client.get_client', return_value=self.api_client)) + self.evaluator = self.EVALUATOR(self.conf) self.notifier = mock.MagicMock() - self.evaluator = self.EVALUATOR(self.conf, self.notifier) + self.evaluator.notifier = self.notifier self.storage_conn = mock.MagicMock() self.evaluator.storage_conn = self.storage_conn self.evaluator._ks_client = mock.Mock(user_id='fake_user_id', diff --git a/aodh/tests/unit/evaluator/test_base.py b/aodh/tests/unit/evaluator/test_base.py index 5bbf628c2..b13ed6754 100644 --- a/aodh/tests/unit/evaluator/test_base.py +++ b/aodh/tests/unit/evaluator/test_base.py @@ -19,6 +19,7 @@ from oslo_utils import timeutils from oslotest import base from aodh import evaluator +from aodh import queue class TestEvaluatorBaseClass(base.BaseTestCase): @@ -30,15 +31,16 @@ class TestEvaluatorBaseClass(base.BaseTestCase): self.called = True raise Exception('Boom!') - def test_base_refresh(self): - notifier = mock.MagicMock() + @mock.patch.object(queue, 'AlarmNotifier') + def test_base_refresh(self, notifier): notifier.notify = self._notify class EvaluatorSub(evaluator.Evaluator): def evaluate(self, alarm): pass - ev = EvaluatorSub(mock.MagicMock(), notifier) + ev = EvaluatorSub(mock.MagicMock()) + ev.notifier = notifier ev.storage_conn = mock.MagicMock() ev._record_change = mock.MagicMock() ev._refresh(mock.MagicMock(), mock.MagicMock(), diff --git a/aodh/tests/unit/test_notifier.py b/aodh/tests/unit/test_notifier.py index 55dd135f6..45b1e80d5 100644 --- a/aodh/tests/unit/test_notifier.py +++ b/aodh/tests/unit/test_notifier.py @@ -41,6 +41,26 @@ NOTIFICATION = dict(alarm_id='foobar', current='ALARM') +class TestAlarmNotifierService(tests_base.BaseTestCase): + + def setUp(self): + super(TestAlarmNotifierService, self).setUp() + conf = service.prepare_service(argv=[], config_files=[]) + self.CONF = self.useFixture(fixture_config.Config(conf)).conf + self.setup_messaging(self.CONF) + + def test_init_host_rpc(self): + self.CONF.set_override('ipc_protocol', 'rpc') + self.service = notifier.AlarmNotifierService(self.CONF) + self.service.start() + self.service.stop() + + def test_init_host_queue(self): + self.service = notifier.AlarmNotifierService(self.CONF) + self.service.start() + self.service.stop() + + class TestAlarmNotifier(tests_base.BaseTestCase): def setUp(self): @@ -53,10 +73,6 @@ class TestAlarmNotifier(tests_base.BaseTestCase): 'oslo_context.context.generate_request_id', self._fake_generate_request_id)) - def test_init_host(self): - self.service.start() - self.service.stop() - def test_notify_alarm(self): data = { 'actions': ['test://'],