Merge "Set stack status to FAILED when engine is down"

This commit is contained in:
Jenkins 2015-04-03 19:01:29 +00:00 committed by Gerrit Code Review
commit 0a1639a5bc
7 changed files with 112 additions and 0 deletions

View File

@ -164,6 +164,10 @@ def stack_lock_create(stack_id, engine_id):
return IMPL.stack_lock_create(stack_id, engine_id)
def stack_lock_get_engine_id(stack_id):
return IMPL.stack_lock_get_engine_id(stack_id)
def stack_lock_steal(stack_id, old_engine_id, new_engine_id):
return IMPL.stack_lock_steal(stack_id, old_engine_id, new_engine_id)

View File

@ -470,6 +470,14 @@ def stack_lock_create(stack_id, engine_id):
session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id))
def stack_lock_get_engine_id(stack_id):
session = get_session()
with session.begin():
lock = session.query(models.StackLock).get(stack_id)
if lock is not None:
return lock.engine_id
def stack_lock_steal(stack_id, old_engine_id, new_engine_id):
session = get_session()
with session.begin():

View File

@ -352,6 +352,7 @@ class EngineService(service.Service):
self.manage_thread_grp = threadgroup.ThreadGroup()
self.manage_thread_grp.add_timer(cfg.CONF.periodic_interval,
self.service_manage_report)
self.manage_thread_grp.add_thread(self.reset_stack_status)
super(EngineService, self).start()
@ -1554,3 +1555,33 @@ class EngineService(service.Service):
# hasn't been updated, assuming it's died.
LOG.info(_LI('Service %s was aborted'), service_ref['id'])
service_objects.Service.delete(cnxt, service_ref['id'])
def reset_stack_status(self):
cnxt = context.get_admin_context()
filters = {'status': parser.Stack.IN_PROGRESS}
stacks = stack_object.Stack.get_all(cnxt,
filters=filters,
tenant_safe=False) or []
for s in stacks:
stk = parser.Stack.load(cnxt, stack=s,
use_stored_context=True)
lock = stack_lock.StackLock(cnxt, stk, self.engine_id)
# If stacklock is released, means stack status may changed.
engine_id = lock.get_engine_id()
if not engine_id:
continue
# Try to steal the lock and set status to failed.
try:
lock.acquire(retry=False)
except exception.ActionInProgress:
continue
LOG.info(_LI('Engine %(engine)s went down when stack %(stack_id)s'
' was in action %(action)s'),
{'engine': engine_id, 'action': stk.action,
'stack_id': stk.id})
# Set stack status to FAILED.
status_reason = ('Engine went down during stack %s' % stk.action)
self.thread_group_mgr.start_with_acquired_lock(
stk, lock, stk.state_set, stk.action,
stk.FAILED, six.text_type(status_reason)
)

View File

@ -54,6 +54,9 @@ class StackLock(object):
def generate_engine_id():
return str(uuid.uuid4())
def get_engine_id(self):
return stack_lock_object.StackLock.get_engine_id(self.stack.id)
def try_acquire(self):
"""
Try to acquire a stack lock, but don't raise an ActionInProgress

View File

@ -45,3 +45,7 @@ class StackLock(base.VersionedObject,
@classmethod
def release(cls, stack_id, engine_id):
return db_api.stack_lock_release(stack_id, engine_id)
@classmethod
def get_engine_id(cls, stack_id):
return db_api.stack_lock_get_engine_id(stack_id)

View File

@ -1681,6 +1681,15 @@ class DBAPIStackLockTest(common.HeatTestCase):
observed = db_api.stack_lock_create(self.stack.id, UUID2)
self.assertEqual(UUID1, observed)
def test_stack_lock_get_id_success(self):
db_api.stack_lock_create(self.stack.id, UUID1)
observed = db_api.stack_lock_get_engine_id(self.stack.id)
self.assertEqual(UUID1, observed)
def test_stack_lock_get_id_return_none(self):
observed = db_api.stack_lock_get_engine_id(self.stack.id)
self.assertIsNone(observed)
def test_stack_lock_steal_success(self):
db_api.stack_lock_create(self.stack.id, UUID1)
observed = db_api.stack_lock_steal(self.stack.id, UUID1, UUID2)

View File

@ -3277,6 +3277,7 @@ class StackServiceTest(common.HeatTestCase):
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# engine id
@ -3346,6 +3347,57 @@ class StackServiceTest(common.HeatTestCase):
self.eng.service_manage_report
)
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch.object(service_objects.Stack, 'get_all')
@mock.patch('heat.engine.stack_lock.StackLock',
return_value=mock.Mock())
@mock.patch.object(parser.Stack, 'load')
@mock.patch.object(context, 'get_admin_context')
def test_engine_reset_stack_status(
self,
mock_admin_context,
mock_stack_load,
mock_stacklock,
mock_get_all,
mock_thread):
mock_admin_context.return_value = self.ctx
db_stack = mock.MagicMock()
db_stack.id = 'foo'
db_stack.status = 'IN_PROGRESS'
db_stack.status_reason = None
mock_get_all.return_value = [db_stack]
fake_stack = mock.MagicMock()
fake_stack.action = 'CREATE'
fake_stack.id = 'foo'
fake_stack.status = 'IN_PROGRESS'
fake_stack.state_set.return_value = None
mock_stack_load.return_value = fake_stack
fake_lock = mock.MagicMock()
fake_lock.get_engine_id.return_value = 'old-engine'
fake_lock.acquire.return_value = None
mock_stacklock.return_value = fake_lock
self.eng.thread_group_mgr = mock_thread
self.eng.reset_stack_status()
mock_admin_context.assert_called_once_with()
filters = {'status': parser.Stack.IN_PROGRESS}
mock_get_all.assert_called_once_with(self.ctx,
filters=filters,
tenant_safe=False)
mock_stack_load.assert_call_once_with(self.ctx,
stack=db_stack,
use_stored_context=True)
mock_thread.start_with_acquired_lock.assert_call_once_with(
fake_stack, fake_stack.state_set, fake_stack.action,
parser.Stack.FAILED, 'Engine went down during stack CREATE'
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
@ -3425,6 +3477,7 @@ class StackServiceTest(common.HeatTestCase):
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?