Added support for dynamic rating module reloading
Added API call to trigger a rating module reload. Added patch to reload entry points and clear stevedore cache. Refactored code to remove duplicated code. Change-Id: I2231521708f163d80538fe2e7e5559d4c06cf0c3
This commit is contained in:
parent
7dbf3accf6
commit
c9267532cb
|
@ -15,6 +15,7 @@
|
|||
#
|
||||
# @author: Stéphane Albert
|
||||
#
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
import pecan
|
||||
from pecan import rest
|
||||
|
@ -24,22 +25,43 @@ import wsmeext.pecan as wsme_pecan
|
|||
|
||||
from cloudkitty.api.v1.datamodels import rating as rating_models
|
||||
from cloudkitty.common import policy
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors'
|
||||
|
||||
|
||||
class ModulesController(rest.RestController):
|
||||
"""REST Controller managing rating modules."""
|
||||
|
||||
def __init__(self):
|
||||
class RatingModulesMixin(object):
|
||||
def reload_extensions(self):
|
||||
lock = lockutils.lock('rating-modules')
|
||||
with lock:
|
||||
ck_utils.refresh_stevedore(PROCESSORS_NAMESPACE)
|
||||
# FIXME(sheeprine): Implement RPC messages to trigger reload on
|
||||
# processors
|
||||
self.extensions = extension.ExtensionManager(
|
||||
PROCESSORS_NAMESPACE,
|
||||
# FIXME(sheeprine): don't want to load it here as we just need the
|
||||
# controller
|
||||
invoke_on_load=True
|
||||
)
|
||||
# FIXME(sheeprine): don't want to load it here as we just need
|
||||
# the controller
|
||||
invoke_on_load=True)
|
||||
if not self._first_call:
|
||||
self.notify_reload()
|
||||
else:
|
||||
self._first_call = False
|
||||
|
||||
def notify_reload(self):
|
||||
client = pecan.request.rpc_client.prepare(namespace='rating',
|
||||
version='1.1')
|
||||
client.cast({}, 'reload_modules')
|
||||
|
||||
def __init__(self):
|
||||
self._first_call = True
|
||||
self.extensions = []
|
||||
self.reload_extensions()
|
||||
|
||||
|
||||
class ModulesController(rest.RestController, RatingModulesMixin):
|
||||
"""REST Controller managing rating modules."""
|
||||
|
||||
def route(self, *args):
|
||||
route = args[0]
|
||||
|
@ -57,6 +79,8 @@ class ModulesController(rest.RestController):
|
|||
policy.enforce(pecan.request.context, 'rating:list_modules', {})
|
||||
|
||||
modules_list = []
|
||||
lock = lockutils.lock('rating-modules')
|
||||
with lock:
|
||||
for module in self.extensions:
|
||||
infos = module.obj.module_info.copy()
|
||||
infos['module_id'] = infos.pop('name')
|
||||
|
@ -74,6 +98,8 @@ class ModulesController(rest.RestController):
|
|||
policy.enforce(pecan.request.context, 'rating:get_module', {})
|
||||
|
||||
try:
|
||||
lock = lockutils.lock('rating-modules')
|
||||
with lock:
|
||||
module = self.extensions[module_id]
|
||||
except KeyError:
|
||||
pecan.abort(404, 'Module not found.')
|
||||
|
@ -94,6 +120,8 @@ class ModulesController(rest.RestController):
|
|||
policy.enforce(pecan.request.context, 'rating:update_module', {})
|
||||
|
||||
try:
|
||||
lock = lockutils.lock('rating-modules')
|
||||
with lock:
|
||||
ext = self.extensions[module_id].obj
|
||||
except KeyError:
|
||||
pecan.abort(404, 'Module not found.')
|
||||
|
@ -119,7 +147,7 @@ class UnconfigurableController(rest.RestController):
|
|||
pecan.abort(409, "Module is not configurable")
|
||||
|
||||
|
||||
class ModulesExposer(rest.RestController):
|
||||
class ModulesExposer(rest.RestController, RatingModulesMixin):
|
||||
"""REST Controller exposing rating modules.
|
||||
|
||||
This is the controller that exposes the modules own configuration
|
||||
|
@ -127,22 +155,27 @@ class ModulesExposer(rest.RestController):
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.extensions = extension.ExtensionManager(
|
||||
PROCESSORS_NAMESPACE,
|
||||
# FIXME(sheeprine): don't want to load it here as we just need the
|
||||
# controller
|
||||
invoke_on_load=True
|
||||
)
|
||||
super(ModulesExposer, self).__init__()
|
||||
self._loaded_modules = []
|
||||
self.expose_modules()
|
||||
|
||||
def expose_modules(self):
|
||||
"""Load rating modules to expose API controllers."""
|
||||
lock = lockutils.lock('rating-modules')
|
||||
with lock:
|
||||
for ext in self.extensions:
|
||||
# FIXME(sheeprine): we should notify two modules with same name
|
||||
if not hasattr(self, ext.name):
|
||||
name = ext.name
|
||||
if not ext.obj.config_controller:
|
||||
ext.obj.config_controller = UnconfigurableController
|
||||
setattr(self, ext.name, ext.obj.config_controller())
|
||||
# Update extension reference
|
||||
setattr(self, name, ext.obj.config_controller())
|
||||
if name in self._loaded_modules:
|
||||
self._loaded_modules.remove(name)
|
||||
# Clear removed modules
|
||||
for module in self._loaded_modules:
|
||||
delattr(self, module)
|
||||
self._loaded_modules = self.extensions.names()
|
||||
|
||||
|
||||
class RatingController(rest.RestController):
|
||||
|
@ -154,6 +187,7 @@ class RatingController(rest.RestController):
|
|||
|
||||
_custom_actions = {
|
||||
'quote': ['POST'],
|
||||
'reload_modules': ['GET'],
|
||||
}
|
||||
|
||||
modules = ModulesController()
|
||||
|
@ -179,3 +213,12 @@ class RatingController(rest.RestController):
|
|||
|
||||
res = client.call({}, 'quote', res_data=[{'usage': res_dict}])
|
||||
return res
|
||||
|
||||
@wsme_pecan.wsexpose(None)
|
||||
def reload_modules(self):
|
||||
"""Trigger a rating module list reload.
|
||||
|
||||
"""
|
||||
policy.enforce(pecan.request.context, 'rating:module_config', {})
|
||||
self.modules.reload_extensions()
|
||||
self.module_config.reload_extensions()
|
||||
|
|
|
@ -23,7 +23,9 @@ TRANSPORT = None
|
|||
|
||||
def init():
|
||||
global TRANSPORT
|
||||
if not TRANSPORT:
|
||||
TRANSPORT = messaging.get_transport(cfg.CONF)
|
||||
return TRANSPORT
|
||||
|
||||
|
||||
def get_client(target, version_cap=None):
|
||||
|
|
|
@ -50,9 +50,10 @@ STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
|
|||
|
||||
class RatingEndpoint(object):
|
||||
target = messaging.Target(namespace='rating',
|
||||
version='1.0')
|
||||
version='1.1')
|
||||
|
||||
def __init__(self, orchestrator):
|
||||
self._global_reload = False
|
||||
self._pending_reload = []
|
||||
self._module_state = {}
|
||||
self._orchestrator = orchestrator
|
||||
|
@ -76,6 +77,12 @@ class RatingEndpoint(object):
|
|||
worker = APIWorker()
|
||||
return str(worker.quote(res_data))
|
||||
|
||||
def reload_modules(self, ctxt):
|
||||
LOG.info('Received reload modules command.')
|
||||
lock = lockutils.lock('module-reload')
|
||||
with lock:
|
||||
self._global_reload = True
|
||||
|
||||
def reload_module(self, ctxt, name):
|
||||
LOG.info('Received reload command for module {}.'.format(name))
|
||||
lock = lockutils.lock('module-reload')
|
||||
|
@ -110,8 +117,7 @@ class BaseWorker(object):
|
|||
self._processors = []
|
||||
processors = extension_manager.EnabledExtensionManager(
|
||||
PROCESSORS_NAMESPACE,
|
||||
invoke_kwds={'tenant_id': self._tenant_id}
|
||||
)
|
||||
invoke_kwds={'tenant_id': self._tenant_id})
|
||||
|
||||
for processor in processors:
|
||||
self._processors.append(processor)
|
||||
|
|
|
@ -92,16 +92,14 @@ class SQLAlchemyStorage(storage.BaseStorage):
|
|||
|
||||
session = db.get_session()
|
||||
q = session.query(
|
||||
sqlalchemy.func.sum(model.rate).label('rate')
|
||||
)
|
||||
sqlalchemy.func.sum(model.rate).label('rate'))
|
||||
if tenant_id:
|
||||
q = q.filter(
|
||||
models.RatedDataFrame.tenant_id == tenant_id
|
||||
)
|
||||
rate = q.filter(
|
||||
models.RatedDataFrame.tenant_id == tenant_id)
|
||||
q = q.filter(
|
||||
model.begin >= begin,
|
||||
model.end <= end
|
||||
).scalar()
|
||||
model.end <= end)
|
||||
rate = q.scalar()
|
||||
return rate
|
||||
|
||||
def get_tenants(self, begin=None, end=None):
|
||||
|
|
|
@ -23,8 +23,10 @@ to ease maintenance in case of library modifications.
|
|||
"""
|
||||
import calendar
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from stevedore import extension
|
||||
|
||||
|
||||
def dt2ts(orig_dt):
|
||||
|
@ -123,3 +125,15 @@ def get_next_month(dt=None):
|
|||
|
||||
def get_next_month_timestamp(dt=None):
|
||||
return dt2ts(get_next_month(dt))
|
||||
|
||||
|
||||
def refresh_stevedore(namespace=None):
|
||||
# Trigger reload of entry points
|
||||
reload(sys.modules['pkg_resources'])
|
||||
# Clear stevedore cache
|
||||
cache = extension.ExtensionManager.ENTRY_POINT_CACHE
|
||||
if namespace:
|
||||
if namespace in cache:
|
||||
del cache[namespace]
|
||||
else:
|
||||
cache.clear()
|
||||
|
|
Loading…
Reference in New Issue