Add an API for passing a signal through to a resource

Reuse the /waitcondition functionality to provide a
/signal pathway.
a signal could be a waitcondition or an alarm.

blueprint watch-ceilometer
Change-Id: Ia9da222b8072ee0dc65bf137e7d3eb7fa4d944f5
This commit is contained in:
Angus Salkeld 2013-07-25 10:36:50 +10:00
parent eade86a674
commit c8dae557b3
10 changed files with 400 additions and 12 deletions

View File

@ -18,7 +18,7 @@ import routes
from webob import Request
from heat.api.cfn.v1 import stacks
from heat.api.cfn.v1 import waitcondition
from heat.api.cfn.v1 import signal
from heat.common import wsgi
from heat.openstack.common import log as logging
@ -71,13 +71,18 @@ class API(wsgi.Router):
mapper.connect("/", controller=stacks_resource, action="index")
# Add controller which handles waitcondition notifications
# Add controller which handles signals on resources like:
# waitconditions and alarms.
# This is not part of the main CFN API spec, hence handle it
# separately via a different path
waitcondition_controller = waitcondition.create_resource(conf)
signal_controller = signal.create_resource(conf)
mapper.connect('/waitcondition/{arn:.*}',
controller=waitcondition_controller,
controller=signal_controller,
action='update_waitcondition',
conditions=dict(method=['PUT']))
mapper.connect('/signal/{arn:.*}',
controller=signal_controller,
action='signal',
conditions=dict(method=['POST']))
super(API, self).__init__(mapper)

View File

@ -20,7 +20,7 @@ from heat.api.aws import exception
import heat.openstack.common.rpc.common as rpc_common
class WaitConditionController(object):
class SignalController(object):
def __init__(self, options):
self.options = options
self.engine = rpc_client.EngineClient()
@ -39,10 +39,22 @@ class WaitConditionController(object):
return {'resource': identity.resource_name, 'metadata': md}
def signal(self, req, body, arn):
con = req.context
identity = identifier.ResourceIdentifier.from_arn(arn)
try:
md = self.engine.resource_signal(
con,
stack_identity=dict(identity.stack()),
resource_name=identity.resource_name,
details=body)
except rpc_common.RemoteError as ex:
return exception.map_remote_error(ex)
def create_resource(options):
"""
Stacks resource factory method.
Signal resource factory method.
"""
deserializer = wsgi.JSONRequestDeserializer()
return wsgi.Resource(WaitConditionController(options), deserializer)
return wsgi.Resource(SignalController(options), deserializer)

View File

@ -676,6 +676,29 @@ class Resource(object):
'''
return base64.b64encode(data)
def signal(self, details=None):
'''
signal the resource. Subclasses should provide a handle_signal() method
to implement the signal, the base-class raise an exception if no
handler is implemented.
'''
if self.action in (self.SUSPEND, self.DELETE):
raise exception.ResourceFailure(Exception(
'Can not send a signal to a Resource whilst actioning a %s' %
self.action))
if not callable(getattr(self, 'handle_signal', None)):
raise exception.ResourceFailure(Exception(
'Resource %s is not able to receive a signal' % str(self)))
try:
self._add_event('signal', self.status, details)
self.handle_signal(details)
except Exception as ex:
logger.exception('signal %s : %s' % (str(self), str(ex)))
failure = exception.ResourceFailure(ex)
raise failure
def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None):
raise UpdateReplace(self.name)

View File

@ -465,6 +465,29 @@ class EngineService(service.Service):
return api.format_stack_resource(stack[resource_name])
@request_context
def resource_signal(self, cnxt, stack_identity, resource_name, details):
s = self._get_stack(cnxt, stack_identity)
# This is not "nice" converting to the stored context here,
# but this happens because the keystone user associated with the
# signal doesn't have permission to read the secret key of
# the user associated with the cfn-credentials file
user_creds = db_api.user_creds_get(s.user_creds_id)
stack_context = context.RequestContext.from_dict(user_creds)
stack = parser.Stack.load(stack_context, stack=s)
if resource_name not in stack:
raise exception.ResourceNotFound(resource_name=resource_name,
stack_name=stack.name)
resource = stack[resource_name]
if resource.id is None:
raise exception.ResourceNotAvailable(resource_name=resource_name)
if callable(stack[resource_name].signal):
stack[resource_name].signal(details)
@request_context
def find_physical_resource(self, cnxt, physical_resource_id):
"""

View File

@ -31,6 +31,8 @@ SIGNAL_TYPES = (
) = (
'/waitcondition', '/signal'
)
SIGNAL_VERB = {WAITCONDITION: 'PUT',
SIGNAL: 'POST'}
class SignalResponder(resource.Resource):
@ -75,7 +77,7 @@ class SignalResponder(resource.Resource):
# ensure the actual URL contains the quoted version...
unquoted_path = urllib.unquote(host_url.path + path)
request = {'host': host_url.netloc.lower(),
'verb': 'PUT',
'verb': SIGNAL_VERB[signal_type],
'path': unquoted_path,
'params': {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',

View File

@ -230,6 +230,19 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy):
resource_name=resource_name,
metadata=metadata))
def resource_signal(self, ctxt, stack_identity, resource_name, details):
"""
Generate an alarm on the resource.
:param ctxt: RPC context.
:param stack_identity: Name of the stack.
:param resource_name: the Resource.
:param details: the details of the signal.
"""
return self.call(ctxt, self.make_msg('resource_signal',
stack_identity=stack_identity,
resource_name=resource_name,
details=details))
def create_watch_data(self, ctxt, watch_name, stats_data):
'''
This could be used by CloudWatch and WaitConditions

View File

@ -13,6 +13,7 @@
# under the License.
from heat.engine import resource
from heat.engine import signal_responder
from heat.openstack.common import log as logging
@ -50,3 +51,16 @@ class ResourceWithProps(GenericResource):
class ResourceWithRequiredProps(GenericResource):
properties_schema = {'Foo': {'Type': 'String',
'Required': True}}
class SignalResource(signal_responder.SignalResponder):
properties_schema = {}
attributes_schema = {'AlarmUrl': 'Get a signed webhook'}
def handle_signal(self, details=None):
logger.warning('Signaled resource (Type "%s") %s' % (self.type(),
details))
def _resolve_attribute(self, name):
if name == 'AlarmUrl' and self.resource_id is not None:
return unicode(self._get_signed_url())

View File

@ -33,6 +33,7 @@ from heat.engine import parser
from heat.engine import service
from heat.engine.properties import Properties
from heat.engine.resources import instance as instances
from heat.engine import resource as rsrs
from heat.engine import watchrule
from heat.openstack.common import threadgroup
from heat.tests.common import HeatTestCase
@ -89,6 +90,24 @@ alarm_template = '''
}
'''
policy_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "alarming",
"Resources" : {
"WebServerScaleDownPolicy" : {
"Type" : "AWS::AutoScaling::ScalingPolicy",
"Properties" : {
"AdjustmentType" : "ChangeInCapacity",
"AutoScalingGroupName" : "",
"Cooldown" : "60",
"ScalingAdjustment" : "-1"
}
}
}
}
'''
def create_context(mocks, user='stacks_test_user',
tenant_id='test_admin', password='stacks_test_password'):
@ -110,8 +129,8 @@ def get_wordpress_stack(stack_name, ctx):
return stack
def get_alarm_stack(stack_name, ctx):
t = template_format.parse(alarm_template)
def get_stack(stack_name, ctx, template):
t = template_format.parse(template)
template = parser.Template(t)
stack = parser.Stack(ctx, stack_name, template)
return stack
@ -1072,6 +1091,64 @@ class StackServiceTest(HeatTestCase):
self.m.VerifyAll()
def test_signal_reception(self):
stack = get_stack('signal_reception',
self.ctx,
policy_template)
self.stack = stack
self.m.ReplayAll()
stack.store()
stack.create()
test_data = {'food': 'yum'}
self.m.StubOutWithMock(service.EngineService, '_get_stack')
s = db_api.stack_get(self.ctx, self.stack.id)
service.EngineService._get_stack(self.ctx,
self.stack.identifier()).AndReturn(s)
self.m.StubOutWithMock(db_api, 'user_creds_get')
db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn(
self.ctx.to_dict())
self.m.StubOutWithMock(rsrs.Resource, 'signal')
rsrs.Resource.signal(mox.IgnoreArg()).AndReturn(None)
self.m.ReplayAll()
result = self.eng.resource_signal(self.ctx,
dict(self.stack.identifier()),
'WebServerScaleDownPolicy',
test_data)
self.m.VerifyAll()
self.stack.delete()
def test_signal_reception_no_resource(self):
stack = get_stack('signal_reception_no_resource',
self.ctx,
policy_template)
self.stack = stack
self.m.ReplayAll()
stack.store()
stack.create()
test_data = {'food': 'yum'}
self.m.StubOutWithMock(service.EngineService, '_get_stack')
s = db_api.stack_get(self.ctx, self.stack.id)
service.EngineService._get_stack(self.ctx,
self.stack.identifier()).AndReturn(s)
self.m.StubOutWithMock(db_api, 'user_creds_get')
db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn(
self.ctx.to_dict())
self.m.ReplayAll()
self.assertRaises(exception.ResourceNotFound,
self.eng.resource_signal, self.ctx,
dict(self.stack.identifier()),
'resource_does_not_exist',
test_data)
self.m.VerifyAll()
self.stack.delete()
@stack_context('service_metadata_test_stack')
def test_metadata(self):
test_metadata = {'foo': 'bar', 'baz': 'quux', 'blarg': 'wibble'}
@ -1136,8 +1213,9 @@ class StackServiceTest(HeatTestCase):
self.assertEqual([], self.eng.stg[self.stack.id].threads)
def test_periodic_watch_task_created(self):
stack = get_alarm_stack('period_watch_task_created',
create_context(self.m))
stack = get_stack('period_watch_task_created',
create_context(self.m),
alarm_template)
self.stack = stack
self.m.ReplayAll()
stack.store()

View File

@ -153,6 +153,12 @@ class EngineRpcAPITestCase(testtools.TestCase):
resource_name='LogicalResourceId',
metadata={u'wordpress': []})
def test_resource_signal(self):
self._test_engine_api('resource_signal', 'call',
stack_identity=self.identity,
resource_name='LogicalResourceId',
details={u'wordpress': []})
def test_create_watch_data(self):
self._test_engine_api('create_watch_data', 'call',
watch_name='watch1',

212
heat/tests/test_signal.py Normal file
View File

@ -0,0 +1,212 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from oslo.config import cfg
from heat.tests import generic_resource
from heat.tests import fakes
from heat.tests.common import HeatTestCase
from heat.tests.utils import stack_delete_after
from heat.tests.utils import setup_dummy_db
from heat.common import context
from heat.common import exception
from heat.common import template_format
from heat.engine import parser
from heat.engine import resource
from heat.engine import signal_responder as sr
test_template_signal = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Just a test.",
"Parameters" : {},
"Resources" : {
"signal_handler" : {"Type" : "SignalResourceType"},
"resource_X" : {"Type" : "GenericResourceType"}
},
"Outputs": {
"signed_url": {"Fn::GetAtt": ["signal_handler", "AlarmUrl"]}
}
}
'''
class UUIDStub(object):
def __init__(self, value):
self.value = value
def __enter__(self):
self.uuid4 = uuid.uuid4
uuid_stub = lambda: self.value
uuid.uuid4 = uuid_stub
def __exit__(self, *exc_info):
uuid.uuid4 = self.uuid4
class SignalTest(HeatTestCase):
def setUp(self):
super(SignalTest, self).setUp()
setup_dummy_db()
resource._register_class('SignalResourceType',
generic_resource.SignalResource)
resource._register_class('GenericResourceType',
generic_resource.GenericResource)
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://127.0.0.1:8000/v1/waitcondition')
self.stack_id = 'STACKABCD1234'
self.fc = fakes.FakeKeystoneClient()
# Note tests creating a stack should be decorated with @stack_delete_after
# to ensure the stack is properly cleaned up
def create_stack(self, stack_name='test_stack', stub=True):
temp = template_format.parse(test_template_signal)
template = parser.Template(temp)
ctx = context.get_admin_context()
ctx.tenant_id = 'test_tenant'
stack = parser.Stack(ctx, stack_name, template,
disable_rollback=True)
# Stub out the stack ID so we have a known value
with UUIDStub(self.stack_id):
stack.store()
if stub:
self.m.StubOutWithMock(sr.SignalResponder, 'keystone')
sr.SignalResponder.keystone().MultipleTimes().AndReturn(
self.fc)
return stack
@stack_delete_after
def test_FnGetAtt_Alarm_Url(self):
self.stack = self.create_stack()
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack.resources['signal_handler']
created_time = datetime.datetime(2012, 11, 29, 13, 49, 37)
rsrc.created_time = created_time
self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
expected_url = "".join([
'http://127.0.0.1:8000/v1/signal/',
'arn%3Aopenstack%3Aheat%3A%3Atest_tenant%3Astacks%2F',
'test_stack%2FSTACKABCD1234%2Fresources%2F',
'signal_handler?',
'Timestamp=2012-11-29T13%3A49%3A37Z&',
'SignatureMethod=HmacSHA256&',
'AWSAccessKeyId=4567&',
'SignatureVersion=2&',
'Signature=',
'MJIFh7LKCpVlK6pCxe2WfYrRsfO7FU3Wt%2BzQFo2rYSY%3D'])
self.assertEqual(expected_url, rsrc.FnGetAtt('AlarmUrl'))
self.m.VerifyAll()
@stack_delete_after
def test_signal(self):
test_d = {'Data': 'foo', 'Reason': 'bar',
'Status': 'SUCCESS', 'UniqueId': '123'}
self.stack = self.create_stack()
# to confirm we get a call to handle_signal
self.m.StubOutWithMock(generic_resource.SignalResource,
'handle_signal')
generic_resource.SignalResource.handle_signal(test_d).AndReturn(None)
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack.resources['signal_handler']
self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
rsrc.signal(details=test_d)
self.m.VerifyAll()
@stack_delete_after
def test_signal_wrong_resource(self):
# assert that we get the correct exception when calling a
# resource.signal() that does not have a handle_signal()
self.stack = self.create_stack()
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack.resources['resource_X']
self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'}
self.assertRaises(exception.ResourceFailure, rsrc.signal,
details=err_metadata)
self.m.VerifyAll()
@stack_delete_after
def test_signal_reception_wrong_state(self):
# assert that we get the correct exception when calling a
# resource.signal() that is in having a destructive action.
self.stack = self.create_stack()
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack.resources['signal_handler']
self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
# manually override the action to DELETE
rsrc.action = rsrc.DELETE
err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'}
self.assertRaises(exception.ResourceFailure, rsrc.signal,
details=err_metadata)
self.m.VerifyAll()
@stack_delete_after
def test_signal_reception_failed_call(self):
# assert that we get the correct exception from resource.signal()
# when resource.handle_signal() raises an exception.
self.stack = self.create_stack()
test_d = {'Data': 'foo', 'Reason': 'bar',
'Status': 'SUCCESS', 'UniqueId': '123'}
# to confirm we get a call to handle_signal
self.m.StubOutWithMock(generic_resource.SignalResource,
'handle_signal')
generic_resource.SignalResource.handle_signal(test_d).AndRaise(
ValueError)
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack.resources['signal_handler']
self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
self.assertRaises(exception.ResourceFailure,
rsrc.signal, details=test_d)
self.m.VerifyAll()