Fix notification subscription logic

We need to funnel all messages from all exchanges through
the same function to dispatch them to the handlers. If
we don't do that, we have multiple subscribers,
each configured to be interested in different events
but that do not all receive all events. The result is
events being discarded.

Change-Id: I31daa204c5ee13e8e5f767e0f5e61e2e7eafe624
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-10-01 15:40:28 -04:00
parent 658ea8ed33
commit 63ad96f19e
2 changed files with 18 additions and 11 deletions

View File

@ -87,16 +87,18 @@ class CollectorManager(manager.Manager):
# that notification messages do not conform to the RPC # that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method" # invocation protocol (they do not include a "method"
# parameter). # parameter).
# FIXME(dhellmann): Break this out into its own method
# so we can test the subscription logic.
for handler in self.handlers: for handler in self.handlers:
LOG.debug('Event types: %r', handler.get_event_types())
for exchange_topic in handler.get_exchange_topics(cfg.CONF): for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics: for topic in exchange_topic.topics:
self.connection.declare_topic_consumer( self.connection.declare_topic_consumer(
queue_name="ceilometer.notifications", queue_name="ceilometer.notifications",
topic=topic, topic=topic,
exchange_name=exchange_topic.exchange, exchange_name=exchange_topic.exchange,
callback=functools.partial( callback=self.process_notification,
self.process_notification, )
handler))
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer(). # since the default for manager is to use create_consumer().
@ -108,19 +110,23 @@ class CollectorManager(manager.Manager):
self.connection.consume_in_thread() self.connection.consume_in_thread()
def process_notification(self, handler, notification): def process_notification(self, notification):
"""Make a notification processed by an handler.""" """Make a notification processed by an handler."""
if notification['event_type'] in handler.get_event_types(): LOG.debug('notification %r', notification.get('event_type'))
for c in handler.process_notification(notification): for handler in self.handlers:
LOG.info('COUNTER: %s', c) if notification['event_type'] in handler.get_event_types():
# FIXME(dhellmann): Spawn green thread? for c in handler.process_notification(notification):
self.publish_counter(c) LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
@staticmethod @staticmethod
def publish_counter(counter): def publish_counter(counter):
"""Create a metering message for the counter and publish it.""" """Create a metering message for the counter and publish it."""
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
publish.publish_counter(ctxt, counter) publish.publish_counter(ctxt, counter,
cfg.CONF.metering_topic, cfg.CONF.metering_secret,
)
def record_metering_data(self, context, data): def record_metering_data(self, context, data):
"""This method is triggered when metering data is """This method is triggered when metering data is

View File

@ -176,5 +176,6 @@ class TestCollectorManager(tests_base.TestCase):
results = [] results = []
self.stubs.Set(self.mgr, 'publish_counter', self.stubs.Set(self.mgr, 'publish_counter',
lambda counter: results.append(counter)) lambda counter: results.append(counter))
self.mgr.process_notification(notifications.Instance(), TEST_NOTICE) self.mgr.handlers = [notifications.Instance()]
self.mgr.process_notification(TEST_NOTICE)
self.assert_(len(results) >= 1) self.assert_(len(results) >= 1)