Merge "Refactor keystone handling in discovery manager"
This commit is contained in:
commit
33e4da9464
@ -23,6 +23,7 @@ import fnmatch
|
||||
import itertools
|
||||
import random
|
||||
|
||||
from keystoneclient import exceptions as ks_exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_context import context
|
||||
from oslo_log import log
|
||||
@ -33,7 +34,7 @@ from stevedore import extension
|
||||
|
||||
from ceilometer.agent import plugin_base
|
||||
from ceilometer import coordination
|
||||
from ceilometer.i18n import _, _LI
|
||||
from ceilometer.i18n import _, _LI, _LE, _LW
|
||||
from ceilometer import keystone_client
|
||||
from ceilometer import messaging
|
||||
from ceilometer import pipeline
|
||||
@ -71,6 +72,12 @@ cfg.CONF.register_opts(OPTS)
|
||||
cfg.CONF.register_opts(POLLING_OPTS, group='polling')
|
||||
cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging',
|
||||
group='publisher_notifier')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.energy.kwapi')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.image.glance')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.neutron_client')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.nova_client')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.objectstore.rgw')
|
||||
cfg.CONF.import_group('service_types', 'ceilometer.objectstore.swift')
|
||||
|
||||
|
||||
class PollsterListForbidden(Exception):
|
||||
@ -271,6 +278,9 @@ class AgentManager(service_base.BaseService):
|
||||
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
||||
publisher_id="ceilometer.api")
|
||||
|
||||
self._keystone = None
|
||||
self._keystone_last_exception = None
|
||||
|
||||
@staticmethod
|
||||
def _get_ext_mgr(namespace):
|
||||
def _catch_extension_load_error(mgr, ep, exc):
|
||||
@ -385,12 +395,31 @@ class AgentManager(service_base.BaseService):
|
||||
super(AgentManager, self).stop()
|
||||
|
||||
def interval_task(self, task):
|
||||
try:
|
||||
self.keystone = keystone_client.get_client()
|
||||
except Exception as e:
|
||||
self.keystone = e
|
||||
# NOTE(sileht): remove the previous keystone client
|
||||
# and exception to get a new one in this polling cycle.
|
||||
self._keystone = None
|
||||
self._keystone_last_exception = None
|
||||
|
||||
task.poll_and_notify()
|
||||
|
||||
@property
|
||||
def keystone(self):
|
||||
# NOTE(sileht): we do lazy loading of the keystone client
|
||||
# for multiple reasons:
|
||||
# * don't use it if no plugin need it
|
||||
# * use only one client for all plugins per polling cycle
|
||||
if self._keystone is None and self._keystone_last_exception is None:
|
||||
try:
|
||||
self._keystone = keystone_client.get_client()
|
||||
self._keystone_last_exception = None
|
||||
except ks_exceptions.ClientException as e:
|
||||
self._keystone = None
|
||||
self._keystone_last_exception = e
|
||||
if self._keystone is not None:
|
||||
return self._keystone
|
||||
else:
|
||||
raise self._keystone_last_exception
|
||||
|
||||
@staticmethod
|
||||
def _parse_discoverer(url):
|
||||
s = urlparse.urlparse(url)
|
||||
@ -413,6 +442,18 @@ class AgentManager(service_base.BaseService):
|
||||
discoverer = self._discoverer(name)
|
||||
if discoverer:
|
||||
try:
|
||||
if discoverer.KEYSTONE_REQUIRED_FOR_SERVICE:
|
||||
service_type = getattr(
|
||||
cfg.CONF.service_types,
|
||||
discoverer.KEYSTONE_REQUIRED_FOR_SERVICE)
|
||||
if not self.keystone.service_catalog.get_endpoints(
|
||||
service_type=service_type):
|
||||
LOG.warning(_LW(
|
||||
'Skipping %(name)s, %(service_type)s service '
|
||||
'is not registered in keystone'),
|
||||
{'name': name, 'service_type': service_type})
|
||||
continue
|
||||
|
||||
discovered = discoverer.discover(self, param)
|
||||
partitioned = self.partition_coordinator.extract_my_subset(
|
||||
self.construct_group_id(discoverer.group_id),
|
||||
@ -420,6 +461,9 @@ class AgentManager(service_base.BaseService):
|
||||
resources.extend(partitioned)
|
||||
if discovery_cache is not None:
|
||||
discovery_cache[url] = partitioned
|
||||
except ks_exceptions.ClientException as e:
|
||||
LOG.error(_LE('Skipping %(name)s, keystone issue: '
|
||||
'%(exc)s'), {'name': name, 'exc': e})
|
||||
except Exception as err:
|
||||
LOG.exception(_('Unable to discover resources: %s') % err)
|
||||
else:
|
||||
|
@ -18,71 +18,20 @@
|
||||
import abc
|
||||
import collections
|
||||
|
||||
from keystoneclient.v2_0 import client as ksclient
|
||||
from oslo_config import cfg
|
||||
from oslo_context import context
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
import six
|
||||
from stevedore import extension
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import messaging
|
||||
|
||||
cfg.CONF.import_group('service_credentials', 'ceilometer.service')
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
ExchangeTopics = collections.namedtuple('ExchangeTopics',
|
||||
['exchange', 'topics'])
|
||||
|
||||
|
||||
def _get_keystone():
|
||||
try:
|
||||
return ksclient.Client(
|
||||
username=cfg.CONF.service_credentials.os_username,
|
||||
password=cfg.CONF.service_credentials.os_password,
|
||||
tenant_id=cfg.CONF.service_credentials.os_tenant_id,
|
||||
tenant_name=cfg.CONF.service_credentials.os_tenant_name,
|
||||
cacert=cfg.CONF.service_credentials.os_cacert,
|
||||
auth_url=cfg.CONF.service_credentials.os_auth_url,
|
||||
region_name=cfg.CONF.service_credentials.os_region_name,
|
||||
insecure=cfg.CONF.service_credentials.insecure)
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
|
||||
def check_keystone(service_type=None):
|
||||
"""Decorator function to check if manager has valid keystone client.
|
||||
|
||||
Also checks if the service is registered/enabled in Keystone.
|
||||
|
||||
:param service_type: name of service in Keystone
|
||||
"""
|
||||
def wrapped(f):
|
||||
def func(self, *args, **kwargs):
|
||||
manager = kwargs.get('manager')
|
||||
if not manager and len(args) > 0:
|
||||
manager = args[0]
|
||||
keystone = getattr(manager, 'keystone', None)
|
||||
if not keystone:
|
||||
keystone = _get_keystone()
|
||||
if isinstance(keystone, Exception):
|
||||
LOG.error(_('Skip due to keystone error %s'),
|
||||
keystone if keystone else '')
|
||||
return iter([])
|
||||
elif service_type:
|
||||
endpoints = keystone.service_catalog.get_endpoints(
|
||||
service_type=service_type)
|
||||
if not endpoints:
|
||||
LOG.warning(_('Skipping because %s service is not '
|
||||
'registered in keystone') % service_type)
|
||||
return iter([])
|
||||
return f(self, *args, **kwargs)
|
||||
return func
|
||||
return wrapped
|
||||
|
||||
|
||||
class PluginBase(object):
|
||||
"""Base class for all plugins."""
|
||||
|
||||
@ -280,6 +229,9 @@ class PollsterBase(PluginBase):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DiscoveryBase(object):
|
||||
KEYSTONE_REQUIRED_FOR_SERVICE = None
|
||||
"""Service type required in keystone catalog to works"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor.
|
||||
|
@ -13,13 +13,12 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from ceilometer.agent import plugin_base
|
||||
from ceilometer import neutron_client
|
||||
|
||||
|
||||
class _BaseServicesDiscovery(plugin_base.DiscoveryBase):
|
||||
REQUIRED_KEYSTONE_FOR_SERVICE = 'neutron'
|
||||
|
||||
def __init__(self):
|
||||
super(_BaseServicesDiscovery, self).__init__()
|
||||
@ -27,7 +26,6 @@ class _BaseServicesDiscovery(plugin_base.DiscoveryBase):
|
||||
|
||||
|
||||
class LBPoolsDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -37,7 +35,6 @@ class LBPoolsDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class LBVipsDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -47,7 +44,6 @@ class LBVipsDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class LBMembersDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -57,7 +53,6 @@ class LBMembersDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class LBHealthMonitorsDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -66,7 +61,6 @@ class LBHealthMonitorsDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class VPNServicesDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -76,7 +70,6 @@ class VPNServicesDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class IPSecConnectionsDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -85,7 +78,6 @@ class IPSecConnectionsDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class FirewallDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
@ -95,7 +87,6 @@ class FirewallDiscovery(_BaseServicesDiscovery):
|
||||
|
||||
|
||||
class FirewallPolicyDiscovery(_BaseServicesDiscovery):
|
||||
@plugin_base.check_keystone(cfg.CONF.service_types.neutron)
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor."""
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
import shutil
|
||||
|
||||
import eventlet
|
||||
from keystoneclient import exceptions as ks_exceptions
|
||||
import mock
|
||||
from novaclient import client as novaclient
|
||||
from oslo_service import service as os_service
|
||||
@ -166,12 +167,9 @@ class TestManager(base.BaseTestCase):
|
||||
|
||||
|
||||
class TestPollsterKeystone(agentbase.TestPollster):
|
||||
@plugin_base.check_keystone
|
||||
def get_samples(self, manager, cache, resources):
|
||||
func = super(TestPollsterKeystone, self).get_samples
|
||||
return func(manager=manager,
|
||||
cache=cache,
|
||||
resources=resources)
|
||||
# Just try to use keystone, that will raise an exception
|
||||
manager.keystone.tenants.list()
|
||||
|
||||
|
||||
class TestPollsterPollingException(agentbase.TestPollster):
|
||||
@ -283,7 +281,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||
"""Test for bug 1316532."""
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'keystoneclient.v2_0.client.Client',
|
||||
side_effect=Exception))
|
||||
side_effect=ks_exceptions.ClientException))
|
||||
self.pipeline_cfg = {
|
||||
'sources': [{
|
||||
'name': "test_keystone",
|
||||
|
@ -50,10 +50,9 @@ ENDPOINT = 'end://point'
|
||||
|
||||
class TestManager(manager.AgentManager):
|
||||
|
||||
@mock.patch('keystoneclient.v2_0.client', mock.MagicMock())
|
||||
def __init__(self):
|
||||
super(TestManager, self).__init__()
|
||||
self.keystone = mock.Mock()
|
||||
self._keystone = mock.Mock()
|
||||
|
||||
|
||||
class _BaseTestCase(base.BaseTestCase):
|
||||
@ -131,7 +130,7 @@ class TestEnergyPollsterCache(_BaseTestCase):
|
||||
cache = {
|
||||
'%s-%s' % (ENDPOINT, self.pollster_cls.CACHE_KEY_PROBE): [probe],
|
||||
}
|
||||
self.manager.keystone = mock.Mock()
|
||||
self.manager._keystone = mock.Mock()
|
||||
pollster = self.pollster_cls()
|
||||
with mock.patch.object(pollster, '_get_probes') as do_not_call:
|
||||
do_not_call.side_effect = AssertionError('should not be called')
|
||||
|
@ -117,8 +117,8 @@ class TestManager(manager.AgentManager):
|
||||
|
||||
def __init__(self):
|
||||
super(TestManager, self).__init__()
|
||||
self.keystone = mock.Mock()
|
||||
self.keystone.service_catalog.get_endpoints = mock.Mock(
|
||||
self._keystone = mock.Mock()
|
||||
self._keystone.service_catalog.get_endpoints = mock.Mock(
|
||||
return_value={'image': mock.ANY})
|
||||
|
||||
|
||||
|
@ -33,8 +33,8 @@ class TestFloatingIPPollster(base.BaseTestCase):
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
self.context = context.get_admin_context()
|
||||
self.manager = manager.AgentManager()
|
||||
self.manager.keystone = mock.Mock()
|
||||
self.manager.keystone.service_catalog.get_endpoints = mock.Mock(
|
||||
self.manager._keystone = mock.Mock()
|
||||
self.manager._keystone.service_catalog.get_endpoints = mock.Mock(
|
||||
return_value={'network': mock.ANY})
|
||||
self.pollster = floatingip.FloatingIPPollster()
|
||||
fake_ips = self.fake_get_ips()
|
||||
|
@ -50,8 +50,8 @@ class TestManager(manager.AgentManager):
|
||||
|
||||
def __init__(self):
|
||||
super(TestManager, self).__init__()
|
||||
self.keystone = mock.MagicMock()
|
||||
self.keystone.service_catalog.url_for.return_value = '/endpoint'
|
||||
self._keystone = mock.MagicMock()
|
||||
self._keystone.service_catalog.url_for.return_value = '/endpoint'
|
||||
|
||||
|
||||
class TestRgwPollster(testscenarios.testcase.WithScenarios,
|
||||
|
@ -67,7 +67,8 @@ class TestManager(manager.AgentManager):
|
||||
|
||||
def __init__(self):
|
||||
super(TestManager, self).__init__()
|
||||
self.keystone = mock.MagicMock()
|
||||
self._keystone = mock.MagicMock()
|
||||
self._keystone_last_exception = None
|
||||
|
||||
|
||||
class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
|
Loading…
Reference in New Issue
Block a user