diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index c3814b21..eb9f46b2 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -56,7 +56,7 @@ class Execution(resource.Resource): "reference to the parent task execution" state = wtypes.text - "state can be one of: RUNNING, SUCCESS, ERROR, PAUSED" + "state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED" state_info = wtypes.text "an optional state information string" diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 55ea28f6..8f7cb344 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import six import traceback from oslo_log import log as logging @@ -25,16 +23,15 @@ from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base from mistral.engine import task_handler -from mistral.engine import utils as eng_utils from mistral.engine import workflow_handler as wf_handler from mistral.services import action_manager as a_m +from mistral.services import executions as wf_ex_service from mistral.services import workflows as wf_service from mistral import utils as u from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser from mistral.workflow import base as wf_base from mistral.workflow import commands -from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils @@ -54,39 +51,33 @@ class DefaultEngine(base.Engine, coordination.Service): @u.log_exec(LOG) def start_workflow(self, wf_name, wf_input, description='', **params): - wf_exec_id = None - - params = self._canonize_workflow_params(params) + wf_ex_id = None try: with db_api.transaction(): - wf_def = db_api.get_workflow_definition(wf_name) - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - - eng_utils.validate_input(wf_def, wf_input, wf_spec) - - wf_ex = self._create_workflow_execution( - wf_def, - wf_spec, + # The new workflow execution will be in an IDLE + # state on initial record creation. + wf_ex_id = wf_ex_service.create_workflow_execution( + wf_name, wf_input, description, params ) - wf_exec_id = wf_ex.id - - wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) # Separate workflow execution creation and dispatching command # transactions in order to be able to return workflow execution # with corresponding error message in state_info when error occurs # at dispatching commands. with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_exec_id) + wf_ex = db_api.get_workflow_execution(wf_ex_id) + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_ctrl = wf_base.WorkflowController.get_controller( wf_ex, wf_spec ) + self._dispatch_workflow_commands( wf_ex, wf_ctrl.continue_workflow() @@ -96,9 +87,10 @@ class DefaultEngine(base.Engine, coordination.Service): except Exception as e: LOG.error( "Failed to start workflow '%s' id=%s: %s\n%s", - wf_name, wf_exec_id, e, traceback.format_exc() + wf_name, wf_ex_id, e, traceback.format_exc() ) - wf_ex = self._fail_workflow(wf_exec_id, e) + + wf_ex = self._fail_workflow(wf_ex_id, e) if wf_ex: return wf_ex.get_clone() @@ -252,16 +244,16 @@ class DefaultEngine(base.Engine, coordination.Service): return action_ex.get_clone() prev_task_state = task_ex.state + # Separate the task transition in a separate transaction. The task # has already completed for better or worst. The task state should # not be affected by errors during transition on conditions such as # on-success and on-error. with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) action_ex = db_api.get_action_execution(action_ex_id) task_ex = action_ex.task_execution - wf_ex = wf_handler.lock_workflow_execution( - task_ex.workflow_execution_id - ) + self._on_task_state_change( task_ex, wf_ex, @@ -364,7 +356,8 @@ class DefaultEngine(base.Engine, coordination.Service): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if wf_ex.state != states.PAUSED: + if (not states.is_paused(wf_ex.state) and + not states.is_idle(wf_ex.state)): return wf_ex.get_clone() return self._continue_workflow(wf_ex, env=env) @@ -467,46 +460,3 @@ class DefaultEngine(base.Engine, coordination.Service): ) return wf_ex - - @staticmethod - def _canonize_workflow_params(params): - # Resolve environment parameter. - env = params.get('env', {}) - - if not isinstance(env, dict) and not isinstance(env, six.string_types): - raise ValueError( - 'Unexpected type for environment [environment=%s]' % str(env) - ) - - if isinstance(env, six.string_types): - env_db = db_api.get_environment(env) - env = env_db.variables - params['env'] = env - - return params - - @staticmethod - def _create_workflow_execution(wf_def, wf_spec, wf_input, description, - params): - wf_ex = db_api.create_workflow_execution({ - 'name': wf_def.name, - 'description': description, - 'workflow_name': wf_def.name, - 'spec': wf_spec.to_dict(), - 'params': params or {}, - 'state': states.RUNNING, - 'input': wf_input or {}, - 'output': {}, - 'context': copy.deepcopy(wf_input) or {}, - 'task_execution_id': params.get('task_execution_id'), - 'runtime_context': { - 'with_items_index': params.get('with_items_index', 0) - }, - }) - - data_flow.add_openstack_data_to_context(wf_ex) - data_flow.add_execution_to_context(wf_ex) - data_flow.add_environment_to_context(wf_ex) - data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) - - return wf_ex diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index ab02b6bf..854d7bd4 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -26,6 +26,7 @@ from mistral.engine import rpc from mistral.engine import utils as e_utils from mistral import exceptions as exc from mistral import expressions as expr +from mistral.services import executions as wf_ex_service from mistral.services import scheduler from mistral import utils from mistral.utils import wf_trace @@ -476,24 +477,25 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input, index): wf_params[k] = v del wf_input[k] - scheduler.schedule_call( - None, - 'mistral.engine.task_handler.run_workflow', - 0, - wf_name=wf_def.name, - wf_input=wf_input, - wf_params=wf_params - ) - - -def run_workflow(wf_name, wf_input, wf_params): - rpc.get_engine_client().start_workflow( - wf_name, + wf_ex_id = wf_ex_service.create_workflow_execution( + wf_def.name, wf_input, "sub-workflow execution", - **wf_params + wf_params ) + scheduler.schedule_call( + None, + 'mistral.engine.task_handler.resume_workflow', + 0, + wf_ex_id=wf_ex_id, + env=None + ) + + +def resume_workflow(wf_ex_id, env): + rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) + def _complete_task(task_ex, task_spec, state, state_info=None): # Ignore if task already completed. diff --git a/mistral/services/executions.py b/mistral/services/executions.py new file mode 100644 index 00000000..9a15863f --- /dev/null +++ b/mistral/services/executions.py @@ -0,0 +1,91 @@ +# Copyright 2016 - StackStorm, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import six + +from oslo_log import log as logging + +from mistral.db.v2 import api as db_api +from mistral.engine import utils as eng_utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import data_flow +from mistral.workflow import states + + +LOG = logging.getLogger(__name__) + + +def canonize_workflow_params(params): + # Resolve environment parameter. + env = params.get('env', {}) + + if not isinstance(env, dict) and not isinstance(env, six.string_types): + raise ValueError( + 'Unexpected type for environment [environment=%s]' % str(env) + ) + + if isinstance(env, six.string_types): + env_db = db_api.get_environment(env) + env = env_db.variables + params['env'] = env + + return params + + +def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): + wf_ex = db_api.create_workflow_execution({ + 'name': wf_def.name, + 'description': desc, + 'workflow_name': wf_def.name, + 'spec': wf_spec.to_dict(), + 'params': params or {}, + 'state': states.IDLE, + 'input': wf_input or {}, + 'output': {}, + 'context': copy.deepcopy(wf_input) or {}, + 'task_execution_id': params.get('task_execution_id'), + 'runtime_context': { + 'with_items_index': params.get('with_items_index', 0) + }, + }) + + data_flow.add_openstack_data_to_context(wf_ex) + data_flow.add_execution_to_context(wf_ex) + data_flow.add_environment_to_context(wf_ex) + data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) + + return wf_ex + + +def create_workflow_execution(wf_name, wf_input, description, params): + params = canonize_workflow_params(params) + + wf_def = db_api.get_workflow_definition(wf_name) + wf_spec = spec_parser.get_workflow_spec(wf_def.spec) + + eng_utils.validate_input(wf_def, wf_input, wf_spec) + + wf_ex = _create_workflow_execution( + wf_def, + wf_spec, + wf_input, + description, + params + ) + + wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) + + return wf_ex.id diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 8d64d4cf..28c9664a 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -1061,3 +1061,51 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn(2, result_task1) self.assertIn(3, result_task2) self.assertIn(4, result_task2) + + def test_with_items_subflow_concurrency_gt_list_length(self): + workbook_definition = """--- + version: "2.0" + name: wb1 + + workflows: + main: + type: direct + input: + - names + tasks: + task1: + with-items: name in <% $.names %> + workflow: subflow1 name=<% $.name %> + concurrency: 3 + subflow1: + type: direct + input: + - name + output: + result: <% task(task1).result %> + tasks: + task1: + action: std.echo output=<% $.name %> + """ + + wb_service.create_workbook_v2(workbook_definition) + + # Start workflow. + names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"] + wf_ex = self.engine.start_workflow('wb1.main', {'names': names}) + + self._await( + lambda: self.is_execution_success(wf_ex.id), + ) + + wf_ex = db_api.get_execution(wf_ex.id) + task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') + + self.assertEqual(states.SUCCESS, task_ex.state) + + result = [ + item['result'] + for item in data_flow.get_task_execution_result(task_ex) + ] + + self.assertListEqual(sorted(result), sorted(names)) diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index f94f7b9b..dc0ab611 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -49,6 +49,10 @@ def is_completed(state): return state in [SUCCESS, ERROR] +def is_running(state): + return state in [RUNNING, RUNNING_DELAYED] + + def is_waiting(state): return state == WAITING @@ -57,8 +61,12 @@ def is_idle(state): return state == IDLE +def is_paused(state): + return state == PAUSED + + def is_paused_or_completed(state): - return state == PAUSED or is_completed(state) + return is_paused(state) or is_completed(state) def is_valid_transition(from_state, to_state): diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index a77ba929..282bb5a0 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -45,12 +45,14 @@ def is_completed(task_ex): def get_index(task_ex): - return len( - list(filter( - lambda x: x.accepted or states.RUNNING, task_ex.executions - )) + f = lambda x: ( + x.accepted or + states.is_running(x.state) or + states.is_idle(x.state) ) + return len(list(filter(f, task_ex.executions))) + def get_concurrency(task_ex): return task_ex.runtime_context.get(_CONCURRENCY)