Rework RPC connection

We are now able to listen to multiple topics, we don't use Nova RPC anymore
and we have a single namespace for all collector plugins.

Change-Id: I23603601cb285e9bd71beabfd9558fe903c24308
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2012-09-24 19:04:34 +02:00
parent 6ae7e3203e
commit 40ef8d455e
7 changed files with 45 additions and 68 deletions

View File

@ -34,6 +34,7 @@ class NotificationDispatcher(object):
self.plugin_namespace = plugin_namespace self.plugin_namespace = plugin_namespace
self.publish_func = publish_func self.publish_func = publish_func
self.handlers = {} self.handlers = {}
self.topics = set()
self._load_plugins() self._load_plugins()
def _load_plugins(self): def _load_plugins(self):
@ -42,13 +43,14 @@ class NotificationDispatcher(object):
LOG.info('attempting to load notification handler for %s:%s', LOG.info('attempting to load notification handler for %s:%s',
self.plugin_namespace, ep.name) self.plugin_namespace, ep.name)
try: try:
plugin_class = ep.load()
plugin = plugin_class()
# FIXME(dhellmann): Currently assumes all plugins are # FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and # enabled when they are discovered and
# importable. Need to add check against global # importable. Need to add check against global
# configuration flag and check that asks the plugin if # configuration flag and check that asks the plugin if
# it should be enabled. # it should be enabled.
plugin_class = ep.load()
plugin = plugin_class()
self.topics.update(plugin.topics)
for event_type in plugin.get_event_types(): for event_type in plugin.get_event_types():
LOG.info('subscribing %s handler to %s events', LOG.info('subscribing %s handler to %s events',
ep.name, event_type) ep.name, event_type)
@ -61,13 +63,14 @@ class NotificationDispatcher(object):
LOG.warning('Failed to load any notification handlers for %s', LOG.warning('Failed to load any notification handlers for %s',
self.plugin_namespace) self.plugin_namespace)
def notify(self, body): def notify(self, topic, body):
"""Dispatch the notification to the appropriate handler """Dispatch the notification to the appropriate handler
and publish the counters returned. and publish the counters returned.
""" """
event_type = body.get('event_type') event_type = body.get('event_type')
LOG.info('NOTIFICATION: %s', event_type) LOG.info('NOTIFICATION: %s', event_type)
for handler in self.handlers.get(event_type, []): for handler in self.handlers.get(event_type, []):
if topic in handler.topics:
for c in handler.process_notification(body): for c in handler.process_notification(body):
LOG.info('COUNTER: %s', c) LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread? # FIXME(dhellmann): Spawn green thread?

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import functools
from nova import context from nova import context
from nova import manager from nova import manager
@ -28,8 +30,7 @@ from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
# FIXME(dhellmann): There must be another way to do this. Import # Import rabbit_notifier to register notification_topics flag
# rabbit_notifier to register notification_topics flag
import ceilometer.openstack.common.notifier.rabbit_notifier import ceilometer.openstack.common.notifier.rabbit_notifier
try: try:
import ceilometer.openstack.common.rpc as rpc import ceilometer.openstack.common.rpc as rpc
@ -40,8 +41,7 @@ except ImportError:
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute' COLLECTOR_NAMESPACE = 'ceilometer.collector'
VOLUME_COLLECTOR_NAMESPACE = 'ceilometer.collector.volume'
class CollectorManager(manager.Manager): class CollectorManager(manager.Manager):
@ -56,24 +56,18 @@ class CollectorManager(manager.Manager):
self.storage_engine = storage.get_engine(cfg.CONF) self.storage_engine = storage.get_engine(cfg.CONF)
self.storage_conn = self.storage_engine.get_connection(cfg.CONF) self.storage_conn = self.storage_engine.get_connection(cfg.CONF)
self.compute_handler = dispatcher.NotificationDispatcher( self.handler = dispatcher.NotificationDispatcher(
COMPUTE_COLLECTOR_NAMESPACE, COLLECTOR_NAMESPACE,
self._publish_counter,
)
self.volume_handler = dispatcher.NotificationDispatcher(
VOLUME_COLLECTOR_NAMESPACE,
self._publish_counter, self._publish_counter,
) )
# FIXME(dhellmann): Should be using create_worker(), except # FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC # that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method" # invocation protocol (they do not include a "method"
# parameter). # parameter).
for topic in self.handler.topics:
self.connection.declare_topic_consumer( self.connection.declare_topic_consumer(
topic='%s.info' % cfg.CONF.notification_topics[0], topic=topic,
callback=self.compute_handler.notify) callback=functools.partial(self.handler.notify, topic))
self.connection.declare_topic_consumer(
topic='%s.info' % cfg.CONF.notification_topics[0],
callback=self.volume_handler.notify)
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer(). # since the default for manager is to use create_consumer().

View File

@ -20,12 +20,18 @@
import abc import abc
from ceilometer.openstack.common import cfg
class NotificationBase(object): class NotificationBase(object):
"""Base class for plugins that support the notification API.""" """Base class for plugins that support the notification API."""
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
def __init__(self):
self.topics = set(topic + ".info"
for topic in cfg.CONF.notification_topics)
@abc.abstractmethod @abc.abstractmethod
def get_event_types(self): def get_event_types(self):
"""Return a sequence of strings defining the event types to be """Return a sequence of strings defining the event types to be

View File

@ -1,31 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.openstack.common.rpc import impl_kombu
class Connection(impl_kombu.Connection):
"""A Kombu connection that does not use the AMQP Proxy class when
creating a consumer, so we can decode the message ourself."""
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer without using ProxyCallback."""
if fanout:
self.declare_fanout_consumer(topic, proxy)
else:
self.declare_topic_consumer(topic, proxy)

View File

@ -159,12 +159,9 @@ bus for data being provided by the pollsters via the agent as well as
notification messages from other OpenStack components such as nova, notification messages from other OpenStack components such as nova,
glance, quantum, and swift. glance, quantum, and swift.
The collector loads one or more *listener* plugins, using a namespace The collector loads one or more *listener* plugins, using the namespace
under ``ceilometer.collector``. The namespace controls the exchange ``ceilometer.collector``. Each plugin can listen to any topics, but by
and topic where the listener is subscribed. For example, default it will listen to ``notifications.info``.
``ceilometer.collector.compute`` listens on the ``nova`` exchange to
the ``notifications.info`` topic while ``ceilometer.collector.image``
listens on the ``glance`` exchange for ``notifications.info``.
The plugin provides a method to list the event types it wants and a The plugin provides a method to list the event types it wants and a
callback for processing incoming messages. The registered name of the callback for processing incoming messages. The registered name of the

View File

@ -38,15 +38,13 @@ setuptools.setup(
'bin/ceilometer-collector'], 'bin/ceilometer-collector'],
py_modules=[], py_modules=[],
entry_points=textwrap.dedent(""" entry_points=textwrap.dedent("""
[ceilometer.collector.compute] [ceilometer.collector]
instance = ceilometer.compute.notifications:Instance instance = ceilometer.compute.notifications:Instance
instance_flavor = ceilometer.compute.notifications:InstanceFlavor instance_flavor = ceilometer.compute.notifications:InstanceFlavor
memory = ceilometer.compute.notifications:Memory memory = ceilometer.compute.notifications:Memory
vcpus = ceilometer.compute.notifications:VCpus vcpus = ceilometer.compute.notifications:VCpus
root_disk_size = ceilometer.compute.notifications:RootDiskSize root_disk_size = ceilometer.compute.notifications:RootDiskSize
ephemeral_disk_size = ceilometer.compute.notifications:EphemeralDiskSize ephemeral_disk_size = ceilometer.compute.notifications:EphemeralDiskSize
[ceilometer.collector.volume]
volume = ceilometer.volume.notifications:Volume volume = ceilometer.volume.notifications:Volume
volume_size = ceilometer.volume.notifications:VolumeSize volume_size = ceilometer.volume.notifications:VolumeSize

View File

@ -82,16 +82,16 @@ TEST_NOTICE = {
def test_notify(): def test_notify():
results = [] results = []
d = StubDispatcher(None, lambda x: results.append(x)) d = StubDispatcher(None, lambda x: results.append(x))
d.notify(TEST_NOTICE) d.notify("notifications.info", TEST_NOTICE)
assert len(results) >= 1 assert len(results) >= 1
counter = results[0] counter = results[0]
assert counter.name == 'instance' assert counter.name == 'instance'
def test_load_compute_plugins(): def test_load_plugins():
results = [] results = []
d = dispatcher.NotificationDispatcher( d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute', 'ceilometer.collector',
lambda x: results.append(x) lambda x: results.append(x)
) )
assert d.handlers, 'No handlers were loaded' assert d.handlers, 'No handlers were loaded'
@ -109,11 +109,21 @@ def test_load_no_plugins():
def test_notify_through_plugin(): def test_notify_through_plugin():
results = [] results = []
d = dispatcher.NotificationDispatcher( d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute', 'ceilometer.collector',
lambda x: results.append(x) lambda x: results.append(x)
) )
d.notify(TEST_NOTICE) d.notify("notifications.info", TEST_NOTICE)
assert len(results) >= 1 assert len(results) >= 1
results_name = [result.name for result in results] results_name = [result.name for result in results]
assert 'instance' in results_name assert 'instance' in results_name
assert 'memory' in results_name assert 'memory' in results_name
def test_notify_topics():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector',
lambda x: results.append(x)
)
d.notify("dont.care", TEST_NOTICE)
assert len(results) == 0