add notifier publisher for events

this allows the ability for deployers to push events to MQ rather
than send data directly to a database. the notifier publisher
supports the same options as the sample publisher.

Change-Id: Ia2cdfe351dad2336bce8519da3e568f226a2593a
Implements: blueprint notification-pipelines
This commit is contained in:
gordon chung
2015-01-15 18:52:17 -05:00
parent c834442ff2
commit de0c2944b7
3 changed files with 102 additions and 60 deletions

View File

@@ -24,7 +24,6 @@ from oslo_config import cfg
import six import six
import six.moves.urllib.parse as urlparse import six.moves.urllib.parse as urlparse
import ceilometer
from ceilometer.i18n import _ from ceilometer.i18n import _
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
@@ -48,6 +47,11 @@ NOTIFIER_OPTS = [
help='The topic that ceilometer uses for metering ' help='The topic that ceilometer uses for metering '
'notifications.', 'notifications.',
), ),
cfg.StrOpt('event_topic',
default='event',
help='The topic that ceilometer uses for event '
'notifications.',
),
cfg.StrOpt('metering_driver', cfg.StrOpt('metering_driver',
default='messagingv2', default='messagingv2',
help='The driver that ceilometer uses for metering ' help='The driver that ceilometer uses for metering '
@@ -102,7 +106,6 @@ class MessagingPublisher(publisher.PublisherBase):
cfg.CONF.publisher.metering_secret) cfg.CONF.publisher.metering_secret)
for sample in samples for sample in samples
] ]
topic = cfg.CONF.publisher_rpc.metering_topic topic = cfg.CONF.publisher_rpc.metering_topic
self.local_queue.append((context, topic, meters)) self.local_queue.append((context, topic, meters))
@@ -142,18 +145,18 @@ class MessagingPublisher(publisher.PublisherBase):
def _process_queue(self, queue, policy): def _process_queue(self, queue, policy):
while queue: while queue:
context, topic, meters = queue[0] context, topic, data = queue[0]
try: try:
self._send(context, topic, meters) self._send(context, topic, data)
except oslo.messaging.MessageDeliveryFailure: except oslo.messaging.MessageDeliveryFailure:
samples = sum([len(m) for __, __, m in queue]) data = sum([len(m) for __, __, m in queue])
if policy == 'queue': if policy == 'queue':
LOG.warn(_("Failed to publish %d samples, queue them"), LOG.warn(_("Failed to publish %d datapoints, queue them"),
samples) data)
return queue return queue
elif policy == 'drop': elif policy == 'drop':
LOG.warn(_("Failed to publish %d samples, dropping them"), LOG.warn(_("Failed to publish %d datapoints, "
samples) "dropping them"), data)
return [] return []
# default, occur only if rabbit_max_retries > 0 # default, occur only if rabbit_max_retries > 0
raise raise
@@ -167,7 +170,12 @@ class MessagingPublisher(publisher.PublisherBase):
:param context: Execution context from the service or RPC call :param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation :param events: events from pipeline after transformation
""" """
raise ceilometer.NotImplementedError ev_list = [utils.message_from_event(
event, cfg.CONF.publisher.metering_secret) for event in events]
topic = cfg.CONF.publisher_notifier.event_topic
self.local_queue.append((context, topic, ev_list))
self.flush()
@abc.abstractmethod @abc.abstractmethod
def _send(self, context, topic, meters): def _send(self, context, topic, meters):
@@ -192,16 +200,28 @@ class RPCPublisher(MessagingPublisher):
class NotifierPublisher(MessagingPublisher): class NotifierPublisher(MessagingPublisher):
def __init__(self, parsed_url): def __init__(self, parsed_url, topic):
super(NotifierPublisher, self).__init__(parsed_url) super(NotifierPublisher, self).__init__(parsed_url)
self.notifier = oslo.messaging.Notifier( self.notifier = oslo.messaging.Notifier(
messaging.get_transport(), messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.metering_driver, driver=cfg.CONF.publisher_notifier.metering_driver,
publisher_id='metering.publisher.%s' % cfg.CONF.host, publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
topic=cfg.CONF.publisher_notifier.metering_topic, topic=topic,
retry=self.retry retry=self.retry
) )
def _send(self, context, event_type, meters): def _send(self, context, event_type, data):
self.notifier.sample(context.to_dict(), event_type=event_type, self.notifier.sample(context.to_dict(), event_type=event_type,
payload=meters) payload=data)
class SampleNotifierPublisher(NotifierPublisher):
def __init__(self, parsed_url):
super(SampleNotifierPublisher, self).__init__(
parsed_url, cfg.CONF.publisher_notifier.metering_topic)
class EventNotifierPublisher(NotifierPublisher):
def __init__(self, parsed_url):
super(EventNotifierPublisher, self).__init__(
parsed_url, cfg.CONF.publisher_notifier.event_topic)

View File

@@ -15,6 +15,7 @@
"""Tests for ceilometer/publisher/messaging.py """Tests for ceilometer/publisher/messaging.py
""" """
import datetime import datetime
import uuid
import eventlet import eventlet
import mock import mock
@@ -24,6 +25,7 @@ from oslo_context import context
from oslo_utils import netutils from oslo_utils import netutils
import testscenarios.testcase import testscenarios.testcase
from ceilometer.event.storage import models as event
from ceilometer import messaging from ceilometer import messaging
from ceilometer.publisher import messaging as msg_publisher from ceilometer.publisher import messaging as msg_publisher
from ceilometer import sample from ceilometer import sample
@@ -31,7 +33,15 @@ from ceilometer.tests import base as tests_base
class BasePublisherTestCase(tests_base.BaseTestCase): class BasePublisherTestCase(tests_base.BaseTestCase):
test_data = [ test_event_data = [
event.Event(message_id=uuid.uuid4(),
event_type='event_%d' % i,
generated=datetime.datetime.utcnow(),
traits=[])
for i in range(0, 5)
]
test_sample_data = [
sample.Sample( sample.Sample(
name='test', name='test',
type=sample.TYPE_CUMULATIVE, type=sample.TYPE_CUMULATIVE,
@@ -93,7 +103,6 @@ class BasePublisherTestCase(tests_base.BaseTestCase):
super(BasePublisherTestCase, self).setUp() super(BasePublisherTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF = self.useFixture(fixture_config.Config()).conf
self.setup_messaging(self.CONF) self.setup_messaging(self.CONF)
self.published = []
class RpcOnlyPublisherTest(BasePublisherTestCase): class RpcOnlyPublisherTest(BasePublisherTestCase):
@@ -110,14 +119,15 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
collector.start() collector.start()
eventlet.sleep() eventlet.sleep()
publisher.publish_samples(context.RequestContext(), publisher.publish_samples(context.RequestContext(),
self.test_data) self.test_sample_data)
collector.wait() collector.wait()
class Matcher(object): class Matcher(object):
@staticmethod @staticmethod
def __eq__(data): def __eq__(data):
for i, sample_item in enumerate(data): for i, sample_item in enumerate(data):
if sample_item['counter_name'] != self.test_data[i].name: if (sample_item['counter_name'] !=
self.test_sample_data[i].name):
return False return False
return True return True
@@ -131,7 +141,7 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
prepare.return_value = cast_context prepare.return_value = cast_context
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_sample_data)
prepare.assert_called_once_with( prepare.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) topic=self.CONF.publisher_rpc.metering_topic)
@@ -143,7 +153,7 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
netutils.urlsplit('rpc://?per_meter_topic=1')) netutils.urlsplit('rpc://?per_meter_topic=1'))
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_sample_data)
class MeterGroupMatcher(object): class MeterGroupMatcher(object):
def __eq__(self, meters): def __eq__(self, meters):
@@ -169,12 +179,28 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
class TestPublisher(testscenarios.testcase.WithScenarios, class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase): BasePublisherTestCase):
scenarios = [ scenarios = [
('notifier', dict(protocol="notifier", ('notifier',
publisher_cls=msg_publisher.NotifierPublisher)), dict(protocol="notifier",
publisher_cls=msg_publisher.SampleNotifierPublisher,
test_data=BasePublisherTestCase.test_sample_data,
pub_func='publish_samples', attr='source')),
('event_notifier',
dict(protocol="notifier",
publisher_cls=msg_publisher.EventNotifierPublisher,
test_data=BasePublisherTestCase.test_event_data,
pub_func='publish_events', attr='event_type')),
('rpc', dict(protocol="rpc", ('rpc', dict(protocol="rpc",
publisher_cls=msg_publisher.RPCPublisher)), publisher_cls=msg_publisher.RPCPublisher,
test_data=BasePublisherTestCase.test_sample_data,
pub_func='publish_samples', attr='source')),
] ]
def setUp(self):
super(TestPublisher, self).setUp()
self.topic = (self.CONF.publisher_notifier.event_topic
if self.pub_func == 'publish_events' else
self.CONF.publisher_rpc.metering_topic)
def test_published_concurrency(self): def test_published_concurrency(self):
"""Test concurrent access to the local queue of the rpc publisher.""" """Test concurrent access to the local queue of the rpc publisher."""
@@ -189,9 +215,9 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = fake_send_wait fake_send.side_effect = fake_send_wait
job1 = eventlet.spawn(publisher.publish_samples, job1 = eventlet.spawn(getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
job2 = eventlet.spawn(publisher.publish_samples, job2 = eventlet.spawn(getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
job1.wait() job1.wait()
@@ -210,14 +236,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging.MessageDeliveryFailure, oslo.messaging.MessageDeliveryFailure,
publisher.publish_samples, getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( fake_send.assert_called_once_with(
mock.ANY, self.CONF.publisher_rpc.metering_topic, mock.ANY, self.topic, mock.ANY)
mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_block(self, mylog): def test_published_with_policy_block(self, mylog):
@@ -228,13 +253,12 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging.MessageDeliveryFailure, oslo.messaging.MessageDeliveryFailure,
publisher.publish_samples, getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( fake_send.assert_called_once_with(
mock.ANY, self.CONF.publisher_rpc.metering_topic, mock.ANY, self.topic, mock.ANY)
mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_incorrect(self, mylog): def test_published_with_policy_incorrect(self, mylog):
@@ -245,14 +269,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging.MessageDeliveryFailure, oslo.messaging.MessageDeliveryFailure,
publisher.publish_samples, getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
self.assertTrue(mylog.warn.called) self.assertTrue(mylog.warn.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( fake_send.assert_called_once_with(
mock.ANY, self.CONF.publisher_rpc.metering_topic, mock.ANY, self.topic, mock.ANY)
mock.ANY)
def test_published_with_policy_drop_and_rpc_down(self): def test_published_with_policy_drop_and_rpc_down(self):
publisher = self.publisher_cls( publisher = self.publisher_cls(
@@ -260,12 +283,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
side_effect = oslo.messaging.MessageDeliveryFailure() side_effect = oslo.messaging.MessageDeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send: with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with( fake_send.assert_called_once_with(
mock.ANY, self.CONF.publisher_rpc.metering_topic, mock.ANY, self.topic, mock.ANY)
mock.ANY)
def test_published_with_policy_queue_and_rpc_down(self): def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls( publisher = self.publisher_cls(
@@ -274,12 +296,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
with mock.patch.object(publisher, '_send') as fake_send: with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(1, len(publisher.local_queue)) self.assertEqual(1, len(publisher.local_queue))
fake_send.assert_called_once_with( fake_send.assert_called_once_with(
mock.ANY, self.CONF.publisher_rpc.metering_topic, mock.ANY, self.topic, mock.ANY)
mock.ANY)
def test_published_with_policy_queue_and_rpc_down_up(self): def test_published_with_policy_queue_and_rpc_down_up(self):
self.rpc_unreachable = True self.rpc_unreachable = True
@@ -289,18 +310,18 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
side_effect = oslo.messaging.MessageDeliveryFailure() side_effect = oslo.messaging.MessageDeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send: with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(1, len(publisher.local_queue)) self.assertEqual(1, len(publisher.local_queue))
fake_send.side_effect = mock.MagicMock() fake_send.side_effect = mock.MagicMock()
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
topic = self.CONF.publisher_rpc.metering_topic topic = self.topic
expected = [mock.call(mock.ANY, topic, mock.ANY), expected = [mock.call(mock.ANY, topic, mock.ANY),
mock.call(mock.ANY, topic, mock.ANY), mock.call(mock.ANY, topic, mock.ANY),
mock.call(mock.ANY, topic, mock.ANY)] mock.call(mock.ANY, topic, mock.ANY)]
@@ -315,22 +336,22 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
for i in range(0, 5): for i in range(0, 5):
for s in self.test_data: for s in self.test_data:
s.source = 'test-%d' % i setattr(s, self.attr, 'test-%d' % i)
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(3, len(publisher.local_queue)) self.assertEqual(3, len(publisher.local_queue))
self.assertEqual( self.assertEqual(
'test-2', 'test-2',
publisher.local_queue[0][2][0]['source'] publisher.local_queue[0][2][0][self.attr]
) )
self.assertEqual( self.assertEqual(
'test-3', 'test-3',
publisher.local_queue[1][2][0]['source'] publisher.local_queue[1][2][0][self.attr]
) )
self.assertEqual( self.assertEqual(
'test-4', 'test-4',
publisher.local_queue[2][2][0]['source'] publisher.local_queue[2][2][0][self.attr]
) )
def test_published_with_policy_default_sized_queue_and_rpc_down(self): def test_published_with_policy_default_sized_queue_and_rpc_down(self):
@@ -342,16 +363,16 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
fake_send.side_effect = side_effect fake_send.side_effect = side_effect
for i in range(0, 2000): for i in range(0, 2000):
for s in self.test_data: for s in self.test_data:
s.source = 'test-%d' % i setattr(s, self.attr, 'test-%d' % i)
publisher.publish_samples(mock.MagicMock(), getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(1024, len(publisher.local_queue)) self.assertEqual(1024, len(publisher.local_queue))
self.assertEqual( self.assertEqual(
'test-976', 'test-976',
publisher.local_queue[0][2][0]['source'] publisher.local_queue[0][2][0][self.attr]
) )
self.assertEqual( self.assertEqual(
'test-1999', 'test-1999',
publisher.local_queue[1023][2][0]['source'] publisher.local_queue[1023][2][0][self.attr]
) )

View File

@@ -256,7 +256,7 @@ ceilometer.publisher =
meter_publisher = ceilometer.publisher.messaging:RPCPublisher meter_publisher = ceilometer.publisher.messaging:RPCPublisher
meter = ceilometer.publisher.messaging:RPCPublisher meter = ceilometer.publisher.messaging:RPCPublisher
rpc = ceilometer.publisher.messaging:RPCPublisher rpc = ceilometer.publisher.messaging:RPCPublisher
notifier = ceilometer.publisher.messaging:NotifierPublisher notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher file = ceilometer.publisher.file:FilePublisher
direct = ceilometer.publisher.direct:DirectPublisher direct = ceilometer.publisher.direct:DirectPublisher
@@ -264,6 +264,7 @@ ceilometer.publisher =
ceilometer.event.publisher = ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher test = ceilometer.publisher.test:TestPublisher
direct = ceilometer.publisher.direct:DirectPublisher direct = ceilometer.publisher.direct:DirectPublisher
notifier = ceilometer.publisher.messaging:EventNotifierPublisher
ceilometer.alarm.evaluator = ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator