Using 'with-items' instead of 'for-each'

Partially implements blueprint mistral-with-items

Change-Id: I679e1d525ef2764199d7bdeb9c91509a800547ff
This commit is contained in:
Nikolay Mahotkin 2014-12-29 14:05:52 +03:00
parent 7e8e27415c
commit 58da57330a
8 changed files with 52 additions and 53 deletions

View File

@ -28,8 +28,8 @@ from mistral.services import action_manager as a_m
from mistral import utils from mistral import utils
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import for_each
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -219,12 +219,11 @@ class RunTask(EngineCommand):
action_db.action_class, action_db.attributes or {}): action_db.action_class, action_db.attributes or {}):
action_input.update(a_m.get_action_context(self.task_db)) action_input.update(a_m.get_action_context(self.task_db))
for_each_spec = self.task_spec.get_for_each() with_items_spec = self.task_spec.get_with_items()
if with_items_spec:
action_input_collection = with_items.calc_input(action_input)
if for_each_spec:
action_input_collection = for_each.calc_for_each_input(
action_input
)
for a_input in action_input_collection: for a_input in action_input_collection:
rpc.get_executor_client().run_action( rpc.get_executor_client().run_action(
self.task_db.id, self.task_db.id,

View File

@ -36,7 +36,7 @@ version: "2.0"
name: wb1 name: wb1
workflows: workflows:
for_each: with_items:
type: direct type: direct
input: input:
@ -44,7 +44,7 @@ workflows:
tasks: tasks:
task1: task1:
for-each: with-items:
name_info: $.names_info name_info: $.names_info
action: std.echo output={$.name_info.name} action: std.echo output={$.name_info.name}
publish: publish:
@ -59,7 +59,7 @@ version: "2.0"
name: wb1 name: wb1
workflows: workflows:
for_each: with_items:
type: direct type: direct
input: input:
@ -68,7 +68,7 @@ workflows:
tasks: tasks:
task1: task1:
for-each: with-items:
name_info: $.names_info name_info: $.names_info
action: std.echo output="{$.greeting}, {$.name_info.name}!" action: std.echo output="{$.greeting}, {$.name_info.name}!"
publish: publish:
@ -86,11 +86,11 @@ WORKFLOW_INPUT = {
class ForEachEngineTest(base.EngineTestCase): class ForEachEngineTest(base.EngineTestCase):
def test_for_each_simple(self): def test_with_items_simple(self):
wb_service.create_workbook_v2(WORKBOOK) wb_service.create_workbook_v2(WORKBOOK)
# Start workflow. # Start workflow.
exec_db = self.engine.start_workflow('wb1.for_each', WORKFLOW_INPUT) exec_db = self.engine.start_workflow('wb1.with_items', WORKFLOW_INPUT)
self._await( self._await(
lambda: self.is_execution_success(exec_db.id), lambda: self.is_execution_success(exec_db.id),
@ -114,13 +114,13 @@ class ForEachEngineTest(base.EngineTestCase):
self.assertEqual(1, len(tasks)) self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task1.state)
def test_for_each_static_var(self): def test_with_items_static_var(self):
wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR) wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR)
wf_input = copy.copy(WORKFLOW_INPUT) wf_input = copy.copy(WORKFLOW_INPUT)
wf_input.update({'greeting': 'Hello'}) wf_input.update({'greeting': 'Hello'})
# Start workflow. # Start workflow.
exec_db = self.engine.start_workflow('wb1.for_each', wf_input) exec_db = self.engine.start_workflow('wb1.with_items', wf_input)
self._await( self._await(
lambda: self.is_execution_success(exec_db.id), lambda: self.is_execution_success(exec_db.id),

View File

@ -103,7 +103,7 @@ workflows:
action: std.echo output="Task 6 echo" action: std.echo output="Task 6 echo"
task7: task7:
for-each: with-items:
vm_info: $.vms vm_info: $.vms
workflow: wf2 is_true=true object_list=[1, null, "str"] workflow: wf2 is_true=true object_list=[1, null, "str"]
on-complete: on-complete:
@ -149,7 +149,7 @@ workflows:
tasks: tasks:
task1: task1:
action: std.echo output="Hey!" action: std.echo output="Hey!"
for-each: with-items:
vms: 3 vms: 3
""" """
@ -312,7 +312,7 @@ class DSLv2ModelTest(base.BaseTest):
self.assertEqual( self.assertEqual(
{'vm_info': '$.vms'}, {'vm_info': '$.vms'},
task7_spec.get_for_each() task7_spec.get_with_items()
) )
task8_spec = wf2_spec.get_tasks().get('task8') task8_spec = wf2_spec.get_tasks().get('task8')
@ -361,13 +361,13 @@ class DSLv2ModelTest(base.BaseTest):
self.assertEqual({'output': 'Echo output'}, self.assertEqual({'output': 'Echo output'},
action_spec.get_base_input()) action_spec.get_base_input())
def test_invalid_for_each(self): def test_invalid_with_items_spec(self):
exc = self.assertRaises( exc = self.assertRaises(
exceptions.InvalidModelException, exceptions.InvalidModelException,
spec_parser.get_workbook_spec_from_yaml, spec_parser.get_workbook_spec_from_yaml,
INVALID_WB INVALID_WB
) )
self.assertIn("for-each", str(exc)) self.assertIn("with-items", str(exc))
def test_to_dict(self): def test_to_dict(self):
wb_spec = spec_parser.get_workbook_spec_from_yaml(VALID_WB) wb_spec = spec_parser.get_workbook_spec_from_yaml(VALID_WB)

View File

@ -17,16 +17,16 @@
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
from mistral.tests import base from mistral.tests import base
from mistral.workbook.v2 import tasks from mistral.workbook.v2 import tasks
from mistral.workflow import for_each
from mistral.workflow import utils from mistral.workflow import utils
from mistral.workflow import with_items
TASK_DICT = { TASK_DICT = {
"name": "task1", "name": "task1",
"version": "2.0", "version": "2.0",
"action": "std.echo", "action": "std.echo",
"for-each": { "with-items": {
"array_for_each": "$.array" "item": "$.array"
}, },
"input": { "input": {
"array": "$.my_array" "array": "$.my_array"
@ -39,7 +39,7 @@ TASK_DB = models.Task(
) )
class ForEachCalculationsTest(base.BaseTest): class WithItemsCalculationsTest(base.BaseTest):
def test_calculate_output_with_key(self): def test_calculate_output_with_key(self):
task_dict = TASK_DICT.copy() task_dict = TASK_DICT.copy()
task_dict['publish'] = {"result": "$"} task_dict['publish'] = {"result": "$"}
@ -47,7 +47,7 @@ class ForEachCalculationsTest(base.BaseTest):
task_spec = tasks.TaskSpec(task_dict) task_spec = tasks.TaskSpec(task_dict)
raw_result = utils.TaskResult(data="output!") raw_result = utils.TaskResult(data="output!")
output = for_each.get_for_each_output(TASK_DB, task_spec, raw_result) output = with_items.get_output(TASK_DB, task_spec, raw_result)
self.assertDictEqual( self.assertDictEqual(
{ {
@ -62,7 +62,7 @@ class ForEachCalculationsTest(base.BaseTest):
def test_calculate_output_without_key(self): def test_calculate_output_without_key(self):
raw_result = utils.TaskResult(data="output!") raw_result = utils.TaskResult(data="output!")
output = for_each.get_for_each_output(TASK_DB, TASK_SPEC, raw_result) output = with_items.get_output(TASK_DB, TASK_SPEC, raw_result)
self.assertDictEqual( self.assertDictEqual(
{ {
@ -81,7 +81,7 @@ class ForEachCalculationsTest(base.BaseTest):
{'name': 'Mistral'} {'name': 'Mistral'}
] ]
} }
action_input_collection = for_each.calc_for_each_input(a_input) action_input_collection = with_items.calc_input(a_input)
self.assertListEqual( self.assertListEqual(
[ [

View File

@ -32,7 +32,7 @@ class TaskSpec(base.BaseSpec):
"action": {"type": ["string", "null"]}, "action": {"type": ["string", "null"]},
"workflow": {"type": ["string", "null"]}, "workflow": {"type": ["string", "null"]},
"input": {"type": ["object", "null"]}, "input": {"type": ["object", "null"]},
"for-each": {"type": ["object", "null"]}, "with-items": {"type": ["object", "null"]},
"publish": {"type": ["object", "null"]}, "publish": {"type": ["object", "null"]},
"policies": {"type": ["object", "null"]}, "policies": {"type": ["object", "null"]},
"target": {"type": ["string", "null"]}, "target": {"type": ["string", "null"]},
@ -56,7 +56,7 @@ class TaskSpec(base.BaseSpec):
self._action = data.get('action') self._action = data.get('action')
self._workflow = data.get('workflow') self._workflow = data.get('workflow')
self._input = data.get('input', {}) self._input = data.get('input', {})
self._for_each = data.get('for-each', {}) self._with_items = data.get('with-items', {})
self._publish = data.get('publish', {}) self._publish = data.get('publish', {})
self._policies = self._spec_property( self._policies = self._spec_property(
'policies', 'policies',
@ -89,12 +89,12 @@ class TaskSpec(base.BaseSpec):
" specified both: %s" % self._data) " specified both: %s" % self._data)
raise exc.InvalidModelException(msg) raise exc.InvalidModelException(msg)
for_each = self._data.get('for-each') with_items = self._data.get('with-items')
if for_each: if with_items:
for _, v in for_each.iteritems(): for _, v in with_items.iteritems():
if not isinstance(v, (list, six.string_types)): if not isinstance(v, (list, six.string_types)):
msg = ("Items of task property 'for-each' can only be " msg = ("Items of task property 'with-items' can only be "
"a list or an expression string: %s" % self._data) "a list or an expression string: %s" % self._data)
raise exc.InvalidModelException(msg) raise exc.InvalidModelException(msg)
@ -129,8 +129,8 @@ class TaskSpec(base.BaseSpec):
def get_input(self): def get_input(self):
return self._input return self._input
def get_for_each(self): def get_with_items(self):
return self._for_each return self._with_items
def get_policies(self): def get_policies(self):
return self._policies return self._policies

View File

@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging
from mistral import utils from mistral import utils
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import for_each
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
@ -130,10 +130,10 @@ class WorkflowHandler(object):
@staticmethod @staticmethod
def _determine_task_output(task_spec, task_db, raw_result): def _determine_task_output(task_spec, task_db, raw_result):
for_each_spec = task_spec.get_for_each() with_items_spec = task_spec.get_with_items()
if for_each_spec: if with_items_spec:
return for_each.get_for_each_output( return with_items.get_output(
task_db, task_spec, raw_result task_db, task_spec, raw_result
) )
else: else:
@ -143,11 +143,11 @@ class WorkflowHandler(object):
def _determine_task_state(task_db, task_spec, raw_result): def _determine_task_state(task_db, task_spec, raw_result):
state = states.ERROR if raw_result.is_error() else states.SUCCESS state = states.ERROR if raw_result.is_error() else states.SUCCESS
for_each_spec = task_spec.get_for_each() with_items_spec = task_spec.get_with_items()
if for_each_spec: if with_items_spec:
# Check if all iterations are completed. # Check if all iterations are completed.
if for_each.is_iteration_incomplete(task_db, task_spec): if with_items.is_iteration_incomplete(task_db, task_spec):
state = states.RUNNING state = states.RUNNING
return state return state

View File

@ -65,12 +65,12 @@ def prepare_db_task(task_db, task_spec, upstream_task_specs, exec_db,
def evaluate_task_input(task_spec, context): def evaluate_task_input(task_spec, context):
for_each = task_spec.get_for_each() with_items = task_spec.get_with_items()
# Do not evaluate input in case of for-each task. # Do not evaluate input in case of with-items task.
# Instead of it, input is considered as data defined in for-each. # Instead of it, input is considered as data defined in with-items.
if for_each: if with_items:
return expr.evaluate_recursively(for_each, context or {}) return expr.evaluate_recursively(with_items, context or {})
else: else:
return expr.evaluate_recursively(task_spec.get_input(), context) return expr.evaluate_recursively(task_spec.get_input(), context)

View File

@ -17,8 +17,8 @@ import copy
from mistral import expressions as expr from mistral import expressions as expr
def get_for_each_output(task_db, task_spec, raw_result): def get_output(task_db, task_spec, raw_result):
"""Returns output from task markered as for-each """Returns output from task markered as with-items
Examples of output: Examples of output:
1. Without publish clause: 1. Without publish clause:
@ -52,7 +52,7 @@ def get_for_each_output(task_db, task_spec, raw_result):
t_name = task_db.name t_name = task_db.name
e_data = raw_result.error e_data = raw_result.error
# Calc output for for-each (only list form is used). # Calc output for with-items (only list form is used).
output = expr.evaluate_recursively( output = expr.evaluate_recursively(
task_spec.get_publish(), task_spec.get_publish(),
raw_result.data or {} raw_result.data or {}
@ -88,8 +88,8 @@ def get_for_each_output(task_db, task_spec, raw_result):
return task_output return task_output
def calc_for_each_input(action_input): def calc_input(action_input):
# In case of for-each iterate over action_input and send # In case of with-items iterate over action_input and send
# each part of data to executor. # each part of data to executor.
# Calculate action input collection for separating input. # Calculate action input collection for separating input.
action_input_collection = [] action_input_collection = []
@ -112,8 +112,8 @@ def _get_output_key(task_spec):
def is_iteration_incomplete(task_db, task_spec): def is_iteration_incomplete(task_db, task_spec):
for_each_spec = task_spec.get_for_each() with_items_spec = task_spec.get_with_items()
main_key = for_each_spec.keys()[0] main_key = with_items_spec.keys()[0]
iterations_count = len(task_db.input[main_key]) iterations_count = len(task_db.input[main_key])
output_key = _get_output_key(task_spec) output_key = _get_output_key(task_spec)