# Copyright 2013 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import traceback from oslo_log import log as logging from mistral import coordination from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base from mistral.engine import task_handler from mistral.engine import utils as eng_utils from mistral.engine import workflow_handler as wf_handler from mistral.services import action_manager as a_m from mistral import utils as u from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser from mistral.workflow import base as wf_base from mistral.workflow import commands from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) # Submodules of mistral.engine will throw NoSuchOptError if configuration # options required at top level of this __init__.py are not imported before # the submodules are referenced. class DefaultEngine(base.Engine, coordination.Service): def __init__(self, engine_client): self._engine_client = engine_client coordination.Service.__init__(self, 'engine_group') @u.log_exec(LOG) def start_workflow(self, wf_name, wf_input, description='', **params): wf_exec_id = None try: params = self._canonize_workflow_params(params) with db_api.transaction(): wf_def = db_api.get_workflow_definition(wf_name) wf_spec = spec_parser.get_workflow_spec(wf_def.spec) eng_utils.validate_input(wf_def, wf_input, wf_spec) wf_ex = self._create_workflow_execution( wf_def, wf_spec, wf_input, description, params ) wf_exec_id = wf_ex.id wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) wf_ctrl = wf_base.WorkflowController.get_controller( wf_ex, wf_spec ) self._dispatch_workflow_commands( wf_ex, wf_ctrl.continue_workflow() ) return wf_ex.get_clone() except Exception as e: LOG.error( "Failed to start workflow '%s' id=%s: %s\n%s", wf_name, wf_exec_id, e, traceback.format_exc() ) self._fail_workflow(wf_exec_id, e) raise e @u.log_exec(LOG) def start_action(self, action_name, action_input, description=None, **params): with db_api.transaction(): action_def = action_handler.resolve_definition(action_name) resolved_action_input = action_handler.get_action_input( action_name, action_input ) action = a_m.get_action_class(action_def.name)( **resolved_action_input ) # If we see action is asynchronous, then we enforce 'save_result'. if params.get('save_result') or not action.is_sync(): action_ex = action_handler.create_action_execution( action_def, resolved_action_input, description=description ) action_handler.run_action( action_def, resolved_action_input, action_ex.id, params.get('target') ) return action_ex.get_clone() else: output = action_handler.run_action( action_def, resolved_action_input, target=params.get('target'), async=False ) return db_models.ActionExecution( name=action_name, description=description, input=action_input, output=output ) def on_task_state_change(self, task_ex_id, state): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) # TODO(rakhmerov): The method is mostly needed for policy and # we are supposed to get the same action execution as when the # policy worked. But by the moment this method is called the # last execution object may have changed. It's a race condition. execution = task_ex.executions[-1] wf_ex_id = task_ex.workflow_execution_id # Must be before loading the object itself (see method doc). self._lock_workflow_execution(wf_ex_id) wf_ex = task_ex.workflow_execution wf_trace.info( task_ex, "Task '%s' [%s -> %s]" % (task_ex.name, task_ex.state, state) ) task_ex.state = state self._on_task_state_change(task_ex, wf_ex, action_ex=execution) def _on_task_state_change(self, task_ex, wf_ex, action_ex=None): task_spec = spec_parser.get_task_spec(task_ex.spec) wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) if states.is_completed(task_ex.state): task_handler.after_task_complete(task_ex, task_spec, wf_spec) # Ignore DELAYED state. if task_ex.state == states.DELAYED: return wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) # Calculate commands to process next. cmds = wf_ctrl.continue_workflow() task_ex.processed = True self._dispatch_workflow_commands(wf_ex, cmds) self._check_workflow_completion(wf_ex, wf_ctrl) @staticmethod def _check_workflow_completion(wf_ex, wf_ctrl): if states.is_paused_or_completed(wf_ex.state): return if wf_utils.find_incomplete_tasks(wf_ex): return if wf_ctrl.all_errors_handled(): wf_handler.succeed_workflow( wf_ex, wf_ctrl.evaluate_workflow_final_context() ) else: state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) wf_handler.fail_workflow(wf_ex, state_info) @u.log_exec(LOG) def on_action_complete(self, action_ex_id, result): wf_ex_id = None try: with db_api.transaction(): action_ex = db_api.get_action_execution(action_ex_id) # In case of single action execution there is no # assigned task execution. if not action_ex.task_execution: return action_handler.store_action_result( action_ex, result ).get_clone() wf_ex_id = action_ex.task_execution.workflow_execution_id # Must be before loading the object itself (see method doc). self._lock_workflow_execution(wf_ex_id) wf_ex = action_ex.task_execution.workflow_execution task_ex = task_handler.on_action_complete(action_ex, result) # If workflow is on pause or completed then there's no # need to continue workflow. if states.is_paused_or_completed(wf_ex.state): return action_ex self._on_task_state_change(task_ex, wf_ex, action_ex) return action_ex.get_clone() except Exception as e: # TODO(dzimine): try to find out which command caused failure. # TODO(rakhmerov): Need to refactor logging in a more elegant way. LOG.error( "Failed to handle action execution result [id=%s]: %s\n%s", action_ex_id, e, traceback.format_exc() ) self._fail_workflow(wf_ex_id, e) raise e @u.log_exec(LOG) def pause_workflow(self, execution_id): with db_api.transaction(): # Must be before loading the object itself (see method doc). self._lock_workflow_execution(execution_id) wf_ex = db_api.get_workflow_execution(execution_id) wf_handler.set_execution_state(wf_ex, states.PAUSED) return wf_ex def _continue_workflow(self, wf_ex, task_ex=None, reset=True): wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) # Calculate commands to process next. cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset) # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that # completed within the period when the workflow was pause. cmds = filter( lambda c: not isinstance(c, commands.PauseWorkflow), cmds ) # Since there's no explicit task causing the operation # we need to mark all not processed tasks as processed # because workflow controller takes only completed tasks # with flag 'processed' equal to False. for t_ex in wf_ex.task_executions: if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True self._dispatch_workflow_commands(wf_ex, cmds) if not cmds: if not wf_utils.find_incomplete_tasks(wf_ex): wf_handler.succeed_workflow( wf_ex, wf_ctrl.evaluate_workflow_final_context() ) return wf_ex.get_clone() @u.log_exec(LOG) def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True): try: with db_api.transaction(): # Must be before loading the object itself (see method doc). self._lock_workflow_execution(wf_ex_id) task_ex = db_api.get_task_execution(task_ex_id) if task_ex.workflow_execution.id != wf_ex_id: raise ValueError('Workflow execution ID does not match.') wf_ex = task_ex.workflow_execution if wf_ex.state == states.PAUSED: return wf_ex.get_clone() return self._continue_workflow(wf_ex, task_ex, reset) except Exception as e: LOG.error( "Failed to rerun execution id=%s at task=%s: %s\n%s", wf_ex_id, task_ex_id, e, traceback.format_exc() ) self._fail_workflow(wf_ex_id, e) raise e @u.log_exec(LOG) def resume_workflow(self, wf_ex_id): try: with db_api.transaction(): # Must be before loading the object itself (see method doc). self._lock_workflow_execution(wf_ex_id) wf_ex = db_api.get_workflow_execution(wf_ex_id) if wf_ex.state != states.PAUSED: return wf_ex.get_clone() return self._continue_workflow(wf_ex) except Exception as e: LOG.error( "Failed to resume execution id=%s: %s\n%s", wf_ex_id, e, traceback.format_exc() ) self._fail_workflow(wf_ex_id, e) raise e @u.log_exec(LOG) def stop_workflow(self, execution_id, state, message=None): with db_api.transaction(): # Must be before loading the object itself (see method doc). self._lock_workflow_execution(execution_id) wf_ex = db_api.get_execution(execution_id) return self._stop_workflow(wf_ex, state, message) @staticmethod def _stop_workflow(wf_ex, state, message=None): if state == states.SUCCESS: wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) final_context = {} try: final_context = wf_ctrl.evaluate_workflow_final_context() except Exception as e: LOG.warn( "Failed to get final context for %s: %s" % (wf_ex, e) ) return wf_handler.succeed_workflow( wf_ex, final_context, message ) elif state == states.ERROR: return wf_handler.fail_workflow(wf_ex, message) return wf_ex @u.log_exec(LOG) def rollback_workflow(self, execution_id): # TODO(rakhmerov): Implement. raise NotImplementedError def _dispatch_workflow_commands(self, wf_ex, wf_cmds): if not wf_cmds: return for cmd in wf_cmds: if isinstance(cmd, commands.RunTask) and cmd.is_waiting(): task_handler.defer_task(cmd) elif isinstance(cmd, commands.RunTask): task_handler.run_new_task(cmd) elif isinstance(cmd, commands.RunExistingTask): task_handler.run_existing_task( cmd.task_ex.id, reset=cmd.reset ) elif isinstance(cmd, commands.SetWorkflowState): if states.is_completed(cmd.new_state): self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) else: wf_handler.set_execution_state(wf_ex, cmd.new_state) elif isinstance(cmd, commands.Noop): # Do nothing. pass else: raise RuntimeError('Unsupported workflow command: %s' % cmd) if wf_ex.state != states.RUNNING: break @staticmethod def _fail_workflow(wf_ex_id, err, action_ex_id=None): """Private helper to fail workflow on exceptions.""" with db_api.transaction(): err_msg = str(err) wf_ex = db_api.load_workflow_execution(wf_ex_id) if wf_ex is None: LOG.error( "Cant fail workflow execution with id='%s': not found.", wf_ex_id ) return wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg) if action_ex_id: # Note(dzimine): Don't call self.engine_client: # 1) to avoid computing and triggering next tasks # 2) to avoid a loop in case of error in transport action_ex = db_api.get_action_execution(action_ex_id) task_handler.on_action_complete( action_ex, wf_utils.Result(error=err_msg) ) @staticmethod def _canonize_workflow_params(params): # Resolve environment parameter. env = params.get('env', {}) if not isinstance(env, dict) and not isinstance(env, basestring): raise ValueError( 'Unexpected type for environment [environment=%s]' % str(env) ) if isinstance(env, basestring): env_db = db_api.get_environment(env) env = env_db.variables params['env'] = env return params @staticmethod def _create_workflow_execution(wf_def, wf_spec, wf_input, description, params): wf_ex = db_api.create_workflow_execution({ 'name': wf_def.name, 'description': description, 'workflow_name': wf_def.name, 'spec': wf_spec.to_dict(), 'params': params or {}, 'state': states.RUNNING, 'input': wf_input or {}, 'output': {}, 'context': copy.copy(wf_input) or {}, 'task_execution_id': params.get('task_execution_id'), 'runtime_context': { 'with_items_index': params.get('with_items_index', 0) }, }) data_flow.add_openstack_data_to_context(wf_ex) data_flow.add_execution_to_context(wf_ex) data_flow.add_environment_to_context(wf_ex) data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) return wf_ex @staticmethod def _lock_workflow_execution(wf_exec_id): # NOTE: Workflow execution object must be locked before # loading the object itself into the session (either with # 'get_XXX' or 'load_XXX' methods). Otherwise, there can be # multiple parallel transactions that see the same state # and hence the rest of the method logic would not be atomic. db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id)