diff --git a/heat/api/cfn/v1/__init__.py b/heat/api/cfn/v1/__init__.py index 2654e0742f..d00d7f1302 100644 --- a/heat/api/cfn/v1/__init__.py +++ b/heat/api/cfn/v1/__init__.py @@ -18,7 +18,7 @@ import routes from webob import Request from heat.api.cfn.v1 import stacks -from heat.api.cfn.v1 import waitcondition +from heat.api.cfn.v1 import signal from heat.common import wsgi from heat.openstack.common import log as logging @@ -71,13 +71,18 @@ class API(wsgi.Router): mapper.connect("/", controller=stacks_resource, action="index") - # Add controller which handles waitcondition notifications + # Add controller which handles signals on resources like: + # waitconditions and alarms. # This is not part of the main CFN API spec, hence handle it # separately via a different path - waitcondition_controller = waitcondition.create_resource(conf) + signal_controller = signal.create_resource(conf) mapper.connect('/waitcondition/{arn:.*}', - controller=waitcondition_controller, + controller=signal_controller, action='update_waitcondition', conditions=dict(method=['PUT'])) + mapper.connect('/signal/{arn:.*}', + controller=signal_controller, + action='signal', + conditions=dict(method=['POST'])) super(API, self).__init__(mapper) diff --git a/heat/api/cfn/v1/waitcondition.py b/heat/api/cfn/v1/signal.py similarity index 72% rename from heat/api/cfn/v1/waitcondition.py rename to heat/api/cfn/v1/signal.py index 1be0d6c45a..47ad866be0 100644 --- a/heat/api/cfn/v1/waitcondition.py +++ b/heat/api/cfn/v1/signal.py @@ -20,7 +20,7 @@ from heat.api.aws import exception import heat.openstack.common.rpc.common as rpc_common -class WaitConditionController(object): +class SignalController(object): def __init__(self, options): self.options = options self.engine = rpc_client.EngineClient() @@ -39,10 +39,22 @@ class WaitConditionController(object): return {'resource': identity.resource_name, 'metadata': md} + def signal(self, req, body, arn): + con = req.context + identity = identifier.ResourceIdentifier.from_arn(arn) + try: + md = self.engine.resource_signal( + con, + stack_identity=dict(identity.stack()), + resource_name=identity.resource_name, + details=body) + except rpc_common.RemoteError as ex: + return exception.map_remote_error(ex) + def create_resource(options): """ - Stacks resource factory method. + Signal resource factory method. """ deserializer = wsgi.JSONRequestDeserializer() - return wsgi.Resource(WaitConditionController(options), deserializer) + return wsgi.Resource(SignalController(options), deserializer) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 59f5ef0501..8f41392de2 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -676,6 +676,29 @@ class Resource(object): ''' return base64.b64encode(data) + def signal(self, details=None): + ''' + signal the resource. Subclasses should provide a handle_signal() method + to implement the signal, the base-class raise an exception if no + handler is implemented. + ''' + if self.action in (self.SUSPEND, self.DELETE): + raise exception.ResourceFailure(Exception( + 'Can not send a signal to a Resource whilst actioning a %s' % + self.action)) + + if not callable(getattr(self, 'handle_signal', None)): + raise exception.ResourceFailure(Exception( + 'Resource %s is not able to receive a signal' % str(self))) + + try: + self._add_event('signal', self.status, details) + self.handle_signal(details) + except Exception as ex: + logger.exception('signal %s : %s' % (str(self), str(ex))) + failure = exception.ResourceFailure(ex) + raise failure + def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None): raise UpdateReplace(self.name) diff --git a/heat/engine/service.py b/heat/engine/service.py index 66797fceec..e500f42e11 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -465,6 +465,29 @@ class EngineService(service.Service): return api.format_stack_resource(stack[resource_name]) + @request_context + def resource_signal(self, cnxt, stack_identity, resource_name, details): + s = self._get_stack(cnxt, stack_identity) + + # This is not "nice" converting to the stored context here, + # but this happens because the keystone user associated with the + # signal doesn't have permission to read the secret key of + # the user associated with the cfn-credentials file + user_creds = db_api.user_creds_get(s.user_creds_id) + stack_context = context.RequestContext.from_dict(user_creds) + stack = parser.Stack.load(stack_context, stack=s) + + if resource_name not in stack: + raise exception.ResourceNotFound(resource_name=resource_name, + stack_name=stack.name) + + resource = stack[resource_name] + if resource.id is None: + raise exception.ResourceNotAvailable(resource_name=resource_name) + + if callable(stack[resource_name].signal): + stack[resource_name].signal(details) + @request_context def find_physical_resource(self, cnxt, physical_resource_id): """ diff --git a/heat/engine/signal_responder.py b/heat/engine/signal_responder.py index 10a28457e4..2bf08a18fa 100644 --- a/heat/engine/signal_responder.py +++ b/heat/engine/signal_responder.py @@ -31,6 +31,8 @@ SIGNAL_TYPES = ( ) = ( '/waitcondition', '/signal' ) +SIGNAL_VERB = {WAITCONDITION: 'PUT', + SIGNAL: 'POST'} class SignalResponder(resource.Resource): @@ -75,7 +77,7 @@ class SignalResponder(resource.Resource): # ensure the actual URL contains the quoted version... unquoted_path = urllib.unquote(host_url.path + path) request = {'host': host_url.netloc.lower(), - 'verb': 'PUT', + 'verb': SIGNAL_VERB[signal_type], 'path': unquoted_path, 'params': {'SignatureMethod': 'HmacSHA256', 'SignatureVersion': '2', diff --git a/heat/rpc/client.py b/heat/rpc/client.py index 214ce0ce2e..dba3c4ff87 100644 --- a/heat/rpc/client.py +++ b/heat/rpc/client.py @@ -230,6 +230,19 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy): resource_name=resource_name, metadata=metadata)) + def resource_signal(self, ctxt, stack_identity, resource_name, details): + """ + Generate an alarm on the resource. + :param ctxt: RPC context. + :param stack_identity: Name of the stack. + :param resource_name: the Resource. + :param details: the details of the signal. + """ + return self.call(ctxt, self.make_msg('resource_signal', + stack_identity=stack_identity, + resource_name=resource_name, + details=details)) + def create_watch_data(self, ctxt, watch_name, stats_data): ''' This could be used by CloudWatch and WaitConditions diff --git a/heat/tests/generic_resource.py b/heat/tests/generic_resource.py index 26a7eec60f..d519e6fe24 100644 --- a/heat/tests/generic_resource.py +++ b/heat/tests/generic_resource.py @@ -13,6 +13,7 @@ # under the License. from heat.engine import resource +from heat.engine import signal_responder from heat.openstack.common import log as logging @@ -50,3 +51,16 @@ class ResourceWithProps(GenericResource): class ResourceWithRequiredProps(GenericResource): properties_schema = {'Foo': {'Type': 'String', 'Required': True}} + + +class SignalResource(signal_responder.SignalResponder): + properties_schema = {} + attributes_schema = {'AlarmUrl': 'Get a signed webhook'} + + def handle_signal(self, details=None): + logger.warning('Signaled resource (Type "%s") %s' % (self.type(), + details)) + + def _resolve_attribute(self, name): + if name == 'AlarmUrl' and self.resource_id is not None: + return unicode(self._get_signed_url()) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 0ef899815d..a32f29436d 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -33,6 +33,7 @@ from heat.engine import parser from heat.engine import service from heat.engine.properties import Properties from heat.engine.resources import instance as instances +from heat.engine import resource as rsrs from heat.engine import watchrule from heat.openstack.common import threadgroup from heat.tests.common import HeatTestCase @@ -89,6 +90,24 @@ alarm_template = ''' } ''' +policy_template = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "alarming", + "Resources" : { + "WebServerScaleDownPolicy" : { + "Type" : "AWS::AutoScaling::ScalingPolicy", + "Properties" : { + "AdjustmentType" : "ChangeInCapacity", + "AutoScalingGroupName" : "", + "Cooldown" : "60", + "ScalingAdjustment" : "-1" + } + } + } +} +''' + def create_context(mocks, user='stacks_test_user', tenant_id='test_admin', password='stacks_test_password'): @@ -110,8 +129,8 @@ def get_wordpress_stack(stack_name, ctx): return stack -def get_alarm_stack(stack_name, ctx): - t = template_format.parse(alarm_template) +def get_stack(stack_name, ctx, template): + t = template_format.parse(template) template = parser.Template(t) stack = parser.Stack(ctx, stack_name, template) return stack @@ -1072,6 +1091,64 @@ class StackServiceTest(HeatTestCase): self.m.VerifyAll() + def test_signal_reception(self): + stack = get_stack('signal_reception', + self.ctx, + policy_template) + self.stack = stack + self.m.ReplayAll() + stack.store() + stack.create() + test_data = {'food': 'yum'} + + self.m.StubOutWithMock(service.EngineService, '_get_stack') + s = db_api.stack_get(self.ctx, self.stack.id) + service.EngineService._get_stack(self.ctx, + self.stack.identifier()).AndReturn(s) + + self.m.StubOutWithMock(db_api, 'user_creds_get') + db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn( + self.ctx.to_dict()) + + self.m.StubOutWithMock(rsrs.Resource, 'signal') + rsrs.Resource.signal(mox.IgnoreArg()).AndReturn(None) + self.m.ReplayAll() + + result = self.eng.resource_signal(self.ctx, + dict(self.stack.identifier()), + 'WebServerScaleDownPolicy', + test_data) + self.m.VerifyAll() + self.stack.delete() + + def test_signal_reception_no_resource(self): + stack = get_stack('signal_reception_no_resource', + self.ctx, + policy_template) + self.stack = stack + self.m.ReplayAll() + stack.store() + stack.create() + test_data = {'food': 'yum'} + + self.m.StubOutWithMock(service.EngineService, '_get_stack') + s = db_api.stack_get(self.ctx, self.stack.id) + service.EngineService._get_stack(self.ctx, + self.stack.identifier()).AndReturn(s) + + self.m.StubOutWithMock(db_api, 'user_creds_get') + db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn( + self.ctx.to_dict()) + self.m.ReplayAll() + + self.assertRaises(exception.ResourceNotFound, + self.eng.resource_signal, self.ctx, + dict(self.stack.identifier()), + 'resource_does_not_exist', + test_data) + self.m.VerifyAll() + self.stack.delete() + @stack_context('service_metadata_test_stack') def test_metadata(self): test_metadata = {'foo': 'bar', 'baz': 'quux', 'blarg': 'wibble'} @@ -1136,8 +1213,9 @@ class StackServiceTest(HeatTestCase): self.assertEqual([], self.eng.stg[self.stack.id].threads) def test_periodic_watch_task_created(self): - stack = get_alarm_stack('period_watch_task_created', - create_context(self.m)) + stack = get_stack('period_watch_task_created', + create_context(self.m), + alarm_template) self.stack = stack self.m.ReplayAll() stack.store() diff --git a/heat/tests/test_rpc_client.py b/heat/tests/test_rpc_client.py index ccfd5238e5..3b25f7fbda 100644 --- a/heat/tests/test_rpc_client.py +++ b/heat/tests/test_rpc_client.py @@ -153,6 +153,12 @@ class EngineRpcAPITestCase(testtools.TestCase): resource_name='LogicalResourceId', metadata={u'wordpress': []}) + def test_resource_signal(self): + self._test_engine_api('resource_signal', 'call', + stack_identity=self.identity, + resource_name='LogicalResourceId', + details={u'wordpress': []}) + def test_create_watch_data(self): self._test_engine_api('create_watch_data', 'call', watch_name='watch1', diff --git a/heat/tests/test_signal.py b/heat/tests/test_signal.py new file mode 100644 index 0000000000..0fd73eef26 --- /dev/null +++ b/heat/tests/test_signal.py @@ -0,0 +1,212 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from oslo.config import cfg + +from heat.tests import generic_resource +from heat.tests import fakes +from heat.tests.common import HeatTestCase +from heat.tests.utils import stack_delete_after +from heat.tests.utils import setup_dummy_db + +from heat.common import context +from heat.common import exception +from heat.common import template_format + +from heat.engine import parser +from heat.engine import resource +from heat.engine import signal_responder as sr + + +test_template_signal = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "Just a test.", + "Parameters" : {}, + "Resources" : { + "signal_handler" : {"Type" : "SignalResourceType"}, + "resource_X" : {"Type" : "GenericResourceType"} + }, + "Outputs": { + "signed_url": {"Fn::GetAtt": ["signal_handler", "AlarmUrl"]} + } +} +''' + + +class UUIDStub(object): + def __init__(self, value): + self.value = value + + def __enter__(self): + self.uuid4 = uuid.uuid4 + uuid_stub = lambda: self.value + uuid.uuid4 = uuid_stub + + def __exit__(self, *exc_info): + uuid.uuid4 = self.uuid4 + + +class SignalTest(HeatTestCase): + + def setUp(self): + super(SignalTest, self).setUp() + setup_dummy_db() + + resource._register_class('SignalResourceType', + generic_resource.SignalResource) + resource._register_class('GenericResourceType', + generic_resource.GenericResource) + + cfg.CONF.set_default('heat_waitcondition_server_url', + 'http://127.0.0.1:8000/v1/waitcondition') + + self.stack_id = 'STACKABCD1234' + self.fc = fakes.FakeKeystoneClient() + + # Note tests creating a stack should be decorated with @stack_delete_after + # to ensure the stack is properly cleaned up + def create_stack(self, stack_name='test_stack', stub=True): + temp = template_format.parse(test_template_signal) + template = parser.Template(temp) + ctx = context.get_admin_context() + ctx.tenant_id = 'test_tenant' + stack = parser.Stack(ctx, stack_name, template, + disable_rollback=True) + + # Stub out the stack ID so we have a known value + with UUIDStub(self.stack_id): + stack.store() + + if stub: + self.m.StubOutWithMock(sr.SignalResponder, 'keystone') + sr.SignalResponder.keystone().MultipleTimes().AndReturn( + self.fc) + return stack + + @stack_delete_after + def test_FnGetAtt_Alarm_Url(self): + self.stack = self.create_stack() + + self.m.ReplayAll() + self.stack.create() + + rsrc = self.stack.resources['signal_handler'] + created_time = datetime.datetime(2012, 11, 29, 13, 49, 37) + rsrc.created_time = created_time + self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE)) + + expected_url = "".join([ + 'http://127.0.0.1:8000/v1/signal/', + 'arn%3Aopenstack%3Aheat%3A%3Atest_tenant%3Astacks%2F', + 'test_stack%2FSTACKABCD1234%2Fresources%2F', + 'signal_handler?', + 'Timestamp=2012-11-29T13%3A49%3A37Z&', + 'SignatureMethod=HmacSHA256&', + 'AWSAccessKeyId=4567&', + 'SignatureVersion=2&', + 'Signature=', + 'MJIFh7LKCpVlK6pCxe2WfYrRsfO7FU3Wt%2BzQFo2rYSY%3D']) + + self.assertEqual(expected_url, rsrc.FnGetAtt('AlarmUrl')) + self.m.VerifyAll() + + @stack_delete_after + def test_signal(self): + test_d = {'Data': 'foo', 'Reason': 'bar', + 'Status': 'SUCCESS', 'UniqueId': '123'} + + self.stack = self.create_stack() + + # to confirm we get a call to handle_signal + self.m.StubOutWithMock(generic_resource.SignalResource, + 'handle_signal') + generic_resource.SignalResource.handle_signal(test_d).AndReturn(None) + + self.m.ReplayAll() + self.stack.create() + + rsrc = self.stack.resources['signal_handler'] + self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE)) + + rsrc.signal(details=test_d) + + self.m.VerifyAll() + + @stack_delete_after + def test_signal_wrong_resource(self): + # assert that we get the correct exception when calling a + # resource.signal() that does not have a handle_signal() + self.stack = self.create_stack() + + self.m.ReplayAll() + self.stack.create() + + rsrc = self.stack.resources['resource_X'] + self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE)) + + err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'} + self.assertRaises(exception.ResourceFailure, rsrc.signal, + details=err_metadata) + + self.m.VerifyAll() + + @stack_delete_after + def test_signal_reception_wrong_state(self): + # assert that we get the correct exception when calling a + # resource.signal() that is in having a destructive action. + self.stack = self.create_stack() + + self.m.ReplayAll() + self.stack.create() + + rsrc = self.stack.resources['signal_handler'] + self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE)) + # manually override the action to DELETE + rsrc.action = rsrc.DELETE + + err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'} + self.assertRaises(exception.ResourceFailure, rsrc.signal, + details=err_metadata) + + self.m.VerifyAll() + + @stack_delete_after + def test_signal_reception_failed_call(self): + # assert that we get the correct exception from resource.signal() + # when resource.handle_signal() raises an exception. + self.stack = self.create_stack() + + test_d = {'Data': 'foo', 'Reason': 'bar', + 'Status': 'SUCCESS', 'UniqueId': '123'} + + # to confirm we get a call to handle_signal + self.m.StubOutWithMock(generic_resource.SignalResource, + 'handle_signal') + generic_resource.SignalResource.handle_signal(test_d).AndRaise( + ValueError) + + self.m.ReplayAll() + self.stack.create() + + rsrc = self.stack.resources['signal_handler'] + self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE)) + + self.assertRaises(exception.ResourceFailure, + rsrc.signal, details=test_d) + + self.m.VerifyAll()