heat engine move to per-stack periodic watch threads

Use the stack thread groups, so a separate watch thread is started
for each stack - this avoids some of the context scoping problems
previously encountered since we can pass the correct context to the
periodic task when starting it

Fixes bug 1078779

Change-Id: I56e6a4b126199587e91548f450956d77ab2158f3
Signed-off-by: Steven Hardy <shardy@redhat.com>
This commit is contained in:
Steven Hardy 2012-11-19 16:55:23 +00:00
parent 1563f3039f
commit d89f6c0ba1
3 changed files with 55 additions and 27 deletions

View File

@ -53,12 +53,48 @@ class EngineService(service.Service):
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id].add_thread(func, *args, **kwargs)
def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.stg:
thr_name = '%s-%s' % (stack_name, stack_id)
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def _service_task(self):
"""
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e has nothing to wait() on, so the process exits..
This could also be used to trigger periodic non-stack-specific
housekeeping tasks
"""
pass
def start(self):
super(EngineService, self).start()
admin_context = context.get_admin_context()
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.tg.add_timer(cfg.CONF.periodic_interval,
self._periodic_watcher_task,
context=admin_context)
self._service_task)
# Create a periodic_watcher_task per-stack
# We use the admin context to get the list of all stacks
# then retrieve the stored per-stack context to be passed to
# the periodic task
admin_context = context.get_admin_context()
stacks = db_api.stack_get_all(admin_context)
for s in stacks:
user_creds = db_api.user_creds_get(s.user_creds_id)
stack_context = context.RequestContext.from_dict(user_creds)
self._timer_in_thread(s.id, s.name,
self._periodic_watcher_task,
context=stack_context,
sid=s.id)
def identify_stack(self, context, stack_name):
"""
@ -157,6 +193,12 @@ class EngineService(service.Service):
self._start_in_thread(stack_id, stack_name, stack.create)
# Schedule a periodic watcher task for this stack
self._timer_in_thread(stack_id, stack_name,
self._periodic_watcher_task,
context=context,
sid=stack_id)
return dict(stack.identifier())
def update_stack(self, context, stack_identity, template, params, args):
@ -353,9 +395,10 @@ class EngineService(service.Service):
return [None, resource.metadata]
def _periodic_watcher_task(self, context):
def _periodic_watcher_task(self, context, sid):
try:
wrn = [w.name for w in db_api.watch_rule_get_all(context)]
wrn = [w.name for w in
db_api.watch_rule_get_all_by_stack(context, sid)]
except Exception as ex:
logger.warn('periodic_task db error (%s) %s' %
('watch rule removed?', str(ex)))

View File

@ -229,29 +229,10 @@ class WatchRule(object):
new_state)
actioned = True
else:
# FIXME : hack workaround for new stack_get_by_name tenant
# scoping, this is the simplest possible solution to the
# HA/Autoscaling regression described in bug/1078779
# Further work in-progress here (shardy) as this
# breaks when stack_id is not unique accross tenants
sl = [x for x in
db_api.stack_get_all(self.context)
if x.id == self.stack_id]
s = None
if len(sl) == 1:
s = sl[0]
elif len(sl) > 1:
logger.error("stack %s not unique, " % self.stack_id
+ "cannot action watch rule")
else:
logger.error("stack %s could not be found, " %
self.stack_id + "cannot action watch rule")
s = db_api.stack_get(self.context, self.stack_id)
if s and s.status in (parser.Stack.CREATE_COMPLETE,
parser.Stack.UPDATE_COMPLETE):
user_creds = db_api.user_creds_get(s.user_creds_id)
ctxt = ctxtlib.RequestContext.from_dict(user_creds)
stack = parser.Stack.load(ctxt, stack=s)
parser.Stack.UPDATE_COMPLETE):
stack = parser.Stack.load(self.context, stack=s)
for a in self.rule[self.ACTION_MAP[new_state]]:
greenpool.spawn_n(stack[a].alarm)
actioned = True

View File

@ -80,6 +80,10 @@ def setup_mocks(mocks, stack):
class DummyThreadGroup(object):
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pass
def add_thread(self, callback, *args, **kwargs):
pass