Merge "Pass stack id to init stacklock"

This commit is contained in:
Jenkins 2015-04-23 04:33:52 +00:00 committed by Gerrit Code Review
commit e02136d8b8
5 changed files with 101 additions and 78 deletions

View File

@ -135,8 +135,8 @@ class ThreadGroupManager(object):
:param args: Args to be passed to func
:param kwargs: Keyword-args to be passed to func.
"""
lock = stack_lock.StackLock(cnxt, stack, engine_id)
with lock.thread_lock(stack.id):
lock = stack_lock.StackLock(cnxt, stack.id, engine_id)
with lock.thread_lock():
th = self.start_with_acquired_lock(stack, lock,
func, *args, **kwargs)
return th
@ -156,14 +156,14 @@ class ThreadGroupManager(object):
:param kwargs: Keyword-args to be passed to func
"""
def release(gt, *args):
def release(gt):
"""
Callback function that will be passed to GreenThread.link().
"""
lock.release(*args)
lock.release()
th = self.start(stack.id, func, *args, **kwargs)
th.link(release, stack.id)
th.link(release)
return th
def add_timer(self, stack_id, func, *args, **kwargs):
@ -795,7 +795,7 @@ class EngineService(service.Service):
# stop the running update and take the lock
# as we cancel only running update, the acquire_result is
# always some engine_id, not None
lock = stack_lock.StackLock(cnxt, current_stack,
lock = stack_lock.StackLock(cnxt, current_stack.id,
self.engine_id)
engine_id = lock.try_acquire()
# Current engine has the lock
@ -927,8 +927,8 @@ class EngineService(service.Service):
LOG.info(_LI('Deleting stack %s'), st.name)
stack = parser.Stack.load(cnxt, stack=st)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
with lock.try_thread_lock(stack.id) as acquire_result:
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.try_thread_lock() as acquire_result:
# Successfully acquired lock
if acquire_result is None:
@ -979,8 +979,8 @@ class EngineService(service.Service):
st = self._get_stack(cnxt, stack_identity)
LOG.info(_LI('abandoning stack %s'), st.name)
stack = parser.Stack.load(cnxt, stack=st)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
with lock.thread_lock(stack.id):
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.thread_lock():
# Get stack details before deleting it.
stack_info = stack.prepare_abandon()
self.thread_group_mgr.start_with_acquired_lock(stack,
@ -1274,9 +1274,9 @@ class EngineService(service.Service):
raise exception.ActionInProgress(stack_name=stack.name,
action=stack.action)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.thread_lock(stack.id):
with lock.thread_lock():
snapshot = snapshot_object.Snapshot.create(cnxt, {
'tenant': cnxt.tenant_id,
'name': name,
@ -1605,9 +1605,7 @@ class EngineService(service.Service):
filters=filters,
tenant_safe=False) or []
for s in stacks:
stk = parser.Stack.load(cnxt, stack=s,
use_stored_context=True)
lock = stack_lock.StackLock(cnxt, stk, self.engine_id)
lock = stack_lock.StackLock(cnxt, s.id, self.engine_id)
# If stacklock is released, means stack status may changed.
engine_id = lock.get_engine_id()
if not engine_id:
@ -1617,6 +1615,8 @@ class EngineService(service.Service):
lock.acquire(retry=False)
except exception.ActionInProgress:
continue
stk = parser.Stack.load(cnxt, stack=s,
use_stored_context=True)
LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s'
' was in action %(action)s'),
{'engine': engine_id, 'action': stk.action,

View File

@ -23,6 +23,7 @@ from heat.common import exception
from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.common import messaging as rpc_messaging
from heat.objects import stack as stack_object
from heat.objects import stack_lock as stack_lock_object
from heat.rpc import api as rpc_api
@ -32,9 +33,9 @@ LOG = logging.getLogger(__name__)
class StackLock(object):
def __init__(self, context, stack, engine_id):
def __init__(self, context, stack_id, engine_id):
self.context = context
self.stack = stack
self.stack_id = stack_id
self.engine_id = engine_id
self.listener = None
@ -55,14 +56,14 @@ class StackLock(object):
return str(uuid.uuid4())
def get_engine_id(self):
return stack_lock_object.StackLock.get_engine_id(self.stack.id)
return stack_lock_object.StackLock.get_engine_id(self.stack_id)
def try_acquire(self):
"""
Try to acquire a stack lock, but don't raise an ActionInProgress
exception or try to steal lock.
"""
return stack_lock_object.StackLock.create(self.stack.id,
return stack_lock_object.StackLock.create(self.stack_id,
self.engine_id)
def acquire(self, retry=True):
@ -72,27 +73,29 @@ class StackLock(object):
:param retry: When True, retry if lock was released while stealing.
:type retry: boolean
"""
lock_engine_id = stack_lock_object.StackLock.create(self.stack.id,
lock_engine_id = stack_lock_object.StackLock.create(self.stack_id,
self.engine_id)
if lock_engine_id is None:
LOG.debug("Engine %(engine)s acquired lock on stack "
"%(stack)s" % {'engine': self.engine_id,
'stack': self.stack.id})
'stack': self.stack_id})
return
stack = stack_object.Stack.get_by_id(self.context, self.stack_id,
show_deleted=True)
if (lock_engine_id == self.engine_id or
self.engine_alive(self.context, lock_engine_id)):
LOG.debug("Lock on stack %(stack)s is owned by engine "
"%(engine)s" % {'stack': self.stack.id,
"%(engine)s" % {'stack': self.stack_id,
'engine': lock_engine_id})
raise exception.ActionInProgress(stack_name=self.stack.name,
action=self.stack.action)
raise exception.ActionInProgress(stack_name=stack.name,
action=stack.action)
else:
LOG.info(_LI("Stale lock detected on stack %(stack)s. Engine "
"%(engine)s will attempt to steal the lock"),
{'stack': self.stack.id, 'engine': self.engine_id})
{'stack': self.stack_id, 'engine': self.engine_id})
result = stack_lock_object.StackLock.steal(self.stack.id,
result = stack_lock_object.StackLock.steal(self.stack_id,
lock_engine_id,
self.engine_id)
@ -100,39 +103,40 @@ class StackLock(object):
LOG.info(_LI("Engine %(engine)s successfully stole the lock "
"on stack %(stack)s"),
{'engine': self.engine_id,
'stack': self.stack.id})
'stack': self.stack_id})
return
elif result is True:
if retry:
LOG.info(_LI("The lock on stack %(stack)s was released "
"while engine %(engine)s was stealing it. "
"Trying again"), {'stack': self.stack.id,
"Trying again"), {'stack': self.stack_id,
'engine': self.engine_id})
return self.acquire(retry=False)
else:
new_lock_engine_id = result
LOG.info(_LI("Failed to steal lock on stack %(stack)s. "
"Engine %(engine)s stole the lock first"),
{'stack': self.stack.id,
{'stack': self.stack_id,
'engine': new_lock_engine_id})
raise exception.ActionInProgress(
stack_name=self.stack.name, action=self.stack.action)
stack_name=stack.name, action=stack.action)
def release(self, stack_id):
def release(self):
"""Release a stack lock."""
# Only the engine that owns the lock will be releasing it.
result = stack_lock_object.StackLock.release(stack_id,
result = stack_lock_object.StackLock.release(self.stack_id,
self.engine_id)
if result is True:
LOG.warn(_LW("Lock was already released on stack %s!"), stack_id)
LOG.warn(_LW("Lock was already released on stack %s!"),
self.stack_id)
else:
LOG.debug("Engine %(engine)s released lock on stack "
"%(stack)s" % {'engine': self.engine_id,
'stack': stack_id})
'stack': self.stack_id})
@contextlib.contextmanager
def thread_lock(self, stack_id):
def thread_lock(self):
"""
Acquire a lock and release it only if there is an exception. The
release method still needs to be scheduled to be run at the
@ -145,10 +149,10 @@ class StackLock(object):
raise
except: # noqa
with excutils.save_and_reraise_exception():
self.release(stack_id)
self.release()
@contextlib.contextmanager
def try_thread_lock(self, stack_id):
def try_thread_lock(self):
"""
Similar to thread_lock, but acquire the lock using try_acquire
and only release it upon any exception after a successful
@ -161,5 +165,5 @@ class StackLock(object):
except: # noqa
if result is None: # Lock was successfully acquired
with excutils.save_and_reraise_exception():
self.release(stack_id)
self.release()
raise

View File

@ -75,7 +75,7 @@ class StackServiceActionsTest(common.HeatTestCase):
result = self.man.stack_suspend(self.ctx, stk.identifier())
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=s)
mock_link.assert_called_once_with(mock.ANY, sid)
mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(sid, mock.ANY, stk)
stk.delete()
@ -96,7 +96,7 @@ class StackServiceActionsTest(common.HeatTestCase):
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_link.assert_called_once_with(mock.ANY, sid)
mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(sid, mock.ANY, stk)
stk.delete()

View File

@ -1744,7 +1744,7 @@ class StackServiceTest(common.HeatTestCase):
generic_rsrc.GenericResource)
thread = self.m.CreateMockAnything()
thread.link(mox.IgnoreArg(), self.stack.id).AndReturn(None)
thread.link(mox.IgnoreArg()).AndReturn(None)
thread.link(mox.IgnoreArg(), self.stack.id,
mox.IgnoreArg()).AndReturn(None)
@ -4280,7 +4280,7 @@ class ThreadGroupManagerTest(common.HeatTestCase):
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(

View File

@ -16,6 +16,7 @@ import oslo_messaging as messaging
from heat.common import exception
from heat.engine import stack_lock
from heat.objects import stack as stack_object
from heat.objects import stack_lock as stack_lock_object
from heat.tests import common
from heat.tests import utils
@ -25,11 +26,14 @@ class StackLockTest(common.HeatTestCase):
def setUp(self):
super(StackLockTest, self).setUp()
self.context = utils.dummy_context()
self.stack = mock.MagicMock()
self.stack.id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
self.stack.name = "test_stack"
self.stack.action = "CREATE"
self.stack_id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
self.engine_id = stack_lock.StackLock.generate_engine_id()
stack = mock.MagicMock()
stack.id = self.stack_id
stack.name = "test_stack"
stack.action = "CREATE"
self.patchobject(stack_object.Stack, 'get_by_id',
return_value=stack)
class TestThreadLockException(Exception):
pass
@ -39,20 +43,22 @@ class StackLockTest(common.HeatTestCase):
'create',
return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
slock.acquire()
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_failed_acquire_existing_lock_current_engine(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
'create',
return_value=self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_successful_acquire_existing_lock_engine_dead(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -62,12 +68,13 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
slock.acquire()
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id',
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id',
self.engine_id)
def test_failed_acquire_existing_lock_engine_alive(self):
@ -75,11 +82,12 @@ class StackLockTest(common.HeatTestCase):
'create',
return_value='fake-engine-id')
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=True)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_failed_acquire_existing_lock_engine_dead(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -89,12 +97,13 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value='fake-engine-id2')
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id',
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id',
self.engine_id)
def test_successful_acquire_with_retry(self):
@ -105,14 +114,15 @@ class StackLockTest(common.HeatTestCase):
'steal',
side_effect=[True, None])
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
slock.acquire()
mock_create.assert_has_calls(
[mock.call(self.stack.id, self.engine_id)] * 2)
[mock.call(self.stack_id, self.engine_id)] * 2)
mock_steal.assert_has_calls(
[mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2)
[mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2)
def test_failed_acquire_one_retry_only(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -122,22 +132,24 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value=True)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_has_calls(
[mock.call(self.stack.id, self.engine_id)] * 2)
[mock.call(self.stack_id, self.engine_id)] * 2)
mock_steal.assert_has_calls(
[mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2)
[mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2)
def test_thread_lock_context_mgr_exception_acquire_success(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.thread_lock(self.stack.id):
with slock.thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -148,10 +160,11 @@ class StackLockTest(common.HeatTestCase):
stack_lock_object.StackLock.create = mock.Mock(
return_value=self.engine_id)
stack_lock_object.StackLock.release = mock.Mock()
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.thread_lock(self.stack.id):
with slock.thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise exception.ActionInProgress
@ -161,18 +174,20 @@ class StackLockTest(common.HeatTestCase):
def test_thread_lock_context_mgr_no_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
with slock.thread_lock(self.stack.id):
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
with slock.thread_lock():
self.assertEqual(1, stack_lock_object.StackLock.create.call_count)
assert not stack_lock_object.StackLock.release.called
def test_try_thread_lock_context_mgr_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.try_thread_lock(self.stack.id):
with slock.try_thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -182,18 +197,20 @@ class StackLockTest(common.HeatTestCase):
def test_try_thread_lock_context_mgr_no_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
with slock.try_thread_lock(self.stack.id):
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
with slock.try_thread_lock():
self.assertEqual(1, stack_lock_object.StackLock.create.call_count)
assert not stack_lock_object.StackLock.release.called
def test_try_thread_lock_context_mgr_existing_lock(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=1234)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.try_thread_lock(self.stack.id):
with slock.try_thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -201,7 +218,8 @@ class StackLockTest(common.HeatTestCase):
assert not stack_lock_object.StackLock.release.called
def test_engine_alive_ok(self):
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
mget_client = self.patchobject(stack_lock.rpc_messaging,
'get_rpc_client')
mclient = mget_client.return_value
@ -213,7 +231,8 @@ class StackLockTest(common.HeatTestCase):
mclient_ctx.call.assert_called_once_with(self.context, 'listening')
def test_engine_alive_timeout(self):
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
mget_client = self.patchobject(stack_lock.rpc_messaging,
'get_rpc_client')
mclient = mget_client.return_value