diff --git a/heat/db/api.py b/heat/db/api.py index 77d98d988c..87440412ce 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -164,6 +164,10 @@ def stack_lock_create(stack_id, engine_id): return IMPL.stack_lock_create(stack_id, engine_id) +def stack_lock_get_engine_id(stack_id): + return IMPL.stack_lock_get_engine_id(stack_id) + + def stack_lock_steal(stack_id, old_engine_id, new_engine_id): return IMPL.stack_lock_steal(stack_id, old_engine_id, new_engine_id) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 520d182694..5291583214 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -470,6 +470,14 @@ def stack_lock_create(stack_id, engine_id): session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id)) +def stack_lock_get_engine_id(stack_id): + session = get_session() + with session.begin(): + lock = session.query(models.StackLock).get(stack_id) + if lock is not None: + return lock.engine_id + + def stack_lock_steal(stack_id, old_engine_id, new_engine_id): session = get_session() with session.begin(): diff --git a/heat/engine/service.py b/heat/engine/service.py index a1097db54a..cfadfc9b8a 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -352,6 +352,7 @@ class EngineService(service.Service): self.manage_thread_grp = threadgroup.ThreadGroup() self.manage_thread_grp.add_timer(cfg.CONF.periodic_interval, self.service_manage_report) + self.manage_thread_grp.add_thread(self.reset_stack_status) super(EngineService, self).start() @@ -1554,3 +1555,33 @@ class EngineService(service.Service): # hasn't been updated, assuming it's died. LOG.info(_LI('Service %s was aborted'), service_ref['id']) service_objects.Service.delete(cnxt, service_ref['id']) + + def reset_stack_status(self): + cnxt = context.get_admin_context() + filters = {'status': parser.Stack.IN_PROGRESS} + stacks = stack_object.Stack.get_all(cnxt, + filters=filters, + tenant_safe=False) or [] + for s in stacks: + stk = parser.Stack.load(cnxt, stack=s, + use_stored_context=True) + lock = stack_lock.StackLock(cnxt, stk, self.engine_id) + # If stacklock is released, means stack status may changed. + engine_id = lock.get_engine_id() + if not engine_id: + continue + # Try to steal the lock and set status to failed. + try: + lock.acquire(retry=False) + except exception.ActionInProgress: + continue + LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s' + ' was in action %(action)s'), + {'engine': engine_id, 'action': stk.action, + 'stack_id': stk.id}) + # Set stack status to FAILED. + status_reason = ('Engine went down during stack %s' % stk.action) + self.thread_group_mgr.start_with_acquired_lock( + stk, lock, stk.state_set, stk.action, + stk.FAILED, six.text_type(status_reason) + ) diff --git a/heat/engine/stack_lock.py b/heat/engine/stack_lock.py index 638b36a091..c66758cdaf 100644 --- a/heat/engine/stack_lock.py +++ b/heat/engine/stack_lock.py @@ -54,6 +54,9 @@ class StackLock(object): def generate_engine_id(): return str(uuid.uuid4()) + def get_engine_id(self): + return stack_lock_object.StackLock.get_engine_id(self.stack.id) + def try_acquire(self): """ Try to acquire a stack lock, but don't raise an ActionInProgress diff --git a/heat/objects/stack_lock.py b/heat/objects/stack_lock.py index 213b3ac708..89c47daaef 100644 --- a/heat/objects/stack_lock.py +++ b/heat/objects/stack_lock.py @@ -45,3 +45,7 @@ class StackLock(base.VersionedObject, @classmethod def release(cls, stack_id, engine_id): return db_api.stack_lock_release(stack_id, engine_id) + + @classmethod + def get_engine_id(cls, stack_id): + return db_api.stack_lock_get_engine_id(stack_id) diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index 2f3fad7fe0..920491da1e 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -1681,6 +1681,15 @@ class DBAPIStackLockTest(common.HeatTestCase): observed = db_api.stack_lock_create(self.stack.id, UUID2) self.assertEqual(UUID1, observed) + def test_stack_lock_get_id_success(self): + db_api.stack_lock_create(self.stack.id, UUID1) + observed = db_api.stack_lock_get_engine_id(self.stack.id) + self.assertEqual(UUID1, observed) + + def test_stack_lock_get_id_return_none(self): + observed = db_api.stack_lock_get_engine_id(self.stack.id) + self.assertIsNone(observed) + def test_stack_lock_steal_success(self): db_api.stack_lock_create(self.stack.id, UUID1) observed = db_api.stack_lock_steal(self.stack.id, UUID1, UUID2) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 19045985e7..cdcc473551 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -3277,6 +3277,7 @@ class StackServiceTest(common.HeatTestCase): target_class, rpc_server_method): self.patchobject(self.eng, 'service_manage_cleanup') + self.patchobject(self.eng, 'reset_stack_status') self.eng.start() # engine id @@ -3346,6 +3347,57 @@ class StackServiceTest(common.HeatTestCase): self.eng.service_manage_report ) + @mock.patch('heat.engine.service.ThreadGroupManager', + return_value=mock.Mock()) + @mock.patch.object(service_objects.Stack, 'get_all') + @mock.patch('heat.engine.stack_lock.StackLock', + return_value=mock.Mock()) + @mock.patch.object(parser.Stack, 'load') + @mock.patch.object(context, 'get_admin_context') + def test_engine_reset_stack_status( + self, + mock_admin_context, + mock_stack_load, + mock_stacklock, + mock_get_all, + mock_thread): + mock_admin_context.return_value = self.ctx + + db_stack = mock.MagicMock() + db_stack.id = 'foo' + db_stack.status = 'IN_PROGRESS' + db_stack.status_reason = None + mock_get_all.return_value = [db_stack] + + fake_stack = mock.MagicMock() + fake_stack.action = 'CREATE' + fake_stack.id = 'foo' + fake_stack.status = 'IN_PROGRESS' + fake_stack.state_set.return_value = None + mock_stack_load.return_value = fake_stack + + fake_lock = mock.MagicMock() + fake_lock.get_engine_id.return_value = 'old-engine' + fake_lock.acquire.return_value = None + mock_stacklock.return_value = fake_lock + + self.eng.thread_group_mgr = mock_thread + + self.eng.reset_stack_status() + + mock_admin_context.assert_called_once_with() + filters = {'status': parser.Stack.IN_PROGRESS} + mock_get_all.assert_called_once_with(self.ctx, + filters=filters, + tenant_safe=False) + mock_stack_load.assert_call_once_with(self.ctx, + stack=db_stack, + use_stored_context=True) + mock_thread.start_with_acquired_lock.assert_call_once_with( + fake_stack, fake_stack.state_set, fake_stack.action, + parser.Stack.FAILED, 'Engine went down during stack CREATE' + ) + @mock.patch('heat.common.messaging.get_rpc_server', return_value=mock.Mock()) @mock.patch('oslo_messaging.Target', @@ -3425,6 +3477,7 @@ class StackServiceTest(common.HeatTestCase): admin_context_method): cfg.CONF.set_default('periodic_interval', 60) self.patchobject(self.eng, 'service_manage_cleanup') + self.patchobject(self.eng, 'reset_stack_status') self.eng.start() # Add dummy thread group to test thread_group_mgr.stop() is executed?