diff --git a/cloudkitty/cli/storage.py b/cloudkitty/cli/storage.py new file mode 100644 index 00000000..ab220d2b --- /dev/null +++ b/cloudkitty/cli/storage.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from oslo.config import cfg +from stevedore import driver + +from cloudkitty import config # noqa +from cloudkitty import service + +CONF = cfg.CONF +STORAGES_NAMESPACE = 'cloudkitty.storage.backends' + + +def init_storage_backend(): + CONF.import_opt('backend', 'cloudkitty.storage', 'storage') + backend = driver.DriverManager( + STORAGES_NAMESPACE, + CONF.storage.backend) + backend.driver.init() + + +def main(): + service.prepare_service() + init_storage_backend() diff --git a/cloudkitty/config.py b/cloudkitty/config.py index 33e510ef..39b63b9d 100644 --- a/cloudkitty/config.py +++ b/cloudkitty/config.py @@ -49,6 +49,9 @@ collect_opts = [ cfg.IntOpt('period', default=3600, help='Billing period in seconds.'), + cfg.IntOpt('wait_periods', + default=2, + help='Wait for N periods before collecting new data.'), cfg.ListOpt('services', default=['compute'], help='Services to monitor.'), ] diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 5eb976fe..977536ba 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -45,6 +45,7 @@ CONF = cfg.CONF COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends' TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers' PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors' +STORAGES_NAMESPACE = 'cloudkitty.storage.backends' WRITERS_NAMESPACE = 'cloudkitty.output.writers' @@ -131,6 +132,14 @@ class Orchestrator(object): self.sm, basepath=CONF.output.basepath) + CONF.import_opt('backend', 'cloudkitty.storage', 'storage') + storage_args = {'period': CONF.collect.period} + self.storage = driver.DriverManager( + STORAGES_NAMESPACE, + CONF.storage.backend, + invoke_on_load=True, + invoke_kwds=storage_args).driver + # Billing processors self.b_processors = {} self._load_billing_processors() @@ -158,13 +167,15 @@ class Orchestrator(object): self.server.start() def _check_state(self): - timestamp = self.sm.get_state() + timestamp = self.storage.get_state() if not timestamp: return ck_utils.get_this_month_timestamp() - now = int(time.time()) - if timestamp + CONF.collect.period < now: - return timestamp + now = int(time.time() + time.timezone) + next_timestamp = timestamp + CONF.collect.period + wait_time = CONF.collect.wait_periods * CONF.collect.period + if next_timestamp + wait_time < now: + return next_timestamp return 0 def _collect(self, service, start_timestamp): @@ -251,10 +262,14 @@ class Orchestrator(object): processor.process(data) # Writing - self.wo.append(data) + # Copy data to keep old behaviour with write_orchestrator + wo_data = list(data) + self.wo.append(wo_data) + self.storage.append(data) # We're getting a full period so we directly commit self.wo.commit() + self.storage.commit() def terminate(self): self.wo.close() diff --git a/cloudkitty/storage/__init__.py b/cloudkitty/storage/__init__.py new file mode 100644 index 00000000..8f522aba --- /dev/null +++ b/cloudkitty/storage/__init__.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import abc +import datetime + +from oslo.config import cfg +import six + + +storage_opts = [ + cfg.StrOpt('backend', + default='sqlalchemy', + help='Name of the storage backend driver.') +] + +cfg.CONF.register_opts(storage_opts, group='storage') + + +@six.add_metaclass(abc.ABCMeta) +class BaseStorage(object): + """Base Storage class: + + Handle incoming data from the global orchestrator, and store them. + """ + def __init__(self, period=3600): + self._period = period + + # State vars + self.usage_start = None + self.usage_start_dt = None + self.usage_end = None + self.usage_end_dt = None + + @staticmethod + def init(): + """Initialize storage backend. + + Can be used to create DB schema on first start. + """ + pass + + def _filter_period(self, json_data): + """Detect the best usage period to extract. + + Removes the usage from the json data and returns it. + :param json_data: Data to filter. + """ + candidate_ts = None + candidate_idx = 0 + + for idx, usage in enumerate(json_data): + usage_ts = usage['period']['begin'] + if candidate_ts is None or usage_ts < candidate_ts: + candidate_ts = usage_ts + candidate_idx = idx + + if candidate_ts: + return candidate_ts, json_data.pop(candidate_idx)['usage'] + + def _pre_commit(self): + """Called before every commit. + + """ + + @abc.abstractmethod + def _commit(self): + """Push data to the storage backend. + + """ + + def _post_commit(self): + """Called after every commit. + + """ + + @abc.abstractmethod + def _dispatch(self, data): + """Process rated data. + + :param data: The rated data frames. + """ + + @abc.abstractmethod + def get_state(self): + """Return the last written frame's timestamp. + + """ + + @abc.abstractmethod + def get_total(self): + pass + + @abc.abstractmethod + def get_time_frame(self, begin, end, **filters): + """Request a time frame from the storage backend. + + :param begin: When to start filtering. + :type begin: datetime.datetime + :param end: When to stop filtering. + :type end: datetime.datetime + :param res_type: (Optional) Filter on the resource type. + :type res_type: str + """ + + def append(self, raw_data): + """Append rated data before committing them to the backend. + + :param raw_data: The rated data frames. + """ + while raw_data: + usage_start, data = self._filter_period(raw_data) + if self.usage_end is not None and usage_start >= self.usage_end: + self.commit() + self.usage_start = None + + if self.usage_start is None: + self.usage_start = usage_start + self.usage_end = usage_start + self._period + self.usage_start_dt = ( + datetime.datetime.fromtimestamp(self.usage_start)) + self.usage_end_dt = ( + datetime.datetime.fromtimestamp(self.usage_end)) + + self._dispatch(data) + + def commit(self): + """Commit the changes to the backend. + + """ + self._pre_commit() + self._commit() + self._post_commit() diff --git a/cloudkitty/storage/sqlalchemy/__init__.py b/cloudkitty/storage/sqlalchemy/__init__.py new file mode 100644 index 00000000..dfec493b --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/__init__.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import json + +from oslo.db.sqlalchemy import utils + +from cloudkitty import db +from cloudkitty import storage +from cloudkitty.storage.sqlalchemy import migration +from cloudkitty.storage.sqlalchemy import models +from cloudkitty import utils as ck_utils + + +class NoTimeFrame(Exception): + """Raised when there is no time frame available.""" + + def __init__(self): + super(NoTimeFrame, self).__init__( + "No time frame available") + + +class SQLAlchemyStorage(storage.BaseStorage): + """SQLAlchemy Storage Backend + + """ + def __init__(self, period=3600): + super(SQLAlchemyStorage, self).__init__(period) + self._session = None + + @staticmethod + def init(): + migration.upgrade('head') + + def _commit(self): + self._session.commit() + self._session.begin() + + def _dispatch(self, data): + for service in data: + for frame in data[service]: + self._append_time_frame(service, frame) + + def append(self, raw_data): + if not self._session: + self._session = db.get_session() + self._session.begin() + super(SQLAlchemyStorage, self).append(raw_data) + + def get_state(self): + session = db.get_session() + r = utils.model_query( + models.RatedDataFrame, + session + ).order_by( + models.RatedDataFrame.begin.desc() + ).first() + if r: + return ck_utils.dt2ts(r.begin) + + def get_total(self): + pass + + def get_time_frame(self, begin, end, **filters): + """Return a list of time frames. + + :param start: Filter from `start`. + :param end: Filter to `end`. + :param unit: Filter on an unit type. + :param res_type: Filter on a resource type. + """ + model = models.RatedDataFrame + session = db.get_session() + q = utils.model_query( + model, + session + ).filter( + model.begin >= begin, + model.end <= end + ) + for cur_filter in filters: + q = q.filter(getattr(model, cur_filter) == filters[cur_filter]) + if not q: + raise NoTimeFrame() + return q.to_cloudkitty() + + def _append_time_frame(self, res_type, frame): + vol_dict = frame['vol'] + qty = vol_dict['qty'] + unit = vol_dict['unit'] + rating_dict = frame['billing'] + rate = rating_dict['price'] + desc = json.dumps(frame['desc']) + self.add_time_frame(self.usage_start_dt, + self.usage_end_dt, + unit, + qty, + res_type, + rate, + desc) + + def add_time_frame(self, begin, end, unit, qty, res_type, rate, desc): + """Create a new time frame. + + """ + frame = models.RatedDataFrame(begin=begin, + end=end, + unit=unit, + qty=qty, + res_type=res_type, + rate=rate, + desc=desc) + self._session.add(frame) diff --git a/cloudkitty/storage/sqlalchemy/alembic/env.py b/cloudkitty/storage/sqlalchemy/alembic/env.py new file mode 100644 index 00000000..bcb048f3 --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/alembic/env.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +from cloudkitty.common.db.alembic import env # noqa +from cloudkitty.storage.sqlalchemy import models + +target_metadata = models.Base.metadata +version_table = 'storage_sqlalchemy_alembic' + + +env.run_migrations_online(target_metadata, version_table) diff --git a/cloudkitty/storage/sqlalchemy/alembic/script.py.mako b/cloudkitty/storage/sqlalchemy/alembic/script.py.mako new file mode 100644 index 00000000..95702017 --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/alembic/script.py.mako @@ -0,0 +1,22 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision} +Create Date: ${create_date} + +""" + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/cloudkitty/storage/sqlalchemy/alembic/versions/17fd1b237aa3_initial_migration.py b/cloudkitty/storage/sqlalchemy/alembic/versions/17fd1b237aa3_initial_migration.py new file mode 100644 index 00000000..5d47dd82 --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/alembic/versions/17fd1b237aa3_initial_migration.py @@ -0,0 +1,35 @@ +"""Initial migration + +Revision ID: 17fd1b237aa3 +Revises: None +Create Date: 2014-10-10 11:28:08.645122 + +""" + +# revision identifiers, used by Alembic. +revision = '17fd1b237aa3' +down_revision = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table('rated_data_frames', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('begin', sa.DateTime(), nullable=False), + sa.Column('end', sa.DateTime(), nullable=False), + sa.Column('unit', sa.String(length=255), nullable=False), + sa.Column('qty', sa.Numeric(), nullable=False), + sa.Column('res_type', sa.String(length=255), nullable=False), + sa.Column('rate', sa.Float(), nullable=False), + sa.Column('desc', sa.Text(), nullable=False), + sa.PrimaryKeyConstraint('id'), + mysql_charset='utf8', + mysql_engine='InnoDB' + ) + + +def downgrade(): + op.drop_table('rated_data_frames') + op.drop_table('storage_sqlalchemy_alembic') diff --git a/cloudkitty/storage/sqlalchemy/migration.py b/cloudkitty/storage/sqlalchemy/migration.py new file mode 100644 index 00000000..a6e16056 --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/migration.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import os + +from cloudkitty.common.db.alembic import migration + +ALEMBIC_REPO = os.path.join(os.path.dirname(__file__), 'alembic') + + +def upgrade(revision): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.upgrade(config, revision) + + +def downgrade(revision): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.downgrade(config, revision) + + +def version(): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.version(config) + + +def revision(message, autogenerate): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.revision(config, message, autogenerate) + + +def stamp(revision): + config = migration.load_alembic_config(ALEMBIC_REPO) + return migration.stamp(config, revision) diff --git a/cloudkitty/storage/sqlalchemy/models.py b/cloudkitty/storage/sqlalchemy/models.py new file mode 100644 index 00000000..e8dc137c --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/models.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Stéphane Albert +# +import json + +from oslo.db.sqlalchemy import models +import sqlalchemy +from sqlalchemy.ext import declarative + +Base = declarative.declarative_base() + + +class RatedDataFrame(Base, models.ModelBase): + """A rated data frame. + + """ + __table_args__ = {'mysql_charset': "utf8", + 'mysql_engine': "InnoDB"} + __tablename__ = 'rated_data_frames' + + id = sqlalchemy.Column(sqlalchemy.Integer, + primary_key=True) + begin = sqlalchemy.Column(sqlalchemy.DateTime, + nullable=False) + end = sqlalchemy.Column(sqlalchemy.DateTime, + nullable=False) + unit = sqlalchemy.Column(sqlalchemy.String(255), + nullable=False) + qty = sqlalchemy.Column(sqlalchemy.Numeric(), + nullable=False) + res_type = sqlalchemy.Column(sqlalchemy.String(255), + nullable=False) + rate = sqlalchemy.Column(sqlalchemy.Float(), + nullable=False) + desc = sqlalchemy.Column(sqlalchemy.Text(), + nullable=False) + + def to_cloudkitty(self): + rating_dict = {} + rating_dict['price'] = self.rate + vol_dict = {} + vol_dict['qty'] = self.qty + vol_dict['unit'] = self.unit + res_dict = {} + res_dict['billing'] = rating_dict + res_dict['desc'] = json.loads(self.desc) + res_dict['vol'] = vol_dict + ck_dict = {} + ck_dict[self.res_type] = [res_dict] + return ck_dict diff --git a/etc/cloudkitty/cloudkitty.conf.sample b/etc/cloudkitty/cloudkitty.conf.sample index 9bcb09ae..c9064618 100644 --- a/etc/cloudkitty/cloudkitty.conf.sample +++ b/etc/cloudkitty/cloudkitty.conf.sample @@ -361,6 +361,10 @@ # Billing period in seconds. (integer value) #period=3600 +# Wait for N periods before collecting new data. (integer +# value) +#wait_periods=2 + # Services to monitor. (list value) #services=compute @@ -529,3 +533,13 @@ #basepath=/var/lib/cloudkitty/states/ +[storage] + +# +# Options defined in cloudkitty.storage +# + +# Name of the storage backend driver. (string value) +#backend=sqlalchemy + + diff --git a/setup.cfg b/setup.cfg index d47bfbcd..6fc5f010 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,6 +23,7 @@ console_scripts = cloudkitty-api = cloudkitty.cli.api:main cloudkitty-dbsync = cloudkitty.cli.dbsync:main cloudkitty-processor = cloudkitty.cli.processor:main + cloudkitty-storage-init = cloudkitty.cli.storage:main cloudkitty.collector.backends = ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector @@ -36,6 +37,9 @@ cloudkitty.billing.processors = noop = cloudkitty.billing.noop:Noop hashmap = cloudkitty.billing.hash:BasicHashMap +cloudkitty.storage.backends = + sqlalchemy = cloudkitty.storage.sqlalchemy:SQLAlchemyStorage + cloudkitty.output.writers = osrf = cloudkitty.writer.osrf:OSRFBackend