Merge "Fix notification subscription logic"
This commit is contained in:
@@ -87,16 +87,18 @@ class CollectorManager(manager.Manager):
|
||||
# that notification messages do not conform to the RPC
|
||||
# invocation protocol (they do not include a "method"
|
||||
# parameter).
|
||||
# FIXME(dhellmann): Break this out into its own method
|
||||
# so we can test the subscription logic.
|
||||
for handler in self.handlers:
|
||||
LOG.debug('Event types: %r', handler.get_event_types())
|
||||
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
|
||||
for topic in exchange_topic.topics:
|
||||
self.connection.declare_topic_consumer(
|
||||
queue_name="ceilometer.notifications",
|
||||
topic=topic,
|
||||
exchange_name=exchange_topic.exchange,
|
||||
callback=functools.partial(
|
||||
self.process_notification,
|
||||
handler))
|
||||
callback=self.process_notification,
|
||||
)
|
||||
|
||||
# Set ourselves up as a separate worker for the metering data,
|
||||
# since the default for manager is to use create_consumer().
|
||||
@@ -108,19 +110,23 @@ class CollectorManager(manager.Manager):
|
||||
|
||||
self.connection.consume_in_thread()
|
||||
|
||||
def process_notification(self, handler, notification):
|
||||
def process_notification(self, notification):
|
||||
"""Make a notification processed by an handler."""
|
||||
if notification['event_type'] in handler.get_event_types():
|
||||
for c in handler.process_notification(notification):
|
||||
LOG.info('COUNTER: %s', c)
|
||||
# FIXME(dhellmann): Spawn green thread?
|
||||
self.publish_counter(c)
|
||||
LOG.debug('notification %r', notification.get('event_type'))
|
||||
for handler in self.handlers:
|
||||
if notification['event_type'] in handler.get_event_types():
|
||||
for c in handler.process_notification(notification):
|
||||
LOG.info('COUNTER: %s', c)
|
||||
# FIXME(dhellmann): Spawn green thread?
|
||||
self.publish_counter(c)
|
||||
|
||||
@staticmethod
|
||||
def publish_counter(counter):
|
||||
"""Create a metering message for the counter and publish it."""
|
||||
ctxt = context.get_admin_context()
|
||||
publish.publish_counter(ctxt, counter)
|
||||
publish.publish_counter(ctxt, counter,
|
||||
cfg.CONF.metering_topic, cfg.CONF.metering_secret,
|
||||
)
|
||||
|
||||
def record_metering_data(self, context, data):
|
||||
"""This method is triggered when metering data is
|
||||
|
||||
@@ -176,5 +176,6 @@ class TestCollectorManager(tests_base.TestCase):
|
||||
results = []
|
||||
self.stubs.Set(self.mgr, 'publish_counter',
|
||||
lambda counter: results.append(counter))
|
||||
self.mgr.process_notification(notifications.Instance(), TEST_NOTICE)
|
||||
self.mgr.handlers = [notifications.Instance()]
|
||||
self.mgr.process_notification(TEST_NOTICE)
|
||||
self.assert_(len(results) >= 1)
|
||||
|
||||
Reference in New Issue
Block a user