collector: Allows to requeue a sample

This change allows to choice the behavior when the dispatcher method
fail.

Now we can requeue on the messaging bus the sample by setting:

[collector]
requeue_sample_on_dispatcher_error=True

This only works when the sample have been publisher with the notifier://

DocImpact
Change-Id: I4a2ad60613aa7ac2c7e90996821be0a4fc018ccf
This commit is contained in:
Mehdi Abaakouk 2014-07-30 17:10:18 +02:00
parent 8362688d64
commit ede5c45843
4 changed files with 63 additions and 5 deletions

View File

@ -25,6 +25,7 @@ from oslo.utils import units
from ceilometer import dispatcher
from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.gettextutils import _LE
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
@ -36,6 +37,11 @@ OPTS = [
cfg.IntOpt('udp_port',
default=4952,
help='Port to which the UDP socket is bound.'),
cfg.BoolOpt('requeue_sample_on_dispatcher_error',
default=False,
help='Requeue the sample on the collector sample queue '
'when the collector fails to dispatch it. This is only valid '
'if the sample come from the notifier publisher'),
]
cfg.CONF.register_opts(OPTS, group="collector")
@ -61,6 +67,7 @@ class CollectorService(os_service.Service):
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
transport = messaging.get_transport(optional=True)
if transport:
self.rpc_server = messaging.get_rpc_server(
@ -69,7 +76,8 @@ class CollectorService(os_service.Service):
target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic)
self.notification_server = messaging.get_notification_listener(
transport, [target], [self])
transport, [target], [self],
allow_requeue=allow_requeue)
self.rpc_server.start()
self.notification_server.start()
@ -116,8 +124,15 @@ class CollectorService(os_service.Service):
bus, this method receives it.
"""
self.dispatcher_manager.map_method('record_metering_data',
data=payload)
try:
self.dispatcher_manager.map_method('record_metering_data',
data=payload)
except Exception:
if cfg.CONF.collector.requeue_sample_on_dispatcher_error:
LOG.exception(_LE("Dispatcher failed to handle the sample, "
"requeue it."))
return oslo.messaging.NotificationResult.REQUEUE
raise
def record_metering_data(self, context, data):
"""RPC endpoint for messages we send to ourselves.

View File

@ -112,10 +112,12 @@ def get_rpc_client(transport, **kwargs):
serializer=serializer)
def get_notification_listener(transport, targets, endpoints):
def get_notification_listener(transport, targets, endpoints,
allow_requeue=False):
"""Return a configured oslo.messaging notification listener."""
return oslo.messaging.get_notification_listener(
transport, targets, endpoints, executor='eventlet')
transport, targets, endpoints, executor='eventlet',
allow_requeue=allow_requeue)
def get_notifier(transport, publisher_id):

View File

@ -34,6 +34,10 @@ from ceilometer import sample
from ceilometer.tests import base as tests_base
class FakeException(Exception):
pass
class FakeConnection():
def create_worker(self, topic, proxy, pool_name):
pass
@ -227,3 +231,26 @@ class TestCollector(tests_base.BaseTestCase):
self.srv.rpc_server.wait()
mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1')
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
@mock.patch.object(collector.CollectorService, 'start_udp')
def test_collector_requeue(self, udp_start, rpc_start):
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
group='collector')
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=Exception('boom')):
ret = self.srv.sample({}, 'pub_id', 'event', {}, {})
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
ret)
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
@mock.patch.object(collector.CollectorService, 'start_udp')
def test_collector_no_requeue(self, udp_start, rpc_start):
self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
group='collector')
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=FakeException('boom')):
self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id',
'event', {}, {})

View File

@ -208,6 +208,20 @@ evaluation_service singleton Driver to use for alarm evaluation servi
====================== ============== ====================================================================================
Collector
=========
The following options in the [collector] configuration section affect the collector service
===================================== ====================================== ==============================================================
Parameter Default Note
===================================== ====================================== ==============================================================
requeue_sample_on_dispatcher_error False Requeue the sample on the collector sample queue when the
collector fails to dispatch it. This option is only valid if
the sample comes from the notifier publisher
===================================== ====================================== ==============================================================
General options
===============