Add OS::Mistral::Workflow for Mistral workflows

This patch add OS::Mistral::Workflow resource for
Mistral workflow supporting.

Change-Id: Ib32730751d082b87074cba714ba308ef865d9fd9
Implements: partial blueprint mistral-resources-for-heat
This commit is contained in:
Peter Razumovsky 2015-01-15 22:16:32 +03:00
parent 17dc00ce33
commit 9d50fabe82
4 changed files with 715 additions and 0 deletions

View File

@ -0,0 +1,422 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
import six
import yaml
from heat.common import exception
from heat.common.i18n import _
from heat.engine import attributes
from heat.engine import clients
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine.resources import signal_responder
from heat.engine import support
class Workflow(signal_responder.SignalResponder,
resource.Resource):
support_status = support.SupportStatus(version='2015.1')
PROPERTIES = (
NAME, TYPE, DESCRIPTION, INPUT, OUTPUT, TASKS, PARAMS
) = (
'name', 'type', 'description', 'input', 'output', 'tasks', 'params'
)
_TASKS_KEYS = (
TASK_NAME, TASK_DESCRIPTION, ON_ERROR, ON_COMPLETE, ON_SUCCESS,
POLICIES, ACTION, WORKFLOW, PUBLISH, TASK_INPUT, REQUIRES
) = (
'name', 'description', 'on_error', 'on_complete', 'on_success',
'policies', 'action', 'workflow', 'publish', 'input', 'requires'
)
_SIGNAL_DATA_KEYS = (
SIGNAL_DATA_INPUT, SIGNAL_DATA_PARAMS
) = (
'input', 'params'
)
ATTRIBUTES = (
WORKFLOW_DATA, ALARM_URL, EXECUTIONS
) = (
'data', 'alarm_url', 'executions'
)
properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_('Workflow name.')
),
TYPE: properties.Schema(
properties.Schema.STRING,
_('Workflow type.'),
constraints=[
constraints.AllowedValues(['direct', 'reverse'])
],
required=True,
update_allowed=True
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Workflow description.'),
update_allowed=True,
default=''
),
INPUT: properties.Schema(
properties.Schema.MAP,
_('Dictionary which contains input for workflow.'),
update_allowed=True,
default={}
),
OUTPUT: properties.Schema(
properties.Schema.MAP,
_('Any data structure arbitrarily containing YAQL '
'expressions that defines workflow output. May be '
'nested.'),
update_allowed=True,
default={}
),
PARAMS: properties.Schema(
properties.Schema.MAP,
_("Workflow additional parameters. If Workflow is reverse typed, "
"params requires 'task_name', which defines initial task."),
update_allowed=True,
default={}
),
TASKS: properties.Schema(
properties.Schema.LIST,
_('Dictionary containing workflow tasks.'),
schema=properties.Schema(
properties.Schema.MAP,
schema={
TASK_NAME: properties.Schema(
properties.Schema.STRING,
_('Task name.'),
required=True
),
TASK_DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Task description.')
),
TASK_INPUT: properties.Schema(
properties.Schema.MAP,
_('Actual input parameter values of the task.')
),
ACTION: properties.Schema(
properties.Schema.STRING,
_('Name of the action associated with the task. '
'Either action or workflow may be defined in the '
'task.')
),
WORKFLOW: properties.Schema(
properties.Schema.STRING,
_('Name of the workflow associated with the task. '
'Can be defined by intrinsic function get_resource '
'or by name of the referenced workflow, i.e. '
'{ workflow: wf_name } or '
'{ workflow: { get_resource: wf_name }}. Either '
'action or workflow may be defined in the task.')
),
PUBLISH: properties.Schema(
properties.Schema.MAP,
_('Dictionary of variables to publish to '
'the workflow context.')
),
ON_SUCCESS: properties.Schema(
properties.Schema.LIST,
_('List of tasks which will run after '
'the task has completed successfully.')
),
ON_ERROR: properties.Schema(
properties.Schema.LIST,
_('List of tasks which will run after '
'the task has completed with an error.')
),
ON_COMPLETE: properties.Schema(
properties.Schema.LIST,
_('List of tasks which will run after '
'the task has completed regardless of whether '
'it is successful or not.')
),
POLICIES: properties.Schema(
properties.Schema.MAP,
_('Dictionary-like section defining task policies '
'that influence how Mistral Engine runs tasks. Must '
'satisfy Mistral DSL v2.')
),
REQUIRES: properties.Schema(
properties.Schema.LIST,
_('List of tasks which should be executed before '
'this task. Used only in reverse workflows.')
),
},
),
required=True,
update_allowed=True
)
}
attributes_schema = {
WORKFLOW_DATA: attributes.Schema(
_('A dictionary which contains name and input of the workflow.')
),
ALARM_URL: attributes.Schema(
_("A signed url to create executions for workflows specified in "
"Workflow resource.")
),
EXECUTIONS: attributes.Schema(
_("List of workflows' executions, each of them is a dictionary "
"with information about execution. Each dictionary returns "
"values for next keys: id, workflow_name, created_at, "
"updated_at, state for current execution state, input, output.")
)
}
def mistral(self):
return self.client('mistral')
def FnGetRefId(self):
return self._workflow_name()
def _validate_signal_data(self, data):
if data is not None:
input_value = data.get(self.SIGNAL_DATA_INPUT)
params_value = data.get(self.SIGNAL_DATA_PARAMS)
if input_value is not None:
if not isinstance(input_value, dict):
message = (_('Input in signal data must be a map, '
'find a %s') % type(input_value))
raise exception.StackValidationFailed(
error=_('Signal data error'),
message=message)
for key in input_value.keys():
if (self.properties.get(self.INPUT) is None
or key not in self.properties.get(self.INPUT)):
message = _('Unknown input %s') % key
raise exception.StackValidationFailed(
error=_('Signal data error'),
message=message)
if params_value is not None and not isinstance(params_value, dict):
message = (_('Params must be a map, find a '
'%s') % type(params_value))
raise exception.StackValidationFailed(
error=_('Signal data error'),
message=message)
def validate(self):
super(Workflow, self).validate()
if self.properties.get(self.TYPE) == 'reverse':
params = self.properties.get(self.PARAMS)
if params is None or not params.get('task_name'):
raise exception.StackValidationFailed(
error=_('Mistral resource validation error'),
path=[self.name,
('properties'
if self.stack.t.VERSION == 'heat_template_version'
else 'Properties'),
self.PARAMS],
message=_("'task_name' is not assigned in 'params' "
"in case of reverse type workflow.")
)
for task in self.properties.get(self.TASKS):
wf_value = task.get(self.WORKFLOW)
action_value = task.get(self.ACTION)
if wf_value and action_value:
raise exception.ResourcePropertyConflict(self.WORKFLOW,
self.ACTION)
if not wf_value and not action_value:
raise exception.PropertyUnspecifiedError(self.WORKFLOW,
self.ACTION)
if (task.get(self.REQUIRES) is not None
and self.properties.get(self.TYPE)) == 'direct':
msg = _("task %(task)s contains property 'requires' "
"in case of direct workflow. Only reverse workflows "
"can contain property 'requires'.") % {
'name': self.name,
'task': task.get(self.TASK_NAME)
}
raise exception.StackValidationFailed(
error=_('Mistral resource validation error'),
path=[self.name,
('properties'
if self.stack.t.VERSION == 'heat_template_version'
else 'Properties'),
self.TASKS,
task.get(self.TASK_NAME),
self.REQUIRES],
message=msg)
def _workflow_name(self):
return self.properties.get(self.NAME) or self.physical_resource_name()
def build_tasks(self, props):
for task in props[self.TASKS]:
current_task = {}
wf_value = task.get(self.WORKFLOW)
if wf_value is not None:
if wf_value in [res.resource_id
for res in six.itervalues(self.stack)]:
current_task.update({self.WORKFLOW: wf_value})
else:
msg = _("No such workflow %s") % wf_value
raise ValueError(msg)
task_keys = [key for key in self._TASKS_KEYS
if key not in [self.WORKFLOW, self.TASK_NAME]]
for task_prop in task_keys:
if task.get(task_prop) is not None:
current_task.update(
{task_prop.replace('_', '-'): task[task_prop]})
yield {task[self.TASK_NAME]: current_task}
def prepare_properties(self, props):
"""Prepare correct YAML-formatted definition for Mistral."""
defn_name = self._workflow_name()
definition = {'version': '2.0',
defn_name: {self.TYPE: props[self.TYPE],
self.DESCRIPTION: props[self.DESCRIPTION],
self.OUTPUT: props[self.OUTPUT],
self.INPUT: props[self.INPUT].keys(),
self.TASKS: {}}}
for task in self.build_tasks(props):
definition.get(defn_name).get(self.TASKS).update(task)
return yaml.dump(definition, Dumper=yaml.CSafeDumper
if hasattr(yaml, 'CSafeDumper')
else yaml.SafeDumper)
def handle_create(self):
super(Workflow, self).handle_create()
props = self.prepare_properties(self.properties)
try:
workflow = self.mistral().workflows.create(props)
except Exception as ex:
raise exception.ResourceFailure(ex, self)
# NOTE(prazumovsky): Mistral uses unique names for resource
# identification.
self.resource_id_set(workflow[0].name)
def handle_signal(self, details=None):
self._validate_signal_data(details)
result_input = {}
result_params = {}
if details is not None:
if details.get(self.INPUT) is not None:
# NOTE(prazumovsky): Signal can contains some data, interesting
# for workflow, e.g. inputs. So, if signal data contains input
# we update override inputs, other leaved defined in template.
for key, value in six.iteritems(
self.properties.get(self.INPUT)):
result_input.update(
{key: details.get(
self.SIGNAL_DATA_INPUT).get(key) or value})
if details.get(self.PARAMS) is not None:
for key, value in six.iteritems(
self.properties.get(self.PARAMS)):
result_params.update(
{key: details.get(
self.SIGNAL_DATA_PARAMS).get(key) or value})
if not result_input:
result_input.update(self.properties.get(self.INPUT))
if not result_params:
result_params.update(self.properties.get(self.PARAMS))
try:
execution = self.mistral().executions.create(
self._workflow_name(),
jsonutils.dumps(result_input),
jsonutils.dumps(result_params))
except Exception as ex:
raise exception.ResourceFailure(ex, self)
executions = [execution.id]
if self.EXECUTIONS in self.data():
executions.extend(self.data().get(self.EXECUTIONS).split(','))
self.data_set(self.EXECUTIONS, ','.join(executions))
def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None):
update_allowed = [self.INPUT, self.PARAMS, self.DESCRIPTION]
for prop in update_allowed:
if prop in prop_diff:
del prop_diff[prop]
if len(prop_diff) > 0:
new_props = self.prepare_properties(tmpl_diff['Properties'])
try:
workflow = self.mistral().workflows.update(new_props)
except Exception as ex:
raise exception.ResourceFailure(ex, self)
self.data_set(self.NAME, workflow[0].name)
self.resource_id_set(workflow[0].name)
def handle_delete(self):
super(Workflow, self).handle_delete()
if self.resource_id is None:
return
try:
self.mistral().workflows.delete(self.resource_id)
if self.data().get(self.EXECUTIONS):
for id in self.data().get(self.EXECUTIONS).split(','):
self.mistral().executions.delete(id)
except Exception as e:
self.client_plugin('mistral').ignore_not_found(e)
def _resolve_attribute(self, name):
if name == self.EXECUTIONS:
if self.EXECUTIONS not in self.data():
return []
def parse_execution_response(execution):
return {
'id': execution.id,
'workflow_name': execution.workflow_name,
'created_at': execution.created_at,
'updated_at': execution.updated_at,
'state': execution.state,
'input': jsonutils.loads(six.text_type(execution.input)),
'output': jsonutils.loads(six.text_type(execution.output))
}
return [parse_execution_response(
self.mistral().executions.get(exec_id))
for exec_id in
self.data().get(self.EXECUTIONS).split(',')]
elif name == self.WORKFLOW_DATA:
return {self.NAME: self.resource_id,
self.INPUT: self.properties.get(self.INPUT)}
elif name == self.ALARM_URL:
return six.text_type(self._get_signed_url())
def resource_mapping():
return {
'OS::Mistral::Workflow': Workflow
}
def available_resource_mapping():
if not clients.has_client('mistral'):
return {}
return resource_mapping()

View File

@ -0,0 +1,293 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
from heat.common import exception
from heat.common import template_format
from heat.engine import resource
from heat.engine.resources import signal_responder
from heat.engine.resources import stack_user
from heat.engine import scheduler
from heat.engine import template
from heat.tests import common
from heat.tests import utils
from .. import client # noqa
from ..resources import workflow # noqa
workflow_template = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_full = """
heat_template_version: 2013-05-23
resources:
create_vm:
type: OS::Mistral::Workflow
properties:
name: create_vm
type: direct
input:
name: create_test_server
image: 31d8eeaf-686e-4e95-bb27-765014b9f20b
flavor: 2
output:
vm_id: <% $.vm_id %>
tasks:
- name: create_server
action: |
nova.servers_create name=<% $.name %> image=<% $.image %>
flavor=<% $.flavor %>
publish:
vm_id: <% $.create_server.id %>
on_success:
- check_server_exists
- name: check_server_exists
action: nova.servers_get server=<% $.vm_id %>
publish:
server_exists: True
on_success:
- wait_instance
- name: wait_instance
action: nova.servers_find id=<% $.vm_id %> status='ACTIVE'
policies:
retry:
delay: 5
count: 15
"""
workflow_template_bad = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_bad_reverse = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: reverse
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_update = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: hello_action
type: direct
tasks:
- name: hello
action: std.echo output='Good evening!'
publish:
result: <% $.hello %>
"""
class FakeWorkflow(object):
def __init__(self, name):
self.name = name
class TestWorkflow(common.HeatTestCase):
def setUp(self):
super(TestWorkflow, self).setUp()
utils.setup_dummy_db()
self.ctx = utils.dummy_context()
tmpl = template_format.parse(workflow_template)
self.stack = utils.parse_stack(tmpl, stack_name='test_stack')
resource_defns = self.stack.t.resource_definitions(self.stack)
self.rsrc_defn = resource_defns['workflow']
self.patcher_client = mock.patch.object(workflow.Workflow, 'mistral')
mock.patch.object(stack_user.StackUser, '_create_user').start()
mock.patch.object(signal_responder.SignalResponder,
'_create_keypair').start()
mock.patch.object(client, 'mistral_base').start()
mock.patch.object(client.MistralClientPlugin, '_create').start()
self.client = client.MistralClientPlugin(self.ctx)
mock_client = self.patcher_client.start()
self.mistral = mock_client.return_value
def tearDown(self):
super(TestWorkflow, self).tearDown()
self.patcher_client.stop()
def _create_resource(self, name, snippet, stack):
wf = workflow.Workflow(name, snippet, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.create)()
return wf
def test_create(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('test_stack-workflow-b5fiekfci3yc', wf.resource_id)
def test_create_with_name(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('create_vm', wf.resource_id)
def test_attributes(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.assertEqual({'name': 'test_stack-workflow-b5fiekfci3yc',
'input': {}}, wf.FnGetAtt('data'))
self.assertEqual([], wf.FnGetAtt('executions'))
def test_direct_workflow_validation_error(self):
error_msg = ("Mistral resource validation error : "
"workflow.properties.tasks.second_task.requires: "
"task second_task contains property 'requires' "
"in case of direct workflow. Only reverse workflows "
"can contain property 'requires'.")
self._test_validation_failed(workflow_template_bad, error_msg)
def test_wrong_params_using(self):
error_msg = ("Mistral resource validation error : "
"workflow.properties.params: 'task_name' is not assigned "
"in 'params' in case of reverse type workflow.")
self._test_validation_failed(workflow_template_bad_reverse, error_msg)
def _test_validation_failed(self, templatem, error_msg):
tmpl = template_format.parse(templatem)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
exc = self.assertRaises(exception.StackValidationFailed,
wf.validate)
self.assertEqual(error_msg, six.text_type(exc))
def test_create_wrong_definition(self):
tmpl = template_format.parse(workflow_template)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.side_effect = Exception('boom!')
exc = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.create))
expected_state = (wf.CREATE, wf.FAILED)
self.assertEqual(expected_state, wf.state)
self.assertIn('Exception: boom!', six.text_type(exc))
def test_update(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
t = template_format.parse(workflow_template_update)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_workflow = rsrc_defns['workflow']
new_workflows = [FakeWorkflow('hello_action')]
self.mistral.workflows.update.return_value = new_workflows
self.mistral.workflows.delete.return_value = None
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(wf.update,
new_workflow))
msg = 'The Resource workflow requires replacement.'
self.assertEqual(msg, six.text_type(err))
def test_delete(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_delete_no_data(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
wf.data_delete('executions')
self.assertEqual([], wf.FnGetAtt('executions'))
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_delete_not_found(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.mistral.workflows.delete.side_effect = (
self.mistral.mistral_base.APIException(error_code=404))
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
@mock.patch.object(resource.Resource, 'client_plugin')
def test_delete_other_errors(self, mock_plugin):
"""We mock client_plugin for returning correct mistral client."""
mock_plugin.return_value = self.client
client.mistral_base.APIException = exception.Error
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.mistral.workflows.delete.side_effect = (Exception('boom!'))
exc = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.delete))
self.assertEqual((wf.DELETE, wf.FAILED), wf.state)
self.assertIn('boom!', six.text_type(exc))
def test_resource_mapping(self):
mapping = workflow.resource_mapping()
self.assertEqual(1, len(mapping))
self.assertEqual(workflow.Workflow,
mapping['OS::Mistral::Workflow'])