mistral/mistral/engine/workflow_handler.py

98 lines
3.0 KiB
Python

# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral.engine import rpc
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
def succeed_workflow(wf_ex, final_context, state_info=None):
set_execution_state(wf_ex, states.SUCCESS, state_info)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_ex.output = data_flow.evaluate_workflow_output(wf_spec, final_context)
if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex)
return wf_ex
def fail_workflow(wf_ex, state_info):
if states.is_paused_or_completed(wf_ex.state):
return wf_ex
set_execution_state(wf_ex, states.ERROR, state_info)
if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex)
return wf_ex
def _schedule_send_result_to_parent_workflow(wf_ex):
scheduler.schedule_call(
None,
'mistral.engine.workflow_handler.send_result_to_parent_workflow',
0,
wf_ex_id=wf_ex.id
)
def send_result_to_parent_workflow(wf_ex_id):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.state == states.SUCCESS:
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(data=wf_ex.output)
)
elif wf_ex.state == states.ERROR:
err_msg = 'Failed subworkflow [execution_id=%s]' % wf_ex.id
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(error=err_msg)
)
def set_execution_state(wf_ex, state, state_info=None):
cur_state = wf_ex.state
if states.is_valid_transition(cur_state, state):
wf_ex.state = state
wf_ex.state_info = state_info
wf_trace.info(
wf_ex,
"Execution of workflow '%s' [%s -> %s]"
% (wf_ex.workflow_name, cur_state, state)
)
else:
msg = ("Can't change workflow execution state from %s to %s. "
"[workflow=%s, execution_id=%s]" %
(cur_state, state, wf_ex.name, wf_ex.id))
raise exc.WorkflowException(msg)
# Workflow result should be accepted by parent workflows (if any)
# only if it completed successfully.
wf_ex.accepted = wf_ex.state == states.SUCCESS