From 1f67217cdecd96e08097de952af2b7fa77b4a1a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Albert?= Date: Tue, 25 Nov 2014 12:14:00 +0100 Subject: [PATCH] Added multi-tenancy support Change-Id: I89e75d4dd36410ab7e24f81261ad8703dab11297 --- cloudkitty/api/controllers/v1.py | 27 ++- cloudkitty/billing/__init__.py | 11 +- cloudkitty/billing/hash/__init__.py | 3 +- cloudkitty/billing/noop.py | 4 +- cloudkitty/cli/writer.py | 81 ++++++- cloudkitty/collector/ceilometer.py | 3 +- cloudkitty/db/__init__.py | 2 +- cloudkitty/orchestrator.py | 214 ++++++++++++------ cloudkitty/storage/__init__.py | 63 ++++-- cloudkitty/storage/sqlalchemy/__init__.py | 99 +++++--- .../792b438b663_added_tenant_informations.py | 27 +++ cloudkitty/storage/sqlalchemy/models.py | 22 +- cloudkitty/write_orchestrator.py | 11 +- cloudkitty/writer/__init__.py | 6 +- cloudkitty/writer/osrf.py | 2 +- requirements.txt | 2 +- 16 files changed, 408 insertions(+), 169 deletions(-) create mode 100644 cloudkitty/storage/sqlalchemy/alembic/versions/792b438b663_added_tenant_informations.py diff --git a/cloudkitty/api/controllers/v1.py b/cloudkitty/api/controllers/v1.py index 2f7baf3b..fe36ba78 100644 --- a/cloudkitty/api/controllers/v1.py +++ b/cloudkitty/api/controllers/v1.py @@ -15,6 +15,8 @@ # # @author: Stéphane Albert # +import datetime + from oslo.config import cfg import pecan from pecan import rest @@ -242,19 +244,34 @@ class ReportController(rest.RestController): """ _custom_actions = { - 'total': ['GET'] + 'total': ['GET'], + 'tenants': ['GET'] } - @wsme_pecan.wsexpose(float) - def total(self): - """Return the amount to pay for the current month. + @wsme_pecan.wsexpose([wtypes.text], + datetime.datetime, + datetime.datetime) + def tenants(self, begin=None, end=None): + """Return the list of rated tenants. + + """ + storage = pecan.request.storage_backend + tenants = storage.get_tenants(begin, end) + return tenants + + @wsme_pecan.wsexpose(float, + datetime.datetime, + datetime.datetime, + wtypes.text) + def total(self, begin=None, end=None, tenant_id=None): + """Return the amount to pay for a given period. """ storage = pecan.request.storage_backend # FIXME(sheeprine): We should filter on user id. # Use keystone token information by default but make it overridable and # enforce it by policy engine - total = storage.get_total() + total = storage.get_total(begin, end, tenant_id) return total diff --git a/cloudkitty/billing/__init__.py b/cloudkitty/billing/__init__.py index 777d783b..556d093e 100644 --- a/cloudkitty/billing/__init__.py +++ b/cloudkitty/billing/__init__.py @@ -131,10 +131,11 @@ class BillingController(rest.RestController): """ - config = BillingConfigController() - enabled = BillingEnableController() - def __init__(self): + if not hasattr(self, 'config'): + self.config = BillingConfigController() + if not hasattr(self, 'enabled'): + self.enabled = BillingEnableController() if hasattr(self, 'module_name'): self.config.module_name = self.module_name self.enabled.module_name = self.module_name @@ -159,8 +160,8 @@ class BillingProcessorBase(object): controller = BillingController - def __init__(self): - pass + def __init__(self, tenant_id=None): + self._tenant_id = tenant_id @abc.abstractproperty def enabled(self): diff --git a/cloudkitty/billing/hash/__init__.py b/cloudkitty/billing/hash/__init__.py index 38fb1a08..b5e96777 100644 --- a/cloudkitty/billing/hash/__init__.py +++ b/cloudkitty/billing/hash/__init__.py @@ -227,7 +227,8 @@ class BasicHashMap(billing.BillingProcessorBase): controller = BasicHashMapController db_api = api.get_instance() - def __init__(self): + def __init__(self, tenant_id=None): + super(BasicHashMap, self).__init__(tenant_id) self._billing_info = {} self._load_billing_rates() diff --git a/cloudkitty/billing/noop.py b/cloudkitty/billing/noop.py index 7d491658..7575c31c 100644 --- a/cloudkitty/billing/noop.py +++ b/cloudkitty/billing/noop.py @@ -37,8 +37,8 @@ class Noop(billing.BillingProcessorBase): controller = NoopController - def __init__(self): - pass + def __init__(self, tenant_id=None): + super(Noop, self).__init__(tenant_id) @property def enabled(self): diff --git a/cloudkitty/cli/writer.py b/cloudkitty/cli/writer.py index a7957a94..ef61a56e 100644 --- a/cloudkitty/cli/writer.py +++ b/cloudkitty/cli/writer.py @@ -15,6 +15,8 @@ # # @author: Stéphane Albert # +from __future__ import print_function + from oslo.config import cfg from stevedore import driver @@ -44,14 +46,75 @@ def load_output_backend(): return backend +class DBCommand(object): + + def __init__(self): + self._storage = None + self._output = None + self._load_storage_backend() + self._load_output_backend() + + def _load_storage_backend(self): + storage_args = {'period': CONF.collect.period} + CONF.import_opt('backend', 'cloudkitty.storage', 'storage') + backend = driver.DriverManager( + STORAGES_NAMESPACE, + CONF.storage.backend, + invoke_on_load=True, + invoke_kwds=storage_args).driver + self._storage = backend + + def _load_output_backend(self): + CONF.import_opt('backend', 'cloudkitty.config', 'output') + backend = i_utils.import_class(CONF.output.backend) + self._output = backend + + def generate(self): + if not CONF.command.tenant: + tenants = self._storage.get_tenants(CONF.command.begin, + CONF.command.end) + else: + tenants = [CONF.command.tenant] + for tenant in tenants: + wo = write_orchestrator.WriteOrchestrator(self._output, + tenant, + self._storage) + wo.init_writing_pipeline() + if not CONF.command.begin: + wo.restart_month() + wo.process() + + def tenants_list(self): + tenants = self._storage.get_tenants(CONF.command.begin, + CONF.command.end) + print('Tenant list:') + for tenant in tenants: + print(tenant) + + +def add_command_parsers(subparsers): + command_object = DBCommand() + + parser = subparsers.add_parser('generate') + parser.set_defaults(func=command_object.generate) + parser.add_argument('--tenant', nargs='?') + parser.add_argument('--begin', nargs='?') + parser.add_argument('--end', nargs='?') + + parser = subparsers.add_parser('tenants_list') + parser.set_defaults(func=command_object.tenants_list) + parser.add_argument('--begin', nargs='?') + parser.add_argument('--end', nargs='?') + + +command_opt = cfg.SubCommandOpt('command', + title='Command', + help='Available commands', + handler=add_command_parsers) + +CONF.register_cli_opt(command_opt) + + def main(): service.prepare_service() - output_backend = load_output_backend() - storage_backend = load_storage_backend() - - wo = write_orchestrator.WriteOrchestrator(output_backend, - 'writer', - storage_backend) - wo.init_writing_pipeline() - wo.restart_month() - wo.process() + CONF.command.func() diff --git a/cloudkitty/collector/ceilometer.py b/cloudkitty/collector/ceilometer.py index dea995f4..80afbc1a 100644 --- a/cloudkitty/collector/ceilometer.py +++ b/cloudkitty/collector/ceilometer.py @@ -94,7 +94,8 @@ class CeilometerCollector(collector.BaseCollector): self._cacher = CeilometerResourceCacher() - self._conn = cclient.get_client('2', os_username=self.user, + self._conn = cclient.get_client('2', + os_username=self.user, os_password=self.password, os_auth_url=self.keystone_url, os_tenant_name=self.tenant, diff --git a/cloudkitty/db/__init__.py b/cloudkitty/db/__init__.py index 38947a7a..4dd472d4 100644 --- a/cloudkitty/db/__init__.py +++ b/cloudkitty/db/__init__.py @@ -24,7 +24,7 @@ _FACADE = None def _create_facade_lazily(): global _FACADE if _FACADE is None: - _FACADE = session.EngineFacade.from_config(cfg.CONF) + _FACADE = session.EngineFacade.from_config(cfg.CONF, sqlite_fk=True) return _FACADE diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 336ee06c..3fbe7787 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -16,6 +16,8 @@ # # @author: Stéphane Albert # +import decimal + import eventlet from keystoneclient.v2_0 import client as kclient from oslo.config import cfg @@ -67,7 +69,8 @@ class BillingEndpoint(object): def quote(self, ctxt, res_data): LOG.debug('Received quote from RPC.') - return self._orchestrator.process_quote(res_data) + worker = APIWorker() + return worker.quote(res_data) def reload_module(self, ctxt, name): LOG.info('Received reload command for module {}.'.format(name)) @@ -91,13 +94,112 @@ class BillingEndpoint(object): self._pending_reload.remove(name) +class BaseWorker(object): + def __init__(self, tenant_id=None): + self._tenant_id = tenant_id + + # Billing processors + self._processors = {} + self._load_billing_processors() + + def _load_billing_processors(self): + self._processors = {} + processors = extension_manager.EnabledExtensionManager( + PROCESSORS_NAMESPACE, + invoke_kwds={'tenant_id': self._tenant_id} + ) + + for processor in processors: + b_name = processor.name + b_obj = processor.obj + self._processors[b_name] = b_obj + + +class APIWorker(BaseWorker): + def __init__(self, tenant_id=None): + super(APIWorker, self).__init__(tenant_id) + + def quote(self, res_data): + for processor in self._processors.values(): + processor.process(res_data) + + price = decimal.Decimal(0) + for res in res_data: + for res_usage in res['usage'].values(): + for data in res_usage: + price += data.get('billing', {}).get('price', 0.0) + return price + + +class Worker(BaseWorker): + def __init__(self, collector, storage, tenant_id=None): + self._collector = collector + self._storage = storage + + self._period = CONF.collect.period + self._wait_time = CONF.collect.wait_periods * CONF.collect.period + + super(Worker, self).__init__(tenant_id) + + def _collect(self, service, start_timestamp): + next_timestamp = start_timestamp + CONF.collect.period + raw_data = self._collector.retrieve(service, + start_timestamp, + next_timestamp, + self._tenant_id) + + timed_data = [{'period': {'begin': start_timestamp, + 'end': next_timestamp}, + 'usage': raw_data}] + return timed_data + + def check_state(self): + timestamp = self._storage.get_state(self._tenant_id) + if not timestamp: + month_start = ck_utils.get_month_start() + return ck_utils.dt2ts(month_start) + + now = ck_utils.utcnow_ts() + next_timestamp = timestamp + self._period + if next_timestamp + self._wait_time < now: + return next_timestamp + return 0 + + def run(self): + while True: + timestamp = self.check_state() + if not timestamp: + break + + for service in CONF.collect.services: + data = self._collect(service, timestamp) + + # Billing + for processor in self._processors.values(): + processor.process(data) + + # Writing + self._storage.append(data, self._tenant_id) + + # We're getting a full period so we directly commit + self._storage.commit(self._tenant_id) + + class Orchestrator(object): def __init__(self): - self.admin_ks = kclient.Client(username=CONF.auth.username, - password=CONF.auth.password, - tenant_name=CONF.auth.tenant, - region_name=CONF.auth.region, - auth_url=CONF.auth.url) + # Load credentials informations + self.user = CONF.auth.username + self.password = CONF.auth.password + self.tenant = CONF.auth.tenant + self.region = CONF.auth.region + self.keystone_url = CONF.auth.url + + # Initialize keystone admin session + self.admin_ks = kclient.Client(username=self.user, + password=self.password, + tenant_name=self.tenant, + region_name=self.region, + auth_url=self.keystone_url) # Transformers self.transformers = {} @@ -119,15 +221,25 @@ class Orchestrator(object): invoke_on_load=True, invoke_kwds=storage_args).driver - # Billing processors - self.b_processors = {} - self._load_billing_processors() - # RPC self.server = None self._billing_endpoint = BillingEndpoint(self) self._init_messaging() + def _load_tenant_list(self): + ks = kclient.Client(username=self.user, + password=self.password, + auth_url=self.keystone_url, + region_name=self.region) + tenant_list = ks.tenants.list() + self._tenants = [] + for tenant in tenant_list: + roles = self.admin_ks.roles.roles_for_user(self.admin_ks.user_id, + tenant) + for role in roles: + if role.name == 'rating': + self._tenants.append(tenant) + def _init_messaging(self): target = messaging.Target(topic='cloudkitty', server=CONF.host, @@ -138,8 +250,8 @@ class Orchestrator(object): self.server = rpc.get_server(target, endpoints) self.server.start() - def _check_state(self): - timestamp = self.storage.get_state() + def _check_state(self, tenant_id): + timestamp = self.storage.get_state(tenant_id) if not timestamp: month_start = ck_utils.get_month_start() return ck_utils.dt2ts(month_start) @@ -173,72 +285,28 @@ class Orchestrator(object): t_obj = transformer.obj self.transformers[t_name] = t_obj - def _load_billing_processors(self): - self.b_processors = {} - processors = extension_manager.EnabledExtensionManager( - PROCESSORS_NAMESPACE, - ) - - for processor in processors: - b_name = processor.name - b_obj = processor.obj - self.b_processors[b_name] = b_obj - - def process_quote(self, res_data): - for processor in self.b_processors.values(): - processor.process(res_data) - - price = 0.0 - for res in res_data: - for res_usage in res['usage'].values(): - for data in res_usage: - price += data.get('billing', {}).get('price', 0.0) - return price - def process_messages(self): - pending_reload = self._billing_endpoint.get_reload_list() - pending_states = self._billing_endpoint.get_module_state() - for name in pending_reload: - if name in self.b_processors: - if name in self.b_processors.keys(): - LOG.info('Reloading configuration of {} module.'.format( - name)) - self.b_processors[name].reload_config() - else: - LOG.info('Tried to reload a disabled module: {}.'.format( - name)) - for name, status in pending_states.items(): - if name in self.b_processors and not status: - LOG.info('Disabling {} module.'.format(name)) - self.b_processors.pop(name) - else: - LOG.info('Enabling {} module.'.format(name)) - processors = extension_manager.EnabledExtensionManager( - PROCESSORS_NAMESPACE) - for processor in processors: - if processor.name == name: - self.b_processors[name] = processor + # TODO(sheeprine): Code kept to handle threading and asynchronous + # reloading + # pending_reload = self._billing_endpoint.get_reload_list() + # pending_states = self._billing_endpoint.get_module_state() + pass def process(self): while True: self.process_messages() - timestamp = self._check_state() - if not timestamp: - eventlet.sleep(CONF.collect.period) - continue - - for service in CONF.collect.services: - data = self._collect(service, timestamp) - - # Billing - for processor in self.b_processors.values(): - processor.process(data) - - # Writing - self.storage.append(data) - - # We're getting a full period so we directly commit - self.storage.commit() + self._load_tenant_list() + while len(self._tenants): + for tenant in self._tenants: + if not self._check_state(tenant.id): + self._tenants.remove(tenant) + else: + worker = Worker(self.collector, + self.storage, + tenant.id) + worker.run() + # FIXME(sheeprine): We may cause a drift here + eventlet.sleep(CONF.collect.period) def terminate(self): pass diff --git a/cloudkitty/storage/__init__.py b/cloudkitty/storage/__init__.py index d1e15713..c3ba1a2e 100644 --- a/cloudkitty/storage/__init__.py +++ b/cloudkitty/storage/__init__.py @@ -62,10 +62,10 @@ class BaseStorage(object): self._period = period # State vars - self.usage_start = None - self.usage_start_dt = None - self.usage_end = None - self.usage_end_dt = None + self.usage_start = {} + self.usage_start_dt = {} + self.usage_end = {} + self.usage_end_dt = {} @staticmethod def init(): @@ -93,38 +93,45 @@ class BaseStorage(object): if candidate_ts: return candidate_ts, json_data.pop(candidate_idx)['usage'] - def _pre_commit(self): + def _pre_commit(self, tenant_id): """Called before every commit. """ @abc.abstractmethod - def _commit(self): + def _commit(self, tenant_id): """Push data to the storage backend. """ - def _post_commit(self): + def _post_commit(self, tenant_id): """Called after every commit. """ @abc.abstractmethod - def _dispatch(self, data): + def _dispatch(self, data, tenant_id): """Process rated data. :param data: The rated data frames. """ @abc.abstractmethod - def get_state(self): + def get_state(self, tenant_id=None): """Return the last written frame's timestamp. + :param tenant_id: Tenant ID to filter on. + """ + + @abc.abstractmethod + def get_total(self, tenant_id=None): + """Return the current total. + """ @abc.abstractmethod - def get_total(self): - """Return the current total. + def get_tenants(self, begin=None, end=None): + """Return the list of rated tenants. """ @@ -138,31 +145,37 @@ class BaseStorage(object): :type end: datetime.datetime :param res_type: (Optional) Filter on the resource type. :type res_type: str + :param tenant_id: (Optional) Filter on the tenant_id. + :type res_type: str """ - def append(self, raw_data): + def append(self, raw_data, tenant_id): """Append rated data before committing them to the backend. :param raw_data: The rated data frames. + :param tenant_id: Tenant the frame is belonging. """ while raw_data: usage_start, data = self._filter_period(raw_data) - if self.usage_end is not None and usage_start >= self.usage_end: - self.commit() - self.usage_start = None + usage_end = self.usage_end.get(tenant_id) + if usage_end is not None and usage_start >= usage_end: + self.commit(tenant_id) + self.usage_start.pop(tenant_id) - if self.usage_start is None: - self.usage_start = usage_start - self.usage_end = usage_start + self._period - self.usage_start_dt = ck_utils.ts2dt(self.usage_start) - self.usage_end_dt = ck_utils.ts2dt(self.usage_end) + if self.usage_start.get(tenant_id) is None: + self.usage_start[tenant_id] = usage_start + self.usage_end[tenant_id] = usage_start + self._period + self.usage_start_dt[tenant_id] = ck_utils.ts2dt( + self.usage_start.get(tenant_id)) + self.usage_end_dt[tenant_id] = ck_utils.ts2dt( + self.usage_end.get(tenant_id)) - self._dispatch(data) + self._dispatch(data, tenant_id) - def commit(self): + def commit(self, tenant_id): """Commit the changes to the backend. """ - self._pre_commit() - self._commit() - self._post_commit() + self._pre_commit(tenant_id) + self._commit(tenant_id) + self._post_commit(tenant_id) diff --git a/cloudkitty/storage/sqlalchemy/__init__.py b/cloudkitty/storage/sqlalchemy/__init__.py index ee641b49..b3544162 100644 --- a/cloudkitty/storage/sqlalchemy/__init__.py +++ b/cloudkitty/storage/sqlalchemy/__init__.py @@ -33,69 +33,97 @@ class SQLAlchemyStorage(storage.BaseStorage): """ def __init__(self, period=3600): super(SQLAlchemyStorage, self).__init__(period) - self._session = None + self._session = {} @staticmethod def init(): migration.upgrade('head') - def _commit(self): - self._session.commit() - self._session.begin() + def _commit(self, tenant_id): + self._session[tenant_id].commit() + self._session[tenant_id].begin() - def _dispatch(self, data): + def _dispatch(self, data, tenant_id): for service in data: for frame in data[service]: - self._append_time_frame(service, frame) + self._append_time_frame(service, frame, tenant_id) # HACK(adriant) Quick hack to allow billing windows to # progress. This check/insert probably ought to be moved # somewhere else. if not data[service]: empty_frame = {'vol': {'qty': 0, 'unit': 'None'}, 'billing': {'price': 0}, 'desc': ''} - self._append_time_frame(service, empty_frame) + self._append_time_frame(service, empty_frame, tenant_id) - def append(self, raw_data): - if not self._session: - self._session = db.get_session() - self._session.begin() - super(SQLAlchemyStorage, self).append(raw_data) + def append(self, raw_data, tenant_id): + session = self._session.get(tenant_id) + if not session: + self._session[tenant_id] = db.get_session() + self._session[tenant_id].begin() + super(SQLAlchemyStorage, self).append(raw_data, tenant_id) - def get_state(self): + def get_state(self, tenant_id=None): session = db.get_session() - r = utils.model_query( + q = utils.model_query( models.RatedDataFrame, session - ).order_by( + ) + if tenant_id: + q = q.filter( + models.RatedDataFrame.tenant_id == tenant_id + ) + r = q.order_by( models.RatedDataFrame.begin.desc() ).first() if r: return ck_utils.dt2ts(r.begin) - def get_total(self): + def get_total(self, begin=None, end=None, tenant_id=None): model = models.RatedDataFrame # Boundary calculation - month_start = ck_utils.get_month_start() - month_end = ck_utils.get_next_month() + if not begin: + begin = ck_utils.get_month_start() + if not end: + end = ck_utils.get_next_month() session = db.get_session() - rate = session.query( + q = session.query( sqlalchemy.func.sum(model.rate).label('rate') - ).filter( - model.begin >= month_start, - model.end <= month_end + ) + if tenant_id: + q = q.filter( + models.RatedDataFrame.tenant_id == tenant_id + ) + rate = q.filter( + model.begin >= begin, + model.end <= end ).scalar() return rate - def get_time_frame(self, begin, end, **filters): - """Return a list of time frames. + def get_tenants(self, begin=None, end=None): + model = models.RatedDataFrame - :param start: Filter from `start`. - :param end: Filter to `end`. - :param unit: Filter on an unit type. - :param res_type: Filter on a resource type. - """ + # Boundary calculation + if not begin: + begin = ck_utils.get_month_start() + if not end: + end = ck_utils.get_next_month() + + session = db.get_session() + q = utils.model_query( + model, + session + ).filter( + model.begin >= begin, + model.end <= end + ) + tenants = q.distinct().values( + model.tenant_id + ) + return [tenant.tenant_id for tenant in tenants] + + def get_time_frame(self, begin, end, **filters): model = models.RatedDataFrame session = db.get_session() q = utils.model_query( @@ -112,30 +140,33 @@ class SQLAlchemyStorage(storage.BaseStorage): raise storage.NoTimeFrame() return [entry.to_cloudkitty() for entry in r] - def _append_time_frame(self, res_type, frame): + def _append_time_frame(self, res_type, frame, tenant_id): vol_dict = frame['vol'] qty = vol_dict['qty'] unit = vol_dict['unit'] rating_dict = frame['billing'] rate = rating_dict['price'] desc = json.dumps(frame['desc']) - self.add_time_frame(self.usage_start_dt, - self.usage_end_dt, + self.add_time_frame(self.usage_start_dt.get(tenant_id), + self.usage_end_dt.get(tenant_id), + tenant_id, unit, qty, res_type, rate, desc) - def add_time_frame(self, begin, end, unit, qty, res_type, rate, desc): + def add_time_frame(self, begin, end, tenant_id, unit, qty, res_type, + rate, desc): """Create a new time frame. """ frame = models.RatedDataFrame(begin=begin, end=end, + tenant_id=tenant_id, unit=unit, qty=qty, res_type=res_type, rate=rate, desc=desc) - self._session.add(frame) + self._session[tenant_id].add(frame) diff --git a/cloudkitty/storage/sqlalchemy/alembic/versions/792b438b663_added_tenant_informations.py b/cloudkitty/storage/sqlalchemy/alembic/versions/792b438b663_added_tenant_informations.py new file mode 100644 index 00000000..a438d816 --- /dev/null +++ b/cloudkitty/storage/sqlalchemy/alembic/versions/792b438b663_added_tenant_informations.py @@ -0,0 +1,27 @@ +"""added tenant informations + +Revision ID: 792b438b663 +Revises: 17fd1b237aa3 +Create Date: 2014-12-02 13:12:11.328534 + +""" + +# revision identifiers, used by Alembic. +revision = '792b438b663' +down_revision = '17fd1b237aa3' + +from alembic import op +import sqlalchemy as sa + +from cloudkitty.storage.sqlalchemy import models + + +def upgrade(): + op.add_column('rated_data_frames', + sa.Column('tenant_id', + sa.String(length=32), + nullable=True)) + + +def downgrade(): + op.drop_column('rated_data_frames', 'tenant_id') diff --git a/cloudkitty/storage/sqlalchemy/models.py b/cloudkitty/storage/sqlalchemy/models.py index 5023fc3a..98d182c1 100644 --- a/cloudkitty/storage/sqlalchemy/models.py +++ b/cloudkitty/storage/sqlalchemy/models.py @@ -21,6 +21,8 @@ from oslo.db.sqlalchemy import models import sqlalchemy from sqlalchemy.ext import declarative +from cloudkitty import utils as ck_utils + Base = declarative.declarative_base() @@ -34,6 +36,8 @@ class RatedDataFrame(Base, models.ModelBase): id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + tenant_id = sqlalchemy.Column(sqlalchemy.String(32), + nullable=True) begin = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) end = sqlalchemy.Column(sqlalchemy.DateTime, @@ -50,20 +54,32 @@ class RatedDataFrame(Base, models.ModelBase): nullable=False) def to_cloudkitty(self): - period_dict = {} - period_dict['begin'] = self.begin.isoformat() - period_dict['end'] = self.end.isoformat() + # Rating informations rating_dict = {} rating_dict['price'] = self.rate + + # Volume informations vol_dict = {} vol_dict['qty'] = self.qty vol_dict['unit'] = self.unit res_dict = {} + + # Encapsulate informations in a resource dict res_dict['billing'] = rating_dict res_dict['desc'] = json.loads(self.desc) res_dict['vol'] = vol_dict + res_dict['tenant_id'] = self.tenant_id + + # Add resource to the usage dict usage_dict = {} usage_dict[self.res_type] = [res_dict] + + # Time informations + period_dict = {} + period_dict['begin'] = ck_utils.dt2iso(self.begin) + period_dict['end'] = ck_utils.dt2iso(self.end) + + # Add period to the resource informations ck_dict = {} ck_dict['period'] = period_dict ck_dict['usage'] = usage_dict diff --git a/cloudkitty/write_orchestrator.py b/cloudkitty/write_orchestrator.py index 07998a28..05f76356 100644 --- a/cloudkitty/write_orchestrator.py +++ b/cloudkitty/write_orchestrator.py @@ -34,16 +34,16 @@ class WriteOrchestrator(object): """ def __init__(self, backend, - user_id, + tenant_id, storage, basepath=None, period=3600): self._backend = backend - self._uid = user_id + self._tenant_id = tenant_id self._storage = storage self._basepath = basepath self._period = period - self._sm = state.DBStateManager(self._uid, + self._sm = state.DBStateManager(self._tenant_id, 'writer_status') self._write_pipeline = [] @@ -64,7 +64,7 @@ class WriteOrchestrator(object): def add_writer(self, writer_class): writer = writer_class(self, - self._uid, + self._tenant_id, self._backend, self._basepath) self._write_pipeline.append(writer) @@ -97,7 +97,8 @@ class WriteOrchestrator(object): timeframe_end = timeframe + self._period try: data = self._storage.get_time_frame(timeframe, - timeframe_end) + timeframe_end, + tenant_id=self._tenant_id) except storage.NoTimeFrame: return None return data diff --git a/cloudkitty/writer/__init__.py b/cloudkitty/writer/__init__.py index aef0c083..6e2c0307 100644 --- a/cloudkitty/writer/__init__.py +++ b/cloudkitty/writer/__init__.py @@ -28,11 +28,11 @@ class BaseReportWriter(object): """Base report writer.""" report_type = None - def __init__(self, write_orchestrator, user_id, backend, basepath=None): + def __init__(self, write_orchestrator, tenant_id, backend, basepath=None): self._write_orchestrator = write_orchestrator self._backend = backend - self._uid = user_id - self._sm = state.DBStateManager(self._uid, + self._tenant_id = tenant_id + self._sm = state.DBStateManager(self._tenant_id, self.report_type) self._report = None self._period = 3600 diff --git a/cloudkitty/writer/osrf.py b/cloudkitty/writer/osrf.py index 815cf88f..a96941ca 100644 --- a/cloudkitty/writer/osrf.py +++ b/cloudkitty/writer/osrf.py @@ -37,7 +37,7 @@ class OSRFBackend(writer.BaseReportWriter): report_type = 'osrf' def _gen_filename(self, timeframe): - filename = '{}-osrf-{}-{:02d}.json'.format(self._uid, + filename = '{}-osrf-{}-{:02d}.json'.format(self._tenant_id, timeframe.year, timeframe.month) if self._basepath: diff --git a/requirements.txt b/requirements.txt index 4d88d763..1cda86a6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ python-keystoneclient iso8601 PasteDeploy==1.5.2 posix_ipc -pecan==0.5.0 +pecan>=0.8.0 WSME>=0.6,!=0.6.2 oslo.config>=1.2.0 oslo.messaging<1.6.0