Fixing 'with-items' functionality

After refactoring 'with-items' was broken

Implements blueprint mistral-with-items

Change-Id: I8d2ef1b9cd6ff8c06ee9e837c72fafe38ae6c002
This commit is contained in:
Nikolay Mahotkin 2015-03-25 17:10:22 +03:00
parent e84f091e1e
commit ea1187825c
4 changed files with 105 additions and 310 deletions

View File

@ -57,6 +57,11 @@ def _run_existent_task(task_ex, task_spec, wf_spec):
input_dicts = _get_input_dictionaries( input_dicts = _get_input_dictionaries(
wf_spec, task_ex, task_spec, task_ex.in_context wf_spec, task_ex, task_spec, task_ex.in_context
) )
# TODO(rakhmerov): May be it shouldn't be here. Need to think.
if task_spec.get_with_items():
with_items.prepare_runtime_context(task_ex, task_spec, input_dicts)
for input_d in input_dicts: for input_d in input_dicts:
_run_action_or_workflow(task_ex, task_spec, input_d) _run_action_or_workflow(task_ex, task_spec, input_d)
@ -123,8 +128,9 @@ def on_action_complete(action_ex, result):
if not task_spec.get_with_items(): if not task_spec.get_with_items():
_complete_task(task_ex, task_spec, states.SUCCESS) _complete_task(task_ex, task_spec, states.SUCCESS)
else: else:
# TODO(rakhmerov): Implement 'with-items' logic. if with_items.iterations_completed(task_ex):
pass _complete_task(task_ex, task_spec, states.SUCCESS)
else: else:
_complete_task(task_ex, task_spec, states.ERROR) _complete_task(task_ex, task_spec, states.ERROR)
@ -148,10 +154,6 @@ def _create_task_execution(wf_ex, task_spec, ctx):
# state within the current session. # state within the current session.
wf_ex.task_executions.append(task_ex) wf_ex.task_executions.append(task_ex)
# TODO(rakhmerov): May be it shouldn't be here. Need to think.
if task_spec.get_with_items():
with_items.prepare_runtime_context(task_ex, task_spec)
return task_ex return task_ex
@ -194,22 +196,82 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
""" """
if not task_spec.get_with_items(): if not task_spec.get_with_items():
input_dict = _get_workflow_or_action_input(
wf_spec,
task_ex,
task_spec,
ctx
)
return [input_dict]
else:
return get_with_items_input(wf_spec, task_ex, task_spec, ctx)
def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx):
if task_spec.get_action_name(): if task_spec.get_action_name():
input_dict = get_action_input( return get_action_input(
wf_spec, wf_spec,
task_ex, task_ex,
task_spec, task_spec,
ctx ctx
) )
elif task_spec.get_workflow_name(): elif task_spec.get_workflow_name():
input_dict = get_workflow_input(task_spec, ctx) return get_workflow_input(task_spec, ctx)
else: else:
raise RuntimeError('Must never happen.') raise RuntimeError('Must never happen.')
return [input_dict]
def get_with_items_input(wf_spec, task_ex, task_spec, ctx):
"""Calculate input array for separating each action input.
Example:
DSL:
with_items:
- itemX in <% $.arrayI %>
- itemY in <% $.arrayJ %>
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
with_items_input = {
"itemX": [1, 2],
"itemY": ['a', 'b']
}
Then we get separated input:
inputs_per_item = [
{'itemX': 1, 'itemY': 'a'},
{'itemX': 2, 'itemY': 'b'}
]
:return: list containing dicts of each action input.
"""
with_items_inputs = expr.evaluate_recursively(
task_spec.get_with_items(), ctx
)
with_items.validate_input(with_items_inputs)
inputs_per_item = []
for key, value in with_items_inputs.items():
for index, item in enumerate(value):
iter_context = {key: item}
if index >= len(inputs_per_item):
inputs_per_item.append(iter_context)
else: else:
# TODO(rakhmerov): Implement 'with-items'. inputs_per_item[index].update(iter_context)
return []
action_inputs = []
for item_input in inputs_per_item:
new_ctx = utils.merge_dicts(item_input, ctx)
action_inputs.append(_get_workflow_or_action_input(
wf_spec, task_ex, task_spec, new_ctx
))
return action_inputs
def get_action_input(wf_spec, task_ex, task_spec, ctx): def get_action_input(wf_spec, task_ex, task_spec, ctx):

View File

@ -14,13 +14,13 @@
import copy import copy
from oslo.config import cfg from oslo.config import cfg
import testtools
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import states from mistral.engine import states
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine1 import base from mistral.tests.unit.engine1 import base
from mistral.workflow import data_flow
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
# TODO(nmakhotkin) Need to write more tests. # TODO(nmakhotkin) Need to write more tests.
@ -48,7 +48,7 @@ workflows:
with-items: name_info in <% $.names_info %> with-items: name_info in <% $.names_info %>
action: std.echo output=<% $.name_info.name %> action: std.echo output=<% $.name_info.name %>
publish: publish:
result: <% $.task1 %> result: <% $.task1[0] %>
""" """
@ -114,7 +114,7 @@ workflows:
tasks: tasks:
task1: task1:
with-items: link in <% $.links %> with-items: link in <% $.links %>
action: std.mistral_http url=<% $.link %> action: std.http url=<% $.link %>
publish: publish:
result: <% $.task1 %> result: <% $.task1 %>
""" """
@ -139,7 +139,6 @@ WF_INPUT_URLS = {
class WithItemsEngineTest(base.EngineTestCase): class WithItemsEngineTest(base.EngineTestCase):
@testtools.skip("Fix 'with-items'.")
def test_with_items_simple(self): def test_with_items_simple(self):
wb_service.create_workbook_v2(WORKBOOK) wb_service.create_workbook_v2(WORKBOOK)
@ -158,11 +157,10 @@ class WithItemsEngineTest(base.EngineTestCase):
with_items_context = task1.runtime_context['with_items'] with_items_context = task1.runtime_context['with_items']
self.assertEqual(3, with_items_context['count']) self.assertEqual(3, with_items_context['count'])
self.assertEqual(3, with_items_context['index'])
# Since we know that we can receive results in random order, # Since we know that we can receive results in random order,
# check is not depend on order of items. # check is not depend on order of items.
result = task1.result['result'] result = data_flow.get_task_execution_result(task1)
self.assertTrue(isinstance(result, list)) self.assertTrue(isinstance(result, list))
@ -170,10 +168,13 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn('Ivan', result) self.assertIn('Ivan', result)
self.assertIn('Mistral', result) self.assertIn('Mistral', result)
published = task1.published
self.assertIn(published['result'], ['John', 'Ivan', 'Mistral'])
self.assertEqual(1, len(tasks)) self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task1.state)
@testtools.skip("Fix 'with-items'.")
def test_with_items_static_var(self): def test_with_items_static_var(self):
wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR) wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR)
@ -191,7 +192,7 @@ class WithItemsEngineTest(base.EngineTestCase):
tasks = wf_ex.task_executions tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1') task1 = self._assert_single_item(tasks, name='task1')
result = task1.result['result'] result = data_flow.get_task_execution_result(task1)
self.assertTrue(isinstance(result, list)) self.assertTrue(isinstance(result, list))
@ -202,7 +203,6 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(1, len(tasks)) self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task1.state)
@testtools.skip("Fix 'with-items'.")
def test_with_items_multi_array(self): def test_with_items_multi_array(self):
wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY) wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY)
@ -223,7 +223,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order, # Since we know that we can receive results in random order,
# check is not depend on order of items. # check is not depend on order of items.
result = task1.result['result'] result = data_flow.get_task_execution_result(task1)
self.assertTrue(isinstance(result, list)) self.assertTrue(isinstance(result, list))
@ -234,7 +234,6 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(1, len(tasks)) self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task1.state)
@testtools.skip("Fix 'with-items'.")
def test_with_items_action_context(self): def test_with_items_action_context(self):
wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT) wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT)
@ -246,9 +245,12 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0] task_ex = wf_ex.task_executions[0]
self.engine.on_task_result(task_ex.id, wf_utils.Result("Ivan")) act_exs = task_ex.executions
self.engine.on_task_result(task_ex.id, wf_utils.Result("John")) self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan"))
self.engine.on_task_result(task_ex.id, wf_utils.Result("Mistral")) self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John"))
self.engine.on_action_complete(
act_exs[2].id, wf_utils.Result("Mistral")
)
self._await( self._await(
lambda: self.is_execution_success(wf_ex.id), lambda: self.is_execution_success(wf_ex.id),
@ -258,7 +260,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id) task_ex = db_api.get_task_execution(task_ex.id)
result = task_ex.result['result'] result = data_flow.get_task_execution_result(task_ex)
self.assertTrue(isinstance(result, list)) self.assertTrue(isinstance(result, list))

View File

@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.tests import base
from mistral.workbook.v2 import tasks
from mistral.workflow import utils
from mistral.workflow import with_items
TASK_DICT = {
'name': 'task1',
'version': '2.0',
'action': 'std.echo',
'with-items': [
'item in <% $.array %>'
],
'input': {
'array': '<% $.my_array %>'
}
}
TASK_SPEC = tasks.TaskSpec(TASK_DICT)
task_ex = models.TaskExecution(
name='task1'
)
class WithItemsCalculationsTest(base.BaseTest):
@testtools.skip("Fix 'with-items'.")
def test_calculate_output_with_key(self):
task_dict = TASK_DICT.copy()
task_dict['publish'] = {'result': '<% $.task1 %>'}
task_spec = tasks.TaskSpec(task_dict)
output = with_items.get_result(
task_ex,
task_spec,
utils.Result(data='output!')
)
self.assertDictEqual({'result': ['output!']}, output)
@testtools.skip("Fix 'with-items'.")
def test_calculate_output_without_key(self):
output = with_items.get_result(
task_ex,
TASK_SPEC,
utils.Result(data='output!')
)
# TODO(rakhmerov): Fix during result/output refactoring.
self.assertDictEqual({}, output)
def test_calculate_input(self):
with_items_input = {
'name_info': [
{'name': 'John'},
{'name': 'Ivan'},
{'name': 'Mistral'}
]
}
action_input_collection = with_items.calc_input(with_items_input)
self.assertListEqual(
[
{'name_info': {'name': 'John'}},
{'name_info': {'name': 'Ivan'}},
{'name_info': {'name': 'Mistral'}}
],
action_input_collection
)
def test_calculate_input_multiple_array(self):
with_items_input = {
'name_info': [
{'name': 'John'},
{'name': 'Ivan'},
{'name': 'Mistral'}
],
'server_info': [
'server1',
'server2',
'server3'
]
}
action_input_collection = with_items.calc_input(with_items_input)
self.assertListEqual(
[
{'name_info': {'name': 'John'}, 'server_info': 'server1'},
{'name_info': {'name': 'Ivan'}, 'server_info': 'server2'},
{'name_info': {'name': 'Mistral'}, 'server_info': 'server3'},
],
action_input_collection
)
def test_calculate_input_wrong_array_length(self):
with_items_input = {
'name_info': [
{'name': 'John'},
{'name': 'Ivan'},
{'name': 'Mistral'}
],
'server_info': [
'server1',
'server2'
]
}
exception = self.assertRaises(
exc.InputException,
with_items.calc_input,
with_items_input
)
self.assertIn('the same length', exception.message)
def test_calculate_input_not_list(self):
with_items_input = {
'name_info': [
{'name': 'John'},
{'name': 'Ivan'},
{'name': 'Mistral'}
],
'server_info': 'some_string'
}
exception = self.assertRaises(
exc.InputException,
with_items.calc_input,
with_items_input
)
self.assertIn('List type', exception.message)

View File

@ -12,82 +12,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral import expressions as expr from mistral.workflow import states
# TODO(rakhmerov): The module should probably go into task_handler.
def get_result(task_ex, task_spec, result):
"""Returns result from task markered as with-items
Examples of result:
1. Without publish clause:
{
"task": {
"task1": [None]
}
}
Note: In this case we don't create any specific
result to prevent generating large data in DB.
Note: None here used for calculating number of
finished iterations.
2. With publish clause and specific result key:
{
"result": [
"result1",
"result2"
],
"task": {
"task1": {
"result": [
"result1",
"result2"
]
}
}
}
"""
e_data = result.error
expr_ctx = copy.deepcopy(task_ex.in_context) or {}
expr_ctx[task_ex.name] = copy.deepcopy(result.data) or {}
# Calc result for with-items (only list form is used).
result = expr.evaluate_recursively(task_spec.get_publish(), expr_ctx)
if not task_ex.result:
task_ex.result = {}
task_result = copy.copy(task_ex.result)
res_key = _get_result_key(task_spec)
if res_key:
if res_key in task_result:
task_result[res_key].append(
result.get(res_key) or e_data
)
else:
task_result[res_key] = [result.get(res_key) or e_data]
# Add same result to task result under key 'task'.
# TODO(rakhmerov): Fix this during task result refactoring.
# task_result[t_name] =
# {
# res_key: task_result[res_key]
# }
# else:
# if 'task' not in task_result:
# task_result.update({'task': {t_name: [None or e_data]}})
# else:
# task_result['task'][t_name].append(None or e_data)
return task_result
def _get_context(task_ex): def _get_context(task_ex):
@ -130,7 +56,7 @@ def do_step(task_ex):
task_ex.runtime_context.update({'with_items': with_items_context}) task_ex.runtime_context.update({'with_items': with_items_context})
def prepare_runtime_context(task_ex, task_spec): def prepare_runtime_context(task_ex, task_spec, input_dicts):
runtime_context = task_ex.runtime_context runtime_context = task_ex.runtime_context
with_items_spec = task_spec.get_with_items() with_items_spec = task_spec.get_with_items()
@ -139,50 +65,10 @@ def prepare_runtime_context(task_ex, task_spec):
runtime_context['with_items'] = { runtime_context['with_items'] = {
'capacity': get_concurrency_spec(task_spec), 'capacity': get_concurrency_spec(task_spec),
'index': 0, 'index': 0,
'count': len(task_ex.input[with_items_spec.keys()[0]]) 'count': len(input_dicts)
} }
def calc_input(with_items_input):
"""Calculate action input collection for separating each action input.
Example:
DSL:
with_items:
- itemX in <% $.arrayI %>
- itemY in <% $.arrayJ %>
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
with_items_input = {
"itemX": [1, 2],
"itemY": ['a', 'b']
}
Then we get separated input:
action_input_collection = [
{'itemX': 1, 'itemY': 'a'},
{'itemX': 2, 'itemY': 'b'}
]
:param with_items_input: Dict containing mapped variables to their arrays.
:return: list containing dicts of each action input.
"""
validate_input(with_items_input)
action_input_collection = []
for key, value in with_items_input.items():
for index, item in enumerate(value):
iter_context = {key: item}
if index >= len(action_input_collection):
action_input_collection.append(iter_context)
else:
action_input_collection[index].update(iter_context)
return action_input_collection
def validate_input(with_items_input): def validate_input(with_items_input):
# Take only mapped values and check them. # Take only mapped values and check them.
values = with_items_input.values() values = with_items_input.values()
@ -202,10 +88,7 @@ def validate_input(with_items_input):
) )
def _get_result_key(task_spec): def iterations_completed(task_ex):
return (task_spec.get_publish().keys()[0] completed = all([states.is_completed(ex.state)
if task_spec.get_publish() else None) for ex in task_ex.executions])
return completed
def is_iterations_incomplete(task_ex):
return get_index(task_ex) < get_count(task_ex)