Add initial suspend logic to engine
blueprint: stack-suspend-resume Change-Id: Icf81672534de6b07d938785e659b7f6c733eacc4
This commit is contained in:
parent
123b386a1b
commit
422360e755
|
@ -41,8 +41,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
class Stack(object):
|
||||
|
||||
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK
|
||||
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK')
|
||||
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
|
||||
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
|
||||
|
||||
STATUSES = (IN_PROGRESS, FAILED, COMPLETE
|
||||
) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
|
||||
|
@ -483,6 +483,47 @@ class Stack(object):
|
|||
db_api.stack_delete(self.context, self.id)
|
||||
self.id = None
|
||||
|
||||
def suspend(self):
|
||||
'''
|
||||
Suspend the stack, which invokes handle_suspend for all stack resources
|
||||
waits for all resources to become SUSPEND_COMPLETE then declares the
|
||||
stack SUSPEND_COMPLETE.
|
||||
Note the default implementation for all resources is to do nothing
|
||||
other than move to SUSPEND_COMPLETE, so the resources must implement
|
||||
handle_suspend for this to have any effect.
|
||||
'''
|
||||
sus_task = scheduler.TaskRunner(self.suspend_task)
|
||||
sus_task(timeout=self.timeout_secs())
|
||||
|
||||
@scheduler.wrappertask
|
||||
def suspend_task(self):
|
||||
'''
|
||||
A task to suspend the stack, suspends each resource in reverse
|
||||
dependency order
|
||||
'''
|
||||
logger.info("Stack %s suspend started" % self.name)
|
||||
self.state_set(self.SUSPEND, self.IN_PROGRESS, 'Stack suspend started')
|
||||
|
||||
stack_status = self.COMPLETE
|
||||
reason = 'Stack suspend complete'
|
||||
|
||||
def resource_suspend(r):
|
||||
return r.suspend()
|
||||
|
||||
sus_task = scheduler.DependencyTaskGroup(self.dependencies,
|
||||
resource_suspend,
|
||||
reverse=True)
|
||||
try:
|
||||
yield sus_task()
|
||||
except exception.ResourceFailure as ex:
|
||||
stack_status = self.FAILED
|
||||
reason = 'Resource failed: %s' % str(ex)
|
||||
except scheduler.Timeout:
|
||||
stack_status = self.FAILED
|
||||
reason = 'Suspend timed out'
|
||||
|
||||
self.state_set(self.SUSPEND, stack_status, reason)
|
||||
|
||||
def output(self, key):
|
||||
'''
|
||||
Get the value of the specified stack output.
|
||||
|
|
|
@ -102,8 +102,8 @@ class Metadata(object):
|
|||
|
||||
|
||||
class Resource(object):
|
||||
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK
|
||||
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK')
|
||||
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
|
||||
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
|
||||
|
||||
STATUSES = (IN_PROGRESS, FAILED, COMPLETE
|
||||
) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
|
||||
|
@ -394,6 +394,16 @@ class Resource(object):
|
|||
'''
|
||||
return True
|
||||
|
||||
def check_suspend_complete(self, suspend_data):
|
||||
'''
|
||||
Check if the resource is suspended
|
||||
By default this happens as soon as the handle_suspend() method
|
||||
has completed successfully, but subclasses may customise this by
|
||||
overriding this function. The return value of handle_suspend() is
|
||||
passed in to this function each time it is called.
|
||||
'''
|
||||
return True
|
||||
|
||||
def update(self, json_snippet=None):
|
||||
'''
|
||||
update the resource. Subclasses should provide a handle_update() method
|
||||
|
@ -431,6 +441,24 @@ class Resource(object):
|
|||
self.t = self.stack.resolve_static_data(json_snippet)
|
||||
self.state_set(self.UPDATE, self.COMPLETE)
|
||||
|
||||
def suspend(self):
|
||||
'''
|
||||
Suspend the resource. Subclasses should provide a handle_suspend()
|
||||
method to implement suspend, the base-class handle_update does nothing
|
||||
Note this uses the same coroutine logic as create() since suspending
|
||||
instances is a non-immediate operation and we want to paralellize
|
||||
'''
|
||||
# Don't try to suspend the resource unless it's in a stable state
|
||||
if self.state not in ((self.CREATE, self.COMPLETE),
|
||||
(self.UPDATE, self.COMPLETE),
|
||||
(self.ROLLBACK, self.COMPLETE)):
|
||||
exc = exception.Error('State %s invalid for suspend'
|
||||
% str(self.state))
|
||||
raise exception.ResourceFailure(exc)
|
||||
|
||||
logger.info('suspending %s' % str(self))
|
||||
return self._do_action(self.SUSPEND)
|
||||
|
||||
def physical_resource_name(self):
|
||||
if self.id is None:
|
||||
return None
|
||||
|
|
|
@ -473,6 +473,22 @@ class EngineService(service.Service):
|
|||
return [api.format_stack_resource(resource, detail=False)
|
||||
for resource in stack if resource.id is not None]
|
||||
|
||||
@request_context
|
||||
def stack_suspend(self, cnxt, stack_identity):
|
||||
'''
|
||||
Handle request to perform an action on an existing stack
|
||||
actions are non-lifecycle operations which manipulate the
|
||||
state of the stack but not the definition
|
||||
'''
|
||||
def _stack_suspend(stack):
|
||||
logger.debug("suspending stack %s" % stack.name)
|
||||
stack.suspend()
|
||||
|
||||
s = self._get_stack(cnxt, stack_identity)
|
||||
|
||||
stack = parser.Stack.load(cnxt, stack=s)
|
||||
self._start_in_thread(stack.id, _stack_suspend, stack)
|
||||
|
||||
@request_context
|
||||
def metadata_update(self, cnxt, stack_identity,
|
||||
resource_name, metadata):
|
||||
|
|
|
@ -213,6 +213,10 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy):
|
|||
return self.call(ctxt, self.make_msg('list_stack_resources',
|
||||
stack_identity=stack_identity))
|
||||
|
||||
def stack_suspend(self, ctxt, stack_identity):
|
||||
return self.call(ctxt, self.make_msg('stack_suspend',
|
||||
stack_identity=stack_identity))
|
||||
|
||||
def metadata_update(self, ctxt, stack_identity, resource_name, metadata):
|
||||
"""
|
||||
Update the metadata for the given resource.
|
||||
|
|
|
@ -35,3 +35,6 @@ class GenericResource(resource.Resource):
|
|||
|
||||
def _resolve_attribute(self, name):
|
||||
return self.name
|
||||
|
||||
def handle_suspend(self):
|
||||
logger.warning('Suspending generic resource (Type "%s")' % self.type())
|
||||
|
|
|
@ -465,6 +465,47 @@ class stackServiceCreateUpdateDeleteTest(HeatTestCase):
|
|||
self.m.VerifyAll()
|
||||
|
||||
|
||||
class stackServiceSuspendTest(HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(stackServiceSuspendTest, self).setUp()
|
||||
self.username = 'stack_service_suspend_test_user'
|
||||
self.tenant = 'stack_service_suspend_test_tenant'
|
||||
setup_dummy_db()
|
||||
self.ctx = create_context(self.m, self.username, self.tenant)
|
||||
|
||||
self.man = service.EngineService('a-host', 'a-topic')
|
||||
|
||||
def test_stack_suspend(self):
|
||||
stack_name = 'service_suspend_test_stack'
|
||||
stack = get_wordpress_stack(stack_name, self.ctx)
|
||||
sid = stack.store()
|
||||
s = db_api.stack_get(self.ctx, sid)
|
||||
|
||||
self.m.StubOutWithMock(parser.Stack, 'load')
|
||||
parser.Stack.load(self.ctx, stack=s).AndReturn(stack)
|
||||
|
||||
self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
|
||||
service.EngineService._start_in_thread(sid,
|
||||
mox.IgnoreArg(),
|
||||
stack).AndReturn(None)
|
||||
self.m.ReplayAll()
|
||||
|
||||
result = self.man.stack_suspend(self.ctx, stack.identifier())
|
||||
self.assertEqual(result, None)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_stack_suspend_nonexist(self):
|
||||
stack_name = 'service_suspend_nonexist_test_stack'
|
||||
stack = get_wordpress_stack(stack_name, self.ctx)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.assertRaises(exception.StackNotFound,
|
||||
self.man.stack_suspend, self.ctx, stack.identifier())
|
||||
self.m.VerifyAll()
|
||||
|
||||
|
||||
class stackServiceTest(HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -1160,3 +1201,32 @@ class stackServiceTest(HeatTestCase):
|
|||
sl = self.eng.show_stack(self.ctx, None)
|
||||
|
||||
self.assertEqual(len(sl), 0)
|
||||
|
||||
def test_stack_suspend(self):
|
||||
stack_name = 'service_suspend_test_stack'
|
||||
stack = get_wordpress_stack(stack_name, self.ctx)
|
||||
sid = stack.store()
|
||||
s = db_api.stack_get(self.ctx, sid)
|
||||
|
||||
self.m.StubOutWithMock(parser.Stack, 'load')
|
||||
parser.Stack.load(self.ctx, stack=s).AndReturn(stack)
|
||||
|
||||
self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
|
||||
service.EngineService._start_in_thread(sid,
|
||||
mox.IgnoreArg(),
|
||||
stack).AndReturn(None)
|
||||
self.m.ReplayAll()
|
||||
|
||||
result = self.eng.stack_suspend(self.ctx, stack.identifier())
|
||||
self.assertEqual(result, None)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_stack_suspend_nonexist(self):
|
||||
stack_name = 'service_suspend_nonexist_test_stack'
|
||||
stack = get_wordpress_stack(stack_name, self.ctx)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.assertRaises(exception.StackNotFound,
|
||||
self.eng.stack_suspend, self.ctx, stack.identifier())
|
||||
self.m.VerifyAll()
|
||||
|
|
|
@ -580,6 +580,70 @@ class StackTest(HeatTestCase):
|
|||
self.assertEqual(self.stack.state,
|
||||
(parser.Stack.DELETE, parser.Stack.COMPLETE))
|
||||
|
||||
@stack_delete_after
|
||||
def test_suspend(self):
|
||||
self.m.ReplayAll()
|
||||
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
|
||||
self.stack = parser.Stack(self.ctx, 'suspend_test',
|
||||
parser.Template(tmpl))
|
||||
stack_id = self.stack.store()
|
||||
self.stack.create()
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.CREATE, self.stack.COMPLETE))
|
||||
|
||||
self.stack.suspend()
|
||||
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.SUSPEND, self.stack.COMPLETE))
|
||||
self.m.VerifyAll()
|
||||
|
||||
@stack_delete_after
|
||||
def test_suspend_fail(self):
|
||||
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
|
||||
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
|
||||
exc = exception.ResourceFailure(Exception('foo'))
|
||||
generic_rsrc.GenericResource.handle_suspend().AndRaise(exc)
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = parser.Stack(self.ctx, 'suspend_test_fail',
|
||||
parser.Template(tmpl))
|
||||
|
||||
stack_id = self.stack.store()
|
||||
self.stack.create()
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.CREATE, self.stack.COMPLETE))
|
||||
|
||||
self.stack.suspend()
|
||||
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.SUSPEND, self.stack.FAILED))
|
||||
self.assertEqual(self.stack.status_reason,
|
||||
'Resource failed: Exception: foo')
|
||||
self.m.VerifyAll()
|
||||
|
||||
@stack_delete_after
|
||||
def test_suspend_timeout(self):
|
||||
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
|
||||
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
|
||||
exc = scheduler.Timeout('foo', 0)
|
||||
generic_rsrc.GenericResource.handle_suspend().AndRaise(exc)
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = parser.Stack(self.ctx, 'suspend_test_fail_timeout',
|
||||
parser.Template(tmpl))
|
||||
|
||||
stack_id = self.stack.store()
|
||||
self.stack.create()
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.CREATE, self.stack.COMPLETE))
|
||||
|
||||
self.stack.suspend()
|
||||
|
||||
self.assertEqual(self.stack.state,
|
||||
(self.stack.SUSPEND, self.stack.FAILED))
|
||||
self.assertEqual(self.stack.status_reason, 'Suspend timed out')
|
||||
self.m.VerifyAll()
|
||||
|
||||
@stack_delete_after
|
||||
def test_delete_rollback(self):
|
||||
self.stack = parser.Stack(self.ctx, 'delete_rollback_test',
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from eventlet.support import greenlets as greenlet
|
||||
|
||||
from heat.common import context
|
||||
from heat.common import exception
|
||||
|
@ -386,6 +387,79 @@ class ResourceTest(HeatTestCase):
|
|||
self.assertEqual((res.UPDATE, res.FAILED), res.state)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_suspend_ok(self):
|
||||
# patch in a dummy property schema for GenericResource
|
||||
dummy_schema = {'Foo': {'Type': 'String'}}
|
||||
generic_rsrc.GenericResource.properties_schema = dummy_schema
|
||||
|
||||
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
|
||||
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
|
||||
res.update_allowed_keys = ('Properties',)
|
||||
res.update_allowed_properties = ('Foo',)
|
||||
scheduler.TaskRunner(res.create)()
|
||||
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
|
||||
scheduler.TaskRunner(res.suspend)()
|
||||
self.assertEqual((res.SUSPEND, res.COMPLETE), res.state)
|
||||
|
||||
def test_suspend_fail_inprogress(self):
|
||||
# patch in a dummy property schema for GenericResource
|
||||
dummy_schema = {'Foo': {'Type': 'String'}}
|
||||
generic_rsrc.GenericResource.properties_schema = dummy_schema
|
||||
|
||||
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
|
||||
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
|
||||
scheduler.TaskRunner(res.create)()
|
||||
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
|
||||
|
||||
res.state_set(res.CREATE, res.IN_PROGRESS)
|
||||
suspend = scheduler.TaskRunner(res.suspend)
|
||||
self.assertRaises(exception.ResourceFailure, suspend)
|
||||
|
||||
res.state_set(res.UPDATE, res.IN_PROGRESS)
|
||||
suspend = scheduler.TaskRunner(res.suspend)
|
||||
self.assertRaises(exception.ResourceFailure, suspend)
|
||||
|
||||
res.state_set(res.DELETE, res.IN_PROGRESS)
|
||||
suspend = scheduler.TaskRunner(res.suspend)
|
||||
self.assertRaises(exception.ResourceFailure, suspend)
|
||||
|
||||
def test_suspend_fail_exit(self):
|
||||
# patch in a dummy property schema for GenericResource
|
||||
dummy_schema = {'Foo': {'Type': 'String'}}
|
||||
generic_rsrc.GenericResource.properties_schema = dummy_schema
|
||||
|
||||
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
|
||||
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
|
||||
scheduler.TaskRunner(res.create)()
|
||||
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
|
||||
|
||||
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
|
||||
generic_rsrc.GenericResource.handle_suspend().AndRaise(
|
||||
greenlet.GreenletExit())
|
||||
self.m.ReplayAll()
|
||||
|
||||
suspend = scheduler.TaskRunner(res.suspend)
|
||||
self.assertRaises(greenlet.GreenletExit, suspend)
|
||||
self.assertEqual((res.SUSPEND, res.FAILED), res.state)
|
||||
|
||||
def test_suspend_fail_exception(self):
|
||||
# patch in a dummy property schema for GenericResource
|
||||
dummy_schema = {'Foo': {'Type': 'String'}}
|
||||
generic_rsrc.GenericResource.properties_schema = dummy_schema
|
||||
|
||||
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
|
||||
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
|
||||
scheduler.TaskRunner(res.create)()
|
||||
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
|
||||
|
||||
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
|
||||
generic_rsrc.GenericResource.handle_suspend().AndRaise(Exception())
|
||||
self.m.ReplayAll()
|
||||
|
||||
suspend = scheduler.TaskRunner(res.suspend)
|
||||
self.assertRaises(exception.ResourceFailure, suspend)
|
||||
self.assertEqual((res.SUSPEND, res.FAILED), res.state)
|
||||
|
||||
|
||||
class MetadataTest(HeatTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -141,6 +141,10 @@ class EngineRpcAPITestCase(testtools.TestCase):
|
|||
self._test_engine_api('list_stack_resources', 'call',
|
||||
stack_identity=self.identity)
|
||||
|
||||
def test_stack_suspend(self):
|
||||
self._test_engine_api('stack_suspend', 'call',
|
||||
stack_identity=self.identity)
|
||||
|
||||
def test_metadata_update(self):
|
||||
self._test_engine_api('metadata_update', 'call',
|
||||
stack_identity=self.identity,
|
||||
|
|
Loading…
Reference in New Issue