From 40ef8d455efa2d4b9d8da7fa9b25fdbd1d1e0e73 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Mon, 24 Sep 2012 19:04:34 +0200 Subject: [PATCH] Rework RPC connection We are now able to listen to multiple topics, we don't use Nova RPC anymore and we have a single namespace for all collector plugins. Change-Id: I23603601cb285e9bd71beabfd9558fe903c24308 Signed-off-by: Julien Danjou --- ceilometer/collector/dispatcher.py | 17 +++++++++------- ceilometer/collector/manager.py | 26 ++++++++++--------------- ceilometer/plugin.py | 6 ++++++ ceilometer/rpc/__init__.py | 31 ------------------------------ doc/source/architecture.rst | 9 +++------ setup.py | 4 +--- tests/collector/test_dispatcher.py | 20 ++++++++++++++----- 7 files changed, 45 insertions(+), 68 deletions(-) delete mode 100644 ceilometer/rpc/__init__.py diff --git a/ceilometer/collector/dispatcher.py b/ceilometer/collector/dispatcher.py index e7ae2d3487..bcf23c6375 100644 --- a/ceilometer/collector/dispatcher.py +++ b/ceilometer/collector/dispatcher.py @@ -34,6 +34,7 @@ class NotificationDispatcher(object): self.plugin_namespace = plugin_namespace self.publish_func = publish_func self.handlers = {} + self.topics = set() self._load_plugins() def _load_plugins(self): @@ -42,13 +43,14 @@ class NotificationDispatcher(object): LOG.info('attempting to load notification handler for %s:%s', self.plugin_namespace, ep.name) try: - plugin_class = ep.load() - plugin = plugin_class() # FIXME(dhellmann): Currently assumes all plugins are # enabled when they are discovered and # importable. Need to add check against global # configuration flag and check that asks the plugin if # it should be enabled. + plugin_class = ep.load() + plugin = plugin_class() + self.topics.update(plugin.topics) for event_type in plugin.get_event_types(): LOG.info('subscribing %s handler to %s events', ep.name, event_type) @@ -61,15 +63,16 @@ class NotificationDispatcher(object): LOG.warning('Failed to load any notification handlers for %s', self.plugin_namespace) - def notify(self, body): + def notify(self, topic, body): """Dispatch the notification to the appropriate handler and publish the counters returned. """ event_type = body.get('event_type') LOG.info('NOTIFICATION: %s', event_type) for handler in self.handlers.get(event_type, []): - for c in handler.process_notification(body): - LOG.info('COUNTER: %s', c) - # FIXME(dhellmann): Spawn green thread? - self.publish_func(c) + if topic in handler.topics: + for c in handler.process_notification(body): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Spawn green thread? + self.publish_func(c) return diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index fb072c7e59..04d25c612a 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import functools + from nova import context from nova import manager @@ -28,8 +30,7 @@ from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher -# FIXME(dhellmann): There must be another way to do this. Import -# rabbit_notifier to register notification_topics flag +# Import rabbit_notifier to register notification_topics flag import ceilometer.openstack.common.notifier.rabbit_notifier try: import ceilometer.openstack.common.rpc as rpc @@ -40,8 +41,7 @@ except ImportError: LOG = log.getLogger(__name__) -COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute' -VOLUME_COLLECTOR_NAMESPACE = 'ceilometer.collector.volume' +COLLECTOR_NAMESPACE = 'ceilometer.collector' class CollectorManager(manager.Manager): @@ -56,24 +56,18 @@ class CollectorManager(manager.Manager): self.storage_engine = storage.get_engine(cfg.CONF) self.storage_conn = self.storage_engine.get_connection(cfg.CONF) - self.compute_handler = dispatcher.NotificationDispatcher( - COMPUTE_COLLECTOR_NAMESPACE, - self._publish_counter, - ) - self.volume_handler = dispatcher.NotificationDispatcher( - VOLUME_COLLECTOR_NAMESPACE, + self.handler = dispatcher.NotificationDispatcher( + COLLECTOR_NAMESPACE, self._publish_counter, ) # FIXME(dhellmann): Should be using create_worker(), except # that notification messages do not conform to the RPC # invocation protocol (they do not include a "method" # parameter). - self.connection.declare_topic_consumer( - topic='%s.info' % cfg.CONF.notification_topics[0], - callback=self.compute_handler.notify) - self.connection.declare_topic_consumer( - topic='%s.info' % cfg.CONF.notification_topics[0], - callback=self.volume_handler.notify) + for topic in self.handler.topics: + self.connection.declare_topic_consumer( + topic=topic, + callback=functools.partial(self.handler.notify, topic)) # Set ourselves up as a separate worker for the metering data, # since the default for manager is to use create_consumer(). diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index a57bca6627..5348ad1212 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -20,12 +20,18 @@ import abc +from ceilometer.openstack.common import cfg + class NotificationBase(object): """Base class for plugins that support the notification API.""" __metaclass__ = abc.ABCMeta + def __init__(self): + self.topics = set(topic + ".info" + for topic in cfg.CONF.notification_topics) + @abc.abstractmethod def get_event_types(self): """Return a sequence of strings defining the event types to be diff --git a/ceilometer/rpc/__init__.py b/ceilometer/rpc/__init__.py deleted file mode 100644 index a852c01590..0000000000 --- a/ceilometer/rpc/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright © 2012 eNovance -# -# Author: Julien Danjou -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from ceilometer.openstack.common.rpc import impl_kombu - - -class Connection(impl_kombu.Connection): - """A Kombu connection that does not use the AMQP Proxy class when - creating a consumer, so we can decode the message ourself.""" - - def create_consumer(self, topic, proxy, fanout=False): - """Create a consumer without using ProxyCallback.""" - if fanout: - self.declare_fanout_consumer(topic, proxy) - else: - self.declare_topic_consumer(topic, proxy) diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst index 4d322a07d2..0d760d2bfe 100644 --- a/doc/source/architecture.rst +++ b/doc/source/architecture.rst @@ -159,12 +159,9 @@ bus for data being provided by the pollsters via the agent as well as notification messages from other OpenStack components such as nova, glance, quantum, and swift. -The collector loads one or more *listener* plugins, using a namespace -under ``ceilometer.collector``. The namespace controls the exchange -and topic where the listener is subscribed. For example, -``ceilometer.collector.compute`` listens on the ``nova`` exchange to -the ``notifications.info`` topic while ``ceilometer.collector.image`` -listens on the ``glance`` exchange for ``notifications.info``. +The collector loads one or more *listener* plugins, using the namespace +``ceilometer.collector``. Each plugin can listen to any topics, but by +default it will listen to ``notifications.info``. The plugin provides a method to list the event types it wants and a callback for processing incoming messages. The registered name of the diff --git a/setup.py b/setup.py index ccb089acce..3c1e89a754 100755 --- a/setup.py +++ b/setup.py @@ -38,15 +38,13 @@ setuptools.setup( 'bin/ceilometer-collector'], py_modules=[], entry_points=textwrap.dedent(""" - [ceilometer.collector.compute] + [ceilometer.collector] instance = ceilometer.compute.notifications:Instance instance_flavor = ceilometer.compute.notifications:InstanceFlavor memory = ceilometer.compute.notifications:Memory vcpus = ceilometer.compute.notifications:VCpus root_disk_size = ceilometer.compute.notifications:RootDiskSize ephemeral_disk_size = ceilometer.compute.notifications:EphemeralDiskSize - - [ceilometer.collector.volume] volume = ceilometer.volume.notifications:Volume volume_size = ceilometer.volume.notifications:VolumeSize diff --git a/tests/collector/test_dispatcher.py b/tests/collector/test_dispatcher.py index ef55db87c3..82aff2a481 100644 --- a/tests/collector/test_dispatcher.py +++ b/tests/collector/test_dispatcher.py @@ -82,16 +82,16 @@ TEST_NOTICE = { def test_notify(): results = [] d = StubDispatcher(None, lambda x: results.append(x)) - d.notify(TEST_NOTICE) + d.notify("notifications.info", TEST_NOTICE) assert len(results) >= 1 counter = results[0] assert counter.name == 'instance' -def test_load_compute_plugins(): +def test_load_plugins(): results = [] d = dispatcher.NotificationDispatcher( - 'ceilometer.collector.compute', + 'ceilometer.collector', lambda x: results.append(x) ) assert d.handlers, 'No handlers were loaded' @@ -109,11 +109,21 @@ def test_load_no_plugins(): def test_notify_through_plugin(): results = [] d = dispatcher.NotificationDispatcher( - 'ceilometer.collector.compute', + 'ceilometer.collector', lambda x: results.append(x) ) - d.notify(TEST_NOTICE) + d.notify("notifications.info", TEST_NOTICE) assert len(results) >= 1 results_name = [result.name for result in results] assert 'instance' in results_name assert 'memory' in results_name + + +def test_notify_topics(): + results = [] + d = dispatcher.NotificationDispatcher( + 'ceilometer.collector', + lambda x: results.append(x) + ) + d.notify("dont.care", TEST_NOTICE) + assert len(results) == 0