5e14857538
* At a couple of places we use basestring directly. Change-Id: I7a2280dc8168223856a7ad11fb9b3dc44a12fab2
491 lines
17 KiB
Python
491 lines
17 KiB
Python
# Copyright 2013 - Mirantis, Inc.
|
|
# Copyright 2015 - StackStorm, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import copy
|
|
import six
|
|
import traceback
|
|
|
|
from oslo_log import log as logging
|
|
|
|
from mistral import coordination
|
|
from mistral.db.v2 import api as db_api
|
|
from mistral.db.v2.sqlalchemy import models as db_models
|
|
from mistral.engine import action_handler
|
|
from mistral.engine import base
|
|
from mistral.engine import task_handler
|
|
from mistral.engine import utils as eng_utils
|
|
from mistral.engine import workflow_handler as wf_handler
|
|
from mistral.services import action_manager as a_m
|
|
from mistral import utils as u
|
|
from mistral.utils import wf_trace
|
|
from mistral.workbook import parser as spec_parser
|
|
from mistral.workflow import base as wf_base
|
|
from mistral.workflow import commands
|
|
from mistral.workflow import data_flow
|
|
from mistral.workflow import states
|
|
from mistral.workflow import utils as wf_utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Submodules of mistral.engine will throw NoSuchOptError if configuration
|
|
# options required at top level of this __init__.py are not imported before
|
|
# the submodules are referenced.
|
|
|
|
|
|
class DefaultEngine(base.Engine, coordination.Service):
    """Default workflow engine implementation.

    Public methods open a DB transaction, lock the affected workflow
    execution where one is involved, and then delegate to the workflow,
    task and action handlers. Methods that wrap their work in
    'try/except' log the failure, try to move the workflow execution
    into ERROR state and re-raise the original exception.
    """

    def __init__(self, engine_client):
        """Store the engine client and register in 'engine_group'.

        :param engine_client: Engine client object. It is only stored
            on the instance here.
        """
        self._engine_client = engine_client

        coordination.Service.__init__(self, 'engine_group')

    @u.log_exec(LOG)
    def start_workflow(self, wf_name, wf_input, description='', **params):
        """Create a workflow execution and dispatch its first commands.

        :param wf_name: Name of the workflow definition to start.
        :param wf_input: Workflow input.
        :param description: Description stored on the execution.
        :param params: Additional workflow parameters. The 'env' entry
            is resolved by '_canonize_workflow_params'.
        :return: Result of 'get_clone()' on the created execution.
        :raises Exception: Any error is logged, the workflow (if it was
            already created) is moved to ERROR and the error re-raised.
        """
        # Stays None until the execution object is created so that the
        # error handler knows whether there is anything to fail.
        wf_exec_id = None

        try:
            params = self._canonize_workflow_params(params)

            with db_api.transaction():
                wf_def = db_api.get_workflow_definition(wf_name)
                wf_spec = spec_parser.get_workflow_spec(wf_def.spec)

                eng_utils.validate_input(wf_def, wf_input, wf_spec)

                wf_ex = self._create_workflow_execution(
                    wf_def,
                    wf_spec,
                    wf_input,
                    description,
                    params
                )
                wf_exec_id = wf_ex.id

                wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)

                wf_ctrl = wf_base.WorkflowController.get_controller(
                    wf_ex,
                    wf_spec
                )

                self._dispatch_workflow_commands(
                    wf_ex,
                    wf_ctrl.continue_workflow()
                )

                return wf_ex.get_clone()
        except Exception as e:
            LOG.error(
                "Failed to start workflow '%s' id=%s: %s\n%s",
                wf_name, wf_exec_id, e, traceback.format_exc()
            )
            self._fail_workflow(wf_exec_id, e)
            raise e

    @u.log_exec(LOG)
    def start_action(self, action_name, action_input,
                     description=None, **params):
        """Run a single action outside of any workflow.

        :param action_name: Name of the action to run.
        :param action_input: Raw action input.
        :param description: Description stored on the action execution.
        :param params: Optional 'save_result' and 'target' entries.
        :return: A clone of the persisted action execution when the
            result is saved or the action is asynchronous; otherwise a
            new 'ActionExecution' model object carrying the output.
        """
        with db_api.transaction():
            action_def = action_handler.resolve_definition(action_name)
            resolved_action_input = action_handler.get_action_input(
                action_name,
                action_input
            )
            action = a_m.get_action_class(action_def.name)(
                **resolved_action_input
            )

            # If we see action is asynchronous, then we enforce 'save_result'.
            if params.get('save_result') or not action.is_sync():
                action_ex = action_handler.create_action_execution(
                    action_def,
                    resolved_action_input,
                    description=description
                )

                action_handler.run_action(
                    action_def,
                    resolved_action_input,
                    action_ex.id,
                    params.get('target')
                )

                return action_ex.get_clone()

            # NOTE: 'async' is a reserved word starting with Python 3.7,
            # so writing it as a literal keyword argument makes this
            # module fail to parse. Passing it through a dict sends the
            # very same keyword to the callee on every Python version.
            output = action_handler.run_action(
                action_def,
                resolved_action_input,
                target=params.get('target'),
                **{'async': False}
            )

            return db_models.ActionExecution(
                name=action_name,
                description=description,
                input=action_input,
                output=output
            )

    def on_task_state_change(self, task_ex_id, state):
        """Set a new state on a task execution and continue processing.

        :param task_ex_id: Task execution ID.
        :param state: New task state.
        """
        with db_api.transaction():
            task_ex = db_api.get_task_execution(task_ex_id)
            # TODO(rakhmerov): The method is mostly needed for policy and
            # we are supposed to get the same action execution as when the
            # policy worked.

            wf_ex_id = task_ex.workflow_execution_id

            # Must be before loading the object itself (see method doc).
            self._lock_workflow_execution(wf_ex_id)

            wf_ex = task_ex.workflow_execution

            wf_trace.info(
                task_ex,
                "Task '%s' [%s -> %s]"
                % (task_ex.name, task_ex.state, state)
            )

            task_ex.state = state

            self._on_task_state_change(task_ex, wf_ex)

    def _on_task_state_change(self, task_ex, wf_ex):
        """React to a task state change within an open transaction.

        For a completed task the next commands are calculated and
        dispatched and workflow completion is checked. For a task that
        needs to continue, the existing task is re-run without reset.

        :param task_ex: Task execution whose state changed.
        :param wf_ex: Workflow execution the task belongs to.
        """
        task_spec = spec_parser.get_task_spec(task_ex.spec)
        wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

        if task_handler.is_task_completed(task_ex, task_spec):
            task_handler.after_task_complete(task_ex, task_spec, wf_spec)

            # Ignore DELAYED state.
            if task_ex.state == states.DELAYED:
                return

            wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)

            # Calculate commands to process next.
            cmds = wf_ctrl.continue_workflow()

            task_ex.processed = True

            self._dispatch_workflow_commands(wf_ex, cmds)

            self._check_workflow_completion(wf_ex, wf_ctrl)
        elif task_handler.need_to_continue(task_ex, task_spec):
            # Re-run existing task.
            cmds = [commands.RunExistingTask(task_ex, reset=False)]

            self._dispatch_workflow_commands(wf_ex, cmds)

    @staticmethod
    def _check_workflow_completion(wf_ex, wf_ctrl):
        """Finish the workflow if nothing is left to run.

        Does nothing while the workflow is paused or already completed,
        or while it still has incomplete task executions. Otherwise the
        workflow succeeds if the controller reports all errors handled
        and fails with a constructed message if not.

        :param wf_ex: Workflow execution.
        :param wf_ctrl: Workflow controller for 'wf_ex'.
        """
        if states.is_paused_or_completed(wf_ex.state):
            return

        if wf_utils.find_incomplete_task_executions(wf_ex):
            return

        if wf_ctrl.all_errors_handled():
            wf_handler.succeed_workflow(
                wf_ex,
                wf_ctrl.evaluate_workflow_final_context()
            )
        else:
            state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)

            wf_handler.fail_workflow(wf_ex, state_info)

    @u.log_exec(LOG)
    def on_action_complete(self, action_ex_id, result):
        """Handle the result of a finished action execution.

        :param action_ex_id: Action execution ID.
        :param result: Action result.
        :return: A clone of the action execution.
        :raises Exception: Any error is logged, the owning workflow (if
            known at that point) is moved to ERROR and the error
            re-raised.
        """
        # Stays None for single action executions and until the owning
        # workflow is determined.
        wf_ex_id = None

        try:
            with db_api.transaction():
                action_ex = db_api.get_action_execution(action_ex_id)

                # In case of single action execution there is no
                # assigned task execution.
                if not action_ex.task_execution:
                    return action_handler.store_action_result(
                        action_ex,
                        result
                    ).get_clone()

                wf_ex_id = action_ex.task_execution.workflow_execution_id

                # Must be before loading the object itself (see method doc).
                self._lock_workflow_execution(wf_ex_id)

                wf_ex = action_ex.task_execution.workflow_execution

                task_ex = task_handler.on_action_complete(action_ex, result)

                # If workflow is on pause or completed then there's no
                # need to continue workflow.
                if states.is_paused_or_completed(wf_ex.state):
                    # Return a clone like every other exit of this
                    # method does.
                    return action_ex.get_clone()

                self._on_task_state_change(task_ex, wf_ex)

                return action_ex.get_clone()
        except Exception as e:
            # TODO(dzimine): try to find out which command caused failure.
            # TODO(rakhmerov): Need to refactor logging in a more elegant way.
            LOG.error(
                "Failed to handle action execution result [id=%s]: %s\n%s",
                action_ex_id, e, traceback.format_exc()
            )
            self._fail_workflow(wf_ex_id, e)
            raise e

    @u.log_exec(LOG)
    def pause_workflow(self, execution_id):
        """Move a workflow execution into PAUSED state.

        :param execution_id: Workflow execution ID.
        :return: The workflow execution object.
        """
        with db_api.transaction():
            # Must be before loading the object itself (see method doc).
            self._lock_workflow_execution(execution_id)

            wf_ex = db_api.get_workflow_execution(execution_id)

            wf_handler.set_execution_state(wf_ex, states.PAUSED)

        return wf_ex

    def _continue_workflow(self, wf_ex, task_ex=None, reset=True):
        """Put a workflow back into RUNNING state and dispatch commands.

        :param wf_ex: Workflow execution to continue.
        :param task_ex: Optional task execution passed through to the
            workflow controller.
        :param reset: Passed through to the workflow controller.
        :return: Result of 'get_clone()' on the workflow execution.
        """
        wf_handler.set_execution_state(wf_ex, states.RUNNING)

        wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)

        # Calculate commands to process next.
        cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset)

        # When resuming a workflow we need to ignore all 'pause'
        # commands because workflow controller takes tasks that
        # completed within the period when the workflow was paused.
        #
        # NOTE: This must be a real list. On Python 3 'filter()' gives a
        # lazy iterator which is always truthy and is exhausted by the
        # dispatch below, so the emptiness check further down would
        # never succeed.
        cmds = [
            c for c in cmds
            if not isinstance(c, commands.PauseWorkflow)
        ]

        # Since there's no explicit task causing the operation
        # we need to mark all not processed tasks as processed
        # because workflow controller takes only completed tasks
        # with flag 'processed' equal to False.
        for t_ex in wf_ex.task_executions:
            if states.is_completed(t_ex.state) and not t_ex.processed:
                t_ex.processed = True

        self._dispatch_workflow_commands(wf_ex, cmds)

        # Nothing to run and nothing still in progress: the workflow
        # is done.
        if not cmds:
            if not wf_utils.find_incomplete_task_executions(wf_ex):
                wf_handler.succeed_workflow(
                    wf_ex,
                    wf_ctrl.evaluate_workflow_final_context()
                )

        return wf_ex.get_clone()

    @u.log_exec(LOG)
    def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True):
        """Re-run a workflow execution starting from the given task.

        A workflow in PAUSED state is returned unchanged.

        :param wf_ex_id: Workflow execution ID.
        :param task_ex_id: ID of the task execution to re-run from.
        :param reset: Passed through to '_continue_workflow'.
        :return: Result of 'get_clone()' on the workflow execution.
        :raises ValueError: If the task does not belong to the given
            workflow execution.
        :raises Exception: Any error is logged, the workflow is moved
            to ERROR and the error re-raised.
        """
        try:
            with db_api.transaction():
                # Must be before loading the object itself (see method doc).
                self._lock_workflow_execution(wf_ex_id)

                task_ex = db_api.get_task_execution(task_ex_id)

                if task_ex.workflow_execution.id != wf_ex_id:
                    raise ValueError('Workflow execution ID does not match.')

                wf_ex = task_ex.workflow_execution

                if wf_ex.state == states.PAUSED:
                    return wf_ex.get_clone()

                return self._continue_workflow(wf_ex, task_ex, reset)
        except Exception as e:
            LOG.error(
                "Failed to rerun execution id=%s at task=%s: %s\n%s",
                wf_ex_id, task_ex_id, e, traceback.format_exc()
            )
            self._fail_workflow(wf_ex_id, e)
            raise e

    @u.log_exec(LOG)
    def resume_workflow(self, wf_ex_id):
        """Resume a paused workflow execution.

        A workflow that is not in PAUSED state is returned unchanged.

        :param wf_ex_id: Workflow execution ID.
        :return: Result of 'get_clone()' on the workflow execution.
        :raises Exception: Any error is logged, the workflow is moved
            to ERROR and the error re-raised.
        """
        try:
            with db_api.transaction():
                # Must be before loading the object itself (see method doc).
                self._lock_workflow_execution(wf_ex_id)

                wf_ex = db_api.get_workflow_execution(wf_ex_id)

                if wf_ex.state != states.PAUSED:
                    return wf_ex.get_clone()

                return self._continue_workflow(wf_ex)
        except Exception as e:
            LOG.error(
                "Failed to resume execution id=%s: %s\n%s",
                wf_ex_id, e, traceback.format_exc()
            )
            self._fail_workflow(wf_ex_id, e)
            raise e

    @u.log_exec(LOG)
    def stop_workflow(self, execution_id, state, message=None):
        """Stop a workflow execution with the given final state.

        :param execution_id: Workflow execution ID.
        :param state: Target state, see '_stop_workflow'.
        :param message: Optional message passed to the workflow handler.
        :return: Whatever '_stop_workflow' returns.
        """
        with db_api.transaction():
            # Must be before loading the object itself (see method doc).
            self._lock_workflow_execution(execution_id)

            # NOTE(review): pause/resume load the object with
            # 'get_workflow_execution' while this method uses
            # 'get_execution'. Confirm whether that is intentional.
            wf_ex = db_api.get_execution(execution_id)

            return self._stop_workflow(wf_ex, state, message)

    @staticmethod
    def _stop_workflow(wf_ex, state, message=None):
        """Finish a workflow execution as SUCCESS or ERROR.

        :param wf_ex: Workflow execution.
        :param state: SUCCESS or ERROR; for any other value 'wf_ex' is
            returned untouched.
        :param message: Optional message passed to the workflow handler.
        :return: Result of the corresponding workflow handler call, or
            'wf_ex' itself for an unsupported state.
        """
        if state == states.SUCCESS:
            wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)

            # Best effort: if the final context can't be evaluated the
            # workflow still succeeds, with an empty context.
            final_context = {}
            try:
                final_context = wf_ctrl.evaluate_workflow_final_context()
            except Exception as e:
                LOG.warning(
                    "Failed to get final context for %s: %s", wf_ex, e
                )
            return wf_handler.succeed_workflow(
                wf_ex,
                final_context,
                message
            )
        elif state == states.ERROR:
            return wf_handler.fail_workflow(wf_ex, message)

        return wf_ex

    @u.log_exec(LOG)
    def rollback_workflow(self, execution_id):
        """Not implemented yet; always raises NotImplementedError."""
        # TODO(rakhmerov): Implement.
        raise NotImplementedError

    def _dispatch_workflow_commands(self, wf_ex, wf_cmds):
        """Execute workflow commands one by one.

        Processing stops as soon as the workflow leaves RUNNING state.

        :param wf_ex: Workflow execution the commands belong to.
        :param wf_cmds: Iterable of workflow commands; may be empty or
            None.
        :raises RuntimeError: On a command of an unsupported type.
        """
        if not wf_cmds:
            return

        for cmd in wf_cmds:
            if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
                task_handler.defer_task(cmd)
            elif isinstance(cmd, commands.RunTask):
                task_handler.run_new_task(cmd)
            elif isinstance(cmd, commands.RunExistingTask):
                task_handler.run_existing_task(
                    cmd.task_ex.id,
                    reset=cmd.reset
                )
            elif isinstance(cmd, commands.SetWorkflowState):
                if states.is_completed(cmd.new_state):
                    self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
                else:
                    wf_handler.set_execution_state(wf_ex, cmd.new_state)
            elif isinstance(cmd, commands.Noop):
                # Do nothing.
                pass
            else:
                raise RuntimeError('Unsupported workflow command: %s' % cmd)

            if wf_ex.state != states.RUNNING:
                break

    @staticmethod
    def _fail_workflow(wf_ex_id, err, action_ex_id=None):
        """Private helper to fail workflow on exceptions.

        :param wf_ex_id: Workflow execution ID, or None if the failure
            happened before an execution ID was known.
        :param err: The error; its string form becomes the state info.
        :param action_ex_id: Optional ID of an action execution that
            should be completed with an error result.
        """
        # Callers initialise their ID to None and only set it once the
        # execution is known, so None means there is nothing to fail.
        # Skip the DB round trip and the misleading "not found" log.
        if wf_ex_id is None:
            return

        with db_api.transaction():
            err_msg = str(err)

            wf_ex = db_api.load_workflow_execution(wf_ex_id)

            if wf_ex is None:
                LOG.error(
                    "Cant fail workflow execution with id='%s': not found.",
                    wf_ex_id
                )
                return

            wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg)

            if action_ex_id:
                # Note(dzimine): Don't call self.engine_client:
                # 1) to avoid computing and triggering next tasks
                # 2) to avoid a loop in case of error in transport
                action_ex = db_api.get_action_execution(action_ex_id)

                task_handler.on_action_complete(
                    action_ex,
                    wf_utils.Result(error=err_msg)
                )

    @staticmethod
    def _canonize_workflow_params(params):
        """Resolve the 'env' workflow parameter.

        If 'env' is a string it is treated as an environment name: the
        environment is loaded from DB and its variables are stored back
        into 'params' under 'env'.

        :param params: Workflow parameters dict; may be updated in place.
        :return: The same 'params' dict.
        :raises ValueError: If 'env' is neither a dict nor a string.
        """
        # Resolve environment parameter.
        env = params.get('env', {})

        if not isinstance(env, dict) and not isinstance(env, six.string_types):
            raise ValueError(
                'Unexpected type for environment [environment=%s]' % str(env)
            )

        if isinstance(env, six.string_types):
            env_db = db_api.get_environment(env)
            env = env_db.variables
            params['env'] = env

        return params

    @staticmethod
    def _create_workflow_execution(wf_def, wf_spec, wf_input, description,
                                   params):
        """Create a workflow execution in RUNNING state and fill context.

        :param wf_def: Workflow definition.
        :param wf_spec: Workflow specification.
        :param wf_input: Workflow input; a shallow copy becomes the
            initial context.
        :param description: Execution description.
        :param params: Workflow parameters dict.
        :return: The created workflow execution object.
        """
        wf_ex = db_api.create_workflow_execution({
            'name': wf_def.name,
            'description': description,
            'workflow_name': wf_def.name,
            'spec': wf_spec.to_dict(),
            'params': params or {},
            'state': states.RUNNING,
            'input': wf_input or {},
            'output': {},
            'context': copy.copy(wf_input) or {},
            'task_execution_id': params.get('task_execution_id'),
            'runtime_context': {
                'with_items_index': params.get('with_items_index', 0)
            },
        })

        data_flow.add_openstack_data_to_context(wf_ex)
        data_flow.add_execution_to_context(wf_ex)
        data_flow.add_environment_to_context(wf_ex)
        data_flow.add_workflow_variables_to_context(wf_ex, wf_spec)

        return wf_ex

    @staticmethod
    def _lock_workflow_execution(wf_exec_id):
        """Acquire the DB lock for a workflow execution.

        :param wf_exec_id: Workflow execution ID.
        """
        # NOTE: Workflow execution object must be locked before
        # loading the object itself into the session (either with
        # 'get_XXX' or 'load_XXX' methods). Otherwise, there can be
        # multiple parallel transactions that see the same state
        # and hence the rest of the method logic would not be atomic.
        db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id)
|