diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 95863aef..d82a3b7f 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -24,6 +24,7 @@ import os from oslo_config import cfg from oslo_log import log +import oslo_messaging from oslo_utils import timeutils import six from stevedore import extension @@ -117,8 +118,14 @@ class EventPipelineEndpoint(PipelineEndpoint): for ev in payload if publisher_utils.verify_signature( ev, cfg.CONF.publisher.telemetry_secret) ] - with self.publish_context as p: - p(events) + try: + with self.publish_context as p: + p(events) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED class _PipelineTransportManager(object): diff --git a/ceilometer/tests/unit/test_event_pipeline.py b/ceilometer/tests/unit/test_event_pipeline.py index cc3b9b21..fac9d51b 100644 --- a/ceilometer/tests/unit/test_event_pipeline.py +++ b/ceilometer/tests/unit/test_event_pipeline.py @@ -15,6 +15,9 @@ import datetime import traceback import uuid +import mock +from oslo_config import fixture as fixture_config +import oslo_messaging from oslotest import base from oslotest import mockpatch @@ -22,6 +25,7 @@ from ceilometer.event.storage import models from ceilometer import pipeline from ceilometer import publisher from ceilometer.publisher import test as test_publisher +from ceilometer.publisher import utils class EventPipelineTestCase(base.BaseTestCase): @@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() + + def test_event_pipeline_endpoint_requeue_on_failure(self): + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([]) + + self.CONF.set_override("ack_on_event_error", False, + group="notification") + self.CONF.set_override("telemetry_secret", "not-so-secret", + group="publisher") + test_data = { + 'message_id': uuid.uuid4(), + 'event_type': 'a', + 'generated': '2013-08-08 21:06:37.803826', + 'traits': [ + {'name': 't_text', + 'value': 1, + 'dtype': 'text_trait' + } + ], + 'raw': {'status': 'started'} + } + message_sign = utils.compute_signature(test_data, 'not-so-secret') + test_data['message_signature'] = message_sign + + fake_publisher = mock.Mock() + self.useFixture(mockpatch.Patch( + 'ceilometer.publisher.test.TestPublisher', + return_value=fake_publisher)) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + event_pipeline_endpoint = pipeline.EventPipelineEndpoint( + mock.Mock(), pipeline_manager.pipelines[0]) + + fake_publisher.publish_events.side_effect = Exception + ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise', + 'a', [test_data], None) + self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)