From ea1187825c49d13e53e632ecaebab1a7eaeb8206 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Wed, 25 Mar 2015 17:10:22 +0300 Subject: [PATCH] Fixing 'with-items' functionality After refactoring 'with-items' was broken Implements blueprint mistral-with-items Change-Id: I8d2ef1b9cd6ff8c06ee9e837c72fafe38ae6c002 --- mistral/engine1/task_handler.py | 100 +++++++++--- mistral/tests/unit/engine1/test_with_items.py | 32 ++-- .../tests/unit/workflow/test_with_items.py | 152 ------------------ mistral/workflow/with_items.py | 131 +-------------- 4 files changed, 105 insertions(+), 310 deletions(-) delete mode 100644 mistral/tests/unit/workflow/test_with_items.py diff --git a/mistral/engine1/task_handler.py b/mistral/engine1/task_handler.py index e1ba937a..50ee903c 100644 --- a/mistral/engine1/task_handler.py +++ b/mistral/engine1/task_handler.py @@ -57,6 +57,11 @@ def _run_existent_task(task_ex, task_spec, wf_spec): input_dicts = _get_input_dictionaries( wf_spec, task_ex, task_spec, task_ex.in_context ) + + # TODO(rakhmerov): May be it shouldn't be here. Need to think. + if task_spec.get_with_items(): + with_items.prepare_runtime_context(task_ex, task_spec, input_dicts) + for input_d in input_dicts: _run_action_or_workflow(task_ex, task_spec, input_d) @@ -123,8 +128,9 @@ def on_action_complete(action_ex, result): if not task_spec.get_with_items(): _complete_task(task_ex, task_spec, states.SUCCESS) else: - # TODO(rakhmerov): Implement 'with-items' logic. - pass + if with_items.iterations_completed(task_ex): + _complete_task(task_ex, task_spec, states.SUCCESS) + else: _complete_task(task_ex, task_spec, states.ERROR) @@ -148,10 +154,6 @@ def _create_task_execution(wf_ex, task_spec, ctx): # state within the current session. wf_ex.task_executions.append(task_ex) - # TODO(rakhmerov): May be it shouldn't be here. Need to think. - if task_spec.get_with_items(): - with_items.prepare_runtime_context(task_ex, task_spec) - return task_ex @@ -194,22 +196,82 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx): """ if not task_spec.get_with_items(): - if task_spec.get_action_name(): - input_dict = get_action_input( - wf_spec, - task_ex, - task_spec, - ctx - ) - elif task_spec.get_workflow_name(): - input_dict = get_workflow_input(task_spec, ctx) - else: - raise RuntimeError('Must never happen.') + input_dict = _get_workflow_or_action_input( + wf_spec, + task_ex, + task_spec, + ctx + ) return [input_dict] else: - # TODO(rakhmerov): Implement 'with-items'. - return [] + return get_with_items_input(wf_spec, task_ex, task_spec, ctx) + + +def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx): + if task_spec.get_action_name(): + return get_action_input( + wf_spec, + task_ex, + task_spec, + ctx + ) + elif task_spec.get_workflow_name(): + return get_workflow_input(task_spec, ctx) + else: + raise RuntimeError('Must never happen.') + + +def get_with_items_input(wf_spec, task_ex, task_spec, ctx): + """Calculate input array for separating each action input. + + Example: + DSL: + with_items: + - itemX in <% $.arrayI %> + - itemY in <% $.arrayJ %> + + Assume arrayI = [1, 2], arrayJ = ['a', 'b']. + with_items_input = { + "itemX": [1, 2], + "itemY": ['a', 'b'] + } + + Then we get separated input: + inputs_per_item = [ + {'itemX': 1, 'itemY': 'a'}, + {'itemX': 2, 'itemY': 'b'} + ] + + :return: list containing dicts of each action input. + """ + with_items_inputs = expr.evaluate_recursively( + task_spec.get_with_items(), ctx + ) + + with_items.validate_input(with_items_inputs) + + inputs_per_item = [] + + for key, value in with_items_inputs.items(): + for index, item in enumerate(value): + iter_context = {key: item} + + if index >= len(inputs_per_item): + inputs_per_item.append(iter_context) + else: + inputs_per_item[index].update(iter_context) + + action_inputs = [] + + for item_input in inputs_per_item: + new_ctx = utils.merge_dicts(item_input, ctx) + + action_inputs.append(_get_workflow_or_action_input( + wf_spec, task_ex, task_spec, new_ctx + )) + + return action_inputs def get_action_input(wf_spec, task_ex, task_spec, ctx): diff --git a/mistral/tests/unit/engine1/test_with_items.py b/mistral/tests/unit/engine1/test_with_items.py index 0ec75730..db88c8a1 100644 --- a/mistral/tests/unit/engine1/test_with_items.py +++ b/mistral/tests/unit/engine1/test_with_items.py @@ -14,13 +14,13 @@ import copy from oslo.config import cfg -import testtools from mistral.db.v2 import api as db_api from mistral.engine import states from mistral.openstack.common import log as logging from mistral.services import workbooks as wb_service from mistral.tests.unit.engine1 import base +from mistral.workflow import data_flow from mistral.workflow import utils as wf_utils # TODO(nmakhotkin) Need to write more tests. @@ -48,7 +48,7 @@ workflows: with-items: name_info in <% $.names_info %> action: std.echo output=<% $.name_info.name %> publish: - result: <% $.task1 %> + result: <% $.task1[0] %> """ @@ -114,7 +114,7 @@ workflows: tasks: task1: with-items: link in <% $.links %> - action: std.mistral_http url=<% $.link %> + action: std.http url=<% $.link %> publish: result: <% $.task1 %> """ @@ -139,7 +139,6 @@ WF_INPUT_URLS = { class WithItemsEngineTest(base.EngineTestCase): - @testtools.skip("Fix 'with-items'.") def test_with_items_simple(self): wb_service.create_workbook_v2(WORKBOOK) @@ -158,11 +157,10 @@ class WithItemsEngineTest(base.EngineTestCase): with_items_context = task1.runtime_context['with_items'] self.assertEqual(3, with_items_context['count']) - self.assertEqual(3, with_items_context['index']) # Since we know that we can receive results in random order, # check is not depend on order of items. - result = task1.result['result'] + result = data_flow.get_task_execution_result(task1) self.assertTrue(isinstance(result, list)) @@ -170,10 +168,13 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('Ivan', result) self.assertIn('Mistral', result) + published = task1.published + + self.assertIn(published['result'], ['John', 'Ivan', 'Mistral']) + self.assertEqual(1, len(tasks)) self.assertEqual(states.SUCCESS, task1.state) - @testtools.skip("Fix 'with-items'.") def test_with_items_static_var(self): wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR) @@ -191,7 +192,7 @@ class WithItemsEngineTest(base.EngineTestCase): tasks = wf_ex.task_executions task1 = self._assert_single_item(tasks, name='task1') - result = task1.result['result'] + result = data_flow.get_task_execution_result(task1) self.assertTrue(isinstance(result, list)) @@ -202,7 +203,6 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(1, len(tasks)) self.assertEqual(states.SUCCESS, task1.state) - @testtools.skip("Fix 'with-items'.") def test_with_items_multi_array(self): wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY) @@ -223,7 +223,7 @@ class WithItemsEngineTest(base.EngineTestCase): # Since we know that we can receive results in random order, # check is not depend on order of items. - result = task1.result['result'] + result = data_flow.get_task_execution_result(task1) self.assertTrue(isinstance(result, list)) @@ -234,7 +234,6 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(1, len(tasks)) self.assertEqual(states.SUCCESS, task1.state) - @testtools.skip("Fix 'with-items'.") def test_with_items_action_context(self): wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT) @@ -246,9 +245,12 @@ class WithItemsEngineTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] - self.engine.on_task_result(task_ex.id, wf_utils.Result("Ivan")) - self.engine.on_task_result(task_ex.id, wf_utils.Result("John")) - self.engine.on_task_result(task_ex.id, wf_utils.Result("Mistral")) + act_exs = task_ex.executions + self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan")) + self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John")) + self.engine.on_action_complete( + act_exs[2].id, wf_utils.Result("Mistral") + ) self._await( lambda: self.is_execution_success(wf_ex.id), @@ -258,7 +260,7 @@ class WithItemsEngineTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = db_api.get_task_execution(task_ex.id) - result = task_ex.result['result'] + result = data_flow.get_task_execution_result(task_ex) self.assertTrue(isinstance(result, list)) diff --git a/mistral/tests/unit/workflow/test_with_items.py b/mistral/tests/unit/workflow/test_with_items.py deleted file mode 100644 index 6679e6a3..00000000 --- a/mistral/tests/unit/workflow/test_with_items.py +++ /dev/null @@ -1,152 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2014 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import testtools - -from mistral.db.v2.sqlalchemy import models -from mistral import exceptions as exc -from mistral.tests import base -from mistral.workbook.v2 import tasks -from mistral.workflow import utils -from mistral.workflow import with_items - - -TASK_DICT = { - 'name': 'task1', - 'version': '2.0', - 'action': 'std.echo', - 'with-items': [ - 'item in <% $.array %>' - ], - 'input': { - 'array': '<% $.my_array %>' - } -} - -TASK_SPEC = tasks.TaskSpec(TASK_DICT) - -task_ex = models.TaskExecution( - name='task1' -) - - -class WithItemsCalculationsTest(base.BaseTest): - @testtools.skip("Fix 'with-items'.") - def test_calculate_output_with_key(self): - task_dict = TASK_DICT.copy() - task_dict['publish'] = {'result': '<% $.task1 %>'} - - task_spec = tasks.TaskSpec(task_dict) - - output = with_items.get_result( - task_ex, - task_spec, - utils.Result(data='output!') - ) - - self.assertDictEqual({'result': ['output!']}, output) - - @testtools.skip("Fix 'with-items'.") - def test_calculate_output_without_key(self): - output = with_items.get_result( - task_ex, - TASK_SPEC, - utils.Result(data='output!') - ) - - # TODO(rakhmerov): Fix during result/output refactoring. - self.assertDictEqual({}, output) - - def test_calculate_input(self): - with_items_input = { - 'name_info': [ - {'name': 'John'}, - {'name': 'Ivan'}, - {'name': 'Mistral'} - ] - } - action_input_collection = with_items.calc_input(with_items_input) - - self.assertListEqual( - [ - {'name_info': {'name': 'John'}}, - {'name_info': {'name': 'Ivan'}}, - {'name_info': {'name': 'Mistral'}} - ], - action_input_collection - ) - - def test_calculate_input_multiple_array(self): - with_items_input = { - 'name_info': [ - {'name': 'John'}, - {'name': 'Ivan'}, - {'name': 'Mistral'} - ], - 'server_info': [ - 'server1', - 'server2', - 'server3' - ] - } - - action_input_collection = with_items.calc_input(with_items_input) - - self.assertListEqual( - [ - {'name_info': {'name': 'John'}, 'server_info': 'server1'}, - {'name_info': {'name': 'Ivan'}, 'server_info': 'server2'}, - {'name_info': {'name': 'Mistral'}, 'server_info': 'server3'}, - ], - action_input_collection - ) - - def test_calculate_input_wrong_array_length(self): - with_items_input = { - 'name_info': [ - {'name': 'John'}, - {'name': 'Ivan'}, - {'name': 'Mistral'} - ], - 'server_info': [ - 'server1', - 'server2' - ] - } - exception = self.assertRaises( - exc.InputException, - with_items.calc_input, - with_items_input - ) - - self.assertIn('the same length', exception.message) - - def test_calculate_input_not_list(self): - with_items_input = { - 'name_info': [ - {'name': 'John'}, - {'name': 'Ivan'}, - {'name': 'Mistral'} - ], - 'server_info': 'some_string' - } - exception = self.assertRaises( - exc.InputException, - with_items.calc_input, - with_items_input - ) - - self.assertIn('List type', exception.message) diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index e6a8978c..f3cbe5c2 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -12,82 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy - from mistral import exceptions as exc -from mistral import expressions as expr - - -# TODO(rakhmerov): The module should probably go into task_handler. -def get_result(task_ex, task_spec, result): - """Returns result from task markered as with-items - - Examples of result: - 1. Without publish clause: - { - "task": { - "task1": [None] - } - } - Note: In this case we don't create any specific - result to prevent generating large data in DB. - - Note: None here used for calculating number of - finished iterations. - - 2. With publish clause and specific result key: - { - "result": [ - "result1", - "result2" - ], - "task": { - "task1": { - "result": [ - "result1", - "result2" - ] - } - } - } - """ - e_data = result.error - - expr_ctx = copy.deepcopy(task_ex.in_context) or {} - - expr_ctx[task_ex.name] = copy.deepcopy(result.data) or {} - - # Calc result for with-items (only list form is used). - result = expr.evaluate_recursively(task_spec.get_publish(), expr_ctx) - - if not task_ex.result: - task_ex.result = {} - - task_result = copy.copy(task_ex.result) - - res_key = _get_result_key(task_spec) - - if res_key: - if res_key in task_result: - task_result[res_key].append( - result.get(res_key) or e_data - ) - else: - task_result[res_key] = [result.get(res_key) or e_data] - - # Add same result to task result under key 'task'. - # TODO(rakhmerov): Fix this during task result refactoring. - # task_result[t_name] = - # { - # res_key: task_result[res_key] - # } - # else: - # if 'task' not in task_result: - # task_result.update({'task': {t_name: [None or e_data]}}) - # else: - # task_result['task'][t_name].append(None or e_data) - - return task_result +from mistral.workflow import states def _get_context(task_ex): @@ -130,7 +56,7 @@ def do_step(task_ex): task_ex.runtime_context.update({'with_items': with_items_context}) -def prepare_runtime_context(task_ex, task_spec): +def prepare_runtime_context(task_ex, task_spec, input_dicts): runtime_context = task_ex.runtime_context with_items_spec = task_spec.get_with_items() @@ -139,50 +65,10 @@ def prepare_runtime_context(task_ex, task_spec): runtime_context['with_items'] = { 'capacity': get_concurrency_spec(task_spec), 'index': 0, - 'count': len(task_ex.input[with_items_spec.keys()[0]]) + 'count': len(input_dicts) } -def calc_input(with_items_input): - """Calculate action input collection for separating each action input. - - Example: - DSL: - with_items: - - itemX in <% $.arrayI %> - - itemY in <% $.arrayJ %> - - Assume arrayI = [1, 2], arrayJ = ['a', 'b']. - with_items_input = { - "itemX": [1, 2], - "itemY": ['a', 'b'] - } - - Then we get separated input: - action_input_collection = [ - {'itemX': 1, 'itemY': 'a'}, - {'itemX': 2, 'itemY': 'b'} - ] - - :param with_items_input: Dict containing mapped variables to their arrays. - :return: list containing dicts of each action input. - """ - validate_input(with_items_input) - - action_input_collection = [] - - for key, value in with_items_input.items(): - for index, item in enumerate(value): - iter_context = {key: item} - - if index >= len(action_input_collection): - action_input_collection.append(iter_context) - else: - action_input_collection[index].update(iter_context) - - return action_input_collection - - def validate_input(with_items_input): # Take only mapped values and check them. values = with_items_input.values() @@ -202,10 +88,7 @@ def validate_input(with_items_input): ) -def _get_result_key(task_spec): - return (task_spec.get_publish().keys()[0] - if task_spec.get_publish() else None) - - -def is_iterations_incomplete(task_ex): - return get_index(task_ex) < get_count(task_ex) +def iterations_completed(task_ex): + completed = all([states.is_completed(ex.state) + for ex in task_ex.executions]) + return completed