Merge "Migrate the rest of the central agent pollsters to use discoveries"

This commit is contained in:
Jenkins 2014-09-09 15:06:24 +00:00 committed by Gerrit Code Review
commit 261c61b63e
23 changed files with 507 additions and 320 deletions

View File

@ -86,15 +86,22 @@ class PollingTask(object):
agent_resources = self.manager.discover()
with self.publish_context as publisher:
cache = {}
discovery_cache = {}
for pollster in self.pollsters:
key = pollster.name
LOG.info(_("Polling pollster %s"), key)
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
source_resources = list(self.resources[key].resources)
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=source_resources or agent_resources,
resources=(source_resources or
pollster_resources or
agent_resources)
))
publisher(samples)
except Exception as err:
@ -187,9 +194,12 @@ class AgentManager(os_service.Service):
return d.obj
return None
def discover(self, discovery=None):
def discover(self, discovery=None, discovery_cache=None):
resources = []
for url in (discovery or self.default_discovery):
if discovery_cache is not None and url in discovery_cache:
resources.extend(discovery_cache[url])
continue
name, param = self._parse_discoverer(url)
discoverer = self._discoverer(name)
if discoverer:
@ -199,6 +209,8 @@ class AgentManager(os_service.Service):
self._construct_group_id(discoverer.group_id),
discovered)
resources.extend(partitioned)
if discovery_cache is not None:
discovery_cache[url] = partitioned
except Exception as err:
LOG.exception(_('Unable to discover resources: %s') % err)
else:

View File

@ -0,0 +1,54 @@
#
# Copyright 2014 Red Hat, Inc
#
# Author: Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient.v2_0 import client as ksclient
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _LW
from ceilometer.openstack.common import log
from ceilometer import plugin
LOG = log.getLogger(__name__)
cfg.CONF.import_group('service_credentials', 'ceilometer.service')
class EndpointDiscovery(plugin.DiscoveryBase):
def __init__(self):
super(EndpointDiscovery, self).__init__()
self.keystone = ksclient.Client(
username=cfg.CONF.service_credentials.os_username,
password=cfg.CONF.service_credentials.os_password,
tenant_id=cfg.CONF.service_credentials.os_tenant_id,
tenant_name=cfg.CONF.service_credentials.os_tenant_name,
cacert=cfg.CONF.service_credentials.os_cacert,
auth_url=cfg.CONF.service_credentials.os_auth_url,
region_name=cfg.CONF.service_credentials.os_region_name,
insecure=cfg.CONF.service_credentials.insecure)
def discover(self, param=None):
if not param:
return []
endpoints = self.keystone.service_catalog.get_urls(
service_type=param,
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type,
region_name=cfg.CONF.service_credentials.os_region_name)
if not endpoints:
LOG.warning(_LW('No endpoints found for service %s'), param)
return []
else:
return endpoints

View File

@ -32,6 +32,11 @@ class ComputePollster(plugin.PollsterBase):
It supports the polling API on the compute node.
"""
@property
def default_discovery(self):
# get resources from agent-default discovery
return None
@abc.abstractmethod
def get_samples(self, manager, cache, resources):
"""Return a sequence of Counter instances from polling the resources.

View File

@ -17,7 +17,6 @@
import datetime
from keystoneclient import exceptions
from oslo.config import cfg
import requests
import six
@ -55,25 +54,27 @@ class KwapiClient(object):
class _Base(plugin.CentralPollster):
"""Base class for the Kwapi pollster, derived from CentralPollster."""
@property
def default_discovery(self):
return 'endpoint:energy'
@staticmethod
def get_kwapi_client(ksclient):
def get_kwapi_client(ksclient, endpoint):
"""Returns a KwapiClient configured with the proper url and token."""
endpoint = ksclient.service_catalog.url_for(
service_type='energy',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
return KwapiClient(endpoint, ksclient.auth_token)
CACHE_KEY_PROBE = 'kwapi.probes'
def _iter_probes(self, ksclient, cache):
def _iter_probes(self, ksclient, cache, endpoint):
"""Iterate over all probes."""
if self.CACHE_KEY_PROBE not in cache:
cache[self.CACHE_KEY_PROBE] = self._get_probes(ksclient)
return iter(cache[self.CACHE_KEY_PROBE])
key = '%s-%s' % (endpoint, self.CACHE_KEY_PROBE)
if key not in cache:
cache[key] = self._get_probes(ksclient, endpoint)
return iter(cache[key])
def _get_probes(self, ksclient):
def _get_probes(self, ksclient, endpoint):
try:
client = self.get_kwapi_client(ksclient)
client = self.get_kwapi_client(ksclient, endpoint)
except exceptions.EndpointNotFound:
LOG.debug(_("Kwapi endpoint not found"))
return []
@ -82,39 +83,39 @@ class _Base(plugin.CentralPollster):
class EnergyPollster(_Base):
"""Measures energy consumption."""
@plugin.check_keystone('energy')
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample(
name='energy',
type=sample.TYPE_CUMULATIVE,
unit='kWh',
volume=probe['kwh'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
for endpoint in resources:
for probe in self._iter_probes(manager.keystone, cache, endpoint):
yield sample.Sample(
name='energy',
type=sample.TYPE_CUMULATIVE,
unit='kWh',
volume=probe['kwh'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
class PowerPollster(_Base):
"""Measures power consumption."""
@plugin.check_keystone('energy')
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Returns all samples."""
for probe in self._iter_probes(manager.keystone, cache):
yield sample.Sample(
name='power',
type=sample.TYPE_GAUGE,
unit='W',
volume=probe['w'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)
for endpoint in resources:
for probe in self._iter_probes(manager.keystone, cache, endpoint):
yield sample.Sample(
name='power',
type=sample.TYPE_GAUGE,
unit='W',
volume=probe['w'],
user_id=None,
project_id=None,
resource_id=probe['id'],
timestamp=datetime.datetime.fromtimestamp(
probe['timestamp']).isoformat(),
resource_metadata={}
)

View File

@ -43,7 +43,11 @@ class HardwarePollster(plugin.CentralPollster):
super(HardwarePollster, self).__init__()
self.inspectors = {}
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'tripleo_overcloud_nodes'
def get_samples(self, manager, cache, resources):
"""Return an iterable of Sample instances from polling the resources.
:param manager: The service manager invoking the plugin

View File

@ -44,12 +44,12 @@ cfg.CONF.register_opts(OPTS)
class _Base(plugin.CentralPollster):
@staticmethod
def get_glance_client(ksclient):
endpoint = ksclient.service_catalog.url_for(
service_type='image',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
@property
def default_discovery(self):
return 'endpoint:image'
@staticmethod
def get_glance_client(ksclient, endpoint):
# hard-code v1 glance API version selection while v2 API matures
service_credentials = cfg.CONF.service_credentials
return glanceclient.Client('1', endpoint,
@ -57,8 +57,8 @@ class _Base(plugin.CentralPollster):
cacert=service_credentials.os_cacert,
insecure=service_credentials.insecure)
def _get_images(self, ksclient):
client = self.get_glance_client(ksclient)
def _get_images(self, ksclient, endpoint):
client = self.get_glance_client(ksclient, endpoint)
page_size = cfg.CONF.glance_page_size
kwargs = {}
if page_size > 0:
@ -88,11 +88,12 @@ class _Base(plugin.CentralPollster):
imageIdSet -= set([image.id])
yield image
def _iter_images(self, ksclient, cache):
def _iter_images(self, ksclient, cache, endpoint):
"""Iterate over all images."""
if 'images' not in cache:
cache['images'] = list(self._get_images(ksclient))
return iter(cache['images'])
key = '%s-images' % endpoint
if key not in cache:
cache[key] = list(self._get_images(ksclient, endpoint))
return iter(cache[key])
@staticmethod
def extract_image_metadata(image):
@ -117,34 +118,34 @@ class _Base(plugin.CentralPollster):
class ImagePollster(_Base):
@plugin.check_keystone('image')
def get_samples(self, manager, cache, resources=None):
for image in self._iter_images(manager.keystone, cache):
yield sample.Sample(
name='image',
type=sample.TYPE_GAUGE,
unit='image',
volume=1,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for image in self._iter_images(manager.keystone, cache, endpoint):
yield sample.Sample(
name='image',
type=sample.TYPE_GAUGE,
unit='image',
volume=1,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
class ImageSizePollster(_Base):
@plugin.check_keystone('image')
def get_samples(self, manager, cache, resources=None):
for image in self._iter_images(manager.keystone, cache):
yield sample.Sample(
name='image.size',
type=sample.TYPE_GAUGE,
unit='B',
volume=image.size,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for image in self._iter_images(manager.keystone, cache, endpoint):
yield sample.Sample(
name='image.size',
type=sample.TYPE_GAUGE,
unit='B',
volume=image.size,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)

View File

@ -30,33 +30,41 @@ LOG = log.getLogger(__name__)
class FloatingIPPollster(plugin.CentralPollster):
def _get_floating_ips(self):
nv = nova_client.Client()
def _get_floating_ips(self, ksclient, endpoint):
nv = nova_client.Client(
auth_token=ksclient.auth_token, bypass_url=endpoint)
return nv.floating_ip_get_all()
def _iter_floating_ips(self, cache):
if 'floating_ips' not in cache:
cache['floating_ips'] = list(self._get_floating_ips())
return iter(cache['floating_ips'])
def _iter_floating_ips(self, ksclient, cache, endpoint):
key = '%s-floating_ips' % endpoint
if key not in cache:
cache[key] = list(self._get_floating_ips(ksclient, endpoint))
return iter(cache[key])
@plugin.check_keystone('network')
def get_samples(self, manager, cache, resources=None):
for ip in self._iter_floating_ips(cache):
LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
# FIXME (flwang) Now Nova API /os-floating-ips can't provide those
# attributes were used by Ceilometer, such as project id, host.
# In this fix, those attributes usage will be removed temporarily.
# And they will be back after fix the Nova bug 1174802.
yield sample.Sample(
name='ip.floating',
type=sample.TYPE_GAUGE,
unit='ip',
volume=1,
user_id=None,
project_id=None,
resource_id=ip.id,
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={
'address': ip.ip,
'pool': ip.pool
})
@property
def default_discovery(self):
return 'endpoint:compute'
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for ip in self._iter_floating_ips(manager.keystone, cache,
endpoint):
LOG.info(_("FLOATING IP USAGE: %s") % ip.ip)
# FIXME (flwang) Now Nova API /os-floating-ips can't provide
# those attributes were used by Ceilometer, such as project
# id, host. In this fix, those attributes usage will be
# removed temporarily. And they will be back after fix the
# Nova bug 1174802.
yield sample.Sample(
name='ip.floating',
type=sample.TYPE_GAUGE,
unit='ip',
volume=1,
user_id=None,
project_id=None,
resource_id=ip.id,
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={
'address': ip.ip,
'pool': ip.pool
})

View File

@ -34,7 +34,11 @@ class FirewallPollster(base.BaseServicesPollster):
'firewall_policy_id',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'fw_services'
def get_samples(self, manager, cache, resources):
resources = resources or []
for fw in resources:
@ -71,7 +75,11 @@ class FirewallPolicyPollster(base.BaseServicesPollster):
'audited',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'fw_policy'
def get_samples(self, manager, cache, resources):
resources = resources or []
for fw in resources:

View File

@ -21,7 +21,6 @@ import collections
from oslo.utils import timeutils
import six
from ceilometer.central import plugin
from ceilometer.network.services import base
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
@ -50,7 +49,11 @@ class LBPoolPollster(base.BaseServicesPollster):
'vip_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_pools'
def get_samples(self, manager, cache, resources):
resources = resources or []
for pool in resources:
@ -94,7 +97,11 @@ class LBVipPollster(base.BaseServicesPollster):
'session_persistence',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_vips'
def get_samples(self, manager, cache, resources):
resources = resources or []
for vip in resources:
@ -132,7 +139,11 @@ class LBMemberPollster(base.BaseServicesPollster):
'weight',
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_members'
def get_samples(self, manager, cache, resources):
resources = resources or []
for member in resources:
@ -167,7 +178,11 @@ class LBHealthMonitorPollster(base.BaseServicesPollster):
'type'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'lb_health_probes'
def get_samples(self, manager, cache, resources):
for probe in resources:
LOG.debug("Load Balancer Health probe : %s" % probe)
yield sample.Sample(
@ -226,13 +241,16 @@ class _LBStatsPollster(base.BaseServicesPollster):
)
return i_cache[pool_id]
@property
def default_discovery(self):
return 'lb_pools'
@abc.abstractmethod
def _get_sample(pool, c_data):
"""Return one Sample."""
@plugin.check_keystone('network', 'nc')
def get_samples(self, manager, cache, resources=None):
for pool in self._get_lb_pools():
def get_samples(self, manager, cache, resources):
for pool in resources:
try:
c_data = self._populate_stats_cache(pool['id'], cache)
yield self._get_sample(pool, c_data)

View File

@ -35,7 +35,11 @@ class VPNServicesPollster(base.BaseServicesPollster):
'router_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'vpn_services'
def get_samples(self, manager, cache, resources):
resources = resources or []
for vpn in resources:
@ -80,7 +84,11 @@ class IPSecConnectionsPollster(base.BaseServicesPollster):
'tenant_id'
]
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return 'ipsec_connections'
def get_samples(self, manager, cache, resources):
resources = resources or []
for conn in resources:

View File

@ -30,6 +30,13 @@ class _Base(plugin.CentralPollster):
NAMESPACE = 'network.statistics.drivers'
drivers = {}
@property
def default_discovery(self):
# this signifies that the pollster gets its resources from
# elsewhere, in this case they're manually listed in the
# pipeline configuration
return None
@abc.abstractproperty
def meter_name(self):
"""Return a Meter Name."""
@ -63,7 +70,7 @@ class _Base(plugin.CentralPollster):
scheme).driver()
return _Base.drivers[scheme]
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
resources = resources or []
for resource in resources:
parse_url, params = self._parse_my_resource(resource)

View File

@ -48,7 +48,7 @@ def logged(func):
class Client(object):
"""A client which gets information via python-novaclient."""
def __init__(self):
def __init__(self, bypass_url=None, auth_token=None):
"""Initialize a nova client object."""
conf = cfg.CONF.service_credentials
tenant = conf.os_tenant_id or conf.os_tenant_name
@ -57,8 +57,10 @@ class Client(object):
api_key=conf.os_password,
project_id=tenant,
auth_url=conf.os_auth_url,
auth_token=auth_token,
region_name=conf.os_region_name,
endpoint_type=conf.os_endpoint_type,
bypass_url=bypass_url,
cacert=conf.os_cacert,
insecure=conf.insecure,
http_log_debug=cfg.CONF.nova_http_log_debug,

View File

@ -19,14 +19,12 @@
from __future__ import absolute_import
from keystoneclient import exceptions
from oslo.config import cfg
from oslo.utils import timeutils
import six.moves.urllib.parse as urlparse
from swiftclient import client as swift
from ceilometer.central import plugin
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer import sample
@ -48,28 +46,27 @@ class _Base(plugin.CentralPollster):
CACHE_KEY_TENANT = 'tenants'
METHOD = 'head'
@property
def default_discovery(self):
return 'endpoint:object-store'
@property
def CACHE_KEY_METHOD(self):
return 'swift.%s_account' % self.METHOD
def _iter_accounts(self, ksclient, cache):
if self.CACHE_KEY_TENANT not in cache:
cache[self.CACHE_KEY_TENANT] = ksclient.tenants.list()
if self.CACHE_KEY_METHOD not in cache:
cache[self.CACHE_KEY_METHOD] = list(self._get_account_info(
ksclient, cache))
return iter(cache[self.CACHE_KEY_METHOD])
def _iter_accounts(self, ksclient, cache, endpoint):
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
key_method = '%s-%s' % (endpoint, self.CACHE_KEY_METHOD)
if key_tenant not in cache:
cache[key_tenant] = ksclient.tenants.list()
if key_method not in cache:
cache[key_method] = list(self._get_account_info(
ksclient, cache, endpoint))
return iter(cache[key_method])
def _get_account_info(self, ksclient, cache):
try:
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
raise StopIteration()
for t in cache[self.CACHE_KEY_TENANT]:
def _get_account_info(self, ksclient, cache, endpoint):
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
for t in cache[key_tenant]:
api_method = '%s_account' % self.METHOD
yield (t.id, getattr(swift, api_method)
(self._neaten_url(endpoint, t.id),
@ -84,56 +81,59 @@ class _Base(plugin.CentralPollster):
class ObjectsPollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-object-count']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-object-count']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsSizePollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects.size',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-bytes-used']),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects.size',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-bytes-used']),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsContainersPollster(_Base):
"""Iterate over all accounts, using keystone."""
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for tenant, account in self._iter_accounts(manager.keystone, cache):
yield sample.Sample(
name='storage.objects.containers',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-container-count']),
unit='container',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for tenant, account in self._iter_accounts(manager.keystone,
cache, endpoint):
yield sample.Sample(
name='storage.objects.containers',
type=sample.TYPE_GAUGE,
volume=int(account['x-account-container-count']),
unit='container',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ContainersObjectsPollster(_Base):
@ -141,22 +141,23 @@ class ContainersObjectsPollster(_Base):
METHOD = 'get'
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects',
type=sample.TYPE_GAUGE,
volume=int(container['count']),
unit='object',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for project, account in self._iter_accounts(manager.keystone,
cache, endpoint):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects',
type=sample.TYPE_GAUGE,
volume=int(container['count']),
unit='object',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ContainersSizePollster(_Base):
@ -164,19 +165,20 @@ class ContainersSizePollster(_Base):
METHOD = 'get'
@plugin.check_keystone('object-store')
def get_samples(self, manager, cache, resources=None):
for project, account in self._iter_accounts(manager.keystone, cache):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects.size',
type=sample.TYPE_GAUGE,
volume=int(container['bytes']),
unit='B',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for project, account in self._iter_accounts(manager.keystone,
cache, endpoint):
containers_info = account[1]
for container in containers_info:
yield sample.Sample(
name='storage.containers.objects.size',
type=sample.TYPE_GAUGE,
volume=int(container['bytes']),
unit='B',
user_id=None,
project_id=project,
resource_id=project + '/' + container['name'],
timestamp=timeutils.isotime(),
resource_metadata=None,
)

View File

@ -128,8 +128,23 @@ class NotificationBase(PluginBase):
class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API."""
@abc.abstractproperty
def default_discovery(self):
"""Default discovery to use for this pollster.
There are three ways a pollster can get a list of resources to poll,
listed here in ascending order of precedence:
1. from the per-agent discovery,
2. from the per-pollster discovery (defined here)
3. from the per-pipeline configured discovery and/or per-pipeline
configured static resources.
If a pollster should only get resources from #1 or #3, this property
should be set to None.
"""
@abc.abstractmethod
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
"""Return a sequence of Counter instances from polling the resources.
:param manager: The service manager class invoking the plugin.
@ -137,9 +152,10 @@ class PollsterBase(PluginBase):
between themselves when recomputing it would be
expensive (e.g., asking another service for a
list of objects).
:param resources: A list of the endpoints the pollster will get data
:param resources: A list of resources the pollster will get data
from. It's up to the specific pollster to decide
how to use it.
how to use it. It is usually supplied by a discovery,
see ``default_discovery`` for more information.
"""

View File

@ -71,8 +71,13 @@ default_test_data = TestSample(
class TestPollster(plugin.PollsterBase):
test_data = default_test_data
discovery = None
def get_samples(self, manager, cache, resources=None):
@property
def default_discovery(self):
return self.discovery
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
@ -82,7 +87,7 @@ class TestPollster(plugin.PollsterBase):
class TestPollsterException(TestPollster):
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
@ -257,9 +262,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def tearDown(self):
self.Pollster.samples = []
self.Pollster.discovery = []
self.PollsterAnother.samples = []
self.PollsterAnother.discovery = []
self.PollsterException.samples = []
self.PollsterException.discovery = []
self.PollsterExceptionAnother.samples = []
self.PollsterExceptionAnother.discovery = []
self.Pollster.resources = []
self.PollsterAnother.resources = []
self.PollsterException.resources = []
@ -442,6 +451,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_agent_discovery_overridden_by_per_pollster_discovery(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.mgr.discovery_manager = self.create_discovery_manager()
self.Pollster.discovery = 'testdiscovery'
self.mgr.default_discovery = ['testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg[0]['resources'] = []
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
def test_per_agent_discovery_overridden_by_per_pipeline_discovery(self):
discovered_resources = ['discovered_1', 'discovered_2']
self.mgr.discovery_manager = self.create_discovery_manager()
@ -458,6 +484,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(set(self.DiscoveryAnother.resources),
set(self.Pollster.resources))
def _do_test_per_pollster_discovery(self, discovered_resources,
static_resources):
self.Pollster.discovery = 'testdiscovery'
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
# just so we can test that static + pre_pipeline amalgamated
# override per_pollster
self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg[0]['resources'] = static_resources
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
set(self.Pollster.resources))
else:
self.assertEqual(set(self.Discovery.resources),
set(self.Pollster.resources))
def test_per_pollster_discovery(self):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_pollster_discovery_overridden_by_per_pipeline_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_per_pollster_discovery_caching(self):
# ensure single discovery associated with multiple pollsters
# only called once per polling cycle
discovered_resources = ['discovered_1', 'discovered_2']
self.Pollster.discovery = 'testdiscovery'
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = discovered_resources
self.pipeline_cfg[0]['counters'].append('testanother')
self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
def _do_test_per_pipeline_discovery(self,
discovered_resources,
static_resources):

View File

@ -38,7 +38,7 @@ class TestManager(base.BaseTestCase):
class TestPollsterKeystone(agentbase.TestPollster):
@plugin.check_keystone
def get_samples(self, manager, cache, resources=None):
def get_samples(self, manager, cache, resources):
func = super(TestPollsterKeystone, self).get_samples
return func(manager=manager,
cache=cache,

View File

@ -47,9 +47,12 @@ PROBE_DICT = {
}
}
ENDPOINT = 'end://point'
class TestManager(manager.AgentManager):
@mock.patch('keystoneclient.v2_0.client', mock.MagicMock())
def __init__(self):
super(TestManager, self).__init__()
self.keystone = mock.Mock()
@ -64,14 +67,15 @@ class TestKwapi(base.BaseTestCase):
self.manager = TestManager()
@staticmethod
def fake_get_kwapi_client(ksclient):
def fake_get_kwapi_client(ksclient, endpoint):
raise exceptions.EndpointNotFound("fake keystone exception")
def test_endpoint_not_exist(self):
with mockpatch.PatchObject(kwapi._Base, 'get_kwapi_client',
side_effect=self.fake_get_kwapi_client):
pollster = kwapi.EnergyPollster()
samples = list(pollster.get_samples(self.manager, {}))
samples = list(pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(0, len(samples))
@ -87,7 +91,7 @@ class TestEnergyPollster(base.BaseTestCase):
kwapi._Base, '_iter_probes', side_effect=self.fake_iter_probes))
@staticmethod
def fake_iter_probes(ksclient, cache):
def fake_iter_probes(ksclient, cache, endpoint):
probes = PROBE_DICT['probes']
for key, value in six.iteritems(probes):
probe_dict = value
@ -99,6 +103,7 @@ class TestEnergyPollster(base.BaseTestCase):
samples = list(kwapi.EnergyPollster().get_samples(
self.manager,
cache,
[ENDPOINT]
))
self.assertEqual(3, len(samples))
samples_by_name = dict((s.resource_id, s) for s in samples)
@ -126,13 +131,15 @@ class TestEnergyPollsterCache(base.BaseTestCase):
probe = {'id': 'A'}
probe.update(PROBE_DICT['probes']['A'])
cache = {
kwapi.EnergyPollster.CACHE_KEY_PROBE: [probe],
'%s-%s' % (ENDPOINT, kwapi.EnergyPollster.CACHE_KEY_PROBE):
[probe],
}
self.manager.keystone = mock.Mock()
pollster = kwapi.EnergyPollster()
with mock.patch.object(pollster, '_get_probes') as do_not_call:
do_not_call.side_effect = AssertionError('should not be called')
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
[ENDPOINT]))
self.assertEqual(1, len(samples))
@ -147,7 +154,7 @@ class TestPowerPollster(base.BaseTestCase):
kwapi._Base, '_iter_probes', side_effect=self.fake_iter_probes))
@staticmethod
def fake_iter_probes(ksclient, cache):
def fake_iter_probes(ksclient, cache, endpoint):
probes = PROBE_DICT['probes']
for key, value in six.iteritems(probes):
probe_dict = value
@ -159,6 +166,7 @@ class TestPowerPollster(base.BaseTestCase):
samples = list(kwapi.PowerPollster().get_samples(
self.manager,
cache,
[ENDPOINT]
))
self.assertEqual(3, len(samples))
samples_by_name = dict((s.resource_id, s) for s in samples)
@ -183,11 +191,12 @@ class TestPowerPollsterCache(base.BaseTestCase):
probe = {'id': 'A'}
probe.update(PROBE_DICT['probes']['A'])
cache = {
kwapi.PowerPollster.CACHE_KEY_PROBE: [probe],
'%s-%s' % (ENDPOINT, kwapi.PowerPollster.CACHE_KEY_PROBE): [probe],
}
self.manager.keystone = mock.Mock()
pollster = kwapi.PowerPollster()
with mock.patch.object(pollster, '_get_probes') as do_not_call:
do_not_call.side_effect = AssertionError('should not be called')
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
[ENDPOINT]))
self.assertEqual(1, len(samples))

View File

@ -104,6 +104,8 @@ IMAGE_LIST = [
u'size': 2048}),
]
ENDPOINT = 'end://point'
class _BaseObject(object):
pass
@ -125,7 +127,7 @@ class TestManager(manager.AgentManager):
class TestImagePollsterPageSize(base.BaseTestCase):
def fake_get_glance_client(self, ksclient):
def fake_get_glance_client(self, ksclient, endpoint):
glanceclient = FakeGlanceClient()
glanceclient.images.list = mock.MagicMock(return_value=IMAGE_LIST)
return glanceclient
@ -143,7 +145,7 @@ class TestImagePollsterPageSize(base.BaseTestCase):
def _do_test_iter_images(self, page_size=0):
self.CONF.set_override("glance_page_size", page_size)
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, {}))
_iter_images(self.manager.keystone, {}, ENDPOINT))
kwargs = {}
if page_size > 0:
kwargs['page_size'] = page_size
@ -163,7 +165,7 @@ class TestImagePollsterPageSize(base.BaseTestCase):
class TestImagePollster(base.BaseTestCase):
def fake_get_glance_client(self, ksclient):
def fake_get_glance_client(self, ksclient, endpoint):
glanceclient = _BaseObject()
setattr(glanceclient, "images", _BaseObject())
setattr(glanceclient.images,
@ -183,26 +185,29 @@ class TestImagePollster(base.BaseTestCase):
# Tests whether the iter_images method returns a unique image
# list when there is nothing in the cache
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, {}))
_iter_images(self.manager.keystone, {}, ENDPOINT))
self.assertEqual(len(set(image.id for image in images)), len(images))
def test_iter_images_cached(self):
# Tests whether the iter_images method returns the values from
# the cache
cache = {'images': []}
cache = {'%s-images' % ENDPOINT: []}
images = list(glance.ImagePollster().
_iter_images(self.manager.keystone, cache))
_iter_images(self.manager.keystone, cache,
ENDPOINT))
self.assertEqual([], images)
def test_image(self):
samples = list(glance.ImagePollster().get_samples(self.manager, {}))
samples = list(glance.ImagePollster().get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(3, len(samples))
for sample in samples:
self.assertEqual(1, sample.volume)
def test_image_size(self):
samples = list(glance.ImageSizePollster().get_samples(self.manager,
{}))
{},
[ENDPOINT]))
self.assertEqual(3, len(samples))
for image in IMAGE_LIST:
self.assertTrue(
@ -210,10 +215,12 @@ class TestImagePollster(base.BaseTestCase):
samples)))
def test_image_get_sample_names(self):
samples = list(glance.ImagePollster().get_samples(self.manager, {}))
samples = list(glance.ImagePollster().get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(set(['image']), set([s.name for s in samples]))
def test_image_size_get_sample_names(self):
samples = list(glance.ImageSizePollster().get_samples(self.manager,
{}))
{},
[ENDPOINT]))
self.assertEqual(set(['image.size']), set([s.name for s in samples]))

View File

@ -469,7 +469,8 @@ class TestLBStatsPollster(_BaseTestLBPollster):
pollster = factory()
cache = {}
samples = list(pollster.get_samples(self.manager, cache))
samples = list(pollster.get_samples(self.manager, cache,
self.fake_get_pools()))
self.assertEqual(1, len(samples))
self.assertIsNotNone(samples)
self.assertIn('lbstats', cache)

View File

@ -70,7 +70,7 @@ class TestFloatingIPPollster(base.BaseTestCase):
# assert False, 'Should have seen an error'
def test_get_samples_not_empty(self):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {}, ['e']))
self.assertEqual(3, len(samples))
# It's necessary to verify all the attributes extracted by Nova
# API /os-floating-ips to make sure they're available and correct.
@ -87,10 +87,10 @@ class TestFloatingIPPollster(base.BaseTestCase):
self.assertEqual("public", samples[2].resource_metadata["pool"])
def test_get_meter_names(self):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {}, ['e']))
self.assertEqual(set(['ip.floating']), set([s.name for s in samples]))
def test_get_samples_cached(self):
cache = {'floating_ips': self.fake_get_ips()[:2]}
samples = list(self.pollster.get_samples(self.manager, cache))
cache = {'e-floating_ips': self.fake_get_ips()[:2]}
samples = list(self.pollster.get_samples(self.manager, cache, ['e']))
self.assertEqual(2, len(samples))

View File

@ -53,6 +53,8 @@ GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
'x-account-container-count': 0,
}, [])), ]
ENDPOINT = 'end://point'
class TestManager(manager.AgentManager):
@ -83,7 +85,7 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
def fake_ks_service_catalog_url_for(*args, **kwargs):
raise exceptions.EndpointNotFound("Fake keystone exception")
def fake_iter_accounts(self, ksclient, cache):
def fake_iter_accounts(self, ksclient, cache, endpoint):
for i in self.ACCOUNTS:
yield i
@ -102,10 +104,13 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
cache = {}
with mockpatch.PatchObject(self.factory, '_get_account_info',
return_value=[]):
data = list(self.pollster._iter_accounts(mock.Mock(), cache))
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ENDPOINT))
self.assertTrue(self.pollster.CACHE_KEY_TENANT in cache)
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
in cache)
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_METHOD)
in cache)
self.assertEqual([], data)
def test_iter_accounts_tenants_cached(self):
@ -119,15 +124,17 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
api_method = '%s_account' % self.pollster.METHOD
with mockpatch.PatchObject(swift_client, api_method, new=ksclient):
key = '%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
with mockpatch.PatchObject(self.factory, '_neaten_url'):
Tenant = collections.namedtuple('Tenant', 'id')
cache = {
self.pollster.CACHE_KEY_TENANT: [
key: [
Tenant(self.ACCOUNTS[0][0])
],
}
data = list(self.pollster._iter_accounts(mock.Mock(), cache))
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ENDPOINT))
self.assertTrue(key in cache)
self.assertEqual(self.ACCOUNTS[0][0], data[0][0])
def test_neaten_url(self):
@ -150,14 +157,16 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
def test_metering(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(2, len(samples))
def test_get_meter_names(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(set([samples[0].name]),
set([s.name for s in samples]))
@ -166,6 +175,7 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
with mockpatch.PatchObject(
self.manager.keystone.service_catalog, 'url_for',
side_effect=self.fake_ks_service_catalog_url_for):
samples = list(self.pollster.get_samples(self.manager, {}))
samples = list(self.pollster.get_samples(self.manager, {},
[ENDPOINT]))
self.assertEqual(0, len(samples))

View File

@ -30,70 +30,6 @@ sources:
- "network.outgoing.packets"
sinks:
- network_sink
- name: lb_pool_source
interval: 600
meters:
- "network.services.lb.pool"
discovery:
- "lb_pools"
sinks:
- meter_sink
- name: lb_health_monitor_source
interval: 600
meters:
- "network.services.lb.health_monitor"
discovery:
- "lb_health_probes"
sinks:
- meter_sink
- name: lb_vip_source
interval: 600
meters:
- "network.services.lb.vip"
discovery:
- "lb_vips"
sinks:
- meter_sink
- name: lb_member_source
interval: 600
meters:
- "network.services.lb.member"
discovery:
- "lb_members"
sinks:
- meter_sink
- name: vpn_services_source
interval: 600
meters:
- "network.services.vpn"
discovery:
- "vpn_services"
sinks:
- "meter_sink"
- name: vpn_conns_source
interval: 600
meters:
- "network.services.vpn.connections"
discovery:
- "ipsec_connections"
sinks:
- "meter_sink"
- name: firewall_source
interval: 600
meters:
- "network.services.firewall"
discovery:
- "fw_services"
sinks:
- "meter_sink"
- name: fw_policy_source
interval: 600
meters:
- "network.services.firewall.policy"
discovery:
- "fw_policy"
sinks:
- "meter_sink"
sinks:
- name: meter_sink
transformers:

View File

@ -80,6 +80,7 @@ ceilometer.notification =
ceilometer.discover =
local_instances = ceilometer.compute.discovery:InstanceDiscovery
endpoint = ceilometer.central.discovery:EndpointDiscovery
lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery
lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery
lb_members = ceilometer.network.services.discovery:LBMembersDiscovery