support batch listener for aodh-notifier

This change add batch messages listener for aodh-notifier service, that
allow users configure aodh-notifier to receive messages in batch.

Change-Id: I5d1eb60be656088a83a3428d0387338b081d1b88
Signed-off-by: liusheng <liusheng@huawei.com>
This commit is contained in:
liusheng 2016-04-22 09:30:26 +08:00
parent c93768cc68
commit 520425faf8
6 changed files with 98 additions and 7 deletions

View File

@ -53,6 +53,16 @@ def get_notification_listener(transport, targets, endpoints,
allow_requeue=allow_requeue)
def get_batch_notification_listener(transport, targets, endpoints,
allow_requeue=False,
batch_size=1, batch_timeout=None):
"""Return a configured oslo_messaging notification listener."""
return oslo_messaging.get_batch_notification_listener(
transport, targets, endpoints, executor='threading',
allow_requeue=allow_requeue,
batch_size=batch_size, batch_timeout=batch_timeout)
def get_notifier(transport, publisher_id):
"""Return a configured oslo_messaging notifier."""
notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER)

View File

@ -15,6 +15,7 @@
import abc
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_service import service as os_service
@ -28,6 +29,18 @@ from aodh import messaging
LOG = log.getLogger(__name__)
OPTS = [
cfg.IntOpt('batch_size',
default=1,
help='Number of notification messages to wait before '
'dispatching them.'),
cfg.IntOpt('batch_timeout',
default=None,
help='Number of seconds to wait before dispatching samples '
'when batch_size is not reached (None means indefinitely).'
),
]
@six.add_metaclass(abc.ABCMeta)
class AlarmNotifier(object):
@ -69,9 +82,9 @@ class AlarmNotifierService(os_service.Service):
invoke_args=(self.conf,))
target = oslo_messaging.Target(topic=self.conf.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[AlarmEndpoint(self.notifiers)])
self.listener = messaging.get_batch_notification_listener(
transport, [target], [AlarmEndpoint(self.notifiers)], False,
self.conf.notifier.batch_size, self.conf.notifier.batch_timeout)
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
@ -88,9 +101,11 @@ class AlarmEndpoint(object):
def __init__(self, notifiers):
self.notifiers = notifiers
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
def sample(self, notifications):
"""Endpoint for alarm notifications"""
self._process_alarm(self.notifiers, payload)
LOG.debug('Received %s messages in batch.', len(notifications))
for notification in notifications:
self._process_alarm(self.notifiers, notification['payload'])
@staticmethod
def _handle_action(notifiers, action, alarm_id, alarm_name, severity,
@ -133,7 +148,6 @@ class AlarmEndpoint(object):
previous, current, reason, reason_data)
except Exception:
LOG.exception(_LE("Unable to notify alarm %s"), alarm_id)
return
@staticmethod
def _process_alarm(notifiers, data):

View File

@ -68,6 +68,7 @@ def list_opts():
('notifier', aodh.service.NOTIFIER_OPTS),
('service_credentials', aodh.keystone_client.OPTS),
('service_types', aodh.notifier.zaqar.SERVICE_OPTS),
('notifier', aodh.notifier.OPTS)
]

View File

@ -98,6 +98,60 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
data['reason_data']),
notifications[0])
@mock.patch('aodh.notifier.LOG.debug')
def test_notify_alarm_with_batch_listener(self, logger):
data1 = {
'actions': ['test://'],
'alarm_id': 'foobar',
'alarm_name': 'testalarm',
'severity': 'critical',
'previous': 'OK',
'current': 'ALARM',
'reason': 'Everything is on fire',
'reason_data': {'fire': 'everywhere'}
}
data2 = {
'actions': ['test://'],
'alarm_id': 'foobar2',
'alarm_name': 'testalarm2',
'severity': 'low',
'previous': 'ALARM',
'current': 'OK',
'reason': 'Everything is fine',
'reason_data': {'fine': 'fine'}
}
self.service.stop()
self.CONF.set_override("batch_size", 2, 'notifier')
# Init a new service with new configuration
self.svc = notifier.AlarmNotifierService(self.CONF)
self.svc.start()
self._msg_notifier.sample({}, 'alarm.update', data1)
self._msg_notifier.sample({}, 'alarm.update', data2)
time.sleep(1)
notifications = self.svc.notifiers['test'].obj.notifications
self.assertEqual(2, len(notifications))
self.assertEqual((urlparse.urlsplit(data1['actions'][0]),
data1['alarm_id'],
data1['alarm_name'],
data1['severity'],
data1['previous'],
data1['current'],
data1['reason'],
data1['reason_data']),
notifications[0])
self.assertEqual((urlparse.urlsplit(data2['actions'][0]),
data2['alarm_id'],
data2['alarm_name'],
data2['severity'],
data2['previous'],
data2['current'],
data2['reason'],
data2['reason_data']),
notifications[1])
self.assertEqual(mock.call('Received %s messages in batch.', 2),
logger.call_args_list[0])
self.svc.stop()
@staticmethod
def _notification(action):
notification = {}

View File

@ -0,0 +1,12 @@
---
features:
- >
Add support for batch processing of messages from queue. This will allow
the aodh-notifier to grab multiple messages per thread to enable more
efficient processing.
upgrade:
- >
batch_size and batch_timeout configuration options are added to [notifier]
section of configuration. The batch_size controls the number of messages to
grab before processing. Similarly, the batch_timeout defines the wait time
before processing.

View File

@ -19,7 +19,7 @@ oslo.service>=0.1.0 # Apache-2.0
PasteDeploy>=1.5.0
pbr<2.0,>=0.11
pecan>=0.8.0
oslo.messaging>2.6.1,!=2.8.0 # Apache-2.0
oslo.messaging>=4.5.0 # Apache-2.0
oslo.middleware>=3.0.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
oslo.utils>=3.5.0 # Apache-2.0