Pass stack id to init stacklock

Only acquire() function in stacklock needs to load whole stack,
other function only needs stack id. This patch only pass
stack id for stacklock so that in some place we don't need
to load whole stack.

Change-Id: Iaad5f4a871f925c4052d159a5d95184681d5bd28
Closes-Bug: #1441972
This commit is contained in:
Ethan Lynn 2015-04-09 13:25:19 +08:00
parent 5854f1d121
commit 0b59af6acc
5 changed files with 101 additions and 78 deletions

View File

@ -135,8 +135,8 @@ class ThreadGroupManager(object):
:param args: Args to be passed to func
:param kwargs: Keyword-args to be passed to func.
"""
lock = stack_lock.StackLock(cnxt, stack, engine_id)
with lock.thread_lock(stack.id):
lock = stack_lock.StackLock(cnxt, stack.id, engine_id)
with lock.thread_lock():
th = self.start_with_acquired_lock(stack, lock,
func, *args, **kwargs)
return th
@ -156,14 +156,14 @@ class ThreadGroupManager(object):
:param kwargs: Keyword-args to be passed to func
"""
def release(gt, *args):
def release(gt):
"""
Callback function that will be passed to GreenThread.link().
"""
lock.release(*args)
lock.release()
th = self.start(stack.id, func, *args, **kwargs)
th.link(release, stack.id)
th.link(release)
return th
def add_timer(self, stack_id, func, *args, **kwargs):
@ -795,7 +795,7 @@ class EngineService(service.Service):
# stop the running update and take the lock
# as we cancel only running update, the acquire_result is
# always some engine_id, not None
lock = stack_lock.StackLock(cnxt, current_stack,
lock = stack_lock.StackLock(cnxt, current_stack.id,
self.engine_id)
engine_id = lock.try_acquire()
# Current engine has the lock
@ -927,8 +927,8 @@ class EngineService(service.Service):
LOG.info(_LI('Deleting stack %s'), st.name)
stack = parser.Stack.load(cnxt, stack=st)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
with lock.try_thread_lock(stack.id) as acquire_result:
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.try_thread_lock() as acquire_result:
# Successfully acquired lock
if acquire_result is None:
@ -979,8 +979,8 @@ class EngineService(service.Service):
st = self._get_stack(cnxt, stack_identity)
LOG.info(_LI('abandoning stack %s'), st.name)
stack = parser.Stack.load(cnxt, stack=st)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
with lock.thread_lock(stack.id):
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.thread_lock():
# Get stack details before deleting it.
stack_info = stack.prepare_abandon()
self.thread_group_mgr.start_with_acquired_lock(stack,
@ -1274,9 +1274,9 @@ class EngineService(service.Service):
raise exception.ActionInProgress(stack_name=stack.name,
action=stack.action)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.thread_lock(stack.id):
with lock.thread_lock():
snapshot = snapshot_object.Snapshot.create(cnxt, {
'tenant': cnxt.tenant_id,
'name': name,
@ -1605,9 +1605,7 @@ class EngineService(service.Service):
filters=filters,
tenant_safe=False) or []
for s in stacks:
stk = parser.Stack.load(cnxt, stack=s,
use_stored_context=True)
lock = stack_lock.StackLock(cnxt, stk, self.engine_id)
lock = stack_lock.StackLock(cnxt, s.id, self.engine_id)
# If stacklock is released, means stack status may changed.
engine_id = lock.get_engine_id()
if not engine_id:
@ -1617,6 +1615,8 @@ class EngineService(service.Service):
lock.acquire(retry=False)
except exception.ActionInProgress:
continue
stk = parser.Stack.load(cnxt, stack=s,
use_stored_context=True)
LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s'
' was in action %(action)s'),
{'engine': engine_id, 'action': stk.action,

View File

@ -23,6 +23,7 @@ from heat.common import exception
from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.common import messaging as rpc_messaging
from heat.objects import stack as stack_object
from heat.objects import stack_lock as stack_lock_object
from heat.rpc import api as rpc_api
@ -32,9 +33,9 @@ LOG = logging.getLogger(__name__)
class StackLock(object):
def __init__(self, context, stack, engine_id):
def __init__(self, context, stack_id, engine_id):
self.context = context
self.stack = stack
self.stack_id = stack_id
self.engine_id = engine_id
self.listener = None
@ -55,14 +56,14 @@ class StackLock(object):
return str(uuid.uuid4())
def get_engine_id(self):
return stack_lock_object.StackLock.get_engine_id(self.stack.id)
return stack_lock_object.StackLock.get_engine_id(self.stack_id)
def try_acquire(self):
"""
Try to acquire a stack lock, but don't raise an ActionInProgress
exception or try to steal lock.
"""
return stack_lock_object.StackLock.create(self.stack.id,
return stack_lock_object.StackLock.create(self.stack_id,
self.engine_id)
def acquire(self, retry=True):
@ -72,27 +73,29 @@ class StackLock(object):
:param retry: When True, retry if lock was released while stealing.
:type retry: boolean
"""
lock_engine_id = stack_lock_object.StackLock.create(self.stack.id,
lock_engine_id = stack_lock_object.StackLock.create(self.stack_id,
self.engine_id)
if lock_engine_id is None:
LOG.debug("Engine %(engine)s acquired lock on stack "
"%(stack)s" % {'engine': self.engine_id,
'stack': self.stack.id})
'stack': self.stack_id})
return
stack = stack_object.Stack.get_by_id(self.context, self.stack_id,
show_deleted=True)
if (lock_engine_id == self.engine_id or
self.engine_alive(self.context, lock_engine_id)):
LOG.debug("Lock on stack %(stack)s is owned by engine "
"%(engine)s" % {'stack': self.stack.id,
"%(engine)s" % {'stack': self.stack_id,
'engine': lock_engine_id})
raise exception.ActionInProgress(stack_name=self.stack.name,
action=self.stack.action)
raise exception.ActionInProgress(stack_name=stack.name,
action=stack.action)
else:
LOG.info(_LI("Stale lock detected on stack %(stack)s. Engine "
"%(engine)s will attempt to steal the lock"),
{'stack': self.stack.id, 'engine': self.engine_id})
{'stack': self.stack_id, 'engine': self.engine_id})
result = stack_lock_object.StackLock.steal(self.stack.id,
result = stack_lock_object.StackLock.steal(self.stack_id,
lock_engine_id,
self.engine_id)
@ -100,39 +103,40 @@ class StackLock(object):
LOG.info(_LI("Engine %(engine)s successfully stole the lock "
"on stack %(stack)s"),
{'engine': self.engine_id,
'stack': self.stack.id})
'stack': self.stack_id})
return
elif result is True:
if retry:
LOG.info(_LI("The lock on stack %(stack)s was released "
"while engine %(engine)s was stealing it. "
"Trying again"), {'stack': self.stack.id,
"Trying again"), {'stack': self.stack_id,
'engine': self.engine_id})
return self.acquire(retry=False)
else:
new_lock_engine_id = result
LOG.info(_LI("Failed to steal lock on stack %(stack)s. "
"Engine %(engine)s stole the lock first"),
{'stack': self.stack.id,
{'stack': self.stack_id,
'engine': new_lock_engine_id})
raise exception.ActionInProgress(
stack_name=self.stack.name, action=self.stack.action)
stack_name=stack.name, action=stack.action)
def release(self, stack_id):
def release(self):
"""Release a stack lock."""
# Only the engine that owns the lock will be releasing it.
result = stack_lock_object.StackLock.release(stack_id,
result = stack_lock_object.StackLock.release(self.stack_id,
self.engine_id)
if result is True:
LOG.warn(_LW("Lock was already released on stack %s!"), stack_id)
LOG.warn(_LW("Lock was already released on stack %s!"),
self.stack_id)
else:
LOG.debug("Engine %(engine)s released lock on stack "
"%(stack)s" % {'engine': self.engine_id,
'stack': stack_id})
'stack': self.stack_id})
@contextlib.contextmanager
def thread_lock(self, stack_id):
def thread_lock(self):
"""
Acquire a lock and release it only if there is an exception. The
release method still needs to be scheduled to be run at the
@ -145,10 +149,10 @@ class StackLock(object):
raise
except: # noqa
with excutils.save_and_reraise_exception():
self.release(stack_id)
self.release()
@contextlib.contextmanager
def try_thread_lock(self, stack_id):
def try_thread_lock(self):
"""
Similar to thread_lock, but acquire the lock using try_acquire
and only release it upon any exception after a successful
@ -161,5 +165,5 @@ class StackLock(object):
except: # noqa
if result is None: # Lock was successfully acquired
with excutils.save_and_reraise_exception():
self.release(stack_id)
self.release()
raise

View File

@ -75,7 +75,7 @@ class StackServiceActionsTest(common.HeatTestCase):
result = self.man.stack_suspend(self.ctx, stk.identifier())
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=s)
mock_link.assert_called_once_with(mock.ANY, sid)
mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(sid, mock.ANY, stk)
stk.delete()
@ -96,7 +96,7 @@ class StackServiceActionsTest(common.HeatTestCase):
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_link.assert_called_once_with(mock.ANY, sid)
mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(sid, mock.ANY, stk)
stk.delete()

View File

@ -1744,7 +1744,7 @@ class StackServiceTest(common.HeatTestCase):
generic_rsrc.GenericResource)
thread = self.m.CreateMockAnything()
thread.link(mox.IgnoreArg(), self.stack.id).AndReturn(None)
thread.link(mox.IgnoreArg()).AndReturn(None)
thread.link(mox.IgnoreArg(), self.stack.id,
mox.IgnoreArg()).AndReturn(None)
@ -4280,7 +4280,7 @@ class ThreadGroupManagerTest(common.HeatTestCase):
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(

View File

@ -16,6 +16,7 @@ import oslo_messaging as messaging
from heat.common import exception
from heat.engine import stack_lock
from heat.objects import stack as stack_object
from heat.objects import stack_lock as stack_lock_object
from heat.tests import common
from heat.tests import utils
@ -25,11 +26,14 @@ class StackLockTest(common.HeatTestCase):
def setUp(self):
super(StackLockTest, self).setUp()
self.context = utils.dummy_context()
self.stack = mock.MagicMock()
self.stack.id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
self.stack.name = "test_stack"
self.stack.action = "CREATE"
self.stack_id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
self.engine_id = stack_lock.StackLock.generate_engine_id()
stack = mock.MagicMock()
stack.id = self.stack_id
stack.name = "test_stack"
stack.action = "CREATE"
self.patchobject(stack_object.Stack, 'get_by_id',
return_value=stack)
class TestThreadLockException(Exception):
pass
@ -39,20 +43,22 @@ class StackLockTest(common.HeatTestCase):
'create',
return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
slock.acquire()
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_failed_acquire_existing_lock_current_engine(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
'create',
return_value=self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_successful_acquire_existing_lock_engine_dead(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -62,12 +68,13 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
slock.acquire()
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id',
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id',
self.engine_id)
def test_failed_acquire_existing_lock_engine_alive(self):
@ -75,11 +82,12 @@ class StackLockTest(common.HeatTestCase):
'create',
return_value='fake-engine-id')
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=True)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
def test_failed_acquire_existing_lock_engine_dead(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -89,12 +97,13 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value='fake-engine-id2')
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_called_once_with(self.stack.id, self.engine_id)
mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id',
mock_create.assert_called_once_with(self.stack_id, self.engine_id)
mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id',
self.engine_id)
def test_successful_acquire_with_retry(self):
@ -105,14 +114,15 @@ class StackLockTest(common.HeatTestCase):
'steal',
side_effect=[True, None])
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
slock.acquire()
mock_create.assert_has_calls(
[mock.call(self.stack.id, self.engine_id)] * 2)
[mock.call(self.stack_id, self.engine_id)] * 2)
mock_steal.assert_has_calls(
[mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2)
[mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2)
def test_failed_acquire_one_retry_only(self):
mock_create = self.patchobject(stack_lock_object.StackLock,
@ -122,22 +132,24 @@ class StackLockTest(common.HeatTestCase):
'steal',
return_value=True)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
self.patchobject(slock, 'engine_alive', return_value=False)
self.assertRaises(exception.ActionInProgress, slock.acquire)
mock_create.assert_has_calls(
[mock.call(self.stack.id, self.engine_id)] * 2)
[mock.call(self.stack_id, self.engine_id)] * 2)
mock_steal.assert_has_calls(
[mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2)
[mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2)
def test_thread_lock_context_mgr_exception_acquire_success(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.thread_lock(self.stack.id):
with slock.thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -148,10 +160,11 @@ class StackLockTest(common.HeatTestCase):
stack_lock_object.StackLock.create = mock.Mock(
return_value=self.engine_id)
stack_lock_object.StackLock.release = mock.Mock()
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.thread_lock(self.stack.id):
with slock.thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise exception.ActionInProgress
@ -161,18 +174,20 @@ class StackLockTest(common.HeatTestCase):
def test_thread_lock_context_mgr_no_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
with slock.thread_lock(self.stack.id):
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
with slock.thread_lock():
self.assertEqual(1, stack_lock_object.StackLock.create.call_count)
assert not stack_lock_object.StackLock.release.called
def test_try_thread_lock_context_mgr_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.try_thread_lock(self.stack.id):
with slock.try_thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -182,18 +197,20 @@ class StackLockTest(common.HeatTestCase):
def test_try_thread_lock_context_mgr_no_exception(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=None)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
with slock.try_thread_lock(self.stack.id):
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
with slock.try_thread_lock():
self.assertEqual(1, stack_lock_object.StackLock.create.call_count)
assert not stack_lock_object.StackLock.release.called
def test_try_thread_lock_context_mgr_existing_lock(self):
stack_lock_object.StackLock.create = mock.Mock(return_value=1234)
stack_lock_object.StackLock.release = mock.Mock(return_value=None)
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
def check_thread_lock():
with slock.try_thread_lock(self.stack.id):
with slock.try_thread_lock():
self.assertEqual(1,
stack_lock_object.StackLock.create.call_count)
raise self.TestThreadLockException
@ -201,7 +218,8 @@ class StackLockTest(common.HeatTestCase):
assert not stack_lock_object.StackLock.release.called
def test_engine_alive_ok(self):
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
mget_client = self.patchobject(stack_lock.rpc_messaging,
'get_rpc_client')
mclient = mget_client.return_value
@ -213,7 +231,8 @@ class StackLockTest(common.HeatTestCase):
mclient_ctx.call.assert_called_once_with(self.context, 'listening')
def test_engine_alive_timeout(self):
slock = stack_lock.StackLock(self.context, self.stack, self.engine_id)
slock = stack_lock.StackLock(self.context, self.stack_id,
self.engine_id)
mget_client = self.patchobject(stack_lock.rpc_messaging,
'get_rpc_client')
mclient = mget_client.return_value