Add a new ThreadGroupManager class

Introduce a ThreadGroupManager class to manage active ThreadGroups for
engine services and move the thread-related EngineService methods to
ThreadGroupManager.  This will make it easier for multiple engine
services to manage thread groups.

Partial-Bug: #1262012
Change-Id: I1ce7785021726c274a294ef402b036540d395a29
This commit is contained in:
Jason Dunsmore 2014-01-20 15:02:34 -06:00
parent 834eb42e2e
commit 07884448fe
4 changed files with 183 additions and 132 deletions

View File

@ -64,6 +64,92 @@ def request_context(func):
return wrapped
class ThreadGroupManager(object):
def __init__(self):
super(ThreadGroupManager, self).__init__()
self.groups = {}
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.add_timer(cfg.CONF.periodic_interval, self._service_task)
def _service_task(self):
"""
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e has nothing to wait() on, so the process exits..
This could also be used to trigger periodic non-stack-specific
housekeeping tasks
"""
pass
def start(self, stack_id, func, *args, **kwargs):
"""
Run the given method in a sub-thread.
"""
if stack_id not in self.groups:
self.groups[stack_id] = threadgroup.ThreadGroup()
return self.groups[stack_id].add_thread(func, *args, **kwargs)
def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
"""
Try to acquire a stack lock and, if successful, run the given
method in a sub-thread. Release the lock when the thread
finishes.
:param cnxt: RPC context
:param stack: Stack to be operated on
:type stack: heat.engine.parser.Stack
:param engine_id: The UUID of the engine acquiring the lock
:param func: Callable to be invoked in sub-thread
:type func: function or instancemethod
:param args: Args to be passed to func
:param kwargs: Keyword-args to be passed to func.
"""
lock = stack_lock.StackLock(cnxt, stack, engine_id)
lock.acquire()
self.start_with_acquired_lock(stack, lock, func, *args, **kwargs)
def start_with_acquired_lock(self, stack, lock, func, *args, **kwargs):
"""
Run the given method in a sub-thread and release the provided lock
when the thread finishes.
:param stack: Stack to be operated on
:type stack: heat.engine.parser.Stack
:param lock: The acquired stack lock
:type lock: heat.engine.stack_lock.StackLock
:param func: Callable to be invoked in sub-thread
:type func: function or instancemethod
:param args: Args to be passed to func
:param kwargs: Keyword-args to be passed to func
"""
def release(gt, *args, **kwargs):
"""
Callback function that will be passed to GreenThread.link().
"""
lock.release()
try:
th = self.start(stack.id, func, *args)
th.link(release)
except:
with excutils.save_and_reraise_exception():
lock.release()
def add_timer(self, stack_id, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.groups:
self.groups[stack_id] = threadgroup.ThreadGroup()
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
class EngineListener(service.Service):
'''
Listen on an AMQP queue while a stack action is in-progress and
@ -92,68 +178,14 @@ class EngineService(service.Service):
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__(host, topic)
# stg == "Stack Thread Groups"
self.stg = {}
resources.initialise()
self.listener = EngineListener(host, stack_lock.engine_id)
logger.debug(_("Starting listener for engine %s")
% stack_lock.engine_id)
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(host, self.engine_id)
logger.debug(_("Starting listener for engine %s") % self.engine_id)
self.listener.start()
def _start_in_thread(self, stack_id, func, *args, **kwargs):
if stack_id not in self.stg:
self.stg[stack_id] = threadgroup.ThreadGroup()
return self.stg[stack_id].add_thread(func, *args, **kwargs)
def _start_thread_with_lock(self, cnxt, stack, func, *args):
"""
Try to acquire a stack lock and, if successful, run the method in a
sub-thread.
:param cnxt: RPC context
:param stack: Stack to be operated on
:type stack: heat.engine.parser.Stack
:param func: Callable to be invoked in sub-thread
:type func: function or instancemethod
:param args: Args to be passed to func
"""
lock = stack_lock.StackLock(cnxt, stack)
def release(gt, *args, **kwargs):
"""
Callback function that will be passed to GreenThread.link().
"""
lock.release()
lock.acquire()
try:
th = self._start_in_thread(stack.id, func, *args)
th.link(release)
except:
with excutils.save_and_reraise_exception():
lock.release()
def _timer_in_thread(self, stack_id, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.stg:
self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def _service_task(self):
"""
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e has nothing to wait() on, so the process exits..
This could also be used to trigger periodic non-stack-specific
housekeeping tasks
"""
pass
def _start_watch_task(self, stack_id, cnxt):
def stack_has_a_watchrule(sid):
@ -177,17 +209,13 @@ class EngineService(service.Service):
return start_watch_thread
if stack_has_a_watchrule(stack_id):
self._timer_in_thread(stack_id, self._periodic_watcher_task,
sid=stack_id)
self.thread_group_mgr.add_timer(stack_id,
self._periodic_watcher_task,
sid=stack_id)
def start(self):
super(EngineService, self).start()
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.tg.add_timer(cfg.CONF.periodic_interval,
self._service_task)
# Create a periodic_watcher_task per-stack
admin_context = context.get_admin_context()
stacks = db_api.stack_get_all(admin_context)
@ -363,7 +391,8 @@ class EngineService(service.Service):
stack.store()
self._start_thread_with_lock(cnxt, stack, _stack_create, stack)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_create, stack)
return dict(stack.identifier())
@ -413,8 +442,10 @@ class EngineService(service.Service):
self._validate_deferred_auth_context(cnxt, updated_stack)
updated_stack.validate()
self._start_thread_with_lock(cnxt, current_stack, current_stack.update,
updated_stack)
self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id,
current_stack.update,
updated_stack)
return dict(current_stack.identifier())
@ -513,7 +544,8 @@ class EngineService(service.Service):
stack = parser.Stack.load(cnxt, stack=st)
self._start_thread_with_lock(cnxt, stack, stack.delete)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.delete)
return None
@request_context
@ -531,7 +563,8 @@ class EngineService(service.Service):
stack_info = stack.get_abandon_data()
# Set deletion policy to 'Retain' for all resources in the stack.
stack.set_deletion_policy(resource.RETAIN)
self._start_thread_with_lock(cnxt, stack, stack.delete)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.delete)
return stack_info
def list_resource_types(self, cnxt, support_status=None):
@ -725,7 +758,8 @@ class EngineService(service.Service):
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self._start_thread_with_lock(cnxt, stack, _stack_suspend, stack)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_suspend, stack)
@request_context
def stack_resume(self, cnxt, stack_identity):
@ -739,7 +773,8 @@ class EngineService(service.Service):
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self._start_thread_with_lock(cnxt, stack, _stack_resume, stack)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_resume, stack)
def _load_user_creds(self, creds_id):
user_creds = db_api.user_creds_get(creds_id)
@ -821,8 +856,8 @@ class EngineService(service.Service):
rule = watchrule.WatchRule.load(stack_context, watch=wr)
actions = rule.evaluate()
if actions:
self._start_in_thread(sid, run_alarm_action, actions,
rule.get_details())
self.thread_group_mgr.start(sid, run_alarm_action, actions,
rule.get_details())
def _periodic_watcher_task(self, sid):
"""
@ -922,7 +957,7 @@ class EngineService(service.Service):
return
actions = wr.set_watch_state(state)
for action in actions:
self._start_in_thread(wr.stack_id, action)
self.thread_group_mgr.start(wr.stack_id, action)
# Return the watch with the state overriden to indicate success
# We do not update the timestamps as we are not modifying the DB

View File

@ -25,13 +25,13 @@ from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
logger = logging.getLogger(__name__)
engine_id = str(uuid.uuid4())
class StackLock(object):
def __init__(self, context, stack):
def __init__(self, context, stack, engine_id):
self.context = context
self.stack = stack
self.engine_id = engine_id
self.listener = None
def _engine_alive(self, engine_id):
@ -44,17 +44,28 @@ class StackLock(object):
except rpc_common.Timeout:
return False
@staticmethod
def generate_engine_id():
return str(uuid.uuid4())
@rpc_common.client_exceptions(exception.ActionInProgress)
def acquire(self, retry=True):
"""Acquire a lock on the stack."""
lock_engine_id = db_api.stack_lock_create(self.stack.id, engine_id)
"""
Acquire a lock on the stack.
:param retry: When True, retry if lock was released while stealing.
:type retry: boolean
"""
lock_engine_id = db_api.stack_lock_create(self.stack.id,
self.engine_id)
if lock_engine_id is None:
logger.debug(_("Engine %(engine)s acquired lock on stack "
"%(stack)s") % {'engine': engine_id,
"%(stack)s") % {'engine': self.engine_id,
'stack': self.stack.id})
return
if lock_engine_id == engine_id or self._engine_alive(lock_engine_id):
if lock_engine_id == self.engine_id or \
self._engine_alive(lock_engine_id):
logger.debug(_("Lock on stack %(stack)s is owned by engine "
"%(engine)s") % {'stack': self.stack.id,
'engine': lock_engine_id})
@ -63,22 +74,23 @@ class StackLock(object):
else:
logger.info(_("Stale lock detected on stack %(stack)s. Engine "
"%(engine)s will attempt to steal the lock")
% {'stack': self.stack.id, 'engine': engine_id})
% {'stack': self.stack.id, 'engine': self.engine_id})
result = db_api.stack_lock_steal(self.stack.id, lock_engine_id,
engine_id)
self.engine_id)
if result is None:
logger.info(_("Engine %(engine)s successfully stole the lock "
"on stack %(stack)s") % {'engine': engine_id,
'stack': self.stack.id})
"on stack %(stack)s")
% {'engine': self.engine_id,
'stack': self.stack.id})
return
elif result is True:
if retry:
logger.info(_("The lock on stack %(stack)s was released "
"while engine %(engine)s was stealing it. "
"Trying again") % {'stack': self.stack.id,
'engine': engine_id})
'engine': self.engine_id})
return self.acquire(retry=False)
else:
new_lock_engine_id = result
@ -93,11 +105,11 @@ class StackLock(object):
def release(self):
"""Release a stack lock."""
# Only the engine that owns the lock will be releasing it.
result = db_api.stack_lock_release(self.stack.id, engine_id)
result = db_api.stack_lock_release(self.stack.id, self.engine_id)
if result is True:
logger.warning(_("Lock was already released on stack %s!")
% self.stack.id)
else:
logger.debug(_("Engine %(engine)s released lock on stack "
"%(stack)s") % {'engine': engine_id,
"%(stack)s") % {'engine': self.engine_id,
'stack': self.stack.id})

View File

@ -911,10 +911,9 @@ class StackServiceSuspendResumeTest(HeatTestCase):
thread = self.m.CreateMockAnything()
thread.link(mox.IgnoreArg()).AndReturn(None)
self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
service.EngineService._start_in_thread(sid,
mox.IgnoreArg(),
stack).AndReturn(thread)
self.m.StubOutWithMock(service.ThreadGroupManager, 'start')
service.ThreadGroupManager.start(sid, mox.IgnoreArg(),
stack).AndReturn(thread)
self.m.ReplayAll()
result = self.man.stack_suspend(self.ctx, stack.identifier())
@ -930,10 +929,9 @@ class StackServiceSuspendResumeTest(HeatTestCase):
thread = self.m.CreateMockAnything()
thread.link(mox.IgnoreArg()).AndReturn(None)
self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
service.EngineService._start_in_thread(self.stack.id,
mox.IgnoreArg(),
self.stack).AndReturn(thread)
self.m.StubOutWithMock(service.ThreadGroupManager, 'start')
service.ThreadGroupManager.start(self.stack.id, mox.IgnoreArg(),
self.stack).AndReturn(thread)
self.m.ReplayAll()
@ -1158,7 +1156,7 @@ class StackServiceTest(HeatTestCase):
def run(stack_id, func, *args):
func(*args)
return thread
self.eng._start_in_thread = run
self.eng.thread_group_mgr.start = run
new_tmpl = {'Resources': {'AResource': {'Type':
'GenericResourceType'}}}
@ -1753,9 +1751,10 @@ class StackServiceTest(HeatTestCase):
@stack_context('periodic_watch_task_not_created')
def test_periodic_watch_task_not_created(self):
self.eng.stg[self.stack.id] = DummyThreadGroup()
self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup()
self.eng._start_watch_task(self.stack.id, self.ctx)
self.assertEqual([], self.eng.stg[self.stack.id].threads)
self.assertEqual(
[], self.eng.thread_group_mgr.groups[self.stack.id].threads)
def test_periodic_watch_task_created(self):
stack = get_stack('period_watch_task_created',
@ -1765,10 +1764,11 @@ class StackServiceTest(HeatTestCase):
self.m.ReplayAll()
stack.store()
stack.create()
self.eng.stg[stack.id] = DummyThreadGroup()
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
self.eng._start_watch_task(stack.id, self.ctx)
self.assertEqual([self.eng._periodic_watcher_task],
self.eng.stg[stack.id].threads)
expected = [self.eng._periodic_watcher_task]
observed = self.eng.thread_group_mgr.groups[stack.id].threads
self.assertEqual(expected, observed)
self.stack.delete()
def test_periodic_watch_task_created_nested(self):
@ -1785,10 +1785,10 @@ class StackServiceTest(HeatTestCase):
self.m.ReplayAll()
stack.store()
stack.create()
self.eng.stg[stack.id] = DummyThreadGroup()
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
self.eng._start_watch_task(stack.id, self.ctx)
self.assertEqual([self.eng._periodic_watcher_task],
self.eng.stg[stack.id].threads)
self.eng.thread_group_mgr.groups[stack.id].threads)
self.stack.delete()
@stack_context('service_show_watch_test_stack', False)
@ -1923,7 +1923,7 @@ class StackServiceTest(HeatTestCase):
# Replace the real stack threadgroup with a dummy one, so we can
# check the function returned on ALARM is correctly scheduled
self.eng.stg[self.stack.id] = DummyThreadGroup()
self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup()
self.m.ReplayAll()
@ -1932,22 +1932,25 @@ class StackServiceTest(HeatTestCase):
watch_name="OverrideAlarm",
state=state)
self.assertEqual(state, result[engine_api.WATCH_STATE_VALUE])
self.assertEqual([], self.eng.stg[self.stack.id].threads)
self.assertEqual(
[], self.eng.thread_group_mgr.groups[self.stack.id].threads)
state = watchrule.WatchRule.NORMAL
result = self.eng.set_watch_state(self.ctx,
watch_name="OverrideAlarm",
state=state)
self.assertEqual(state, result[engine_api.WATCH_STATE_VALUE])
self.assertEqual([], self.eng.stg[self.stack.id].threads)
self.assertEqual(
[], self.eng.thread_group_mgr.groups[self.stack.id].threads)
state = watchrule.WatchRule.ALARM
result = self.eng.set_watch_state(self.ctx,
watch_name="OverrideAlarm",
state=state)
self.assertEqual(state, result[engine_api.WATCH_STATE_VALUE])
self.assertEqual([DummyAction.signal],
self.eng.stg[self.stack.id].threads)
self.assertEqual(
[DummyAction.signal],
self.eng.thread_group_mgr.groups[self.stack.id].threads)
self.m.VerifyAll()

View File

@ -28,32 +28,33 @@ class StackLockTest(HeatTestCase):
self.stack.id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
self.stack.name = "test_stack"
self.stack.action = "CREATE"
self.engine_id = stack_lock.StackLock.generate_engine_id()
def test_successful_acquire_new_lock(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn(None)
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock.acquire()
self.m.VerifyAll()
def test_failed_acquire_existing_lock_current_engine(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
AndReturn(stack_lock.engine_id)
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn(self.engine_id)
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
self.assertRaises(rpc_common.ClientException, slock.acquire)
self.m.VerifyAll()
def test_successful_acquire_existing_lock_engine_dead(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -64,17 +65,17 @@ class StackLockTest(HeatTestCase):
self.m.StubOutWithMock(db_api, "stack_lock_steal")
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).AndReturn(None)
self.engine_id).AndReturn(None)
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock.acquire()
self.m.VerifyAll()
def test_failed_acquire_existing_lock_engine_alive(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -85,13 +86,13 @@ class StackLockTest(HeatTestCase):
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
self.assertRaises(rpc_common.ClientException, slock.acquire)
self.m.VerifyAll()
def test_failed_acquire_existing_lock_engine_dead(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -102,18 +103,18 @@ class StackLockTest(HeatTestCase):
self.m.StubOutWithMock(db_api, "stack_lock_steal")
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).\
self.engine_id).\
AndReturn("fake-engine-id2")
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
self.assertRaises(rpc_common.ClientException, slock.acquire)
self.m.VerifyAll()
def test_successful_acquire_with_retry(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -124,10 +125,10 @@ class StackLockTest(HeatTestCase):
self.m.StubOutWithMock(db_api, "stack_lock_steal")
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).\
self.engine_id).\
AndReturn(True)
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -136,18 +137,18 @@ class StackLockTest(HeatTestCase):
topic="fake-engine-id").AndRaise(rpc_common.Timeout)
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).\
self.engine_id).\
AndReturn(None)
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock.acquire()
self.m.VerifyAll()
def test_failed_acquire_one_retry_only(self):
self.m.StubOutWithMock(db_api, "stack_lock_create")
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -158,10 +159,10 @@ class StackLockTest(HeatTestCase):
self.m.StubOutWithMock(db_api, "stack_lock_steal")
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).\
self.engine_id).\
AndReturn(True)
db_api.stack_lock_create(self.stack.id, stack_lock.engine_id).\
db_api.stack_lock_create(self.stack.id, self.engine_id).\
AndReturn("fake-engine-id")
topic = self.stack.id
@ -170,11 +171,11 @@ class StackLockTest(HeatTestCase):
topic="fake-engine-id").AndRaise(rpc_common.Timeout)
db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
stack_lock.engine_id).\
self.engine_id).\
AndReturn(True)
self.m.ReplayAll()
slock = stack_lock.StackLock(self.context, self.stack)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
self.assertRaises(rpc_common.ClientException, slock.acquire)
self.m.VerifyAll()