diff --git a/cloudkitty/storage/__init__.py b/cloudkitty/storage/__init__.py index ddfdda0e..a2368c70 100644 --- a/cloudkitty/storage/__init__.py +++ b/cloudkitty/storage/__init__.py @@ -18,12 +18,15 @@ import abc from oslo_config import cfg +from oslo_log import log as logging import six from stevedore import driver from cloudkitty import collector as ck_collector from cloudkitty import utils as ck_utils +LOG = logging.getLogger(__name__) + storage_opts = [ cfg.StrOpt('backend', default='sqlalchemy', @@ -47,6 +50,9 @@ def get_storage(collector=None): cfg.CONF.storage.backend, invoke_on_load=True, invoke_kwds=storage_args).driver + if cfg.CONF.storage.backend not in ['sqlalchemy', 'hybrid']: + LOG.warning('{} storage backend is deprecated and will be removed ' + 'in a future release.'.format(cfg.CONF.storage.backend)) return backend diff --git a/cloudkitty/storage/hybrid/__init__.py b/cloudkitty/storage/hybrid/__init__.py new file mode 100644 index 00000000..92369257 --- /dev/null +++ b/cloudkitty/storage/hybrid/__init__.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +from oslo_config import cfg +from oslo_db.sqlalchemy import utils +from stevedore import driver + +from cloudkitty import db +from cloudkitty import storage +from cloudkitty.storage.hybrid import migration +from cloudkitty.storage.hybrid import models +from cloudkitty import utils as ck_utils + + +storage_opts = [ + cfg.StrOpt( + 'backend', + default='gnocchi', + help='Name of the storage backend that should be used ' + 'by the hybrid storage') +] + +CONF = cfg.CONF +CONF.register_opts(storage_opts, group='hybrid_storage') + +HYBRID_BACKENDS_NAMESPACE = 'cloudkitty.storage.hybrid.backends' + + +class HybridStorage(storage.BaseStorage): + """Hybrid Storage Backend. + + Stores dataframes in one of the available backends and other informations + in a classical SQL database. + """ + + state_model = models.TenantState + + def __init__(self, **kwargs): + super(HybridStorage, self).__init__(**kwargs) + self._hybrid_backend = driver.DriverManager( + HYBRID_BACKENDS_NAMESPACE, + cfg.CONF.hybrid_storage.backend, + invoke_on_load=True).driver + self._sql_session = {} + + def _check_session(self, tenant_id): + session = self._sql_session.get(tenant_id, None) + if not session: + self._sql_session[tenant_id] = db.get_session() + self._sql_session[tenant_id].begin() + + def init(self): + migration.upgrade('head') + self._hybrid_backend.init() + + def get_state(self, tenant_id=None): + session = db.get_session() + q = utils.model_query(self.state_model, session) + if tenant_id: + q = q.filter(self.state_model.tenant_id == tenant_id) + q = q.order_by(self.state_model.state.desc()) + r = q.first() + return ck_utils.dt2ts(r.state) if r else None + + def _set_state(self, tenant_id, state): + self._check_session(tenant_id) + session = self._sql_session[tenant_id] + q = utils.model_query(self.state_model, session) + if tenant_id: + q = q.filter(self.state_model.tenant_id == tenant_id) + r = q.first() + do_commit = False + if r: + if state >= r.state: + q.update({'state': state}) + do_commit = True + else: + state = self.state_model(tenant_id=tenant_id, state=state) + session.add(state) + do_commit = True + if do_commit: + session.commit() + + def _commit(self, tenant_id): + self._hybrid_backend.commit(tenant_id, self.get_state(tenant_id)) + + def _pre_commit(self, tenant_id): + super(HybridStorage, self)._pre_commit(tenant_id) + + def _post_commit(self, tenant_id): + self._set_state(tenant_id, self.usage_start_dt.get(tenant_id)) + super(HybridStorage, self)._post_commit(tenant_id) + del self._sql_session[tenant_id] + + def get_total(self, begin=None, end=None, tenant_id=None, + service=None, groupby=None): + return self._hybrid_backend.get_total( + begin=begin, end=end, tenant_id=tenant_id, + service=service, groupby=groupby) + + def _dispatch(self, data, tenant_id): + for service in data: + for frame in data[service]: + self._hybrid_backend.append_time_frame( + service, frame, tenant_id) + self._has_data[tenant_id] = True + + def get_tenants(self, begin, end): + return self._hybrid_backend.get_tenants(begin, end) + + def get_time_frame(self, begin, end, **filters): + return self._hybrid_backend.get_time_frame(begin, end, **filters) diff --git a/cloudkitty/storage/hybrid/alembic/env.py b/cloudkitty/storage/hybrid/alembic/env.py new file mode 100644 index 00000000..b005e4cb --- /dev/null +++ b/cloudkitty/storage/hybrid/alembic/env.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +from cloudkitty.common.db.alembic import env # noqa +from cloudkitty.storage.hybrid import models + +target_metadata = models.Base.metadata +version_table = 'storage_hybrid_alembic' + + +env.run_migrations_online(target_metadata, version_table) diff --git a/cloudkitty/storage/hybrid/alembic/script.py.mako b/cloudkitty/storage/hybrid/alembic/script.py.mako new file mode 100644 index 00000000..2c015630 --- /dev/null +++ b/cloudkitty/storage/hybrid/alembic/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/cloudkitty/storage/hybrid/alembic/versions/03da4bb002b9_initial_revision.py b/cloudkitty/storage/hybrid/alembic/versions/03da4bb002b9_initial_revision.py new file mode 100644 index 00000000..a8c92f45 --- /dev/null +++ b/cloudkitty/storage/hybrid/alembic/versions/03da4bb002b9_initial_revision.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +"""initial revision + +Revision ID: 03da4bb002b9 +Revises: None +Create Date: 2017-11-21 15:59:26.776639 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '03da4bb002b9' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'hybrid_storage_states', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('tenant_id', sa.String(length=32), nullable=False), + sa.Column('state', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id'), + mysql_charset='utf8', + mysql_engine='InnoDB') diff --git a/cloudkitty/storage/hybrid/backends/__init__.py b/cloudkitty/storage/hybrid/backends/__init__.py new file mode 100644 index 00000000..bfd07d40 --- /dev/null +++ b/cloudkitty/storage/hybrid/backends/__init__.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseHybridBackend(object): + """Base Backend class for the Hybrid Storage. + + This is the interface that all backends for the hybrid storage + should implement. + """ + + @abc.abstractmethod + def commit(self, tenant_id, state): + """Push data to the storage backend. + + :param tenant_id: id of the tenant which information must be commited. + """ + pass + + @abc.abstractmethod + def init(self): + """Initialize hybrid storage backend. + + Can be used to create DB scheme on first start + """ + pass + + @abc.abstractmethod + def get_total(self, begin=None, end=None, tenant_id=None, + service=None, groupby=None): + """Return the current total. + + :param begin: When to start filtering. + :type begin: datetime.datetime + :param end: When to stop filtering. + :type end: datetime.datetime + :param tenant_id: Filter on the tenant_id. + :type res_type: str + :param service: Filter on the resource type. + :type service: str + :param groupby: Fields to group by, separated by commas if multiple. + :type groupby: str + """ + pass + + @abc.abstractmethod + def append_time_frame(self, res_type, frame, tenant_id): + """Append a time frame to commit to the backend. + + :param res_type: The resource type of the dataframe. + :param frame: The timeframe to append. + :param tenant_id: Tenant the frame is belonging to. + """ + pass + + @abc.abstractmethod + def get_tenants(self, begin, end): + """Return the list of rated tenants. + + :param begin: When to start filtering. + :type begin: datetime.datetime + :param end: When to stop filtering. + :type end: datetime.datetime + """ + + @abc.abstractmethod + def get_time_frame(self, begin, end, **filters): + """Request a time frame from the storage backend. + + :param begin: When to start filtering. + :type begin: datetime.datetime + :param end: When to stop filtering. + :type end: datetime.datetime + :param res_type: (Optional) Filter on the resource type. + :type res_type: str + :param tenant_id: (Optional) Filter on the tenant_id. + :type res_type: str + """ diff --git a/cloudkitty/storage/hybrid/backends/gnocchi.py b/cloudkitty/storage/hybrid/backends/gnocchi.py new file mode 100644 index 00000000..cbb6b3c1 --- /dev/null +++ b/cloudkitty/storage/hybrid/backends/gnocchi.py @@ -0,0 +1,461 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +import datetime +import decimal +import json + +from gnocchiclient import client as gclient +from gnocchiclient import exceptions as gexceptions +from keystoneauth1 import loading as ks_loading +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import uuidutils +import six + +from cloudkitty.storage.hybrid.backends import BaseHybridBackend +from cloudkitty.transformer import gnocchi as gtransformer +import cloudkitty.utils as ck_utils + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + +CONF.import_opt('period', 'cloudkitty.collector', 'collect') + +GNOCCHI_STORAGE_OPTS = 'storage_gnocchi' +gnocchi_storage_opts = [ + cfg.StrOpt('archive_policy_name', + default='rating', + help='Gnocchi storage archive policy name.'), + # The archive policy definition MUST include the collect period granularity + cfg.StrOpt('archive_policy_definition', + default='[{"granularity": ' + + six.text_type(CONF.collect.period) + + ', "timespan": "90 days"}, ' + '{"granularity": 86400, "timespan": "360 days"}, ' + '{"granularity": 2592000, "timespan": "1800 days"}]', + help='Gnocchi storage archive policy definition.'), ] +CONF.register_opts(gnocchi_storage_opts, GNOCCHI_STORAGE_OPTS) + +ks_loading.register_session_conf_options( + CONF, + GNOCCHI_STORAGE_OPTS) +ks_loading.register_auth_conf_options( + CONF, + GNOCCHI_STORAGE_OPTS) + +METRICS_CONF = ck_utils.get_metrics_conf(CONF.collect.metrics_conf) + +RESOURCE_TYPE_NAME_ROOT = 'rating_service_' + + +class DecimalJSONEncoder(json.JSONEncoder): + """Wrapper class to handle decimal.Decimal objects in json.dumps().""" + def default(self, obj): + if isinstance(obj, decimal.Decimal): + return float(obj) + return super(DecimalJSONEncoder, self).default(obj) + + +class UnknownResourceType(Exception): + """Exception raised when an unknown resource type is encountered""" + + def __init__(self, resource_type): + super(UnknownResourceType, self).__init__( + 'Unknown resource type {}'.format(resource_type) + ) + + +class GnocchiStorage(BaseHybridBackend): + """Gnocchi backend for hybrid storage. + + """ + + # NOTE(lukapeschke): List taken directly from gnocchi code + invalid_attribute_names = [ + "id", "type", "metrics", + "revision", "revision_start", "revision_end", + "started_at", "ended_at", + "user_id", "project_id", + "created_by_user_id", "created_by_project_id", "get_metric", + "creator", + ] + + @staticmethod + def _get_service_metrics(service_name): + metrics = METRICS_CONF['services_metrics'][service_name] + metric_list = ['price'] + for metric in metrics: + metric_list.append(list(metric.keys())[0]) + return metric_list + + def _init_resource_types(self): + transformer = gtransformer.GnocchiTransformer() + services = METRICS_CONF['services'] + for service in services: + service_dict = dict() + service_dict['attributes'] = list() + for attribute in transformer.get_metadata(service): + if attribute not in self.invalid_attribute_names: + service_dict['attributes'].append(attribute) + service_dict['required_attributes'] = [ + 'resource_id', + 'unit', + ] + try: + service_dict['metrics'] = self._get_service_metrics(service) + except KeyError: + LOG.warning( + 'No metrics configured for service {}'.format(service)) + service_dict['metrics'] = list() + service_dict['name'] = RESOURCE_TYPE_NAME_ROOT + service + service_dict['qty_metric'] \ + = list(METRICS_CONF['metrics_units'][service].keys())[0] + self._resource_type_data[service] = service_dict + + def _get_res_type_dict(self, res_type): + res_type_data = self._resource_type_data.get(res_type, None) + if not res_type_data: + return None + attribute_dict = dict() + for attribute in res_type_data['attributes']: + attribute_dict[attribute] = { + 'required': False, + 'type': 'string', + } + for attribute in res_type_data['required_attributes']: + attribute_dict[attribute] = { + 'required': True, + 'type': 'string', + } + return { + 'name': res_type_data['name'], + 'attributes': attribute_dict, + } + + def _create_resource(self, res_type, tenant_id, data): + res_type_data = self._resource_type_data.get(res_type, None) + if not res_type_data: + raise UnknownResourceType( + "Unknown resource type '{}'".format(res_type)) + + res_dict = { + 'id': data['resource_id'], + 'resource_id': data['resource_id'], + 'project_id': tenant_id, + 'user_id': data['user_id'], + 'unit': data['unit'], + } + for attr in res_type_data['attributes']: + res_dict[attr] = data.get(attr, None) or 'None' + if isinstance(res_dict[attr], decimal.Decimal): + res_dict[attr] = float(res_dict[attr]) + + created_metrics = [ + self._conn.metric.create({ + 'name': metric, + 'archive_policy_name': + CONF.storage_gnocchi.archive_policy_name, + }) for metric in res_type_data['metrics'] + ] + + metrics_dict = dict() + for metric in created_metrics: + metrics_dict[metric['name']] = metric['id'] + res_dict['metrics'] = metrics_dict + try: + return self._conn.resource.create(res_type_data['name'], res_dict) + except gexceptions.ResourceAlreadyExists: + res_dict['id'] = uuidutils.generate_uuid() + return self._conn.resource.create(res_type_data['name'], res_dict) + + def _get_resource(self, resource_type, resource_id): + try: + resource_name = self._resource_type_data[resource_type]['name'] + except KeyError: + raise UnknownResourceType( + "Unknown resource type '{}'".format(resource_type)) + try: + return self._conn.resource.get(resource_name, resource_id) + except gexceptions.ResourceNotFound: + return None + + def _find_resource(self, resource_type, resource_id): + try: + resource_type = self._resource_type_data[resource_type]['name'] + except KeyError: + raise UnknownResourceType( + "Unknown resource type '{}'".format(resource_type)) + query = { + '=': { + 'resource_id': resource_id, + } + } + try: + return self._conn.resource.search( + resource_type=resource_type, query=query, limit=1)[0] + except IndexError: + return None + + def _create_resource_type(self, resource_type): + res_type = self._resource_type_data.get(resource_type, None) + if not res_type: + return None + res_type_dict = self._get_res_type_dict(resource_type) + try: + output = self._conn.resource_type.create(res_type_dict) + except gexceptions.ResourceTypeAlreadyExists: + output = None + return output + + def _get_resource_type(self, resource_type): + res_type = self._resource_type_data.get(resource_type, None) + if not res_type: + return None + return self._conn.resource_type.get(res_type['name']) + + def __init__(self, **kwargs): + super(GnocchiStorage, self).__init__(**kwargs) + self.auth = ks_loading.load_auth_from_conf_options( + CONF, + GNOCCHI_STORAGE_OPTS) + self.session = ks_loading.load_session_from_conf_options( + CONF, + GNOCCHI_STORAGE_OPTS, + auth=self.auth) + self._conn = gclient.Client('1', session=self.session) + self._archive_policy_name = ( + CONF.storage_gnocchi.archive_policy_name) + self._archive_policy_definition = json.loads( + CONF.storage_gnocchi.archive_policy_definition) + self._period = CONF.collect.period + if "period" in kwargs: + self._period = kwargs["period"] + self._measurements = dict() + self._resource_type_data = dict() + self._init_resource_types() + + def commit(self, tenant_id, state): + if not self._measurements.get(tenant_id, None): + return + commitable_measurements = dict() + for metrics in self._measurements[tenant_id].values(): + for metric_id, measurements in metrics.items(): + if measurements: + measures = list() + for measurement in measurements: + measures.append( + { + 'timestamp': state, + 'value': measurement, + } + ) + commitable_measurements[metric_id] = measures + if commitable_measurements: + self._conn.metric.batch_metrics_measures(commitable_measurements) + del self._measurements[tenant_id] + + def init(self): + try: + self._conn.archive_policy.get(self._archive_policy_name) + except gexceptions.ArchivePolicyNotFound: + ck_archive_policy = {} + ck_archive_policy['name'] = self._archive_policy_name + ck_archive_policy['back_window'] = 0 + ck_archive_policy['aggregation_methods'] \ + = ['std', 'count', 'min', 'max', 'sum', 'mean'] + ck_archive_policy['definition'] = self._archive_policy_definition + self._conn.archive_policy.create(ck_archive_policy) + for service in self._resource_type_data.keys(): + try: + self._get_resource_type(service) + except gexceptions.ResourceTypeNotFound: + self._create_resource_type(service) + + def get_total(self, begin=None, end=None, tenant_id=None, + service=None, groupby=None): + # Query can't be None if we don't specify a resource_id + query = {} + if tenant_id: + query['='] = {"project_id": tenant_id} + measures = self._conn.metric.aggregation( + metrics='price', query=query, + start=begin, stop=end, + aggregation='sum', + granularity=self._period, + needed_overlap=0) + rate = sum(measure[2] for measure in measures) if len(measures) else 0 + return [{ + 'begin': begin, + 'end': end, + 'rate': rate, + }] + + def _append_measurements(self, resource, data, tenant_id): + if not self._measurements.get(tenant_id, None): + self._measurements[tenant_id] = {} + measurements = self._measurements[tenant_id] + if not measurements.get(resource['id'], None): + measurements[resource['id']] = { + key: list() for key in resource['metrics'].values() + } + for metric_name, metric_id in resource['metrics'].items(): + measurement = data.get(metric_name, None) + if measurement is not None: + measurements[resource['id']][metric_id].append( + float(measurement) + if isinstance(measurement, decimal.Decimal) + else measurement) + + def append_time_frame(self, res_type, frame, tenant_id): + flat_frame = ck_utils.flat_dict(frame) + resource = self._find_resource(res_type, flat_frame['resource_id']) + if not resource: + resource = self._create_resource(res_type, tenant_id, flat_frame) + self._append_measurements(resource, flat_frame, tenant_id) + + def get_tenants(self, begin, end): + query = {'like': {'type': RESOURCE_TYPE_NAME_ROOT + '%'}} + r = self._conn.metric.aggregation( + metrics='price', + query=query, + start=begin, + stop=end, + aggregation='sum', + granularity=self._period, + needed_overlap=0, + groupby='project_id') + projects = list() + for measures in r: + projects.append(measures['group']['project_id']) + return projects + + @staticmethod + def _get_time_query(start, end, resource_type, tenant_id=None): + query = {'and': [{ + 'or': [ + {'=': {'ended_at': None}}, + {'<=': {'ended_at': end}} + ] + }, + {'>=': {'started_at': start}}, + {'=': {'type': resource_type}}, + ] + } + if tenant_id: + query['and'].append({'=': {'project_id': tenant_id}}) + return query + + def _get_resources(self, resource_type, start, end, tenant_id=None): + """Returns the resources of the given type in the given period""" + return self._conn.resource.search( + resource_type=resource_type, + query=self._get_time_query(start, end, resource_type, tenant_id), + details=True) + + def _format_frame(self, res_type, resource, desc, measure, tenant_id): + res_type_info = self._resource_type_data.get(res_type, None) + if not res_type_info: + return dict() + + start = measure[0] + stop = start + datetime.timedelta(seconds=self._period) + + # Getting price + price = decimal.Decimal(measure[2]) + price_dict = {'price': float(price)} + + # Getting vol + if isinstance(res_type_info['qty_metric'], (str, unicode)): + try: + qty = self._conn.metric.get_measures( + resource['metrics'][res_type_info['qty_metric']], + aggregation='sum', + start=start, stop=stop, + refresh=True)[-1][2] + except IndexError: + qty = 0 + else: + qty = res_type_info['qty_metric'] + vol_dict = {'qty': decimal.Decimal(qty), 'unit': resource['unit']} + + # Period + period_dict = { + 'begin': ck_utils.dt2iso(start), + 'end': ck_utils.dt2iso(stop), + } + + # Formatting + res_dict = dict() + res_dict['desc'] = desc + res_dict['vol'] = vol_dict + res_dict['rating'] = price_dict + res_dict['tenant_id'] = tenant_id + + return { + 'usage': {res_type: [res_dict]}, + 'period': period_dict, + } + + def resource_info(self, resource_type, start, end, tenant_id=None): + """Returns a dataframe for the given resource type""" + try: + res_type_info = self._resource_type_data.get(resource_type, None) + resource_name = res_type_info['name'] + except (KeyError, AttributeError): + raise UnknownResourceType(resource_type) + attributes = res_type_info['attributes'] \ + + res_type_info['required_attributes'] + output = list() + query = self._get_time_query(start, end, resource_name, tenant_id) + measures = self._conn.metric.aggregation( + metrics='price', + resource_type=resource_name, + query=query, + start=start, + stop=end, + granularity=self._period, + aggregation='sum', + needed_overlap=0, + groupby=['type', 'id'], + ) + for resource_measures in measures: + resource_desc = None + resource = None + for measure in resource_measures['measures']: + if not resource_desc: + resource = self._get_resource( + resource_type, resource_measures['group']['id']) + if not resource: + continue + desc = {a: resource.get(a, None) for a in attributes} + formatted_frame = self._format_frame( + resource_type, resource, desc, measure, tenant_id) + output.append(formatted_frame) + return output + + def get_time_frame(self, begin, end, **filters): + tenant_id = filters.get('tenant_id', None) + resource_types = [filters.get('res_type', None)] + if not resource_types[0]: + resource_types = self._resource_type_data.keys() + output = list() + for resource_type in resource_types: + output += self.resource_info(resource_type, begin, end, tenant_id) + return output diff --git a/cloudkitty/storage/hybrid/migration.py b/cloudkitty/storage/hybrid/migration.py new file mode 100644 index 00000000..11439fe1 --- /dev/null +++ b/cloudkitty/storage/hybrid/migration.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import os + +from cloudkitty.common.db.alembic import migration + +ALEMBIC_REPO = os.path.join(os.path.dirname(__file__), 'alembic') + + +def upgrade(revision): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.upgrade(config, revision) + + +def version(): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.version(config) + + +def revision(message, autogenerate): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.revision(config, message, autogenerate) + + +def stamp(revision): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.stamp(config, revision) diff --git a/cloudkitty/storage/hybrid/models.py b/cloudkitty/storage/hybrid/models.py new file mode 100644 index 00000000..7167cf4b --- /dev/null +++ b/cloudkitty/storage/hybrid/models.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +from oslo_db.sqlalchemy import models +import sqlalchemy +from sqlalchemy.ext import declarative + +Base = declarative.declarative_base() + + +class TenantState(Base, models.ModelBase): + """A tenant state. + + """ + __table_args__ = {'mysql_charset': "utf8", + 'mysql_engine': "InnoDB"} + __tablename__ = 'hybrid_storage_states' + + id = sqlalchemy.Column(sqlalchemy.Integer, + primary_key=True) + tenant_id = sqlalchemy.Column(sqlalchemy.String(32), + nullable=False) + state = sqlalchemy.Column(sqlalchemy.DateTime, + nullable=False) diff --git a/cloudkitty/tests/samples.py b/cloudkitty/tests/samples.py index d00d5ba3..a44b3f10 100644 --- a/cloudkitty/tests/samples.py +++ b/cloudkitty/tests/samples.py @@ -36,6 +36,7 @@ COMPUTE_METADATA = { 'flavor': 'm1.nano', 'image_id': 'f5600101-8fa2-4864-899e-ebcb7ed6b568', 'instance_id': '26c084e1-b8f1-4cbc-a7ec-e8b356788a17', + 'resource_id': '1558f911-b55a-4fd2-9173-c8f1f23e5639', 'memory': '64', 'metadata': { 'farm': 'prod' @@ -47,6 +48,7 @@ COMPUTE_METADATA = { IMAGE_METADATA = { 'checksum': '836c69cbcd1dc4f225daedbab6edc7c7', + 'resource_id': '7b5b73f2-9181-4307-a710-b1aa6472526d', 'container_format': 'aki', 'created_at': '2014-06-04T16:26:01', 'deleted': 'False', @@ -127,3 +129,62 @@ STORED_DATA[1]['usage']['compute'][0]['rating'] = { 'price': 0.42} STORED_DATA = split_storage_data(STORED_DATA) + +METRICS_CONF = { + 'collector': 'gnocchi', + 'name': 'OpenStack', + 'period': 3600, + 'services': [ + 'compute', + 'volume', + 'network.bw.in', + 'network.bw.out', + 'network.floating', + 'image' + ], + 'services_metrics': { + 'compute': [ + {'vcpus': 'max'}, + {'memory': 'max'}, + {'cpu': 'max'}, + {'disk.root.size': 'max'}, + {'disk.ephemeral.size': 'max'} + ], + 'image': [ + {'image.size': 'max'}, + {'image.download': 'max'}, + {'image.serve': 'max'} + ], + 'network.bw.in': [{'network.incoming.bytes': 'max'}], + 'network.bw.out': [{'network.outgoing.bytes': 'max'}], + 'network.floating': [{'ip.floating': 'max'}], + 'volume': [{'volume.size': 'max'}], + 'radosgw.usage': [{'radosgw.objects.size': 'max'}]}, + 'services_objects': { + 'compute': 'instance', + 'image': 'image', + 'network.bw.in': 'instance_network_interface', + 'network.bw.out': 'instance_network_interface', + 'network.floating': 'network', + 'volume': 'volume', + 'radosgw.usage': 'ceph_account', + }, + 'metrics_units': { + 'compute': {1: {'unit': 'instance'}}, + 'default_unit': {1: {'unit': 'unknown'}}, + 'image': {'image.size': {'unit': 'MiB', 'factor': '1/1048576'}}, + 'network.bw.in': {'network.incoming.bytes': { + 'unit': 'MB', + 'factor': '1/1000000'}}, + 'network.bw.out': {'network.outgoing.bytes': { + 'unit': 'MB', + 'factor': '1/1000000'}}, + 'network.floating': {1: {'unit': 'ip'}}, + 'volume': {'volume.size': {'unit': 'GiB'}}, + 'radosgw.usage': {'radosgw.objects.size': { + 'unit': 'GiB', + 'factor': '1/1073741824'}}, + }, + 'wait_periods': 2, + 'window': 1800 +} diff --git a/cloudkitty/tests/storage/test_hybrid_storage.py b/cloudkitty/tests/storage/test_hybrid_storage.py new file mode 100644 index 00000000..cafa2ea8 --- /dev/null +++ b/cloudkitty/tests/storage/test_hybrid_storage.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# + +import mock + +from gnocchiclient import exceptions as gexc + +from cloudkitty import storage +from cloudkitty.storage.hybrid.backends import gnocchi as hgnocchi +from cloudkitty import tests +from cloudkitty.tests import samples + + +class BaseHybridStorageTest(tests.TestCase): + + def setUp(self): + super(BaseHybridStorageTest, self).setUp() + self.conf.set_override('backend', 'hybrid', 'storage') + hgnocchi.METRICS_CONF = samples.METRICS_CONF + self.storage = storage.get_storage() + with mock.patch.object( + self.storage._hybrid_backend, 'init'): + self.storage.init() + + +class PermissiveDict(object): + """Allows to check a single key of a dict in an assertion. + + Example: + >>> mydict = {'a': 'A', 'b': 'B'} + >>> checker = PermissiveDict('A', key='a') + >>> checker == mydict + True + """ + def __init__(self, value, key='name'): + self.key = key + self.value = value + + def __eq__(self, other): + return self.value == other.get(self.key) + + +class HybridStorageTestGnocchi(BaseHybridStorageTest): + + def setUp(self): + super(HybridStorageTestGnocchi, self).setUp() + + def tearDown(self): + super(HybridStorageTestGnocchi, self).tearDown() + + def _init_storage(self, archive_policy=False, res_type=False): + with mock.patch.object(self.storage._hybrid_backend._conn, + 'archive_policy', + spec=['get', 'create']) as pol_mock: + if not archive_policy: + pol_mock.get.side_effect = gexc.ArchivePolicyNotFound + else: + pol_mock.create.side_effect = gexc.ArchivePolicyAlreadyExists + with mock.patch.object(self.storage._hybrid_backend._conn, + 'resource_type', + spec=['get', 'create']) as rtype_mock: + if not res_type: + rtype_mock.get.side_effect = gexc.ResourceTypeNotFound + else: + rtype_mock.create.side_effect \ + = gexc.ResourceTypeAlreadyExists + + self.storage.init() + rtype_data = self.storage._hybrid_backend._resource_type_data + rtype_calls = list() + for val in rtype_data.values(): + rtype_calls.append( + mock.call(PermissiveDict(val['name'], key='name'))) + if res_type: + rtype_mock.create.assert_not_called() + else: + rtype_mock.create.assert_has_calls( + rtype_calls, any_order=True) + pol_mock.get.assert_called_once_with( + self.storage._hybrid_backend._archive_policy_name) + if archive_policy: + pol_mock.create.assert_not_called() + else: + apolicy = { + 'name': self.storage._hybrid_backend._archive_policy_name, + 'back_window': 0, + 'aggregation_methods': + ['std', 'count', 'min', 'max', 'sum', 'mean'], + } + apolicy['definition'] = \ + self.storage._hybrid_backend._archive_policy_definition + pol_mock.create.assert_called_once_with(apolicy) + + def test_init_no_res_type_no_policy(self): + self._init_storage() + + def test_init_with_res_type_no_policy(self): + self._init_storage(res_type=True) + + def test_init_no_res_type_with_policy(self): + self._init_storage(archive_policy=True) + + def test_init_with_res_type_with_policy(self): + self._init_storage(res_type=True, archive_policy=True) diff --git a/cloudkitty/tests/storage/test_storage.py b/cloudkitty/tests/storage/test_storage.py index 6bc4bcb9..a224ea31 100644 --- a/cloudkitty/tests/storage/test_storage.py +++ b/cloudkitty/tests/storage/test_storage.py @@ -17,10 +17,12 @@ # import copy +import mock import sqlalchemy import testscenarios from cloudkitty import storage +from cloudkitty.storage.hybrid.backends import gnocchi as hgnocchi from cloudkitty import tests from cloudkitty.tests import samples from cloudkitty import utils as ck_utils @@ -28,7 +30,8 @@ from cloudkitty import utils as ck_utils class StorageTest(tests.TestCase): storage_scenarios = [ - ('sqlalchemy', dict(storage_backend='sqlalchemy'))] + ('sqlalchemy', dict(storage_backend='sqlalchemy')), + ('hybrid', dict(storage_backend='hybrid'))] @classmethod def generate_scenarios(cls): @@ -36,8 +39,10 @@ class StorageTest(tests.TestCase): cls.scenarios, cls.storage_scenarios) - def setUp(self): + @mock.patch('cloudkitty.storage.hybrid.backends.gnocchi.gclient') + def setUp(self, gclient_mock): super(StorageTest, self).setUp() + hgnocchi.METRICS_CONF = samples.METRICS_CONF self._tenant_id = samples.TENANT self._other_tenant_id = '8d3ae50089ea4142-9c6e1269db6a0b64' self.conf.set_override('backend', self.storage_backend, 'storage') @@ -77,6 +82,278 @@ class StorageTest(tests.TestCase): self.assertEqual(samples.RATED_DATA[1]['usage'], data) self.assertEqual([], working_data) + # State + def test_get_state_when_nothing_in_storage(self): + state = self.storage.get_state() + self.assertIsNone(state) + + def test_get_latest_global_state(self): + self.insert_different_data_two_tenants() + state = self.storage.get_state() + self.assertEqual(samples.SECOND_PERIOD_BEGIN, state) + + def test_get_state_on_rated_tenant(self): + self.insert_different_data_two_tenants() + state = self.storage.get_state(self._tenant_id) + self.assertEqual(samples.FIRST_PERIOD_BEGIN, state) + state = self.storage.get_state(self._other_tenant_id) + self.assertEqual(samples.SECOND_PERIOD_BEGIN, state) + + def test_get_state_on_no_data_frame(self): + self.storage.nodata( + samples.FIRST_PERIOD_BEGIN, + samples.FIRST_PERIOD_END, + self._tenant_id) + self.storage.commit(self._tenant_id) + state = self.storage.get_state(self._tenant_id) + self.assertEqual(samples.FIRST_PERIOD_BEGIN, state) + + +class StorageDataframeTest(StorageTest): + + storage_scenarios = [ + ('sqlalchemy', dict(storage_backend='sqlalchemy'))] + + # Queries + # Data + def test_get_no_frame_when_nothing_in_storage(self): + self.assertRaises( + storage.NoTimeFrame, + self.storage.get_time_frame, + begin=samples.FIRST_PERIOD_BEGIN - 3600, + end=samples.FIRST_PERIOD_BEGIN) + + def test_get_frame_filter_outside_data(self): + self.insert_different_data_two_tenants() + self.assertRaises( + storage.NoTimeFrame, + self.storage.get_time_frame, + begin=samples.FIRST_PERIOD_BEGIN - 3600, + end=samples.FIRST_PERIOD_BEGIN) + + def test_get_frame_without_filter_but_timestamp(self): + self.insert_different_data_two_tenants() + data = self.storage.get_time_frame( + begin=samples.FIRST_PERIOD_BEGIN, + end=samples.SECOND_PERIOD_END) + self.assertEqual(3, len(data)) + + def test_get_frame_on_one_period(self): + self.insert_different_data_two_tenants() + data = self.storage.get_time_frame( + begin=samples.FIRST_PERIOD_BEGIN, + end=samples.FIRST_PERIOD_END) + self.assertEqual(2, len(data)) + + def test_get_frame_on_one_period_and_one_tenant(self): + self.insert_different_data_two_tenants() + data = self.storage.get_time_frame( + begin=samples.FIRST_PERIOD_BEGIN, + end=samples.FIRST_PERIOD_END, + tenant_id=self._tenant_id) + self.assertEqual(2, len(data)) + + def test_get_frame_on_one_period_and_one_tenant_outside_data(self): + self.insert_different_data_two_tenants() + self.assertRaises( + storage.NoTimeFrame, + self.storage.get_time_frame, + begin=samples.FIRST_PERIOD_BEGIN, + end=samples.FIRST_PERIOD_END, + tenant_id=self._other_tenant_id) + + def test_get_frame_on_two_periods(self): + self.insert_different_data_two_tenants() + data = self.storage.get_time_frame( + begin=samples.FIRST_PERIOD_BEGIN, + end=samples.SECOND_PERIOD_END) + self.assertEqual(3, len(data)) + + +class StorageTotalTest(StorageTest): + + storage_scenarios = [ + ('sqlalchemy', dict(storage_backend='sqlalchemy'))] + + # Total + def test_get_empty_total(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600) + end = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end) + self.assertEqual(1, len(total)) + self.assertIsNone(total[0]["rate"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + + def test_get_total_without_filter_but_timestamp(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end) + # FIXME(sheeprine): floating point error (transition to decimal) + self.assertEqual(1, len(total)) + self.assertEqual(1.9473999999999998, total[0]["rate"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + + def test_get_total_filtering_on_one_period(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end) + self.assertEqual(1, len(total)) + self.assertEqual(1.1074, total[0]["rate"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + + def test_get_total_filtering_on_one_period_and_one_tenant(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end, + tenant_id=self._tenant_id) + self.assertEqual(1, len(total)) + self.assertEqual(0.5537, total[0]["rate"]) + self.assertEqual(self._tenant_id, total[0]["tenant_id"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + + def test_get_total_filtering_on_service(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end, + service='compute') + self.assertEqual(1, len(total)) + self.assertEqual(0.84, total[0]["rate"]) + self.assertEqual('compute', total[0]["res_type"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + + def test_get_total_groupby_tenant(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end, + groupby="tenant_id") + self.assertEqual(2, len(total)) + self.assertEqual(0.9737, total[0]["rate"]) + self.assertEqual(self._other_tenant_id, total[0]["tenant_id"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + self.assertEqual(0.9737, total[1]["rate"]) + self.assertEqual(self._tenant_id, total[1]["tenant_id"]) + self.assertEqual(begin, total[1]["begin"]) + self.assertEqual(end, total[1]["end"]) + + def test_get_total_groupby_restype(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end, + groupby="res_type") + self.assertEqual(2, len(total)) + self.assertEqual(0.2674, total[0]["rate"]) + self.assertEqual('image', total[0]["res_type"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + self.assertEqual(1.68, total[1]["rate"]) + self.assertEqual('compute', total[1]["res_type"]) + self.assertEqual(begin, total[1]["begin"]) + self.assertEqual(end, total[1]["end"]) + + def test_get_total_groupby_tenant_and_restype(self): + begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) + end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) + self.insert_data() + total = self.storage.get_total( + begin=begin, + end=end, + groupby="tenant_id,res_type") + self.assertEqual(4, len(total)) + self.assertEqual(0.1337, total[0]["rate"]) + self.assertEqual(self._other_tenant_id, total[0]["tenant_id"]) + self.assertEqual('image', total[0]["res_type"]) + self.assertEqual(begin, total[0]["begin"]) + self.assertEqual(end, total[0]["end"]) + self.assertEqual(0.1337, total[1]["rate"]) + self.assertEqual(self._tenant_id, total[1]["tenant_id"]) + self.assertEqual('image', total[1]["res_type"]) + self.assertEqual(begin, total[1]["begin"]) + self.assertEqual(end, total[1]["end"]) + self.assertEqual(0.84, total[2]["rate"]) + self.assertEqual(self._other_tenant_id, total[2]["tenant_id"]) + self.assertEqual('compute', total[2]["res_type"]) + self.assertEqual(begin, total[2]["begin"]) + self.assertEqual(end, total[2]["end"]) + self.assertEqual(0.84, total[3]["rate"]) + self.assertEqual(self._tenant_id, total[3]["tenant_id"]) + self.assertEqual('compute', total[3]["res_type"]) + self.assertEqual(begin, total[3]["begin"]) + self.assertEqual(end, total[3]["end"]) + + +class StorageTenantTest(StorageTest): + + storage_scenarios = [ + ('sqlalchemy', dict(storage_backend='sqlalchemy'))] + + # Tenants + def test_get_empty_tenant_with_nothing_in_storage(self): + tenants = self.storage.get_tenants( + begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), + end=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN)) + self.assertEqual([], tenants) + + def test_get_empty_tenant_list(self): + self.insert_data() + tenants = self.storage.get_tenants( + begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600), + end=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)) + self.assertEqual([], tenants) + + def test_get_tenants_filtering_on_period(self): + self.insert_different_data_two_tenants() + tenants = self.storage.get_tenants( + begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), + end=ck_utils.ts2dt(samples.SECOND_PERIOD_END)) + self.assertListEqual( + [self._tenant_id, self._other_tenant_id], + tenants) + tenants = self.storage.get_tenants( + begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), + end=ck_utils.ts2dt(samples.FIRST_PERIOD_END)) + self.assertListEqual( + [self._tenant_id], + tenants) + tenants = self.storage.get_tenants( + begin=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN), + end=ck_utils.ts2dt(samples.SECOND_PERIOD_END)) + self.assertListEqual( + [self._other_tenant_id], + tenants) + + +class StorageDataIntegrityTest(StorageTest): + + storage_scenarios = [ + ('sqlalchemy', dict(storage_backend='sqlalchemy'))] + # Data integrity def test_has_data_flag_behaviour(self): self.assertNotIn(self._tenant_id, self.storage._has_data) @@ -213,253 +490,9 @@ class StorageTest(tests.TestCase): self.assertNotIn(self._tenant_id, self.storage.usage_end) self.assertNotIn(self._tenant_id, self.storage.usage_end_dt) - # Queries - # Data - def test_get_no_frame_when_nothing_in_storage(self): - self.assertRaises( - storage.NoTimeFrame, - self.storage.get_time_frame, - begin=samples.FIRST_PERIOD_BEGIN - 3600, - end=samples.FIRST_PERIOD_BEGIN) - - def test_get_frame_filter_outside_data(self): - self.insert_different_data_two_tenants() - self.assertRaises( - storage.NoTimeFrame, - self.storage.get_time_frame, - begin=samples.FIRST_PERIOD_BEGIN - 3600, - end=samples.FIRST_PERIOD_BEGIN) - - def test_get_frame_without_filter_but_timestamp(self): - self.insert_different_data_two_tenants() - data = self.storage.get_time_frame( - begin=samples.FIRST_PERIOD_BEGIN, - end=samples.SECOND_PERIOD_END) - self.assertEqual(3, len(data)) - - def test_get_frame_on_one_period(self): - self.insert_different_data_two_tenants() - data = self.storage.get_time_frame( - begin=samples.FIRST_PERIOD_BEGIN, - end=samples.FIRST_PERIOD_END) - self.assertEqual(2, len(data)) - - def test_get_frame_on_one_period_and_one_tenant(self): - self.insert_different_data_two_tenants() - data = self.storage.get_time_frame( - begin=samples.FIRST_PERIOD_BEGIN, - end=samples.FIRST_PERIOD_END, - tenant_id=self._tenant_id) - self.assertEqual(2, len(data)) - - def test_get_frame_on_one_period_and_one_tenant_outside_data(self): - self.insert_different_data_two_tenants() - self.assertRaises( - storage.NoTimeFrame, - self.storage.get_time_frame, - begin=samples.FIRST_PERIOD_BEGIN, - end=samples.FIRST_PERIOD_END, - tenant_id=self._other_tenant_id) - - def test_get_frame_on_two_periods(self): - self.insert_different_data_two_tenants() - data = self.storage.get_time_frame( - begin=samples.FIRST_PERIOD_BEGIN, - end=samples.SECOND_PERIOD_END) - self.assertEqual(3, len(data)) - - # State - def test_get_state_when_nothing_in_storage(self): - state = self.storage.get_state() - self.assertIsNone(state) - - def test_get_latest_global_state(self): - self.insert_different_data_two_tenants() - state = self.storage.get_state() - self.assertEqual(samples.SECOND_PERIOD_BEGIN, state) - - def test_get_state_on_rated_tenant(self): - self.insert_different_data_two_tenants() - state = self.storage.get_state(self._tenant_id) - self.assertEqual(samples.FIRST_PERIOD_BEGIN, state) - state = self.storage.get_state(self._other_tenant_id) - self.assertEqual(samples.SECOND_PERIOD_BEGIN, state) - - def test_get_state_on_no_data_frame(self): - self.storage.nodata( - samples.FIRST_PERIOD_BEGIN, - samples.FIRST_PERIOD_END, - self._tenant_id) - self.storage.commit(self._tenant_id) - state = self.storage.get_state(self._tenant_id) - self.assertEqual(samples.FIRST_PERIOD_BEGIN, state) - - # Total - def test_get_empty_total(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600) - end = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end) - self.assertEqual(1, len(total)) - self.assertIsNone(total[0]["rate"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - - def test_get_total_without_filter_but_timestamp(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end) - # FIXME(sheeprine): floating point error (transition to decimal) - self.assertEqual(1, len(total)) - self.assertEqual(1.9473999999999998, total[0]["rate"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - - def test_get_total_filtering_on_one_period(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end) - self.assertEqual(1, len(total)) - self.assertEqual(1.1074, total[0]["rate"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - - def test_get_total_filtering_on_one_period_and_one_tenant(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end, - tenant_id=self._tenant_id) - self.assertEqual(1, len(total)) - self.assertEqual(0.5537, total[0]["rate"]) - self.assertEqual(self._tenant_id, total[0]["tenant_id"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - - def test_get_total_filtering_on_service(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.FIRST_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end, - service='compute') - self.assertEqual(1, len(total)) - self.assertEqual(0.84, total[0]["rate"]) - self.assertEqual('compute', total[0]["res_type"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - - def test_get_total_groupby_tenant(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end, - groupby="tenant_id") - self.assertEqual(2, len(total)) - self.assertEqual(0.9737, total[0]["rate"]) - self.assertEqual(self._other_tenant_id, total[0]["tenant_id"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - self.assertEqual(0.9737, total[1]["rate"]) - self.assertEqual(self._tenant_id, total[1]["tenant_id"]) - self.assertEqual(begin, total[1]["begin"]) - self.assertEqual(end, total[1]["end"]) - - def test_get_total_groupby_restype(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end, - groupby="res_type") - self.assertEqual(2, len(total)) - self.assertEqual(0.2674, total[0]["rate"]) - self.assertEqual('image', total[0]["res_type"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - self.assertEqual(1.68, total[1]["rate"]) - self.assertEqual('compute', total[1]["res_type"]) - self.assertEqual(begin, total[1]["begin"]) - self.assertEqual(end, total[1]["end"]) - - def test_get_total_groupby_tenant_and_restype(self): - begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN) - end = ck_utils.ts2dt(samples.SECOND_PERIOD_END) - self.insert_data() - total = self.storage.get_total( - begin=begin, - end=end, - groupby="tenant_id,res_type") - self.assertEqual(4, len(total)) - self.assertEqual(0.1337, total[0]["rate"]) - self.assertEqual(self._other_tenant_id, total[0]["tenant_id"]) - self.assertEqual('image', total[0]["res_type"]) - self.assertEqual(begin, total[0]["begin"]) - self.assertEqual(end, total[0]["end"]) - self.assertEqual(0.1337, total[1]["rate"]) - self.assertEqual(self._tenant_id, total[1]["tenant_id"]) - self.assertEqual('image', total[1]["res_type"]) - self.assertEqual(begin, total[1]["begin"]) - self.assertEqual(end, total[1]["end"]) - self.assertEqual(0.84, total[2]["rate"]) - self.assertEqual(self._other_tenant_id, total[2]["tenant_id"]) - self.assertEqual('compute', total[2]["res_type"]) - self.assertEqual(begin, total[2]["begin"]) - self.assertEqual(end, total[2]["end"]) - self.assertEqual(0.84, total[3]["rate"]) - self.assertEqual(self._tenant_id, total[3]["tenant_id"]) - self.assertEqual('compute', total[3]["res_type"]) - self.assertEqual(begin, total[3]["begin"]) - self.assertEqual(end, total[3]["end"]) - - # Tenants - def test_get_empty_tenant_with_nothing_in_storage(self): - tenants = self.storage.get_tenants( - begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), - end=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN)) - self.assertEqual([], tenants) - - def test_get_empty_tenant_list(self): - self.insert_data() - tenants = self.storage.get_tenants( - begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600), - end=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)) - self.assertEqual([], tenants) - - def test_get_tenants_filtering_on_period(self): - self.insert_different_data_two_tenants() - tenants = self.storage.get_tenants( - begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), - end=ck_utils.ts2dt(samples.SECOND_PERIOD_END)) - self.assertListEqual( - [self._tenant_id, self._other_tenant_id], - tenants) - tenants = self.storage.get_tenants( - begin=ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN), - end=ck_utils.ts2dt(samples.FIRST_PERIOD_END)) - self.assertListEqual( - [self._tenant_id], - tenants) - tenants = self.storage.get_tenants( - begin=ck_utils.ts2dt(samples.SECOND_PERIOD_BEGIN), - end=ck_utils.ts2dt(samples.SECOND_PERIOD_END)) - self.assertListEqual( - [self._other_tenant_id], - tenants) - StorageTest.generate_scenarios() +StorageTotalTest.generate_scenarios() +StorageTenantTest.generate_scenarios() +StorageDataframeTest.generate_scenarios() +StorageDataIntegrityTest.generate_scenarios() diff --git a/cloudkitty/utils.py b/cloudkitty/utils.py index d02b1ac2..4b865284 100644 --- a/cloudkitty/utils.py +++ b/cloudkitty/utils.py @@ -291,3 +291,15 @@ def convert_unit(value, factor, offset=0): factor = decimal.Decimal(factor) return (decimal.Decimal(value) * factor) + decimal.Decimal(offset) + + +def flat_dict(item, parent=None): + """Returns a flat version of the nested dict item""" + if not parent: + parent = dict() + for k, val in item.items(): + if isinstance(val, dict): + parent = flat_dict(val, parent) + else: + parent[k] = val + return parent diff --git a/doc/source/configuration/configuration.rst b/doc/source/configuration/configuration.rst index db089eb3..7a94e1d2 100644 --- a/doc/source/configuration/configuration.rst +++ b/doc/source/configuration/configuration.rst @@ -139,13 +139,13 @@ The following shows the basic configuration items: The tenant named ``service`` is also commonly called ``services`` -It is now time to configure the storage backend. Three storage backends are -available: ``sqlalchemy``, ``gnocchihybrid``, and ``gnocchi``. +It is now time to configure the storage backend. Four storage backends are +available: ``sqlalchemy``, ``hybrid``, ``gnocchihybrid``, and ``gnocchi``. .. code-block:: ini [storage] - backend = gnocchihybrid + backend = gnocchi As you will see in the following example, collector and storage backends sometimes need additional configuration sections. (The tenant fetcher works the @@ -158,7 +158,8 @@ example), except for ``storage_gnocchi``. The section name format should become ``{backend_type}_{backend_name}`` for all sections in the future (``storage_gnocchi`` style). -If you want to use the pure gnocchi storage, add the following entry: +If you want to use the pure gnocchi storage or the hybrid storage with a +gnocchi backend, add the following entry: .. code-block:: ini diff --git a/releasenotes/notes/refactor-storage-e5453296e477e594.yaml b/releasenotes/notes/refactor-storage-e5453296e477e594.yaml new file mode 100644 index 00000000..dba30107 --- /dev/null +++ b/releasenotes/notes/refactor-storage-e5453296e477e594.yaml @@ -0,0 +1,13 @@ +--- +features: + - | + The storage system is being refactored. + A hybrid storage backend has been added. This backend handles states + via SQLAlchemy and pure storage via another storage backend. Once + this new storage is considered stable, it will become the default storage. + This will ease the creation of storage backends (no more state handling). + +deprecations: + - | + All storage backends except sqlalchemy and the new hybrid storage + have been deprecated. diff --git a/setup.cfg b/setup.cfg index 66ae6e57..3168dbef 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,6 +69,10 @@ cloudkitty.storage.backends = sqlalchemy = cloudkitty.storage.sqlalchemy:SQLAlchemyStorage gnocchihybrid = cloudkitty.storage.gnocchi_hybrid:GnocchiHybridStorage gnocchi = cloudkitty.storage.gnocchi:GnocchiStorage + hybrid = cloudkitty.storage.hybrid:HybridStorage + +cloudkitty.storage.hybrid.backends = + gnocchi = cloudkitty.storage.hybrid.backends.gnocchi:GnocchiStorage cloudkitty.output.writers = osrf = cloudkitty.writer.osrf:OSRFBackend