Partition swift pollster resources by tenant
Since the latest discovery change, Swift pollsters on different agents only decide which one of them should poll. Most of the time there's only one endpoint, so only one agent gets to do any work. This patch fixes this by introducing a new TenantDiscovery, which enables Swift (and other, in the future) pollsters to partition the set of keystone tenants among themselves, with each then polling the samples for its assigned subset of tenants. Closes-Bug: #1365351 Change-Id: Iba4a3b91d5ee978213fdd6fcf8bb62b315324a52
This commit is contained in:
parent
b0116dbb22
commit
87afc5ada1
@ -27,6 +27,13 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service')
|
||||
|
||||
|
||||
class EndpointDiscovery(plugin.DiscoveryBase):
|
||||
"""Discovery that supplies service endpoints.
|
||||
|
||||
This discovery should be used when the relevant APIs are not well suited
|
||||
to dividing the pollster's work into smaller pieces than a whole service
|
||||
at once. Example of this is the floating_ip pollster which calls
|
||||
nova.floating_ips.list() and therefore gets all floating IPs at once.
|
||||
"""
|
||||
|
||||
def discover(self, manager, param=None):
|
||||
if not param:
|
||||
@ -40,3 +47,16 @@ class EndpointDiscovery(plugin.DiscoveryBase):
|
||||
return []
|
||||
else:
|
||||
return endpoints
|
||||
|
||||
|
||||
class TenantDiscovery(plugin.DiscoveryBase):
    """Discovery that supplies keystone tenants.

    This discovery should be used when the pollster's work can't be divided
    into smaller pieces than per-tenant. Example of this is the Swift
    pollster, which polls account details and does so per-tenant.
    """

    def discover(self, manager, param=None):
        # Normalize a falsy result to an empty list so callers always
        # receive a list of tenants, never None.
        return manager.keystone.tenants.list() or []
|
||||
|
@ -19,12 +19,14 @@
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from keystoneclient import exceptions
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import timeutils
|
||||
import six.moves.urllib.parse as urlparse
|
||||
from swiftclient import client as swift
|
||||
|
||||
from ceilometer.central import plugin
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer import sample
|
||||
|
||||
@ -43,30 +45,44 @@ cfg.CONF.register_opts(OPTS)
|
||||
|
||||
class _Base(plugin.CentralPollster):
|
||||
|
||||
CACHE_KEY_TENANT = 'tenants'
|
||||
METHOD = 'head'
|
||||
_ENDPOINT = None
|
||||
|
||||
@property
|
||||
def default_discovery(self):
|
||||
return 'endpoint:object-store'
|
||||
return 'tenant'
|
||||
|
||||
@property
|
||||
def CACHE_KEY_METHOD(self):
|
||||
return 'swift.%s_account' % self.METHOD
|
||||
|
||||
def _iter_accounts(self, ksclient, cache, endpoint):
|
||||
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
|
||||
key_method = '%s-%s' % (endpoint, self.CACHE_KEY_METHOD)
|
||||
if key_tenant not in cache:
|
||||
cache[key_tenant] = ksclient.tenants.list()
|
||||
if key_method not in cache:
|
||||
cache[key_method] = list(self._get_account_info(
|
||||
ksclient, cache, endpoint))
|
||||
return iter(cache[key_method])
|
||||
@staticmethod
def _get_endpoint(ksclient):
    """Return the Swift (object-store) endpoint URL, or None.

    The resolved endpoint is cached as a class attribute on _Base, so
    keystone's service catalog is consulted at most once per process.
    Returns None when no object-store endpoint is registered.
    """
    # we store the endpoint as a base class attribute, so keystone is
    # only ever called once
    if _Base._ENDPOINT is None:
        try:
            endpoint_type = cfg.CONF.service_credentials.os_endpoint_type
            endpoint = ksclient.service_catalog.url_for(
                service_type='object-store',
                endpoint_type=endpoint_type)
            _Base._ENDPOINT = endpoint
        except exceptions.EndpointNotFound:
            # No Swift deployed: leave _ENDPOINT as None so callers
            # can short-circuit and yield no samples.
            LOG.debug(_("Swift endpoint not found"))
    return _Base._ENDPOINT
|
||||
|
||||
def _get_account_info(self, ksclient, cache, endpoint):
|
||||
key_tenant = '%s-%s' % (endpoint, self.CACHE_KEY_TENANT)
|
||||
for t in cache[key_tenant]:
|
||||
def _iter_accounts(self, ksclient, cache, tenants):
    """Yield (tenant_id, account_info) pairs for the assigned tenants.

    Results are memoized in *cache* under CACHE_KEY_METHOD so that
    several pollsters sharing one per-cycle cache query Swift only once.
    """
    if self.CACHE_KEY_METHOD not in cache:
        cache[self.CACHE_KEY_METHOD] = list(self._get_account_info(
            ksclient, tenants))
    return iter(cache[self.CACHE_KEY_METHOD])
|
||||
|
||||
def _get_account_info(self, ksclient, tenants):
|
||||
endpoint = self._get_endpoint(ksclient)
|
||||
if not endpoint:
|
||||
raise StopIteration()
|
||||
|
||||
for t in tenants:
|
||||
api_method = '%s_account' % self.METHOD
|
||||
yield (t.id, getattr(swift, api_method)
|
||||
(self._neaten_url(endpoint, t.id),
|
||||
@ -82,58 +98,58 @@ class _Base(plugin.CentralPollster):
|
||||
class ObjectsPollster(_Base):
    """Iterate over all accounts, using keystone."""

    def get_samples(self, manager, cache, resources):
        # Defect fixed: the span contained both the superseded
        # endpoint-based loop and the tenant-based loop from the diff;
        # only the post-change, tenant-partitioned body is kept.
        # *resources* is the list of keystone tenants assigned to this
        # agent by TenantDiscovery.
        tenants = resources
        for tenant, account in self._iter_accounts(manager.keystone,
                                                   cache, tenants):
            yield sample.Sample(
                name='storage.objects',
                type=sample.TYPE_GAUGE,
                volume=int(account['x-account-object-count']),
                unit='object',
                user_id=None,
                project_id=tenant,
                resource_id=tenant,
                timestamp=timeutils.isotime(),
                resource_metadata=None,
            )
|
||||
|
||||
|
||||
class ObjectsSizePollster(_Base):
    """Iterate over all accounts, using keystone."""

    def get_samples(self, manager, cache, resources):
        # Defect fixed: removed the superseded endpoint-based loop body
        # that the garbled diff left alongside the tenant-based one.
        # *resources* is the list of keystone tenants assigned to this
        # agent by TenantDiscovery.
        tenants = resources
        for tenant, account in self._iter_accounts(manager.keystone,
                                                   cache, tenants):
            yield sample.Sample(
                name='storage.objects.size',
                type=sample.TYPE_GAUGE,
                volume=int(account['x-account-bytes-used']),
                unit='B',
                user_id=None,
                project_id=tenant,
                resource_id=tenant,
                timestamp=timeutils.isotime(),
                resource_metadata=None,
            )
|
||||
|
||||
|
||||
class ObjectsContainersPollster(_Base):
    """Iterate over all accounts, using keystone."""

    def get_samples(self, manager, cache, resources):
        # Defect fixed: removed the superseded endpoint-based loop body
        # that the garbled diff left alongside the tenant-based one.
        # *resources* is the list of keystone tenants assigned to this
        # agent by TenantDiscovery.
        tenants = resources
        for tenant, account in self._iter_accounts(manager.keystone,
                                                   cache, tenants):
            yield sample.Sample(
                name='storage.objects.containers',
                type=sample.TYPE_GAUGE,
                volume=int(account['x-account-container-count']),
                unit='container',
                user_id=None,
                project_id=tenant,
                resource_id=tenant,
                timestamp=timeutils.isotime(),
                resource_metadata=None,
            )
|
||||
|
||||
|
||||
class ContainersObjectsPollster(_Base):
    # NOTE(review): the original class docstring fell outside the diff
    # hunk (`@ -142,22 +158,22`) -- confirm wording against the repo.
    """Per-container object counts, fetched with GET on each account."""

    # GET (rather than HEAD) so the account listing includes container
    # details in account[1].
    METHOD = 'get'

    def get_samples(self, manager, cache, resources):
        # Defect fixed: removed the superseded endpoint-based loop body
        # that the garbled diff left alongside the tenant-based one.
        # *resources* is the list of keystone tenants assigned to this
        # agent by TenantDiscovery.
        tenants = resources
        for tenant, account in self._iter_accounts(manager.keystone,
                                                   cache, tenants):
            containers_info = account[1]
            for container in containers_info:
                yield sample.Sample(
                    name='storage.containers.objects',
                    type=sample.TYPE_GAUGE,
                    volume=int(container['count']),
                    unit='object',
                    user_id=None,
                    project_id=tenant,
                    resource_id=tenant + '/' + container['name'],
                    timestamp=timeutils.isotime(),
                    resource_metadata=None,
                )
|
||||
|
||||
|
||||
class ContainersSizePollster(_Base):
    # NOTE(review): the original class docstring fell outside the diff
    # hunk (`@ -166,19 +182,19`) -- confirm wording against the repo.
    """Per-container byte usage, fetched with GET on each account."""

    # GET (rather than HEAD) so the account listing includes container
    # details in account[1].
    METHOD = 'get'

    def get_samples(self, manager, cache, resources):
        # Defect fixed: removed the superseded endpoint-based loop body
        # that the garbled diff left alongside the tenant-based one.
        # *resources* is the list of keystone tenants assigned to this
        # agent by TenantDiscovery.
        tenants = resources
        for tenant, account in self._iter_accounts(manager.keystone,
                                                   cache, tenants):
            containers_info = account[1]
            for container in containers_info:
                yield sample.Sample(
                    name='storage.containers.objects.size',
                    type=sample.TYPE_GAUGE,
                    volume=int(container['bytes']),
                    unit='B',
                    user_id=None,
                    project_id=tenant,
                    resource_id=tenant + '/' + container['name'],
                    timestamp=timeutils.isotime(),
                    resource_metadata=None,
                )
|
||||
|
@ -166,6 +166,16 @@ class DiscoveryBase(object):
|
||||
def discover(self, manager, param=None):
|
||||
"""Discover resources to monitor.
|
||||
|
||||
The most fine-grained discovery should be preferred, so the work is
|
||||
the most evenly distributed among multiple agents (if they exist).
|
||||
|
||||
For example:
|
||||
if the pollster can separately poll individual resources, it should
|
||||
have its own discovery implementation to discover those resources. If
|
||||
it can only poll per-tenant, then the `TenantDiscovery` should be
|
||||
used. If even that is not possible, use `EndpointDiscovery` (see
|
||||
their respective docstrings).
|
||||
|
||||
:param manager: The service manager class invoking the plugin.
|
||||
:param param: an optional parameter to guide the discovery
|
||||
"""
|
||||
|
@ -35,9 +35,13 @@ HEAD_ACCOUNTS = [('tenant-000', {'x-account-object-count': 12,
|
||||
('tenant-001', {'x-account-object-count': 34,
|
||||
'x-account-bytes-used': 9898989898,
|
||||
'x-account-container-count': 17,
|
||||
})]
|
||||
}),
|
||||
('tenant-002-ignored', {'x-account-object-count': 34,
|
||||
'x-account-bytes-used': 9898989898,
|
||||
'x-account-container-count': 17,
|
||||
})]
|
||||
|
||||
GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
|
||||
GET_ACCOUNTS = [('tenant-000', ({'x-account-object-count': 10,
|
||||
'x-account-bytes-used': 123123,
|
||||
'x-account-container-count': 2,
|
||||
},
|
||||
@ -48,12 +52,17 @@ GET_ACCOUNTS = [('tenant-002', ({'x-account-object-count': 10,
|
||||
'bytes': 0,
|
||||
'name': 'new_container'
|
||||
}])),
|
||||
('tenant-003', ({'x-account-object-count': 0,
|
||||
('tenant-001', ({'x-account-object-count': 0,
|
||||
'x-account-bytes-used': 0,
|
||||
'x-account-container-count': 0,
|
||||
}, [])), ]
|
||||
}, [])),
|
||||
('tenant-002-ignored', ({'x-account-object-count': 0,
|
||||
'x-account-bytes-used': 0,
|
||||
'x-account-container-count': 0,
|
||||
}, []))]
|
||||
|
||||
ENDPOINT = 'end://point'
|
||||
Tenant = collections.namedtuple('Tenant', 'id')
|
||||
ASSIGNED_TENANTS = [Tenant('tenant-000'), Tenant('tenant-001')]
|
||||
|
||||
|
||||
class TestManager(manager.AgentManager):
|
||||
@ -85,9 +94,11 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
def fake_ks_service_catalog_url_for(*args, **kwargs):
|
||||
raise exceptions.EndpointNotFound("Fake keystone exception")
|
||||
|
||||
def fake_iter_accounts(self, ksclient, cache, tenants):
    """Fake _iter_accounts that honours the tenant partition.

    Defect fixed: the span retained the superseded pre-change signature
    (``endpoint`` parameter, unconditional yield); only the post-change
    version is kept. Accounts whose id is not in *tenants* are skipped,
    mirroring the real implementation's per-tenant filtering.
    """
    tenant_ids = [t.id for t in tenants]
    for i in self.ACCOUNTS:
        if i[0] in tenant_ids:
            yield i
|
||||
|
||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
def setUp(self):
|
||||
@ -100,42 +111,35 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
else:
|
||||
self.ACCOUNTS = GET_ACCOUNTS
|
||||
|
||||
def tearDown(self):
|
||||
super(TestSwiftPollster, self).tearDown()
|
||||
swift._Base._ENDPOINT = None
|
||||
|
||||
def test_iter_accounts_no_cache(self):
|
||||
cache = {}
|
||||
with mockpatch.PatchObject(self.factory, '_get_account_info',
|
||||
return_value=[]):
|
||||
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
|
||||
ENDPOINT))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
|
||||
in cache)
|
||||
self.assertTrue('%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_METHOD)
|
||||
in cache)
|
||||
self.assertTrue(self.pollster.CACHE_KEY_METHOD in cache)
|
||||
self.assertEqual([], data)
|
||||
|
||||
def test_iter_accounts_tenants_cached(self):
|
||||
# Verify that if there are tenants pre-cached then the account
|
||||
# info loop iterates over those instead of asking for the list
|
||||
# again.
|
||||
ksclient = mock.Mock()
|
||||
ksclient.tenants.list.side_effect = AssertionError(
|
||||
def test_iter_accounts_cached(self):
|
||||
# Verify that if a method has already been called, _iter_accounts
|
||||
# uses the cached version and doesn't call swiftclient.
|
||||
mock_method = mock.Mock()
|
||||
mock_method.side_effect = AssertionError(
|
||||
'should not be called',
|
||||
)
|
||||
|
||||
api_method = '%s_account' % self.pollster.METHOD
|
||||
with mockpatch.PatchObject(swift_client, api_method, new=ksclient):
|
||||
key = '%s-%s' % (ENDPOINT, self.pollster.CACHE_KEY_TENANT)
|
||||
with mockpatch.PatchObject(swift_client, api_method, new=mock_method):
|
||||
with mockpatch.PatchObject(self.factory, '_neaten_url'):
|
||||
Tenant = collections.namedtuple('Tenant', 'id')
|
||||
cache = {
|
||||
key: [
|
||||
Tenant(self.ACCOUNTS[0][0])
|
||||
],
|
||||
}
|
||||
cache = {self.pollster.CACHE_KEY_METHOD: [self.ACCOUNTS[0]]}
|
||||
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
|
||||
ENDPOINT))
|
||||
self.assertTrue(key in cache)
|
||||
self.assertEqual(self.ACCOUNTS[0][0], data[0][0])
|
||||
ASSIGNED_TENANTS))
|
||||
self.assertEqual([self.ACCOUNTS[0]], data)
|
||||
|
||||
def test_neaten_url(self):
|
||||
test_endpoints = ['http://127.0.0.1:8080',
|
||||
@ -158,24 +162,53 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
|
||||
with mockpatch.PatchObject(self.factory, '_iter_accounts',
|
||||
side_effect=self.fake_iter_accounts):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(2, len(samples))
|
||||
self.assertEqual(2, len(samples), self.pollster.__class__)
|
||||
|
||||
def test_get_meter_names(self):
|
||||
with mockpatch.PatchObject(self.factory, '_iter_accounts',
|
||||
side_effect=self.fake_iter_accounts):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(set([samples[0].name]),
|
||||
set([s.name for s in samples]))
|
||||
|
||||
def test_only_poll_assigned(self):
    """Pollster must query exactly its assigned tenants, in order."""
    mock_method = mock.MagicMock()
    endpoint = 'end://point/'
    api_method = '%s_account' % self.pollster.METHOD
    with mockpatch.PatchObject(swift_client, api_method, new=mock_method):
        with mockpatch.PatchObject(
                self.manager.keystone.service_catalog, 'url_for',
                return_value=endpoint):
            list(self.pollster.get_samples(self.manager, {},
                                           ASSIGNED_TENANTS))
    # One swiftclient call per assigned tenant -- and nothing for the
    # 'tenant-002-ignored' fixture entries.
    expected = [mock.call(self.pollster._neaten_url(endpoint, t.id),
                          self.manager.keystone.auth_token)
                for t in ASSIGNED_TENANTS]
    self.assertEqual(expected, mock_method.call_args_list)
||||
|
||||
def test_get_endpoint_only_once(self):
    """Endpoint resolution is cached: two polls, one url_for call."""
    mock_url_for = mock.MagicMock()
    api_method = '%s_account' % self.pollster.METHOD
    with mockpatch.PatchObject(swift_client, api_method,
                               new=mock.MagicMock()):
        with mockpatch.PatchObject(
                self.manager.keystone.service_catalog, 'url_for',
                new=mock_url_for):
            # Poll twice; _Base caches the endpoint on the class, so
            # the service catalog must be consulted only once.
            list(self.pollster.get_samples(self.manager, {},
                                           ASSIGNED_TENANTS))
            list(self.pollster.get_samples(self.manager, {},
                                           ASSIGNED_TENANTS))
    self.assertEqual(1, mock_url_for.call_count)
|
||||
|
||||
def test_endpoint_notfound(self):
|
||||
with mockpatch.PatchObject(
|
||||
self.manager.keystone.service_catalog, 'url_for',
|
||||
side_effect=self.fake_ks_service_catalog_url_for):
|
||||
samples = list(self.pollster.get_samples(self.manager, {},
|
||||
[ENDPOINT]))
|
||||
ASSIGNED_TENANTS))
|
||||
|
||||
self.assertEqual(0, len(samples))
|
||||
|
@ -81,6 +81,7 @@ ceilometer.notification =
|
||||
ceilometer.discover =
|
||||
local_instances = ceilometer.compute.discovery:InstanceDiscovery
|
||||
endpoint = ceilometer.central.discovery:EndpointDiscovery
|
||||
tenant = ceilometer.central.discovery:TenantDiscovery
|
||||
lb_pools = ceilometer.network.services.discovery:LBPoolsDiscovery
|
||||
lb_vips = ceilometer.network.services.discovery:LBVipsDiscovery
|
||||
lb_members = ceilometer.network.services.discovery:LBMembersDiscovery
|
||||
|
Loading…
Reference in New Issue
Block a user