diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index 8b2968a203..0df7a19a5b 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -87,16 +87,18 @@ class CollectorManager(manager.Manager): # that notification messages do not conform to the RPC # invocation protocol (they do not include a "method" # parameter). + # FIXME(dhellmann): Break this out into its own method + # so we can test the subscription logic. for handler in self.handlers: + LOG.debug('Event types: %r', handler.get_event_types()) for exchange_topic in handler.get_exchange_topics(cfg.CONF): for topic in exchange_topic.topics: self.connection.declare_topic_consumer( queue_name="ceilometer.notifications", topic=topic, exchange_name=exchange_topic.exchange, - callback=functools.partial( - self.process_notification, - handler)) + callback=self.process_notification, + ) # Set ourselves up as a separate worker for the metering data, # since the default for manager is to use create_consumer(). @@ -108,19 +110,23 @@ class CollectorManager(manager.Manager): self.connection.consume_in_thread() - def process_notification(self, handler, notification): + def process_notification(self, notification): """Make a notification processed by an handler.""" - if notification['event_type'] in handler.get_event_types(): - for c in handler.process_notification(notification): - LOG.info('COUNTER: %s', c) - # FIXME(dhellmann): Spawn green thread? - self.publish_counter(c) + LOG.debug('notification %r', notification.get('event_type')) + for handler in self.handlers: + if notification['event_type'] in handler.get_event_types(): + for c in handler.process_notification(notification): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Spawn green thread? + self.publish_counter(c) @staticmethod def publish_counter(counter): """Create a metering message for the counter and publish it.""" ctxt = context.get_admin_context() - publish.publish_counter(ctxt, counter) + publish.publish_counter(ctxt, counter, + cfg.CONF.metering_topic, cfg.CONF.metering_secret, + ) def record_metering_data(self, context, data): """This method is triggered when metering data is diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 9f1bc57b47..4a6fa11f0a 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -176,5 +176,6 @@ class TestCollectorManager(tests_base.TestCase): results = [] self.stubs.Set(self.mgr, 'publish_counter', lambda counter: results.append(counter)) - self.mgr.process_notification(notifications.Instance(), TEST_NOTICE) + self.mgr.handlers = [notifications.Instance()] + self.mgr.process_notification(TEST_NOTICE) self.assert_(len(results) >= 1)