Added support for rating module priority

Rating modules are now sorted based on a priority number. The modules
with the highest priority are loaded first.
Added API calls to set the priority of a module.

Change-Id: I84d9f6c31014c3164e5b7d8efdd3c909e342df96
This commit is contained in:
Stéphane Albert 2015-03-17 17:51:06 +01:00
parent 229043fc42
commit fef8344f33
14 changed files with 424 additions and 94 deletions

View File

@ -53,8 +53,7 @@ class ModulesController(rest.RestController):
modules_list.append(rating_models.CloudkittyModule(**infos))
return rating_models.CloudkittyModuleCollection(
modules=modules_list
)
modules=modules_list)
@wsme_pecan.wsexpose(rating_models.CloudkittyModule, wtypes.text)
def get_one(self, module_id):
@ -65,7 +64,7 @@ class ModulesController(rest.RestController):
try:
module = self.extensions[module_id]
except KeyError:
pecan.abort(404)
pecan.abort(404, 'Module not found.')
infos = module.obj.module_info.copy()
infos['module_id'] = infos.pop('name')
return rating_models.CloudkittyModule(**infos)
@ -75,16 +74,19 @@ class ModulesController(rest.RestController):
body=rating_models.CloudkittyModule,
status_code=302)
def put(self, module_id, module):
"""Change the state of a module (enabled/disabled)
"""Change the state and priority of a module.
:param module_id: name of the module to modify
:param module: CloudKittyModule object describing the new desired state
## :return: CloudKittyModule object describing the desired state
"""
try:
self.extensions[module_id].obj.set_state(module.enabled)
ext = self.extensions[module_id].obj
except KeyError:
pecan.abort(404)
pecan.abort(404, 'Module not found.')
if ext.enabled != module.enabled:
ext.set_state(module.enabled)
if ext.priority != module.priority:
ext.set_priority(module.priority)
pecan.response.location = pecan.request.path

View File

@ -85,12 +85,16 @@ class CloudkittyModule(wtypes.Base):
hot_config = wtypes.wsattr(bool, default=False, name='hot-config')
"""On-the-fly configuration support."""
priority = wtypes.wsattr(int, default=1)
"""Priority of the extension."""
@classmethod
def sample(cls):
sample = cls(name='example',
description='Sample extension.',
enabled=True,
hot_config=False)
hot_config=False,
priority=2)
return sample

View File

@ -84,13 +84,34 @@ class ModuleEnableState(object):
@abc.abstractmethod
def set_state(self, name, state):
"""Retrieve the module state.
"""Set the module state.
:param name: Name of the module
:param value: State of the module
"""
@six.add_metaclass(abc.ABCMeta)
class ModuleInfo(ModuleEnableState):
"""Base class for module info management."""
@abc.abstractmethod
def get_priority(self, name):
"""Retrieve the module priority.
:param name: Name of the module
:return int: Priority of the module
"""
@abc.abstractmethod
def set_priority(self, name, priority):
"""Set the module state.
:param name: Name of the module
:param priority: New priority of the module
"""
class NoSuchMapping(Exception):
"""Raised when the mapping doesn't exist."""

View File

@ -0,0 +1,22 @@
"""Added priority to modules_state.
Revision ID: 385e33fef139
Revises: 2ac2217dcbd9
Create Date: 2015-03-17 17:50:15.229896
"""
# revision identifiers, used by Alembic.
revision = '385e33fef139'
down_revision = '2ac2217dcbd9'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('modules_state', sa.Column('priority', sa.Integer(), nullable=True))
def downgrade():
op.drop_column('modules_state', 'priority')

View File

@ -39,15 +39,11 @@ class State(api.State):
def get_state(self, name):
session = db.get_session()
try:
return utils.model_query(
q = utils.model_query(
models.StateInfo,
session
).filter_by(
name=name,
).value('state')
except sqlalchemy.orm.exc.NoResultFound:
return None
session)
q = q.filter(models.StateInfo.name == name)
return q.value(models.StateInfo.state)
def set_state(self, name, state):
session = db.get_session()
@ -55,54 +51,55 @@ class State(api.State):
try:
q = utils.model_query(
models.StateInfo,
session
).filter_by(
name=name,
).with_lockmode('update')
session)
q = q.filter(models.StateInfo.name == name)
q = q.with_lockmode('update')
db_state = q.one()
db_state.state = state
except sqlalchemy.orm.exc.NoResultFound:
db_state = models.StateInfo(name=name, state=state)
db_state = models.StateInfo(name=name,
state=state)
session.add(db_state)
return db_state.state
def get_metadata(self, name):
session = db.get_session()
return utils.model_query(
q = utils.model_query(
models.StateInfo,
session
).filter_by(
name=name,
).value('s_metadata')
session)
q.filter(models.StateInfo.name == name)
return q.value(models.StateInfo.s_metadata)
def set_metadata(self, name, metadata):
session = db.get_session()
with session.begin():
try:
db_state = utils.model_query(
q = utils.model_query(
models.StateInfo,
session
).filter_by(
name=name,
).with_lockmode('update').one()
session)
q = q.filter(models.StateInfo.name == name)
q = q.with_lockmode('update')
db_state = q.one()
db_state.s_metadata = metadata
except sqlalchemy.orm.exc.NoResultFound:
db_state = models.StateInfo(name=name, s_metadata=metadata)
db_state = models.StateInfo(name=name,
s_metadata=metadata)
session.add(db_state)
finally:
session.flush()
class ModuleEnableState(api.ModuleEnableState):
"""Deprecated, use ModuleInfo instead.
"""
def get_state(self, name):
session = db.get_session()
try:
return bool(utils.model_query(
q = utils.model_query(
models.ModuleStateInfo,
session
).filter_by(
name=name,
).value('state'))
session)
q = q.filter(models.ModuleStateInfo.name == name)
res = q.value(models.ModuleStateInfo.state)
return bool(res)
except sqlalchemy.orm.exc.NoResultFound:
return None
@ -112,10 +109,9 @@ class ModuleEnableState(api.ModuleEnableState):
try:
q = utils.model_query(
models.ModuleStateInfo,
session
).filter_by(
name=name,
).with_lockmode('update')
session)
q = q.filter(models.ModuleStateInfo.name == name)
q = q.with_lockmode('update')
db_state = q.one()
db_state.state = state
except sqlalchemy.orm.exc.NoResultFound:
@ -124,19 +120,52 @@ class ModuleEnableState(api.ModuleEnableState):
return bool(db_state.state)
class ModuleInfo(ModuleEnableState):
"""Base class for module info management."""
def get_priority(self, name):
session = db.get_session()
q = utils.model_query(
models.ModuleStateInfo,
session)
q = q.filter(models.ModuleStateInfo.name == name)
res = q.value(models.ModuleStateInfo.priority)
if res:
return int(res)
else:
return 1
def set_priority(self, name, priority):
session = db.get_session()
with session.begin():
try:
q = utils.model_query(
models.ModuleStateInfo,
session)
q = q.filter(
models.ModuleStateInfo.name == name)
q = q.with_lockmode('update')
db_state = q.one()
db_state.priority = priority
except sqlalchemy.orm.exc.NoResultFound:
db_state = models.ModuleStateInfo(name=name,
priority=priority)
session.add(db_state)
return int(db_state.priority)
class ServiceToCollectorMapping(object):
"""Base class for service to collector mapping."""
def get_mapping(self, service):
session = db.get_session()
try:
res = utils.model_query(
q = utils.model_query(
models.ServiceToCollectorMapping,
session
).filter_by(
service=service,
).one()
return res
session)
q.filter(
models.ServiceToCollectorMapping.service == service)
return q.one()
except sqlalchemy.orm.exc.NoResultFound:
raise api.NoSuchMapping(service)
@ -146,15 +175,16 @@ class ServiceToCollectorMapping(object):
try:
q = utils.model_query(
models.ServiceToCollectorMapping,
session
).filter_by(
service=service,
).with_lockmode('update')
session)
q = q.filter_by(
service=service)
q = q.with_lockmode('update')
db_mapping = q.one()
db_mapping.collector = collector
except sqlalchemy.orm.exc.NoResultFound:
model = models.ServiceToCollectorMapping
db_mapping = model(service=service, collector=collector)
db_mapping = model(service=service,
collector=collector)
session.add(db_mapping)
return db_mapping
@ -162,21 +192,18 @@ class ServiceToCollectorMapping(object):
session = db.get_session()
q = utils.model_query(
models.ServiceToCollectorMapping,
session
)
session)
res = q.distinct().values(
models.ServiceToCollectorMapping.service
)
models.ServiceToCollectorMapping.service)
return res
def delete_mapping(self, service):
session = db.get_session()
r = utils.model_query(
q = utils.model_query(
models.ServiceToCollectorMapping,
session
).filter_by(
service=service,
).delete()
session)
q = q.filter(models.ServiceToCollectorMapping.service == service)
r = q.delete()
if not r:
raise api.NoSuchMapping(service)
@ -191,6 +218,10 @@ class DBAPIManager(object):
def get_module_enable_state():
return ModuleEnableState()
@staticmethod
def get_module_info():
return ModuleInfo()
@staticmethod
def get_service_to_collector_mapping():
return ServiceToCollectorMapping()

View File

@ -60,6 +60,9 @@ class ModuleStateInfo(Base, models.ModelBase):
sqlalchemy.Boolean(),
nullable=False,
default=False)
priority = sqlalchemy.Column(
sqlalchemy.Integer(),
default=1)
def __repr__(self):
return ('<ModuleStateInfo[{name}]: '

View File

@ -104,20 +104,19 @@ class BaseWorker(object):
self._tenant_id = tenant_id
# Rating processors
self._processors = {}
self._processors = []
self._load_rating_processors()
def _load_rating_processors(self):
self._processors = {}
self._processors = []
processors = extension_manager.EnabledExtensionManager(
PROCESSORS_NAMESPACE,
invoke_kwds={'tenant_id': self._tenant_id}
)
for processor in processors:
b_name = processor.name
b_obj = processor.obj
self._processors[b_name] = b_obj
self._processors.append(processor)
self._processors.sort(key=lambda x: x.obj.priority, reverse=True)
class APIWorker(BaseWorker):
@ -125,8 +124,8 @@ class APIWorker(BaseWorker):
super(APIWorker, self).__init__(tenant_id)
def quote(self, res_data):
for processor in self._processors.values():
processor.quote(res_data)
for processor in self._processors:
processor.obj.quote(res_data)
price = decimal.Decimal(0)
for res in res_data:

View File

@ -44,26 +44,50 @@ class RatingProcessorBase(object):
'name': self.module_name,
'description': self.description,
'hot_config': self.hot_config,
'enabled': self.enabled, }
'enabled': self.enabled,
'priority': self.priority}
def __init__(self, tenant_id=None):
self._tenant_id = tenant_id
@abc.abstractproperty
@property
def enabled(self):
"""Check if the module is enabled
:returns: bool if module is enabled
"""
api = db_api.get_instance()
module_db = api.get_module_enable_state()
return module_db.get_state(self.module_name) or False
@property
def priority(self):
"""Get the priority of the module.
"""
api = db_api.get_instance()
module_db = api.get_module_info()
return module_db.get_priority(self.module_name)
def set_priority(self, priority):
"""Set the priority of the module.
:param priority: (int) The new priority, the higher the number, the
higher the priority.
"""
api = db_api.get_instance()
module_db = api.get_module_info()
self.notify_reload()
return module_db.set_priority(self.module_name, priority)
def set_state(self, enabled):
"""Enable or disable a module
"""Enable or disable a module.
:param enabled: (bool) The state to put the module in.
:return: bool
"""
api = db_api.get_instance()
module_db = api.get_module_enable_state()
module_db = api.get_module_info()
client = rpc.get_client().prepare(namespace='rating',
fanout=True)
if enabled:

View File

@ -15,7 +15,6 @@
#
# @author: Stéphane Albert
#
from cloudkitty.db import api as ck_db_api
from cloudkitty.openstack.common import log as logging
from cloudkitty import rating
from cloudkitty.rating.hash.controllers import root as root_api
@ -45,16 +44,6 @@ class HashMap(rating.RatingProcessorBase):
self._res = {}
self._load_rates()
@property
def enabled(self):
"""Check if the module is enabled
:returns: bool if module is enabled
"""
db_api = ck_db_api.get_instance()
module_db = db_api.get_module_enable_state()
return module_db.get_state('hashmap') or False
def reload_config(self):
"""Reload the module's configuration.

View File

@ -31,6 +31,10 @@ class Noop(rating.RatingProcessorBase):
"""
return True
@property
def priority(self):
return 1
def reload_config(self):
pass

View File

@ -20,7 +20,34 @@ from oslotest import base
import testscenarios
from cloudkitty import db
from cloudkitty.db import api as db_api
from cloudkitty.db import api as ck_db_api
from cloudkitty import rating
class FakeRatingModule(rating.RatingProcessorBase):
module_name = 'fake'
description = 'fake rating module'
def __init__(self, tenant_id=None):
super(FakeRatingModule, self).__init__()
def quote(self, data):
self.process(data)
def process(self, data):
for cur_data in data:
cur_usage = cur_data['usage']
for service in cur_usage:
for entry in cur_usage[service]:
if 'rating' not in entry:
entry['rating'] = {'price': 0}
return data
def reload_config(self):
pass
def notify_reload(self):
pass
class TestCase(testscenarios.TestWithScenarios, base.BaseTestCase):
@ -32,7 +59,7 @@ class TestCase(testscenarios.TestWithScenarios, base.BaseTestCase):
super(TestCase, self).setUp()
self.conf = self.useFixture(config_fixture.Config()).conf
self.conf.set_override('connection', self.db_url, 'database')
self.conn = db_api.get_instance()
self.conn = ck_db_api.get_instance()
migration = self.conn.get_migration()
migration.upgrade('head')

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import mock
from oslo.messaging import conffixture
from stevedore import extension
from cloudkitty import orchestrator
from cloudkitty import tests
class FakeKeystoneClient(object):
class FakeTenants(object):
def list(self):
return ['f266f30b11f246b589fd266f85eeec39',
'4dfb25b0947c4f5481daf7b948c14187']
tenants = FakeTenants()
class OrchestratorTest(tests.TestCase):
def setUp(self):
super(OrchestratorTest, self).setUp()
messaging_conf = self.useFixture(conffixture.ConfFixture(self.conf))
messaging_conf.transport_driver = 'fake'
self.conf.set_override('username', 'cloudkitty', 'auth')
self.conf.set_override('password', 'cloudkitty', 'auth')
self.conf.set_override('tenant', 'cloudkitty', 'auth')
self.conf.set_override('region', 'RegionOne', 'auth')
self.conf.set_override('url', 'http://127.0.0.1:5000/v2.0', 'auth')
def setup_fake_modules(self):
fake_module1 = tests.FakeRatingModule()
fake_module1.module_name = 'fake1'
fake_module1.set_priority(3)
fake_module2 = tests.FakeRatingModule()
fake_module2.module_name = 'fake2'
fake_module2.set_priority(1)
fake_module3 = tests.FakeRatingModule()
fake_module3.module_name = 'fake3'
fake_module3.set_priority(2)
fake_extensions = [
extension.Extension(
'fake1',
'cloudkitty.tests.FakeRatingModule1',
None,
fake_module1),
extension.Extension(
'fake2',
'cloudkitty.tests.FakeRatingModule2',
None,
fake_module2),
extension.Extension(
'fake3',
'cloudkitty.tests.FakeRatingModule3',
None,
fake_module3)]
return fake_extensions
def test_processors_ordering_in_workers(self):
fake_extensions = self.setup_fake_modules()
ck_ext_mgr = 'cloudkitty.extension_manager.EnabledExtensionManager'
with mock.patch(ck_ext_mgr) as stevemock:
fake_mgr = extension.ExtensionManager.make_test_instance(
fake_extensions,
'cloudkitty.rating.processors')
stevemock.return_value = fake_mgr
worker = orchestrator.BaseWorker()
stevemock.assertCalled()
self.assertEqual('fake1', worker._processors[0].name)
self.assertEqual(3, worker._processors[0].obj.priority)
self.assertEqual('fake3', worker._processors[1].name)
self.assertEqual(2, worker._processors[1].obj.priority)
self.assertEqual('fake2', worker._processors[2].name)
self.assertEqual(1, worker._processors[2].obj.priority)

View File

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
import mock
from cloudkitty.db import api as ck_db_api
from cloudkitty import tests
class FakeRPCClient(object):
def __init__(self, namespace=None, fanout=False):
self._queue = []
self._namespace = namespace
self._fanout = fanout
def prepare(self, namespace=None, fanout=False):
self._namespace = namespace
self._fanout = fanout
return self
def cast(self, ctx, data, **kwargs):
cast_data = {'ctx': ctx,
'data': data}
cast_data.update(kwargs)
self._queue.append(cast_data)
class RatingTest(tests.TestCase):
def setUp(self):
super(RatingTest, self).setUp()
self._tenant_id = 'f266f30b11f246b589fd266f85eeec39'
self._module = tests.FakeRatingModule(self._tenant_id)
self._fake_rpc = FakeRPCClient()
def test_get_module_info(self):
mod_infos = self._module.module_info
expected_infos = {'name': 'fake',
'description': 'fake rating module',
'hot_config': False,
'enabled': False,
'priority': 1}
self.assertEqual(expected_infos, mod_infos)
def test_set_state_triggers_rpc(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(True)
self.assertTrue(self._fake_rpc._fanout)
self.assertEqual('rating', self._fake_rpc._namespace)
self.assertEqual(1, len(self._fake_rpc._queue))
rpc_data = self._fake_rpc._queue[0]
expected_data = {'ctx': {},
'data': 'enable_module',
'name': 'fake'}
self.assertEqual(expected_data, rpc_data)
self._module.set_state(False)
self.assertEqual(2, len(self._fake_rpc._queue))
rpc_data = self._fake_rpc._queue[1]
expected_data['data'] = 'disable_module'
self.assertEqual(expected_data, rpc_data)
def test_enable_module(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(True)
db_api = ck_db_api.get_instance()
module_db = db_api.get_module_enable_state()
self.assertTrue(module_db.get_state('fake'))
def test_disable_module(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(False)
db_api = ck_db_api.get_instance()
module_db = db_api.get_module_enable_state()
self.assertFalse(module_db.get_state('fake'))
def test_enabled_property(self):
db_api = ck_db_api.get_instance()
module_db = db_api.get_module_enable_state()
module_db.set_state('fake', True)
self.assertTrue(self._module.enabled)
module_db.set_state('fake', False)
self.assertFalse(self._module.enabled)
def test_get_default_priority(self):
self.assertEqual(1, self._module.priority)
def test_set_priority(self):
self._module.set_priority(10)
db_api = ck_db_api.get_instance()
module_db = db_api.get_module_info()
self.assertEqual(10, module_db.get_priority('fake'))
def test_update_priority(self):
old_prio = self._module.priority
self._module.set_priority(10)
new_prio = self._module.priority
self.assertNotEqual(old_prio, new_prio)

View File

@ -43,6 +43,7 @@ commands = {posargs}
[flake8]
exclude = .git,.venv,.tox,dist,doc,*egg,build,.ropeproject,./cloudkitty/openstack/common,*/alembic/versions/*
ignore = H105
[hacking]
import_exceptions = cloudkitty.i18n