Merge "Refactor Mistral Engine"
This commit is contained in:
commit
5e042c7403
@ -199,7 +199,7 @@ def validate_long_type_length(cls, field_name, value):
|
||||
size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb
|
||||
|
||||
# If the size is unlimited.
|
||||
if (size_limit_kb < 0):
|
||||
if size_limit_kb < 0:
|
||||
return
|
||||
|
||||
size_kb = int(sys.getsizeof(str(value)) / 1024)
|
||||
@ -394,6 +394,8 @@ class CronTrigger(mb.MistralSecureModelBase):
|
||||
# Register all hooks related to secure models.
|
||||
mb.register_secure_model_hooks()
|
||||
|
||||
# TODO(rakhmerov): This is a bad solution. It's hard to find in the code,
|
||||
# configure flexibly etc. Fix it.
|
||||
# Register an event listener to verify that the size of all the long columns
|
||||
# affected by the user do not exceed the limit configuration.
|
||||
for attr_name in ['input', 'output', 'params', 'published']:
|
||||
|
@ -12,302 +12,81 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import actions
|
||||
from mistral.engine import task_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import security
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
def create_action_execution(action_def, action_input, task_ex=None,
|
||||
index=0, description=''):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling things like
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
# send necessary SQL queries to DB. Currently, session flush happens
|
||||
# on every operation which may not be optimal. The problem with using just
|
||||
# session level cache is in generating ids. Ids are generated only on
|
||||
# session flush. And now we have a lot places where we need to have ids
|
||||
# before TX completion.
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
# DB object is created.
|
||||
action_ex_id = utils.generate_unicode_uuid()
|
||||
|
||||
if a_m.has_action_context(
|
||||
action_def.action_class, action_def.attributes or {}) and task_ex:
|
||||
action_input.update(a_m.get_action_context(task_ex, action_ex_id))
|
||||
def on_action_complete(action_ex, result):
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
values = {
|
||||
'id': action_ex_id,
|
||||
'name': action_def.name,
|
||||
'spec': action_def.spec,
|
||||
'state': states.RUNNING,
|
||||
'input': action_input,
|
||||
'runtime_context': {'with_items_index': index},
|
||||
'description': description
|
||||
}
|
||||
action = _build_action(action_ex)
|
||||
|
||||
try:
|
||||
action.complete(result)
|
||||
except exc.MistralException as e:
|
||||
msg = ("Failed to complete action [action=%s, task=%s]: %s\n%s" %
|
||||
(action_ex.name, task_ex.name, e, tb.format_exc()))
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
action.fail(msg)
|
||||
|
||||
if task_ex:
|
||||
values.update({
|
||||
'task_execution_id': task_ex.id,
|
||||
'workflow_name': task_ex.workflow_name,
|
||||
'workflow_id': task_ex.workflow_id,
|
||||
'project_id': task_ex.project_id,
|
||||
})
|
||||
else:
|
||||
values.update({
|
||||
'project_id': security.get_project_id(),
|
||||
})
|
||||
task_handler.fail_task(task_ex, msg)
|
||||
|
||||
action_ex = db_api.create_action_execution(values)
|
||||
return
|
||||
|
||||
if task_ex:
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
task_ex.executions.append(action_ex)
|
||||
|
||||
return action_ex
|
||||
task_handler.on_action_complete(action_ex)
|
||||
|
||||
|
||||
def _inject_action_ctx_for_validating(action_def, input_dict):
|
||||
if a_m.has_action_context(action_def.action_class, action_def.attributes):
|
||||
input_dict.update(a_m.get_empty_action_context())
|
||||
def _build_action(action_ex):
|
||||
if isinstance(action_ex, models.WorkflowExecution):
|
||||
return actions.WorkflowAction(None, action_ex=action_ex)
|
||||
|
||||
wf_name = None
|
||||
wf_spec_name = None
|
||||
|
||||
def get_action_input(action_name, input_dict, wf_name=None, wf_spec=None):
|
||||
action_def = resolve_action_definition(
|
||||
action_name,
|
||||
wf_name,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
if action_ex.workflow_name:
|
||||
wf_name = action_ex.workflow_name
|
||||
wf_spec = spec_parser.get_workflow_spec(
|
||||
action_ex.task_execution.workflow_execution.spec
|
||||
)
|
||||
wf_spec_name = wf_spec.get_name()
|
||||
|
||||
if action_def.action_class:
|
||||
_inject_action_ctx_for_validating(action_def, input_dict)
|
||||
adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name')
|
||||
|
||||
# NOTE(xylan): Don't validate action input if action initialization method
|
||||
# contains ** argument.
|
||||
if '**' not in action_def.input:
|
||||
e_utils.validate_input(action_def, input_dict)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
return _get_adhoc_action_input(
|
||||
action_def,
|
||||
input_dict,
|
||||
wf_name,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
def _get_adhoc_action_input(action_def, input_dict,
|
||||
wf_name=None, wf_spec=None):
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
base_name,
|
||||
wf_name if wf_name else None,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
)
|
||||
|
||||
_inject_action_ctx_for_validating(action_def, input_dict)
|
||||
e_utils.validate_input(action_def, input_dict, action_spec)
|
||||
|
||||
base_input = action_spec.get_base_input()
|
||||
|
||||
if base_input:
|
||||
input_dict = expr.evaluate_recursively(
|
||||
base_input,
|
||||
input_dict
|
||||
)
|
||||
else:
|
||||
input_dict = {}
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
def run_action(action_def, action_input,
|
||||
action_ex_id=None, target=None, async=True):
|
||||
action_result = rpc.get_executor_client().run_action(
|
||||
action_ex_id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_input,
|
||||
target,
|
||||
async
|
||||
)
|
||||
|
||||
if action_result:
|
||||
return _get_action_output(action_result)
|
||||
|
||||
|
||||
def _get_action_output(result):
|
||||
"""Returns action output.
|
||||
|
||||
:param result: ActionResult instance or ActionResult dict
|
||||
:return: dict containing result.
|
||||
"""
|
||||
if isinstance(result, dict):
|
||||
result = wf_utils.Result(result.get('data'), result.get('error'))
|
||||
|
||||
return ({'result': result.data}
|
||||
if result.is_success() else {'result': result.error})
|
||||
|
||||
|
||||
def store_action_result(action_ex, result):
|
||||
prev_state = action_ex.state
|
||||
|
||||
action_ex.state = states.SUCCESS if result.is_success() else states.ERROR
|
||||
action_ex.output = _get_action_output(result)
|
||||
|
||||
action_ex.accepted = True
|
||||
|
||||
_log_action_result(action_ex, prev_state, action_ex.state, result)
|
||||
|
||||
return action_ex
|
||||
|
||||
|
||||
def _log_action_result(action_ex, from_state, to_state, result):
|
||||
def _result_msg():
|
||||
if action_ex.state == states.ERROR:
|
||||
return "error = %s" % utils.cut(result.error)
|
||||
|
||||
return "result = %s" % utils.cut(result.data)
|
||||
|
||||
wf_trace.info(
|
||||
None,
|
||||
"Action execution '%s' [%s -> %s, %s]" %
|
||||
(action_ex.name, from_state, to_state, _result_msg())
|
||||
)
|
||||
|
||||
|
||||
def run_existing_action(action_ex_id, target):
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
action_def = db_api.get_action_definition(action_ex.name)
|
||||
|
||||
return run_action(
|
||||
action_def,
|
||||
action_ex.input,
|
||||
action_ex_id,
|
||||
target
|
||||
)
|
||||
|
||||
|
||||
def resolve_definition(action_name, task_ex=None, wf_spec=None):
|
||||
if task_ex and wf_spec:
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
action_name,
|
||||
wf_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
)
|
||||
else:
|
||||
action_def = resolve_action_definition(action_name)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
base_name,
|
||||
task_ex.workflow_name if task_ex else None,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
)
|
||||
|
||||
return action_def
|
||||
|
||||
|
||||
def resolve_action_definition(action_spec_name, wf_name=None,
|
||||
wf_spec_name=None):
|
||||
action_db = None
|
||||
|
||||
if wf_name and wf_name != wf_spec_name:
|
||||
# If workflow belongs to a workbook then check
|
||||
# action within the same workbook (to be able to
|
||||
# use short names within workbooks).
|
||||
# If it doesn't exist then use a name from spec
|
||||
# to find an action in DB.
|
||||
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
|
||||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
|
||||
if not action_db:
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
"Failed to find action [action_name=%s]" % action_spec_name
|
||||
)
|
||||
|
||||
return action_db
|
||||
|
||||
|
||||
def transform_result(result, task_ex, task_spec):
|
||||
"""Transforms task result accounting for ad-hoc actions.
|
||||
|
||||
In case if the given result is an action result and action is
|
||||
an ad-hoc action the method transforms the result according to
|
||||
ad-hoc action configuration.
|
||||
|
||||
:param result: Result of task action/workflow.
|
||||
:param task_ex: Task DB model.
|
||||
:param task_spec: Task specification.
|
||||
"""
|
||||
if result.is_error():
|
||||
return result
|
||||
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
if action_spec_name:
|
||||
wf_ex = task_ex.workflow_execution
|
||||
wf_spec_name = wf_ex.spec['name']
|
||||
|
||||
return transform_action_result(
|
||||
action_spec_name,
|
||||
result,
|
||||
wf_ex.workflow_name,
|
||||
wf_spec_name,
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def transform_action_result(action_spec_name, result,
|
||||
wf_name=None, wf_spec_name=None):
|
||||
action_def = resolve_action_definition(
|
||||
action_spec_name,
|
||||
if adhoc_action_name:
|
||||
action_def = actions.resolve_action_definition(
|
||||
adhoc_action_name,
|
||||
wf_name,
|
||||
wf_spec_name
|
||||
)
|
||||
|
||||
if not action_def.spec:
|
||||
return result
|
||||
return actions.AdHocAction(action_def, action_ex=action_ex)
|
||||
|
||||
transformer = spec_parser.get_action_spec(action_def.spec).get_output()
|
||||
|
||||
if transformer is None:
|
||||
return result
|
||||
|
||||
return wf_utils.Result(
|
||||
data=expr.evaluate_recursively(transformer, result.data),
|
||||
error=result.error
|
||||
action_def = actions.resolve_action_definition(
|
||||
action_ex.name,
|
||||
wf_name,
|
||||
wf_spec_name
|
||||
)
|
||||
|
||||
return actions.PythonAction(action_def, action_ex=action_ex)
|
||||
|
||||
|
||||
def build_action_by_name(action_name):
|
||||
action_def = actions.resolve_action_definition(action_name)
|
||||
|
||||
action_cls = (actions.PythonAction if not action_def.spec
|
||||
else actions.AdHocAction)
|
||||
|
||||
return action_cls(action_def)
|
||||
|
477
mistral/engine/actions.py
Normal file
477
mistral/engine/actions.py
Normal file
@ -0,0 +1,477 @@
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import executions as wf_ex_service
|
||||
from mistral.services import scheduler
|
||||
from mistral.services import security
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action'
|
||||
_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow'
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Action(object):
|
||||
"""Action.
|
||||
|
||||
Represents a workflow action and defines interface that can be used by
|
||||
Mistral engine or its components in order to manipulate with actions.
|
||||
"""
|
||||
|
||||
def __init__(self, action_def, action_ex=None, task_ex=None):
|
||||
self.action_def = action_def
|
||||
self.action_ex = action_ex
|
||||
self.task_ex = action_ex.task_execution if action_ex else task_ex
|
||||
|
||||
@abc.abstractmethod
|
||||
def complete(self, result):
|
||||
"""Complete action and process its result.
|
||||
|
||||
:param result: Action result.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def fail(self, msg):
|
||||
# When we set an ERROR state we should safely set output value getting
|
||||
# w/o exceptions due to field size limitations.
|
||||
msg = utils.cut_by_kb(
|
||||
msg,
|
||||
cfg.CONF.engine.execution_field_size_limit_kb
|
||||
)
|
||||
|
||||
self.action_ex.state = states.ERROR
|
||||
self.action_ex.output = {'result': msg}
|
||||
|
||||
@abc.abstractmethod
|
||||
def schedule(self, input_dict, target, index=0, desc=''):
|
||||
"""Schedule action run.
|
||||
|
||||
This method is needed to schedule action run so its result can
|
||||
be received later by engine. In this sense it will be running in
|
||||
asynchronous mode from engine perspective (don't confuse with
|
||||
executor asynchrony when executor doesn't immediately send a
|
||||
result).
|
||||
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
:param desc: Action execution description.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self, input_dict, target, index=0, desc='', save=True):
|
||||
"""Immediately run action.
|
||||
|
||||
This method runs method w/o scheduling its run for a later time.
|
||||
From engine perspective action will be processed in synchronous
|
||||
mode.
|
||||
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
:param desc: Action execution description.
|
||||
:param save: True if action execution object needs to be saved.
|
||||
:return: Action output.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
"""Validates action input parameters.
|
||||
|
||||
:param input_dict: Dictionary with input parameters.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
"""Determines if action is synchronous.
|
||||
|
||||
:param input_dict: Dictionary with input parameters.
|
||||
"""
|
||||
return True
|
||||
|
||||
def _create_action_execution(self, input_dict, runtime_ctx, desc=''):
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
# DB object is created.
|
||||
action_ex_id = utils.generate_unicode_uuid()
|
||||
|
||||
# TODO(rakhmerov): Bad place, we probably need to push action context
|
||||
# to all actions. It's related to
|
||||
# https://blueprints.launchpad.net/mistral/+spec/mistral-custom-actions-api
|
||||
if a_m.has_action_context(
|
||||
self.action_def.action_class,
|
||||
self.action_def.attributes or {}) and self.task_ex:
|
||||
input_dict.update(
|
||||
a_m.get_action_context(self.task_ex, action_ex_id)
|
||||
)
|
||||
|
||||
values = {
|
||||
'id': action_ex_id,
|
||||
'name': self.action_def.name,
|
||||
'spec': self.action_def.spec,
|
||||
'state': states.RUNNING,
|
||||
'input': input_dict,
|
||||
'runtime_context': runtime_ctx,
|
||||
'description': desc
|
||||
}
|
||||
|
||||
if self.task_ex:
|
||||
values.update({
|
||||
'task_execution_id': self.task_ex.id,
|
||||
'workflow_name': self.task_ex.workflow_name,
|
||||
'workflow_id': self.task_ex.workflow_id,
|
||||
'project_id': self.task_ex.project_id,
|
||||
})
|
||||
else:
|
||||
values.update({
|
||||
'project_id': security.get_project_id(),
|
||||
})
|
||||
|
||||
self.action_ex = db_api.create_action_execution(values)
|
||||
|
||||
if self.task_ex:
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
self.task_ex.executions.append(self.action_ex)
|
||||
|
||||
def _inject_action_ctx_for_validating(self, input_dict):
|
||||
if a_m.has_action_context(
|
||||
self.action_def.action_class, self.action_def.attributes):
|
||||
input_dict.update(a_m.get_empty_action_context())
|
||||
|
||||
def _log_result(self, prev_state, result):
|
||||
state = self.action_ex.state
|
||||
|
||||
def _result_msg():
|
||||
if state == states.ERROR:
|
||||
return "error = %s" % utils.cut(result.error)
|
||||
|
||||
return "result = %s" % utils.cut(result.data)
|
||||
|
||||
wf_trace.info(
|
||||
None,
|
||||
"Action execution '%s' [%s -> %s, %s]" %
|
||||
(self.action_ex.name, prev_state, state, _result_msg())
|
||||
)
|
||||
|
||||
|
||||
class PythonAction(Action):
|
||||
"""Regular Python action."""
|
||||
|
||||
def complete(self, result):
|
||||
if states.is_completed(self.action_ex.state):
|
||||
return
|
||||
|
||||
prev_state = self.action_ex.state
|
||||
|
||||
self.action_ex.state = (states.SUCCESS if result.is_success()
|
||||
else states.ERROR)
|
||||
self.action_ex.output = self._prepare_output(result)
|
||||
self.action_ex.accepted = True
|
||||
|
||||
self._log_result(prev_state, result)
|
||||
|
||||
def schedule(self, input_dict, target, index=0, desc=''):
|
||||
self._create_action_execution(
|
||||
self._prepare_input(input_dict),
|
||||
self._prepare_runtime_context(index),
|
||||
desc=desc
|
||||
)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_RUN_EXISTING_ACTION_PATH,
|
||||
0,
|
||||
action_ex_id=self.action_ex.id,
|
||||
target=target
|
||||
)
|
||||
|
||||
def run(self, input_dict, target, index=0, desc='', save=True):
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
runtime_ctx = self._prepare_runtime_context(index)
|
||||
|
||||
if save:
|
||||
self._create_action_execution(input_dict, runtime_ctx, desc=desc)
|
||||
|
||||
result = rpc.get_executor_client().run_action(
|
||||
self.action_ex.id if self.action_ex else None,
|
||||
self.action_def.action_class,
|
||||
self.action_def.attributes or {},
|
||||
input_dict,
|
||||
target,
|
||||
async=False
|
||||
)
|
||||
|
||||
return self._prepare_output(result)
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
|
||||
a = a_m.get_action_class(self.action_def.name)(**input_dict)
|
||||
|
||||
return a.is_sync()
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
if self.action_def.action_class:
|
||||
self._inject_action_ctx_for_validating(input_dict)
|
||||
|
||||
# TODO(rakhmerov): I'm not sure what this is for.
|
||||
# NOTE(xylan): Don't validate action input if action initialization
|
||||
# method contains ** argument.
|
||||
if '**' not in self.action_def.input:
|
||||
e_utils.validate_input(self.action_def, input_dict)
|
||||
|
||||
def _prepare_input(self, input_dict):
|
||||
"""Template method to do manipulations with input parameters.
|
||||
|
||||
Python action doesn't do anything specific with initial input.
|
||||
"""
|
||||
return input_dict
|
||||
|
||||
def _prepare_output(self, result):
|
||||
"""Template method to do manipulations with action result.
|
||||
|
||||
Python action just wraps action result into dict that can
|
||||
be stored in DB.
|
||||
"""
|
||||
return _get_action_output(result) if result else None
|
||||
|
||||
def _prepare_runtime_context(self, index):
|
||||
"""Template method to prepare action runtime context.
|
||||
|
||||
Python action inserts index into runtime context.
|
||||
"""
|
||||
return {'index': index}
|
||||
|
||||
|
||||
class AdHocAction(PythonAction):
|
||||
"""Ad-hoc action."""
|
||||
|
||||
def __init__(self, action_def, action_ex=None, task_ex=None):
|
||||
self.action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_action_def = db_api.get_action_definition(
|
||||
self.action_spec.get_base()
|
||||
)
|
||||
|
||||
super(AdHocAction, self).__init__(
|
||||
base_action_def,
|
||||
action_ex,
|
||||
task_ex
|
||||
)
|
||||
|
||||
self.adhoc_action_def = action_def
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
e_utils.validate_input(
|
||||
self.adhoc_action_def,
|
||||
input_dict,
|
||||
self.action_spec
|
||||
)
|
||||
|
||||
super(AdHocAction, self).validate_input(
|
||||
self._prepare_input(input_dict)
|
||||
)
|
||||
|
||||
def _prepare_input(self, input_dict):
|
||||
base_input_expr = self.action_spec.get_base_input()
|
||||
|
||||
if base_input_expr:
|
||||
base_input_dict = expr.evaluate_recursively(
|
||||
base_input_expr,
|
||||
input_dict
|
||||
)
|
||||
else:
|
||||
base_input_dict = {}
|
||||
|
||||
return super(AdHocAction, self)._prepare_input(base_input_dict)
|
||||
|
||||
def _prepare_output(self, result):
|
||||
# In case of error, we don't transform a result.
|
||||
if not result.is_error():
|
||||
adhoc_action_spec = spec_parser.get_action_spec(
|
||||
self.adhoc_action_def.spec
|
||||
)
|
||||
|
||||
transformer = adhoc_action_spec.get_output()
|
||||
|
||||
if transformer is not None:
|
||||
result = wf_utils.Result(
|
||||
data=expr.evaluate_recursively(transformer, result.data),
|
||||
error=result.error
|
||||
)
|
||||
|
||||
return _get_action_output(result) if result else None
|
||||
|
||||
def _prepare_runtime_context(self, index):
|
||||
ctx = super(AdHocAction, self)._prepare_runtime_context(index)
|
||||
|
||||
# Insert special field into runtime context so that we track
|
||||
# a relationship between python action and adhoc action.
|
||||
return utils.merge_dicts(
|
||||
ctx,
|
||||
{'adhoc_action_name': self.adhoc_action_def.name}
|
||||
)
|
||||
|
||||
|
||||
class WorkflowAction(Action):
|
||||
"""Workflow action."""
|
||||
|
||||
def complete(self, result):
|
||||
# No-op because in case of workflow result is already processed.
|
||||
pass
|
||||
|
||||
def schedule(self, input_dict, target, index=0, desc=''):
|
||||
parent_wf_ex = self.task_ex.workflow_execution
|
||||
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
|
||||
|
||||
task_spec = spec_parser.get_task_spec(self.task_ex.spec)
|
||||
|
||||
wf_spec_name = task_spec.get_workflow_name()
|
||||
|
||||
wf_def = e_utils.resolve_workflow_definition(
|
||||
parent_wf_ex.workflow_name,
|
||||
parent_wf_spec.get_name(),
|
||||
wf_spec_name
|
||||
)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
|
||||
|
||||
wf_params = {
|
||||
'task_execution_id': self.task_ex.id,
|
||||
'index': index
|
||||
}
|
||||
|
||||
if 'env' in parent_wf_ex.params:
|
||||
wf_params['env'] = parent_wf_ex.params['env']
|
||||
|
||||
for k, v in list(input_dict.items()):
|
||||
if k not in wf_spec.get_input():
|
||||
wf_params[k] = v
|
||||
del input_dict[k]
|
||||
|
||||
wf_ex, _ = wf_ex_service.create_workflow_execution(
|
||||
wf_def.name,
|
||||
input_dict,
|
||||
"sub-workflow execution",
|
||||
wf_params,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_RESUME_WORKFLOW_PATH,
|
||||
0,
|
||||
wf_ex_id=wf_ex.id,
|
||||
env=None
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): Add info logging.
|
||||
|
||||
def run(self, input_dict, target, index=0, desc='', save=True):
|
||||
raise NotImplemented('Does not apply to this WorkflowAction.')
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
# Workflow action is always asynchronous.
|
||||
return False
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
# TODO(rakhmerov): Implement.
|
||||
pass
|
||||
|
||||
|
||||
def _resume_workflow(wf_ex_id, env):
|
||||
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
|
||||
|
||||
|
||||
def _run_existing_action(action_ex_id, target):
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
action_def = db_api.get_action_definition(action_ex.name)
|
||||
|
||||
result = rpc.get_executor_client().run_action(
|
||||
action_ex_id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_ex.input,
|
||||
target
|
||||
)
|
||||
|
||||
return _get_action_output(result) if result else None
|
||||
|
||||
|
||||
def _get_action_output(result):
|
||||
"""Returns action output.
|
||||
|
||||
:param result: ActionResult instance or ActionResult dict
|
||||
:return: dict containing result.
|
||||
"""
|
||||
if isinstance(result, dict):
|
||||
result = wf_utils.Result(result.get('data'), result.get('error'))
|
||||
|
||||
return ({'result': result.data}
|
||||
if result.is_success() else {'result': result.error})
|
||||
|
||||
|
||||
def resolve_action_definition(action_spec_name, wf_name=None,
|
||||
wf_spec_name=None):
|
||||
"""Resolve action definition accounting for ad-hoc action namespacing.
|
||||
|
||||
:param action_spec_name: Action name according to a spec.
|
||||
:param wf_name: Workflow name.
|
||||
:param wf_spec_name: Workflow name according to a spec.
|
||||
:return: Action definition (python or ad-hoc).
|
||||
"""
|
||||
action_db = None
|
||||
|
||||
if wf_name and wf_name != wf_spec_name:
|
||||
# If workflow belongs to a workbook then check
|
||||
# action within the same workbook (to be able to
|
||||
# use short names within workbooks).
|
||||
# If it doesn't exist then use a name from spec
|
||||
# to find an action in DB.
|
||||
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
|
||||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
|
||||
if not action_db:
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
"Failed to find action [action_name=%s]" % action_spec_name
|
||||
)
|
||||
|
||||
return action_db
|
@ -13,8 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral import coordination
|
||||
@ -22,19 +20,14 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import base
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import executions as wf_ex_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral import utils as u
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import base as wf_base
|
||||
from mistral.workflow import commands
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -53,85 +46,47 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
@u.log_exec(LOG)
|
||||
def start_workflow(self, wf_identifier, wf_input, description='',
|
||||
**params):
|
||||
wf_ex_id = None
|
||||
|
||||
try:
|
||||
# Create a persistent workflow execution in a separate transaction
|
||||
# so that we can return it even in case of unexpected errors that
|
||||
# lead to transaction rollback.
|
||||
with db_api.transaction():
|
||||
# TODO(rakhmerov): It needs to be hidden in workflow_handler and
|
||||
# Workflow abstraction.
|
||||
# The new workflow execution will be in an IDLE
|
||||
# state on initial record creation.
|
||||
wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution(
|
||||
wf_ex, wf_spec = wf_ex_service.create_workflow_execution(
|
||||
wf_identifier,
|
||||
wf_input,
|
||||
description,
|
||||
params
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
wf_handler.set_execution_state(wf_ex, states.RUNNING)
|
||||
wf_handler.set_workflow_state(wf_ex, states.RUNNING)
|
||||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
|
||||
|
||||
self._dispatch_workflow_commands(
|
||||
wf_ex,
|
||||
wf_ctrl.continue_workflow(),
|
||||
wf_spec
|
||||
)
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
||||
dispatcher.dispatch_workflow_commands(wf_ex, cmds)
|
||||
|
||||
return wf_ex.get_clone()
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"Failed to start workflow '%s' id=%s: %s\n%s",
|
||||
wf_identifier, wf_ex_id, e, traceback.format_exc()
|
||||
)
|
||||
|
||||
wf_ex = self._fail_workflow(wf_ex_id, e)
|
||||
|
||||
if wf_ex:
|
||||
return wf_ex.get_clone()
|
||||
|
||||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
with db_api.transaction():
|
||||
action_def = action_handler.resolve_definition(action_name)
|
||||
resolved_action_input = action_handler.get_action_input(
|
||||
action_name,
|
||||
action_input
|
||||
)
|
||||
action = a_m.get_action_class(action_def.name)(
|
||||
**resolved_action_input
|
||||
)
|
||||
action = action_handler.build_action_by_name(action_name)
|
||||
|
||||
# If we see action is asynchronous, then we enforce 'save_result'.
|
||||
if params.get('save_result') or not action.is_sync():
|
||||
action_ex = action_handler.create_action_execution(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
description=description
|
||||
)
|
||||
action.validate_input(action_input)
|
||||
|
||||
action_handler.run_action(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
action_ex.id,
|
||||
params.get('target')
|
||||
)
|
||||
save = params.get('save_result')
|
||||
target = params.get('target')
|
||||
|
||||
return action_ex.get_clone()
|
||||
else:
|
||||
output = action_handler.run_action(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
target=params.get('target'),
|
||||
async=False
|
||||
)
|
||||
if save or not action.is_sync(action_input):
|
||||
action.schedule(action_input, target)
|
||||
|
||||
return action.action_ex.get_clone()
|
||||
|
||||
output = action.run(action_input, target, save=save)
|
||||
|
||||
# Action execution is not created but we need to return similar
|
||||
# object to a client anyway.
|
||||
return db_models.ActionExecution(
|
||||
name=action_name,
|
||||
description=description,
|
||||
@ -139,154 +94,36 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
output=output
|
||||
)
|
||||
|
||||
def on_task_state_change(self, task_ex_id, state, state_info=None):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
# TODO(rakhmerov): The method is mostly needed for policy and
|
||||
# we are supposed to get the same action execution as when the
|
||||
# policy worked.
|
||||
|
||||
wf_ex_id = task_ex.workflow_execution_id
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' [%s -> %s] state_info : %s"
|
||||
% (task_ex.name, task_ex.state, state, state_info)
|
||||
)
|
||||
|
||||
task_ex.state = state
|
||||
task_ex.state_info = state_info
|
||||
|
||||
self._on_task_state_change(task_ex, wf_ex, wf_spec)
|
||||
|
||||
def _on_task_state_change(self, task_ex, wf_ex, wf_spec):
|
||||
task_spec = wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
if task_handler.is_task_completed(task_ex, task_spec):
|
||||
task_handler.after_task_complete(task_ex, task_spec, wf_spec)
|
||||
|
||||
# Ignore DELAYED state.
|
||||
if task_ex.state == states.RUNNING_DELAYED:
|
||||
return
|
||||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
|
||||
|
||||
# Calculate commands to process next.
|
||||
try:
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
except exc.YaqlEvaluationException as e:
|
||||
LOG.error(
|
||||
'YAQL error occurred while calculating next workflow '
|
||||
'commands [wf_ex_id=%s, task_ex_id=%s]: %s',
|
||||
wf_ex.id, task_ex.id, e
|
||||
)
|
||||
|
||||
wf_handler.fail_workflow(wf_ex, str(e))
|
||||
|
||||
return
|
||||
|
||||
# Mark task as processed after all decisions have been made
|
||||
# upon its completion.
|
||||
task_ex.processed = True
|
||||
|
||||
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
|
||||
|
||||
self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec)
|
||||
elif task_handler.need_to_continue(task_ex, task_spec):
|
||||
# Re-run existing task.
|
||||
cmds = [commands.RunExistingTask(task_ex, reset=False)]
|
||||
|
||||
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
|
||||
|
||||
@staticmethod
|
||||
def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec):
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
# Workflow is not completed if there are any incomplete task
|
||||
# executions that are not in WAITING state. If all incomplete
|
||||
# tasks are waiting and there are unhandled errors, then these
|
||||
# tasks will not reach completion. In this case, mark the
|
||||
# workflow complete.
|
||||
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
|
||||
|
||||
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
|
||||
return
|
||||
|
||||
if wf_ctrl.all_errors_handled():
|
||||
wf_handler.succeed_workflow(
|
||||
wf_ex,
|
||||
wf_ctrl.evaluate_workflow_final_context(),
|
||||
wf_spec
|
||||
)
|
||||
else:
|
||||
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
|
||||
|
||||
wf_handler.fail_workflow(wf_ex, state_info)
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def on_action_complete(self, action_ex_id, result):
|
||||
wf_ex_id = None
|
||||
|
||||
try:
|
||||
with db_api.transaction():
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
|
||||
# In case of single action execution there is no
|
||||
# assigned task execution.
|
||||
if not action_ex.task_execution:
|
||||
return action_handler.store_action_result(
|
||||
action_ex,
|
||||
result
|
||||
).get_clone()
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
task_ex = task_handler.on_action_complete(
|
||||
action_ex,
|
||||
wf_spec,
|
||||
result
|
||||
if task_ex:
|
||||
wf_handler.lock_workflow_execution(
|
||||
task_ex.workflow_execution_id
|
||||
)
|
||||
|
||||
# If workflow is on pause or completed then there's no
|
||||
# need to continue workflow.
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return action_ex.get_clone()
|
||||
|
||||
self._on_task_state_change(task_ex, wf_ex, wf_spec)
|
||||
action_handler.on_action_complete(action_ex, result)
|
||||
|
||||
return action_ex.get_clone()
|
||||
except Exception as e:
|
||||
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
|
||||
LOG.error(
|
||||
'Failed to handle action execution result [id=%s]: %s\n%s',
|
||||
action_ex_id, e, traceback.format_exc()
|
||||
)
|
||||
|
||||
# If an exception was thrown after we got the wf_ex_id
|
||||
if wf_ex_id:
|
||||
self._fail_workflow(wf_ex_id, e)
|
||||
|
||||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def pause_workflow(self, wf_ex_id):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_handler.set_execution_state(wf_ex, states.PAUSED)
|
||||
wf_handler.set_workflow_state(wf_ex, states.PAUSED)
|
||||
|
||||
return wf_ex
|
||||
|
||||
def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None):
|
||||
@staticmethod
|
||||
def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None):
|
||||
wf_ex = wf_service.update_workflow_execution_env(wf_ex, env)
|
||||
|
||||
wf_handler.set_execution_state(
|
||||
wf_handler.set_workflow_state(
|
||||
wf_ex,
|
||||
states.RUNNING,
|
||||
set_upstream=True
|
||||
@ -294,13 +131,15 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex)
|
||||
|
||||
# TODO(rakhmerov): Add YAQL error handling.
|
||||
# TODO(rakhmerov): Add error handling.
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
|
||||
|
||||
# When resuming a workflow we need to ignore all 'pause'
|
||||
# commands because workflow controller takes tasks that
|
||||
# completed within the period when the workflow was paused.
|
||||
# TODO(rakhmerov): This all should be in workflow handler, it's too
|
||||
# specific for engine level.
|
||||
cmds = list(
|
||||
filter(
|
||||
lambda c: not isinstance(c, commands.PauseWorkflow),
|
||||
@ -316,23 +155,16 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
if states.is_completed(t_ex.state) and not t_ex.processed:
|
||||
t_ex.processed = True
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
|
||||
dispatcher.dispatch_workflow_commands(wf_ex, cmds)
|
||||
|
||||
if not cmds:
|
||||
if not wf_utils.find_incomplete_task_executions(wf_ex):
|
||||
wf_handler.succeed_workflow(
|
||||
wf_ex,
|
||||
wf_ctrl.evaluate_workflow_final_context(),
|
||||
wf_spec
|
||||
)
|
||||
wf_handler.check_workflow_completion(wf_ex)
|
||||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
|
||||
try:
|
||||
# TODO(rakhmerov): Rewrite this functionality with Task abstraction.
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
@ -344,18 +176,12 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
if wf_ex.state == states.PAUSED:
|
||||
return wf_ex.get_clone()
|
||||
|
||||
# TODO(rakhmerov): This should be a call to workflow handler.
|
||||
return self._continue_workflow(wf_ex, task_ex, reset, env=env)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"Failed to rerun execution id=%s at task=%s: %s\n%s",
|
||||
wf_ex_id, task_ex_id, e, traceback.format_exc()
|
||||
)
|
||||
self._fail_workflow(wf_ex_id, e)
|
||||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def resume_workflow(self, wf_ex_id, env=None):
|
||||
try:
|
||||
# TODO(rakhmerov): Rewrite this functionality with Task abstraction.
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
@ -364,117 +190,15 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
return self._continue_workflow(wf_ex, env=env)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"Failed to resume execution id=%s: %s\n%s",
|
||||
wf_ex_id, e, traceback.format_exc()
|
||||
)
|
||||
self._fail_workflow(wf_ex_id, e)
|
||||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
return self._stop_workflow(wf_ex, state, message)
|
||||
|
||||
@staticmethod
|
||||
def _stop_workflow(wf_ex, state, message=None):
|
||||
if state == states.SUCCESS:
|
||||
wf_ctrl = wf_base.get_controller(wf_ex)
|
||||
|
||||
final_context = {}
|
||||
|
||||
try:
|
||||
final_context = wf_ctrl.evaluate_workflow_final_context()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
'Failed to get final context for %s: %s' % (wf_ex, e)
|
||||
)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
return wf_handler.succeed_workflow(
|
||||
wf_ex,
|
||||
final_context,
|
||||
wf_spec,
|
||||
message
|
||||
)
|
||||
elif state == states.ERROR:
|
||||
return wf_handler.fail_workflow(wf_ex, message)
|
||||
|
||||
return wf_ex
|
||||
return wf_handler.stop_workflow(wf_ex, state, message)
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def rollback_workflow(self, wf_ex_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
|
||||
if not wf_cmds:
|
||||
return
|
||||
|
||||
for cmd in wf_cmds:
|
||||
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
|
||||
task_handler.defer_task(cmd)
|
||||
elif isinstance(cmd, commands.RunTask):
|
||||
task_ex = task_handler.run_new_task(cmd, wf_spec)
|
||||
|
||||
if task_ex.state == states.ERROR:
|
||||
wf_handler.fail_workflow(
|
||||
wf_ex,
|
||||
'Failed to start task [task_ex=%s]: %s' %
|
||||
(task_ex, task_ex.state_info)
|
||||
)
|
||||
elif isinstance(cmd, commands.RunExistingTask):
|
||||
task_ex = task_handler.run_existing_task(
|
||||
cmd.task_ex.id,
|
||||
reset=cmd.reset
|
||||
)
|
||||
|
||||
if task_ex.state == states.ERROR:
|
||||
wf_handler.fail_workflow(
|
||||
wf_ex,
|
||||
'Failed to start task [task_ex=%s]: %s' %
|
||||
(task_ex, task_ex.state_info)
|
||||
)
|
||||
elif isinstance(cmd, commands.SetWorkflowState):
|
||||
if states.is_completed(cmd.new_state):
|
||||
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
|
||||
else:
|
||||
wf_handler.set_execution_state(wf_ex, cmd.new_state)
|
||||
elif isinstance(cmd, commands.Noop):
|
||||
# Do nothing.
|
||||
pass
|
||||
else:
|
||||
raise RuntimeError('Unsupported workflow command: %s' % cmd)
|
||||
|
||||
if wf_ex.state != states.RUNNING:
|
||||
break
|
||||
|
||||
# TODO(rakhmerov): This method may not be needed at all because error
|
||||
# handling is now implemented too roughly w/o distinguishing different
|
||||
# errors. On most errors (like YAQLException) we shouldn't rollback
|
||||
# transactions, we just need to fail corresponding execution objects
|
||||
# where a problem happened (action, task or workflow).
|
||||
@staticmethod
|
||||
def _fail_workflow(wf_ex_id, exc):
|
||||
"""Private helper to fail workflow on exceptions."""
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.load_workflow_execution(wf_ex_id)
|
||||
|
||||
if wf_ex is None:
|
||||
LOG.error(
|
||||
"Can't fail workflow execution with id='%s': not found.",
|
||||
wf_ex_id
|
||||
)
|
||||
return None
|
||||
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
|
||||
if not states.is_paused_or_completed(wf_ex.state):
|
||||
wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc))
|
||||
|
||||
return wf_ex
|
||||
|
46
mistral/engine/dispatcher.py
Normal file
46
mistral/engine/dispatcher.py
Normal file
@ -0,0 +1,46 @@
|
||||
# Copyright 2016 - Nokia Networks
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from mistral import exceptions as exc
|
||||
from mistral.workflow import commands
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
def dispatch_workflow_commands(wf_ex, wf_cmds):
|
||||
# TODO(rakhmerov): I don't like these imports but otherwise we have
|
||||
# import cycles.
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
|
||||
if not wf_cmds:
|
||||
return
|
||||
|
||||
for cmd in wf_cmds:
|
||||
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
|
||||
task_handler.run_task(cmd)
|
||||
elif isinstance(cmd, commands.SetWorkflowState):
|
||||
# TODO(rakhmerov): Make just a single call to workflow_handler
|
||||
if states.is_completed(cmd.new_state):
|
||||
wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
|
||||
else:
|
||||
wf_handler.set_workflow_state(wf_ex, cmd.new_state)
|
||||
elif isinstance(cmd, commands.Noop):
|
||||
# Do nothing.
|
||||
pass
|
||||
else:
|
||||
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
|
||||
|
||||
if wf_ex.state != states.RUNNING:
|
||||
break
|
@ -15,7 +15,6 @@
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import base
|
||||
from mistral.engine import rpc
|
||||
from mistral import expressions
|
||||
from mistral.services import scheduler
|
||||
from mistral.utils import wf_trace
|
||||
@ -24,8 +23,8 @@ from mistral.workflow import states
|
||||
|
||||
import six
|
||||
|
||||
_ENGINE_CLIENT_PATH = 'mistral.engine.rpc.get_engine_client'
|
||||
_RUN_EXISTING_TASK_PATH = 'mistral.engine.task_handler.run_existing_task'
|
||||
_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
|
||||
_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
|
||||
|
||||
|
||||
def _log_task_delay(task_ex, delay_sec):
|
||||
@ -180,7 +179,7 @@ class WaitBeforePolicy(base.TaskPolicy):
|
||||
policy_context = runtime_context[context_key]
|
||||
|
||||
if policy_context.get('skip'):
|
||||
# Unset state 'DELAYED'.
|
||||
# Unset state 'RUNNING_DELAYED'.
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' [%s -> %s]"
|
||||
@ -193,13 +192,16 @@ class WaitBeforePolicy(base.TaskPolicy):
|
||||
|
||||
if task_ex.state != states.IDLE:
|
||||
policy_context.update({'skip': True})
|
||||
|
||||
_log_task_delay(task_ex, self.delay)
|
||||
|
||||
task_ex.state = states.RUNNING_DELAYED
|
||||
|
||||
# TODO(rakhmerov): This is wrong as task handler doesn't manage
|
||||
# transactions and hence it can't be called explicitly.
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_RUN_EXISTING_TASK_PATH,
|
||||
_CONTINUE_TASK_PATH,
|
||||
self.delay,
|
||||
task_ex_id=task_ex.id,
|
||||
)
|
||||
@ -228,6 +230,7 @@ class WaitAfterPolicy(base.TaskPolicy):
|
||||
task_ex.runtime_context = runtime_context
|
||||
|
||||
policy_context = runtime_context[context_key]
|
||||
|
||||
if policy_context.get('skip'):
|
||||
# Skip, already processed.
|
||||
return
|
||||
@ -236,17 +239,25 @@ class WaitAfterPolicy(base.TaskPolicy):
|
||||
|
||||
_log_task_delay(task_ex, self.delay)
|
||||
|
||||
state = task_ex.state
|
||||
end_state = task_ex.state
|
||||
end_state_info = task_ex.state_info
|
||||
|
||||
# TODO(rakhmerov): Policies probably needs to have tasks.Task
|
||||
# interface in order to change manage task state safely.
|
||||
# Set task state to 'DELAYED'.
|
||||
task_ex.state = states.RUNNING_DELAYED
|
||||
task_ex.state_info = (
|
||||
'Suspended by wait-after policy for %s seconds' % self.delay
|
||||
)
|
||||
|
||||
# Schedule to change task state to RUNNING again.
|
||||
scheduler.schedule_call(
|
||||
_ENGINE_CLIENT_PATH,
|
||||
'on_task_state_change',
|
||||
None,
|
||||
_COMPLETE_TASK_PATH,
|
||||
self.delay,
|
||||
state=state,
|
||||
task_ex_id=task_ex.id,
|
||||
state=end_state,
|
||||
state_info=end_state_info
|
||||
)
|
||||
|
||||
|
||||
@ -339,7 +350,7 @@ class RetryPolicy(base.TaskPolicy):
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_RUN_EXISTING_TASK_PATH,
|
||||
_CONTINUE_TASK_PATH,
|
||||
self.delay,
|
||||
task_ex_id=task_ex.id,
|
||||
)
|
||||
@ -360,7 +371,7 @@ class TimeoutPolicy(base.TaskPolicy):
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.policies.fail_task_if_incomplete',
|
||||
'mistral.engine.policies._fail_task_if_incomplete',
|
||||
self.delay,
|
||||
task_ex_id=task_ex.id,
|
||||
timeout=self.delay
|
||||
@ -424,21 +435,35 @@ class ConcurrencyPolicy(base.TaskPolicy):
|
||||
task_ex.runtime_context = runtime_context
|
||||
|
||||
|
||||
def fail_task_if_incomplete(task_ex_id, timeout):
|
||||
def _continue_task(task_ex_id):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
|
||||
task_handler.continue_task(db_api.get_task_execution(task_ex_id))
|
||||
|
||||
|
||||
def _complete_task(task_ex_id, state, state_info):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
|
||||
task_handler.complete_task(
|
||||
db_api.get_task_execution(task_ex_id),
|
||||
state,
|
||||
state_info
|
||||
)
|
||||
|
||||
|
||||
def _fail_task_if_incomplete(task_ex_id, timeout):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
||||
if not states.is_completed(task_ex.state):
|
||||
msg = "Task timed out [id=%s, timeout(s)=%s]." % (task_ex_id, timeout)
|
||||
msg = 'Task timed out [timeout(s)=%s].' % timeout
|
||||
|
||||
wf_trace.info(task_ex, msg)
|
||||
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' [%s -> ERROR]" % (task_ex.name, task_ex.state)
|
||||
)
|
||||
|
||||
rpc.get_engine_client().on_task_state_change(
|
||||
task_ex_id,
|
||||
task_handler.complete_task(
|
||||
db_api.get_task_execution(task_ex_id),
|
||||
states.ERROR,
|
||||
msg
|
||||
)
|
||||
|
@ -503,11 +503,12 @@ class ExecutorClient(base.Executor):
|
||||
:param transport: Messaging transport.
|
||||
:type transport: Transport.
|
||||
"""
|
||||
self.topic = cfg.CONF.executor.topic
|
||||
|
||||
serializer = auth_ctx.RpcContextSerializer(
|
||||
auth_ctx.JsonPayloadSerializer()
|
||||
)
|
||||
|
||||
self.topic = cfg.CONF.executor.topic
|
||||
self._client = messaging.RPCClient(
|
||||
transport,
|
||||
messaging.Target(),
|
||||
@ -539,8 +540,15 @@ class ExecutorClient(base.Executor):
|
||||
|
||||
rpc_client_method = call_ctx.cast if async else call_ctx.call
|
||||
|
||||
return rpc_client_method(
|
||||
auth_ctx.ctx(),
|
||||
'run_action',
|
||||
**kwargs
|
||||
res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)
|
||||
|
||||
# TODO(rakhmerov): It doesn't seem a good approach since we have
|
||||
# a serializer for Result class. A better solution would be to
|
||||
# use a composite serializer that dispatches serialization and
|
||||
# deserialization to concrete serializers depending on object
|
||||
# type.
|
||||
|
||||
return (
|
||||
wf_utils.Result(data=res['data'], error=res['error'])
|
||||
if res else None
|
||||
)
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -13,28 +14,15 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
|
||||
from oslo_log import log as logging
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import policies
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral.engine import tasks
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import executions as wf_ex_service
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import commands as wf_cmds
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral.workflow import with_items
|
||||
|
||||
|
||||
"""Responsible for running tasks and handling results."""
|
||||
@ -42,548 +30,145 @@ from mistral.workflow import with_items
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_existing_task(task_ex_id, reset=True):
|
||||
"""This function runs existing task execution.
|
||||
def run_task(wf_cmd):
|
||||
"""Runs workflow task.
|
||||
|
||||
It is needed mostly by scheduler.
|
||||
|
||||
:param task_ex_id: Task execution id.
|
||||
:param reset: Reset action executions for the task.
|
||||
:param wf_cmd: Workflow command.
|
||||
"""
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
task_spec = spec_parser.get_task_spec(task_ex.spec)
|
||||
wf_def = db_api.get_workflow_definition(task_ex.workflow_name)
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
|
||||
|
||||
# Throw exception if the existing task already succeeded.
|
||||
if task_ex.state == states.SUCCESS:
|
||||
raise exc.EngineException(
|
||||
'Rerunning existing task that already succeeded is not supported.'
|
||||
)
|
||||
task = _build_task_from_command(wf_cmd)
|
||||
|
||||
# Exit if the existing task failed and reset is not instructed.
|
||||
# For a with-items task without reset, re-running the existing
|
||||
# task will re-run the failed and unstarted items.
|
||||
if (task_ex.state == states.ERROR and not reset and
|
||||
not task_spec.get_with_items()):
|
||||
return task_ex
|
||||
|
||||
# Reset nested executions only if task is not already RUNNING.
|
||||
if task_ex.state != states.RUNNING:
|
||||
# Reset state of processed task and related action executions.
|
||||
if reset:
|
||||
action_exs = task_ex.executions
|
||||
else:
|
||||
action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id,
|
||||
state=states.ERROR,
|
||||
accepted=True
|
||||
)
|
||||
|
||||
for action_ex in action_exs:
|
||||
action_ex.accepted = False
|
||||
|
||||
# Explicitly change task state to RUNNING.
|
||||
set_task_state(task_ex, states.RUNNING, None, processed=False)
|
||||
|
||||
_run_existing_task(task_ex, task_spec, wf_spec)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def _run_existing_task(task_ex, task_spec, wf_spec):
|
||||
try:
|
||||
input_dicts = _get_input_dictionaries(
|
||||
wf_spec,
|
||||
task_ex,
|
||||
task_spec,
|
||||
task_ex.in_context
|
||||
)
|
||||
task.run()
|
||||
except exc.MistralException as e:
|
||||
LOG.error(
|
||||
'An error while calculating task action inputs'
|
||||
' [task_execution_id=%s]: %s',
|
||||
task_ex.id, e
|
||||
wf_ex = wf_cmd.wf_ex
|
||||
task_spec = wf_cmd.task_spec
|
||||
|
||||
msg = (
|
||||
"Failed to run task [wf=%s, task=%s]: %s\n%s" %
|
||||
(wf_ex, task_spec.get_name(), e, tb.format_exc())
|
||||
)
|
||||
|
||||
set_task_state(task_ex, states.ERROR, str(e))
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
|
||||
wf_handler.fail_workflow(wf_ex, msg)
|
||||
|
||||
return
|
||||
|
||||
# In some cases we can have no input, e.g. in case of 'with-items'.
|
||||
if input_dicts:
|
||||
for index, input_d in input_dicts:
|
||||
_run_action_or_workflow(
|
||||
task_ex,
|
||||
task_spec,
|
||||
input_d,
|
||||
index,
|
||||
wf_spec
|
||||
)
|
||||
else:
|
||||
_schedule_noop_action(task_ex, task_spec, wf_spec)
|
||||
if task.is_completed():
|
||||
wf_handler.check_workflow_completion(wf_cmd.wf_ex)
|
||||
|
||||
|
||||
def defer_task(wf_cmd):
|
||||
"""Defers a task"""
|
||||
ctx = wf_cmd.ctx
|
||||
wf_ex = wf_cmd.wf_ex
|
||||
task_spec = wf_cmd.task_spec
|
||||
def on_action_complete(action_ex):
|
||||
"""Handles action completion event.
|
||||
|
||||
if wf_utils.find_task_executions_by_spec(wf_ex, task_spec):
|
||||
return None
|
||||
|
||||
return _create_task_execution(
|
||||
wf_ex,
|
||||
task_spec,
|
||||
ctx,
|
||||
state=states.WAITING
|
||||
)
|
||||
|
||||
|
||||
def run_new_task(wf_cmd, wf_spec):
|
||||
"""Runs a task."""
|
||||
ctx = wf_cmd.ctx
|
||||
wf_ex = wf_cmd.wf_ex
|
||||
task_spec = wf_cmd.task_spec
|
||||
|
||||
# NOTE(xylan): Need to think how to get rid of this weird judgment to keep
|
||||
# it more consistent with the function name.
|
||||
task_ex = wf_utils.find_task_execution_with_state(
|
||||
wf_ex,
|
||||
task_spec,
|
||||
states.WAITING
|
||||
)
|
||||
|
||||
if task_ex:
|
||||
set_task_state(task_ex, states.RUNNING, None)
|
||||
task_ex.in_context = ctx
|
||||
else:
|
||||
task_ex = _create_task_execution(wf_ex, task_spec, ctx)
|
||||
|
||||
LOG.debug(
|
||||
'Starting workflow task [workflow=%s, task_spec=%s, init_state=%s]' %
|
||||
(wf_ex.name, task_spec, task_ex.state)
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): 'concurrency' policy should keep a number of running
|
||||
# actions/workflows under control so it can't be implemented if it runs
|
||||
# before any action executions are created.
|
||||
before_task_start(task_ex, task_spec, wf_spec)
|
||||
|
||||
# Policies could possibly change task state.
|
||||
if task_ex.state != states.RUNNING:
|
||||
return task_ex
|
||||
|
||||
_run_existing_task(task_ex, task_spec, wf_spec)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def on_action_complete(action_ex, wf_spec, result):
|
||||
"""Handles event of action result arrival.
|
||||
|
||||
Given action result this method changes corresponding task execution
|
||||
object. This method must never be called for the case of individual
|
||||
action which is not associated with any tasks.
|
||||
|
||||
:param action_ex: Action execution objects the result belongs to.
|
||||
:param wf_spec: Workflow specification.
|
||||
:param result: Task action/workflow output wrapped into
|
||||
mistral.workflow.utils.Result instance.
|
||||
:return Task execution object.
|
||||
:param action_ex: Action execution.
|
||||
"""
|
||||
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
# Ignore if action already completed.
|
||||
if (states.is_completed(action_ex.state) and not
|
||||
isinstance(action_ex, models.WorkflowExecution)):
|
||||
return task_ex
|
||||
if not task_ex:
|
||||
return
|
||||
|
||||
task_spec = wf_spec.get_tasks()[task_ex.name]
|
||||
task_spec = spec_parser.get_task_spec(task_ex.spec)
|
||||
|
||||
try:
|
||||
result = action_handler.transform_result(result, task_ex, task_spec)
|
||||
except exc.YaqlEvaluationException as e:
|
||||
err_msg = str(e)
|
||||
|
||||
LOG.error(
|
||||
'YAQL error while transforming action result'
|
||||
' [action_execution_id=%s, result=%s]: %s',
|
||||
action_ex.id, result, err_msg
|
||||
)
|
||||
|
||||
result = wf_utils.Result(error=err_msg)
|
||||
|
||||
# Ignore workflow executions because they're handled during
|
||||
# workflow completion.
|
||||
if not isinstance(action_ex, models.WorkflowExecution):
|
||||
action_handler.store_action_result(action_ex, result)
|
||||
|
||||
if result.is_success():
|
||||
task_state = states.SUCCESS
|
||||
task_state_info = None
|
||||
else:
|
||||
task_state = states.ERROR
|
||||
task_state_info = result.error
|
||||
|
||||
if not task_spec.get_with_items():
|
||||
_complete_task(task_ex, task_spec, task_state, task_state_info)
|
||||
else:
|
||||
with_items.increase_capacity(task_ex)
|
||||
|
||||
if with_items.is_completed(task_ex):
|
||||
_complete_task(
|
||||
task_ex,
|
||||
task_spec,
|
||||
with_items.get_final_state(task_ex),
|
||||
task_state_info
|
||||
)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING):
|
||||
task_ex = db_api.create_task_execution({
|
||||
'name': task_spec.get_name(),
|
||||
'workflow_execution_id': wf_ex.id,
|
||||
'workflow_name': wf_ex.workflow_name,
|
||||
'workflow_id': wf_ex.workflow_id,
|
||||
'state': state,
|
||||
'spec': task_spec.to_dict(),
|
||||
'in_context': ctx,
|
||||
'published': {},
|
||||
'runtime_context': {},
|
||||
'project_id': wf_ex.project_id
|
||||
})
|
||||
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
wf_ex.task_executions.append(task_ex)
|
||||
|
||||
return task_ex
|
||||
|
||||
|
||||
def before_task_start(task_ex, task_spec, wf_spec):
|
||||
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
|
||||
p.before_task_start(task_ex, task_spec)
|
||||
|
||||
|
||||
def after_task_complete(task_ex, task_spec, wf_spec):
|
||||
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
|
||||
p.after_task_complete(task_ex, task_spec)
|
||||
|
||||
|
||||
def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
|
||||
"""Calculates a collection of inputs for task action/workflow.
|
||||
|
||||
If the given task is not configured as 'with-items' then return list
|
||||
will consist of one dictionary containing input that task action/workflow
|
||||
should run with.
|
||||
In case of 'with-items' the result list will contain input dictionaries
|
||||
for all 'with-items' iterations correspondingly.
|
||||
|
||||
:return the list of tuples containing indexes
|
||||
and the corresponding input dict.
|
||||
"""
|
||||
|
||||
if not task_spec.get_with_items():
|
||||
input_dict = _get_workflow_or_action_input(
|
||||
wf_spec,
|
||||
task_ex,
|
||||
task_spec,
|
||||
ctx
|
||||
)
|
||||
|
||||
return enumerate([input_dict])
|
||||
else:
|
||||
return _get_with_items_input(wf_spec, task_ex, task_spec, ctx)
|
||||
|
||||
|
||||
def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
if task_spec.get_action_name():
|
||||
return _get_action_input(
|
||||
wf_spec,
|
||||
task_ex,
|
||||
task_spec,
|
||||
ctx
|
||||
)
|
||||
elif task_spec.get_workflow_name():
|
||||
return _get_workflow_input(task_spec, ctx)
|
||||
else:
|
||||
raise RuntimeError('Must never happen.')
|
||||
|
||||
|
||||
def _get_with_items_input(wf_spec, task_ex, task_spec, ctx):
|
||||
"""Calculate input array for separating each action input.
|
||||
|
||||
Example:
|
||||
DSL:
|
||||
with_items:
|
||||
- itemX in <% $.arrayI %>
|
||||
- itemY in <% $.arrayJ %>
|
||||
|
||||
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
|
||||
with_items_input = {
|
||||
"itemX": [1, 2],
|
||||
"itemY": ['a', 'b']
|
||||
}
|
||||
|
||||
Then we get separated input:
|
||||
inputs_per_item = [
|
||||
{'itemX': 1, 'itemY': 'a'},
|
||||
{'itemX': 2, 'itemY': 'b'}
|
||||
]
|
||||
|
||||
:return: the list of tuples containing indexes
|
||||
and the corresponding input dict.
|
||||
"""
|
||||
with_items_inputs = expr.evaluate_recursively(
|
||||
task_spec.get_with_items(), ctx
|
||||
)
|
||||
|
||||
with_items.validate_input(with_items_inputs)
|
||||
|
||||
inputs_per_item = []
|
||||
|
||||
for key, value in with_items_inputs.items():
|
||||
for index, item in enumerate(value):
|
||||
iter_context = {key: item}
|
||||
|
||||
if index >= len(inputs_per_item):
|
||||
inputs_per_item.append(iter_context)
|
||||
else:
|
||||
inputs_per_item[index].update(iter_context)
|
||||
|
||||
action_inputs = []
|
||||
|
||||
for item_input in inputs_per_item:
|
||||
new_ctx = utils.merge_dicts(item_input, ctx)
|
||||
|
||||
action_inputs.append(_get_workflow_or_action_input(
|
||||
wf_spec, task_ex, task_spec, new_ctx
|
||||
))
|
||||
|
||||
with_items.prepare_runtime_context(task_ex, task_spec, action_inputs)
|
||||
|
||||
indices = with_items.get_indices_for_loop(task_ex)
|
||||
with_items.decrease_capacity(task_ex, len(indices))
|
||||
|
||||
if indices:
|
||||
current_inputs = operator.itemgetter(*indices)(action_inputs)
|
||||
|
||||
return zip(
|
||||
indices,
|
||||
current_inputs if isinstance(current_inputs, tuple)
|
||||
else [current_inputs]
|
||||
)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def _get_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx)
|
||||
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
input_dict = utils.merge_dicts(
|
||||
input_dict,
|
||||
_get_action_defaults(task_ex, task_spec),
|
||||
overwrite=False
|
||||
)
|
||||
|
||||
return action_handler.get_action_input(
|
||||
action_spec_name,
|
||||
input_dict,
|
||||
task_ex.workflow_name,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
|
||||
def _get_workflow_input(task_spec, ctx):
|
||||
return expr.evaluate_recursively(task_spec.get_input(), ctx)
|
||||
|
||||
|
||||
def _run_action_or_workflow(task_ex, task_spec, input_dict, index, wf_spec):
|
||||
t_name = task_ex.name
|
||||
|
||||
if task_spec.get_action_name():
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' is RUNNING [action_name = %s]" %
|
||||
(t_name, task_spec.get_action_name())
|
||||
)
|
||||
|
||||
_schedule_run_action(task_ex, task_spec, input_dict, index, wf_spec)
|
||||
elif task_spec.get_workflow_name():
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' is RUNNING [workflow_name = %s]" %
|
||||
(t_name, task_spec.get_workflow_name()))
|
||||
|
||||
_schedule_run_workflow(task_ex, task_spec, input_dict, index, wf_spec)
|
||||
|
||||
|
||||
def _get_action_defaults(task_ex, task_spec):
|
||||
actions = task_ex.in_context.get('__env', {}).get('__actions', {})
|
||||
|
||||
return actions.get(task_spec.get_action_name(), {})
|
||||
|
||||
|
||||
def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec):
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
action_def = action_handler.resolve_definition(
|
||||
action_spec_name,
|
||||
task_ex,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
action_ex = action_handler.create_action_execution(
|
||||
action_def,
|
||||
action_input,
|
||||
task_ex,
|
||||
index
|
||||
)
|
||||
|
||||
target = expr.evaluate_recursively(
|
||||
task_spec.get_target(),
|
||||
utils.merge_dicts(
|
||||
copy.deepcopy(action_input),
|
||||
copy.deepcopy(task_ex.in_context)
|
||||
)
|
||||
)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.action_handler.run_existing_action',
|
||||
0,
|
||||
action_ex_id=action_ex.id,
|
||||
target=target
|
||||
)
|
||||
|
||||
|
||||
def _schedule_noop_action(task_ex, task_spec, wf_spec):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
'std.noop',
|
||||
wf_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
task = _create_task(
|
||||
wf_ex,
|
||||
task_spec,
|
||||
task_ex.in_context,
|
||||
task_ex
|
||||
)
|
||||
|
||||
action_ex = action_handler.create_action_execution(action_def, {}, task_ex)
|
||||
|
||||
target = expr.evaluate_recursively(
|
||||
task_spec.get_target(),
|
||||
task_ex.in_context
|
||||
)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.action_handler.run_existing_action',
|
||||
0,
|
||||
action_ex_id=action_ex.id,
|
||||
target=target
|
||||
)
|
||||
|
||||
|
||||
def _schedule_run_workflow(task_ex, task_spec, wf_input, index,
|
||||
parent_wf_spec):
|
||||
parent_wf_ex = task_ex.workflow_execution
|
||||
|
||||
wf_spec_name = task_spec.get_workflow_name()
|
||||
|
||||
wf_def = e_utils.resolve_workflow_definition(
|
||||
parent_wf_ex.workflow_name,
|
||||
parent_wf_spec.get_name(),
|
||||
wf_spec_name
|
||||
)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
|
||||
|
||||
wf_params = {
|
||||
'task_execution_id': task_ex.id,
|
||||
'with_items_index': index
|
||||
}
|
||||
|
||||
if 'env' in parent_wf_ex.params:
|
||||
wf_params['env'] = parent_wf_ex.params['env']
|
||||
|
||||
for k, v in list(wf_input.items()):
|
||||
if k not in wf_spec.get_input():
|
||||
wf_params[k] = v
|
||||
del wf_input[k]
|
||||
|
||||
wf_ex_id, _ = wf_ex_service.create_workflow_execution(
|
||||
wf_def.name,
|
||||
wf_input,
|
||||
"sub-workflow execution",
|
||||
wf_params,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.task_handler.resume_workflow',
|
||||
0,
|
||||
wf_ex_id=wf_ex_id,
|
||||
env=None
|
||||
)
|
||||
|
||||
|
||||
def resume_workflow(wf_ex_id, env):
|
||||
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
|
||||
|
||||
|
||||
def _complete_task(task_ex, task_spec, state, state_info=None):
|
||||
# Ignore if task already completed.
|
||||
if states.is_completed(task_ex.state):
|
||||
return []
|
||||
|
||||
set_task_state(task_ex, state, state_info)
|
||||
|
||||
try:
|
||||
data_flow.publish_variables(task_ex, task_spec)
|
||||
task.on_action_complete(action_ex)
|
||||
except exc.MistralException as e:
|
||||
LOG.error(
|
||||
'An error while publishing task variables'
|
||||
' [task_execution_id=%s]: %s',
|
||||
task_ex.id, str(e)
|
||||
)
|
||||
task_ex = action_ex.task_execution
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
set_task_state(task_ex, states.ERROR, str(e))
|
||||
msg = ("Failed to handle action completion [wf=%s, task=%s,"
|
||||
" action=%s]: %s\n%s" %
|
||||
(wf_ex.name, task_ex.name, action_ex.name, e, tb.format_exc()))
|
||||
|
||||
if not task_spec.get_keep_result():
|
||||
data_flow.destroy_task_result(task_ex)
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
|
||||
wf_handler.fail_workflow(wf_ex, msg)
|
||||
|
||||
return
|
||||
|
||||
if task.is_completed():
|
||||
wf_handler.check_workflow_completion(wf_ex)
|
||||
|
||||
|
||||
def set_task_state(task_ex, state, state_info, processed=None):
|
||||
wf_trace.info(
|
||||
def fail_task(task_ex, msg):
|
||||
task = _build_task_from_execution(task_ex)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
|
||||
wf_handler.fail_workflow(task_ex.workflow_execution, msg)
|
||||
|
||||
|
||||
def continue_task(task_ex):
|
||||
task = _build_task_from_execution(task_ex)
|
||||
|
||||
# TODO(rakhmerov): Error handling.
|
||||
task.run()
|
||||
|
||||
if task.is_completed():
|
||||
wf_handler.check_workflow_completion(task_ex.workflow_execution)
|
||||
|
||||
|
||||
def complete_task(task_ex, state, state_info):
|
||||
task = _build_task_from_execution(task_ex)
|
||||
|
||||
# TODO(rakhmerov): Error handling.
|
||||
task.complete(state, state_info)
|
||||
|
||||
if task.is_completed():
|
||||
wf_handler.check_workflow_completion(task_ex.workflow_execution)
|
||||
|
||||
|
||||
def _build_task_from_execution(task_ex, task_spec=None):
|
||||
return _create_task(
|
||||
task_ex.workflow_execution,
|
||||
"Task execution '%s' [%s -> %s]" %
|
||||
(task_ex.name, task_ex.state, state)
|
||||
task_spec or spec_parser.get_task_spec(task_ex.spec),
|
||||
task_ex.in_context,
|
||||
task_ex
|
||||
)
|
||||
|
||||
task_ex.state = state
|
||||
task_ex.state_info = state_info
|
||||
|
||||
if processed is not None:
|
||||
task_ex.processed = processed
|
||||
def _build_task_from_command(cmd):
|
||||
if isinstance(cmd, wf_cmds.RunExistingTask):
|
||||
task = _create_task(
|
||||
cmd.wf_ex,
|
||||
spec_parser.get_task_spec(cmd.task_ex.spec),
|
||||
cmd.ctx,
|
||||
cmd.task_ex
|
||||
)
|
||||
|
||||
if cmd.reset:
|
||||
task.reset()
|
||||
|
||||
return task
|
||||
|
||||
if isinstance(cmd, wf_cmds.RunTask):
|
||||
task = _create_task(cmd.wf_ex, cmd.task_spec, cmd.ctx)
|
||||
|
||||
if cmd.is_waiting():
|
||||
task.defer()
|
||||
|
||||
return task
|
||||
|
||||
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
|
||||
|
||||
|
||||
def is_task_completed(task_ex, task_spec):
|
||||
def _create_task(wf_ex, task_spec, ctx, task_ex=None):
|
||||
if task_spec.get_with_items():
|
||||
return with_items.is_completed(task_ex)
|
||||
return tasks.WithItemsTask(wf_ex, task_spec, ctx, task_ex)
|
||||
|
||||
return states.is_completed(task_ex.state)
|
||||
|
||||
|
||||
def need_to_continue(task_ex, task_spec):
|
||||
# For now continue is available only for with-items.
|
||||
if task_spec.get_with_items():
|
||||
return (with_items.has_more_iterations(task_ex)
|
||||
and with_items.get_concurrency(task_ex))
|
||||
|
||||
return False
|
||||
return tasks.RegularTask(wf_ex, task_spec, ctx, task_ex)
|
||||
|
456
mistral/engine/tasks.py
Normal file
456
mistral/engine/tasks.py
Normal file
@ -0,0 +1,456 @@
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import operator
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import actions
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine import policies
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import base as wf_base
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral.workflow import with_items
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Task(object):
|
||||
"""Task.
|
||||
|
||||
Represents a workflow task and defines interface that can be used by
|
||||
Mistral engine or its components in order to manipulate with tasks.
|
||||
"""
|
||||
|
||||
def __init__(self, wf_ex, task_spec, ctx, task_ex=None):
|
||||
self.wf_ex = wf_ex
|
||||
self.task_spec = task_spec
|
||||
self.ctx = ctx
|
||||
self.task_ex = task_ex
|
||||
self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
self.waiting = False
|
||||
self.reset_flag = False
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_action_complete(self, action_ex):
|
||||
"""Handle action completion.
|
||||
|
||||
:param action_ex: Action execution.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self):
|
||||
"""Runs task."""
|
||||
raise NotImplementedError
|
||||
|
||||
def defer(self):
|
||||
"""Defers task.
|
||||
|
||||
This methods finds task execution or creates new and puts task
|
||||
to a waiting state.
|
||||
"""
|
||||
|
||||
if not self.task_ex:
|
||||
self.task_ex = wf_utils.find_task_executions_by_spec(
|
||||
self.wf_ex,
|
||||
self.task_spec
|
||||
)
|
||||
|
||||
if not self.task_ex:
|
||||
self._create_task_execution()
|
||||
|
||||
self.set_state(states.WAITING, 'Task execution is deferred.')
|
||||
|
||||
self.waiting = True
|
||||
|
||||
def reset(self):
|
||||
self.reset_flag = True
|
||||
|
||||
def set_state(self, state, state_info, processed=None):
|
||||
"""Sets task state without executing post completion logic.
|
||||
|
||||
:param state: New task state.
|
||||
:param state_info: New state information (i.e. error message).
|
||||
:param processed: New "processed" flag value.
|
||||
"""
|
||||
|
||||
wf_trace.info(
|
||||
self.task_ex.workflow_execution,
|
||||
"Task execution '%s' [%s -> %s]: %s" %
|
||||
(self.task_ex.id, self.task_ex.state, state, state_info)
|
||||
)
|
||||
|
||||
self.task_ex.state = state
|
||||
self.task_ex.state_info = state_info
|
||||
|
||||
if processed is not None:
|
||||
self.task_ex.processed = processed
|
||||
|
||||
def complete(self, state, state_info=None):
|
||||
"""Complete task and set specified state.
|
||||
|
||||
Method sets specified task state and runs all necessary post
|
||||
completion logic such as publishing workflow variables and
|
||||
scheduling new workflow commands.
|
||||
|
||||
:param state: New task state.
|
||||
:param state_info: New state information (i.e. error message).
|
||||
"""
|
||||
|
||||
# Ignore if task already completed.
|
||||
if states.is_completed(self.task_ex.state):
|
||||
return
|
||||
|
||||
self.set_state(state, state_info)
|
||||
|
||||
data_flow.publish_variables(self.task_ex, self.task_spec)
|
||||
|
||||
if not self.task_spec.get_keep_result():
|
||||
# Destroy task result.
|
||||
for ex in self.task_ex.executions:
|
||||
if hasattr(ex, 'output'):
|
||||
ex.output = {}
|
||||
|
||||
self._after_task_complete()
|
||||
|
||||
# Ignore DELAYED state.
|
||||
if self.task_ex.state == states.RUNNING_DELAYED:
|
||||
return
|
||||
|
||||
# If workflow is paused we shouldn't schedule new commands
|
||||
# and mark task as processed.
|
||||
if states.is_paused(self.wf_ex.state):
|
||||
return
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
||||
# Mark task as processed after all decisions have been made
|
||||
# upon its completion.
|
||||
self.task_ex.processed = True
|
||||
|
||||
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
|
||||
|
||||
def _before_task_start(self):
|
||||
policies_spec = self.task_spec.get_policies()
|
||||
|
||||
for p in policies.build_policies(policies_spec, self.wf_spec):
|
||||
p.before_task_start(self.task_ex, self.task_spec)
|
||||
|
||||
def _after_task_complete(self):
|
||||
policies_spec = self.task_spec.get_policies()
|
||||
|
||||
for p in policies.build_policies(policies_spec, self.wf_spec):
|
||||
p.after_task_complete(self.task_ex, self.task_spec)
|
||||
|
||||
def _create_task_execution(self, state=states.RUNNING):
|
||||
self.task_ex = db_api.create_task_execution({
|
||||
'name': self.task_spec.get_name(),
|
||||
'workflow_execution_id': self.wf_ex.id,
|
||||
'workflow_name': self.wf_ex.workflow_name,
|
||||
'workflow_id': self.wf_ex.workflow_id,
|
||||
'state': state,
|
||||
'spec': self.task_spec.to_dict(),
|
||||
'in_context': self.ctx,
|
||||
'published': {},
|
||||
'runtime_context': {},
|
||||
'project_id': self.wf_ex.project_id
|
||||
})
|
||||
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
self.wf_ex.task_executions.append(self.task_ex)
|
||||
|
||||
def _get_action_defaults(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
|
||||
if not action_name:
|
||||
return {}
|
||||
|
||||
env = self.task_ex.in_context.get('__env', {})
|
||||
|
||||
return env.get('__actions', {}).get(action_name, {})
|
||||
|
||||
|
||||
class RegularTask(Task):
|
||||
"""Regular task.
|
||||
|
||||
Takes care of processing regular tasks with one action.
|
||||
"""
|
||||
|
||||
def on_action_complete(self, action_ex):
|
||||
state = action_ex.state
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
# cases when action is successful and when it's not. For example,
|
||||
# in state_info we can specify the cause action.
|
||||
state_info = (None if state == states.SUCCESS
|
||||
else action_ex.output.get('result'))
|
||||
|
||||
self.complete(state, state_info)
|
||||
|
||||
def is_completed(self):
|
||||
return self.task_ex and states.is_completed(self.task_ex.state)
|
||||
|
||||
def run(self):
|
||||
if not self.task_ex:
|
||||
self._run_new()
|
||||
else:
|
||||
self._run_existing()
|
||||
|
||||
def _run_new(self):
|
||||
# NOTE(xylan): Need to think how to get rid of this weird judgment
|
||||
# to keep it more consistent with the function name.
|
||||
self.task_ex = wf_utils.find_task_execution_with_state(
|
||||
self.wf_ex,
|
||||
self.task_spec,
|
||||
states.WAITING
|
||||
)
|
||||
|
||||
if self.task_ex:
|
||||
self.set_state(states.RUNNING, None)
|
||||
|
||||
self.task_ex.in_context = self.ctx
|
||||
else:
|
||||
self._create_task_execution()
|
||||
|
||||
LOG.debug(
|
||||
'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %
|
||||
(self.wf_ex.name, self.task_spec, self.task_ex.state)
|
||||
)
|
||||
|
||||
self._before_task_start()
|
||||
|
||||
# Policies could possibly change task state.
|
||||
if self.task_ex.state != states.RUNNING:
|
||||
return
|
||||
|
||||
self._schedule_actions()
|
||||
|
||||
def _run_existing(self):
|
||||
if self.waiting:
|
||||
return
|
||||
|
||||
# Explicitly change task state to RUNNING.
|
||||
# Throw exception if the existing task already succeeded.
|
||||
if self.task_ex.state == states.SUCCESS:
|
||||
raise exc.MistralError(
|
||||
'Rerunning succeeded tasks is not supported.'
|
||||
)
|
||||
|
||||
self.set_state(states.RUNNING, None, processed=False)
|
||||
|
||||
self._reset_actions()
|
||||
|
||||
self._schedule_actions()
|
||||
|
||||
def _reset_actions(self):
|
||||
"""Resets task state.
|
||||
|
||||
Depending on task type this method may reset task state. For example,
|
||||
delete all task actions etc.
|
||||
"""
|
||||
|
||||
# Reset state of processed task and related action executions.
|
||||
if self.reset_flag:
|
||||
action_exs = self.task_ex.executions
|
||||
else:
|
||||
action_exs = db_api.get_action_executions(
|
||||
task_execution_id=self.task_ex.id,
|
||||
state=states.ERROR,
|
||||
accepted=True
|
||||
)
|
||||
|
||||
for action_ex in action_exs:
|
||||
action_ex.accepted = False
|
||||
|
||||
def _schedule_actions(self):
|
||||
# Regular task schedules just one action.
|
||||
input_dict = self._get_action_input()
|
||||
target = self._get_target(input_dict)
|
||||
|
||||
action = self._build_action()
|
||||
|
||||
action.validate_input(input_dict)
|
||||
|
||||
action.schedule(input_dict, target)
|
||||
|
||||
def _get_target(self, input_dict):
|
||||
return expr.evaluate_recursively(
|
||||
self.task_spec.get_target(),
|
||||
utils.merge_dicts(
|
||||
copy.deepcopy(input_dict),
|
||||
copy.deepcopy(self.ctx)
|
||||
)
|
||||
)
|
||||
|
||||
def _get_action_input(self, ctx=None):
|
||||
ctx = ctx or self.ctx
|
||||
|
||||
input_dict = expr.evaluate_recursively(self.task_spec.get_input(), ctx)
|
||||
|
||||
return utils.merge_dicts(
|
||||
input_dict,
|
||||
self._get_action_defaults(),
|
||||
overwrite=False
|
||||
)
|
||||
|
||||
def _build_action(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
wf_name = self.task_spec.get_workflow_name()
|
||||
|
||||
if wf_name:
|
||||
return actions.WorkflowAction(wf_name, task_ex=self.task_ex)
|
||||
|
||||
if not action_name:
|
||||
action_name = 'std.noop'
|
||||
|
||||
action_def = actions.resolve_action_definition(
|
||||
action_name,
|
||||
self.wf_ex.name,
|
||||
self.wf_spec.get_name()
|
||||
)
|
||||
|
||||
if action_def.spec:
|
||||
return actions.AdHocAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
return actions.PythonAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
|
||||
class WithItemsTask(RegularTask):
|
||||
"""With-items task.
|
||||
|
||||
Takes care of processing "with-items" tasks.
|
||||
"""
|
||||
|
||||
def on_action_complete(self, action_ex):
|
||||
state = action_ex.state
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
# cases when action is successful and when it's not. For example,
|
||||
# in state_info we can specify the cause action.
|
||||
state_info = (None if state == states.SUCCESS
|
||||
else action_ex.output.get('result'))
|
||||
|
||||
with_items.increase_capacity(self.task_ex)
|
||||
|
||||
if with_items.is_completed(self.task_ex):
|
||||
self.complete(
|
||||
with_items.get_final_state(self.task_ex),
|
||||
state_info
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
if (with_items.has_more_iterations(self.task_ex)
|
||||
and with_items.get_concurrency(self.task_ex)):
|
||||
self._schedule_actions()
|
||||
|
||||
def _schedule_actions(self):
|
||||
input_dicts = self._get_with_items_input()
|
||||
|
||||
if not input_dicts:
|
||||
self.complete(states.SUCCESS)
|
||||
|
||||
return
|
||||
|
||||
for idx, input_dict in input_dicts:
|
||||
target = self._get_target(input_dict)
|
||||
|
||||
action = self._build_action()
|
||||
|
||||
action.schedule(input_dict, target, index=idx)
|
||||
|
||||
def _get_with_items_input(self):
|
||||
"""Calculate input array for separating each action input.
|
||||
|
||||
Example:
|
||||
DSL:
|
||||
with_items:
|
||||
- itemX in <% $.arrayI %>
|
||||
- itemY in <% $.arrayJ %>
|
||||
|
||||
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
|
||||
with_items_input = {
|
||||
"itemX": [1, 2],
|
||||
"itemY": ['a', 'b']
|
||||
}
|
||||
|
||||
Then we get separated input:
|
||||
inputs_per_item = [
|
||||
{'itemX': 1, 'itemY': 'a'},
|
||||
{'itemX': 2, 'itemY': 'b'}
|
||||
]
|
||||
|
||||
:return: the list of tuples containing indexes
|
||||
and the corresponding input dict.
|
||||
"""
|
||||
with_items_inputs = expr.evaluate_recursively(
|
||||
self.task_spec.get_with_items(),
|
||||
self.ctx
|
||||
)
|
||||
|
||||
with_items.validate_input(with_items_inputs)
|
||||
|
||||
inputs_per_item = []
|
||||
|
||||
for key, value in with_items_inputs.items():
|
||||
for index, item in enumerate(value):
|
||||
iter_context = {key: item}
|
||||
|
||||
if index >= len(inputs_per_item):
|
||||
inputs_per_item.append(iter_context)
|
||||
else:
|
||||
inputs_per_item[index].update(iter_context)
|
||||
|
||||
action_inputs = []
|
||||
|
||||
for item_input in inputs_per_item:
|
||||
new_ctx = utils.merge_dicts(item_input, self.ctx)
|
||||
|
||||
action_inputs.append(self._get_action_input(new_ctx))
|
||||
|
||||
with_items.prepare_runtime_context(
|
||||
self.task_ex,
|
||||
self.task_spec,
|
||||
action_inputs
|
||||
)
|
||||
|
||||
indices = with_items.get_indices_for_loop(self.task_ex)
|
||||
|
||||
with_items.decrease_capacity(self.task_ex, len(indices))
|
||||
|
||||
if indices:
|
||||
current_inputs = operator.itemgetter(*indices)(action_inputs)
|
||||
|
||||
return zip(
|
||||
indices,
|
||||
current_inputs if isinstance(current_inputs, tuple)
|
||||
else [current_inputs]
|
||||
)
|
||||
|
||||
return []
|
@ -21,6 +21,9 @@ from mistral import exceptions as exc
|
||||
from mistral import utils
|
||||
|
||||
|
||||
# TODO(rakhmerov): This method is too abstract, validation rules may vary
|
||||
# depending on object type (action, wf), it's not clear what it can be
|
||||
# applied to.
|
||||
def validate_input(definition, input, spec=None):
|
||||
input_param_names = copy.deepcopy(list((input or {}).keys()))
|
||||
missing_param_names = []
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -12,18 +12,89 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import task_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import base as wf_base
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def on_task_complete(task_ex):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
check_workflow_completion(wf_ex)
|
||||
|
||||
|
||||
def check_workflow_completion(wf_ex):
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
# Workflow is not completed if there are any incomplete task
|
||||
# executions that are not in WAITING state. If all incomplete
|
||||
# tasks are waiting and there are no unhandled errors, then these
|
||||
# tasks will not reach completion. In this case, mark the
|
||||
# workflow complete.
|
||||
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
|
||||
|
||||
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
|
||||
return
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
|
||||
|
||||
if wf_ctrl.all_errors_handled():
|
||||
succeed_workflow(
|
||||
wf_ex,
|
||||
wf_ctrl.evaluate_workflow_final_context(),
|
||||
wf_spec
|
||||
)
|
||||
else:
|
||||
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
|
||||
|
||||
fail_workflow(wf_ex, state_info)
|
||||
|
||||
|
||||
def stop_workflow(wf_ex, state, message=None):
|
||||
if state == states.SUCCESS:
|
||||
wf_ctrl = wf_base.get_controller(wf_ex)
|
||||
|
||||
final_context = {}
|
||||
|
||||
try:
|
||||
final_context = wf_ctrl.evaluate_workflow_final_context()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
'Failed to get final context for %s: %s' % (wf_ex, e)
|
||||
)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
return succeed_workflow(
|
||||
wf_ex,
|
||||
final_context,
|
||||
wf_spec,
|
||||
message
|
||||
)
|
||||
elif state == states.ERROR:
|
||||
return fail_workflow(wf_ex, message)
|
||||
|
||||
return wf_ex
|
||||
|
||||
|
||||
def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
|
||||
# Fail workflow if output is not successfully evaluated.
|
||||
try:
|
||||
@ -35,7 +106,7 @@ def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
|
||||
return fail_workflow(wf_ex, e.message)
|
||||
|
||||
# Set workflow execution to success until after output is evaluated.
|
||||
set_execution_state(wf_ex, states.SUCCESS, state_info)
|
||||
set_workflow_state(wf_ex, states.SUCCESS, state_info)
|
||||
|
||||
if wf_ex.task_execution_id:
|
||||
_schedule_send_result_to_parent_workflow(wf_ex)
|
||||
@ -47,7 +118,16 @@ def fail_workflow(wf_ex, state_info):
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return wf_ex
|
||||
|
||||
set_execution_state(wf_ex, states.ERROR, state_info)
|
||||
set_workflow_state(wf_ex, states.ERROR, state_info)
|
||||
|
||||
# When we set an ERROR state we should safely set output value getting
|
||||
# w/o exceptions due to field size limitations.
|
||||
state_info = utils.cut_by_kb(
|
||||
state_info,
|
||||
cfg.CONF.engine.execution_field_size_limit_kb
|
||||
)
|
||||
|
||||
wf_ex.output = {'result': state_info}
|
||||
|
||||
if wf_ex.task_execution_id:
|
||||
_schedule_send_result_to_parent_workflow(wf_ex)
|
||||
@ -84,7 +164,9 @@ def send_result_to_parent_workflow(wf_ex_id):
|
||||
)
|
||||
|
||||
|
||||
def set_execution_state(wf_ex, state, state_info=None, set_upstream=False):
|
||||
# TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow
|
||||
# abstraction.
|
||||
def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False):
|
||||
cur_state = wf_ex.state
|
||||
|
||||
if states.is_valid_transition(cur_state, state):
|
||||
@ -109,24 +191,28 @@ def set_execution_state(wf_ex, state, state_info=None, set_upstream=False):
|
||||
# If specified, then recursively set the state of the parent workflow
|
||||
# executions to the same state. Only changing state to RUNNING is
|
||||
# supported.
|
||||
# TODO(rakhmerov): I don't like this hardcoded special case. It's
|
||||
# used only to continue the workflow (rerun) but at the first glance
|
||||
# seems like a generic behavior. Need to handle it differently.
|
||||
if set_upstream and state == states.RUNNING and wf_ex.task_execution_id:
|
||||
task_ex = db_api.get_task_execution(wf_ex.task_execution_id)
|
||||
|
||||
parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id)
|
||||
|
||||
set_execution_state(
|
||||
set_workflow_state(
|
||||
parent_wf_ex,
|
||||
state,
|
||||
state_info=state_info,
|
||||
set_upstream=set_upstream
|
||||
)
|
||||
|
||||
task_handler.set_task_state(
|
||||
task_ex,
|
||||
state,
|
||||
state_info=None,
|
||||
processed=False
|
||||
)
|
||||
# TODO(rakhmerov): How do we need to set task state properly?
|
||||
# It doesn't seem right to intervene into the parent workflow
|
||||
# internals. We just need to communicate changes back to parent
|
||||
# worklfow and it should do what's needed itself.
|
||||
task_ex.state = state
|
||||
task_ex.state_info = None
|
||||
task_ex.processed = False
|
||||
|
||||
|
||||
def lock_workflow_execution(wf_ex_id):
|
||||
|
@ -59,7 +59,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
|
||||
'context': copy.deepcopy(wf_input) or {},
|
||||
'task_execution_id': params.get('task_execution_id'),
|
||||
'runtime_context': {
|
||||
'with_items_index': params.get('with_items_index', 0)
|
||||
'index': params.get('index', 0)
|
||||
},
|
||||
})
|
||||
|
||||
@ -92,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params,
|
||||
|
||||
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
|
||||
|
||||
return wf_ex.id, wf_spec
|
||||
return wf_ex, wf_spec
|
||||
|
@ -162,9 +162,9 @@ class CallScheduler(periodic_task.PeriodicTasks):
|
||||
)
|
||||
|
||||
for (target_auth_context, target_method, method_args) in delayed_calls:
|
||||
|
||||
# TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521
|
||||
# Transaction is needed here because some of the
|
||||
# target_method can use the DB
|
||||
# target_method can use the DB.
|
||||
with db_api.transaction():
|
||||
try:
|
||||
# Set the correct context for the method.
|
||||
@ -175,9 +175,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
|
||||
# Call the method.
|
||||
target_method(**method_args)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"Delayed call failed [exception=%s]", e
|
||||
)
|
||||
LOG.exception("Delayed call failed [exception=%s]: %s", e)
|
||||
finally:
|
||||
# Remove context.
|
||||
context.set_ctx(None)
|
||||
|
@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
# Default delay and timeout in seconds for await_xxx() functions.
|
||||
DEFAULT_DELAY = 1
|
||||
DEFAULT_TIMEOUT = 60
|
||||
DEFAULT_TIMEOUT = 20
|
||||
|
||||
|
||||
def launch_engine_server(transport, engine):
|
||||
@ -135,16 +135,36 @@ class EngineTestCase(base.DbTestCase):
|
||||
|
||||
wf_execs = db_api.get_workflow_executions()
|
||||
|
||||
for wf_ex in wf_execs:
|
||||
for w in wf_execs:
|
||||
print(
|
||||
"\n%s [state=%s, output=%s]" %
|
||||
(wf_ex.name, wf_ex.state, wf_ex.output)
|
||||
"\n%s [state=%s, state_info=%s, output=%s]" %
|
||||
(w.name, w.state, w.state_info, w.output)
|
||||
)
|
||||
|
||||
for t_ex in wf_ex.task_executions:
|
||||
for t in w.task_executions:
|
||||
print(
|
||||
"\t%s [id=%s, state=%s, published=%s]" %
|
||||
(t_ex.name, t_ex.id, t_ex.state, t_ex.published)
|
||||
"\t%s [id=%s, state=%s, state_info=%s, processed=%s,"
|
||||
" published=%s]" %
|
||||
(t.name,
|
||||
t.id,
|
||||
t.state,
|
||||
t.state_info,
|
||||
t.processed,
|
||||
t.published)
|
||||
)
|
||||
|
||||
a_execs = db_api.get_action_executions(task_execution_id=t.id)
|
||||
|
||||
for a in a_execs:
|
||||
print(
|
||||
"\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s,"
|
||||
" output=%s]" %
|
||||
(a.name,
|
||||
a.id,
|
||||
a.state,
|
||||
a.state_info,
|
||||
a.accepted,
|
||||
a.output)
|
||||
)
|
||||
|
||||
# Various methods for abstract execution objects.
|
||||
|
@ -486,10 +486,10 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
task1 = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
|
||||
result = data_flow.get_task_execution_result(task1)
|
||||
|
||||
self.assertListEqual([], result)
|
||||
|
||||
|
||||
@ -512,7 +512,7 @@ class DataFlowTest(test_base.BaseTest):
|
||||
name='my_action',
|
||||
output={'result': 1},
|
||||
accepted=True,
|
||||
runtime_context={'with_items_index': 0}
|
||||
runtime_context={'index': 0}
|
||||
)]
|
||||
|
||||
with mock.patch.object(db_api, 'get_action_executions',
|
||||
@ -523,14 +523,14 @@ class DataFlowTest(test_base.BaseTest):
|
||||
name='my_action',
|
||||
output={'result': 1},
|
||||
accepted=True,
|
||||
runtime_context={'with_items_index': 0}
|
||||
runtime_context={'index': 0}
|
||||
))
|
||||
|
||||
action_exs.append(models.ActionExecution(
|
||||
name='my_action',
|
||||
output={'result': 1},
|
||||
accepted=False,
|
||||
runtime_context={'with_items_index': 0}
|
||||
runtime_context={'index': 0}
|
||||
))
|
||||
|
||||
with mock.patch.object(db_api, 'get_action_executions',
|
||||
|
@ -143,6 +143,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.await_execution_success(wf_ex.id)
|
||||
@ -411,6 +412,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
wf:
|
||||
input:
|
||||
- var
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output=<% $.var + $.var2 %>
|
||||
@ -484,7 +486,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
name='task1'
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
|
||||
# Assert that there is only one action execution and it's SUCCESS.
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
@ -550,7 +552,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
name='task1'
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id
|
||||
|
@ -460,18 +460,27 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
task_1_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='t1',
|
||||
state=states.SUCCESS
|
||||
)
|
||||
task_2_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='t2',
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
self.assertIsNotNone(task_2_ex.state_info)
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
e = self.assertRaises(
|
||||
exc.EngineException,
|
||||
exc.MistralError,
|
||||
self.engine.rerun_workflow,
|
||||
wf_ex.id,
|
||||
task_1_ex.id
|
||||
@ -505,9 +514,12 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(1, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(1, len(task_execs))
|
||||
|
||||
task_1_ex = self._assert_single_item(task_execs, name='t1')
|
||||
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
self.assertIsNotNone(task_1_ex.state_info)
|
||||
@ -532,10 +544,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(2, len(task_execs))
|
||||
|
||||
task_1_ex = self._assert_single_item(task_execs, name='t1')
|
||||
task_2_ex = self._assert_single_item(task_execs, name='t2')
|
||||
|
||||
# Check action executions of task 1.
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
@ -547,8 +562,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
# The single action execution that succeeded should not re-run.
|
||||
self.assertEqual(5, len(task_1_action_exs))
|
||||
self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'],
|
||||
task_1_ex.published.get('v1'))
|
||||
self.assertListEqual(
|
||||
['Task 1.0', 'Task 1.1', 'Task 1.2'],
|
||||
task_1_ex.published.get('v1')
|
||||
)
|
||||
|
||||
# Check action executions of task 2.
|
||||
self.assertEqual(states.SUCCESS, task_2_ex.state)
|
||||
|
@ -172,8 +172,7 @@ class EnvironmentTest(base.EngineTestCase):
|
||||
'mistral.actions.std_actions.EchoAction',
|
||||
{},
|
||||
a_ex.input,
|
||||
TARGET,
|
||||
True
|
||||
TARGET
|
||||
)
|
||||
|
||||
def test_subworkflow_env_task_input(self):
|
||||
|
61
mistral/tests/unit/engine/test_error_handling.py
Normal file
61
mistral/tests/unit/engine/test_error_handling.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
def test_action_error(self):
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(1, len(task_execs))
|
||||
|
||||
self._assert_single_item(task_execs, name='task1', state=states.ERROR)
|
||||
|
||||
# TODO(rakhmerov): Finish. Need to check more complicated cases.
|
||||
|
||||
def test_task_error(self):
|
||||
# TODO(rakhmerov): Implement.
|
||||
pass
|
||||
|
||||
def test_workflow_error(self):
|
||||
# TODO(rakhmerov): Implement.
|
||||
pass
|
@ -178,13 +178,13 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertIn(
|
||||
'Failure caused by error in tasks: task1',
|
||||
'Failed to handle action completion [wf=wf, task=task1',
|
||||
wf_ex.state_info
|
||||
)
|
||||
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
|
||||
self.assertEqual(
|
||||
self.assertIn(
|
||||
"Size of 'published' is 1KB which exceeds the limit of 0KB",
|
||||
task_ex.state_info
|
||||
)
|
||||
|
@ -576,6 +576,7 @@ class JoinEngineTest(base.EngineTestCase):
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task22
|
||||
|
||||
task22:
|
||||
action: std.noop
|
||||
on-success:
|
||||
@ -585,6 +586,7 @@ class JoinEngineTest(base.EngineTestCase):
|
||||
action: std.fail
|
||||
on-success:
|
||||
- task32
|
||||
|
||||
task32:
|
||||
action: std.noop
|
||||
on-success:
|
||||
|
@ -303,7 +303,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
e = self.assertRaises(
|
||||
exc.EngineException,
|
||||
exc.MistralError,
|
||||
self.engine.rerun_workflow,
|
||||
wf_ex.id,
|
||||
task_1_ex.id
|
||||
|
@ -65,6 +65,7 @@ class RunActionEngineTest(base.EngineTestCase):
|
||||
# Start action and see the result.
|
||||
action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'})
|
||||
|
||||
self.assertIsNotNone(action_ex.output)
|
||||
self.assertIn('some error', action_ex.output['result'])
|
||||
|
||||
def test_run_action_save_result(self):
|
||||
@ -148,10 +149,15 @@ class RunActionEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertIn('concat', exception.message)
|
||||
|
||||
@mock.patch('mistral.engine.action_handler.resolve_action_definition')
|
||||
# TODO(rakhmerov): This is an example of a bad test. It pins to
|
||||
# implementation details too much and prevents from making refactoring
|
||||
# easily. When writing tests we should make assertions about
|
||||
# consequences, not about how internal machinery works, i.e. we need to
|
||||
# follow "black box" testing paradigm.
|
||||
@mock.patch('mistral.engine.actions.resolve_action_definition')
|
||||
@mock.patch('mistral.engine.utils.validate_input')
|
||||
@mock.patch('mistral.services.action_manager.get_action_class')
|
||||
@mock.patch('mistral.engine.action_handler.run_action')
|
||||
@mock.patch('mistral.engine.actions.PythonAction.run')
|
||||
def test_run_action_with_kwargs_input(self, run_mock, class_mock,
|
||||
validate_mock, def_mock):
|
||||
action_def = models.ActionDefinition()
|
||||
@ -172,16 +178,15 @@ class RunActionEngineTest(base.EngineTestCase):
|
||||
|
||||
self.engine.start_action('fake_action', {'input': 'Hello'})
|
||||
|
||||
self.assertEqual(2, def_mock.call_count)
|
||||
def_mock.assert_called_with('fake_action', None, None)
|
||||
self.assertEqual(1, def_mock.call_count)
|
||||
def_mock.assert_called_with('fake_action')
|
||||
|
||||
self.assertEqual(0, validate_mock.call_count)
|
||||
|
||||
class_ret.assert_called_once_with(input='Hello')
|
||||
|
||||
run_mock.assert_called_once_with(
|
||||
action_def,
|
||||
{'input': 'Hello'},
|
||||
target=None,
|
||||
async=False
|
||||
None,
|
||||
save=None
|
||||
)
|
||||
|
@ -181,7 +181,7 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
@mock.patch.object(std_actions.EchoAction, 'run',
|
||||
mock.MagicMock(side_effect=exc.ActionException))
|
||||
def test_subworkflow_error(self):
|
||||
wf2_ex = self.engine.start_workflow('wb1.wf2', None)
|
||||
self.engine.start_workflow('wb1.wf2', None)
|
||||
|
||||
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
|
||||
|
||||
@ -208,11 +208,13 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
self.assertEqual(2, len(wf_execs))
|
||||
|
||||
wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2')
|
||||
|
||||
self.assertEqual(states.ERROR, wf2_ex.state)
|
||||
self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info)
|
||||
|
||||
# Ensure error message is bubbled up to the main workflow.
|
||||
wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1')
|
||||
|
||||
self.assertEqual(states.ERROR, wf1_ex.state)
|
||||
self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info)
|
||||
|
||||
|
@ -16,7 +16,6 @@ import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import default_engine as de
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.tests.unit.engine import base
|
||||
@ -139,7 +138,7 @@ workflows:
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="Hi!"
|
||||
action: std.echo output="Task 1"
|
||||
on-complete:
|
||||
- task3
|
||||
- pause
|
||||
@ -299,7 +298,7 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
|
||||
self.engine.resume_workflow(wf_ex.id)
|
||||
|
||||
self.await_execution_success(wf_ex.id, 1, 5)
|
||||
self.await_execution_success(wf_ex.id)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
@ -351,8 +350,7 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info)
|
||||
self.assertEqual(4, len(wf_ex.task_executions))
|
||||
|
||||
@mock.patch.object(de.DefaultEngine, '_fail_workflow')
|
||||
def test_resume_fails(self, mock_fw):
|
||||
def test_resume_fails(self):
|
||||
# Start and pause workflow.
|
||||
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)
|
||||
|
||||
@ -365,7 +363,7 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
self.assertEqual(states.PAUSED, wf_ex.state)
|
||||
|
||||
# Simulate failure and check if it is handled.
|
||||
err = exc.MistralException('foo')
|
||||
err = exc.MistralError('foo')
|
||||
|
||||
with mock.patch.object(
|
||||
db_api,
|
||||
@ -373,13 +371,11 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
side_effect=err):
|
||||
|
||||
self.assertRaises(
|
||||
exc.MistralException,
|
||||
exc.MistralError,
|
||||
self.engine.resume_workflow,
|
||||
wf_ex.id
|
||||
)
|
||||
|
||||
mock_fw.assert_called_once_with(wf_ex.id, err)
|
||||
|
||||
def test_resume_diff_env_vars(self):
|
||||
wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR)
|
||||
|
||||
|
@ -103,7 +103,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
|
||||
state=states.SUCCESS,
|
||||
output={'result': 'Hey'},
|
||||
accepted=True,
|
||||
runtime_context={'with_items_index': 0}
|
||||
runtime_context={'index': 0}
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -25,9 +25,7 @@ class WithItemsTest(base.BaseTest):
|
||||
return models.ActionExecution(
|
||||
accepted=accepted,
|
||||
state=state,
|
||||
runtime_context={
|
||||
'with_items_index': index
|
||||
}
|
||||
runtime_context={'index': index}
|
||||
)
|
||||
|
||||
def test_get_indices(self):
|
||||
|
@ -23,6 +23,7 @@ from os import path
|
||||
import shutil
|
||||
import six
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import uuid
|
||||
@ -169,6 +170,16 @@ def cut(data, length=100):
|
||||
return string
|
||||
|
||||
|
||||
def cut_by_kb(data, kilobytes):
|
||||
if kilobytes <= 0:
|
||||
return cut(data)
|
||||
|
||||
bytes_per_char = sys.getsizeof('s') - sys.getsizeof('')
|
||||
length = int(kilobytes * 1024 / bytes_per_char)
|
||||
|
||||
return cut(data, length)
|
||||
|
||||
|
||||
def iter_subclasses(cls, _seen=None):
|
||||
"""Generator over all subclasses of a given class in depth first order."""
|
||||
|
||||
|
@ -83,7 +83,8 @@ class WorkflowController(object):
|
||||
|
||||
self.wf_spec = wf_spec
|
||||
|
||||
def _update_task_ex_env(self, task_ex, env):
|
||||
@staticmethod
|
||||
def _update_task_ex_env(task_ex, env):
|
||||
if not env:
|
||||
return task_ex
|
||||
|
||||
|
@ -45,17 +45,18 @@ class RunTask(WorkflowCommand):
|
||||
|
||||
def __init__(self, wf_ex, task_spec, ctx):
|
||||
super(RunTask, self).__init__(wf_ex, task_spec, ctx)
|
||||
self.wait_flag = False
|
||||
|
||||
self.wait = False
|
||||
|
||||
def is_waiting(self):
|
||||
return (self.wait_flag and
|
||||
return (self.wait and
|
||||
isinstance(self.task_spec, tasks.DirectWorkflowTaskSpec) and
|
||||
self.task_spec.get_join())
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
"Run task [workflow=%s, task=%s, waif_flag=%s]"
|
||||
% (self.wf_ex.name, self.task_spec.get_name(), self.wait_flag)
|
||||
% (self.wf_ex.name, self.task_spec.get_name(), self.wait)
|
||||
)
|
||||
|
||||
|
||||
|
@ -73,7 +73,7 @@ def get_task_execution_result(task_ex):
|
||||
# use db_api.get_action_executions here to avoid session-less use cases.
|
||||
action_execs = db_api.get_action_executions(task_execution_id=task_ex.id)
|
||||
action_execs.sort(
|
||||
key=lambda x: x.runtime_context.get('with_items_index')
|
||||
key=lambda x: x.runtime_context.get('index')
|
||||
)
|
||||
|
||||
results = [
|
||||
@ -111,12 +111,6 @@ def publish_variables(task_ex, task_spec):
|
||||
)
|
||||
|
||||
|
||||
def destroy_task_result(task_ex):
|
||||
for ex in task_ex.executions:
|
||||
if hasattr(ex, 'output'):
|
||||
ex.output = {}
|
||||
|
||||
|
||||
def evaluate_task_outbound_context(task_ex):
|
||||
"""Evaluates task outbound Data Flow context.
|
||||
|
||||
|
@ -121,7 +121,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
# NOTE(xylan): Decide whether or not a join task should run
|
||||
# immediately.
|
||||
if self._is_unsatisfied_join(cmd):
|
||||
cmd.wait_flag = True
|
||||
cmd.wait = True
|
||||
|
||||
cmds.append(cmd)
|
||||
|
||||
|
@ -20,6 +20,9 @@ from mistral import exceptions as exc
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
# TODO(rakhmerov): Seems like it makes sense to get rid of this module in favor
|
||||
# of implementing all the needed logic in engine.tasks.WithItemsTask directly.
|
||||
|
||||
_CAPACITY = 'capacity'
|
||||
_CONCURRENCY = 'concurrency'
|
||||
_COUNT = 'count'
|
||||
@ -79,7 +82,7 @@ def _get_with_item_indices(exs):
|
||||
:param exs: List of executions.
|
||||
:return: a list of numbers.
|
||||
"""
|
||||
return sorted(set([ex.runtime_context['with_items_index'] for ex in exs]))
|
||||
return sorted(set([ex.runtime_context['index'] for ex in exs]))
|
||||
|
||||
|
||||
def _get_accepted_act_exs(task_ex):
|
||||
|
@ -597,18 +597,14 @@ class ExecutionTestsV2(base.TestCase):
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_execution_for_reverse_wf_invalid_start_task(self):
|
||||
_, wf_ex = self.client.create_execution(
|
||||
self.assertRaises(
|
||||
exceptions.BadRequest,
|
||||
self.client.create_execution,
|
||||
self.reverse_wf['name'],
|
||||
{
|
||||
self.reverse_wf['input']: "Bye"},
|
||||
{
|
||||
"task_name": "nonexist"
|
||||
}
|
||||
{self.reverse_wf['input']: "Bye"},
|
||||
{"task_name": "nonexist"}
|
||||
)
|
||||
|
||||
self.assertEqual("ERROR", wf_ex['state'])
|
||||
self.assertIn("Invalid task name", wf_ex['state_info'])
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_execution_forgot_input_params(self):
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
|
Loading…
Reference in New Issue
Block a user