Add RPC API for cancel update stack

Implements cancelling of stack update in progress, for any stack
regardless of disable_rollback status.

As a side-effect, a framework for sending arbitrary one-time signals from
outside into a task running as greenthread is added. ThreadgroupManager
holds references of greenlet events associated with a given stack now.

Implements blueprint cancel-update-stack (partial)

Change-Id: Ic929d42946cf28eeb2a7caea8bf908f492693c09
This commit is contained in:
Pavlo Shchelokovskyy 2014-08-26 12:02:37 +03:00
parent 38aa12a015
commit a75f055caf
11 changed files with 219 additions and 28 deletions

View File

@ -76,6 +76,7 @@ class FaultWrapper(wsgi.Middleware):
'UnknownUserParameter': webob.exc.HTTPBadRequest, 'UnknownUserParameter': webob.exc.HTTPBadRequest,
'RevertFailed': webob.exc.HTTPInternalServerError, 'RevertFailed': webob.exc.HTTPInternalServerError,
'StopActionFailed': webob.exc.HTTPInternalServerError, 'StopActionFailed': webob.exc.HTTPInternalServerError,
'EventSendFailed': webob.exc.HTTPInternalServerError,
'ServerBuildFailed': webob.exc.HTTPInternalServerError, 'ServerBuildFailed': webob.exc.HTTPInternalServerError,
'NotSupported': webob.exc.HTTPBadRequest, 'NotSupported': webob.exc.HTTPBadRequest,
'MissingCredentialError': webob.exc.HTTPBadRequest, 'MissingCredentialError': webob.exc.HTTPBadRequest,

View File

@ -356,3 +356,8 @@ class ActionInProgress(HeatException):
class StopActionFailed(HeatException): class StopActionFailed(HeatException):
msg_fmt = _("Failed to stop stack (%(stack_name)s) on other engine " msg_fmt = _("Failed to stop stack (%(stack_name)s) on other engine "
"(%(engine_id)s)") "(%(engine_id)s)")
class EventSendFailed(HeatException):
msg_fmt = _("Failed to send message to stack (%(stack_name)s) "
"on other engine (%(engine_id)s)")

View File

@ -375,13 +375,11 @@ class DependencyTaskGroup(object):
if self.aggregate_exceptions: if self.aggregate_exceptions:
self._cancel_recursively(k, r) self._cancel_recursively(k, r)
else: else:
for r in self._runners.itervalues(): self.cancel_all(grace_period=self.error_wait_time)
r.cancel(grace_period=self.error_wait_time)
raised_exceptions.append(exc_info) raised_exceptions.append(exc_info)
except: # noqa except: # noqa
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
for r in self._runners.itervalues(): self.cancel_all()
r.cancel()
if raised_exceptions: if raised_exceptions:
if self.aggregate_exceptions: if self.aggregate_exceptions:
@ -390,6 +388,10 @@ class DependencyTaskGroup(object):
exc_type, exc_val, traceback = raised_exceptions[0] exc_type, exc_val, traceback = raised_exceptions[0]
raise exc_type, exc_val, traceback raise exc_type, exc_val, traceback
def cancel_all(self, grace_period=None):
for r in self._runners.itervalues():
r.cancel(grace_period=grace_period)
def _cancel_recursively(self, key, runner): def _cancel_recursively(self, key, runner):
runner.cancel() runner.cancel()
node = self._graph[key] node = self._graph[key]

View File

@ -11,11 +11,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import eventlet import collections
import functools import functools
import json import json
import os import os
import eventlet
from oslo.config import cfg from oslo.config import cfg
from oslo import messaging from oslo import messaging
import requests import requests
@ -74,6 +75,7 @@ class ThreadGroupManager(object):
def __init__(self): def __init__(self):
super(ThreadGroupManager, self).__init__() super(ThreadGroupManager, self).__init__()
self.groups = {} self.groups = {}
self.events = collections.defaultdict(list)
# Create dummy service task, because when there is nothing queued # Create dummy service task, because when there is nothing queued
# on self.tg the process exits # on self.tg the process exits
@ -150,6 +152,9 @@ class ThreadGroupManager(object):
self.groups[stack_id].add_timer(cfg.CONF.periodic_interval, self.groups[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs) func, *args, **kwargs)
def add_event(self, stack_id, event):
self.events[stack_id].append(event)
def stop_timers(self, stack_id): def stop_timers(self, stack_id):
if stack_id in self.groups: if stack_id in self.groups:
self.groups[stack_id].stop_timers() self.groups[stack_id].stop_timers()
@ -157,6 +162,7 @@ class ThreadGroupManager(object):
def stop(self, stack_id, graceful=False): def stop(self, stack_id, graceful=False):
'''Stop any active threads on a stack.''' '''Stop any active threads on a stack.'''
if stack_id in self.groups: if stack_id in self.groups:
self.events.pop(stack_id, None)
threadgroup = self.groups.pop(stack_id) threadgroup = self.groups.pop(stack_id)
threads = threadgroup.threads[:] threads = threadgroup.threads[:]
@ -174,6 +180,10 @@ class ThreadGroupManager(object):
while not all(links_done.values()): while not all(links_done.values()):
eventlet.sleep() eventlet.sleep()
def send(self, stack_id, message):
for event in self.events.get(stack_id, []):
event.send(message)
class StackWatch(object): class StackWatch(object):
def __init__(self, thread_group_mgr): def __init__(self, thread_group_mgr):
@ -261,6 +271,9 @@ class EngineListener(service.Service):
Listen on an AMQP queue named for the engine. Allows individual Listen on an AMQP queue named for the engine. Allows individual
engines to communicate with each other for multi-engine support. engines to communicate with each other for multi-engine support.
''' '''
ACTIONS = (STOP_STACK, SEND) = ('stop_stack', 'send')
def __init__(self, host, engine_id, thread_group_mgr): def __init__(self, host, engine_id, thread_group_mgr):
super(EngineListener, self).__init__() super(EngineListener, self).__init__()
self.thread_group_mgr = thread_group_mgr self.thread_group_mgr = thread_group_mgr
@ -285,6 +298,10 @@ class EngineListener(service.Service):
stack_id = stack_identity['stack_id'] stack_id = stack_identity['stack_id']
self.thread_group_mgr.stop(stack_id) self.thread_group_mgr.stop(stack_id)
def send(self, ctxt, stack_identity, message):
stack_id = stack_identity['stack_id']
self.thread_group_mgr.send(stack_id, message)
class EngineService(service.Service): class EngineService(service.Service):
""" """
@ -647,13 +664,55 @@ class EngineService(service.Service):
self._validate_deferred_auth_context(cnxt, updated_stack) self._validate_deferred_auth_context(cnxt, updated_stack)
updated_stack.validate() updated_stack.validate()
event = eventlet.event.Event()
self.thread_group_mgr.add_event(current_stack.id, event)
self.thread_group_mgr.start_with_lock(cnxt, current_stack, self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id, self.engine_id,
current_stack.update, current_stack.update,
updated_stack) updated_stack,
event=event)
return dict(current_stack.identifier()) return dict(current_stack.identifier())
@request_context
def stack_cancel_update(self, cnxt, stack_identity):
"""Cancel currently running stack update.
:param cnxt: RPC context.
:param stack_identity: Name of the stack for which to cancel update.
"""
# Get the database representation of the existing stack
db_stack = self._get_stack(cnxt, stack_identity)
current_stack = parser.Stack.load(cnxt, stack=db_stack)
if current_stack.state != (current_stack.UPDATE,
current_stack.IN_PROGRESS):
msg = _("Cancelling update when stack is %s"
) % str(current_stack.state)
raise exception.NotSupported(feature=msg)
LOG.info(_('Starting cancel of updating stack %s') % db_stack.name)
# stop the running update and take the lock
# as we cancel only running update, the acquire_result is
# always some engine_id, not None
lock = stack_lock.StackLock(cnxt, current_stack,
self.engine_id)
engine_id = lock.try_acquire()
# Current engine has the lock
if engine_id == self.engine_id:
self.thread_group_mgr.send(current_stack.id, 'cancel')
# Another active engine has the lock
elif stack_lock.StackLock.engine_alive(cnxt, engine_id):
cancel_result = self._remote_call(
cnxt, engine_id, self.listener.SEND,
stack_identity, rpc_api.THREAD_CANCEL)
if cancel_result is None:
LOG.debug("Successfully sent %(msg)s message "
"to remote task on engine %(eng)s" % {
'eng': engine_id, 'msg': 'cancel'})
else:
raise exception.EventSendFailed(stack_name=current_stack.name,
engine_id=engine_id)
@request_context @request_context
def validate_template(self, cnxt, template, params=None): def validate_template(self, cnxt, template, params=None):
""" """
@ -738,6 +797,17 @@ class EngineService(service.Service):
return s.raw_template.template return s.raw_template.template
return None return None
def _remote_call(self, cnxt, lock_engine_id, call, *args, **kwargs):
timeout = cfg.CONF.engine_life_check_timeout
self.cctxt = self._client.prepare(
version='1.0',
timeout=timeout,
topic=lock_engine_id)
try:
self.cctxt.call(cnxt, call, *args, **kwargs)
except messaging.MessagingTimeout:
return False
@request_context @request_context
def delete_stack(self, cnxt, stack_identity): def delete_stack(self, cnxt, stack_identity):
""" """
@ -746,18 +816,6 @@ class EngineService(service.Service):
:param cnxt: RPC context. :param cnxt: RPC context.
:param stack_identity: Name of the stack you want to delete. :param stack_identity: Name of the stack you want to delete.
""" """
def remote_stop(lock_engine_id):
timeout = cfg.CONF.engine_life_check_timeout
self.cctxt = self._client.prepare(
version='1.0',
timeout=timeout,
topic=lock_engine_id)
try:
self.cctxt.call(cnxt,
'stop_stack',
stack_identity=stack_identity)
except messaging.MessagingTimeout:
return False
st = self._get_stack(cnxt, stack_identity) st = self._get_stack(cnxt, stack_identity)
LOG.info(_('Deleting stack %s') % st.name) LOG.info(_('Deleting stack %s') % st.name)
@ -782,7 +840,9 @@ class EngineService(service.Service):
# Another active engine has the lock # Another active engine has the lock
elif stack_lock.StackLock.engine_alive(cnxt, acquire_result): elif stack_lock.StackLock.engine_alive(cnxt, acquire_result):
stop_result = remote_stop(acquire_result) stop_result = self._remote_call(
cnxt, acquire_result, self.listener.STOP_STACK,
stack_identity=stack_identity)
if stop_result is None: if stop_result is None:
LOG.debug("Successfully stopped remote task on engine %s" LOG.debug("Successfully stopped remote task on engine %s"
% acquire_result) % acquire_result)

View File

@ -39,12 +39,20 @@ from heat.engine import update
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common import strutils from heat.openstack.common import strutils
from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
ERROR_WAIT_TIME = 240 ERROR_WAIT_TIME = 240
class ForcedCancel(BaseException):
"""Exception raised to cancel task execution."""
def __str__(self):
return "Operation cancelled"
class Stack(collections.Mapping): class Stack(collections.Mapping):
ACTIONS = ( ACTIONS = (
@ -660,7 +668,7 @@ class Stack(collections.Mapping):
post_func=rollback) post_func=rollback)
creator(timeout=self.timeout_secs()) creator(timeout=self.timeout_secs())
def update(self, newstack): def update(self, newstack, event=None):
''' '''
Compare the current stack with newstack, Compare the current stack with newstack,
and where necessary create/update/delete the resources until and where necessary create/update/delete the resources until
@ -673,11 +681,12 @@ class Stack(collections.Mapping):
60 minutes, set in the constructor 60 minutes, set in the constructor
''' '''
self.updated_time = datetime.utcnow() self.updated_time = datetime.utcnow()
updater = scheduler.TaskRunner(self.update_task, newstack) updater = scheduler.TaskRunner(self.update_task, newstack,
event=event)
updater() updater()
@scheduler.wrappertask @scheduler.wrappertask
def update_task(self, newstack, action=UPDATE): def update_task(self, newstack, action=UPDATE, event=None):
if action not in (self.UPDATE, self.ROLLBACK): if action not in (self.UPDATE, self.ROLLBACK):
LOG.error(_("Unexpected action %s passed to update!") % action) LOG.error(_("Unexpected action %s passed to update!") % action)
self.state_set(self.UPDATE, self.FAILED, self.state_set(self.UPDATE, self.FAILED,
@ -725,7 +734,12 @@ class Stack(collections.Mapping):
updater.start(timeout=self.timeout_secs()) updater.start(timeout=self.timeout_secs())
yield yield
while not updater.step(): while not updater.step():
yield if event is None or not event.ready():
yield
else:
message = event.wait()
if message == rpc_api.THREAD_CANCEL:
raise ForcedCancel()
finally: finally:
self.reset_dependencies() self.reset_dependencies()
@ -738,6 +752,15 @@ class Stack(collections.Mapping):
except scheduler.Timeout: except scheduler.Timeout:
stack_status = self.FAILED stack_status = self.FAILED
reason = 'Timed out' reason = 'Timed out'
except ForcedCancel as e:
reason = six.text_type(e)
stack_status = self.FAILED
if action == self.UPDATE:
update_task.updater.cancel_all()
yield self.update_task(oldstack, action=self.ROLLBACK)
return
except exception.ResourceFailure as e: except exception.ResourceFailure as e:
reason = six.text_type(e) reason = six.text_type(e)

View File

@ -57,14 +57,14 @@ class StackUpdate(object):
reverse=True, reverse=True,
error_wait_time=self.error_wait_time) error_wait_time=self.error_wait_time)
update = scheduler.DependencyTaskGroup(self.dependencies(), self.updater = scheduler.DependencyTaskGroup(self.dependencies(),
self._resource_update) self._resource_update)
if not self.rollback: if not self.rollback:
yield cleanup_prev() yield cleanup_prev()
try: try:
yield update() yield self.updater()
finally: finally:
self.previous_stack.reset_dependencies() self.previous_stack.reset_dependencies()

View File

@ -242,3 +242,5 @@ SNAPSHOT_KEYS = (
'status', 'status',
'status_reason' 'status_reason'
) )
THREAD_MESSAGES = (THREAD_CANCEL,) = ('cancel',)

View File

@ -362,6 +362,10 @@ class EngineClient(object):
return self.call(ctxt, self.make_msg('stack_check', return self.call(ctxt, self.make_msg('stack_check',
stack_identity=stack_identity)) stack_identity=stack_identity))
def stack_cancel_update(self, ctxt, stack_identity):
return self.call(ctxt, self.make_msg('stack_cancel_update',
stack_identity=stack_identity))
def metadata_update(self, ctxt, stack_identity, resource_name, metadata): def metadata_update(self, ctxt, stack_identity, resource_name, metadata):
""" """
Update the metadata for the given resource. Update the metadata for the given resource.

View File

@ -18,6 +18,7 @@ import json
import sys import sys
import uuid import uuid
from eventlet import event as grevent
import mock import mock
import mox import mox
from oslo.config import cfg from oslo.config import cfg
@ -878,6 +879,9 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.m.StubOutWithMock(stack, 'validate') self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn(None) stack.validate().AndReturn(None)
evt_mock = self.m.CreateMockAnything()
self.m.StubOutWithMock(grevent, 'Event')
grevent.Event().AndReturn(evt_mock)
self.m.StubOutWithMock(threadgroup, 'ThreadGroup') self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
threadgroup.ThreadGroup().AndReturn(DummyThreadGroup()) threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
@ -889,6 +893,7 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.assertEqual(old_stack.identifier(), result) self.assertEqual(old_stack.identifier(), result)
self.assertIsInstance(result, dict) self.assertIsInstance(result, dict)
self.assertTrue(result['stack_id']) self.assertTrue(result['stack_id'])
self.assertEqual(self.man.thread_group_mgr.events[sid], [evt_mock])
self.m.VerifyAll() self.m.VerifyAll()
def test_stack_update_reuses_api_params(self): def test_stack_update_reuses_api_params(self):
@ -928,6 +933,40 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.assertTrue(result['stack_id']) self.assertTrue(result['stack_id'])
self.m.VerifyAll() self.m.VerifyAll()
def test_stack_cancel_update_same_engine(self):
stack_name = 'service_update_cancel_test_stack'
old_stack = get_wordpress_stack(stack_name, self.ctx)
old_stack.state_set(old_stack.UPDATE, old_stack.IN_PROGRESS,
'test_override')
old_stack.disable_rollback = False
old_stack.store()
load_mock = self.patchobject(parser.Stack, 'load')
load_mock.return_value = old_stack
lock_mock = self.patchobject(stack_lock.StackLock, 'try_acquire')
lock_mock.return_value = self.man.engine_id
self.patchobject(self.man.thread_group_mgr, 'send')
self.man.stack_cancel_update(self.ctx, old_stack.identifier())
self.man.thread_group_mgr.send.assert_called_once_with(old_stack.id,
'cancel')
def test_stack_cancel_update_wrong_state_fails(self):
stack_name = 'service_update_cancel_test_stack'
old_stack = get_wordpress_stack(stack_name, self.ctx)
old_stack.state_set(old_stack.UPDATE, old_stack.COMPLETE,
'test_override')
old_stack.store()
load_mock = self.patchobject(parser.Stack, 'load')
load_mock.return_value = old_stack
ex = self.assertRaises(
dispatcher.ExpectedException,
self.man.stack_cancel_update, self.ctx, old_stack.identifier())
self.assertEqual(ex.exc_info[0], exception.NotSupported)
self.assertIn("Cancelling update when stack is "
"('UPDATE', 'COMPLETE')",
six.text_type(ex.exc_info[1]))
def test_stack_update_equals(self): def test_stack_update_equals(self):
stack_name = 'test_stack_update_equals_resource_limit' stack_name = 'test_stack_update_equals_resource_limit'
params = {} params = {}
@ -1638,7 +1677,7 @@ class StackServiceTest(HeatTestCase):
thread = self.m.CreateMockAnything() thread = self.m.CreateMockAnything()
thread.link(mox.IgnoreArg(), self.stack.id).AndReturn(None) thread.link(mox.IgnoreArg(), self.stack.id).AndReturn(None)
def run(stack_id, func, *args): def run(stack_id, func, *args, **kwargs):
func(*args) func(*args)
return thread return thread
self.eng.thread_group_mgr.start = run self.eng.thread_group_mgr.start = run
@ -3246,6 +3285,22 @@ class ThreadGroupManagerTest(HeatTestCase):
self.cfg_mock.CONF.periodic_interval, self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs) self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual(thm.events[stack_id], [e1, e2])
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.send(stack_id, 'test_message')
class ThreadGroupManagerStopTest(HeatTestCase): class ThreadGroupManagerStopTest(HeatTestCase):
def test_tgm_stop(self): def test_tgm_stop(self):
@ -3262,6 +3317,7 @@ class ThreadGroupManagerStopTest(HeatTestCase):
done.append(thread) done.append(thread)
thm = service.ThreadGroupManager() thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thread = thm.start(stack_id, function) thread = thm.start(stack_id, function)
thread.link(linked, thread) thread.link(linked, thread)
@ -3269,6 +3325,7 @@ class ThreadGroupManagerStopTest(HeatTestCase):
self.assertIn(thread, done) self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups) self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)
class SnapshotServiceTest(HeatTestCase): class SnapshotServiceTest(HeatTestCase):

View File

@ -2605,6 +2605,39 @@ class StackTest(HeatTestCase):
mock_state.call_args_list[1][0][:2]) mock_state.call_args_list[1][0][:2])
self.m.VerifyAll() self.m.VerifyAll()
def test_update_rollback_on_cancel_event(self):
tmpl = {'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
self.stack = parser.Stack(self.ctx, 'update_test_stack',
template.Template(tmpl),
disable_rollback=False)
self.stack.store()
self.stack.create()
self.assertEqual((parser.Stack.CREATE, parser.Stack.COMPLETE),
self.stack.state)
tmpl2 = {'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'xyz'}},
}}
updated_stack = parser.Stack(self.ctx, 'updated_stack',
template.Template(tmpl2),
disable_rollback=False)
evt_mock = mock.MagicMock()
evt_mock.ready.return_value = True
evt_mock.wait.return_value = 'cancel'
self.m.ReplayAll()
self.stack.update(updated_stack, event=evt_mock)
self.assertEqual((parser.Stack.ROLLBACK, parser.Stack.COMPLETE),
self.stack.state)
self.assertEqual('abc', self.stack['AResource'].properties['Foo'])
self.m.VerifyAll()
def test_update_rollback_fail(self): def test_update_rollback_fail(self):
tmpl = {'HeatTemplateFormatVersion': '2012-12-12', tmpl = {'HeatTemplateFormatVersion': '2012-12-12',
'Parameters': {'AParam': {'Type': 'String'}}, 'Parameters': {'AParam': {'Type': 'String'}},

View File

@ -193,6 +193,10 @@ class EngineRpcAPITestCase(testtools.TestCase):
self._test_engine_api('stack_resume', 'call', self._test_engine_api('stack_resume', 'call',
stack_identity=self.identity) stack_identity=self.identity)
def test_stack_cancel_update(self):
self._test_engine_api('stack_cancel_update', 'call',
stack_identity=self.identity)
def test_metadata_update(self): def test_metadata_update(self):
self._test_engine_api('metadata_update', 'call', self._test_engine_api('metadata_update', 'call',
stack_identity=self.identity, stack_identity=self.identity,