Allow an in-progress stack to be deleted

Since multi-engine was merged, it is no longer possible to delete a
stack which is create/update IN_PROGRESS.  This re-introduces that
ability.

If the current engine has an active stack lock, stop any active
threads, remove the lock, and delete the stack.  If another engine has
the active stack lock, send an RPC command to the remote engine to stop
any active threads and remove the lock, and then delete the stack
locally.  If no engine has an active stack lock, acquire the lock
and delete the stack.

Closes-Bug: #1262012
Change-Id: I188e43ad88b98da7d1a08269189aaefa57c36df2
This commit is contained in:
Jason Dunsmore 2013-12-16 14:16:26 -06:00
parent cafe45ecc5
commit 42f788b56f
5 changed files with 179 additions and 6 deletions

View File

@ -72,6 +72,7 @@ class FaultWrapper(wsgi.Middleware):
'InvalidTemplateReference': webob.exc.HTTPBadRequest,
'UnknownUserParameter': webob.exc.HTTPBadRequest,
'RevertFailed': webob.exc.HTTPInternalServerError,
'StopActionFailed': webob.exc.HTTPInternalServerError,
'ServerBuildFailed': webob.exc.HTTPInternalServerError,
'NotSupported': webob.exc.HTTPBadRequest,
'MissingCredentialError': webob.exc.HTTPBadRequest,

View File

@ -331,3 +331,8 @@ class ActionInProgress(HeatException):
class SoftwareConfigMissing(HeatException):
# Error for a software config that could not be found; the message is
# formatted with a 'software_config_id' keyword.
msg_fmt = _("The config (%(software_config_id)s) could not be found.")
class StopActionFailed(HeatException):
# Raised by the engine service's delete_stack when the engine that holds
# the stack lock could not be told to stop its in-progress action.
# The message is formatted with 'stack_name' and 'engine_id' keywords.
msg_fmt = _("Failed to stop stack (%(stack_name)s) on other engine "
"(%(engine_id)s)")

View File

@ -19,6 +19,7 @@ import json
from oslo.config import cfg
import webob
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
cfg.CONF.import_opt('max_resources_per_stack', 'heat.common.config')
cfg.CONF.import_opt('max_stacks_per_tenant', 'heat.common.config')
@ -44,12 +45,12 @@ from heat.engine import watchrule
from heat.openstack.common import log as logging
from heat.openstack.common import threadgroup
from heat.openstack.common.gettextutils import _
from heat.openstack.common.rpc import service
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
from heat.openstack.common.rpc import service
from heat.openstack.common import excutils
from heat.openstack.common import uuidutils
logger = logging.getLogger(__name__)
@ -147,12 +148,24 @@ class ThreadGroupManager(object):
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
def stop(self, stack_id):
'''Stop any active threads on a stack.'''
# Does nothing when no thread group is registered for stack_id.
if stack_id in self.groups:
# Stop the group first, then drop it from the registry so the
# stopped group is no longer tracked for this stack.
self.groups[stack_id].stop()
del self.groups[stack_id]
class EngineListener(service.Service):
'''
Listen on an AMQP queue while a stack action is in-progress and
respond to stack-related questions. Used for multi-engine support.
Listen on an AMQP queue named for the engine. Allows individual
engines to communicate with each other for multi-engine support.
'''
def __init__(self, host, engine_id, thread_group_mgr):
# engine_id is passed to the base service as its second argument
# (presumably the topic to listen on -- confirm against service.Service).
super(EngineListener, self).__init__(host, engine_id)
# Kept so stop_stack() can stop the threads running for a stack.
self.thread_group_mgr = thread_group_mgr
self.engine_id = engine_id
def listening(self, ctxt):
'''
Respond affirmatively to confirm that the engine performing the
@ -160,6 +173,11 @@ class EngineListener(service.Service):
'''
return True
def stop_stack(self, ctxt, stack_identity):
'''Stop any active threads on a stack.'''
# Handler for the "stop_stack" message that delete_stack sends to the
# engine holding the stack lock. ctxt is not used here.
# Only the 'stack_id' entry of stack_identity is read.
stack_id = stack_identity['stack_id']
self.thread_group_mgr.stop(stack_id)
class EngineService(service.Service):
"""
@ -180,7 +198,8 @@ class EngineService(service.Service):
self.engine_id = stack_lock.StackLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(host, self.engine_id)
self.listener = EngineListener(host, self.engine_id,
self.thread_group_mgr)
logger.debug(_("Starting listener for engine %s") % self.engine_id)
self.listener.start()
@ -541,10 +560,52 @@ class EngineService(service.Service):
:param cnxt: RPC context.
:param stack_identity: Name of the stack you want to delete.
"""
def remote_stop(lock_engine_id):
# Ask the engine identified by lock_engine_id to stop its threads for
# this stack. Uses cnxt and stack_identity from the enclosing
# delete_stack call.
#
# Returns False when the RPC call times out. On success there is no
# explicit return, so the result is None; the caller checks for None.
rpc = proxy.RpcProxy(lock_engine_id, "1.0")
msg = rpc.make_msg("stop_stack", stack_identity=stack_identity)
# Bound the wait with the configured engine life-check timeout.
timeout = cfg.CONF.engine_life_check_timeout
try:
# The topic is the target engine's ID.
rpc.call(cnxt, msg, topic=lock_engine_id, timeout=timeout)
except rpc_common.Timeout:
return False
st = self._get_stack(cnxt, stack_identity)
logger.info(_('Deleting stack %s') % st.name)
stack = parser.Stack.load(cnxt, stack=st)
logger.info(_('deleting stack %s') % st.name)
lock = stack_lock.StackLock(cnxt, stack, self.engine_id)
acquire_result = lock.try_acquire()
if acquire_result is None:
self.thread_group_mgr.start_with_acquired_lock(stack, lock,
stack.delete)
return
elif acquire_result == self.engine_id: # Current engine has the lock
self.thread_group_mgr.stop(stack.id)
# If the lock isn't released here, then the call to
# start_with_lock below will raise an ActionInProgress
# exception. Ideally, we wouldn't be calling another
# release() here, since it should be called as soon as the
# ThreadGroup is stopped. But apparently there's a race
# between release() and the next call to lock.acquire().
db_api.stack_lock_release(stack.id, self.engine_id)
else: # Another engine has the lock
other_engine_id = acquire_result
stop_result = remote_stop(other_engine_id)
if stop_result is None:
logger.debug(_("Successfully stopped remote task on engine %s")
% other_engine_id)
else:
raise exception.StopActionFailed(stack_name=stack.name,
engine_id=other_engine_id)
# There may be additional resources that we don't know about
# if an update was in-progress when the stack was stopped, so
# reload the stack from the database.
st = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=st)
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,

View File

@ -48,6 +48,13 @@ class StackLock(object):
def generate_engine_id():
return str(uuid.uuid4())
def try_acquire(self):
"""
Try to acquire a stack lock, but don't raise an ActionInProgress
exception or try to steal lock.
"""
# Returns the result of db_api.stack_lock_create unchanged. The engine
# service's delete_stack treats None as "lock acquired" and any other
# value as the ID of the engine that currently holds the lock.
return db_api.stack_lock_create(self.stack.id, self.engine_id)
@rpc_common.client_exceptions(exception.ActionInProgress)
def acquire(self, retry=True):
"""

View File

@ -24,6 +24,8 @@ import mox
from oslo.config import cfg
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
from heat.engine import environment
from heat.common import exception
from heat.common import urlfetch
@ -42,9 +44,11 @@ from heat.engine import resource as res
from heat.engine.resources import instance as instances
from heat.engine.resources import nova_utils
from heat.engine import resource as rsrs
from heat.engine import stack_lock
from heat.engine import watchrule
from heat.openstack.common import threadgroup
from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import proxy
from heat.tests.common import HeatTestCase
from heat.tests import generic_resource as generic_rsrc
from heat.tests import utils
@ -670,6 +674,101 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.ctx, stack.identifier())
self.m.VerifyAll()
def test_stack_delete_acquired_lock(self):
# delete_stack should return None for a stored stack when
# StackLock.try_acquire is stubbed out.
stack_name = 'service_delete_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
sid = stack.store()
# Fetch the raw DB record so the Stack.load expectation below matches
# the argument delete_stack passes.
st = db_api.stack_get(self.ctx, sid)
self.m.StubOutWithMock(parser.Stack, 'load')
# MultipleTimes: delete_stack may load the stack more than once.
parser.Stack.load(self.ctx, stack=st).MultipleTimes().AndReturn(stack)
self.man.tg = DummyThreadGroup()
self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
# NOTE(review): the stub returns this engine's own ID, which delete_stack
# handles as "current engine already holds the lock", not as a fresh
# acquisition (None) -- confirm this matches the intent of the test name.
stack_lock.StackLock.try_acquire().AndReturn(self.man.engine_id)
self.m.ReplayAll()
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
self.m.VerifyAll()
def test_stack_delete_current_engine_active_lock(self):
# delete_stack should return None when this engine already holds the
# stack lock and has a thread group registered for the stack.
stack_name = 'service_delete_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
sid = stack.store()
# Insert a fake lock into the db
db_api.stack_lock_create(stack.id, self.man.engine_id)
# Create a fake ThreadGroup too
self.man.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
st = db_api.stack_get(self.ctx, sid)
self.m.StubOutWithMock(parser.Stack, 'load')
# MultipleTimes: delete_stack reloads the stack after stopping threads.
parser.Stack.load(self.ctx, stack=st).MultipleTimes().AndReturn(stack)
self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
# Returning our own engine ID selects the "current engine has the lock"
# branch of delete_stack.
stack_lock.StackLock.try_acquire().AndReturn(self.man.engine_id)
self.m.ReplayAll()
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
self.m.VerifyAll()
def test_stack_delete_other_engine_active_lock_failed(self):
# delete_stack should raise StopActionFailed when another engine holds
# the lock and the "stop_stack" RPC call to it times out.
stack_name = 'service_delete_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
sid = stack.store()
# Insert a fake lock into the db
db_api.stack_lock_create(stack.id, "other-engine-fake-uuid")
st = db_api.stack_get(self.ctx, sid)
self.m.StubOutWithMock(parser.Stack, 'load')
# Only one load is expected: the failure is raised before the reload.
parser.Stack.load(self.ctx, stack=st).AndReturn(stack)
self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
# A foreign engine ID selects the "another engine has the lock" branch.
stack_lock.StackLock.try_acquire().AndReturn("other-engine-fake-uuid")
# Build the expected message; the stack_identity payload is not compared.
rpc = proxy.RpcProxy("other-engine-fake-uuid", "1.0")
msg = rpc.make_msg("stop_stack", stack_identity=mox.IgnoreArg())
self.m.StubOutWithMock(proxy.RpcProxy, 'call')
# Simulate the remote engine not answering within the timeout.
proxy.RpcProxy.call(self.ctx, msg, topic='other-engine-fake-uuid',
timeout=cfg.CONF.engine_life_check_timeout)\
.AndRaise(rpc_common.Timeout)
self.m.ReplayAll()
self.assertRaises(exception.StopActionFailed,
self.man.delete_stack, self.ctx, stack.identifier())
self.m.VerifyAll()
def test_stack_delete_other_engine_active_lock_succeeded(self):
# delete_stack should return None when another engine holds the lock and
# the "stop_stack" RPC call to it completes without timing out.
stack_name = 'service_delete_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
sid = stack.store()
# Insert a fake lock into the db
db_api.stack_lock_create(stack.id, "other-engine-fake-uuid")
st = db_api.stack_get(self.ctx, sid)
self.m.StubOutWithMock(parser.Stack, 'load')
# MultipleTimes: delete_stack reloads the stack after the remote stop.
parser.Stack.load(self.ctx, stack=st).MultipleTimes().AndReturn(stack)
self.m.StubOutWithMock(stack_lock.StackLock, 'try_acquire')
# A foreign engine ID selects the "another engine has the lock" branch.
stack_lock.StackLock.try_acquire().AndReturn("other-engine-fake-uuid")
# Build the expected message; the stack_identity payload is not compared.
rpc = proxy.RpcProxy("other-engine-fake-uuid", "1.0")
msg = rpc.make_msg("stop_stack", stack_identity=mox.IgnoreArg())
self.m.StubOutWithMock(proxy.RpcProxy, 'call')
# A None result is what delete_stack's remote_stop helper reports as
# success.
proxy.RpcProxy.call(self.ctx, msg, topic='other-engine-fake-uuid',
timeout=cfg.CONF.engine_life_check_timeout)\
.AndReturn(None)
# Stub the regular acquire() as well; it is expected to be called once
# (presumably via start_with_lock when the delete is started -- confirm).
self.m.StubOutWithMock(stack_lock.StackLock, 'acquire')
stack_lock.StackLock.acquire().AndReturn(None)
self.m.ReplayAll()
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
self.m.VerifyAll()
def test_stack_update(self):
stack_name = 'service_update_test_stack'
params = {'foo': 'bar'}