Requeuing event with workload_partitioning on publish failure

when workload_partitioning is enabled,
publishing of samples occurs in the
pipeline listeners. If publishing fails
when single publisher is configured, event
will not be requeued or ack'ed.

This fix requeues or acks the event based on
ack_on_event_error.

Change-Id: I8f2f889736c8897e5b15952ab32308cf33205c3f
Closes-Bug: 1488202
This commit is contained in:
Rohit Jaiswal 2015-08-28 00:15:38 +00:00
parent fcf5b4e7e2
commit 967d927278
2 changed files with 52 additions and 2 deletions

View File

@ -24,6 +24,7 @@ import os
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils from oslo_utils import timeutils
import six import six
from stevedore import extension from stevedore import extension
@ -117,8 +118,14 @@ class EventPipelineEndpoint(PipelineEndpoint):
for ev in payload if publisher_utils.verify_signature( for ev in payload if publisher_utils.verify_signature(
ev, cfg.CONF.publisher.telemetry_secret) ev, cfg.CONF.publisher.telemetry_secret)
] ]
with self.publish_context as p: try:
p(events) with self.publish_context as p:
p(events)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class _PipelineTransportManager(object): class _PipelineTransportManager(object):

View File

@ -15,6 +15,9 @@ import datetime
import traceback import traceback
import uuid import uuid
import mock
from oslo_config import fixture as fixture_config
import oslo_messaging
from oslotest import base from oslotest import base
from oslotest import mockpatch from oslotest import mockpatch
@ -22,6 +25,7 @@ from ceilometer.event.storage import models
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import publisher from ceilometer import publisher
from ceilometer.publisher import test as test_publisher from ceilometer.publisher import test as test_publisher
from ceilometer.publisher import utils
class EventPipelineTestCase(base.BaseTestCase): class EventPipelineTestCase(base.BaseTestCase):
@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self): def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg() self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager() self._exception_create_pipelinemanager()
def test_event_pipeline_endpoint_requeue_on_failure(self):
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([])
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.CONF.set_override("telemetry_secret", "not-so-secret",
group="publisher")
test_data = {
'message_id': uuid.uuid4(),
'event_type': 'a',
'generated': '2013-08-08 21:06:37.803826',
'traits': [
{'name': 't_text',
'value': 1,
'dtype': 'text_trait'
}
],
'raw': {'status': 'started'}
}
message_sign = utils.compute_signature(test_data, 'not-so-secret')
test_data['message_signature'] = message_sign
fake_publisher = mock.Mock()
self.useFixture(mockpatch.Patch(
'ceilometer.publisher.test.TestPublisher',
return_value=fake_publisher))
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
mock.Mock(), pipeline_manager.pipelines[0])
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise',
'a', [test_data], None)
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)