From 63ad96f19e458c374a13918a29f38eebe4df60aa Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Mon, 1 Oct 2012 15:40:28 -0400 Subject: [PATCH] Fix notification subscription logic We need to funnel all messages from all exchanges through the same function to dispatch them to the handlers. If we don't do that, we have multiple subscribers, each configured to be interested in different events but that do not all receive all events. The result is events being discarded. Change-Id: I31daa204c5ee13e8e5f767e0f5e61e2e7eafe624 Signed-off-by: Doug Hellmann --- ceilometer/collector/manager.py | 26 ++++++++++++++++---------- tests/collector/test_manager.py | 3 ++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index 8b2968a203..0df7a19a5b 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -87,16 +87,18 @@ class CollectorManager(manager.Manager): # that notification messages do not conform to the RPC # invocation protocol (they do not include a "method" # parameter). + # FIXME(dhellmann): Break this out into its own method + # so we can test the subscription logic. for handler in self.handlers: + LOG.debug('Event types: %r', handler.get_event_types()) for exchange_topic in handler.get_exchange_topics(cfg.CONF): for topic in exchange_topic.topics: self.connection.declare_topic_consumer( queue_name="ceilometer.notifications", topic=topic, exchange_name=exchange_topic.exchange, - callback=functools.partial( - self.process_notification, - handler)) + callback=self.process_notification, + ) # Set ourselves up as a separate worker for the metering data, # since the default for manager is to use create_consumer(). @@ -108,19 +110,23 @@ class CollectorManager(manager.Manager): self.connection.consume_in_thread() - def process_notification(self, handler, notification): + def process_notification(self, notification): """Make a notification processed by an handler.""" - if notification['event_type'] in handler.get_event_types(): - for c in handler.process_notification(notification): - LOG.info('COUNTER: %s', c) - # FIXME(dhellmann): Spawn green thread? - self.publish_counter(c) + LOG.debug('notification %r', notification.get('event_type')) + for handler in self.handlers: + if notification['event_type'] in handler.get_event_types(): + for c in handler.process_notification(notification): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Spawn green thread? + self.publish_counter(c) @staticmethod def publish_counter(counter): """Create a metering message for the counter and publish it.""" ctxt = context.get_admin_context() - publish.publish_counter(ctxt, counter) + publish.publish_counter(ctxt, counter, + cfg.CONF.metering_topic, cfg.CONF.metering_secret, + ) def record_metering_data(self, context, data): """This method is triggered when metering data is diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 9f1bc57b47..4a6fa11f0a 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -176,5 +176,6 @@ class TestCollectorManager(tests_base.TestCase): results = [] self.stubs.Set(self.mgr, 'publish_counter', lambda counter: results.append(counter)) - self.mgr.process_notification(notifications.Instance(), TEST_NOTICE) + self.mgr.handlers = [notifications.Instance()] + self.mgr.process_notification(TEST_NOTICE) self.assert_(len(results) >= 1)