Merge "Use stevedore to load all plugins"

This commit is contained in:
Jenkins 2012-10-31 18:02:42 +00:00 committed by Gerrit Code Review
commit e1eb5d98bf
7 changed files with 147 additions and 156 deletions

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import pkg_resources
from stevedore import extension
from nova import manager
@ -33,43 +33,40 @@ PLUGIN_NAMESPACE = 'ceilometer.poll.central'
class AgentManager(manager.Manager):
def init_host(self):
self._load_plugins()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
self.ext_manager = extension.ExtensionManager(
namespace=PLUGIN_NAMESPACE,
invoke_on_load=True,
)
return
def _load_plugins(self):
self.pollsters = []
for ep in pkg_resources.iter_entry_points(PLUGIN_NAMESPACE):
try:
plugin_class = ep.load()
plugin = plugin_class()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
self.pollsters.append((ep.name, plugin))
LOG.info('loaded pollster %s:%s',
PLUGIN_NAMESPACE, ep.name)
except Exception as err:
LOG.warning('Failed to load pollster %s:%s',
ep.name, err)
LOG.exception(err)
if not self.pollsters:
LOG.warning('Failed to load any pollsters for %s',
PLUGIN_NAMESPACE)
return
@staticmethod
def publish_counters_from_one_pollster(ext, manager, context):
"""Used to invoke the plugins loaded by the ExtensionManager.
"""
try:
LOG.info('polling %s', ext.name)
for c in ext.obj.get_counters(manager, context):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context=context,
counter=c,
topic=cfg.CONF.metering_topic,
secret=cfg.CONF.metering_secret,
source=cfg.CONF.counter_source,
)
except Exception as err:
LOG.warning('Continuing after error from %s: %s',
ext.name, err)
LOG.exception(err)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for name, pollster in self.pollsters:
try:
LOG.info('polling %s', name)
for c in pollster.get_counters(self, context):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context, c,
cfg.CONF.metering_topic,
cfg.CONF.metering_secret,
)
except Exception as err:
LOG.warning('Continuing after error from %s: %s', name, err)
LOG.exception(err)
self.ext_manager.map(self.publish_counters_from_one_pollster,
manager=self,
context=context,
)
return

View File

@ -16,9 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import pkg_resources
from stevedore import extension
from nova import manager
@ -45,60 +43,24 @@ class CollectorManager(manager.Manager):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
@staticmethod
def _load_plugins(plugin_namespace):
handlers = []
# Listen for notifications from nova
for ep in pkg_resources.iter_entry_points(plugin_namespace):
LOG.info('attempting to load notification handler for %s:%s',
plugin_namespace, ep.name)
try:
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
plugin_class = ep.load()
plugin = plugin_class()
handlers.append(plugin)
except Exception as err:
LOG.warning('Failed to load notification handler %s: %s',
ep.name, err)
LOG.exception(err)
return handlers
def init_host(self):
# Use the nova configuration flags to get
# a connection to the RPC mechanism nova
# is using.
# FIXME(dhellmann): Update Manager API to get Service instance
# with existing rpc handle.
self.connection = rpc.create_connection()
storage.register_opts(cfg.CONF)
self.storage_engine = storage.get_engine(cfg.CONF)
self.storage_conn = self.storage_engine.get_connection(cfg.CONF)
self.handlers = self._load_plugins(self.COLLECTOR_NAMESPACE)
self.ext_manager = extension.ExtensionManager(self.COLLECTOR_NAMESPACE,
invoke_on_load=True,
)
if not self.handlers:
if not list(self.ext_manager):
LOG.warning('Failed to load any notification handlers for %s',
self.plugin_namespace)
self.COLLECTOR_NAMESPACE)
# FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method"
# parameter).
# FIXME(dhellmann): Break this out into its own method
# so we can test the subscription logic.
for handler in self.handlers:
LOG.debug('Event types: %r', handler.get_event_types())
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
self.connection.declare_topic_consumer(
queue_name="ceilometer.notifications",
topic=topic,
exchange_name=exchange_topic.exchange,
callback=self.process_notification,
)
self.ext_manager.map(self._setup_subscription)
# Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer().
@ -110,15 +72,36 @@ class CollectorManager(manager.Manager):
self.connection.consume_in_thread()
def _setup_subscription(self, ext, *args, **kwds):
handler = ext.obj
LOG.debug('Event types: %r', handler.get_event_types())
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
# FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method"
# parameter).
self.connection.declare_topic_consumer(
queue_name="ceilometer.notifications",
topic=topic,
exchange_name=exchange_topic.exchange,
callback=self.process_notification,
)
def process_notification(self, notification):
"""Make a notification processed by an handler."""
LOG.debug('notification %r', notification.get('event_type'))
for handler in self.handlers:
if notification['event_type'] in handler.get_event_types():
for c in handler.process_notification(notification):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
self.ext_manager.map(self._process_notification_for_ext,
notification=notification,
)
def _process_notification_for_ext(self, ext, notification):
handler = ext.obj
if notification['event_type'] in handler.get_event_types():
for c in handler.process_notification(notification):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
@staticmethod
def publish_counter(counter):

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import pkg_resources
from stevedore import extension
from nova import manager
@ -33,47 +33,41 @@ PLUGIN_NAMESPACE = 'ceilometer.poll.compute'
class AgentManager(manager.Manager):
def init_host(self):
self._load_plugins()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
self.ext_manager = extension.ExtensionManager(
namespace=PLUGIN_NAMESPACE,
invoke_on_load=True,
)
return
def _load_plugins(self):
self.pollsters = []
for ep in pkg_resources.iter_entry_points(PLUGIN_NAMESPACE):
try:
plugin_class = ep.load()
plugin = plugin_class()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
self.pollsters.append((ep.name, plugin))
LOG.info('loaded pollster %s:%s',
PLUGIN_NAMESPACE, ep.name)
except Exception as err:
LOG.warning('Failed to load pollster %s:%s',
ep.name, err)
LOG.exception(err)
if not self.pollsters:
LOG.warning('Failed to load any pollsters for %s',
PLUGIN_NAMESPACE)
return
@staticmethod
def publish_counters_from_one_pollster(ext, manager, context, instance):
"""Used to invoke the plugins loaded by the ExtensionManager.
"""
try:
LOG.info('polling %s', ext.name)
for c in ext.obj.get_counters(manager, instance):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context, c,
cfg.CONF.metering_topic,
cfg.CONF.metering_secret,
)
except Exception as err:
LOG.warning('Continuing after error from %s for %s: %s',
ext.name, instance.id, err)
LOG.exception(err)
def poll_instance(self, context, instance):
"""Poll one instance."""
for name, pollster in self.pollsters:
try:
LOG.info('polling %s', name)
for c in pollster.get_counters(self, instance):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context, c,
cfg.CONF.metering_topic,
cfg.CONF.metering_secret,
)
except Exception as err:
LOG.warning('Continuing after error from %s for %s: %s',
name, instance.name, err)
LOG.exception(err)
self.ext_manager.map(self.publish_counters_from_one_pollster,
manager=self,
context=context,
instance=instance,
)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""

View File

@ -20,12 +20,15 @@
from datetime import datetime
from mock import patch
from stevedore import extension
from stevedore.tests import manager as test_manager
from ceilometer import meter
from ceilometer.collector import manager
from ceilometer.openstack.common import cfg
from ceilometer.storage import base
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import cfg
from ceilometer.tests import base as tests_base
from ceilometer.compute import notifications
@ -80,17 +83,6 @@ TEST_NOTICE = {
}
class StubConnection(object):
def declare_topic_consumer(*args, **kwargs):
pass
def create_worker(*args, **kwargs):
pass
def consume_in_thread(self):
pass
class TestCollectorManager(tests_base.TestCase):
def setUp(self):
@ -100,9 +92,12 @@ class TestCollectorManager(tests_base.TestCase):
#cfg.CONF.metering_secret = 'not-so-secret'
def test_init_host(self):
self.stubs.Set(rpc, 'create_connection', lambda: StubConnection())
cfg.CONF.database_connection = 'log://localhost'
self.mgr.init_host()
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager
# configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.mgr.init_host()
def test_valid_message(self):
msg = {'counter_name': 'test',
@ -186,18 +181,20 @@ class TestCollectorManager(tests_base.TestCase):
self.mgr.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
def test_load_plugins(self):
results = self.mgr._load_plugins(self.mgr.COLLECTOR_NAMESPACE)
self.assert_(len(results) > 0)
def test_load_no_plugins(self):
results = self.mgr._load_plugins("foobar.namespace")
self.assertEqual(results, [])
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager
# configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.mgr.init_host()
results = []
self.stubs.Set(self.mgr, 'publish_counter',
lambda counter: results.append(counter))
self.mgr.handlers = [notifications.Instance()]
self.stubs.Set(self.mgr, 'publish_counter', results.append)
self.mgr.ext_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
notifications.Instance(),
),
])
self.mgr.process_notification(TEST_NOTICE)
self.assert_(len(results) >= 1)

View File

@ -20,6 +20,8 @@
import datetime
from stevedore import extension
from ceilometer.compute import manager
from ceilometer import counter
from ceilometer import publish
@ -31,7 +33,7 @@ from ceilometer.openstack.common import cfg
def test_load_plugins():
mgr = manager.AgentManager()
mgr.init_host()
assert mgr.pollsters, 'Failed to load any plugins'
assert list(mgr.ext_manager), 'Failed to load any plugins'
return
@ -63,7 +65,15 @@ class TestRunTasks(base.TestCase):
self.notifications = []
self.stubs.Set(publish, 'publish_counter', self.faux_notify)
self.mgr = manager.AgentManager()
self.mgr.pollsters = [('test', self.Pollster())]
self.mgr.ext_manager = extension.ExtensionManager('fake',
invoke_on_load=False,
)
self.mgr.ext_manager.extensions = [extension.Extension('test',
None,
None,
self.Pollster(),
),
]
# Set up a fake instance value to be returned by
# instance_get_all_by_host() so when the manager gets the list
# of instances to poll we can control the results.

View File

@ -21,6 +21,9 @@
import mock
import datetime
from stevedore import extension
from stevedore.tests import manager as test_manager
from nova import flags
from nova import db
from nova import context
@ -124,7 +127,14 @@ class TestNovaNotifier(base.TestCase):
self.stubs.Set(publish, 'publish_counter', self.do_nothing)
nova_notifier._initialize_config_options = False
nova_notifier.initialize_manager()
nova_notifier._agent_manager.pollsters = [('test', self.Pollster())]
nova_notifier._agent_manager.ext_manager = \
test_manager.TestExtensionManager([
extension.Extension('test',
None,
None,
self.Pollster(),
),
])
def tearDown(self):
self.Pollster.counters = []

View File

@ -9,6 +9,6 @@ sqlalchemy-migrate>=0.7.2
eventlet
anyjson>=0.3.1
Flask==0.9
stevedore>=0.4
stevedore>=0.5
python-glanceclient
python-cinderclient