Add native WaitConditionHandle resource

Adds a native OS::HeatWaitConditionHandle resource, which works
in a similar way to the CFN compatible one, but with a few changes
to make it simpler to use:
- The data passed is validated less strictly, so we tolerate missing keys
  for any of the data (we just fill in default values)
- A signal passed with no data is assumed to mean success
- There are two convenience attributes which provide a string representing
  the necessary curl call to send the signal (to the native ReST API)

This allows a similarly simple signalling mechanism to the CFN compatible
Handle resource, but with no dependency on the ec2tokens keystone extension
or heat-api-cfn service.

Some usage examples here:
  - https://review.openstack.org/106424

blueprint: native-waitcondition
Change-Id: Ie9b5aeb13bfab5fba55c1a49d1572e0777864b29
This commit is contained in:
Steven Hardy 2014-06-25 16:18:50 +01:00
parent 85aa4fcc5a
commit adf02483dc
3 changed files with 512 additions and 56 deletions

View File

@ -12,6 +12,7 @@
# under the License.
import json
import uuid
from heat.common import exception
from heat.common import identifier
@ -26,13 +27,13 @@ from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class WaitConditionHandle(signal_responder.SignalResponder):
class BaseWaitConditionHandle(signal_responder.SignalResponder):
'''
the main point of this class is to :
have no dependencies (so the instance can reference it)
generate a unique url (to be returned in the reference)
then the cfn-signal will use this url to post to and
WaitCondition will poll it to see if has been written to.
Base WaitConditionHandle resource.
The main point of this class is to :
- have no dependencies (so the instance can reference it)
- create credentials to allow for signalling from the instance.
- handle signals from the instance, validate and store result
'''
properties_schema = {}
@ -44,6 +45,181 @@ class WaitConditionHandle(signal_responder.SignalResponder):
'SUCCESS',
)
def handle_create(self):
super(BaseWaitConditionHandle, self).handle_create()
self.resource_id_set(self._get_user_id())
def _status_ok(self, status):
return status in self.WAIT_STATUSES
def _metadata_format_ok(self, metadata):
if sorted(tuple(metadata.keys())) == sorted(self.METADATA_KEYS):
return self._status_ok(metadata[self.STATUS])
def handle_signal(self, metadata=None):
if self._metadata_format_ok(metadata):
rsrc_metadata = self.metadata_get(refresh=True)
if metadata[self.UNIQUE_ID] in rsrc_metadata:
LOG.warning(_("Overwriting Metadata item for id %s!")
% metadata[self.UNIQUE_ID])
safe_metadata = {}
for k in self.METADATA_KEYS:
if k == self.UNIQUE_ID:
continue
safe_metadata[k] = metadata[k]
rsrc_metadata.update({metadata[self.UNIQUE_ID]: safe_metadata})
self.metadata_set(rsrc_metadata)
else:
LOG.error(_("Metadata failed validation for %s") % self.name)
raise ValueError(_("Metadata format invalid"))
def get_status(self):
'''
Return a list of the Status values for the handle signals
'''
return [v[self.STATUS]
for v in self.metadata_get(refresh=True).values()]
def get_status_reason(self, status):
'''
Return a list of reasons associated with a particular status
'''
return [v[self.REASON]
for v in self.metadata_get(refresh=True).values()
if v[self.STATUS] == status]
class HeatWaitConditionHandle(BaseWaitConditionHandle):
METADATA_KEYS = (
DATA, REASON, STATUS, UNIQUE_ID
) = (
'data', 'reason', 'status', 'id'
)
ATTRIBUTES = (
TOKEN,
ENDPOINT,
CURL_CLI_SUCCESS,
CURL_CLI_FAILURE,
) = (
'token',
'endpoint',
'curl_cli_success',
'curl_cli_failure',
)
attributes_schema = {
TOKEN: attributes.Schema(
_('Token for stack-user which can be used for signalling handle'),
cache_mode=attributes.Schema.CACHE_NONE
),
ENDPOINT: attributes.Schema(
_('Endpoint/url which can be used for signalling handle'),
cache_mode=attributes.Schema.CACHE_NONE
),
CURL_CLI_SUCCESS: attributes.Schema(
_('Convenience attribute, provides curl CLI command '
'which can be used for signalling handle completion'),
cache_mode=attributes.Schema.CACHE_NONE
),
CURL_CLI_FAILURE: attributes.Schema(
_('Convenience attribute, provides curl CLI command '
'which can be used for signalling handle failure'),
cache_mode=attributes.Schema.CACHE_NONE
),
}
def handle_create(self):
password = uuid.uuid4().hex
self.data_set('password', password, True)
self._create_user()
self.resource_id_set(self._get_user_id())
# FIXME(shardy): The assumption here is that token expiry > timeout
# but we probably need a check here to fail fast if that's not true
# Also need to implement an update property, such that the handle
# can be replaced on update which will replace the token
token = self._user_token()
self.data_set('token', token, True)
self.data_set('endpoint', '%s/signal' % self._get_resource_endpoint())
def _get_resource_endpoint(self):
# Get the endpoint from stack.clients then replace the context
# project_id with the path to the resource (which includes the
# context project_id), then replace the context project with
# the one needed for signalling from the stack_user_project
heat_client_plugin = self.stack.clients.client_plugin('heat')
endpoint = heat_client_plugin.get_heat_url()
rsrc_ep = endpoint.replace(self.context.tenant_id,
self.identifier().url_path())
return rsrc_ep.replace(self.context.tenant_id,
self.stack.stack_user_project_id)
def handle_delete(self):
self._delete_user()
@property
def password(self):
return self.data().get('password')
def _resolve_attribute(self, key):
if self.resource_id:
if key == self.TOKEN:
return self.data().get('token')
elif key == self.ENDPOINT:
return self.data().get('endpoint')
elif key == self.CURL_CLI_SUCCESS:
# Construct curl command for template-author convenience
return ('curl -i -X POST '
'-H \'X-Auth-Token: %(token)s\' '
'-H \'Content-Type: application/json\' '
'-H \'Accept: application/json\' '
'%(endpoint)s' %
dict(token=self.data().get('token'),
endpoint=self.data().get('endpoint')))
elif key == self.CURL_CLI_FAILURE:
return ('curl -i -X POST '
'--data-binary \'{"status": "%(status)s"}\' '
'-H \'X-Auth-Token: %(token)s\' '
'-H \'Content-Type: application/json\' '
'-H \'Accept: application/json\' '
'%(endpoint)s' %
dict(status=self.STATUS_FAILURE,
token=self.data().get('token'),
endpoint=self.data().get('endpoint')))
def handle_signal(self, details=None):
'''
Validate and update the resource metadata.
metadata is not mandatory, but if passed it must use the following
format:
{
"status" : "Status (must be SUCCESS or FAILURE)"
"data" : "Arbitrary data",
"reason" : "Reason string"
}
Optionally "id" may also be specified, but if missing the index
of the signal received will be used.
'''
rsrc_metadata = self.metadata_get(refresh=True)
signal_num = len(rsrc_metadata) + 1
reason = 'Signal %s received' % signal_num
# Tolerate missing values, default to success
metadata = details or {}
metadata.setdefault(self.REASON, reason)
metadata.setdefault(self.DATA, None)
metadata.setdefault(self.UNIQUE_ID, signal_num)
metadata.setdefault(self.STATUS, self.STATUS_SUCCESS)
super(HeatWaitConditionHandle, self).handle_signal(metadata)
class WaitConditionHandle(BaseWaitConditionHandle):
'''
the main point of this class is to :
have no dependencies (so the instance can reference it)
generate a unique url (to be returned in the reference)
then the cfn-signal will use this url to post to and
WaitCondition will poll it to see if has been written to.
'''
METADATA_KEYS = (
DATA, REASON, STATUS, UNIQUE_ID
) = (
@ -64,20 +240,6 @@ class WaitConditionHandle(signal_responder.SignalResponder):
else:
return unicode(self.name)
def _metadata_format_ok(self, metadata):
"""
Check the format of the provided metadata is as expected.
metadata must use the following format:
{
"Status" : "Status (must be SUCCESS or FAILURE)"
"UniqueId" : "Some ID, should be unique for Count>1",
"Data" : "Arbitrary Data",
"Reason" : "Reason String"
}
"""
if tuple(sorted(metadata.keys())) == self.METADATA_KEYS:
return metadata[self.STATUS] in self.WAIT_STATUSES
def metadata_update(self, new_metadata=None):
"""DEPRECATED. Should use handle_signal instead."""
self.handle_signal(details=new_metadata)
@ -85,40 +247,17 @@ class WaitConditionHandle(signal_responder.SignalResponder):
def handle_signal(self, details=None):
'''
Validate and update the resource metadata
metadata must use the following format:
{
"Status" : "Status (must be SUCCESS or FAILURE)"
"UniqueId" : "Some ID, should be unique for Count>1",
"Data" : "Arbitrary Data",
"Reason" : "Reason String"
}
'''
if details is None:
return
if self._metadata_format_ok(details):
rsrc_metadata = self.metadata_get(refresh=True)
if details[self.UNIQUE_ID] in rsrc_metadata:
LOG.warning(_("Overwriting Metadata item for UniqueId %s!")
% details[self.UNIQUE_ID])
safe_metadata = {}
for k in self.METADATA_KEYS:
if k == self.UNIQUE_ID:
continue
safe_metadata[k] = details[k]
rsrc_metadata.update({details[self.UNIQUE_ID]: safe_metadata})
self.metadata_set(rsrc_metadata)
else:
LOG.error(_("Metadata failed validation for %s") % self.name)
raise ValueError(_("Metadata format invalid"))
def get_status(self):
'''
Return a list of the Status values for the handle signals
'''
return [v[self.STATUS]
for v in self.metadata_get(refresh=True).values()]
def get_status_reason(self, status):
'''
Return a list of reasons associated with a particular status
'''
return [v[self.REASON]
for v in self.metadata_get(refresh=True).values()
if v[self.STATUS] == status]
super(WaitConditionHandle, self).handle_signal(details)
class UpdateWaitConditionHandle(WaitConditionHandle):
@ -358,6 +497,7 @@ def resource_mapping():
return {
'AWS::CloudFormation::WaitCondition': WaitCondition,
'OS::Heat::WaitCondition': HeatWaitCondition,
'OS::Heat::WaitConditionHandle': HeatWaitConditionHandle,
'AWS::CloudFormation::WaitConditionHandle': WaitConditionHandle,
'OS::Heat::UpdateWaitConditionHandle': UpdateWaitConditionHandle,
}

View File

@ -166,4 +166,4 @@ class FakeKeystoneClient(object):
pass
def stack_domain_user_token(self, username, project_id, password):
pass
return 'adomainusertoken'

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from heat.common import identifier
from heat.common import template_format
from heat.db import api as db_api
from heat.engine.clients.os import heat_plugin
from heat.engine import environment
from heat.engine import parser
from heat.engine import resource
@ -95,20 +96,51 @@ test_template_update_waitcondition = '''
'''
test_template_heat_waitcondition = '''
heat_template_version: 2013-05-23
resources:
wait_condition:
type: OS::Heat::WaitCondition
properties:
handle: {get_resource: wait_handle}
timeout: 5
wait_handle:
type: OS::Heat::WaitConditionHandle
'''
test_template_heat_waitcondition_count = '''
heat_template_version: 2013-05-23
resources:
wait_condition:
type: OS::Heat::WaitCondition
properties:
handle: {get_resource: wait_handle}
count: 3
timeout: 5
wait_handle:
type: OS::Heat::WaitConditionHandle
'''
test_template_heat_waithandle = '''
heat_template_version: 2013-05-23
resources:
wait_handle:
type: OS::Heat::WaitConditionHandle
'''
class WaitConditionTest(HeatTestCase):
def setUp(self):
super(WaitConditionTest, self).setUp()
self.m.StubOutWithMock(wc.WaitConditionHandle,
'get_status')
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://server.test:8000/v1/waitcondition')
self.stub_keystoneclient()
def create_stack(self, stack_id=None,
template=test_template_waitcondition, params=None,
stub=True):
stub=True, stub_status=True):
params = params or {}
temp = template_format.parse(template)
template = parser.Template(temp)
@ -131,6 +163,10 @@ class WaitConditionTest(HeatTestCase):
self.m.StubOutWithMock(wc.WaitConditionHandle, 'identifier')
wc.WaitConditionHandle.identifier().MultipleTimes().AndReturn(id)
if stub_status:
self.m.StubOutWithMock(wc.WaitConditionHandle,
'get_status')
return stack
def test_post_success_to_handle(self):
@ -718,3 +754,283 @@ class WaitConditionUpdateTest(HeatTestCase):
wait_condition_handle = self.stack['WaitHandle']
self.assertRaises(
resource.UpdateReplace, wait_condition_handle.update, None, None)
class HeatWaitConditionTest(HeatTestCase):
def setUp(self):
super(HeatWaitConditionTest, self).setUp()
self.stub_keystoneclient()
self.tenant_id = 'test_tenant'
def create_stack(self, stack_id=None,
template=test_template_heat_waitcondition_count,
params={},
stub=True, stub_status=True):
temp = template_format.parse(template)
template = parser.Template(temp)
ctx = utils.dummy_context(tenant_id=self.tenant_id)
stack = parser.Stack(ctx, 'test_stack', template,
environment.Environment(params),
disable_rollback=True)
# Stub out the stack ID so we have a known value
if stack_id is None:
stack_id = str(uuid.uuid4())
self.stack_id = stack_id
with utils.UUIDStub(self.stack_id):
stack.store()
if stub:
id = identifier.ResourceIdentifier('test_tenant', stack.name,
stack.id, '', 'wait_handle')
self.m.StubOutWithMock(wc.HeatWaitConditionHandle, 'identifier')
wc.HeatWaitConditionHandle.identifier().MultipleTimes().AndReturn(
id)
if stub_status:
self.m.StubOutWithMock(wc.HeatWaitConditionHandle,
'get_status')
return stack
def test_post_complete_to_handle(self):
self.stack = self.create_stack()
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS'])
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS',
'SUCCESS'])
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS',
'SUCCESS',
'SUCCESS'])
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack['wait_condition']
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE),
rsrc.state)
r = db_api.resource_get_by_name_and_stack(None, 'wait_handle',
self.stack.id)
self.assertEqual('wait_handle', r.name)
self.m.VerifyAll()
def test_post_failed_to_handle(self):
self.stack = self.create_stack()
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS'])
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS',
'SUCCESS'])
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS',
'SUCCESS',
'FAILURE'])
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack['wait_condition']
self.assertEqual((rsrc.CREATE, rsrc.FAILED),
rsrc.state)
reason = rsrc.status_reason
self.assertTrue(reason.startswith('WaitConditionFailure:'))
r = db_api.resource_get_by_name_and_stack(None, 'wait_handle',
self.stack.id)
self.assertEqual('wait_handle', r.name)
self.m.VerifyAll()
def test_timeout(self):
st = time.time()
self.stack = self.create_stack()
# Avoid the stack create exercising the timeout code at the same time
self.m.StubOutWithMock(self.stack, 'timeout_secs')
self.stack.timeout_secs().MultipleTimes().AndReturn(None)
self.m.StubOutWithMock(scheduler, 'wallclock')
scheduler.wallclock().AndReturn(st)
scheduler.wallclock().AndReturn(st + 0.001)
scheduler.wallclock().AndReturn(st + 0.1)
wc.HeatWaitConditionHandle.get_status().AndReturn([])
scheduler.wallclock().AndReturn(st + 4.1)
wc.HeatWaitConditionHandle.get_status().AndReturn([])
scheduler.wallclock().AndReturn(st + 5.1)
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack['wait_condition']
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
reason = rsrc.status_reason
self.assertTrue(reason.startswith('WaitConditionTimeout:'))
self.m.VerifyAll()
def _create_heat_wc_and_handle(self):
self.stack = self.create_stack(
template=test_template_heat_waitcondition)
wc.HeatWaitConditionHandle.get_status().AndReturn(['SUCCESS'])
self.m.ReplayAll()
self.stack.create()
rsrc = self.stack['wait_condition']
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual(unicode({}), wc_att)
handle = self.stack['wait_handle']
self.assertEqual((handle.CREATE, handle.COMPLETE), handle.state)
return (rsrc, handle)
def test_data(self):
rsrc, handle = self._create_heat_wc_and_handle()
test_metadata = {'data': 'foo', 'reason': 'bar',
'status': 'SUCCESS', 'id': '123'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual('{"123": "foo"}', wc_att)
test_metadata = {'data': 'dog', 'reason': 'cat',
'status': 'SUCCESS', 'id': '456'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual(u'{"123": "foo", "456": "dog"}', wc_att)
self.m.VerifyAll()
def test_data_noid(self):
rsrc, handle = self._create_heat_wc_and_handle()
test_metadata = {'data': 'foo', 'reason': 'bar',
'status': 'SUCCESS'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual('{"1": "foo"}', wc_att)
test_metadata = {'data': 'dog', 'reason': 'cat',
'status': 'SUCCESS'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual(u'{"1": "foo", "2": "dog"}', wc_att)
self.m.VerifyAll()
def test_data_nodata(self):
rsrc, handle = self._create_heat_wc_and_handle()
handle.handle_signal()
wc_att = rsrc.FnGetAtt('data')
self.assertEqual('{"1": null}', wc_att)
handle.handle_signal()
wc_att = rsrc.FnGetAtt('data')
self.assertEqual(u'{"1": null, "2": null}', wc_att)
self.m.VerifyAll()
def test_data_partial_complete(self):
rsrc, handle = self._create_heat_wc_and_handle()
test_metadata = {'status': 'SUCCESS'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual('{"1": null}', wc_att)
test_metadata = {'status': 'SUCCESS'}
handle.handle_signal(details=test_metadata)
wc_att = rsrc.FnGetAtt('data')
self.assertEqual(u'{"1": null, "2": null}', wc_att)
self.m.VerifyAll()
def _create_heat_handle(self):
self.stack = self.create_stack(
template=test_template_heat_waithandle, stub_status=False)
self.m.ReplayAll()
self.stack.create()
handle = self.stack['wait_handle']
self.assertEqual((handle.CREATE, handle.COMPLETE), handle.state)
return handle
def test_get_status_none_complete(self):
handle = self._create_heat_handle()
handle.handle_signal()
self.assertEqual(['SUCCESS'], handle.get_status())
md_expected = {'1': {'data': None, 'reason': 'Signal 1 received',
'status': 'SUCCESS'}}
self.assertEqual(md_expected, handle.metadata_get())
self.m.VerifyAll()
def test_get_status_partial_complete(self):
handle = self._create_heat_handle()
test_metadata = {'status': 'SUCCESS'}
handle.handle_signal(details=test_metadata)
self.assertEqual(['SUCCESS'], handle.get_status())
md_expected = {'1': {'data': None, 'reason': 'Signal 1 received',
'status': 'SUCCESS'}}
self.assertEqual(md_expected, handle.metadata_get())
self.m.VerifyAll()
def test_get_status_failure(self):
handle = self._create_heat_handle()
test_metadata = {'status': 'FAILURE'}
handle.handle_signal(details=test_metadata)
self.assertEqual(['FAILURE'], handle.get_status())
md_expected = {'1': {'data': None, 'reason': 'Signal 1 received',
'status': 'FAILURE'}}
self.assertEqual(md_expected, handle.metadata_get())
self.m.VerifyAll()
def test_getatt_token(self):
handle = self._create_heat_handle()
self.assertEqual('adomainusertoken', handle.FnGetAtt('token'))
self.m.VerifyAll()
def test_getatt_endpoint(self):
self.m.StubOutWithMock(heat_plugin.HeatClientPlugin, 'get_heat_url')
heat_plugin.HeatClientPlugin.get_heat_url().AndReturn(
'foo/%s' % self.tenant_id)
self.m.ReplayAll()
handle = self._create_heat_handle()
expected = ('foo/aprojectid/stacks/test_stack/%s/resources/'
'wait_handle/signal'
% self.stack_id)
self.assertEqual(expected, handle.FnGetAtt('endpoint'))
self.m.VerifyAll()
def test_getatt_curl_cli_success(self):
self.m.StubOutWithMock(heat_plugin.HeatClientPlugin, 'get_heat_url')
heat_plugin.HeatClientPlugin.get_heat_url().AndReturn(
'foo/%s' % self.tenant_id)
self.m.ReplayAll()
handle = self._create_heat_handle()
expected = ("curl -i -X POST -H 'X-Auth-Token: adomainusertoken' "
"-H 'Content-Type: application/json' "
"-H 'Accept: application/json' "
"foo/aprojectid/stacks/test_stack/%s/resources/wait_handle"
"/signal" % self.stack_id)
self.assertEqual(expected, handle.FnGetAtt('curl_cli_success'))
self.m.VerifyAll()
def test_getatt_curl_cli_failure(self):
self.m.StubOutWithMock(heat_plugin.HeatClientPlugin, 'get_heat_url')
heat_plugin.HeatClientPlugin.get_heat_url().AndReturn(
'foo/%s' % self.tenant_id)
self.m.ReplayAll()
handle = self._create_heat_handle()
expected = ("curl -i -X POST "
"--data-binary '{\"status\": \"FAILURE\"}' "
"-H 'X-Auth-Token: adomainusertoken' "
"-H 'Content-Type: application/json' "
"-H 'Accept: application/json' "
"foo/aprojectid/stacks/test_stack/%s/resources/wait_handle"
"/signal" % self.stack_id)
self.assertEqual(expected, handle.FnGetAtt('curl_cli_failure'))
self.m.VerifyAll()