From 94fb733a47ef76eb359ffd31ce4a346c6facfbe9 Mon Sep 17 00:00:00 2001 From: Limor Stotland Date: Sun, 30 Aug 2015 08:34:10 +0000 Subject: [PATCH] fix : removing policies from mistral types 1)Removing policies from mistral types 2)Adding other fields to the workflow Change-Id: I034acaf2741cc9d0d7a7b7b8362e1eeb1e04a174 Closes-Bug:#1489519 --- .../resources/openstack/mistral/workflow.py | 165 +++++++++++++++++- heat/tests/mistral/test_mistral_workflow.py | 133 +++++++++++++- 2 files changed, 291 insertions(+), 7 deletions(-) diff --git a/heat/engine/resources/openstack/mistral/workflow.py b/heat/engine/resources/openstack/mistral/workflow.py index c152a10421..dccaccc77a 100644 --- a/heat/engine/resources/openstack/mistral/workflow.py +++ b/heat/engine/resources/openstack/mistral/workflow.py @@ -35,19 +35,29 @@ class Workflow(signal_responder.SignalResponder, entity = 'workflows' PROPERTIES = ( - NAME, TYPE, DESCRIPTION, INPUT, OUTPUT, TASKS, PARAMS + NAME, TYPE, DESCRIPTION, INPUT, OUTPUT, TASKS, PARAMS, TASK_DEFAULTS ) = ( - 'name', 'type', 'description', 'input', 'output', 'tasks', 'params' + 'name', 'type', 'description', 'input', 'output', 'tasks', 'params', + 'task_defaults' ) _TASKS_KEYS = ( TASK_NAME, TASK_DESCRIPTION, ON_ERROR, ON_COMPLETE, ON_SUCCESS, - POLICIES, ACTION, WORKFLOW, PUBLISH, TASK_INPUT, REQUIRES + POLICIES, ACTION, WORKFLOW, PUBLISH, TASK_INPUT, REQUIRES, + RETRY, WAIT_BEFORE, WAIT_AFTER, PAUSE_BEFORE, TIMEOUT, + WITH_ITEMS, KEEP_RESULT, TARGET ) = ( 'name', 'description', 'on_error', 'on_complete', 'on_success', - 'policies', 'action', 'workflow', 'publish', 'input', 'requires' + 'policies', 'action', 'workflow', 'publish', 'input', 'requires', + 'retry', 'wait_before', 'wait_after', 'pause_before', 'timeout', + 'with_items', 'keep_result', 'target' ) + _TASKS_TASK_DEFAULTS = [ + ON_ERROR, ON_COMPLETE, ON_SUCCESS, + REQUIRES, RETRY, WAIT_BEFORE, WAIT_AFTER, PAUSE_BEFORE, TIMEOUT + ] + _SIGNAL_DATA_KEYS = ( SIGNAL_DATA_INPUT, SIGNAL_DATA_PARAMS ) = ( @@ -97,6 +107,65 @@ class Workflow(signal_responder.SignalResponder, "params requires 'task_name', which defines initial task."), update_allowed=True ), + TASK_DEFAULTS: properties.Schema( + properties.Schema.MAP, + _("Default settings for some of task " + "attributes defined " + "at workflow level."), + support_status=support.SupportStatus(version='5.0.0'), + schema={ + ON_SUCCESS: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed successfully.') + ), + ON_ERROR: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed with an error.') + ), + ON_COMPLETE: properties.Schema( + properties.Schema.LIST, + _('List of tasks which will run after ' + 'the task has completed regardless of whether ' + 'it is successful or not.') + ), + REQUIRES: properties.Schema( + properties.Schema.LIST, + _('List of tasks which should be executed before ' + 'this task. Used only in reverse workflows.') + ), + RETRY: properties.Schema( + properties.Schema.MAP, + _('Defines a pattern how task should be repeated in ' + 'case of an error.') + ), + WAIT_BEFORE: properties.Schema( + properties.Schema.INTEGER, + _('Defines a delay in seconds that Mistral Engine' + ' should wait before starting a task.') + ), + WAIT_AFTER: properties.Schema( + properties.Schema.INTEGER, + _('Defines a delay in seconds that Mistral Engine' + ' should wait after a task has completed before' + ' starting next tasks defined in ' + 'on-success, on-error or on-complete.') + ), + PAUSE_BEFORE: properties.Schema( + properties.Schema.BOOLEAN, + _('Defines whether Mistral Engine should put the ' + 'workflow on hold or not before starting a task') + ), + TIMEOUT: properties.Schema( + properties.Schema.INTEGER, + _('Defines a period of time in seconds after which ' + 'a task will be failed automatically ' + 'by engine if hasn\'t completed.') + ), + }, + update_allowed=True + ), TASKS: properties.Schema( properties.Schema.LIST, _('Dictionary containing workflow tasks.'), @@ -156,13 +225,74 @@ class Workflow(signal_responder.SignalResponder, properties.Schema.MAP, _('Dictionary-like section defining task policies ' 'that influence how Mistral Engine runs tasks. Must ' - 'satisfy Mistral DSL v2.') + 'satisfy Mistral DSL v2.'), + support_status=support.SupportStatus( + status=support.DEPRECATED, + version='5.0.0', + message=_('Add needed policies directly to ' + 'the task, Policy keyword is not ' + 'needed'), + previous_status=support.SupportStatus( + version='2015.1')) ), REQUIRES: properties.Schema( properties.Schema.LIST, _('List of tasks which should be executed before ' 'this task. Used only in reverse workflows.') ), + RETRY: properties.Schema( + properties.Schema.MAP, + _('Defines a pattern how task should be repeated in ' + 'case of an error.'), + support_status=support.SupportStatus(version='5.0.0') + ), + WAIT_BEFORE: properties.Schema( + properties.Schema.INTEGER, + _('Defines a delay in seconds that Mistral Engine ' + 'should wait before starting a task.'), + support_status=support.SupportStatus(version='5.0.0') + ), + WAIT_AFTER: properties.Schema( + properties.Schema.INTEGER, + _('Defines a delay in seconds that Mistral ' + 'Engine should wait after ' + 'a task has completed before starting next tasks ' + 'defined in on-success, on-error or on-complete.'), + support_status=support.SupportStatus(version='5.0.0') + ), + PAUSE_BEFORE: properties.Schema( + properties.Schema.BOOLEAN, + _('Defines whether Mistral Engine should ' + 'put the workflow on hold ' + 'or not before starting a task.'), + support_status=support.SupportStatus(version='5.0.0') + ), + TIMEOUT: properties.Schema( + properties.Schema.INTEGER, + _('Defines a period of time in seconds after which a ' + 'task will be failed automatically by engine ' + 'if hasn\'t completed.'), + support_status=support.SupportStatus(version='5.0.0') + ), + WITH_ITEMS: properties.Schema( + properties.Schema.STRING, + _('If configured, it allows to run action or workflow ' + 'associated with a task multiple times ' + 'on a provided list of items.'), + support_status=support.SupportStatus(version='5.0.0') + ), + KEEP_RESULT: properties.Schema( + properties.Schema.BOOLEAN, + _('Allowing not to store action results ' + 'after task completion.'), + support_status=support.SupportStatus(version='5.0.0') + ), + TARGET: properties.Schema( + properties.Schema.STRING, + _('It defines an executor to which task action ' + 'should be sent to.'), + support_status=support.SupportStatus(version='5.0.0') + ), }, ), required=True, @@ -260,6 +390,16 @@ class Workflow(signal_responder.SignalResponder, self.REQUIRES], message=msg) + if task.get(self.POLICIES) is not None: + for task_item in task.get(self.POLICIES): + if task.get(task_item) is not None: + msg = _('Property %(policies)s and %(item)s cannot be ' + 'used both at one time.') % { + 'policies': self.POLICIES, + 'item': task_item + } + raise exception.StackValidationFailed(message=msg) + def _workflow_name(self): return self.properties.get(self.NAME) or self.physical_resource_name() @@ -275,8 +415,16 @@ class Workflow(signal_responder.SignalResponder, msg = _("No such workflow %s") % wf_value raise ValueError(msg) + # backward support for kilo. + if task.get(self.POLICIES) is not None: + task.update(task.get(self.POLICIES)) + task_keys = [key for key in self._TASKS_KEYS - if key not in [self.WORKFLOW, self.TASK_NAME]] + if key not in [ + self.WORKFLOW, + self.TASK_NAME, + self.POLICIES + ]] for task_prop in task_keys: if task.get(task_prop) is not None: current_task.update( @@ -302,6 +450,11 @@ class Workflow(signal_responder.SignalResponder, for task in self.build_tasks(props): definition.get(defn_name).get(self.TASKS).update(task) + if props.get(self.TASK_DEFAULTS) is not None: + definition[defn_name][self.TASK_DEFAULTS.replace('_', '-')] = { + k.replace('_', '-'): v for k, v in + six.iteritems(props.get(self.TASK_DEFAULTS)) if v} + return yaml.dump(definition, Dumper=yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper') else yaml.SafeDumper) diff --git a/heat/tests/mistral/test_mistral_workflow.py b/heat/tests/mistral/test_mistral_workflow.py index 431f008088..6af2617cf3 100644 --- a/heat/tests/mistral/test_mistral_workflow.py +++ b/heat/tests/mistral/test_mistral_workflow.py @@ -13,6 +13,7 @@ import mock import six +import yaml from mistralclient.api.v2 import executions @@ -74,6 +75,61 @@ resources: workflow_template_full = """ heat_template_version: 2013-05-23 +resources: + create_vm: + type: OS::Mistral::Workflow + properties: + name: create_vm + type: direct + input: + name: create_test_server + image: 31d8eeaf-686e-4e95-bb27-765014b9f20b + flavor: 2 + output: + vm_id: <% $.vm_id %> + task_defaults: + on_error: + - on_error + tasks: + - name: create_server + action: | + nova.servers_create name=<% $.name %> image=<% $.image %> + flavor=<% $.flavor %> + publish: + vm_id: <% $.create_server.id %> + on_success: + - check_server_exists + - name: check_server_exists + action: nova.servers_get server=<% $.vm_id %> + publish: + server_exists: True + on_success: + - list_machines + - name: wait_instance + action: nova.servers_find id=<% $.vm_id_new %> status='ACTIVE' + retry: + delay: 5 + count: 15 + wait_before: 7 + wait_after: 8 + pause_before: true + timeout: 11 + keep_result: false + target: test + with_items: vm_id_new in <% $.list_servers %> + - name: list_machines + action: nova.servers_list + publish: + -list_servers: <% $.list_machines %> + on_success: + - wait_instance + - name: on_error + action: std.echo output="output" + +""" + +workflow_template_backward_support = """ +heat_template_version: 2013-05-23 resources: create_vm: type: OS::Mistral::Workflow @@ -169,6 +225,26 @@ resources: result: <% $.hello %> """ +workflow_template_duplicate_polices = """ +heat_template_version: 2013-05-23 +resources: + workflow: + type: OS::Mistral::Workflow + properties: + name: list + type: direct + tasks: + - name: list + action: nova.servers_list + policies: + retry: + delay: 5 + count: 15 + retry: + delay: 6 + count: 16 +""" + class FakeWorkflow(object): def __init__(self, name): @@ -177,7 +253,6 @@ class FakeWorkflow(object): class TestMistralWorkflow(common.HeatTestCase): - def setUp(self): super(TestMistralWorkflow, self).setUp() resources.initialise() @@ -240,6 +315,37 @@ class TestMistralWorkflow(common.HeatTestCase): self.assertEqual(expected_state, wf.state) self.assertEqual('create_vm', wf.resource_id) + def test_create_with_task_parms(self): + tmpl = template_format.parse(workflow_template_full) + stack = utils.parse_stack(tmpl) + + rsrc_defns = stack.t.resource_definitions(stack)['create_vm'] + wf = workflow.Workflow('create_vm', rsrc_defns, stack) + self.mistral.workflows.create.side_effect = (lambda args: + self.verify_create_params( + args)) + scheduler.TaskRunner(wf.create)() + + def test_backward_support(self): + tmpl = template_format.parse(workflow_template_backward_support) + stack = utils.parse_stack(tmpl) + + rsrc_defns = stack.t.resource_definitions(stack)['create_vm'] + + wf = workflow.Workflow('create_vm', rsrc_defns, stack) + self.mistral.workflows.create.return_value = [ + FakeWorkflow('create_vm')] + scheduler.TaskRunner(wf.create)() + + expected_state = (wf.CREATE, wf.COMPLETE) + self.assertEqual(expected_state, wf.state) + self.assertEqual('create_vm', wf.resource_id) + for task in wf.properties['tasks']: + if task['name'] is 'wait_instance': + self.assertEqual(5, task['retry']['delay']) + self.assertEqual(15, task['retry']['count']) + break + def test_attributes(self): wf = self._create_resource('workflow', self.rsrc_defn, self.stack) self.mistral.workflows.get.return_value = \ @@ -496,9 +602,34 @@ class TestMistralWorkflow(common.HeatTestCase): lambda *args, **kw: self.verify_params(*args, **kw)) scheduler.TaskRunner(wf.signal, details)() + def test_duplicate_attribute_validation_error(self): + error_msg = ("Property policies and retry cannot be " + "used both at one time.") + self._test_validation_failed(workflow_template_duplicate_polices, + error_msg) + def verify_params(self, workflow_name, workflow_input=None, **params): self.assertEqual({'test': 'param_value', 'test1': 'param_value_1'}, params) execution = mock.Mock() execution.id = '12345' return execution + + def verify_create_params(self, wf_yaml): + wf = yaml.load(wf_yaml)["create_vm"] + self.assertEqual(['on_error'], wf["task-defaults"]["on-error"]) + + tasks = wf['tasks'] + task = tasks['wait_instance'] + self.assertEqual('vm_id_new in <% $.list_servers %>', + task['with-items']) + self.assertEqual(5, task['retry']['delay']) + self.assertEqual(15, task['retry']['count']) + self.assertEqual(8, task['wait-after']) + self.assertEqual(True, task['pause-before']) + self.assertEqual(11, task['timeout']) + self.assertEqual('test', task['target']) + self.assertEqual(7, task['wait-before']) + self.assertEqual(False, task['keep-result']) + + return [FakeWorkflow('create_vm')]