Enable collector to requeue samples when enabled

If an exception is reported by the storage layer,
the dispatcher logs the error but does not
propagate it. Stevedore plugin framework also does
not propagate the exception back to the caller.

The sample is not requeued since collector does
not get the exception and returns a Handled
NotificationResult. This fix raises the exception
in the database dispatcher and enables Stevedore
to propagate exceptions to the collector.

Change-Id: I341ad92df050bf0f474b07c3f27c68cdb4f4e59b
Closes-Bug: 1433881
This commit is contained in:
Rohit Jaiswal
2015-03-19 11:28:20 -07:00
parent 348ecb0861
commit 7e5a6711ce
3 changed files with 27 additions and 13 deletions

View File

@@ -40,11 +40,14 @@ DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def load_dispatcher_manager():
LOG.debug(_('loading dispatchers from %s'),
DISPATCHER_NAMESPACE)
# set propagate_map_exceptions to True to enable stevedore
# to propagate exceptions.
dispatcher_manager = named.NamedExtensionManager(
namespace=DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
invoke_args=[cfg.CONF],
propagate_map_exceptions=True)
if not list(dispatcher_manager):
LOG.warning(_('Failed to load any dispatchers for %s'),
DISPATCHER_NAMESPACE)

View File

@@ -90,6 +90,8 @@ class DatabaseDispatcher(dispatcher.Base):
except Exception as err:
LOG.exception(_('Failed to record metering data: %s'),
err)
# raise the exception to propagate it up in the chain.
raise
else:
LOG.warning(_(
'message signature invalid, discarding message: %r'),

View File

@@ -100,7 +100,7 @@ class TestCollector(tests_base.BaseTestCase):
plugin = mock.MagicMock()
fake_dispatcher = extension.ExtensionManager.make_test_instance([
extension.Extension('test', None, None, plugin,),
])
], propagate_map_exceptions=True)
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.load_dispatcher_manager',
return_value=fake_dispatcher))
@@ -231,13 +231,17 @@ class TestCollector(tests_base.BaseTestCase):
'metering data test for test_run_tasks: 1')
def _test_collector_requeue(self, listener):
mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
mock_dispatcher.record_metering_data.side_effect = Exception('boom')
mock_dispatcher.record_events.side_effect = Exception('boom')
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=Exception('boom')):
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
ret = endp.sample({}, 'pub_id', 'event', {}, {})
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
ret)
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
ret = endp.sample({}, 'pub_id', 'event', {}, {})
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
ret)
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.Mock())
@@ -257,12 +261,17 @@ class TestCollector(tests_base.BaseTestCase):
self._test_collector_requeue('event_listener')
def _test_collector_no_requeue(self, listener):
mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
mock_dispatcher.record_metering_data.side_effect = (FakeException
('boom'))
mock_dispatcher.record_events.side_effect = (FakeException
('boom'))
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=FakeException('boom')):
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
'event', {}, {})
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
'event', {}, {})
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.Mock())