Merge "Add initial resume logic to engine"

This commit is contained in:
Jenkins 2013-07-01 22:36:45 +00:00 committed by Gerrit Code Review
commit 4b355a732a
9 changed files with 218 additions and 14 deletions

View File

@ -42,8 +42,9 @@ logger = logging.getLogger(__name__)
class Stack(object):
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND, RESUME
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND',
'RESUME')
STATUSES = (IN_PROGRESS, FAILED, COMPLETE
) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
@ -464,6 +465,20 @@ class Stack(object):
reverse=True)
sus_task(timeout=self.timeout_secs())
def resume(self):
'''
Resume the stack, which invokes handle_resume for all stack resources
waits for all resources to become RESUME_COMPLETE then declares the
stack RESUME_COMPLETE.
Note the default implementation for all resources is to do nothing
other than move to RESUME_COMPLETE, so the resources must implement
handle_resume for this to have any effect.
'''
sus_task = scheduler.TaskRunner(self.stack_task,
action=self.RESUME,
reverse=False)
sus_task(timeout=self.timeout_secs())
def output(self, key):
'''
Get the value of the specified stack output.

View File

@ -113,8 +113,9 @@ class Metadata(object):
class Resource(object):
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND, RESUME
) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK',
'SUSPEND', 'RESUME')
STATUSES = (IN_PROGRESS, FAILED, COMPLETE
) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
@ -439,9 +440,7 @@ class Resource(object):
def suspend(self):
'''
Suspend the resource. Subclasses should provide a handle_suspend()
method to implement suspend, the base-class handle_update does nothing
Note this uses the same coroutine logic as create() since suspending
instances is a non-immediate operation and we want to paralellize
method to implement suspend
'''
# Don't try to suspend the resource unless it's in a stable state
if (self.action == self.DELETE or self.status != self.COMPLETE):
@ -452,6 +451,20 @@ class Resource(object):
logger.info('suspending %s' % str(self))
return self._do_action(self.SUSPEND)
def resume(self):
'''
Resume the resource. Subclasses should provide a handle_resume()
method to implement resume
'''
# Can't resume a resource unless it's SUSPEND_COMPLETE
if self.state != (self.SUSPEND, self.COMPLETE):
exc = exception.Error('State %s invalid for resume'
% str(self.state))
raise exception.ResourceFailure(exc)
logger.info('resuming %s' % str(self))
return self._do_action(self.RESUME)
def physical_resource_name(self):
if self.id is None:
return None

View File

@ -476,9 +476,7 @@ class EngineService(service.Service):
@request_context
def stack_suspend(self, cnxt, stack_identity):
'''
Handle request to perform an action on an existing stack
actions are non-lifecycle operations which manipulate the
state of the stack but not the definition
Handle request to perform suspend action on a stack
'''
def _stack_suspend(stack):
logger.debug("suspending stack %s" % stack.name)
@ -489,6 +487,20 @@ class EngineService(service.Service):
stack = parser.Stack.load(cnxt, stack=s)
self._start_in_thread(stack.id, _stack_suspend, stack)
@request_context
def stack_resume(self, cnxt, stack_identity):
'''
Handle request to perform a resume action on a stack
'''
def _stack_resume(stack):
logger.debug("resuming stack %s" % stack.name)
stack.resume()
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self._start_in_thread(stack.id, _stack_resume, stack)
@request_context
def metadata_update(self, cnxt, stack_identity,
resource_name, metadata):

View File

@ -217,6 +217,10 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy):
return self.call(ctxt, self.make_msg('stack_suspend',
stack_identity=stack_identity))
def stack_resume(self, ctxt, stack_identity):
return self.call(ctxt, self.make_msg('stack_resume',
stack_identity=stack_identity))
def metadata_update(self, ctxt, stack_identity, resource_name, metadata):
"""
Update the metadata for the given resource.

View File

@ -38,3 +38,6 @@ class GenericResource(resource.Resource):
def handle_suspend(self):
logger.warning('Suspending generic resource (Type "%s")' % self.type())
def handle_resume(self):
logger.warning('Resuming generic resource (Type "%s")' % self.type())

View File

@ -465,10 +465,10 @@ class stackServiceCreateUpdateDeleteTest(HeatTestCase):
self.m.VerifyAll()
class stackServiceSuspendTest(HeatTestCase):
class stackServiceSuspendResumeTest(HeatTestCase):
def setUp(self):
super(stackServiceSuspendTest, self).setUp()
super(stackServiceSuspendResumeTest, self).setUp()
self.username = 'stack_service_suspend_test_user'
self.tenant = 'stack_service_suspend_test_tenant'
setup_dummy_db()
@ -493,6 +493,23 @@ class stackServiceSuspendTest(HeatTestCase):
result = self.man.stack_suspend(self.ctx, stack.identifier())
self.assertEqual(result, None)
self.m.VerifyAll()
@stack_context('service_resume_test_stack', False)
def test_stack_resume(self):
self.m.StubOutWithMock(parser.Stack, 'load')
parser.Stack.load(self.ctx,
stack=mox.IgnoreArg()).AndReturn(self.stack)
self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
service.EngineService._start_in_thread(self.stack.id,
mox.IgnoreArg(),
self.stack).AndReturn(None)
self.m.ReplayAll()
result = self.man.stack_resume(self.ctx, self.stack.identifier())
self.assertEqual(result, None)
self.m.VerifyAll()
def test_stack_suspend_nonexist(self):
@ -505,6 +522,16 @@ class stackServiceSuspendTest(HeatTestCase):
self.man.stack_suspend, self.ctx, stack.identifier())
self.m.VerifyAll()
def test_stack_resume_nonexist(self):
stack_name = 'service_resume_nonexist_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.ReplayAll()
self.assertRaises(exception.StackNotFound,
self.man.stack_resume, self.ctx, stack.identifier())
self.m.VerifyAll()
class stackServiceTest(HeatTestCase):

View File

@ -597,7 +597,7 @@ class StackTest(HeatTestCase):
(parser.Stack.DELETE, parser.Stack.COMPLETE))
@stack_delete_after
def test_suspend(self):
def test_suspend_resume(self):
self.m.ReplayAll()
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.stack = parser.Stack(self.ctx, 'suspend_test',
@ -611,6 +611,12 @@ class StackTest(HeatTestCase):
self.assertEqual(self.stack.state,
(self.stack.SUSPEND, self.stack.COMPLETE))
self.stack.resume()
self.assertEqual(self.stack.state,
(self.stack.RESUME, self.stack.COMPLETE))
self.m.VerifyAll()
@stack_delete_after
@ -637,6 +643,35 @@ class StackTest(HeatTestCase):
'Resource suspend failed: Exception: foo')
self.m.VerifyAll()
@stack_delete_after
def test_resume_fail(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
exc = exception.ResourceFailure(Exception('foo'))
generic_rsrc.GenericResource.handle_resume().AndRaise(exc)
self.m.ReplayAll()
self.stack = parser.Stack(self.ctx, 'resume_test_fail',
parser.Template(tmpl))
stack_id = self.stack.store()
self.stack.create()
self.assertEqual(self.stack.state,
(self.stack.CREATE, self.stack.COMPLETE))
self.stack.suspend()
self.assertEqual(self.stack.state,
(self.stack.SUSPEND, self.stack.COMPLETE))
self.stack.resume()
self.assertEqual(self.stack.state,
(self.stack.RESUME, self.stack.FAILED))
self.assertEqual(self.stack.status_reason,
'Resource resume failed: Exception: foo')
self.m.VerifyAll()
@stack_delete_after
def test_suspend_timeout(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
@ -660,6 +695,35 @@ class StackTest(HeatTestCase):
self.assertEqual(self.stack.status_reason, 'Suspend timed out')
self.m.VerifyAll()
@stack_delete_after
def test_resume_timeout(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
exc = scheduler.Timeout('foo', 0)
generic_rsrc.GenericResource.handle_resume().AndRaise(exc)
self.m.ReplayAll()
self.stack = parser.Stack(self.ctx, 'resume_test_fail_timeout',
parser.Template(tmpl))
stack_id = self.stack.store()
self.stack.create()
self.assertEqual(self.stack.state,
(self.stack.CREATE, self.stack.COMPLETE))
self.stack.suspend()
self.assertEqual(self.stack.state,
(self.stack.SUSPEND, self.stack.COMPLETE))
self.stack.resume()
self.assertEqual(self.stack.state,
(self.stack.RESUME, self.stack.FAILED))
self.assertEqual(self.stack.status_reason, 'Resume timed out')
self.m.VerifyAll()
@stack_delete_after
def test_delete_rollback(self):
self.stack = parser.Stack(self.ctx, 'delete_rollback_test',

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from eventlet.support import greenlets as greenlet
from heat.common import context
@ -387,7 +388,7 @@ class ResourceTest(HeatTestCase):
self.assertEqual((res.UPDATE, res.FAILED), res.state)
self.m.VerifyAll()
def test_suspend_ok(self):
def test_suspend_resume_ok(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
generic_rsrc.GenericResource.properties_schema = dummy_schema
@ -400,6 +401,8 @@ class ResourceTest(HeatTestCase):
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
scheduler.TaskRunner(res.suspend)()
self.assertEqual((res.SUSPEND, res.COMPLETE), res.state)
scheduler.TaskRunner(res.resume)()
self.assertEqual((res.RESUME, res.COMPLETE), res.state)
def test_suspend_fail_inprogress(self):
# patch in a dummy property schema for GenericResource
@ -423,6 +426,24 @@ class ResourceTest(HeatTestCase):
suspend = scheduler.TaskRunner(res.suspend)
self.assertRaises(exception.ResourceFailure, suspend)
def test_resume_fail_not_suspend_complete(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
generic_rsrc.GenericResource.properties_schema = dummy_schema
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
scheduler.TaskRunner(res.create)()
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
non_suspended_states = [s for s in
itertools.product(res.ACTIONS, res.STATUSES)
if s != (res.SUSPEND, res.COMPLETE)]
for state in non_suspended_states:
res.state_set(*state)
resume = scheduler.TaskRunner(res.resume)
self.assertRaises(exception.ResourceFailure, resume)
def test_suspend_fail_exit(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
@ -442,6 +463,27 @@ class ResourceTest(HeatTestCase):
self.assertRaises(greenlet.GreenletExit, suspend)
self.assertEqual((res.SUSPEND, res.FAILED), res.state)
def test_resume_fail_exit(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
generic_rsrc.GenericResource.properties_schema = dummy_schema
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
scheduler.TaskRunner(res.create)()
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
generic_rsrc.GenericResource.handle_resume().AndRaise(
greenlet.GreenletExit())
self.m.ReplayAll()
res.state_set(res.SUSPEND, res.COMPLETE)
resume = scheduler.TaskRunner(res.resume)
self.assertRaises(greenlet.GreenletExit, resume)
self.assertEqual((res.RESUME, res.FAILED), res.state)
def test_suspend_fail_exception(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
@ -460,6 +502,26 @@ class ResourceTest(HeatTestCase):
self.assertRaises(exception.ResourceFailure, suspend)
self.assertEqual((res.SUSPEND, res.FAILED), res.state)
def test_resume_fail_exception(self):
# patch in a dummy property schema for GenericResource
dummy_schema = {'Foo': {'Type': 'String'}}
generic_rsrc.GenericResource.properties_schema = dummy_schema
tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
scheduler.TaskRunner(res.create)()
self.assertEqual((res.CREATE, res.COMPLETE), res.state)
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
generic_rsrc.GenericResource.handle_resume().AndRaise(Exception())
self.m.ReplayAll()
res.state_set(res.SUSPEND, res.COMPLETE)
resume = scheduler.TaskRunner(res.resume)
self.assertRaises(exception.ResourceFailure, resume)
self.assertEqual((res.RESUME, res.FAILED), res.state)
class MetadataTest(HeatTestCase):
def setUp(self):

View File

@ -145,6 +145,10 @@ class EngineRpcAPITestCase(testtools.TestCase):
self._test_engine_api('stack_suspend', 'call',
stack_identity=self.identity)
def test_stack_resume(self):
self._test_engine_api('stack_resume', 'call',
stack_identity=self.identity)
def test_metadata_update(self):
self._test_engine_api('metadata_update', 'call',
stack_identity=self.identity,