diff --git a/heat/engine/service.py b/heat/engine/service.py index 21b94a2497..510dc952a3 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -135,8 +135,8 @@ class ThreadGroupManager(object): :param args: Args to be passed to func :param kwargs: Keyword-args to be passed to func. """ - lock = stack_lock.StackLock(cnxt, stack, engine_id) - with lock.thread_lock(stack.id): + lock = stack_lock.StackLock(cnxt, stack.id, engine_id) + with lock.thread_lock(): th = self.start_with_acquired_lock(stack, lock, func, *args, **kwargs) return th @@ -156,14 +156,14 @@ class ThreadGroupManager(object): :param kwargs: Keyword-args to be passed to func """ - def release(gt, *args): + def release(gt): """ Callback function that will be passed to GreenThread.link(). """ - lock.release(*args) + lock.release() th = self.start(stack.id, func, *args, **kwargs) - th.link(release, stack.id) + th.link(release) return th def add_timer(self, stack_id, func, *args, **kwargs): @@ -795,7 +795,7 @@ class EngineService(service.Service): # stop the running update and take the lock # as we cancel only running update, the acquire_result is # always some engine_id, not None - lock = stack_lock.StackLock(cnxt, current_stack, + lock = stack_lock.StackLock(cnxt, current_stack.id, self.engine_id) engine_id = lock.try_acquire() # Current engine has the lock @@ -927,8 +927,8 @@ class EngineService(service.Service): LOG.info(_LI('Deleting stack %s'), st.name) stack = parser.Stack.load(cnxt, stack=st) - lock = stack_lock.StackLock(cnxt, stack, self.engine_id) - with lock.try_thread_lock(stack.id) as acquire_result: + lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id) + with lock.try_thread_lock() as acquire_result: # Successfully acquired lock if acquire_result is None: @@ -979,8 +979,8 @@ class EngineService(service.Service): st = self._get_stack(cnxt, stack_identity) LOG.info(_LI('abandoning stack %s'), st.name) stack = parser.Stack.load(cnxt, stack=st) - lock = stack_lock.StackLock(cnxt, stack, self.engine_id) - with lock.thread_lock(stack.id): + lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id) + with lock.thread_lock(): # Get stack details before deleting it. stack_info = stack.prepare_abandon() self.thread_group_mgr.start_with_acquired_lock(stack, @@ -1274,9 +1274,9 @@ class EngineService(service.Service): raise exception.ActionInProgress(stack_name=stack.name, action=stack.action) - lock = stack_lock.StackLock(cnxt, stack, self.engine_id) + lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id) - with lock.thread_lock(stack.id): + with lock.thread_lock(): snapshot = snapshot_object.Snapshot.create(cnxt, { 'tenant': cnxt.tenant_id, 'name': name, @@ -1605,9 +1605,7 @@ class EngineService(service.Service): filters=filters, tenant_safe=False) or [] for s in stacks: - stk = parser.Stack.load(cnxt, stack=s, - use_stored_context=True) - lock = stack_lock.StackLock(cnxt, stk, self.engine_id) + lock = stack_lock.StackLock(cnxt, s.id, self.engine_id) # If stacklock is released, means stack status may changed. engine_id = lock.get_engine_id() if not engine_id: @@ -1617,6 +1615,8 @@ class EngineService(service.Service): lock.acquire(retry=False) except exception.ActionInProgress: continue + stk = parser.Stack.load(cnxt, stack=s, + use_stored_context=True) LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s' ' was in action %(action)s'), {'engine': engine_id, 'action': stk.action, diff --git a/heat/engine/stack_lock.py b/heat/engine/stack_lock.py index c66758cdaf..314863cdd0 100644 --- a/heat/engine/stack_lock.py +++ b/heat/engine/stack_lock.py @@ -23,6 +23,7 @@ from heat.common import exception from heat.common.i18n import _LI from heat.common.i18n import _LW from heat.common import messaging as rpc_messaging +from heat.objects import stack as stack_object from heat.objects import stack_lock as stack_lock_object from heat.rpc import api as rpc_api @@ -32,9 +33,9 @@ LOG = logging.getLogger(__name__) class StackLock(object): - def __init__(self, context, stack, engine_id): + def __init__(self, context, stack_id, engine_id): self.context = context - self.stack = stack + self.stack_id = stack_id self.engine_id = engine_id self.listener = None @@ -55,14 +56,14 @@ class StackLock(object): return str(uuid.uuid4()) def get_engine_id(self): - return stack_lock_object.StackLock.get_engine_id(self.stack.id) + return stack_lock_object.StackLock.get_engine_id(self.stack_id) def try_acquire(self): """ Try to acquire a stack lock, but don't raise an ActionInProgress exception or try to steal lock. """ - return stack_lock_object.StackLock.create(self.stack.id, + return stack_lock_object.StackLock.create(self.stack_id, self.engine_id) def acquire(self, retry=True): @@ -72,27 +73,29 @@ class StackLock(object): :param retry: When True, retry if lock was released while stealing. :type retry: boolean """ - lock_engine_id = stack_lock_object.StackLock.create(self.stack.id, + lock_engine_id = stack_lock_object.StackLock.create(self.stack_id, self.engine_id) if lock_engine_id is None: LOG.debug("Engine %(engine)s acquired lock on stack " "%(stack)s" % {'engine': self.engine_id, - 'stack': self.stack.id}) + 'stack': self.stack_id}) return + stack = stack_object.Stack.get_by_id(self.context, self.stack_id, + show_deleted=True) if (lock_engine_id == self.engine_id or self.engine_alive(self.context, lock_engine_id)): LOG.debug("Lock on stack %(stack)s is owned by engine " - "%(engine)s" % {'stack': self.stack.id, + "%(engine)s" % {'stack': self.stack_id, 'engine': lock_engine_id}) - raise exception.ActionInProgress(stack_name=self.stack.name, - action=self.stack.action) + raise exception.ActionInProgress(stack_name=stack.name, + action=stack.action) else: LOG.info(_LI("Stale lock detected on stack %(stack)s. Engine " "%(engine)s will attempt to steal the lock"), - {'stack': self.stack.id, 'engine': self.engine_id}) + {'stack': self.stack_id, 'engine': self.engine_id}) - result = stack_lock_object.StackLock.steal(self.stack.id, + result = stack_lock_object.StackLock.steal(self.stack_id, lock_engine_id, self.engine_id) @@ -100,39 +103,40 @@ class StackLock(object): LOG.info(_LI("Engine %(engine)s successfully stole the lock " "on stack %(stack)s"), {'engine': self.engine_id, - 'stack': self.stack.id}) + 'stack': self.stack_id}) return elif result is True: if retry: LOG.info(_LI("The lock on stack %(stack)s was released " "while engine %(engine)s was stealing it. " - "Trying again"), {'stack': self.stack.id, + "Trying again"), {'stack': self.stack_id, 'engine': self.engine_id}) return self.acquire(retry=False) else: new_lock_engine_id = result LOG.info(_LI("Failed to steal lock on stack %(stack)s. " "Engine %(engine)s stole the lock first"), - {'stack': self.stack.id, + {'stack': self.stack_id, 'engine': new_lock_engine_id}) raise exception.ActionInProgress( - stack_name=self.stack.name, action=self.stack.action) + stack_name=stack.name, action=stack.action) - def release(self, stack_id): + def release(self): """Release a stack lock.""" # Only the engine that owns the lock will be releasing it. - result = stack_lock_object.StackLock.release(stack_id, + result = stack_lock_object.StackLock.release(self.stack_id, self.engine_id) if result is True: - LOG.warn(_LW("Lock was already released on stack %s!"), stack_id) + LOG.warn(_LW("Lock was already released on stack %s!"), + self.stack_id) else: LOG.debug("Engine %(engine)s released lock on stack " "%(stack)s" % {'engine': self.engine_id, - 'stack': stack_id}) + 'stack': self.stack_id}) @contextlib.contextmanager - def thread_lock(self, stack_id): + def thread_lock(self): """ Acquire a lock and release it only if there is an exception. The release method still needs to be scheduled to be run at the @@ -145,10 +149,10 @@ class StackLock(object): raise except: # noqa with excutils.save_and_reraise_exception(): - self.release(stack_id) + self.release() @contextlib.contextmanager - def try_thread_lock(self, stack_id): + def try_thread_lock(self): """ Similar to thread_lock, but acquire the lock using try_acquire and only release it upon any exception after a successful @@ -161,5 +165,5 @@ class StackLock(object): except: # noqa if result is None: # Lock was successfully acquired with excutils.save_and_reraise_exception(): - self.release(stack_id) + self.release() raise diff --git a/heat/tests/engine/test_stack_action.py b/heat/tests/engine/test_stack_action.py index 97653b676a..f5d78c2e4d 100644 --- a/heat/tests/engine/test_stack_action.py +++ b/heat/tests/engine/test_stack_action.py @@ -75,7 +75,7 @@ class StackServiceActionsTest(common.HeatTestCase): result = self.man.stack_suspend(self.ctx, stk.identifier()) self.assertIsNone(result) mock_load.assert_called_once_with(self.ctx, stack=s) - mock_link.assert_called_once_with(mock.ANY, sid) + mock_link.assert_called_once_with(mock.ANY) mock_start.assert_called_once_with(sid, mock.ANY, stk) stk.delete() @@ -96,7 +96,7 @@ class StackServiceActionsTest(common.HeatTestCase): self.assertIsNone(result) mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) - mock_link.assert_called_once_with(mock.ANY, sid) + mock_link.assert_called_once_with(mock.ANY) mock_start.assert_called_once_with(sid, mock.ANY, stk) stk.delete() diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 7cf06c2949..489c91872f 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -1744,7 +1744,7 @@ class StackServiceTest(common.HeatTestCase): generic_rsrc.GenericResource) thread = self.m.CreateMockAnything() - thread.link(mox.IgnoreArg(), self.stack.id).AndReturn(None) + thread.link(mox.IgnoreArg()).AndReturn(None) thread.link(mox.IgnoreArg(), self.stack.id, mox.IgnoreArg()).AndReturn(None) @@ -4280,7 +4280,7 @@ class ThreadGroupManagerTest(common.HeatTestCase): thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f, *self.fargs, **self.fkwargs) self.stlock_mock.StackLock.assert_called_with(self.cnxt, - self.stack, + self.stack.id, self.engine_id) thm.start_with_acquired_lock.assert_called_once_with( diff --git a/heat/tests/test_stack_lock.py b/heat/tests/test_stack_lock.py index 5caeb55bc3..babc406dff 100644 --- a/heat/tests/test_stack_lock.py +++ b/heat/tests/test_stack_lock.py @@ -16,6 +16,7 @@ import oslo_messaging as messaging from heat.common import exception from heat.engine import stack_lock +from heat.objects import stack as stack_object from heat.objects import stack_lock as stack_lock_object from heat.tests import common from heat.tests import utils @@ -25,11 +26,14 @@ class StackLockTest(common.HeatTestCase): def setUp(self): super(StackLockTest, self).setUp() self.context = utils.dummy_context() - self.stack = mock.MagicMock() - self.stack.id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea" - self.stack.name = "test_stack" - self.stack.action = "CREATE" + self.stack_id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea" self.engine_id = stack_lock.StackLock.generate_engine_id() + stack = mock.MagicMock() + stack.id = self.stack_id + stack.name = "test_stack" + stack.action = "CREATE" + self.patchobject(stack_object.Stack, 'get_by_id', + return_value=stack) class TestThreadLockException(Exception): pass @@ -39,20 +43,22 @@ class StackLockTest(common.HeatTestCase): 'create', return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) slock.acquire() - mock_create.assert_called_once_with(self.stack.id, self.engine_id) + mock_create.assert_called_once_with(self.stack_id, self.engine_id) def test_failed_acquire_existing_lock_current_engine(self): mock_create = self.patchobject(stack_lock_object.StackLock, 'create', return_value=self.engine_id) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.assertRaises(exception.ActionInProgress, slock.acquire) - mock_create.assert_called_once_with(self.stack.id, self.engine_id) + mock_create.assert_called_once_with(self.stack_id, self.engine_id) def test_successful_acquire_existing_lock_engine_dead(self): mock_create = self.patchobject(stack_lock_object.StackLock, @@ -62,12 +68,13 @@ class StackLockTest(common.HeatTestCase): 'steal', return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.patchobject(slock, 'engine_alive', return_value=False) slock.acquire() - mock_create.assert_called_once_with(self.stack.id, self.engine_id) - mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id', + mock_create.assert_called_once_with(self.stack_id, self.engine_id) + mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id', self.engine_id) def test_failed_acquire_existing_lock_engine_alive(self): @@ -75,11 +82,12 @@ class StackLockTest(common.HeatTestCase): 'create', return_value='fake-engine-id') - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.patchobject(slock, 'engine_alive', return_value=True) self.assertRaises(exception.ActionInProgress, slock.acquire) - mock_create.assert_called_once_with(self.stack.id, self.engine_id) + mock_create.assert_called_once_with(self.stack_id, self.engine_id) def test_failed_acquire_existing_lock_engine_dead(self): mock_create = self.patchobject(stack_lock_object.StackLock, @@ -89,12 +97,13 @@ class StackLockTest(common.HeatTestCase): 'steal', return_value='fake-engine-id2') - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.patchobject(slock, 'engine_alive', return_value=False) self.assertRaises(exception.ActionInProgress, slock.acquire) - mock_create.assert_called_once_with(self.stack.id, self.engine_id) - mock_steal.assert_called_once_with(self.stack.id, 'fake-engine-id', + mock_create.assert_called_once_with(self.stack_id, self.engine_id) + mock_steal.assert_called_once_with(self.stack_id, 'fake-engine-id', self.engine_id) def test_successful_acquire_with_retry(self): @@ -105,14 +114,15 @@ class StackLockTest(common.HeatTestCase): 'steal', side_effect=[True, None]) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.patchobject(slock, 'engine_alive', return_value=False) slock.acquire() mock_create.assert_has_calls( - [mock.call(self.stack.id, self.engine_id)] * 2) + [mock.call(self.stack_id, self.engine_id)] * 2) mock_steal.assert_has_calls( - [mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2) + [mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2) def test_failed_acquire_one_retry_only(self): mock_create = self.patchobject(stack_lock_object.StackLock, @@ -122,22 +132,24 @@ class StackLockTest(common.HeatTestCase): 'steal', return_value=True) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) self.patchobject(slock, 'engine_alive', return_value=False) self.assertRaises(exception.ActionInProgress, slock.acquire) mock_create.assert_has_calls( - [mock.call(self.stack.id, self.engine_id)] * 2) + [mock.call(self.stack_id, self.engine_id)] * 2) mock_steal.assert_has_calls( - [mock.call(self.stack.id, 'fake-engine-id', self.engine_id)] * 2) + [mock.call(self.stack_id, 'fake-engine-id', self.engine_id)] * 2) def test_thread_lock_context_mgr_exception_acquire_success(self): stack_lock_object.StackLock.create = mock.Mock(return_value=None) stack_lock_object.StackLock.release = mock.Mock(return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) def check_thread_lock(): - with slock.thread_lock(self.stack.id): + with slock.thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) raise self.TestThreadLockException @@ -148,10 +160,11 @@ class StackLockTest(common.HeatTestCase): stack_lock_object.StackLock.create = mock.Mock( return_value=self.engine_id) stack_lock_object.StackLock.release = mock.Mock() - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) def check_thread_lock(): - with slock.thread_lock(self.stack.id): + with slock.thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) raise exception.ActionInProgress @@ -161,18 +174,20 @@ class StackLockTest(common.HeatTestCase): def test_thread_lock_context_mgr_no_exception(self): stack_lock_object.StackLock.create = mock.Mock(return_value=None) stack_lock_object.StackLock.release = mock.Mock(return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) - with slock.thread_lock(self.stack.id): + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) + with slock.thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) assert not stack_lock_object.StackLock.release.called def test_try_thread_lock_context_mgr_exception(self): stack_lock_object.StackLock.create = mock.Mock(return_value=None) stack_lock_object.StackLock.release = mock.Mock(return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) def check_thread_lock(): - with slock.try_thread_lock(self.stack.id): + with slock.try_thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) raise self.TestThreadLockException @@ -182,18 +197,20 @@ class StackLockTest(common.HeatTestCase): def test_try_thread_lock_context_mgr_no_exception(self): stack_lock_object.StackLock.create = mock.Mock(return_value=None) stack_lock_object.StackLock.release = mock.Mock(return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) - with slock.try_thread_lock(self.stack.id): + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) + with slock.try_thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) assert not stack_lock_object.StackLock.release.called def test_try_thread_lock_context_mgr_existing_lock(self): stack_lock_object.StackLock.create = mock.Mock(return_value=1234) stack_lock_object.StackLock.release = mock.Mock(return_value=None) - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) def check_thread_lock(): - with slock.try_thread_lock(self.stack.id): + with slock.try_thread_lock(): self.assertEqual(1, stack_lock_object.StackLock.create.call_count) raise self.TestThreadLockException @@ -201,7 +218,8 @@ class StackLockTest(common.HeatTestCase): assert not stack_lock_object.StackLock.release.called def test_engine_alive_ok(self): - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) mget_client = self.patchobject(stack_lock.rpc_messaging, 'get_rpc_client') mclient = mget_client.return_value @@ -213,7 +231,8 @@ class StackLockTest(common.HeatTestCase): mclient_ctx.call.assert_called_once_with(self.context, 'listening') def test_engine_alive_timeout(self): - slock = stack_lock.StackLock(self.context, self.stack, self.engine_id) + slock = stack_lock.StackLock(self.context, self.stack_id, + self.engine_id) mget_client = self.patchobject(stack_lock.rpc_messaging, 'get_rpc_client') mclient = mget_client.return_value