From 433d8e7e994744a38ffe515b4d19095eeaa215ac Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Fri, 29 Jan 2016 02:43:57 +0000 Subject: [PATCH] Fix with-items concurrency for sub-workflows For with-items task that uses concurrency to execute subworkflows, the remaining iterations after the first iteration will return the same item on subworkflow completion. This bug does not affect with-items task that executes action. The bug is caused by a delay in the scheduling of workflow execution and the creation of the execution record. For action execution, the record is created within the same DB transaction prior to scheduling. This patch creates the workflow execution record first just like for action execution and then schedule a resume workflow. Change-Id: Iba80068260caead9ae8f2f8f105abc5b9349db52 Closes-Bug: #1536415 --- mistral/api/controllers/v2/execution.py | 2 +- mistral/engine/default_engine.py | 86 ++++-------------- mistral/engine/task_handler.py | 30 ++++--- mistral/services/executions.py | 91 ++++++++++++++++++++ mistral/tests/unit/engine/test_with_items.py | 48 +++++++++++ mistral/workflow/states.py | 10 ++- mistral/workflow/with_items.py | 10 ++- 7 files changed, 189 insertions(+), 88 deletions(-) create mode 100644 mistral/services/executions.py diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index c3814b21..eb9f46b2 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -56,7 +56,7 @@ class Execution(resource.Resource): "reference to the parent task execution" state = wtypes.text - "state can be one of: RUNNING, SUCCESS, ERROR, PAUSED" + "state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED" state_info = wtypes.text "an optional state information string" diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 55ea28f6..8f7cb344 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import six import traceback from oslo_log import log as logging @@ -25,16 +23,15 @@ from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base from mistral.engine import task_handler -from mistral.engine import utils as eng_utils from mistral.engine import workflow_handler as wf_handler from mistral.services import action_manager as a_m +from mistral.services import executions as wf_ex_service from mistral.services import workflows as wf_service from mistral import utils as u from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser from mistral.workflow import base as wf_base from mistral.workflow import commands -from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils @@ -54,39 +51,33 @@ class DefaultEngine(base.Engine, coordination.Service): @u.log_exec(LOG) def start_workflow(self, wf_name, wf_input, description='', **params): - wf_exec_id = None - - params = self._canonize_workflow_params(params) + wf_ex_id = None try: with db_api.transaction(): - wf_def = db_api.get_workflow_definition(wf_name) - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - - eng_utils.validate_input(wf_def, wf_input, wf_spec) - - wf_ex = self._create_workflow_execution( - wf_def, - wf_spec, + # The new workflow execution will be in an IDLE + # state on initial record creation. + wf_ex_id = wf_ex_service.create_workflow_execution( + wf_name, wf_input, description, params ) - wf_exec_id = wf_ex.id - - wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) # Separate workflow execution creation and dispatching command # transactions in order to be able to return workflow execution # with corresponding error message in state_info when error occurs # at dispatching commands. with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_exec_id) + wf_ex = db_api.get_workflow_execution(wf_ex_id) + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_ctrl = wf_base.WorkflowController.get_controller( wf_ex, wf_spec ) + self._dispatch_workflow_commands( wf_ex, wf_ctrl.continue_workflow() @@ -96,9 +87,10 @@ class DefaultEngine(base.Engine, coordination.Service): except Exception as e: LOG.error( "Failed to start workflow '%s' id=%s: %s\n%s", - wf_name, wf_exec_id, e, traceback.format_exc() + wf_name, wf_ex_id, e, traceback.format_exc() ) - wf_ex = self._fail_workflow(wf_exec_id, e) + + wf_ex = self._fail_workflow(wf_ex_id, e) if wf_ex: return wf_ex.get_clone() @@ -252,16 +244,16 @@ class DefaultEngine(base.Engine, coordination.Service): return action_ex.get_clone() prev_task_state = task_ex.state + # Separate the task transition in a separate transaction. The task # has already completed for better or worst. The task state should # not be affected by errors during transition on conditions such as # on-success and on-error. with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) action_ex = db_api.get_action_execution(action_ex_id) task_ex = action_ex.task_execution - wf_ex = wf_handler.lock_workflow_execution( - task_ex.workflow_execution_id - ) + self._on_task_state_change( task_ex, wf_ex, @@ -364,7 +356,8 @@ class DefaultEngine(base.Engine, coordination.Service): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if wf_ex.state != states.PAUSED: + if (not states.is_paused(wf_ex.state) and + not states.is_idle(wf_ex.state)): return wf_ex.get_clone() return self._continue_workflow(wf_ex, env=env) @@ -467,46 +460,3 @@ class DefaultEngine(base.Engine, coordination.Service): ) return wf_ex - - @staticmethod - def _canonize_workflow_params(params): - # Resolve environment parameter. - env = params.get('env', {}) - - if not isinstance(env, dict) and not isinstance(env, six.string_types): - raise ValueError( - 'Unexpected type for environment [environment=%s]' % str(env) - ) - - if isinstance(env, six.string_types): - env_db = db_api.get_environment(env) - env = env_db.variables - params['env'] = env - - return params - - @staticmethod - def _create_workflow_execution(wf_def, wf_spec, wf_input, description, - params): - wf_ex = db_api.create_workflow_execution({ - 'name': wf_def.name, - 'description': description, - 'workflow_name': wf_def.name, - 'spec': wf_spec.to_dict(), - 'params': params or {}, - 'state': states.RUNNING, - 'input': wf_input or {}, - 'output': {}, - 'context': copy.deepcopy(wf_input) or {}, - 'task_execution_id': params.get('task_execution_id'), - 'runtime_context': { - 'with_items_index': params.get('with_items_index', 0) - }, - }) - - data_flow.add_openstack_data_to_context(wf_ex) - data_flow.add_execution_to_context(wf_ex) - data_flow.add_environment_to_context(wf_ex) - data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) - - return wf_ex diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index ab02b6bf..854d7bd4 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -26,6 +26,7 @@ from mistral.engine import rpc from mistral.engine import utils as e_utils from mistral import exceptions as exc from mistral import expressions as expr +from mistral.services import executions as wf_ex_service from mistral.services import scheduler from mistral import utils from mistral.utils import wf_trace @@ -476,24 +477,25 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input, index): wf_params[k] = v del wf_input[k] - scheduler.schedule_call( - None, - 'mistral.engine.task_handler.run_workflow', - 0, - wf_name=wf_def.name, - wf_input=wf_input, - wf_params=wf_params - ) - - -def run_workflow(wf_name, wf_input, wf_params): - rpc.get_engine_client().start_workflow( - wf_name, + wf_ex_id = wf_ex_service.create_workflow_execution( + wf_def.name, wf_input, "sub-workflow execution", - **wf_params + wf_params ) + scheduler.schedule_call( + None, + 'mistral.engine.task_handler.resume_workflow', + 0, + wf_ex_id=wf_ex_id, + env=None + ) + + +def resume_workflow(wf_ex_id, env): + rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) + def _complete_task(task_ex, task_spec, state, state_info=None): # Ignore if task already completed. diff --git a/mistral/services/executions.py b/mistral/services/executions.py new file mode 100644 index 00000000..9a15863f --- /dev/null +++ b/mistral/services/executions.py @@ -0,0 +1,91 @@ +# Copyright 2016 - StackStorm, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import six + +from oslo_log import log as logging + +from mistral.db.v2 import api as db_api +from mistral.engine import utils as eng_utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import data_flow +from mistral.workflow import states + + +LOG = logging.getLogger(__name__) + + +def canonize_workflow_params(params): + # Resolve environment parameter. + env = params.get('env', {}) + + if not isinstance(env, dict) and not isinstance(env, six.string_types): + raise ValueError( + 'Unexpected type for environment [environment=%s]' % str(env) + ) + + if isinstance(env, six.string_types): + env_db = db_api.get_environment(env) + env = env_db.variables + params['env'] = env + + return params + + +def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): + wf_ex = db_api.create_workflow_execution({ + 'name': wf_def.name, + 'description': desc, + 'workflow_name': wf_def.name, + 'spec': wf_spec.to_dict(), + 'params': params or {}, + 'state': states.IDLE, + 'input': wf_input or {}, + 'output': {}, + 'context': copy.deepcopy(wf_input) or {}, + 'task_execution_id': params.get('task_execution_id'), + 'runtime_context': { + 'with_items_index': params.get('with_items_index', 0) + }, + }) + + data_flow.add_openstack_data_to_context(wf_ex) + data_flow.add_execution_to_context(wf_ex) + data_flow.add_environment_to_context(wf_ex) + data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) + + return wf_ex + + +def create_workflow_execution(wf_name, wf_input, description, params): + params = canonize_workflow_params(params) + + wf_def = db_api.get_workflow_definition(wf_name) + wf_spec = spec_parser.get_workflow_spec(wf_def.spec) + + eng_utils.validate_input(wf_def, wf_input, wf_spec) + + wf_ex = _create_workflow_execution( + wf_def, + wf_spec, + wf_input, + description, + params + ) + + wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) + + return wf_ex.id diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 8d64d4cf..28c9664a 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -1061,3 +1061,51 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn(2, result_task1) self.assertIn(3, result_task2) self.assertIn(4, result_task2) + + def test_with_items_subflow_concurrency_gt_list_length(self): + workbook_definition = """--- + version: "2.0" + name: wb1 + + workflows: + main: + type: direct + input: + - names + tasks: + task1: + with-items: name in <% $.names %> + workflow: subflow1 name=<% $.name %> + concurrency: 3 + subflow1: + type: direct + input: + - name + output: + result: <% task(task1).result %> + tasks: + task1: + action: std.echo output=<% $.name %> + """ + + wb_service.create_workbook_v2(workbook_definition) + + # Start workflow. + names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"] + wf_ex = self.engine.start_workflow('wb1.main', {'names': names}) + + self._await( + lambda: self.is_execution_success(wf_ex.id), + ) + + wf_ex = db_api.get_execution(wf_ex.id) + task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') + + self.assertEqual(states.SUCCESS, task_ex.state) + + result = [ + item['result'] + for item in data_flow.get_task_execution_result(task_ex) + ] + + self.assertListEqual(sorted(result), sorted(names)) diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index f94f7b9b..dc0ab611 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -49,6 +49,10 @@ def is_completed(state): return state in [SUCCESS, ERROR] +def is_running(state): + return state in [RUNNING, RUNNING_DELAYED] + + def is_waiting(state): return state == WAITING @@ -57,8 +61,12 @@ def is_idle(state): return state == IDLE +def is_paused(state): + return state == PAUSED + + def is_paused_or_completed(state): - return state == PAUSED or is_completed(state) + return is_paused(state) or is_completed(state) def is_valid_transition(from_state, to_state): diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index a77ba929..282bb5a0 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -45,12 +45,14 @@ def is_completed(task_ex): def get_index(task_ex): - return len( - list(filter( - lambda x: x.accepted or states.RUNNING, task_ex.executions - )) + f = lambda x: ( + x.accepted or + states.is_running(x.state) or + states.is_idle(x.state) ) + return len(list(filter(f, task_ex.executions))) + def get_concurrency(task_ex): return task_ex.runtime_context.get(_CONCURRENCY)