support queue based communication between evaluator and notifier

we don't need rpc and it's slower, let's stop using it.

Change-Id: I3fc4d4a4bce732a6e0be05d77181caf5237d1f64
This commit is contained in:
gordon chung 2015-11-18 17:25:53 -05:00
parent 43a1797d46
commit a8ae6b1c76
12 changed files with 211 additions and 84 deletions

View File

@ -36,6 +36,7 @@ from aodh import coordination
from aodh.i18n import _
from aodh import keystone_client
from aodh import messaging
from aodh import queue
from aodh import rpc
from aodh import storage
from aodh.storage import models
@ -60,9 +61,12 @@ OPTS = [
class Evaluator(object):
"""Base class for alarm rule evaluator plugins."""
def __init__(self, conf, notifier):
def __init__(self, conf):
self.conf = conf
self.notifier = notifier
if conf.ipc_protocol == 'rpc':
self.notifier = rpc.RPCAlarmNotifier(self.conf)
else:
self.notifier = queue.AlarmNotifier(self.conf)
self.storage_conn = None
self._ks_client = None
self._alarm_change_notifier = None
@ -121,8 +125,7 @@ class Evaluator(object):
self._storage_conn.update_alarm(alarm)
self._record_change(alarm)
if self.notifier:
self.notifier.notify(alarm, previous, reason, reason_data)
self.notifier.notify(alarm, previous, reason, reason_data)
except Exception:
# retry will occur naturally on the next evaluation
# cycle (unless alarm state reverts in the meantime)
@ -196,7 +199,7 @@ class AlarmService(object):
self.evaluators = extension.ExtensionManager(
namespace=self.EVALUATOR_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(self.conf, rpc.RPCAlarmNotifier(self.conf),)
invoke_args=(self.conf,)
)
def _evaluate_assigned_alarms(self):

View File

@ -150,8 +150,8 @@ class Alarm(object):
class EventAlarmEvaluator(evaluator.Evaluator):
def __init__(self, conf, notifier):
super(EventAlarmEvaluator, self).__init__(conf, notifier)
def __init__(self, conf):
super(EventAlarmEvaluator, self).__init__(conf)
self.caches = {}
def evaluate_events(self, events):

View File

@ -33,8 +33,8 @@ OPTS = [
class GnocchiThresholdEvaluator(threshold.ThresholdEvaluator):
def __init__(self, conf, notifier):
super(threshold.ThresholdEvaluator, self).__init__(conf, notifier)
def __init__(self, conf):
super(threshold.ThresholdEvaluator, self).__init__(conf)
self.gnocchi_url = conf.gnocchi_url
def _get_headers(self, content_type="application/json"):

View File

@ -44,8 +44,8 @@ class ThresholdEvaluator(evaluator.Evaluator):
# for reporting/ingestion lag
look_back = 1
def __init__(self, conf, notifier):
super(ThresholdEvaluator, self).__init__(conf, notifier)
def __init__(self, conf):
super(ThresholdEvaluator, self).__init__(conf)
auth_config = conf.service_credentials
self._client = ceiloclient.get_client(
2,

View File

@ -19,7 +19,6 @@ from oslo_service import service
from aodh.evaluator import event
from aodh import messaging
from aodh import rpc
from aodh import storage
@ -46,9 +45,7 @@ class EventAlarmEvaluationService(service.Service):
super(EventAlarmEvaluationService, self).__init__()
self.conf = conf
self.storage_conn = storage.get_connection_from_config(self.conf)
self.evaluator = event.EventAlarmEvaluator(
self.conf,
rpc.RPCAlarmNotifier(self.conf))
self.evaluator = event.EventAlarmEvaluator(self.conf)
def start(self):
super(EventAlarmEvaluationService, self).start()

View File

@ -16,6 +16,8 @@
import abc
import logging
from oslo_config import cfg
import oslo_messaging
from oslo_service import service as os_service
from oslo_utils import netutils
import six
@ -24,6 +26,14 @@ from stevedore import extension
from aodh.i18n import _
from aodh import messaging
OPTS = [
cfg.StrOpt('ipc_protocol',
default='queue',
choices=['queue', 'rpc'],
help='The protocol used to communicate between evaluator and '
'notifier services.'),
]
LOG = logging.getLogger(__name__)
@ -57,80 +67,115 @@ class AlarmNotifierService(os_service.Service):
def __init__(self, conf):
super(AlarmNotifierService, self).__init__()
transport = messaging.get_transport(conf)
self.rpc_server = messaging.get_rpc_server(
conf, transport, conf.notifier_rpc_topic, self)
self.notifiers = extension.ExtensionManager(
self.NOTIFIER_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(conf,))
if conf.ipc_protocol == 'rpc':
self.ipc = 'rpc'
self.rpc_server = messaging.get_rpc_server(
conf, transport, conf.notifier_rpc_topic, self)
else:
self.ipc = 'queue'
target = oslo_messaging.Target(topic=conf.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[AlarmEndpoint(self.notifiers)])
def start(self):
super(AlarmNotifierService, self).start()
self.rpc_server.start()
if self.ipc == 'rpc':
self.rpc_server.start()
else:
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.rpc_server.stop()
if self.ipc == 'rpc':
self.rpc_server.stop()
else:
self.listener.stop()
self.listener.wait()
super(AlarmNotifierService, self).stop()
def _handle_action(self, action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data):
try:
action = netutils.urlsplit(action)
except Exception:
LOG.error(
_("Unable to parse action %(action)s for alarm %(alarm_id)s"),
{'action': action, 'alarm_id': alarm_id})
return
try:
notifier = self.notifiers[action.scheme].obj
except KeyError:
scheme = action.scheme
LOG.error(
_("Action %(scheme)s for alarm %(alarm_id)s is unknown, "
"cannot notify"),
{'scheme': scheme, 'alarm_id': alarm_id})
return
try:
LOG.debug("Notifying alarm %(id)s with action %(act)s",
{'id': alarm_id, 'act': action})
notifier.notify(action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data)
except Exception:
LOG.exception(_("Unable to notify alarm %s"), alarm_id)
return
def notify_alarm(self, context, data):
"""Notify that alarm has been triggered.
process_alarm(self.notifiers, data)
:param context: Request context.
:param data: (dict):
- actions, the URL of the action to run; this is mapped to
extensions automatically
- alarm_id, the ID of the alarm that has been triggered
- alarm_name, the name of the alarm that has been triggered
- severity, the level of the alarm that has been triggered
- previous, the previous state of the alarm
- current, the new state the alarm has transitioned to
- reason, the reason the alarm changed its state
- reason_data, a dict representation of the reason
"""
actions = data.get('actions')
if not actions:
LOG.error(_("Unable to notify for an alarm with no action"))
return
def _handle_action(notifiers, action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data):
"""Process action on alarm
for action in actions:
self._handle_action(action,
data.get('alarm_id'),
data.get('alarm_name'),
data.get('severity'),
data.get('previous'),
data.get('current'),
data.get('reason'),
data.get('reason_data'))
:param notifiers: list of possible notifiers.
:param action: The action that is being attended, as a parsed URL.
:param alarm_id: The triggered alarm.
:param alarm_name: The name of triggered alarm.
:param severity: The level of triggered alarm
:param previous: The previous state of the alarm.
:param current: The current state of the alarm.
:param reason: The reason the alarm changed its state.
:param reason_data: A dict representation of the reason.
"""
try:
action = netutils.urlsplit(action)
except Exception:
LOG.error(
_("Unable to parse action %(action)s for alarm %(alarm_id)s"),
{'action': action, 'alarm_id': alarm_id})
return
try:
notifier = notifiers[action.scheme].obj
except KeyError:
scheme = action.scheme
LOG.error(
_("Action %(scheme)s for alarm %(alarm_id)s is unknown, "
"cannot notify"),
{'scheme': scheme, 'alarm_id': alarm_id})
return
try:
LOG.debug("Notifying alarm %(id)s with action %(act)s",
{'id': alarm_id, 'act': action})
notifier.notify(action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data)
except Exception:
LOG.exception(_("Unable to notify alarm %s"), alarm_id)
return
def process_alarm(notifiers, data):
"""Notify that alarm has been triggered.
:param notifiers: list of possible notifiers
:param data: (dict): alarm data
"""
actions = data.get('actions')
if not actions:
LOG.error(_("Unable to notify for an alarm with no action"))
return
for action in actions:
_handle_action(notifiers, action,
data.get('alarm_id'),
data.get('alarm_name'),
data.get('severity'),
data.get('previous'),
data.get('current'),
data.get('reason'),
data.get('reason_data'))
class AlarmEndpoint(object):
def __init__(self, notifiers):
self.notifiers = notifiers
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""Endpoint for alarm notifications"""
process_alarm(self.notifiers, payload)

View File

@ -36,9 +36,11 @@ def list_opts():
aodh.evaluator.event.OPTS,
aodh.evaluator.gnocchi.OPTS,
aodh.event.OPTS,
aodh.notifier.OPTS,
aodh.notifier.rest.OPTS,
aodh.service.OPTS,
aodh.queue.OPTS,
aodh.rpc.OPTS,
aodh.service.OPTS,
aodh.api.controllers.v2.alarms.ALARM_API_OPTS)),
('api',
itertools.chain(

60
aodh/queue.py Normal file
View File

@ -0,0 +1,60 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_context import context
from oslo_log import log
import oslo_messaging
import six
from aodh import messaging
from aodh.storage import models
OPTS = [
cfg.StrOpt('notifier_topic',
default='alarming',
help='The topic that aodh uses for alarm notifier '
'messages.'),
]
LOG = log.getLogger(__name__)
class AlarmNotifier(object):
def __init__(self, conf):
self.ctxt = context.get_admin_context().to_dict()
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(conf),
driver='messagingv2',
publisher_id="alarming.evaluator",
topic=conf.notifier_topic)
def notify(self, alarm, previous, reason, reason_data):
actions = getattr(alarm, models.Alarm.ALARM_ACTIONS_MAP[alarm.state])
if not actions:
LOG.debug('alarm %(alarm_id)s has no action configured '
'for state transition from %(previous)s to '
'state %(state)s, skipping the notification.',
{'alarm_id': alarm.alarm_id,
'previous': previous,
'state': alarm.state})
return
payload = {'actions': actions,
'alarm_id': alarm.alarm_id,
'alarm_name': alarm.name,
'severity': alarm.severity,
'previous': previous,
'current': alarm.state,
'reason': six.text_type(reason),
'reason_data': reason_data}
self.notifier.sample(self.ctxt, 'alarm.update', payload)

View File

@ -36,6 +36,7 @@ LOG = log.getLogger(__name__)
class RPCAlarmNotifier(object):
def __init__(self, conf):
self.ctxt = context.get_admin_context()
transport = messaging.get_transport(conf)
self.client = messaging.get_rpc_client(
transport, topic=conf.notifier_rpc_topic,
@ -51,7 +52,7 @@ class RPCAlarmNotifier(object):
'previous': previous,
'state': alarm.state})
return
self.client.cast(context.get_admin_context(),
self.client.cast(self.ctxt,
'notify_alarm', data={
'actions': actions,
'alarm_id': alarm.alarm_id,

View File

@ -29,8 +29,9 @@ class TestEvaluatorBase(base.BaseTestCase):
self.api_client = mock.Mock()
self.useFixture(mockpatch.Patch('ceilometerclient.client.get_client',
return_value=self.api_client))
self.evaluator = self.EVALUATOR(self.conf)
self.notifier = mock.MagicMock()
self.evaluator = self.EVALUATOR(self.conf, self.notifier)
self.evaluator.notifier = self.notifier
self.storage_conn = mock.MagicMock()
self.evaluator.storage_conn = self.storage_conn
self.evaluator._ks_client = mock.Mock(user_id='fake_user_id',

View File

@ -19,6 +19,7 @@ from oslo_utils import timeutils
from oslotest import base
from aodh import evaluator
from aodh import queue
class TestEvaluatorBaseClass(base.BaseTestCase):
@ -30,15 +31,16 @@ class TestEvaluatorBaseClass(base.BaseTestCase):
self.called = True
raise Exception('Boom!')
def test_base_refresh(self):
notifier = mock.MagicMock()
@mock.patch.object(queue, 'AlarmNotifier')
def test_base_refresh(self, notifier):
notifier.notify = self._notify
class EvaluatorSub(evaluator.Evaluator):
def evaluate(self, alarm):
pass
ev = EvaluatorSub(mock.MagicMock(), notifier)
ev = EvaluatorSub(mock.MagicMock())
ev.notifier = notifier
ev.storage_conn = mock.MagicMock()
ev._record_change = mock.MagicMock()
ev._refresh(mock.MagicMock(), mock.MagicMock(),

View File

@ -41,6 +41,26 @@ NOTIFICATION = dict(alarm_id='foobar',
current='ALARM')
class TestAlarmNotifierService(tests_base.BaseTestCase):
def setUp(self):
super(TestAlarmNotifierService, self).setUp()
conf = service.prepare_service(argv=[], config_files=[])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.setup_messaging(self.CONF)
def test_init_host_rpc(self):
self.CONF.set_override('ipc_protocol', 'rpc')
self.service = notifier.AlarmNotifierService(self.CONF)
self.service.start()
self.service.stop()
def test_init_host_queue(self):
self.service = notifier.AlarmNotifierService(self.CONF)
self.service.start()
self.service.stop()
class TestAlarmNotifier(tests_base.BaseTestCase):
def setUp(self):
@ -53,10 +73,6 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
'oslo_context.context.generate_request_id',
self._fake_generate_request_id))
def test_init_host(self):
self.service.start()
self.service.stop()
def test_notify_alarm(self):
data = {
'actions': ['test://'],