From 9d50fabe829da6be20e847cee29e2518b4591fcc Mon Sep 17 00:00:00 2001 From: Peter Razumovsky Date: Thu, 15 Jan 2015 22:16:32 +0300 Subject: [PATCH] Add OS::Mistral::Workflow for Mistral workflows This patch add OS::Mistral::Workflow resource for Mistral workflow supporting. Change-Id: Ib32730751d082b87074cba714ba308ef865d9fd9 Implements: partial blueprint mistral-resources-for-heat --- .../heat_mistral/resources/__init__.py | 0 .../heat_mistral/resources/workflow.py | 422 ++++++++++++++++++ .../heat_mistral/tests/__init__.py | 0 .../heat_mistral/tests/test_workflow.py | 293 ++++++++++++ 4 files changed, 715 insertions(+) create mode 100644 contrib/heat_mistral/heat_mistral/resources/__init__.py create mode 100644 contrib/heat_mistral/heat_mistral/resources/workflow.py create mode 100644 contrib/heat_mistral/heat_mistral/tests/__init__.py create mode 100644 contrib/heat_mistral/heat_mistral/tests/test_workflow.py diff --git a/contrib/heat_mistral/heat_mistral/resources/__init__.py b/contrib/heat_mistral/heat_mistral/resources/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/contrib/heat_mistral/heat_mistral/resources/workflow.py b/contrib/heat_mistral/heat_mistral/resources/workflow.py new file mode 100644 index 000000000..6cd8d024d --- /dev/null +++ b/contrib/heat_mistral/heat_mistral/resources/workflow.py @@ -0,0 +1,422 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_serialization import jsonutils +import six +import yaml + +from heat.common import exception +from heat.common.i18n import _ +from heat.engine import attributes +from heat.engine import clients +from heat.engine import constraints +from heat.engine import properties +from heat.engine import resource +from heat.engine.resources import signal_responder +from heat.engine import support + + +class Workflow(signal_responder.SignalResponder, + resource.Resource): + support_status = support.SupportStatus(version='2015.1') + + PROPERTIES = ( + NAME, TYPE, DESCRIPTION, INPUT, OUTPUT, TASKS, PARAMS + ) = ( + 'name', 'type', 'description', 'input', 'output', 'tasks', 'params' + ) + + _TASKS_KEYS = ( + TASK_NAME, TASK_DESCRIPTION, ON_ERROR, ON_COMPLETE, ON_SUCCESS, + POLICIES, ACTION, WORKFLOW, PUBLISH, TASK_INPUT, REQUIRES + ) = ( + 'name', 'description', 'on_error', 'on_complete', 'on_success', + 'policies', 'action', 'workflow', 'publish', 'input', 'requires' + ) + + _SIGNAL_DATA_KEYS = ( + SIGNAL_DATA_INPUT, SIGNAL_DATA_PARAMS + ) = ( + 'input', 'params' + ) + + ATTRIBUTES = ( + WORKFLOW_DATA, ALARM_URL, EXECUTIONS + ) = ( + 'data', 'alarm_url', 'executions' + ) + + properties_schema = { + NAME: properties.Schema( + properties.Schema.STRING, + _('Workflow name.') + ), + TYPE: properties.Schema( + properties.Schema.STRING, + _('Workflow type.'), + constraints=[ + constraints.AllowedValues(['direct', 'reverse']) + ], + required=True, + update_allowed=True + ), + DESCRIPTION: properties.Schema( + properties.Schema.STRING, + _('Workflow description.'), + update_allowed=True, + default='' + ), + INPUT: properties.Schema( + properties.Schema.MAP, + _('Dictionary which contains input for workflow.'), + update_allowed=True, + default={} + ), + OUTPUT: properties.Schema( + properties.Schema.MAP, + _('Any data structure arbitrarily containing YAQL ' + 'expressions that defines workflow output. May be ' + 'nested.'), + update_allowed=True, + default={} + ), + PARAMS: properties.Schema( + properties.Schema.MAP, + _("Workflow additional parameters. If Workflow is reverse typed, " + "params requires 'task_name', which defines initial task."), + update_allowed=True, + default={} + ), + TASKS: properties.Schema( + properties.Schema.LIST, + _('Dictionary containing workflow tasks.'), + schema=properties.Schema( + properties.Schema.MAP, + schema={ + TASK_NAME: properties.Schema( + properties.Schema.STRING, + _('Task name.'), + required=True + ), + TASK_DESCRIPTION: properties.Schema( + properties.Schema.STRING, + _('Task description.') + ), + TASK_INPUT: properties.Schema( + properties.Schema.MAP, + _('Actual input parameter values of the task.') + ), + ACTION: properties.Schema( + properties.Schema.STRING, + _('Name of the action associated with the task. ' + 'Either action or workflow may be defined in the ' + 'task.') + ), + WORKFLOW: properties.Schema( + properties.Schema.STRING, + _('Name of the workflow associated with the task. ' + 'Can be defined by intrinsic function get_resource ' + 'or by name of the referenced workflow, i.e. ' + '{ workflow: wf_name } or ' + '{ workflow: { get_resource: wf_name }}. Either ' + 'action or workflow may be defined in the task.') + ), + PUBLISH: properties.Schema( + properties.Schema.MAP, + _('Dictionary of variables to publish to ' + 'the workflow context.') + ), + ON_SUCCESS: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed successfully.') + ), + ON_ERROR: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed with an error.') + ), + ON_COMPLETE: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed regardless of whether ' + 'it is successful or not.') + ), + POLICIES: properties.Schema( + properties.Schema.MAP, + _('Dictionary-like section defining task policies ' + 'that influence how Mistral Engine runs tasks. Must ' + 'satisfy Mistral DSL v2.') + ), + REQUIRES: properties.Schema( + properties.Schema.LIST, + _('List of tasks which should be executed before ' + 'this task. Used only in reverse workflows.') + ), + }, + ), + required=True, + update_allowed=True + ) + } + + attributes_schema = { + WORKFLOW_DATA: attributes.Schema( + _('A dictionary which contains name and input of the workflow.') + ), + ALARM_URL: attributes.Schema( + _("A signed url to create executions for workflows specified in " + "Workflow resource.") + ), + EXECUTIONS: attributes.Schema( + _("List of workflows' executions, each of them is a dictionary " + "with information about execution. Each dictionary returns " + "values for next keys: id, workflow_name, created_at, " + "updated_at, state for current execution state, input, output.") + ) + } + + def mistral(self): + return self.client('mistral') + + def FnGetRefId(self): + return self._workflow_name() + + def _validate_signal_data(self, data): + if data is not None: + input_value = data.get(self.SIGNAL_DATA_INPUT) + params_value = data.get(self.SIGNAL_DATA_PARAMS) + if input_value is not None: + if not isinstance(input_value, dict): + message = (_('Input in signal data must be a map, ' + 'find a %s') % type(input_value)) + raise exception.StackValidationFailed( + error=_('Signal data error'), + message=message) + for key in input_value.keys(): + if (self.properties.get(self.INPUT) is None + or key not in self.properties.get(self.INPUT)): + message = _('Unknown input %s') % key + raise exception.StackValidationFailed( + error=_('Signal data error'), + message=message) + if params_value is not None and not isinstance(params_value, dict): + message = (_('Params must be a map, find a ' + '%s') % type(params_value)) + raise exception.StackValidationFailed( + error=_('Signal data error'), + message=message) + + def validate(self): + super(Workflow, self).validate() + if self.properties.get(self.TYPE) == 'reverse': + params = self.properties.get(self.PARAMS) + if params is None or not params.get('task_name'): + raise exception.StackValidationFailed( + error=_('Mistral resource validation error'), + path=[self.name, + ('properties' + if self.stack.t.VERSION == 'heat_template_version' + else 'Properties'), + self.PARAMS], + message=_("'task_name' is not assigned in 'params' " + "in case of reverse type workflow.") + ) + for task in self.properties.get(self.TASKS): + wf_value = task.get(self.WORKFLOW) + action_value = task.get(self.ACTION) + if wf_value and action_value: + raise exception.ResourcePropertyConflict(self.WORKFLOW, + self.ACTION) + if not wf_value and not action_value: + raise exception.PropertyUnspecifiedError(self.WORKFLOW, + self.ACTION) + if (task.get(self.REQUIRES) is not None + and self.properties.get(self.TYPE)) == 'direct': + msg = _("task %(task)s contains property 'requires' " + "in case of direct workflow. Only reverse workflows " + "can contain property 'requires'.") % { + 'name': self.name, + 'task': task.get(self.TASK_NAME) + } + raise exception.StackValidationFailed( + error=_('Mistral resource validation error'), + path=[self.name, + ('properties' + if self.stack.t.VERSION == 'heat_template_version' + else 'Properties'), + self.TASKS, + task.get(self.TASK_NAME), + self.REQUIRES], + message=msg) + + def _workflow_name(self): + return self.properties.get(self.NAME) or self.physical_resource_name() + + def build_tasks(self, props): + for task in props[self.TASKS]: + current_task = {} + wf_value = task.get(self.WORKFLOW) + if wf_value is not None: + if wf_value in [res.resource_id + for res in six.itervalues(self.stack)]: + current_task.update({self.WORKFLOW: wf_value}) + else: + msg = _("No such workflow %s") % wf_value + raise ValueError(msg) + + task_keys = [key for key in self._TASKS_KEYS + if key not in [self.WORKFLOW, self.TASK_NAME]] + for task_prop in task_keys: + if task.get(task_prop) is not None: + current_task.update( + {task_prop.replace('_', '-'): task[task_prop]}) + + yield {task[self.TASK_NAME]: current_task} + + def prepare_properties(self, props): + """Prepare correct YAML-formatted definition for Mistral.""" + defn_name = self._workflow_name() + definition = {'version': '2.0', + defn_name: {self.TYPE: props[self.TYPE], + self.DESCRIPTION: props[self.DESCRIPTION], + self.OUTPUT: props[self.OUTPUT], + self.INPUT: props[self.INPUT].keys(), + self.TASKS: {}}} + + for task in self.build_tasks(props): + definition.get(defn_name).get(self.TASKS).update(task) + + return yaml.dump(definition, Dumper=yaml.CSafeDumper + if hasattr(yaml, 'CSafeDumper') + else yaml.SafeDumper) + + def handle_create(self): + super(Workflow, self).handle_create() + props = self.prepare_properties(self.properties) + try: + workflow = self.mistral().workflows.create(props) + except Exception as ex: + raise exception.ResourceFailure(ex, self) + # NOTE(prazumovsky): Mistral uses unique names for resource + # identification. + self.resource_id_set(workflow[0].name) + + def handle_signal(self, details=None): + self._validate_signal_data(details) + + result_input = {} + result_params = {} + + if details is not None: + if details.get(self.INPUT) is not None: + # NOTE(prazumovsky): Signal can contains some data, interesting + # for workflow, e.g. inputs. So, if signal data contains input + # we update override inputs, other leaved defined in template. + for key, value in six.iteritems( + self.properties.get(self.INPUT)): + result_input.update( + {key: details.get( + self.SIGNAL_DATA_INPUT).get(key) or value}) + if details.get(self.PARAMS) is not None: + for key, value in six.iteritems( + self.properties.get(self.PARAMS)): + result_params.update( + {key: details.get( + self.SIGNAL_DATA_PARAMS).get(key) or value}) + + if not result_input: + result_input.update(self.properties.get(self.INPUT)) + if not result_params: + result_params.update(self.properties.get(self.PARAMS)) + + try: + execution = self.mistral().executions.create( + self._workflow_name(), + jsonutils.dumps(result_input), + jsonutils.dumps(result_params)) + except Exception as ex: + raise exception.ResourceFailure(ex, self) + executions = [execution.id] + if self.EXECUTIONS in self.data(): + executions.extend(self.data().get(self.EXECUTIONS).split(',')) + self.data_set(self.EXECUTIONS, ','.join(executions)) + + def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None): + update_allowed = [self.INPUT, self.PARAMS, self.DESCRIPTION] + for prop in update_allowed: + if prop in prop_diff: + del prop_diff[prop] + if len(prop_diff) > 0: + new_props = self.prepare_properties(tmpl_diff['Properties']) + try: + workflow = self.mistral().workflows.update(new_props) + except Exception as ex: + raise exception.ResourceFailure(ex, self) + self.data_set(self.NAME, workflow[0].name) + self.resource_id_set(workflow[0].name) + + def handle_delete(self): + super(Workflow, self).handle_delete() + + if self.resource_id is None: + return + + try: + self.mistral().workflows.delete(self.resource_id) + if self.data().get(self.EXECUTIONS): + for id in self.data().get(self.EXECUTIONS).split(','): + self.mistral().executions.delete(id) + except Exception as e: + self.client_plugin('mistral').ignore_not_found(e) + + def _resolve_attribute(self, name): + if name == self.EXECUTIONS: + if self.EXECUTIONS not in self.data(): + return [] + + def parse_execution_response(execution): + return { + 'id': execution.id, + 'workflow_name': execution.workflow_name, + 'created_at': execution.created_at, + 'updated_at': execution.updated_at, + 'state': execution.state, + 'input': jsonutils.loads(six.text_type(execution.input)), + 'output': jsonutils.loads(six.text_type(execution.output)) + } + + return [parse_execution_response( + self.mistral().executions.get(exec_id)) + for exec_id in + self.data().get(self.EXECUTIONS).split(',')] + + elif name == self.WORKFLOW_DATA: + return {self.NAME: self.resource_id, + self.INPUT: self.properties.get(self.INPUT)} + + elif name == self.ALARM_URL: + return six.text_type(self._get_signed_url()) + + +def resource_mapping(): + return { + 'OS::Mistral::Workflow': Workflow + } + + +def available_resource_mapping(): + if not clients.has_client('mistral'): + return {} + + return resource_mapping() diff --git a/contrib/heat_mistral/heat_mistral/tests/__init__.py b/contrib/heat_mistral/heat_mistral/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/contrib/heat_mistral/heat_mistral/tests/test_workflow.py b/contrib/heat_mistral/heat_mistral/tests/test_workflow.py new file mode 100644 index 000000000..840437370 --- /dev/null +++ b/contrib/heat_mistral/heat_mistral/tests/test_workflow.py @@ -0,0 +1,293 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import six + +from heat.common import exception +from heat.common import template_format +from heat.engine import resource +from heat.engine.resources import signal_responder +from heat.engine.resources import stack_user +from heat.engine import scheduler +from heat.engine import template +from heat.tests import common +from heat.tests import utils + +from .. import client # noqa +from ..resources import workflow # noqa + +workflow_template = """ +heat_template_version: 2013-05-23 +resources: + workflow: + type: OS::Mistral::Workflow + properties: + type: direct + tasks: + - name: hello + action: std.echo output='Good morning!' + publish: + result: <% $.hello %> +""" + +workflow_template_full = """ +heat_template_version: 2013-05-23 +resources: + create_vm: + type: OS::Mistral::Workflow + properties: + name: create_vm + type: direct + input: + name: create_test_server + image: 31d8eeaf-686e-4e95-bb27-765014b9f20b + flavor: 2 + output: + vm_id: <% $.vm_id %> + tasks: + - name: create_server + action: | + nova.servers_create name=<% $.name %> image=<% $.image %> + flavor=<% $.flavor %> + publish: + vm_id: <% $.create_server.id %> + on_success: + - check_server_exists + - name: check_server_exists + action: nova.servers_get server=<% $.vm_id %> + publish: + server_exists: True + on_success: + - wait_instance + - name: wait_instance + action: nova.servers_find id=<% $.vm_id %> status='ACTIVE' + policies: + retry: + delay: 5 + count: 15 +""" + +workflow_template_bad = """ +heat_template_version: 2013-05-23 +resources: + workflow: + type: OS::Mistral::Workflow + properties: + type: direct + tasks: + - name: second_task + action: std.noop + requires: [first_task] + - name: first_task + action: std.noop +""" + +workflow_template_bad_reverse = """ +heat_template_version: 2013-05-23 +resources: + workflow: + type: OS::Mistral::Workflow + properties: + type: reverse + tasks: + - name: second_task + action: std.noop + requires: [first_task] + - name: first_task + action: std.noop +""" + +workflow_template_update = """ +heat_template_version: 2013-05-23 +resources: + workflow: + type: OS::Mistral::Workflow + properties: + name: hello_action + type: direct + tasks: + - name: hello + action: std.echo output='Good evening!' + publish: + result: <% $.hello %> +""" + + +class FakeWorkflow(object): + + def __init__(self, name): + self.name = name + + +class TestWorkflow(common.HeatTestCase): + + def setUp(self): + super(TestWorkflow, self).setUp() + utils.setup_dummy_db() + self.ctx = utils.dummy_context() + tmpl = template_format.parse(workflow_template) + self.stack = utils.parse_stack(tmpl, stack_name='test_stack') + + resource_defns = self.stack.t.resource_definitions(self.stack) + self.rsrc_defn = resource_defns['workflow'] + + self.patcher_client = mock.patch.object(workflow.Workflow, 'mistral') + mock.patch.object(stack_user.StackUser, '_create_user').start() + mock.patch.object(signal_responder.SignalResponder, + '_create_keypair').start() + mock.patch.object(client, 'mistral_base').start() + mock.patch.object(client.MistralClientPlugin, '_create').start() + self.client = client.MistralClientPlugin(self.ctx) + mock_client = self.patcher_client.start() + self.mistral = mock_client.return_value + + def tearDown(self): + super(TestWorkflow, self).tearDown() + self.patcher_client.stop() + + def _create_resource(self, name, snippet, stack): + wf = workflow.Workflow(name, snippet, stack) + self.mistral.workflows.create.return_value = [ + FakeWorkflow('test_stack-workflow-b5fiekfci3yc')] + scheduler.TaskRunner(wf.create)() + return wf + + def test_create(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + expected_state = (wf.CREATE, wf.COMPLETE) + self.assertEqual(expected_state, wf.state) + self.assertEqual('test_stack-workflow-b5fiekfci3yc', wf.resource_id) + + def test_create_with_name(self): + tmpl = template_format.parse(workflow_template_full) + stack = utils.parse_stack(tmpl) + + rsrc_defns = stack.t.resource_definitions(stack)['create_vm'] + + wf = workflow.Workflow('create_vm', rsrc_defns, stack) + self.mistral.workflows.create.return_value = [ + FakeWorkflow('create_vm')] + scheduler.TaskRunner(wf.create)() + + expected_state = (wf.CREATE, wf.COMPLETE) + self.assertEqual(expected_state, wf.state) + self.assertEqual('create_vm', wf.resource_id) + + def test_attributes(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + self.assertEqual({'name': 'test_stack-workflow-b5fiekfci3yc', + 'input': {}}, wf.FnGetAtt('data')) + self.assertEqual([], wf.FnGetAtt('executions')) + + def test_direct_workflow_validation_error(self): + error_msg = ("Mistral resource validation error : " + "workflow.properties.tasks.second_task.requires: " + "task second_task contains property 'requires' " + "in case of direct workflow. Only reverse workflows " + "can contain property 'requires'.") + self._test_validation_failed(workflow_template_bad, error_msg) + + def test_wrong_params_using(self): + error_msg = ("Mistral resource validation error : " + "workflow.properties.params: 'task_name' is not assigned " + "in 'params' in case of reverse type workflow.") + self._test_validation_failed(workflow_template_bad_reverse, error_msg) + + def _test_validation_failed(self, templatem, error_msg): + tmpl = template_format.parse(templatem) + stack = utils.parse_stack(tmpl) + + rsrc_defns = stack.t.resource_definitions(stack)['workflow'] + + wf = workflow.Workflow('workflow', rsrc_defns, stack) + + exc = self.assertRaises(exception.StackValidationFailed, + wf.validate) + self.assertEqual(error_msg, six.text_type(exc)) + + def test_create_wrong_definition(self): + tmpl = template_format.parse(workflow_template) + stack = utils.parse_stack(tmpl) + + rsrc_defns = stack.t.resource_definitions(stack)['workflow'] + + wf = workflow.Workflow('workflow', rsrc_defns, stack) + + self.mistral.workflows.create.side_effect = Exception('boom!') + + exc = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(wf.create)) + expected_state = (wf.CREATE, wf.FAILED) + self.assertEqual(expected_state, wf.state) + self.assertIn('Exception: boom!', six.text_type(exc)) + + def test_update(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + + t = template_format.parse(workflow_template_update) + rsrc_defns = template.Template(t).resource_definitions(self.stack) + new_workflow = rsrc_defns['workflow'] + + new_workflows = [FakeWorkflow('hello_action')] + self.mistral.workflows.update.return_value = new_workflows + self.mistral.workflows.delete.return_value = None + + err = self.assertRaises(resource.UpdateReplace, + scheduler.TaskRunner(wf.update, + new_workflow)) + msg = 'The Resource workflow requires replacement.' + self.assertEqual(msg, six.text_type(err)) + + def test_delete(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + + scheduler.TaskRunner(wf.delete)() + self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state) + + def test_delete_no_data(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + + wf.data_delete('executions') + self.assertEqual([], wf.FnGetAtt('executions')) + scheduler.TaskRunner(wf.delete)() + self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state) + + def test_delete_not_found(self): + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + + self.mistral.workflows.delete.side_effect = ( + self.mistral.mistral_base.APIException(error_code=404)) + + scheduler.TaskRunner(wf.delete)() + self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state) + + @mock.patch.object(resource.Resource, 'client_plugin') + def test_delete_other_errors(self, mock_plugin): + """We mock client_plugin for returning correct mistral client.""" + mock_plugin.return_value = self.client + client.mistral_base.APIException = exception.Error + wf = self._create_resource('workflow', self.rsrc_defn, self.stack) + + self.mistral.workflows.delete.side_effect = (Exception('boom!')) + + exc = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(wf.delete)) + self.assertEqual((wf.DELETE, wf.FAILED), wf.state) + self.assertIn('boom!', six.text_type(exc)) + + def test_resource_mapping(self): + mapping = workflow.resource_mapping() + self.assertEqual(1, len(mapping)) + self.assertEqual(workflow.Workflow, + mapping['OS::Mistral::Workflow'])