From 58da57330a074256e64c8c4a02ff17febc4a4af4 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Mon, 29 Dec 2014 14:05:52 +0300 Subject: [PATCH] Using 'with-items' instead of 'for-each' Partially implements blueprint mistral-with-items Change-Id: I679e1d525ef2764199d7bdeb9c91509a800547ff --- mistral/engine1/commands.py | 11 +++++------ mistral/tests/unit/engine1/test_for_each.py | 16 ++++++++-------- .../tests/unit/workbook/v2/test_dsl_specs_v2.py | 10 +++++----- .../{test_for_each.py => test_with_items.py} | 14 +++++++------- mistral/workbook/v2/tasks.py | 16 ++++++++-------- mistral/workflow/base.py | 14 +++++++------- mistral/workflow/data_flow.py | 10 +++++----- mistral/workflow/{for_each.py => with_items.py} | 14 +++++++------- 8 files changed, 52 insertions(+), 53 deletions(-) rename mistral/tests/unit/workflow/{test_for_each.py => test_with_items.py} (85%) rename mistral/workflow/{for_each.py => with_items.py} (88%) diff --git a/mistral/engine1/commands.py b/mistral/engine1/commands.py index 2997658c7..f147ff746 100644 --- a/mistral/engine1/commands.py +++ b/mistral/engine1/commands.py @@ -28,8 +28,8 @@ from mistral.services import action_manager as a_m from mistral import utils from mistral.workbook import parser as spec_parser from mistral.workflow import data_flow -from mistral.workflow import for_each from mistral.workflow import states +from mistral.workflow import with_items LOG = logging.getLogger(__name__) @@ -219,12 +219,11 @@ class RunTask(EngineCommand): action_db.action_class, action_db.attributes or {}): action_input.update(a_m.get_action_context(self.task_db)) - for_each_spec = self.task_spec.get_for_each() + with_items_spec = self.task_spec.get_with_items() + + if with_items_spec: + action_input_collection = with_items.calc_input(action_input) - if for_each_spec: - action_input_collection = for_each.calc_for_each_input( - action_input - ) for a_input in action_input_collection: rpc.get_executor_client().run_action( self.task_db.id, diff --git a/mistral/tests/unit/engine1/test_for_each.py b/mistral/tests/unit/engine1/test_for_each.py index 1334618b8..67dfcb422 100644 --- a/mistral/tests/unit/engine1/test_for_each.py +++ b/mistral/tests/unit/engine1/test_for_each.py @@ -36,7 +36,7 @@ version: "2.0" name: wb1 workflows: - for_each: + with_items: type: direct input: @@ -44,7 +44,7 @@ workflows: tasks: task1: - for-each: + with-items: name_info: $.names_info action: std.echo output={$.name_info.name} publish: @@ -59,7 +59,7 @@ version: "2.0" name: wb1 workflows: - for_each: + with_items: type: direct input: @@ -68,7 +68,7 @@ workflows: tasks: task1: - for-each: + with-items: name_info: $.names_info action: std.echo output="{$.greeting}, {$.name_info.name}!" publish: @@ -86,11 +86,11 @@ WORKFLOW_INPUT = { class ForEachEngineTest(base.EngineTestCase): - def test_for_each_simple(self): + def test_with_items_simple(self): wb_service.create_workbook_v2(WORKBOOK) # Start workflow. - exec_db = self.engine.start_workflow('wb1.for_each', WORKFLOW_INPUT) + exec_db = self.engine.start_workflow('wb1.with_items', WORKFLOW_INPUT) self._await( lambda: self.is_execution_success(exec_db.id), @@ -114,13 +114,13 @@ class ForEachEngineTest(base.EngineTestCase): self.assertEqual(1, len(tasks)) self.assertEqual(states.SUCCESS, task1.state) - def test_for_each_static_var(self): + def test_with_items_static_var(self): wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR) wf_input = copy.copy(WORKFLOW_INPUT) wf_input.update({'greeting': 'Hello'}) # Start workflow. - exec_db = self.engine.start_workflow('wb1.for_each', wf_input) + exec_db = self.engine.start_workflow('wb1.with_items', wf_input) self._await( lambda: self.is_execution_success(exec_db.id), diff --git a/mistral/tests/unit/workbook/v2/test_dsl_specs_v2.py b/mistral/tests/unit/workbook/v2/test_dsl_specs_v2.py index 183b1fff2..e1b72cb04 100644 --- a/mistral/tests/unit/workbook/v2/test_dsl_specs_v2.py +++ b/mistral/tests/unit/workbook/v2/test_dsl_specs_v2.py @@ -103,7 +103,7 @@ workflows: action: std.echo output="Task 6 echo" task7: - for-each: + with-items: vm_info: $.vms workflow: wf2 is_true=true object_list=[1, null, "str"] on-complete: @@ -149,7 +149,7 @@ workflows: tasks: task1: action: std.echo output="Hey!" - for-each: + with-items: vms: 3 """ @@ -312,7 +312,7 @@ class DSLv2ModelTest(base.BaseTest): self.assertEqual( {'vm_info': '$.vms'}, - task7_spec.get_for_each() + task7_spec.get_with_items() ) task8_spec = wf2_spec.get_tasks().get('task8') @@ -361,13 +361,13 @@ class DSLv2ModelTest(base.BaseTest): self.assertEqual({'output': 'Echo output'}, action_spec.get_base_input()) - def test_invalid_for_each(self): + def test_invalid_with_items_spec(self): exc = self.assertRaises( exceptions.InvalidModelException, spec_parser.get_workbook_spec_from_yaml, INVALID_WB ) - self.assertIn("for-each", str(exc)) + self.assertIn("with-items", str(exc)) def test_to_dict(self): wb_spec = spec_parser.get_workbook_spec_from_yaml(VALID_WB) diff --git a/mistral/tests/unit/workflow/test_for_each.py b/mistral/tests/unit/workflow/test_with_items.py similarity index 85% rename from mistral/tests/unit/workflow/test_for_each.py rename to mistral/tests/unit/workflow/test_with_items.py index c31b0c665..bb4d31e1b 100644 --- a/mistral/tests/unit/workflow/test_for_each.py +++ b/mistral/tests/unit/workflow/test_with_items.py @@ -17,16 +17,16 @@ from mistral.db.v2.sqlalchemy import models from mistral.tests import base from mistral.workbook.v2 import tasks -from mistral.workflow import for_each from mistral.workflow import utils +from mistral.workflow import with_items TASK_DICT = { "name": "task1", "version": "2.0", "action": "std.echo", - "for-each": { - "array_for_each": "$.array" + "with-items": { + "item": "$.array" }, "input": { "array": "$.my_array" @@ -39,7 +39,7 @@ TASK_DB = models.Task( ) -class ForEachCalculationsTest(base.BaseTest): +class WithItemsCalculationsTest(base.BaseTest): def test_calculate_output_with_key(self): task_dict = TASK_DICT.copy() task_dict['publish'] = {"result": "$"} @@ -47,7 +47,7 @@ class ForEachCalculationsTest(base.BaseTest): task_spec = tasks.TaskSpec(task_dict) raw_result = utils.TaskResult(data="output!") - output = for_each.get_for_each_output(TASK_DB, task_spec, raw_result) + output = with_items.get_output(TASK_DB, task_spec, raw_result) self.assertDictEqual( { @@ -62,7 +62,7 @@ class ForEachCalculationsTest(base.BaseTest): def test_calculate_output_without_key(self): raw_result = utils.TaskResult(data="output!") - output = for_each.get_for_each_output(TASK_DB, TASK_SPEC, raw_result) + output = with_items.get_output(TASK_DB, TASK_SPEC, raw_result) self.assertDictEqual( { @@ -81,7 +81,7 @@ class ForEachCalculationsTest(base.BaseTest): {'name': 'Mistral'} ] } - action_input_collection = for_each.calc_for_each_input(a_input) + action_input_collection = with_items.calc_input(a_input) self.assertListEqual( [ diff --git a/mistral/workbook/v2/tasks.py b/mistral/workbook/v2/tasks.py index 7b96fb5c8..b68fdc8b9 100644 --- a/mistral/workbook/v2/tasks.py +++ b/mistral/workbook/v2/tasks.py @@ -32,7 +32,7 @@ class TaskSpec(base.BaseSpec): "action": {"type": ["string", "null"]}, "workflow": {"type": ["string", "null"]}, "input": {"type": ["object", "null"]}, - "for-each": {"type": ["object", "null"]}, + "with-items": {"type": ["object", "null"]}, "publish": {"type": ["object", "null"]}, "policies": {"type": ["object", "null"]}, "target": {"type": ["string", "null"]}, @@ -56,7 +56,7 @@ class TaskSpec(base.BaseSpec): self._action = data.get('action') self._workflow = data.get('workflow') self._input = data.get('input', {}) - self._for_each = data.get('for-each', {}) + self._with_items = data.get('with-items', {}) self._publish = data.get('publish', {}) self._policies = self._spec_property( 'policies', @@ -89,12 +89,12 @@ class TaskSpec(base.BaseSpec): " specified both: %s" % self._data) raise exc.InvalidModelException(msg) - for_each = self._data.get('for-each') + with_items = self._data.get('with-items') - if for_each: - for _, v in for_each.iteritems(): + if with_items: + for _, v in with_items.iteritems(): if not isinstance(v, (list, six.string_types)): - msg = ("Items of task property 'for-each' can only be " + msg = ("Items of task property 'with-items' can only be " "a list or an expression string: %s" % self._data) raise exc.InvalidModelException(msg) @@ -129,8 +129,8 @@ class TaskSpec(base.BaseSpec): def get_input(self): return self._input - def get_for_each(self): - return self._for_each + def get_with_items(self): + return self._with_items def get_policies(self): return self._policies diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index f443d6c75..acf846c61 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging from mistral import utils from mistral.workbook import parser as spec_parser from mistral.workflow import data_flow -from mistral.workflow import for_each from mistral.workflow import states from mistral.workflow import utils as wf_utils +from mistral.workflow import with_items LOG = logging.getLogger(__name__) WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) @@ -130,10 +130,10 @@ class WorkflowHandler(object): @staticmethod def _determine_task_output(task_spec, task_db, raw_result): - for_each_spec = task_spec.get_for_each() + with_items_spec = task_spec.get_with_items() - if for_each_spec: - return for_each.get_for_each_output( + if with_items_spec: + return with_items.get_output( task_db, task_spec, raw_result ) else: @@ -143,11 +143,11 @@ class WorkflowHandler(object): def _determine_task_state(task_db, task_spec, raw_result): state = states.ERROR if raw_result.is_error() else states.SUCCESS - for_each_spec = task_spec.get_for_each() + with_items_spec = task_spec.get_with_items() - if for_each_spec: + if with_items_spec: # Check if all iterations are completed. - if for_each.is_iteration_incomplete(task_db, task_spec): + if with_items.is_iteration_incomplete(task_db, task_spec): state = states.RUNNING return state diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 0c0180c82..3acb14983 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -65,12 +65,12 @@ def prepare_db_task(task_db, task_spec, upstream_task_specs, exec_db, def evaluate_task_input(task_spec, context): - for_each = task_spec.get_for_each() + with_items = task_spec.get_with_items() - # Do not evaluate input in case of for-each task. - # Instead of it, input is considered as data defined in for-each. - if for_each: - return expr.evaluate_recursively(for_each, context or {}) + # Do not evaluate input in case of with-items task. + # Instead of it, input is considered as data defined in with-items. + if with_items: + return expr.evaluate_recursively(with_items, context or {}) else: return expr.evaluate_recursively(task_spec.get_input(), context) diff --git a/mistral/workflow/for_each.py b/mistral/workflow/with_items.py similarity index 88% rename from mistral/workflow/for_each.py rename to mistral/workflow/with_items.py index 2769f1896..b6d5380bb 100644 --- a/mistral/workflow/for_each.py +++ b/mistral/workflow/with_items.py @@ -17,8 +17,8 @@ import copy from mistral import expressions as expr -def get_for_each_output(task_db, task_spec, raw_result): - """Returns output from task markered as for-each +def get_output(task_db, task_spec, raw_result): + """Returns output from task markered as with-items Examples of output: 1. Without publish clause: @@ -52,7 +52,7 @@ def get_for_each_output(task_db, task_spec, raw_result): t_name = task_db.name e_data = raw_result.error - # Calc output for for-each (only list form is used). + # Calc output for with-items (only list form is used). output = expr.evaluate_recursively( task_spec.get_publish(), raw_result.data or {} @@ -88,8 +88,8 @@ def get_for_each_output(task_db, task_spec, raw_result): return task_output -def calc_for_each_input(action_input): - # In case of for-each iterate over action_input and send +def calc_input(action_input): + # In case of with-items iterate over action_input and send # each part of data to executor. # Calculate action input collection for separating input. action_input_collection = [] @@ -112,8 +112,8 @@ def _get_output_key(task_spec): def is_iteration_incomplete(task_db, task_spec): - for_each_spec = task_spec.get_for_each() - main_key = for_each_spec.keys()[0] + with_items_spec = task_spec.get_with_items() + main_key = with_items_spec.keys()[0] iterations_count = len(task_db.input[main_key]) output_key = _get_output_key(task_spec)