Added multi-tenancy support

Change-Id: I89e75d4dd36410ab7e24f81261ad8703dab11297
This commit is contained in:
Stéphane Albert 2014-11-25 12:14:00 +01:00
parent 32d83e3614
commit 1f67217cde
16 changed files with 408 additions and 169 deletions

View File

@ -15,6 +15,8 @@
#
# @author: Stéphane Albert
#
import datetime
from oslo.config import cfg
import pecan
from pecan import rest
@ -242,19 +244,34 @@ class ReportController(rest.RestController):
"""
_custom_actions = {
'total': ['GET']
'total': ['GET'],
'tenants': ['GET']
}
@wsme_pecan.wsexpose(float)
def total(self):
"""Return the amount to pay for the current month.
@wsme_pecan.wsexpose([wtypes.text],
datetime.datetime,
datetime.datetime)
def tenants(self, begin=None, end=None):
"""Return the list of rated tenants.
"""
storage = pecan.request.storage_backend
tenants = storage.get_tenants(begin, end)
return tenants
@wsme_pecan.wsexpose(float,
datetime.datetime,
datetime.datetime,
wtypes.text)
def total(self, begin=None, end=None, tenant_id=None):
"""Return the amount to pay for a given period.
"""
storage = pecan.request.storage_backend
# FIXME(sheeprine): We should filter on user id.
# Use keystone token information by default but make it overridable and
# enforce it by policy engine
total = storage.get_total()
total = storage.get_total(begin, end, tenant_id)
return total

View File

@ -131,10 +131,11 @@ class BillingController(rest.RestController):
"""
config = BillingConfigController()
enabled = BillingEnableController()
def __init__(self):
if not hasattr(self, 'config'):
self.config = BillingConfigController()
if not hasattr(self, 'enabled'):
self.enabled = BillingEnableController()
if hasattr(self, 'module_name'):
self.config.module_name = self.module_name
self.enabled.module_name = self.module_name
@ -159,8 +160,8 @@ class BillingProcessorBase(object):
controller = BillingController
def __init__(self):
pass
def __init__(self, tenant_id=None):
self._tenant_id = tenant_id
@abc.abstractproperty
def enabled(self):

View File

@ -227,7 +227,8 @@ class BasicHashMap(billing.BillingProcessorBase):
controller = BasicHashMapController
db_api = api.get_instance()
def __init__(self):
def __init__(self, tenant_id=None):
super(BasicHashMap, self).__init__(tenant_id)
self._billing_info = {}
self._load_billing_rates()

View File

@ -37,8 +37,8 @@ class Noop(billing.BillingProcessorBase):
controller = NoopController
def __init__(self):
pass
def __init__(self, tenant_id=None):
super(Noop, self).__init__(tenant_id)
@property
def enabled(self):

View File

@ -15,6 +15,8 @@
#
# @author: Stéphane Albert
#
from __future__ import print_function
from oslo.config import cfg
from stevedore import driver
@ -44,14 +46,75 @@ def load_output_backend():
return backend
class DBCommand(object):
def __init__(self):
self._storage = None
self._output = None
self._load_storage_backend()
self._load_output_backend()
def _load_storage_backend(self):
storage_args = {'period': CONF.collect.period}
CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
backend = driver.DriverManager(
STORAGES_NAMESPACE,
CONF.storage.backend,
invoke_on_load=True,
invoke_kwds=storage_args).driver
self._storage = backend
def _load_output_backend(self):
CONF.import_opt('backend', 'cloudkitty.config', 'output')
backend = i_utils.import_class(CONF.output.backend)
self._output = backend
def generate(self):
if not CONF.command.tenant:
tenants = self._storage.get_tenants(CONF.command.begin,
CONF.command.end)
else:
tenants = [CONF.command.tenant]
for tenant in tenants:
wo = write_orchestrator.WriteOrchestrator(self._output,
tenant,
self._storage)
wo.init_writing_pipeline()
if not CONF.command.begin:
wo.restart_month()
wo.process()
def tenants_list(self):
tenants = self._storage.get_tenants(CONF.command.begin,
CONF.command.end)
print('Tenant list:')
for tenant in tenants:
print(tenant)
def add_command_parsers(subparsers):
command_object = DBCommand()
parser = subparsers.add_parser('generate')
parser.set_defaults(func=command_object.generate)
parser.add_argument('--tenant', nargs='?')
parser.add_argument('--begin', nargs='?')
parser.add_argument('--end', nargs='?')
parser = subparsers.add_parser('tenants_list')
parser.set_defaults(func=command_object.tenants_list)
parser.add_argument('--begin', nargs='?')
parser.add_argument('--end', nargs='?')
command_opt = cfg.SubCommandOpt('command',
title='Command',
help='Available commands',
handler=add_command_parsers)
CONF.register_cli_opt(command_opt)
def main():
service.prepare_service()
output_backend = load_output_backend()
storage_backend = load_storage_backend()
wo = write_orchestrator.WriteOrchestrator(output_backend,
'writer',
storage_backend)
wo.init_writing_pipeline()
wo.restart_month()
wo.process()
CONF.command.func()

View File

@ -94,7 +94,8 @@ class CeilometerCollector(collector.BaseCollector):
self._cacher = CeilometerResourceCacher()
self._conn = cclient.get_client('2', os_username=self.user,
self._conn = cclient.get_client('2',
os_username=self.user,
os_password=self.password,
os_auth_url=self.keystone_url,
os_tenant_name=self.tenant,

View File

@ -24,7 +24,7 @@ _FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
_FACADE = session.EngineFacade.from_config(cfg.CONF)
_FACADE = session.EngineFacade.from_config(cfg.CONF, sqlite_fk=True)
return _FACADE

View File

@ -16,6 +16,8 @@
#
# @author: Stéphane Albert
#
import decimal
import eventlet
from keystoneclient.v2_0 import client as kclient
from oslo.config import cfg
@ -67,7 +69,8 @@ class BillingEndpoint(object):
def quote(self, ctxt, res_data):
LOG.debug('Received quote from RPC.')
return self._orchestrator.process_quote(res_data)
worker = APIWorker()
return worker.quote(res_data)
def reload_module(self, ctxt, name):
LOG.info('Received reload command for module {}.'.format(name))
@ -91,13 +94,112 @@ class BillingEndpoint(object):
self._pending_reload.remove(name)
class BaseWorker(object):
def __init__(self, tenant_id=None):
self._tenant_id = tenant_id
# Billing processors
self._processors = {}
self._load_billing_processors()
def _load_billing_processors(self):
self._processors = {}
processors = extension_manager.EnabledExtensionManager(
PROCESSORS_NAMESPACE,
invoke_kwds={'tenant_id': self._tenant_id}
)
for processor in processors:
b_name = processor.name
b_obj = processor.obj
self._processors[b_name] = b_obj
class APIWorker(BaseWorker):
def __init__(self, tenant_id=None):
super(APIWorker, self).__init__(tenant_id)
def quote(self, res_data):
for processor in self._processors.values():
processor.process(res_data)
price = decimal.Decimal(0)
for res in res_data:
for res_usage in res['usage'].values():
for data in res_usage:
price += data.get('billing', {}).get('price', 0.0)
return price
class Worker(BaseWorker):
def __init__(self, collector, storage, tenant_id=None):
self._collector = collector
self._storage = storage
self._period = CONF.collect.period
self._wait_time = CONF.collect.wait_periods * CONF.collect.period
super(Worker, self).__init__(tenant_id)
def _collect(self, service, start_timestamp):
next_timestamp = start_timestamp + CONF.collect.period
raw_data = self._collector.retrieve(service,
start_timestamp,
next_timestamp,
self._tenant_id)
timed_data = [{'period': {'begin': start_timestamp,
'end': next_timestamp},
'usage': raw_data}]
return timed_data
def check_state(self):
timestamp = self._storage.get_state(self._tenant_id)
if not timestamp:
month_start = ck_utils.get_month_start()
return ck_utils.dt2ts(month_start)
now = ck_utils.utcnow_ts()
next_timestamp = timestamp + self._period
if next_timestamp + self._wait_time < now:
return next_timestamp
return 0
def run(self):
while True:
timestamp = self.check_state()
if not timestamp:
break
for service in CONF.collect.services:
data = self._collect(service, timestamp)
# Billing
for processor in self._processors.values():
processor.process(data)
# Writing
self._storage.append(data, self._tenant_id)
# We're getting a full period so we directly commit
self._storage.commit(self._tenant_id)
class Orchestrator(object):
def __init__(self):
self.admin_ks = kclient.Client(username=CONF.auth.username,
password=CONF.auth.password,
tenant_name=CONF.auth.tenant,
region_name=CONF.auth.region,
auth_url=CONF.auth.url)
# Load credentials informations
self.user = CONF.auth.username
self.password = CONF.auth.password
self.tenant = CONF.auth.tenant
self.region = CONF.auth.region
self.keystone_url = CONF.auth.url
# Initialize keystone admin session
self.admin_ks = kclient.Client(username=self.user,
password=self.password,
tenant_name=self.tenant,
region_name=self.region,
auth_url=self.keystone_url)
# Transformers
self.transformers = {}
@ -119,15 +221,25 @@ class Orchestrator(object):
invoke_on_load=True,
invoke_kwds=storage_args).driver
# Billing processors
self.b_processors = {}
self._load_billing_processors()
# RPC
self.server = None
self._billing_endpoint = BillingEndpoint(self)
self._init_messaging()
def _load_tenant_list(self):
ks = kclient.Client(username=self.user,
password=self.password,
auth_url=self.keystone_url,
region_name=self.region)
tenant_list = ks.tenants.list()
self._tenants = []
for tenant in tenant_list:
roles = self.admin_ks.roles.roles_for_user(self.admin_ks.user_id,
tenant)
for role in roles:
if role.name == 'rating':
self._tenants.append(tenant)
def _init_messaging(self):
target = messaging.Target(topic='cloudkitty',
server=CONF.host,
@ -138,8 +250,8 @@ class Orchestrator(object):
self.server = rpc.get_server(target, endpoints)
self.server.start()
def _check_state(self):
timestamp = self.storage.get_state()
def _check_state(self, tenant_id):
timestamp = self.storage.get_state(tenant_id)
if not timestamp:
month_start = ck_utils.get_month_start()
return ck_utils.dt2ts(month_start)
@ -173,72 +285,28 @@ class Orchestrator(object):
t_obj = transformer.obj
self.transformers[t_name] = t_obj
def _load_billing_processors(self):
self.b_processors = {}
processors = extension_manager.EnabledExtensionManager(
PROCESSORS_NAMESPACE,
)
for processor in processors:
b_name = processor.name
b_obj = processor.obj
self.b_processors[b_name] = b_obj
def process_quote(self, res_data):
for processor in self.b_processors.values():
processor.process(res_data)
price = 0.0
for res in res_data:
for res_usage in res['usage'].values():
for data in res_usage:
price += data.get('billing', {}).get('price', 0.0)
return price
def process_messages(self):
pending_reload = self._billing_endpoint.get_reload_list()
pending_states = self._billing_endpoint.get_module_state()
for name in pending_reload:
if name in self.b_processors:
if name in self.b_processors.keys():
LOG.info('Reloading configuration of {} module.'.format(
name))
self.b_processors[name].reload_config()
else:
LOG.info('Tried to reload a disabled module: {}.'.format(
name))
for name, status in pending_states.items():
if name in self.b_processors and not status:
LOG.info('Disabling {} module.'.format(name))
self.b_processors.pop(name)
else:
LOG.info('Enabling {} module.'.format(name))
processors = extension_manager.EnabledExtensionManager(
PROCESSORS_NAMESPACE)
for processor in processors:
if processor.name == name:
self.b_processors[name] = processor
# TODO(sheeprine): Code kept to handle threading and asynchronous
# reloading
# pending_reload = self._billing_endpoint.get_reload_list()
# pending_states = self._billing_endpoint.get_module_state()
pass
def process(self):
while True:
self.process_messages()
timestamp = self._check_state()
if not timestamp:
eventlet.sleep(CONF.collect.period)
continue
for service in CONF.collect.services:
data = self._collect(service, timestamp)
# Billing
for processor in self.b_processors.values():
processor.process(data)
# Writing
self.storage.append(data)
# We're getting a full period so we directly commit
self.storage.commit()
self._load_tenant_list()
while len(self._tenants):
for tenant in self._tenants:
if not self._check_state(tenant.id):
self._tenants.remove(tenant)
else:
worker = Worker(self.collector,
self.storage,
tenant.id)
worker.run()
# FIXME(sheeprine): We may cause a drift here
eventlet.sleep(CONF.collect.period)
def terminate(self):
pass

View File

@ -62,10 +62,10 @@ class BaseStorage(object):
self._period = period
# State vars
self.usage_start = None
self.usage_start_dt = None
self.usage_end = None
self.usage_end_dt = None
self.usage_start = {}
self.usage_start_dt = {}
self.usage_end = {}
self.usage_end_dt = {}
@staticmethod
def init():
@ -93,38 +93,45 @@ class BaseStorage(object):
if candidate_ts:
return candidate_ts, json_data.pop(candidate_idx)['usage']
def _pre_commit(self):
def _pre_commit(self, tenant_id):
"""Called before every commit.
"""
@abc.abstractmethod
def _commit(self):
def _commit(self, tenant_id):
"""Push data to the storage backend.
"""
def _post_commit(self):
def _post_commit(self, tenant_id):
"""Called after every commit.
"""
@abc.abstractmethod
def _dispatch(self, data):
def _dispatch(self, data, tenant_id):
"""Process rated data.
:param data: The rated data frames.
"""
@abc.abstractmethod
def get_state(self):
def get_state(self, tenant_id=None):
"""Return the last written frame's timestamp.
:param tenant_id: Tenant ID to filter on.
"""
@abc.abstractmethod
def get_total(self, tenant_id=None):
"""Return the current total.
"""
@abc.abstractmethod
def get_total(self):
"""Return the current total.
def get_tenants(self, begin=None, end=None):
"""Return the list of rated tenants.
"""
@ -138,31 +145,37 @@ class BaseStorage(object):
:type end: datetime.datetime
:param res_type: (Optional) Filter on the resource type.
:type res_type: str
:param tenant_id: (Optional) Filter on the tenant_id.
:type res_type: str
"""
def append(self, raw_data):
def append(self, raw_data, tenant_id):
"""Append rated data before committing them to the backend.
:param raw_data: The rated data frames.
:param tenant_id: Tenant the frame is belonging.
"""
while raw_data:
usage_start, data = self._filter_period(raw_data)
if self.usage_end is not None and usage_start >= self.usage_end:
self.commit()
self.usage_start = None
usage_end = self.usage_end.get(tenant_id)
if usage_end is not None and usage_start >= usage_end:
self.commit(tenant_id)
self.usage_start.pop(tenant_id)
if self.usage_start is None:
self.usage_start = usage_start
self.usage_end = usage_start + self._period
self.usage_start_dt = ck_utils.ts2dt(self.usage_start)
self.usage_end_dt = ck_utils.ts2dt(self.usage_end)
if self.usage_start.get(tenant_id) is None:
self.usage_start[tenant_id] = usage_start
self.usage_end[tenant_id] = usage_start + self._period
self.usage_start_dt[tenant_id] = ck_utils.ts2dt(
self.usage_start.get(tenant_id))
self.usage_end_dt[tenant_id] = ck_utils.ts2dt(
self.usage_end.get(tenant_id))
self._dispatch(data)
self._dispatch(data, tenant_id)
def commit(self):
def commit(self, tenant_id):
"""Commit the changes to the backend.
"""
self._pre_commit()
self._commit()
self._post_commit()
self._pre_commit(tenant_id)
self._commit(tenant_id)
self._post_commit(tenant_id)

View File

@ -33,69 +33,97 @@ class SQLAlchemyStorage(storage.BaseStorage):
"""
def __init__(self, period=3600):
super(SQLAlchemyStorage, self).__init__(period)
self._session = None
self._session = {}
@staticmethod
def init():
migration.upgrade('head')
def _commit(self):
self._session.commit()
self._session.begin()
def _commit(self, tenant_id):
self._session[tenant_id].commit()
self._session[tenant_id].begin()
def _dispatch(self, data):
def _dispatch(self, data, tenant_id):
for service in data:
for frame in data[service]:
self._append_time_frame(service, frame)
self._append_time_frame(service, frame, tenant_id)
# HACK(adriant) Quick hack to allow billing windows to
# progress. This check/insert probably ought to be moved
# somewhere else.
if not data[service]:
empty_frame = {'vol': {'qty': 0, 'unit': 'None'},
'billing': {'price': 0}, 'desc': ''}
self._append_time_frame(service, empty_frame)
self._append_time_frame(service, empty_frame, tenant_id)
def append(self, raw_data):
if not self._session:
self._session = db.get_session()
self._session.begin()
super(SQLAlchemyStorage, self).append(raw_data)
def append(self, raw_data, tenant_id):
session = self._session.get(tenant_id)
if not session:
self._session[tenant_id] = db.get_session()
self._session[tenant_id].begin()
super(SQLAlchemyStorage, self).append(raw_data, tenant_id)
def get_state(self):
def get_state(self, tenant_id=None):
session = db.get_session()
r = utils.model_query(
q = utils.model_query(
models.RatedDataFrame,
session
).order_by(
)
if tenant_id:
q = q.filter(
models.RatedDataFrame.tenant_id == tenant_id
)
r = q.order_by(
models.RatedDataFrame.begin.desc()
).first()
if r:
return ck_utils.dt2ts(r.begin)
def get_total(self):
def get_total(self, begin=None, end=None, tenant_id=None):
model = models.RatedDataFrame
# Boundary calculation
month_start = ck_utils.get_month_start()
month_end = ck_utils.get_next_month()
if not begin:
begin = ck_utils.get_month_start()
if not end:
end = ck_utils.get_next_month()
session = db.get_session()
rate = session.query(
q = session.query(
sqlalchemy.func.sum(model.rate).label('rate')
).filter(
model.begin >= month_start,
model.end <= month_end
)
if tenant_id:
q = q.filter(
models.RatedDataFrame.tenant_id == tenant_id
)
rate = q.filter(
model.begin >= begin,
model.end <= end
).scalar()
return rate
def get_time_frame(self, begin, end, **filters):
"""Return a list of time frames.
def get_tenants(self, begin=None, end=None):
model = models.RatedDataFrame
:param start: Filter from `start`.
:param end: Filter to `end`.
:param unit: Filter on an unit type.
:param res_type: Filter on a resource type.
"""
# Boundary calculation
if not begin:
begin = ck_utils.get_month_start()
if not end:
end = ck_utils.get_next_month()
session = db.get_session()
q = utils.model_query(
model,
session
).filter(
model.begin >= begin,
model.end <= end
)
tenants = q.distinct().values(
model.tenant_id
)
return [tenant.tenant_id for tenant in tenants]
def get_time_frame(self, begin, end, **filters):
model = models.RatedDataFrame
session = db.get_session()
q = utils.model_query(
@ -112,30 +140,33 @@ class SQLAlchemyStorage(storage.BaseStorage):
raise storage.NoTimeFrame()
return [entry.to_cloudkitty() for entry in r]
def _append_time_frame(self, res_type, frame):
def _append_time_frame(self, res_type, frame, tenant_id):
vol_dict = frame['vol']
qty = vol_dict['qty']
unit = vol_dict['unit']
rating_dict = frame['billing']
rate = rating_dict['price']
desc = json.dumps(frame['desc'])
self.add_time_frame(self.usage_start_dt,
self.usage_end_dt,
self.add_time_frame(self.usage_start_dt.get(tenant_id),
self.usage_end_dt.get(tenant_id),
tenant_id,
unit,
qty,
res_type,
rate,
desc)
def add_time_frame(self, begin, end, unit, qty, res_type, rate, desc):
def add_time_frame(self, begin, end, tenant_id, unit, qty, res_type,
rate, desc):
"""Create a new time frame.
"""
frame = models.RatedDataFrame(begin=begin,
end=end,
tenant_id=tenant_id,
unit=unit,
qty=qty,
res_type=res_type,
rate=rate,
desc=desc)
self._session.add(frame)
self._session[tenant_id].add(frame)

View File

@ -0,0 +1,27 @@
"""added tenant informations
Revision ID: 792b438b663
Revises: 17fd1b237aa3
Create Date: 2014-12-02 13:12:11.328534
"""
# revision identifiers, used by Alembic.
revision = '792b438b663'
down_revision = '17fd1b237aa3'
from alembic import op
import sqlalchemy as sa
from cloudkitty.storage.sqlalchemy import models
def upgrade():
op.add_column('rated_data_frames',
sa.Column('tenant_id',
sa.String(length=32),
nullable=True))
def downgrade():
op.drop_column('rated_data_frames', 'tenant_id')

View File

@ -21,6 +21,8 @@ from oslo.db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
from cloudkitty import utils as ck_utils
Base = declarative.declarative_base()
@ -34,6 +36,8 @@ class RatedDataFrame(Base, models.ModelBase):
id = sqlalchemy.Column(sqlalchemy.Integer,
primary_key=True)
tenant_id = sqlalchemy.Column(sqlalchemy.String(32),
nullable=True)
begin = sqlalchemy.Column(sqlalchemy.DateTime,
nullable=False)
end = sqlalchemy.Column(sqlalchemy.DateTime,
@ -50,20 +54,32 @@ class RatedDataFrame(Base, models.ModelBase):
nullable=False)
def to_cloudkitty(self):
period_dict = {}
period_dict['begin'] = self.begin.isoformat()
period_dict['end'] = self.end.isoformat()
# Rating informations
rating_dict = {}
rating_dict['price'] = self.rate
# Volume informations
vol_dict = {}
vol_dict['qty'] = self.qty
vol_dict['unit'] = self.unit
res_dict = {}
# Encapsulate informations in a resource dict
res_dict['billing'] = rating_dict
res_dict['desc'] = json.loads(self.desc)
res_dict['vol'] = vol_dict
res_dict['tenant_id'] = self.tenant_id
# Add resource to the usage dict
usage_dict = {}
usage_dict[self.res_type] = [res_dict]
# Time informations
period_dict = {}
period_dict['begin'] = ck_utils.dt2iso(self.begin)
period_dict['end'] = ck_utils.dt2iso(self.end)
# Add period to the resource informations
ck_dict = {}
ck_dict['period'] = period_dict
ck_dict['usage'] = usage_dict

View File

@ -34,16 +34,16 @@ class WriteOrchestrator(object):
"""
def __init__(self,
backend,
user_id,
tenant_id,
storage,
basepath=None,
period=3600):
self._backend = backend
self._uid = user_id
self._tenant_id = tenant_id
self._storage = storage
self._basepath = basepath
self._period = period
self._sm = state.DBStateManager(self._uid,
self._sm = state.DBStateManager(self._tenant_id,
'writer_status')
self._write_pipeline = []
@ -64,7 +64,7 @@ class WriteOrchestrator(object):
def add_writer(self, writer_class):
writer = writer_class(self,
self._uid,
self._tenant_id,
self._backend,
self._basepath)
self._write_pipeline.append(writer)
@ -97,7 +97,8 @@ class WriteOrchestrator(object):
timeframe_end = timeframe + self._period
try:
data = self._storage.get_time_frame(timeframe,
timeframe_end)
timeframe_end,
tenant_id=self._tenant_id)
except storage.NoTimeFrame:
return None
return data

View File

@ -28,11 +28,11 @@ class BaseReportWriter(object):
"""Base report writer."""
report_type = None
def __init__(self, write_orchestrator, user_id, backend, basepath=None):
def __init__(self, write_orchestrator, tenant_id, backend, basepath=None):
self._write_orchestrator = write_orchestrator
self._backend = backend
self._uid = user_id
self._sm = state.DBStateManager(self._uid,
self._tenant_id = tenant_id
self._sm = state.DBStateManager(self._tenant_id,
self.report_type)
self._report = None
self._period = 3600

View File

@ -37,7 +37,7 @@ class OSRFBackend(writer.BaseReportWriter):
report_type = 'osrf'
def _gen_filename(self, timeframe):
filename = '{}-osrf-{}-{:02d}.json'.format(self._uid,
filename = '{}-osrf-{}-{:02d}.json'.format(self._tenant_id,
timeframe.year,
timeframe.month)
if self._basepath:

View File

@ -5,7 +5,7 @@ python-keystoneclient
iso8601
PasteDeploy==1.5.2
posix_ipc
pecan==0.5.0
pecan>=0.8.0
WSME>=0.6,!=0.6.2
oslo.config>=1.2.0
oslo.messaging<1.6.0