enable event pipeline

this patch enables the event pipeline. by default the event pipeline
will publish events straight to db.

Implements: blueprint notification-pipelines
Change-Id: I7cad6dd39d50a42f2af347f79a24fc776d261896
This commit is contained in:
gordon chung 2015-01-08 12:55:32 -05:00
parent 9a52e4081d
commit c834442ff2
10 changed files with 175 additions and 91 deletions

View File

@ -17,26 +17,29 @@ import logging
import oslo.messaging
from oslo_config import cfg
from oslo_context import context
from stevedore import extension
import ceilometer
from ceilometer import dispatcher
from ceilometer.event import converter as event_converter
from ceilometer.event.storage import models
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.publisher import utils
LOG = logging.getLogger(__name__)
class EventsNotificationEndpoint(object):
def __init__(self):
def __init__(self, transporter):
super(EventsNotificationEndpoint, self).__init__()
self.dispatcher_manager = dispatcher.load_dispatcher_manager()
LOG.debug(_('Loading event definitions'))
self.ctxt = context.get_admin_context()
self.event_converter = event_converter.setup_events(
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
self.requeue = not hasattr(transporter, 'publisher')
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Convert message to Ceilometer Event.
@ -56,19 +59,21 @@ class EventsNotificationEndpoint(object):
self.process_notification(notification)
def process_notification(self, notification):
event = self.event_converter.to_event(notification)
if event is not None:
LOG.debug(_('Saving event "%s"'), event.event_type)
problem_events = []
for dispatcher_ext in self.dispatcher_manager:
try:
problem_events.extend(
dispatcher_ext.obj.record_events(event))
except ceilometer.NotImplementedError:
LOG.warn(_('Event is not implemented with the storage'
' backend'))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
event = self.event_converter.to_event(notification)
if event is not None:
if self.requeue:
for notifier in self.transporter:
notifier.sample(
self.ctxt.to_dict(),
event_type='pipeline.event',
payload=[utils.message_from_event(
event, cfg.CONF.publisher.metering_secret)])
else:
with self.transporter.publisher(self.ctxt) as p:
p(event)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
return oslo.messaging.NotificationResult.REQUEUE
raise
return oslo.messaging.NotificationResult.HANDLED

View File

@ -17,6 +17,11 @@ from oslo_utils import timeutils
from ceilometer.storage import base
def serialize_dt(value):
"""Serializes parameter if it is datetime."""
return value.isoformat() if hasattr(value, 'isoformat') else value
class Event(base.Model):
"""A raw event from the source system. Events have Traits.
@ -51,6 +56,12 @@ class Event(base.Model):
(self.message_id, self.event_type, self.generated,
" ".join(trait_list)))
def serialize(self):
return {'message_id': self.message_id,
'event_type': self.event_type,
'generated': serialize_dt(self.generated),
'traits': [trait.serialize() for trait in self.traits]}
class Trait(base.Model):
"""A Trait is a key/value pair of data on an Event.
@ -80,6 +91,9 @@ class Trait(base.Model):
def __repr__(self):
return "<Trait: %s %d %s>" % (self.name, self.dtype, self.value)
def serialize(self):
return self.name, self.dtype, serialize_dt(self.value)
def get_type_name(self):
return self.get_name_by_type(self.dtype)

View File

@ -77,22 +77,33 @@ class NotificationService(os_service.Service):
invoke_args=(transporter, )
)
def _get_notifier(self, transport, pipe):
return oslo.messaging.Notifier(
transport,
driver=cfg.CONF.publisher_notifier.metering_driver,
publisher_id='ceilometer.notification',
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))
def start(self):
super(NotificationService, self).start()
self.pipeline_manager = pipeline.setup_pipeline()
if cfg.CONF.notification.store_events:
self.event_pipeline_manager = pipeline.setup_event_pipeline()
transport = messaging.get_transport()
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
event_transporter = None
if cfg.CONF.notification.workload_partitioning:
transporter = []
for pipe in self.pipeline_manager.pipelines:
transporter.append(oslo.messaging.Notifier(
transport,
driver=cfg.CONF.publisher_notifier.metering_driver,
publisher_id='ceilometer.notification',
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)))
transporter.append(self._get_notifier(transport, pipe))
if cfg.CONF.notification.store_events:
event_transporter = []
for pipe in self.event_pipeline_manager.pipelines:
event_transporter.append(self._get_notifier(transport,
pipe))
self.ctxt = context.get_admin_context()
self.group_id = self.NOTIFICATION_NAMESPACE
@ -105,10 +116,12 @@ class NotificationService(os_service.Service):
# beeen registered by oslo.messaging
messaging.get_notifier(transport, '')
transporter = self.pipeline_manager
if cfg.CONF.notification.store_events:
event_transporter = self.event_pipeline_manager
self.group_id = None
self.listeners = self.pipeline_listeners = []
self._configure_main_queue_listeners(transporter)
self._configure_main_queue_listeners(transporter, event_transporter)
if cfg.CONF.notification.workload_partitioning:
self.partition_coordinator.join_group(self.group_id)
@ -124,10 +137,9 @@ class NotificationService(os_service.Service):
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def _configure_main_queue_listeners(self, transporter):
self.notification_manager = self._get_notifications_manager(
transporter)
if not list(self.notification_manager):
def _configure_main_queue_listeners(self, transporter, event_transporter):
notification_manager = self._get_notifications_manager(transporter)
if not list(notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
@ -135,10 +147,11 @@ class NotificationService(os_service.Service):
endpoints = []
if cfg.CONF.notification.store_events:
endpoints = [event_endpoint.EventsNotificationEndpoint()]
endpoints.append(
event_endpoint.EventsNotificationEndpoint(event_transporter))
targets = []
for ext in self.notification_manager:
for ext in notification_manager:
handler = ext.obj
LOG.debug(_('Event types from %(name)s: %(type)s'
' (ack_on_error=%(error)s)') %
@ -176,16 +189,22 @@ class NotificationService(os_service.Service):
def _configure_pipeline_listeners(self):
self.pipeline_listeners = []
ev_pipes = []
if cfg.CONF.notification.store_events:
ev_pipes = self.event_pipeline_manager.pipelines
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id, self.pipeline_manager.pipelines)
self.group_id, self.pipeline_manager.pipelines + ev_pipes)
transport = messaging.get_transport()
for pipe in partitioned:
LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline) else
pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
transport,
[oslo.messaging.Target(
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))],
[pipeline.PipelineEndpoint(self.ctxt, pipe)])
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)

View File

@ -22,9 +22,11 @@ import fnmatch
import os
from oslo_config import cfg
from oslo_utils import timeutils
import six
import yaml
from ceilometer.event.storage import models
from ceilometer.i18n import _
from ceilometer.openstack.common import log
from ceilometer import publisher
@ -57,13 +59,19 @@ class PipelineException(Exception):
return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
@six.add_metaclass(abc.ABCMeta)
class PipelineEndpoint(object):
def __init__(self, context, pipeline):
self.publish_context = PublishContext(context, [pipeline])
@abc.abstractmethod
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
pass
class SamplePipelineEndpoint(PipelineEndpoint):
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for pipeline messages."""
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
@ -81,6 +89,23 @@ class PipelineEndpoint(object):
p(samples)
class EventPipelineEndpoint(PipelineEndpoint):
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
events = [
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']])
for ev in payload
]
with self.publish_context as p:
p(events)
class PublishContext(object):
def __init__(self, context, pipelines=None):
@ -280,7 +305,8 @@ class Sink(object):
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(p))
self.publishers.append(publisher.get_publisher(p,
self.NAMESPACE))
except Exception:
LOG.exception(_("Unable to load publisher %s"), p)
@ -311,6 +337,8 @@ class Sink(object):
class EventSink(Sink):
NAMESPACE = 'ceilometer.event.publisher'
def publish_events(self, ctxt, events):
if events:
for p in self.publishers:
@ -329,6 +357,8 @@ class EventSink(Sink):
class SampleSink(Sink):
NAMESPACE = 'ceilometer.publisher'
def _transform_sample(self, start, ctxt, sample):
try:
for transformer in self.transformers[start:]:

View File

@ -13,11 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo.utils import timeutils
import ceilometer
from ceilometer.dispatcher import database
from ceilometer import publisher
from ceilometer.publisher import utils
@ -33,7 +31,9 @@ class DirectPublisher(publisher.PublisherBase):
def __init__(self, parsed_url):
super(DirectPublisher, self).__init__(parsed_url)
self.meter_conn = database.DatabaseDispatcher(cfg.CONF).meter_conn
dispatcher = database.DatabaseDispatcher(cfg.CONF)
self.meter_conn = dispatcher.meter_conn
self.event_conn = dispatcher.event_conn
def publish_samples(self, context, samples):
if not isinstance(samples, list):
@ -54,4 +54,7 @@ class DirectPublisher(publisher.PublisherBase):
self.meter_conn.record_metering_data(meter)
def publish_events(self, context, events):
raise ceilometer.NotImplementedError
if not isinstance(events, list):
events = [events]
self.event_conn.record_events(events)

View File

@ -118,3 +118,13 @@ def meter_message_from_counter(sample, secret):
}
msg['message_signature'] = compute_signature(msg, secret)
return msg
def message_from_event(event, secret):
"""Make an event message ready to be published or stored.
Returns a serialized model of Event containing an event message
"""
msg = event.serialize()
msg['message_signature'] = compute_signature(msg, secret)
return msg

View File

@ -18,11 +18,8 @@ import mock
import oslo.messaging
from oslo_config import cfg
from oslo_config import fixture as fixture_config
from stevedore import extension
import ceilometer
from ceilometer.event import endpoint as event_endpoint
from ceilometer.event.storage import models
from ceilometer.tests import base as tests_base
TEST_NOTICE_CTXT = {
@ -91,46 +88,21 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override("store_events", True, group="notification")
self.setup_messaging(self.CONF)
self.mock_dispatcher = mock.MagicMock()
self.endpoint = event_endpoint.EventsNotificationEndpoint()
(self.endpoint.
dispatcher_manager) = (extension.ExtensionManager.
make_test_instance([extension.
Extension('test', None,
None,
self.
mock_dispatcher)
]))
self.mock_pm = mock.MagicMock()
self.endpoint = event_endpoint.EventsNotificationEndpoint(self.mock_pm)
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
@mock.patch('ceilometer.event.endpoint.LOG')
def test_event_not_implemented(self, log):
re = self.mock_dispatcher.record_events
re.side_effect = ceilometer.NotImplementedError
message = {'event_type': "foo", 'message_id': "abc"}
ret = self.endpoint.process_notification(message)
log.warn.assert_called_once_with(
'Event is not implemented with the storage backend')
self.assertEqual(oslo.messaging.NotificationResult.HANDLED, ret)
def test_message_to_event(self):
self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
def test_message_to_event_duplicate(self):
self.mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.endpoint.process_notification(message) # Should return silently.
def test_message_to_event_bad_event(self):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
self.mock_pm.publisher.side_effect = Exception
message = {'event_type': "foo", 'message_id': "abc"}
ret = self.endpoint.process_notification(message)
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret)

View File

@ -20,6 +20,7 @@ import uuid
from oslo.utils import netutils
from ceilometer.event.storage import models as event
from ceilometer.publisher import direct
from ceilometer import sample
from ceilometer.tests import db as tests_db
@ -80,3 +81,22 @@ class TestDirectPublisher(tests_db.TestBase,
self.assertEqual(3, len(meters), 'There should be 3 samples')
self.assertEqual(['alpha', 'beta', 'gamma'], names)
class TestEventDirectPublisher(tests_db.TestBase,
tests_db.MixinTestsWithBackendScenarios):
test_data = [event.Event(message_id=str(uuid.uuid4()),
event_type='event_%d' % i,
generated=datetime.datetime.utcnow(),
traits=[])
for i in range(0, 5)]
def test_direct_publisher(self, ):
parsed_url = netutils.urlsplit('direct://')
publisher = direct.DirectPublisher(parsed_url)
publisher.publish_events(None, self.test_data)
e_types = list(self.event_conn.get_event_types())
self.assertEqual(5, len(e_types))
self.assertEqual(['event_%d' % i for i in range(0, 5)], e_types)

View File

@ -139,6 +139,7 @@ class TestNotification(tests_base.BaseTestCase):
self.assertNotEqual(self.fake_event_endpoint,
self.srv.listeners[0].dispatcher.endpoints[0])
@mock.patch('ceilometer.pipeline.setup_event_pipeline', mock.MagicMock())
def test_process_notification_with_events(self):
self.CONF.set_override("store_events", True, group="notification")
self._do_process_notification_manager_start()
@ -146,21 +147,6 @@ class TestNotification(tests_base.BaseTestCase):
self.assertEqual(self.fake_event_endpoint,
self.srv.listeners[0].dispatcher.endpoints[0])
@mock.patch('ceilometer.event.converter.get_config_file',
mock.MagicMock(return_value=None))
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.MagicMock())
def test_event_dispatcher_loaded(self):
self.CONF.set_override("store_events", True, group="notification")
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.srv.start()
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0]
self.assertEqual(1, len(list(event_endpoint.dispatcher_manager)))
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.MagicMock())
@ -193,13 +179,32 @@ class BaseRealNotification(tests_base.BaseTestCase):
'transformers': [],
'publishers': ['test://'],
}])
self.expected_samples = 2
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("store_events", True, group="notification")
ev_pipeline = yaml.dump({
'sources': [{
'name': 'test_event',
'events': '*',
'sinks': ['test_sink']
}],
'sinks': [{
'name': 'test_sink',
'publishers': ['test://']
}]
})
self.expected_events = 1
ev_pipeline_cfg_file = fileutils.write_to_tempfile(
content=ev_pipeline, prefix="event_pipeline", suffix="yaml")
self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file)
self.CONF.set_override(
"definitions_cfg_file",
self.path_get('etc/ceilometer/event_definitions.yaml'),
group='event')
self.publisher = test_publisher.TestPublisher("")
def _check_notification_service(self):
@ -211,7 +216,8 @@ class BaseRealNotification(tests_base.BaseTestCase):
TEST_NOTICE_PAYLOAD)
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.publisher.samples) >= self.expected_samples:
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
eventlet.sleep(0)
@ -219,6 +225,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
resources = list(set(s.resource_id for s in self.publisher.samples))
self.assertEqual(self.expected_samples, len(self.publisher.samples))
self.assertEqual(self.expected_events, len(self.publisher.events))
self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources)
@ -263,7 +270,7 @@ class TestRealNotificationHA(BaseRealNotification):
def test_reset_listeners_on_refresh(self):
self.srv.start()
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.assertEqual(2, len(self.srv.pipeline_listeners))
self.srv._refresh_agent(None)
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.assertEqual(2, len(self.srv.pipeline_listeners))
self.srv.stop()

View File

@ -261,6 +261,10 @@ ceilometer.publisher =
file = ceilometer.publisher.file:FilePublisher
direct = ceilometer.publisher.direct:DirectPublisher
ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher
direct = ceilometer.publisher.direct:DirectPublisher
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator