diff --git a/ceilometer/collector.py b/ceilometer/collector.py index cde04a4a4f..a4739c3d77 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -25,6 +25,7 @@ from oslo.utils import units from ceilometer import dispatcher from ceilometer import messaging from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common.gettextutils import _LE from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service @@ -36,6 +37,11 @@ OPTS = [ cfg.IntOpt('udp_port', default=4952, help='Port to which the UDP socket is bound.'), + cfg.BoolOpt('requeue_sample_on_dispatcher_error', + default=False, + help='Requeue the sample on the collector sample queue ' + 'when the collector fails to dispatch it. This is only valid ' + 'if the sample come from the notifier publisher'), ] cfg.CONF.register_opts(OPTS, group="collector") @@ -61,6 +67,7 @@ class CollectorService(os_service.Service): if cfg.CONF.collector.udp_address: self.tg.add_thread(self.start_udp) + allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error transport = messaging.get_transport(optional=True) if transport: self.rpc_server = messaging.get_rpc_server( @@ -69,7 +76,8 @@ class CollectorService(os_service.Service): target = oslo.messaging.Target( topic=cfg.CONF.publisher_notifier.metering_topic) self.notification_server = messaging.get_notification_listener( - transport, [target], [self]) + transport, [target], [self], + allow_requeue=allow_requeue) self.rpc_server.start() self.notification_server.start() @@ -116,8 +124,15 @@ class CollectorService(os_service.Service): bus, this method receives it. """ - self.dispatcher_manager.map_method('record_metering_data', - data=payload) + try: + self.dispatcher_manager.map_method('record_metering_data', + data=payload) + except Exception: + if cfg.CONF.collector.requeue_sample_on_dispatcher_error: + LOG.exception(_LE("Dispatcher failed to handle the sample, " + "requeue it.")) + return oslo.messaging.NotificationResult.REQUEUE + raise def record_metering_data(self, context, data): """RPC endpoint for messages we send to ourselves. diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 6d40871bda..3c0dee4520 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -112,10 +112,12 @@ def get_rpc_client(transport, **kwargs): serializer=serializer) -def get_notification_listener(transport, targets, endpoints): +def get_notification_listener(transport, targets, endpoints, + allow_requeue=False): """Return a configured oslo.messaging notification listener.""" return oslo.messaging.get_notification_listener( - transport, targets, endpoints, executor='eventlet') + transport, targets, endpoints, executor='eventlet', + allow_requeue=allow_requeue) def get_notifier(transport, publisher_id): diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index a7f39efd03..bc58fb30b1 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -34,6 +34,10 @@ from ceilometer import sample from ceilometer.tests import base as tests_base +class FakeException(Exception): + pass + + class FakeConnection(): def create_worker(self, topic, proxy, pool_name): pass @@ -227,3 +231,26 @@ class TestCollector(tests_base.BaseTestCase): self.srv.rpc_server.wait() mylog.info.assert_called_once_with( 'metering data test for test_run_tasks: 1') + + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') + @mock.patch.object(collector.CollectorService, 'start_udp') + def test_collector_requeue(self, udp_start, rpc_start): + self.CONF.set_override('requeue_sample_on_dispatcher_error', True, + group='collector') + self.srv.start() + with mock.patch.object(self.srv.dispatcher_manager, 'map_method', + side_effect=Exception('boom')): + ret = self.srv.sample({}, 'pub_id', 'event', {}, {}) + self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, + ret) + + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') + @mock.patch.object(collector.CollectorService, 'start_udp') + def test_collector_no_requeue(self, udp_start, rpc_start): + self.CONF.set_override('requeue_sample_on_dispatcher_error', False, + group='collector') + self.srv.start() + with mock.patch.object(self.srv.dispatcher_manager, 'map_method', + side_effect=FakeException('boom')): + self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id', + 'event', {}, {}) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 48b321995d..e19d72567d 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -208,6 +208,20 @@ evaluation_service singleton Driver to use for alarm evaluation servi ====================== ============== ==================================================================================== +Collector +========= + +The following options in the [collector] configuration section affect the collector service + +===================================== ====================================== ============================================================== +Parameter Default Note +===================================== ====================================== ============================================================== +requeue_sample_on_dispatcher_error False Requeue the sample on the collector sample queue when the + collector fails to dispatch it. This option is only valid if + the sample comes from the notifier publisher +===================================== ====================================== ============================================================== + + General options ===============