From 967d9272780f379c90a0a77330422c4b80440617 Mon Sep 17 00:00:00 2001 From: Rohit Jaiswal Date: Fri, 28 Aug 2015 00:15:38 +0000 Subject: [PATCH] Requeuing event with workload_partitioning on publish failure when workload_partitioning is enabled, publishing of samples occurs in the pipeline listeners. If publishing fails when single publisher is configured, event will not be requeued or ack'ed. This fix requeues or acks the event based on ack_on_event_error. Change-Id: I8f2f889736c8897e5b15952ab32308cf33205c3f Closes-Bug: 1488202 --- ceilometer/pipeline.py | 11 ++++- ceilometer/tests/unit/test_event_pipeline.py | 43 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 95863aef..d82a3b7f 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -24,6 +24,7 @@ import os from oslo_config import cfg from oslo_log import log +import oslo_messaging from oslo_utils import timeutils import six from stevedore import extension @@ -117,8 +118,14 @@ class EventPipelineEndpoint(PipelineEndpoint): for ev in payload if publisher_utils.verify_signature( ev, cfg.CONF.publisher.telemetry_secret) ] - with self.publish_context as p: - p(events) + try: + with self.publish_context as p: + p(events) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED class _PipelineTransportManager(object): diff --git a/ceilometer/tests/unit/test_event_pipeline.py b/ceilometer/tests/unit/test_event_pipeline.py index cc3b9b21..fac9d51b 100644 --- a/ceilometer/tests/unit/test_event_pipeline.py +++ b/ceilometer/tests/unit/test_event_pipeline.py @@ -15,6 +15,9 @@ import datetime import traceback import uuid +import mock +from oslo_config import fixture as fixture_config +import oslo_messaging from oslotest import base from oslotest import mockpatch @@ -22,6 +25,7 @@ from ceilometer.event.storage import models from ceilometer import pipeline from ceilometer import publisher from ceilometer.publisher import test as test_publisher +from ceilometer.publisher import utils class EventPipelineTestCase(base.BaseTestCase): @@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() + + def test_event_pipeline_endpoint_requeue_on_failure(self): + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([]) + + self.CONF.set_override("ack_on_event_error", False, + group="notification") + self.CONF.set_override("telemetry_secret", "not-so-secret", + group="publisher") + test_data = { + 'message_id': uuid.uuid4(), + 'event_type': 'a', + 'generated': '2013-08-08 21:06:37.803826', + 'traits': [ + {'name': 't_text', + 'value': 1, + 'dtype': 'text_trait' + } + ], + 'raw': {'status': 'started'} + } + message_sign = utils.compute_signature(test_data, 'not-so-secret') + test_data['message_signature'] = message_sign + + fake_publisher = mock.Mock() + self.useFixture(mockpatch.Patch( + 'ceilometer.publisher.test.TestPublisher', + return_value=fake_publisher)) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + event_pipeline_endpoint = pipeline.EventPipelineEndpoint( + mock.Mock(), pipeline_manager.pipelines[0]) + + fake_publisher.publish_events.side_effect = Exception + ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise', + 'a', [test_data], None) + self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)