Merge "Requeuing event with workload_partitioning on publish failure"
This commit is contained in:
commit
0ec90f1ad1
ceilometer
@ -24,6 +24,7 @@ import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from stevedore import extension
|
||||
@ -117,8 +118,14 @@ class EventPipelineEndpoint(PipelineEndpoint):
|
||||
for ev in payload if publisher_utils.verify_signature(
|
||||
ev, cfg.CONF.publisher.telemetry_secret)
|
||||
]
|
||||
with self.publish_context as p:
|
||||
p(events)
|
||||
try:
|
||||
with self.publish_context as p:
|
||||
p(events)
|
||||
except Exception:
|
||||
if not cfg.CONF.notification.ack_on_event_error:
|
||||
return oslo_messaging.NotificationResult.REQUEUE
|
||||
raise
|
||||
return oslo_messaging.NotificationResult.HANDLED
|
||||
|
||||
|
||||
class _PipelineTransportManager(object):
|
||||
|
@ -15,6 +15,9 @@ import datetime
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
import oslo_messaging
|
||||
from oslotest import base
|
||||
from oslotest import mockpatch
|
||||
|
||||
@ -22,6 +25,7 @@ from ceilometer.event.storage import models
|
||||
from ceilometer import pipeline
|
||||
from ceilometer import publisher
|
||||
from ceilometer.publisher import test as test_publisher
|
||||
from ceilometer.publisher import utils
|
||||
|
||||
|
||||
class EventPipelineTestCase(base.BaseTestCase):
|
||||
@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase):
|
||||
def test_unique_pipeline_names(self):
|
||||
self._dup_pipeline_name_cfg()
|
||||
self._exception_create_pipelinemanager()
|
||||
|
||||
def test_event_pipeline_endpoint_requeue_on_failure(self):
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.CONF([])
|
||||
|
||||
self.CONF.set_override("ack_on_event_error", False,
|
||||
group="notification")
|
||||
self.CONF.set_override("telemetry_secret", "not-so-secret",
|
||||
group="publisher")
|
||||
test_data = {
|
||||
'message_id': uuid.uuid4(),
|
||||
'event_type': 'a',
|
||||
'generated': '2013-08-08 21:06:37.803826',
|
||||
'traits': [
|
||||
{'name': 't_text',
|
||||
'value': 1,
|
||||
'dtype': 'text_trait'
|
||||
}
|
||||
],
|
||||
'raw': {'status': 'started'}
|
||||
}
|
||||
message_sign = utils.compute_signature(test_data, 'not-so-secret')
|
||||
test_data['message_signature'] = message_sign
|
||||
|
||||
fake_publisher = mock.Mock()
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'ceilometer.publisher.test.TestPublisher',
|
||||
return_value=fake_publisher))
|
||||
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager,
|
||||
self.p_type)
|
||||
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
|
||||
mock.Mock(), pipeline_manager.pipelines[0])
|
||||
|
||||
fake_publisher.publish_events.side_effect = Exception
|
||||
ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise',
|
||||
'a', [test_data], None)
|
||||
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
|
||||
|
Loading…
x
Reference in New Issue
Block a user