Add ceph object storage meters

Implemented pollster classes to get the basic meters from ceph
object storage (i.e radosgw) and added corresponding unittests.

DocImpact

Co-Authored-By: Abhishek Lekshmanan <abhishek.lekshmanan@ril.com>

Change-Id: Ib90b1d5bbaa36760a2563a044ab256c045772e20
Implements: blueprint ceph-ceilometer-integration
This commit is contained in:
Swami Reddy 2015-02-24 17:25:47 +05:30
parent 35d375dabc
commit 98005590ef
7 changed files with 663 additions and 0 deletions

View File

@ -0,0 +1,212 @@
#
# Copyright 2015 Reliance Jio Infocomm Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Common code for working with ceph object stores
"""
from keystoneclient import exceptions
from oslo_config import cfg
from oslo_utils import timeutils
import six.moves.urllib.parse as urlparse
from ceilometer.agent import plugin_base
from ceilometer.i18n import _
from ceilometer.objectstore.rgw_client import RGWAdminClient as rgwclient
from ceilometer.openstack.common import log
from ceilometer import sample
LOG = log.getLogger(__name__)
SERVICE_OPTS = [
cfg.StrOpt('radosgw',
default='object-store',
help='Radosgw service type.'),
]
CREDENTIAL_OPTS = [
cfg.StrOpt('access_key',
secret=True,
help='Access key for Radosgw Admin.'),
cfg.StrOpt('secret_key',
secret=True,
help='Secret key for Radosgw Admin.')
]
cfg.CONF.register_opts(SERVICE_OPTS, group='service_types')
cfg.CONF.register_opts(CREDENTIAL_OPTS, group='rgw_admin_credentials')
cfg.CONF.import_group('rgw_admin_credentials', 'ceilometer.service')
class _Base(plugin_base.PollsterBase):
METHOD = 'bucket'
_ENDPOINT = None
def __init__(self):
self.access_key = cfg.CONF.rgw_admin_credentials.access_key
self.secret = cfg.CONF.rgw_admin_credentials.secret_key
@property
def default_discovery(self):
return 'tenant'
@property
def CACHE_KEY_METHOD(self):
return 'rgw.get_%s' % self.METHOD
@staticmethod
def _get_endpoint(ksclient):
# we store the endpoint as a base class attribute, so keystone is
# only ever called once, also we assume that in a single deployment
# we may be only deploying `radosgw` or `swift` as the object-store
if _Base._ENDPOINT is None:
try:
conf = cfg.CONF.service_credentials
rgw_url = ksclient.service_catalog.url_for(
service_type=cfg.CONF.service_types.radosgw,
endpoint_type=conf.os_endpoint_type)
_Base._ENDPOINT = urlparse.urljoin(rgw_url, '/admin')
except exceptions.EndpointNotFound:
LOG.debug(_("Radosgw endpoint not found"))
return _Base._ENDPOINT
def _iter_accounts(self, ksclient, cache, tenants):
if self.CACHE_KEY_METHOD not in cache:
cache[self.CACHE_KEY_METHOD] = list(self._get_account_info(
ksclient, tenants))
return iter(cache[self.CACHE_KEY_METHOD])
def _get_account_info(self, ksclient, tenants):
endpoint = self._get_endpoint(ksclient)
if not endpoint:
raise StopIteration()
rgw_client = rgwclient(endpoint, self.access_key, self.secret)
for t in tenants:
api_method = 'get_%s' % self.METHOD
yield t.id, getattr(rgw_client, api_method)(t.id)
class ContainersObjectsPollster(_Base):
"""Get info about object counts in a container using RGW Admin APIs."""
def get_samples(self, manager, cache, resources):
for tenant, bucket_info in self._iter_accounts(manager.keystone,
cache, resources):
for it in bucket_info['buckets']:
yield sample.Sample(
name='radosgw.containers.objects',
type=sample.TYPE_GAUGE,
volume=int(it.num_objects),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant + '/' + it.name,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ContainersSizePollster(_Base):
"""Get info about object sizes in a container using RGW Admin APIs."""
def get_samples(self, manager, cache, resources):
for tenant, bucket_info in self._iter_accounts(manager.keystone,
cache, resources):
for it in bucket_info['buckets']:
yield sample.Sample(
name='radosgw.containers.objects.size',
type=sample.TYPE_GAUGE,
volume=int(it.size * 1024),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant + '/' + it.name,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsSizePollster(_Base):
"""Iterate over all accounts, using keystone."""
def get_samples(self, manager, cache, resources):
for tenant, bucket_info in self._iter_accounts(manager.keystone,
cache, resources):
yield sample.Sample(
name='radosgw.objects.size',
type=sample.TYPE_GAUGE,
volume=int(bucket_info['size'] * 1024),
unit='B',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsPollster(_Base):
"""Iterate over all accounts, using keystone."""
def get_samples(self, manager, cache, resources):
for tenant, bucket_info in self._iter_accounts(manager.keystone,
cache, resources):
yield sample.Sample(
name='radosgw.objects',
type=sample.TYPE_GAUGE,
volume=int(bucket_info['num_objects']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class ObjectsContainersPollster(_Base):
def get_samples(self, manager, cache, resources):
for tenant, bucket_info in self._iter_accounts(manager.keystone,
cache, resources):
yield sample.Sample(
name='radosgw.objects.containers',
type=sample.TYPE_GAUGE,
volume=int(bucket_info['num_buckets']),
unit='object',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)
class UsagePollster(_Base):
METHOD = 'usage'
def get_samples(self, manager, cache, resources):
for tenant, usage in self._iter_accounts(manager.keystone,
cache, resources):
yield sample.Sample(
name='radosgw.api.request',
type=sample.TYPE_GAUGE,
volume=int(usage),
unit='request',
user_id=None,
project_id=tenant,
resource_id=tenant,
timestamp=timeutils.isotime(),
resource_metadata=None,
)

View File

@ -0,0 +1,72 @@
#
# Copyright 2015 Reliance Jio Infocomm Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from awsauth import S3Auth
import requests
import six.moves.urllib.parse as urlparse
from ceilometer.i18n import _
class RGWAdminAPIFailed(Exception):
pass
class RGWAdminClient(object):
Bucket = namedtuple('Bucket', 'name, num_objects, size')
def __init__(self, endpoint, access_key, secret_key):
self.access_key = access_key
self.secret = secret_key
self.endpoint = endpoint
self.hostname = urlparse.urlparse(endpoint).netloc
def _make_request(self, path, req_params):
uri = "{0}/{1}".format(self.endpoint, path)
r = requests.get(uri, params=req_params,
auth=S3Auth(self.access_key, self.secret,
self.hostname)
)
if r.status_code != 200:
raise RGWAdminAPIFailed(
_('RGW AdminOps API returned %(status)s %(reason)s') %
{'status': r.status_code, 'reason': r.reason})
return r.json()
def get_bucket(self, tenant_id):
path = "bucket"
req_params = {"uid": tenant_id, "stats": "true"}
json_data = self._make_request(path, req_params)
stats = {'num_buckets': 0, 'buckets': [], 'size': 0, 'num_objects': 0}
stats['num_buckets'] = len(json_data)
for it in json_data:
for k, v in it["usage"].items():
stats['num_objects'] += v["num_objects"]
stats['size'] += v["size_kb"]
stats['buckets'].append(self.Bucket(it["bucket"],
v["num_objects"], v["size_kb"]))
return stats
def get_usage(self, tenant_id):
path = "usage"
req_params = {"uid": tenant_id}
json_data = self._make_request(path, req_params)
usage_data = json_data["summary"]
return sum((it["total"]["ops"] for it in usage_data))

View File

@ -48,6 +48,7 @@ import ceilometer.network.notifications
import ceilometer.neutron_client
import ceilometer.notification
import ceilometer.nova_client
import ceilometer.objectstore.rgw
import ceilometer.objectstore.swift
import ceilometer.openstack.common.eventlet_backdoor
import ceilometer.openstack.common.log
@ -83,6 +84,7 @@ def list_opts():
ceilometer.middleware.OPTS,
ceilometer.network.notifications.OPTS,
ceilometer.nova_client.OPTS,
ceilometer.objectstore.rgw.OPTS,
ceilometer.objectstore.swift.OPTS,
(ceilometer.openstack.common.eventlet_backdoor
.eventlet_backdoor_opts),
@ -131,6 +133,7 @@ def list_opts():
ceilometer.image.glance.SERVICE_OPTS,
ceilometer.neutron_client.SERVICE_OPTS,
ceilometer.nova_client.SERVICE_OPTS,
ceilometer.objectstore.rgw.SERVICE_OPTS,
ceilometer.objectstore.swift.SERVICE_OPTS,)),
('vmware', ceilometer.compute.virt.vmware.inspector.OPTS),
('xenapi', ceilometer.compute.virt.xenapi.inspector.OPTS),

View File

@ -0,0 +1,179 @@
#!/usr/bin/env python
#
# Copyright 2015 Reliance Jio Infocomm Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from keystoneclient import exceptions
import mock
from oslotest import base
from oslotest import mockpatch
import testscenarios.testcase
from ceilometer.agent import manager
from ceilometer.objectstore import rgw
from ceilometer.objectstore.rgw_client import RGWAdminClient as rgw_client
bucket_list1 = [rgw_client.Bucket('somefoo1', 10, 7)]
bucket_list2 = [rgw_client.Bucket('somefoo2', 2, 9)]
bucket_list3 = [rgw_client.Bucket('unlisted', 100, 100)]
GET_BUCKETS = [('tenant-000', {'num_buckets': 2, 'size': 1042,
'num_objects': 1001, 'buckets': bucket_list1}),
('tenant-001', {'num_buckets': 2, 'size': 1042,
'num_objects': 1001, 'buckets': bucket_list2}),
('tenant-002-ignored', {'num_buckets': 2, 'size': 1042,
'num_objects': 1001,
'buckets': bucket_list3})]
GET_USAGE = [('tenant-000', 10),
('tenant-001', 11),
('tenant-002-ignored', 12)]
Tenant = collections.namedtuple('Tenant', 'id')
ASSIGNED_TENANTS = [Tenant('tenant-000'), Tenant('tenant-001')]
class TestManager(manager.AgentManager):
def __init__(self):
super(TestManager, self).__init__()
self.keystone = mock.MagicMock()
class TestRgwPollster(testscenarios.testcase.WithScenarios,
base.BaseTestCase):
# Define scenarios to run all of the tests against all of the
# pollsters.
scenarios = [
('radosgw.objects',
{'factory': rgw.ObjectsPollster}),
('radosgw.objects.size',
{'factory': rgw.ObjectsSizePollster}),
('radosgw.objects.containers',
{'factory': rgw.ObjectsContainersPollster}),
('radosgw.containers.objects',
{'factory': rgw.ContainersObjectsPollster}),
('radosgw.containers.objects.size',
{'factory': rgw.ContainersSizePollster}),
('radosgw.api.request',
{'factory': rgw.UsagePollster}),
]
@staticmethod
def fake_ks_service_catalog_url_for(*args, **kwargs):
raise exceptions.EndpointNotFound("Fake keystone exception")
def fake_iter_accounts(self, ksclient, cache, tenants):
tenant_ids = [t.id for t in tenants]
for i in self.ACCOUNTS:
if i[0] in tenant_ids:
yield i
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestRgwPollster, self).setUp()
self.pollster = self.factory()
self.manager = TestManager()
if self.pollster.CACHE_KEY_METHOD == 'rgw.get_bucket':
self.ACCOUNTS = GET_BUCKETS
else:
self.ACCOUNTS = GET_USAGE
def tearDown(self):
super(TestRgwPollster, self).tearDown()
rgw._Base._ENDPOINT = None
def test_iter_accounts_no_cache(self):
cache = {}
with mockpatch.PatchObject(self.factory, '_get_account_info',
return_value=[]):
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ASSIGNED_TENANTS))
self.assertIn(self.pollster.CACHE_KEY_METHOD, cache)
self.assertEqual([], data)
def test_iter_accounts_cached(self):
# Verify that if a method has already been called, _iter_accounts
# uses the cached version and doesn't call rgw_clinet.
mock_method = mock.Mock()
mock_method.side_effect = AssertionError(
'should not be called',
)
api_method = 'get_%s' % self.pollster.METHOD
with mockpatch.PatchObject(rgw_client, api_method, new=mock_method):
cache = {self.pollster.CACHE_KEY_METHOD: [self.ACCOUNTS[0]]}
data = list(self.pollster._iter_accounts(mock.Mock(), cache,
ASSIGNED_TENANTS))
self.assertEqual([self.ACCOUNTS[0]], data)
def test_metering(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
self.assertEqual(2, len(samples), self.pollster.__class__)
def test_get_meter_names(self):
with mockpatch.PatchObject(self.factory, '_iter_accounts',
side_effect=self.fake_iter_accounts):
samples = list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
self.assertEqual(set([samples[0].name]),
set([s.name for s in samples]))
def test_only_poll_assigned(self):
mock_method = mock.MagicMock()
endpoint = 'http://127.0.0.1:8000/admin'
api_method = 'get_%s' % self.pollster.METHOD
with mockpatch.PatchObject(rgw_client, api_method, new=mock_method):
with mockpatch.PatchObject(
self.manager.keystone.service_catalog, 'url_for',
return_value=endpoint):
list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
expected = [mock.call(t.id)
for t in ASSIGNED_TENANTS]
self.assertEqual(expected, mock_method.call_args_list)
def test_get_endpoint_only_once(self):
mock_url_for = mock.MagicMock()
api_method = 'get_%s' % self.pollster.METHOD
with mockpatch.PatchObject(rgw_client, api_method,
new=mock.MagicMock()):
with mockpatch.PatchObject(
self.manager.keystone.service_catalog, 'url_for',
new=mock_url_for):
list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
self.assertEqual(1, mock_url_for.call_count)
def test_endpoint_notfound(self):
with mockpatch.PatchObject(
self.manager.keystone.service_catalog, 'url_for',
side_effect=self.fake_ks_service_catalog_url_for):
samples = list(self.pollster.get_samples(self.manager, {},
ASSIGNED_TENANTS))
self.assertEqual(0, len(samples))

View File

@ -0,0 +1,190 @@
#!/usr/bin/env python
#
# Copyright (C) 2015 Reliance Jio Infocomm Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from oslotest import base
from ceilometer.objectstore.rgw_client import RGWAdminAPIFailed
from ceilometer.objectstore.rgw_client import RGWAdminClient
RGW_ADMIN_BUCKETS = '''
[
{
"max_marker": "",
"ver": 2001,
"usage": {
"rgw.main": {
"size_kb_actual": 16000,
"num_objects": 1000,
"size_kb": 1000
}
},
"bucket": "somefoo",
"owner": "admin",
"master_ver": 0,
"mtime": 1420176126,
"marker": "default.4126.1",
"bucket_quota": {
"max_objects": -1,
"enabled": false,
"max_size_kb": -1
},
"id": "default.4126.1",
"pool": ".rgw.buckets",
"index_pool": ".rgw.buckets.index"
},
{
"max_marker": "",
"ver": 3,
"usage": {
"rgw.main": {
"size_kb_actual": 43,
"num_objects": 1,
"size_kb": 42
}
},
"bucket": "somefoo31",
"owner": "admin",
"master_ver": 0,
"mtime": 1420176134,
"marker": "default.4126.5",
"bucket_quota": {
"max_objects": -1,
"enabled": false,
"max_size_kb": -1
},
"id": "default.4126.5",
"pool": ".rgw.buckets",
"index_pool": ".rgw.buckets.index"
}
]'''
RGW_ADMIN_USAGE = '''
{ "entries": [
{ "owner": "5f7fe2d5352e466f948f49341e33d107",
"buckets": [
{ "bucket": "",
"time": "2015-01-23 09:00:00.000000Z",
"epoch": 1422003600,
"categories": [
{ "category": "list_buckets",
"bytes_sent": 46,
"bytes_received": 0,
"ops": 3,
"successful_ops": 3},
{ "category": "stat_account",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 1}]},
{ "bucket": "foodsgh",
"time": "2015-01-23 09:00:00.000000Z",
"epoch": 1422003600,
"categories": [
{ "category": "create_bucket",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 1},
{ "category": "get_obj",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 0},
{ "category": "put_obj",
"bytes_sent": 0,
"bytes_received": 238,
"ops": 1,
"successful_ops": 1}]}]}],
"summary": [
{ "user": "5f7fe2d5352e466f948f49341e33d107",
"categories": [
{ "category": "create_bucket",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 1},
{ "category": "get_obj",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 0},
{ "category": "list_buckets",
"bytes_sent": 46,
"bytes_received": 0,
"ops": 3,
"successful_ops": 3},
{ "category": "put_obj",
"bytes_sent": 0,
"bytes_received": 238,
"ops": 1,
"successful_ops": 1},
{ "category": "stat_account",
"bytes_sent": 0,
"bytes_received": 0,
"ops": 1,
"successful_ops": 1}],
"total": { "bytes_sent": 46,
"bytes_received": 238,
"ops": 7,
"successful_ops": 6}}]}
'''
buckets_json = json.loads(RGW_ADMIN_BUCKETS)
usage_json = json.loads(RGW_ADMIN_USAGE)
class TestRGWAdminClient(base.BaseTestCase):
def setUp(self):
super(TestRGWAdminClient, self).setUp()
self.client = RGWAdminClient('http://127.0.0.1:8080/admin',
'abcde', 'secret')
self.get_resp = mock.MagicMock()
self.get = mock.patch('requests.get',
return_value=self.get_resp).start()
def test_make_request_exception(self):
self.get_resp.status_code = 403
self.assertRaises(RGWAdminAPIFailed, self.client._make_request,
*('foo', {}))
def test_make_request(self):
self.get_resp.status_code = 200
self.get_resp.json.return_value = buckets_json
actual = self.client._make_request('foo', [])
self.assertEqual(buckets_json, actual)
def test_get_buckets(self):
self.get_resp.status_code = 200
self.get_resp.json.return_value = buckets_json
actual = self.client.get_bucket('foo')
bucket_list = [RGWAdminClient.Bucket('somefoo', 1000, 1000),
RGWAdminClient.Bucket('somefoo31', 1, 42),
]
expected = {'num_buckets': 2, 'size': 1042, 'num_objects': 1001,
'buckets': bucket_list}
self.assertEqual(expected, actual)
def test_get_usage(self):
self.get_resp.status_code = 200
self.get_resp.json.return_value = usage_json
actual = self.client.get_usage('foo')
expected = 7
self.assertEqual(expected, actual)

View File

@ -159,6 +159,12 @@ ceilometer.poll.central =
ip.floating = ceilometer.network.floatingip:FloatingIPPollster
image = ceilometer.image.glance:ImagePollster
image.size = ceilometer.image.glance:ImageSizePollster
rgw.containers.objects = ceilometer.objectstore.rgw:ContainersObjectsPollster
rgw.containers.objects.size = ceilometer.objectstore.rgw:ContainersSizePollster
rgw.objects = ceilometer.objectstore.rgw:ObjectsPollster
rgw.objects.size = ceilometer.objectstore.rgw:ObjectsSizePollster
rgw.objects.containers = ceilometer.objectstore.rgw:ObjectsContainersPollster
rgw.usage = ceilometer.objectstore.rgw:UsagePollster
storage.containers.objects = ceilometer.objectstore.swift:ContainersObjectsPollster
storage.containers.objects.size = ceilometer.objectstore.swift:ContainersSizePollster
storage.objects = ceilometer.objectstore.swift:ObjectsPollster

View File

@ -29,3 +29,4 @@ testrepository>=0.0.18
testscenarios>=0.4
testtools>=0.9.36,!=1.2.0
gabbi>=0.6.0
requests-aws>=0.1.4