Merge "heat engine move to per-stack periodic watch threads"
This commit is contained in:
commit
339ca7bf06
@ -53,12 +53,48 @@ class EngineService(service.Service):
|
|||||||
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
|
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
|
||||||
self.stg[stack_id].add_thread(func, *args, **kwargs)
|
self.stg[stack_id].add_thread(func, *args, **kwargs)
|
||||||
|
|
||||||
|
def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Define a periodic task, to be run in a separate thread, in the stack
|
||||||
|
threadgroups. Periodicity is cfg.CONF.periodic_interval
|
||||||
|
"""
|
||||||
|
if stack_id not in self.stg:
|
||||||
|
thr_name = '%s-%s' % (stack_name, stack_id)
|
||||||
|
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
|
||||||
|
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
|
||||||
|
func, *args, **kwargs)
|
||||||
|
|
||||||
|
def _service_task(self):
|
||||||
|
"""
|
||||||
|
This is a dummy task which gets queued on the service.Service
|
||||||
|
threadgroup. Without this service.Service sees nothing running
|
||||||
|
i.e has nothing to wait() on, so the process exits..
|
||||||
|
This could also be used to trigger periodic non-stack-specific
|
||||||
|
housekeeping tasks
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super(EngineService, self).start()
|
super(EngineService, self).start()
|
||||||
admin_context = context.get_admin_context()
|
|
||||||
|
# Create dummy service task, because when there is nothing queued
|
||||||
|
# on self.tg the process exits
|
||||||
self.tg.add_timer(cfg.CONF.periodic_interval,
|
self.tg.add_timer(cfg.CONF.periodic_interval,
|
||||||
self._periodic_watcher_task,
|
self._service_task)
|
||||||
context=admin_context)
|
|
||||||
|
# Create a periodic_watcher_task per-stack
|
||||||
|
# We use the admin context to get the list of all stacks
|
||||||
|
# then retrieve the stored per-stack context to be passed to
|
||||||
|
# the periodic task
|
||||||
|
admin_context = context.get_admin_context()
|
||||||
|
stacks = db_api.stack_get_all(admin_context)
|
||||||
|
for s in stacks:
|
||||||
|
user_creds = db_api.user_creds_get(s.user_creds_id)
|
||||||
|
stack_context = context.RequestContext.from_dict(user_creds)
|
||||||
|
self._timer_in_thread(s.id, s.name,
|
||||||
|
self._periodic_watcher_task,
|
||||||
|
context=stack_context,
|
||||||
|
sid=s.id)
|
||||||
|
|
||||||
def identify_stack(self, context, stack_name):
|
def identify_stack(self, context, stack_name):
|
||||||
"""
|
"""
|
||||||
@ -157,6 +193,12 @@ class EngineService(service.Service):
|
|||||||
|
|
||||||
self._start_in_thread(stack_id, stack_name, stack.create)
|
self._start_in_thread(stack_id, stack_name, stack.create)
|
||||||
|
|
||||||
|
# Schedule a periodic watcher task for this stack
|
||||||
|
self._timer_in_thread(stack_id, stack_name,
|
||||||
|
self._periodic_watcher_task,
|
||||||
|
context=context,
|
||||||
|
sid=stack_id)
|
||||||
|
|
||||||
return dict(stack.identifier())
|
return dict(stack.identifier())
|
||||||
|
|
||||||
def update_stack(self, context, stack_identity, template, params, args):
|
def update_stack(self, context, stack_identity, template, params, args):
|
||||||
@ -353,9 +395,10 @@ class EngineService(service.Service):
|
|||||||
|
|
||||||
return [None, resource.metadata]
|
return [None, resource.metadata]
|
||||||
|
|
||||||
def _periodic_watcher_task(self, context):
|
def _periodic_watcher_task(self, context, sid):
|
||||||
try:
|
try:
|
||||||
wrn = [w.name for w in db_api.watch_rule_get_all(context)]
|
wrn = [w.name for w in
|
||||||
|
db_api.watch_rule_get_all_by_stack(context, sid)]
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logger.warn('periodic_task db error (%s) %s' %
|
logger.warn('periodic_task db error (%s) %s' %
|
||||||
('watch rule removed?', str(ex)))
|
('watch rule removed?', str(ex)))
|
||||||
|
@ -229,29 +229,10 @@ class WatchRule(object):
|
|||||||
new_state)
|
new_state)
|
||||||
actioned = True
|
actioned = True
|
||||||
else:
|
else:
|
||||||
# FIXME : hack workaround for new stack_get_by_name tenant
|
s = db_api.stack_get(self.context, self.stack_id)
|
||||||
# scoping, this is the simplest possible solution to the
|
|
||||||
# HA/Autoscaling regression described in bug/1078779
|
|
||||||
# Further work in-progress here (shardy) as this
|
|
||||||
# breaks when stack_id is not unique accross tenants
|
|
||||||
sl = [x for x in
|
|
||||||
db_api.stack_get_all(self.context)
|
|
||||||
if x.id == self.stack_id]
|
|
||||||
s = None
|
|
||||||
if len(sl) == 1:
|
|
||||||
s = sl[0]
|
|
||||||
elif len(sl) > 1:
|
|
||||||
logger.error("stack %s not unique, " % self.stack_id
|
|
||||||
+ "cannot action watch rule")
|
|
||||||
else:
|
|
||||||
logger.error("stack %s could not be found, " %
|
|
||||||
self.stack_id + "cannot action watch rule")
|
|
||||||
|
|
||||||
if s and s.status in (parser.Stack.CREATE_COMPLETE,
|
if s and s.status in (parser.Stack.CREATE_COMPLETE,
|
||||||
parser.Stack.UPDATE_COMPLETE):
|
parser.Stack.UPDATE_COMPLETE):
|
||||||
user_creds = db_api.user_creds_get(s.user_creds_id)
|
stack = parser.Stack.load(self.context, stack=s)
|
||||||
ctxt = ctxtlib.RequestContext.from_dict(user_creds)
|
|
||||||
stack = parser.Stack.load(ctxt, stack=s)
|
|
||||||
for a in self.rule[self.ACTION_MAP[new_state]]:
|
for a in self.rule[self.ACTION_MAP[new_state]]:
|
||||||
greenpool.spawn_n(stack[a].alarm)
|
greenpool.spawn_n(stack[a].alarm)
|
||||||
actioned = True
|
actioned = True
|
||||||
|
@ -80,6 +80,10 @@ def setup_mocks(mocks, stack):
|
|||||||
|
|
||||||
|
|
||||||
class DummyThreadGroup(object):
|
class DummyThreadGroup(object):
|
||||||
|
def add_timer(self, interval, callback, initial_delay=None,
|
||||||
|
*args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
def add_thread(self, callback, *args, **kwargs):
|
def add_thread(self, callback, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user