Files
deb-mistral/mistral/engine/default_engine.py
LingxianKong 77cf6eb26a Fix invalid workflow completion in case of "join"
When we use 'join' clause in workflow execution, the execution should be
in running status, until all possible upstream tasks are completed and
corresponding conditions have triggered.

Do not remove the tasks with unsatisfied join when continuing execution,
or the workflow will succeed erroneously. So, a new attribute
'wait_flag' is added to RunTask class, to indicate if the task should
run immediately.

The difference between 'waiting' and 'delayed' status is, 'waiting'
indicates a task needs to wait for its conditions to be satisfied,
'delayed' means the task has to wait because of policy(wait-before,
wait-after or retry).

Change-Id: Idcd162b9c000b85acf2adabc158f848d0824ac57
Closes-Bug: #1455038
2015-06-30 17:11:42 +08:00

456 lines
16 KiB
Python

# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import traceback
from oslo_log import log as logging
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
from mistral.engine import base
from mistral.engine import task_handler
from mistral.engine import utils as eng_utils
from mistral.engine import workflow_handler as wf_handler
from mistral.services import action_manager as a_m
from mistral import utils as u
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
# Submodules of mistral.engine will throw NoSuchOptError if configuration
# options required at top level of this __init__.py are not imported before
# the submodules are referenced.
class DefaultEngine(base.Engine):
def __init__(self, engine_client):
self._engine_client = engine_client
@u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, description='', **params):
wf_exec_id = None
try:
params = self._canonize_workflow_params(params)
with db_api.transaction():
wf_def = db_api.get_workflow_definition(wf_name)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
wf_ex = self._create_workflow_execution(
wf_def,
wf_spec,
wf_input,
description,
params
)
wf_exec_id = wf_ex.id
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
wf_ctrl = wf_base.WorkflowController.get_controller(
wf_ex,
wf_spec
)
self._dispatch_workflow_commands(
wf_ex,
wf_ctrl.continue_workflow()
)
return wf_ex.get_clone()
except Exception as e:
LOG.error(
"Failed to start workflow '%s' id=%s: %s\n%s",
wf_name, wf_exec_id, e, traceback.format_exc()
)
self._fail_workflow(wf_exec_id, e)
raise e
@u.log_exec(LOG)
def start_action(self, action_name, action_input,
description=None, **params):
with db_api.transaction():
action_def = action_handler.resolve_definition(action_name)
resolved_action_input = action_handler.get_action_input(
action_name,
action_input
)
action = a_m.get_action_class(action_def.name)(
**resolved_action_input
)
# If we see action is asynchronous, then we enforce 'save_result'.
if params.get('save_result') or not action.is_sync():
action_ex = action_handler.create_action_execution(
action_def,
resolved_action_input,
description=description
)
action_handler.run_action(
action_def,
resolved_action_input,
action_ex.id,
params.get('target')
)
return action_ex.get_clone()
else:
result = action_handler.run_action(
action_def,
resolved_action_input,
target=params.get('target'),
async=False
)
return db_models.ActionExecution(
name=action_name,
description=description,
input=action_input,
output={'result': result}
)
def on_task_state_change(self, task_ex_id, state):
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
# TODO(rakhmerov): The method is mostly needed for policy and
# we are supposed to get the same action execution as when the
# policy worked. But by the moment this method is called the
# last execution object may have changed. It's a race condition.
execution = task_ex.executions[-1]
wf_ex_id = task_ex.workflow_execution_id
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = task_ex.workflow_execution
wf_trace.info(
task_ex,
"Task '%s' [%s -> %s]"
% (task_ex.name, task_ex.state, state)
)
task_ex.state = state
self._on_task_state_change(task_ex, wf_ex, action_ex=execution)
def _on_task_state_change(self, task_ex, wf_ex, action_ex=None):
task_spec = spec_parser.get_task_spec(task_ex.spec)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
if states.is_completed(task_ex.state):
task_handler.after_task_complete(task_ex, task_spec, wf_spec)
# Ignore DELAYED state.
if task_ex.state == states.DELAYED:
return
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow()
task_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds)
self._check_workflow_completion(wf_ex, action_ex, wf_ctrl)
@staticmethod
def _check_workflow_completion(wf_ex, action_ex, wf_ctrl):
if states.is_paused_or_completed(wf_ex.state):
return
if wf_utils.find_incomplete_tasks(wf_ex):
return
if wf_ctrl.all_errors_handled():
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context()
)
else:
result_str = str(action_ex.output.get('result', "Unknown"))
state_info = (
"Failure caused by error in task '%s': %s" %
(action_ex.task_execution.name, result_str)
)
wf_handler.fail_workflow(wf_ex, state_info)
@u.log_exec(LOG)
def on_action_complete(self, action_ex_id, result):
wf_ex_id = None
try:
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex_id)
# In case of single action execution there is no
# assigned task execution.
if not action_ex.task_execution:
return action_handler.store_action_result(
action_ex, result
).get_clone()
wf_ex_id = action_ex.task_execution.workflow_execution_id
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = action_ex.task_execution.workflow_execution
task_ex = task_handler.on_action_complete(action_ex, result)
# If workflow is on pause or completed then there's no
# need to continue workflow.
if states.is_paused_or_completed(wf_ex.state):
return action_ex
self._on_task_state_change(task_ex, wf_ex, action_ex)
return action_ex.get_clone()
except Exception as e:
# TODO(dzimine): try to find out which command caused failure.
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
LOG.error(
"Failed to handle action execution result [id=%s]: %s\n%s",
action_ex_id, e, traceback.format_exc()
)
self._fail_workflow(wf_ex_id, e)
raise e
@u.log_exec(LOG)
def pause_workflow(self, execution_id):
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(execution_id)
wf_ex = db_api.get_workflow_execution(execution_id)
wf_handler.set_execution_state(wf_ex, states.PAUSED)
return wf_ex
@u.log_exec(LOG)
def resume_workflow(self, execution_id):
try:
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(execution_id)
wf_ex = db_api.get_workflow_execution(execution_id)
if wf_ex.state != states.PAUSED:
return wf_ex
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow()
# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
# completed within the period when the workflow was pause.
cmds = filter(
lambda c: not isinstance(c, commands.PauseWorkflow),
cmds
)
# Since there's no explicit task causing the operation
# we need to mark all not processed tasks as processed
# because workflow controller takes only completed tasks
# with flag 'processed' equal to False.
for t_ex in wf_ex.task_executions:
if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds)
if not cmds:
if not wf_utils.find_incomplete_tasks(wf_ex):
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context()
)
return wf_ex
except Exception as e:
LOG.error(
"Failed to resume execution id=%s: %s\n%s",
execution_id, e, traceback.format_exc()
)
self._fail_workflow(execution_id, e)
raise e
@u.log_exec(LOG)
def stop_workflow(self, execution_id, state, message=None):
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(execution_id)
wf_ex = db_api.get_execution(execution_id)
return self._stop_workflow(wf_ex, state, message)
@staticmethod
def _stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS:
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warn(
"Failed to get final context for %s: %s" % (wf_ex, e)
)
return wf_handler.succeed_workflow(
wf_ex,
final_context,
message
)
elif state == states.ERROR:
return wf_handler.fail_workflow(wf_ex, message)
return wf_ex
@u.log_exec(LOG)
def rollback_workflow(self, execution_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError
def _dispatch_workflow_commands(self, wf_ex, wf_cmds):
if not wf_cmds:
return
for cmd in wf_cmds:
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
task_handler.defer_task(cmd)
elif isinstance(cmd, commands.RunTask):
task_handler.run_new_task(cmd)
elif isinstance(cmd, commands.RunExistingTask):
task_handler.run_existing_task(cmd.task_ex.id)
elif isinstance(cmd, commands.SetWorkflowState):
if states.is_completed(cmd.new_state):
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
else:
wf_handler.set_execution_state(wf_ex, cmd.new_state)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass
else:
raise RuntimeError('Unsupported workflow command: %s' % cmd)
if wf_ex.state != states.RUNNING:
break
@staticmethod
def _fail_workflow(wf_ex_id, err, action_ex_id=None):
"""Private helper to fail workflow on exceptions."""
with db_api.transaction():
err_msg = str(err)
wf_ex = db_api.load_workflow_execution(wf_ex_id)
if wf_ex is None:
LOG.error(
"Cant fail workflow execution with id='%s': not found.",
wf_ex_id
)
return
wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg)
if action_ex_id:
# Note(dzimine): Don't call self.engine_client:
# 1) to avoid computing and triggering next tasks
# 2) to avoid a loop in case of error in transport
action_ex = db_api.get_action_execution(action_ex_id)
task_handler.on_action_complete(
action_ex,
wf_utils.Result(error=err_msg)
)
@staticmethod
def _canonize_workflow_params(params):
# Resolve environment parameter.
env = params.get('env', {})
if not isinstance(env, dict) and not isinstance(env, basestring):
raise ValueError(
'Unexpected type for environment [environment=%s]' % str(env)
)
if isinstance(env, basestring):
env_db = db_api.get_environment(env)
env = env_db.variables
params['env'] = env
return params
@staticmethod
def _create_workflow_execution(wf_def, wf_spec, wf_input, description,
params):
wf_ex = db_api.create_workflow_execution({
'name': wf_def.name,
'description': description,
'workflow_name': wf_def.name,
'spec': wf_spec.to_dict(),
'params': params or {},
'state': states.RUNNING,
'input': wf_input or {},
'output': {},
'context': copy.copy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'with_items_index': params.get('with_items_index', 0)
},
})
data_flow.add_openstack_data_to_context(wf_ex)
data_flow.add_execution_to_context(wf_ex)
data_flow.add_environment_to_context(wf_ex)
data_flow.add_workflow_variables_to_context(wf_ex, wf_spec)
return wf_ex
@staticmethod
def _lock_workflow_execution(wf_exec_id):
# NOTE: Workflow execution object must be locked before
# loading the object itself into the session (either with
# 'get_XXX' or 'load_XXX' methods). Otherwise, there can be
# multiple parallel transactions that see the same state
# and hence the rest of the method logic would not be atomic.
db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id)