Refactor Mistral Engine

* Introduced class hierarchies Task and Action used by Mistral engine.
  Note: Action here is a different than executor Action and represents
  rather actions of different types: regular python action, ad-hoc
  action and workflow action (since for task action and workflow are
  polymorphic)
* Refactored task_handler.py and action_handler.py with Task and Action
  hierarchies
* Rebuilt a chain call so that the entire action processing would look
  like a chain of calls Action -> Task -> Workflow where each level
  knows only about the next level and can influence it (e.g. if adhoc
  action has failed due to YAQL error in 'output' transformer action
  itself fails its task)
* Refactored policies according to new object model
* Fixed some of the tests to match the idea of having two types of
  exceptions, MistralException and MistralError, where the latter
  is considered either a harsh environmental problem or a logical
  issue in the system itself so that it must not be handled anywhere
  in the code

TODO(in subsequent patches):
 * Refactor WithItemsTask w/o using with_items.py
 * Remove DB transaction in Scheduler when making a delayed call,
   helper policy methods like 'continue_workflow'
 * Refactor policies test so that workflow definitions live right
   in test methods
 * Refactor workflow_handler with Workflow abstraction
 * Get rid of RunExistingTask workflow command, it should be just
   one command with various properties
 * Refactor resume and rerun with Task abstraction (same way as
   other methods, e.g. on_action_complete())
 * Add error handling to all required places such as
   task_handler.continue_task()
 * More tests for error handling

P.S. This patch is very big but it was nearly impossible to split
it into multiple smaller patches just because how entangled everything
was in Mistral Engine.

Partially implements: blueprint mistral-engine-error-handling
Implements: blueprint mistral-action-result-processing-pipeline
Implements: blueprint mistral-refactor-task-handler
Closes-Bug: #1568909

Change-Id: I0668e695c60dde31efc690563fc891387d44d6ba
This commit is contained in:
Renat Akhmerov 2016-05-11 13:36:50 +07:00
parent 2cf2fc5c74
commit 816bfd9dcc
34 changed files with 1564 additions and 1267 deletions

View File

@ -199,7 +199,7 @@ def validate_long_type_length(cls, field_name, value):
size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb
# If the size is unlimited. # If the size is unlimited.
if (size_limit_kb < 0): if size_limit_kb < 0:
return return
size_kb = int(sys.getsizeof(str(value)) / 1024) size_kb = int(sys.getsizeof(str(value)) / 1024)
@ -394,6 +394,8 @@ class CronTrigger(mb.MistralSecureModelBase):
# Register all hooks related to secure models. # Register all hooks related to secure models.
mb.register_secure_model_hooks() mb.register_secure_model_hooks()
# TODO(rakhmerov): This is a bad solution. It's hard to find in the code,
# configure flexibly etc. Fix it.
# Register an event listener to verify that the size of all the long columns # Register an event listener to verify that the size of all the long columns
# affected by the user do not exceed the limit configuration. # affected by the user do not exceed the limit configuration.
for attr_name in ['input', 'output', 'params', 'published']: for attr_name in ['input', 'output', 'params', 'published']:

View File

@ -12,302 +12,81 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo_log import log as logging
import traceback as tb
from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models
from mistral.engine import rpc from mistral.engine import actions
from mistral.engine import utils as e_utils from mistral.engine import task_handler
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import action_manager as a_m
from mistral.services import security
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
def create_action_execution(action_def, action_input, task_ex=None, LOG = logging.getLogger(__name__)
index=0, description=''):
# TODO(rakhmerov): We can avoid hitting DB at all when calling things like
# create_action_execution(), these operations can be just done using
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
# send necessary SQL queries to DB. Currently, session flush happens
# on every operation which may not be optimal. The problem with using just
# session level cache is in generating ids. Ids are generated only on
# session flush. And now we have a lot places where we need to have ids
# before TX completion.
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
# to be updated with the action execution ID after the action execution
# DB object is created.
action_ex_id = utils.generate_unicode_uuid()
if a_m.has_action_context( def on_action_complete(action_ex, result):
action_def.action_class, action_def.attributes or {}) and task_ex: task_ex = action_ex.task_execution
action_input.update(a_m.get_action_context(task_ex, action_ex_id))
values = { action = _build_action(action_ex)
'id': action_ex_id,
'name': action_def.name, try:
'spec': action_def.spec, action.complete(result)
'state': states.RUNNING, except exc.MistralException as e:
'input': action_input, msg = ("Failed to complete action [action=%s, task=%s]: %s\n%s" %
'runtime_context': {'with_items_index': index}, (action_ex.name, task_ex.name, e, tb.format_exc()))
'description': description
} LOG.error(msg)
action.fail(msg)
if task_ex:
task_handler.fail_task(task_ex, msg)
return
if task_ex: if task_ex:
values.update({ task_handler.on_action_complete(action_ex)
'task_execution_id': task_ex.id,
'workflow_name': task_ex.workflow_name,
'workflow_id': task_ex.workflow_id,
'project_id': task_ex.project_id,
})
else:
values.update({
'project_id': security.get_project_id(),
})
action_ex = db_api.create_action_execution(values)
if task_ex:
# Add to collection explicitly so that it's in a proper
# state within the current session.
task_ex.executions.append(action_ex)
return action_ex
def _inject_action_ctx_for_validating(action_def, input_dict): def _build_action(action_ex):
if a_m.has_action_context(action_def.action_class, action_def.attributes): if isinstance(action_ex, models.WorkflowExecution):
input_dict.update(a_m.get_empty_action_context()) return actions.WorkflowAction(None, action_ex=action_ex)
wf_name = None
wf_spec_name = None
def get_action_input(action_name, input_dict, wf_name=None, wf_spec=None): if action_ex.workflow_name:
action_def = resolve_action_definition( wf_name = action_ex.workflow_name
action_name, wf_spec = spec_parser.get_workflow_spec(
wf_name, action_ex.task_execution.workflow_execution.spec
wf_spec.get_name() if wf_spec else None )
) wf_spec_name = wf_spec.get_name()
if action_def.action_class: adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name')
_inject_action_ctx_for_validating(action_def, input_dict)
# NOTE(xylan): Don't validate action input if action initialization method if adhoc_action_name:
# contains ** argument. action_def = actions.resolve_action_definition(
if '**' not in action_def.input: adhoc_action_name,
e_utils.validate_input(action_def, input_dict)
if action_def.spec:
# Ad-hoc action.
return _get_adhoc_action_input(
action_def,
input_dict,
wf_name, wf_name,
wf_spec wf_spec_name
) )
return input_dict return actions.AdHocAction(action_def, action_ex=action_ex)
action_def = actions.resolve_action_definition(
def _get_adhoc_action_input(action_def, input_dict, action_ex.name,
wf_name=None, wf_spec=None):
action_spec = spec_parser.get_action_spec(action_def.spec)
base_name = action_spec.get_base()
action_def = resolve_action_definition(
base_name,
wf_name if wf_name else None,
wf_spec.get_name() if wf_spec else None
)
_inject_action_ctx_for_validating(action_def, input_dict)
e_utils.validate_input(action_def, input_dict, action_spec)
base_input = action_spec.get_base_input()
if base_input:
input_dict = expr.evaluate_recursively(
base_input,
input_dict
)
else:
input_dict = {}
return input_dict
def run_action(action_def, action_input,
action_ex_id=None, target=None, async=True):
action_result = rpc.get_executor_client().run_action(
action_ex_id,
action_def.action_class,
action_def.attributes or {},
action_input,
target,
async
)
if action_result:
return _get_action_output(action_result)
def _get_action_output(result):
"""Returns action output.
:param result: ActionResult instance or ActionResult dict
:return: dict containing result.
"""
if isinstance(result, dict):
result = wf_utils.Result(result.get('data'), result.get('error'))
return ({'result': result.data}
if result.is_success() else {'result': result.error})
def store_action_result(action_ex, result):
prev_state = action_ex.state
action_ex.state = states.SUCCESS if result.is_success() else states.ERROR
action_ex.output = _get_action_output(result)
action_ex.accepted = True
_log_action_result(action_ex, prev_state, action_ex.state, result)
return action_ex
def _log_action_result(action_ex, from_state, to_state, result):
def _result_msg():
if action_ex.state == states.ERROR:
return "error = %s" % utils.cut(result.error)
return "result = %s" % utils.cut(result.data)
wf_trace.info(
None,
"Action execution '%s' [%s -> %s, %s]" %
(action_ex.name, from_state, to_state, _result_msg())
)
def run_existing_action(action_ex_id, target):
action_ex = db_api.get_action_execution(action_ex_id)
action_def = db_api.get_action_definition(action_ex.name)
return run_action(
action_def,
action_ex.input,
action_ex_id,
target
)
def resolve_definition(action_name, task_ex=None, wf_spec=None):
if task_ex and wf_spec:
wf_ex = task_ex.workflow_execution
action_def = resolve_action_definition(
action_name,
wf_ex.workflow_name,
wf_spec.get_name()
)
else:
action_def = resolve_action_definition(action_name)
if action_def.spec:
# Ad-hoc action.
action_spec = spec_parser.get_action_spec(action_def.spec)
base_name = action_spec.get_base()
action_def = resolve_action_definition(
base_name,
task_ex.workflow_name if task_ex else None,
wf_spec.get_name() if wf_spec else None
)
return action_def
def resolve_action_definition(action_spec_name, wf_name=None,
wf_spec_name=None):
action_db = None
if wf_name and wf_name != wf_spec_name:
# If workflow belongs to a workbook then check
# action within the same workbook (to be able to
# use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find an action in DB.
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action_definition(action_full_name)
if not action_db:
action_db = db_api.load_action_definition(action_spec_name)
if not action_db:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" % action_spec_name
)
return action_db
def transform_result(result, task_ex, task_spec):
"""Transforms task result accounting for ad-hoc actions.
In case if the given result is an action result and action is
an ad-hoc action the method transforms the result according to
ad-hoc action configuration.
:param result: Result of task action/workflow.
:param task_ex: Task DB model.
:param task_spec: Task specification.
"""
if result.is_error():
return result
action_spec_name = task_spec.get_action_name()
if action_spec_name:
wf_ex = task_ex.workflow_execution
wf_spec_name = wf_ex.spec['name']
return transform_action_result(
action_spec_name,
result,
wf_ex.workflow_name,
wf_spec_name,
)
return result
def transform_action_result(action_spec_name, result,
wf_name=None, wf_spec_name=None):
action_def = resolve_action_definition(
action_spec_name,
wf_name, wf_name,
wf_spec_name wf_spec_name
) )
if not action_def.spec: return actions.PythonAction(action_def, action_ex=action_ex)
return result
transformer = spec_parser.get_action_spec(action_def.spec).get_output()
if transformer is None: def build_action_by_name(action_name):
return result action_def = actions.resolve_action_definition(action_name)
return wf_utils.Result( action_cls = (actions.PythonAction if not action_def.spec
data=expr.evaluate_recursively(transformer, result.data), else actions.AdHocAction)
error=result.error
) return action_cls(action_def)

477
mistral/engine/actions.py Normal file
View File

@ -0,0 +1,477 @@
# Copyright 2016 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo_config import cfg
from oslo_log import log as logging
import six
from mistral.db.v2 import api as db_api
from mistral.engine import rpc
from mistral.engine import utils as e_utils
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service
from mistral.services import scheduler
from mistral.services import security
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action'
_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow'
@six.add_metaclass(abc.ABCMeta)
class Action(object):
"""Action.
Represents a workflow action and defines interface that can be used by
Mistral engine or its components in order to manipulate with actions.
"""
def __init__(self, action_def, action_ex=None, task_ex=None):
self.action_def = action_def
self.action_ex = action_ex
self.task_ex = action_ex.task_execution if action_ex else task_ex
@abc.abstractmethod
def complete(self, result):
"""Complete action and process its result.
:param result: Action result.
"""
raise NotImplementedError
def fail(self, msg):
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
msg = utils.cut_by_kb(
msg,
cfg.CONF.engine.execution_field_size_limit_kb
)
self.action_ex.state = states.ERROR
self.action_ex.output = {'result': msg}
@abc.abstractmethod
def schedule(self, input_dict, target, index=0, desc=''):
"""Schedule action run.
This method is needed to schedule action run so its result can
be received later by engine. In this sense it will be running in
asynchronous mode from engine perspective (don't confuse with
executor asynchrony when executor doesn't immediately send a
result).
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
:param desc: Action execution description.
"""
raise NotImplementedError
@abc.abstractmethod
def run(self, input_dict, target, index=0, desc='', save=True):
"""Immediately run action.
This method runs method w/o scheduling its run for a later time.
From engine perspective action will be processed in synchronous
mode.
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
:param desc: Action execution description.
:param save: True if action execution object needs to be saved.
:return: Action output.
"""
raise NotImplementedError
def validate_input(self, input_dict):
"""Validates action input parameters.
:param input_dict: Dictionary with input parameters.
"""
raise NotImplementedError
def is_sync(self, input_dict):
"""Determines if action is synchronous.
:param input_dict: Dictionary with input parameters.
"""
return True
def _create_action_execution(self, input_dict, runtime_ctx, desc=''):
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
# to be updated with the action execution ID after the action execution
# DB object is created.
action_ex_id = utils.generate_unicode_uuid()
# TODO(rakhmerov): Bad place, we probably need to push action context
# to all actions. It's related to
# https://blueprints.launchpad.net/mistral/+spec/mistral-custom-actions-api
if a_m.has_action_context(
self.action_def.action_class,
self.action_def.attributes or {}) and self.task_ex:
input_dict.update(
a_m.get_action_context(self.task_ex, action_ex_id)
)
values = {
'id': action_ex_id,
'name': self.action_def.name,
'spec': self.action_def.spec,
'state': states.RUNNING,
'input': input_dict,
'runtime_context': runtime_ctx,
'description': desc
}
if self.task_ex:
values.update({
'task_execution_id': self.task_ex.id,
'workflow_name': self.task_ex.workflow_name,
'workflow_id': self.task_ex.workflow_id,
'project_id': self.task_ex.project_id,
})
else:
values.update({
'project_id': security.get_project_id(),
})
self.action_ex = db_api.create_action_execution(values)
if self.task_ex:
# Add to collection explicitly so that it's in a proper
# state within the current session.
self.task_ex.executions.append(self.action_ex)
def _inject_action_ctx_for_validating(self, input_dict):
if a_m.has_action_context(
self.action_def.action_class, self.action_def.attributes):
input_dict.update(a_m.get_empty_action_context())
def _log_result(self, prev_state, result):
state = self.action_ex.state
def _result_msg():
if state == states.ERROR:
return "error = %s" % utils.cut(result.error)
return "result = %s" % utils.cut(result.data)
wf_trace.info(
None,
"Action execution '%s' [%s -> %s, %s]" %
(self.action_ex.name, prev_state, state, _result_msg())
)
class PythonAction(Action):
"""Regular Python action."""
def complete(self, result):
if states.is_completed(self.action_ex.state):
return
prev_state = self.action_ex.state
self.action_ex.state = (states.SUCCESS if result.is_success()
else states.ERROR)
self.action_ex.output = self._prepare_output(result)
self.action_ex.accepted = True
self._log_result(prev_state, result)
def schedule(self, input_dict, target, index=0, desc=''):
self._create_action_execution(
self._prepare_input(input_dict),
self._prepare_runtime_context(index),
desc=desc
)
scheduler.schedule_call(
None,
_RUN_EXISTING_ACTION_PATH,
0,
action_ex_id=self.action_ex.id,
target=target
)
def run(self, input_dict, target, index=0, desc='', save=True):
input_dict = self._prepare_input(input_dict)
runtime_ctx = self._prepare_runtime_context(index)
if save:
self._create_action_execution(input_dict, runtime_ctx, desc=desc)
result = rpc.get_executor_client().run_action(
self.action_ex.id if self.action_ex else None,
self.action_def.action_class,
self.action_def.attributes or {},
input_dict,
target,
async=False
)
return self._prepare_output(result)
def is_sync(self, input_dict):
input_dict = self._prepare_input(input_dict)
a = a_m.get_action_class(self.action_def.name)(**input_dict)
return a.is_sync()
def validate_input(self, input_dict):
if self.action_def.action_class:
self._inject_action_ctx_for_validating(input_dict)
# TODO(rakhmerov): I'm not sure what this is for.
# NOTE(xylan): Don't validate action input if action initialization
# method contains ** argument.
if '**' not in self.action_def.input:
e_utils.validate_input(self.action_def, input_dict)
def _prepare_input(self, input_dict):
"""Template method to do manipulations with input parameters.
Python action doesn't do anything specific with initial input.
"""
return input_dict
def _prepare_output(self, result):
"""Template method to do manipulations with action result.
Python action just wraps action result into dict that can
be stored in DB.
"""
return _get_action_output(result) if result else None
def _prepare_runtime_context(self, index):
"""Template method to prepare action runtime context.
Python action inserts index into runtime context.
"""
return {'index': index}
class AdHocAction(PythonAction):
"""Ad-hoc action."""
def __init__(self, action_def, action_ex=None, task_ex=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec)
base_action_def = db_api.get_action_definition(
self.action_spec.get_base()
)
super(AdHocAction, self).__init__(
base_action_def,
action_ex,
task_ex
)
self.adhoc_action_def = action_def
def validate_input(self, input_dict):
e_utils.validate_input(
self.adhoc_action_def,
input_dict,
self.action_spec
)
super(AdHocAction, self).validate_input(
self._prepare_input(input_dict)
)
def _prepare_input(self, input_dict):
base_input_expr = self.action_spec.get_base_input()
if base_input_expr:
base_input_dict = expr.evaluate_recursively(
base_input_expr,
input_dict
)
else:
base_input_dict = {}
return super(AdHocAction, self)._prepare_input(base_input_dict)
def _prepare_output(self, result):
# In case of error, we don't transform a result.
if not result.is_error():
adhoc_action_spec = spec_parser.get_action_spec(
self.adhoc_action_def.spec
)
transformer = adhoc_action_spec.get_output()
if transformer is not None:
result = wf_utils.Result(
data=expr.evaluate_recursively(transformer, result.data),
error=result.error
)
return _get_action_output(result) if result else None
def _prepare_runtime_context(self, index):
ctx = super(AdHocAction, self)._prepare_runtime_context(index)
# Insert special field into runtime context so that we track
# a relationship between python action and adhoc action.
return utils.merge_dicts(
ctx,
{'adhoc_action_name': self.adhoc_action_def.name}
)
class WorkflowAction(Action):
"""Workflow action."""
def complete(self, result):
# No-op because in case of workflow result is already processed.
pass
def schedule(self, input_dict, target, index=0, desc=''):
parent_wf_ex = self.task_ex.workflow_execution
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
task_spec = spec_parser.get_task_spec(self.task_ex.spec)
wf_spec_name = task_spec.get_workflow_name()
wf_def = e_utils.resolve_workflow_definition(
parent_wf_ex.workflow_name,
parent_wf_spec.get_name(),
wf_spec_name
)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
wf_params = {
'task_execution_id': self.task_ex.id,
'index': index
}
if 'env' in parent_wf_ex.params:
wf_params['env'] = parent_wf_ex.params['env']
for k, v in list(input_dict.items()):
if k not in wf_spec.get_input():
wf_params[k] = v
del input_dict[k]
wf_ex, _ = wf_ex_service.create_workflow_execution(
wf_def.name,
input_dict,
"sub-workflow execution",
wf_params,
wf_spec
)
scheduler.schedule_call(
None,
_RESUME_WORKFLOW_PATH,
0,
wf_ex_id=wf_ex.id,
env=None
)
# TODO(rakhmerov): Add info logging.
def run(self, input_dict, target, index=0, desc='', save=True):
raise NotImplemented('Does not apply to this WorkflowAction.')
def is_sync(self, input_dict):
# Workflow action is always asynchronous.
return False
def validate_input(self, input_dict):
# TODO(rakhmerov): Implement.
pass
def _resume_workflow(wf_ex_id, env):
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
def _run_existing_action(action_ex_id, target):
action_ex = db_api.get_action_execution(action_ex_id)
action_def = db_api.get_action_definition(action_ex.name)
result = rpc.get_executor_client().run_action(
action_ex_id,
action_def.action_class,
action_def.attributes or {},
action_ex.input,
target
)
return _get_action_output(result) if result else None
def _get_action_output(result):
"""Returns action output.
:param result: ActionResult instance or ActionResult dict
:return: dict containing result.
"""
if isinstance(result, dict):
result = wf_utils.Result(result.get('data'), result.get('error'))
return ({'result': result.data}
if result.is_success() else {'result': result.error})
def resolve_action_definition(action_spec_name, wf_name=None,
wf_spec_name=None):
"""Resolve action definition accounting for ad-hoc action namespacing.
:param action_spec_name: Action name according to a spec.
:param wf_name: Workflow name.
:param wf_spec_name: Workflow name according to a spec.
:return: Action definition (python or ad-hoc).
"""
action_db = None
if wf_name and wf_name != wf_spec_name:
# If workflow belongs to a workbook then check
# action within the same workbook (to be able to
# use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find an action in DB.
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action_definition(action_full_name)
if not action_db:
action_db = db_api.load_action_definition(action_spec_name)
if not action_db:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" % action_spec_name
)
return action_db

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from mistral import coordination from mistral import coordination
@ -22,19 +20,14 @@ from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler from mistral.engine import action_handler
from mistral.engine import base from mistral.engine import base
from mistral.engine import task_handler from mistral.engine import dispatcher
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service from mistral.services import executions as wf_ex_service
from mistral.services import workflows as wf_service from mistral.services import workflows as wf_service
from mistral import utils as u from mistral import utils as u
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base from mistral.workflow import base as wf_base
from mistral.workflow import commands from mistral.workflow import commands
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -53,240 +46,84 @@ class DefaultEngine(base.Engine, coordination.Service):
@u.log_exec(LOG) @u.log_exec(LOG)
def start_workflow(self, wf_identifier, wf_input, description='', def start_workflow(self, wf_identifier, wf_input, description='',
**params): **params):
wf_ex_id = None with db_api.transaction():
# TODO(rakhmerov): It needs to be hidden in workflow_handler and
try: # Workflow abstraction.
# Create a persistent workflow execution in a separate transaction # The new workflow execution will be in an IDLE
# so that we can return it even in case of unexpected errors that # state on initial record creation.
# lead to transaction rollback. wf_ex, wf_spec = wf_ex_service.create_workflow_execution(
with db_api.transaction(): wf_identifier,
# The new workflow execution will be in an IDLE wf_input,
# state on initial record creation. description,
wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution( params
wf_identifier,
wf_input,
description,
params
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
self._dispatch_workflow_commands(
wf_ex,
wf_ctrl.continue_workflow(),
wf_spec
)
return wf_ex.get_clone()
except Exception as e:
LOG.error(
"Failed to start workflow '%s' id=%s: %s\n%s",
wf_identifier, wf_ex_id, e, traceback.format_exc()
) )
wf_handler.set_workflow_state(wf_ex, states.RUNNING)
wf_ex = self._fail_workflow(wf_ex_id, e) wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
if wf_ex: cmds = wf_ctrl.continue_workflow()
return wf_ex.get_clone()
raise e dispatcher.dispatch_workflow_commands(wf_ex, cmds)
return wf_ex.get_clone()
@u.log_exec(LOG) @u.log_exec(LOG)
def start_action(self, action_name, action_input, def start_action(self, action_name, action_input,
description=None, **params): description=None, **params):
with db_api.transaction(): with db_api.transaction():
action_def = action_handler.resolve_definition(action_name) action = action_handler.build_action_by_name(action_name)
resolved_action_input = action_handler.get_action_input(
action_name, action.validate_input(action_input)
action_input
save = params.get('save_result')
target = params.get('target')
if save or not action.is_sync(action_input):
action.schedule(action_input, target)
return action.action_ex.get_clone()
output = action.run(action_input, target, save=save)
# Action execution is not created but we need to return similar
# object to a client anyway.
return db_models.ActionExecution(
name=action_name,
description=description,
input=action_input,
output=output
) )
action = a_m.get_action_class(action_def.name)(
**resolved_action_input
)
# If we see action is asynchronous, then we enforce 'save_result'.
if params.get('save_result') or not action.is_sync():
action_ex = action_handler.create_action_execution(
action_def,
resolved_action_input,
description=description
)
action_handler.run_action(
action_def,
resolved_action_input,
action_ex.id,
params.get('target')
)
return action_ex.get_clone()
else:
output = action_handler.run_action(
action_def,
resolved_action_input,
target=params.get('target'),
async=False
)
return db_models.ActionExecution(
name=action_name,
description=description,
input=action_input,
output=output
)
def on_task_state_change(self, task_ex_id, state, state_info=None):
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
# TODO(rakhmerov): The method is mostly needed for policy and
# we are supposed to get the same action execution as when the
# policy worked.
wf_ex_id = task_ex.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_trace.info(
task_ex,
"Task '%s' [%s -> %s] state_info : %s"
% (task_ex.name, task_ex.state, state, state_info)
)
task_ex.state = state
task_ex.state_info = state_info
self._on_task_state_change(task_ex, wf_ex, wf_spec)
def _on_task_state_change(self, task_ex, wf_ex, wf_spec):
task_spec = wf_spec.get_tasks()[task_ex.name]
if task_handler.is_task_completed(task_ex, task_spec):
task_handler.after_task_complete(task_ex, task_spec, wf_spec)
# Ignore DELAYED state.
if task_ex.state == states.RUNNING_DELAYED:
return
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
# Calculate commands to process next.
try:
cmds = wf_ctrl.continue_workflow()
except exc.YaqlEvaluationException as e:
LOG.error(
'YAQL error occurred while calculating next workflow '
'commands [wf_ex_id=%s, task_ex_id=%s]: %s',
wf_ex.id, task_ex.id, e
)
wf_handler.fail_workflow(wf_ex, str(e))
return
# Mark task as processed after all decisions have been made
# upon its completion.
task_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec)
elif task_handler.need_to_continue(task_ex, task_spec):
# Re-run existing task.
cmds = [commands.RunExistingTask(task_ex, reset=False)]
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
@staticmethod
def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec):
if states.is_paused_or_completed(wf_ex.state):
return
# Workflow is not completed if there are any incomplete task
# executions that are not in WAITING state. If all incomplete
# tasks are waiting and there are unhandled errors, then these
# tasks will not reach completion. In this case, mark the
# workflow complete.
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
return
if wf_ctrl.all_errors_handled():
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
else:
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
wf_handler.fail_workflow(wf_ex, state_info)
@u.log_exec(LOG) @u.log_exec(LOG)
def on_action_complete(self, action_ex_id, result): def on_action_complete(self, action_ex_id, result):
wf_ex_id = None with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex_id)
try: task_ex = action_ex.task_execution
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex_id)
# In case of single action execution there is no if task_ex:
# assigned task execution. wf_handler.lock_workflow_execution(
if not action_ex.task_execution: task_ex.workflow_execution_id
return action_handler.store_action_result(
action_ex,
result
).get_clone()
wf_ex_id = action_ex.task_execution.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_ex = task_handler.on_action_complete(
action_ex,
wf_spec,
result
) )
# If workflow is on pause or completed then there's no action_handler.on_action_complete(action_ex, result)
# need to continue workflow.
if states.is_paused_or_completed(wf_ex.state):
return action_ex.get_clone()
self._on_task_state_change(task_ex, wf_ex, wf_spec) return action_ex.get_clone()
return action_ex.get_clone()
except Exception as e:
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
LOG.error(
'Failed to handle action execution result [id=%s]: %s\n%s',
action_ex_id, e, traceback.format_exc()
)
# If an exception was thrown after we got the wf_ex_id
if wf_ex_id:
self._fail_workflow(wf_ex_id, e)
raise e
@u.log_exec(LOG) @u.log_exec(LOG)
def pause_workflow(self, wf_ex_id): def pause_workflow(self, wf_ex_id):
with db_api.transaction(): with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.PAUSED) wf_handler.set_workflow_state(wf_ex, states.PAUSED)
return wf_ex return wf_ex
def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None): @staticmethod
def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None):
wf_ex = wf_service.update_workflow_execution_env(wf_ex, env) wf_ex = wf_service.update_workflow_execution_env(wf_ex, env)
wf_handler.set_execution_state( wf_handler.set_workflow_state(
wf_ex, wf_ex,
states.RUNNING, states.RUNNING,
set_upstream=True set_upstream=True
@ -294,13 +131,15 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ctrl = wf_base.get_controller(wf_ex) wf_ctrl = wf_base.get_controller(wf_ex)
# TODO(rakhmerov): Add YAQL error handling. # TODO(rakhmerov): Add error handling.
# Calculate commands to process next. # Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
# When resuming a workflow we need to ignore all 'pause' # When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that # commands because workflow controller takes tasks that
# completed within the period when the workflow was paused. # completed within the period when the workflow was paused.
# TODO(rakhmerov): This all should be in workflow handler, it's too
# specific for engine level.
cmds = list( cmds = list(
filter( filter(
lambda c: not isinstance(c, commands.PauseWorkflow), lambda c: not isinstance(c, commands.PauseWorkflow),
@ -316,165 +155,50 @@ class DefaultEngine(base.Engine, coordination.Service):
if states.is_completed(t_ex.state) and not t_ex.processed: if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True t_ex.processed = True
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) dispatcher.dispatch_workflow_commands(wf_ex, cmds)
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
if not cmds: if not cmds:
if not wf_utils.find_incomplete_task_executions(wf_ex): wf_handler.check_workflow_completion(wf_ex)
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
return wf_ex.get_clone() return wf_ex.get_clone()
@u.log_exec(LOG) @u.log_exec(LOG)
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
try: # TODO(rakhmerov): Rewrite this functionality with Task abstraction.
with db_api.transaction(): with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
task_ex = db_api.get_task_execution(task_ex_id) task_ex = db_api.get_task_execution(task_ex_id)
if task_ex.workflow_execution.id != wf_ex_id: if task_ex.workflow_execution.id != wf_ex_id:
raise ValueError('Workflow execution ID does not match.') raise ValueError('Workflow execution ID does not match.')
if wf_ex.state == states.PAUSED: if wf_ex.state == states.PAUSED:
return wf_ex.get_clone() return wf_ex.get_clone()
return self._continue_workflow(wf_ex, task_ex, reset, env=env) # TODO(rakhmerov): This should be a call to workflow handler.
except Exception as e: return self._continue_workflow(wf_ex, task_ex, reset, env=env)
LOG.error(
"Failed to rerun execution id=%s at task=%s: %s\n%s",
wf_ex_id, task_ex_id, e, traceback.format_exc()
)
self._fail_workflow(wf_ex_id, e)
raise e
@u.log_exec(LOG) @u.log_exec(LOG)
def resume_workflow(self, wf_ex_id, env=None): def resume_workflow(self, wf_ex_id, env=None):
try: # TODO(rakhmerov): Rewrite this functionality with Task abstraction.
with db_api.transaction(): with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
if (not states.is_paused(wf_ex.state) and if (not states.is_paused(wf_ex.state) and
not states.is_idle(wf_ex.state)): not states.is_idle(wf_ex.state)):
return wf_ex.get_clone() return wf_ex.get_clone()
return self._continue_workflow(wf_ex, env=env) return self._continue_workflow(wf_ex, env=env)
except Exception as e:
LOG.error(
"Failed to resume execution id=%s: %s\n%s",
wf_ex_id, e, traceback.format_exc()
)
self._fail_workflow(wf_ex_id, e)
raise e
@u.log_exec(LOG) @u.log_exec(LOG)
def stop_workflow(self, wf_ex_id, state, message=None): def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction(): with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
return self._stop_workflow(wf_ex, state, message) return wf_handler.stop_workflow(wf_ex, state, message)
@staticmethod
def _stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS:
wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for %s: %s' % (wf_ex, e)
)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
return wf_handler.succeed_workflow(
wf_ex,
final_context,
wf_spec,
message
)
elif state == states.ERROR:
return wf_handler.fail_workflow(wf_ex, message)
return wf_ex
@u.log_exec(LOG) @u.log_exec(LOG)
def rollback_workflow(self, wf_ex_id): def rollback_workflow(self, wf_ex_id):
# TODO(rakhmerov): Implement. # TODO(rakhmerov): Implement.
raise NotImplementedError raise NotImplementedError
def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
if not wf_cmds:
return
for cmd in wf_cmds:
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
task_handler.defer_task(cmd)
elif isinstance(cmd, commands.RunTask):
task_ex = task_handler.run_new_task(cmd, wf_spec)
if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.RunExistingTask):
task_ex = task_handler.run_existing_task(
cmd.task_ex.id,
reset=cmd.reset
)
if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.SetWorkflowState):
if states.is_completed(cmd.new_state):
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
else:
wf_handler.set_execution_state(wf_ex, cmd.new_state)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass
else:
raise RuntimeError('Unsupported workflow command: %s' % cmd)
if wf_ex.state != states.RUNNING:
break
# TODO(rakhmerov): This method may not be needed at all because error
# handling is now implemented too roughly w/o distinguishing different
# errors. On most errors (like YAQLException) we shouldn't rollback
# transactions, we just need to fail corresponding execution objects
# where a problem happened (action, task or workflow).
@staticmethod
def _fail_workflow(wf_ex_id, exc):
"""Private helper to fail workflow on exceptions."""
with db_api.transaction():
wf_ex = db_api.load_workflow_execution(wf_ex_id)
if wf_ex is None:
LOG.error(
"Can't fail workflow execution with id='%s': not found.",
wf_ex_id
)
return None
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
if not states.is_paused_or_completed(wf_ex.state):
wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc))
return wf_ex

View File

@ -0,0 +1,46 @@
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import exceptions as exc
from mistral.workflow import commands
from mistral.workflow import states
def dispatch_workflow_commands(wf_ex, wf_cmds):
# TODO(rakhmerov): I don't like these imports but otherwise we have
# import cycles.
from mistral.engine import task_handler
from mistral.engine import workflow_handler as wf_handler
if not wf_cmds:
return
for cmd in wf_cmds:
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
task_handler.run_task(cmd)
elif isinstance(cmd, commands.SetWorkflowState):
# TODO(rakhmerov): Make just a single call to workflow_handler
if states.is_completed(cmd.new_state):
wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
else:
wf_handler.set_workflow_state(wf_ex, cmd.new_state)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass
else:
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
if wf_ex.state != states.RUNNING:
break

View File

@ -15,7 +15,6 @@
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import base from mistral.engine import base
from mistral.engine import rpc
from mistral import expressions from mistral import expressions
from mistral.services import scheduler from mistral.services import scheduler
from mistral.utils import wf_trace from mistral.utils import wf_trace
@ -24,8 +23,8 @@ from mistral.workflow import states
import six import six
_ENGINE_CLIENT_PATH = 'mistral.engine.rpc.get_engine_client' _CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
_RUN_EXISTING_TASK_PATH = 'mistral.engine.task_handler.run_existing_task' _COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
def _log_task_delay(task_ex, delay_sec): def _log_task_delay(task_ex, delay_sec):
@ -180,7 +179,7 @@ class WaitBeforePolicy(base.TaskPolicy):
policy_context = runtime_context[context_key] policy_context = runtime_context[context_key]
if policy_context.get('skip'): if policy_context.get('skip'):
# Unset state 'DELAYED'. # Unset state 'RUNNING_DELAYED'.
wf_trace.info( wf_trace.info(
task_ex, task_ex,
"Task '%s' [%s -> %s]" "Task '%s' [%s -> %s]"
@ -193,13 +192,16 @@ class WaitBeforePolicy(base.TaskPolicy):
if task_ex.state != states.IDLE: if task_ex.state != states.IDLE:
policy_context.update({'skip': True}) policy_context.update({'skip': True})
_log_task_delay(task_ex, self.delay) _log_task_delay(task_ex, self.delay)
task_ex.state = states.RUNNING_DELAYED task_ex.state = states.RUNNING_DELAYED
# TODO(rakhmerov): This is wrong as task handler doesn't manage
# transactions and hence it can't be called explicitly.
scheduler.schedule_call( scheduler.schedule_call(
None, None,
_RUN_EXISTING_TASK_PATH, _CONTINUE_TASK_PATH,
self.delay, self.delay,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
) )
@ -228,6 +230,7 @@ class WaitAfterPolicy(base.TaskPolicy):
task_ex.runtime_context = runtime_context task_ex.runtime_context = runtime_context
policy_context = runtime_context[context_key] policy_context = runtime_context[context_key]
if policy_context.get('skip'): if policy_context.get('skip'):
# Skip, already processed. # Skip, already processed.
return return
@ -236,17 +239,25 @@ class WaitAfterPolicy(base.TaskPolicy):
_log_task_delay(task_ex, self.delay) _log_task_delay(task_ex, self.delay)
state = task_ex.state end_state = task_ex.state
end_state_info = task_ex.state_info
# TODO(rakhmerov): Policies probably needs to have tasks.Task
# interface in order to change manage task state safely.
# Set task state to 'DELAYED'. # Set task state to 'DELAYED'.
task_ex.state = states.RUNNING_DELAYED task_ex.state = states.RUNNING_DELAYED
task_ex.state_info = (
'Suspended by wait-after policy for %s seconds' % self.delay
)
# Schedule to change task state to RUNNING again. # Schedule to change task state to RUNNING again.
scheduler.schedule_call( scheduler.schedule_call(
_ENGINE_CLIENT_PATH, None,
'on_task_state_change', _COMPLETE_TASK_PATH,
self.delay, self.delay,
state=state,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
state=end_state,
state_info=end_state_info
) )
@ -339,7 +350,7 @@ class RetryPolicy(base.TaskPolicy):
scheduler.schedule_call( scheduler.schedule_call(
None, None,
_RUN_EXISTING_TASK_PATH, _CONTINUE_TASK_PATH,
self.delay, self.delay,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
) )
@ -360,7 +371,7 @@ class TimeoutPolicy(base.TaskPolicy):
scheduler.schedule_call( scheduler.schedule_call(
None, None,
'mistral.engine.policies.fail_task_if_incomplete', 'mistral.engine.policies._fail_task_if_incomplete',
self.delay, self.delay,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
timeout=self.delay timeout=self.delay
@ -424,21 +435,35 @@ class ConcurrencyPolicy(base.TaskPolicy):
task_ex.runtime_context = runtime_context task_ex.runtime_context = runtime_context
def fail_task_if_incomplete(task_ex_id, timeout): def _continue_task(task_ex_id):
from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
task_handler.continue_task(db_api.get_task_execution(task_ex_id))
def _complete_task(task_ex_id, state, state_info):
from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
task_handler.complete_task(
db_api.get_task_execution(task_ex_id),
state,
state_info
)
def _fail_task_if_incomplete(task_ex_id, timeout):
from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
task_ex = db_api.get_task_execution(task_ex_id) task_ex = db_api.get_task_execution(task_ex_id)
if not states.is_completed(task_ex.state): if not states.is_completed(task_ex.state):
msg = "Task timed out [id=%s, timeout(s)=%s]." % (task_ex_id, timeout) msg = 'Task timed out [timeout(s)=%s].' % timeout
wf_trace.info(task_ex, msg) task_handler.complete_task(
db_api.get_task_execution(task_ex_id),
wf_trace.info(
task_ex,
"Task '%s' [%s -> ERROR]" % (task_ex.name, task_ex.state)
)
rpc.get_engine_client().on_task_state_change(
task_ex_id,
states.ERROR, states.ERROR,
msg msg
) )

View File

@ -503,11 +503,12 @@ class ExecutorClient(base.Executor):
:param transport: Messaging transport. :param transport: Messaging transport.
:type transport: Transport. :type transport: Transport.
""" """
self.topic = cfg.CONF.executor.topic
serializer = auth_ctx.RpcContextSerializer( serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer() auth_ctx.JsonPayloadSerializer()
) )
self.topic = cfg.CONF.executor.topic
self._client = messaging.RPCClient( self._client = messaging.RPCClient(
transport, transport,
messaging.Target(), messaging.Target(),
@ -539,8 +540,15 @@ class ExecutorClient(base.Executor):
rpc_client_method = call_ctx.cast if async else call_ctx.call rpc_client_method = call_ctx.cast if async else call_ctx.call
return rpc_client_method( res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)
auth_ctx.ctx(),
'run_action', # TODO(rakhmerov): It doesn't seem a good approach since we have
**kwargs # a serializer for Result class. A better solution would be to
# use a composite serializer that dispatches serialization and
# deserialization to concrete serializers depending on object
# type.
return (
wf_utils.Result(data=res['data'], error=res['error'])
if res else None
) )

View File

@ -1,5 +1,6 @@
# Copyright 2015 - Mirantis, Inc. # Copyright 2015 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc. # Copyright 2015 - StackStorm, Inc.
# Copyright 2016 - Nokia Networks.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -13,28 +14,15 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
import operator
from oslo_log import log as logging from oslo_log import log as logging
import traceback as tb
from mistral.db.v2 import api as db_api from mistral.engine import tasks
from mistral.db.v2.sqlalchemy import models from mistral.engine import workflow_handler as wf_handler
from mistral.engine import action_handler
from mistral.engine import policies
from mistral.engine import rpc
from mistral.engine import utils as e_utils
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import executions as wf_ex_service
from mistral.services import scheduler
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow from mistral.workflow import commands as wf_cmds
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils
from mistral.workflow import with_items
"""Responsible for running tasks and handling results.""" """Responsible for running tasks and handling results."""
@ -42,548 +30,145 @@ from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def run_existing_task(task_ex_id, reset=True): def run_task(wf_cmd):
"""This function runs existing task execution. """Runs workflow task.
It is needed mostly by scheduler. :param wf_cmd: Workflow command.
:param task_ex_id: Task execution id.
:param reset: Reset action executions for the task.
""" """
task_ex = db_api.get_task_execution(task_ex_id)
task_spec = spec_parser.get_task_spec(task_ex.spec)
wf_def = db_api.get_workflow_definition(task_ex.workflow_name)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
# Throw exception if the existing task already succeeded. task = _build_task_from_command(wf_cmd)
if task_ex.state == states.SUCCESS:
raise exc.EngineException(
'Rerunning existing task that already succeeded is not supported.'
)
# Exit if the existing task failed and reset is not instructed.
# For a with-items task without reset, re-running the existing
# task will re-run the failed and unstarted items.
if (task_ex.state == states.ERROR and not reset and
not task_spec.get_with_items()):
return task_ex
# Reset nested executions only if task is not already RUNNING.
if task_ex.state != states.RUNNING:
# Reset state of processed task and related action executions.
if reset:
action_exs = task_ex.executions
else:
action_exs = db_api.get_action_executions(
task_execution_id=task_ex.id,
state=states.ERROR,
accepted=True
)
for action_ex in action_exs:
action_ex.accepted = False
# Explicitly change task state to RUNNING.
set_task_state(task_ex, states.RUNNING, None, processed=False)
_run_existing_task(task_ex, task_spec, wf_spec)
return task_ex
def _run_existing_task(task_ex, task_spec, wf_spec):
try: try:
input_dicts = _get_input_dictionaries( task.run()
wf_spec,
task_ex,
task_spec,
task_ex.in_context
)
except exc.MistralException as e: except exc.MistralException as e:
LOG.error( wf_ex = wf_cmd.wf_ex
'An error while calculating task action inputs' task_spec = wf_cmd.task_spec
' [task_execution_id=%s]: %s',
task_ex.id, e msg = (
"Failed to run task [wf=%s, task=%s]: %s\n%s" %
(wf_ex, task_spec.get_name(), e, tb.format_exc())
) )
set_task_state(task_ex, states.ERROR, str(e)) LOG.error(msg)
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
return return
# In some cases we can have no input, e.g. in case of 'with-items'. if task.is_completed():
if input_dicts: wf_handler.check_workflow_completion(wf_cmd.wf_ex)
for index, input_d in input_dicts:
_run_action_or_workflow(
task_ex,
task_spec,
input_d,
index,
wf_spec
)
else:
_schedule_noop_action(task_ex, task_spec, wf_spec)
def defer_task(wf_cmd): def on_action_complete(action_ex):
"""Defers a task""" """Handles action completion event.
ctx = wf_cmd.ctx
wf_ex = wf_cmd.wf_ex
task_spec = wf_cmd.task_spec
if wf_utils.find_task_executions_by_spec(wf_ex, task_spec): :param action_ex: Action execution.
return None
return _create_task_execution(
wf_ex,
task_spec,
ctx,
state=states.WAITING
)
def run_new_task(wf_cmd, wf_spec):
"""Runs a task."""
ctx = wf_cmd.ctx
wf_ex = wf_cmd.wf_ex
task_spec = wf_cmd.task_spec
# NOTE(xylan): Need to think how to get rid of this weird judgment to keep
# it more consistent with the function name.
task_ex = wf_utils.find_task_execution_with_state(
wf_ex,
task_spec,
states.WAITING
)
if task_ex:
set_task_state(task_ex, states.RUNNING, None)
task_ex.in_context = ctx
else:
task_ex = _create_task_execution(wf_ex, task_spec, ctx)
LOG.debug(
'Starting workflow task [workflow=%s, task_spec=%s, init_state=%s]' %
(wf_ex.name, task_spec, task_ex.state)
)
# TODO(rakhmerov): 'concurrency' policy should keep a number of running
# actions/workflows under control so it can't be implemented if it runs
# before any action executions are created.
before_task_start(task_ex, task_spec, wf_spec)
# Policies could possibly change task state.
if task_ex.state != states.RUNNING:
return task_ex
_run_existing_task(task_ex, task_spec, wf_spec)
return task_ex
def on_action_complete(action_ex, wf_spec, result):
"""Handles event of action result arrival.
Given action result this method changes corresponding task execution
object. This method must never be called for the case of individual
action which is not associated with any tasks.
:param action_ex: Action execution objects the result belongs to.
:param wf_spec: Workflow specification.
:param result: Task action/workflow output wrapped into
mistral.workflow.utils.Result instance.
:return Task execution object.
""" """
task_ex = action_ex.task_execution task_ex = action_ex.task_execution
# Ignore if action already completed. if not task_ex:
if (states.is_completed(action_ex.state) and not return
isinstance(action_ex, models.WorkflowExecution)):
return task_ex
task_spec = wf_spec.get_tasks()[task_ex.name] task_spec = spec_parser.get_task_spec(task_ex.spec)
try:
result = action_handler.transform_result(result, task_ex, task_spec)
except exc.YaqlEvaluationException as e:
err_msg = str(e)
LOG.error(
'YAQL error while transforming action result'
' [action_execution_id=%s, result=%s]: %s',
action_ex.id, result, err_msg
)
result = wf_utils.Result(error=err_msg)
# Ignore workflow executions because they're handled during
# workflow completion.
if not isinstance(action_ex, models.WorkflowExecution):
action_handler.store_action_result(action_ex, result)
if result.is_success():
task_state = states.SUCCESS
task_state_info = None
else:
task_state = states.ERROR
task_state_info = result.error
if not task_spec.get_with_items():
_complete_task(task_ex, task_spec, task_state, task_state_info)
else:
with_items.increase_capacity(task_ex)
if with_items.is_completed(task_ex):
_complete_task(
task_ex,
task_spec,
with_items.get_final_state(task_ex),
task_state_info
)
return task_ex
def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING):
task_ex = db_api.create_task_execution({
'name': task_spec.get_name(),
'workflow_execution_id': wf_ex.id,
'workflow_name': wf_ex.workflow_name,
'workflow_id': wf_ex.workflow_id,
'state': state,
'spec': task_spec.to_dict(),
'in_context': ctx,
'published': {},
'runtime_context': {},
'project_id': wf_ex.project_id
})
# Add to collection explicitly so that it's in a proper
# state within the current session.
wf_ex.task_executions.append(task_ex)
return task_ex
def before_task_start(task_ex, task_spec, wf_spec):
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
p.before_task_start(task_ex, task_spec)
def after_task_complete(task_ex, task_spec, wf_spec):
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
p.after_task_complete(task_ex, task_spec)
def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
"""Calculates a collection of inputs for task action/workflow.
If the given task is not configured as 'with-items' then return list
will consist of one dictionary containing input that task action/workflow
should run with.
In case of 'with-items' the result list will contain input dictionaries
for all 'with-items' iterations correspondingly.
:return the list of tuples containing indexes
and the corresponding input dict.
"""
if not task_spec.get_with_items():
input_dict = _get_workflow_or_action_input(
wf_spec,
task_ex,
task_spec,
ctx
)
return enumerate([input_dict])
else:
return _get_with_items_input(wf_spec, task_ex, task_spec, ctx)
def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx):
if task_spec.get_action_name():
return _get_action_input(
wf_spec,
task_ex,
task_spec,
ctx
)
elif task_spec.get_workflow_name():
return _get_workflow_input(task_spec, ctx)
else:
raise RuntimeError('Must never happen.')
def _get_with_items_input(wf_spec, task_ex, task_spec, ctx):
"""Calculate input array for separating each action input.
Example:
DSL:
with_items:
- itemX in <% $.arrayI %>
- itemY in <% $.arrayJ %>
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
with_items_input = {
"itemX": [1, 2],
"itemY": ['a', 'b']
}
Then we get separated input:
inputs_per_item = [
{'itemX': 1, 'itemY': 'a'},
{'itemX': 2, 'itemY': 'b'}
]
:return: the list of tuples containing indexes
and the corresponding input dict.
"""
with_items_inputs = expr.evaluate_recursively(
task_spec.get_with_items(), ctx
)
with_items.validate_input(with_items_inputs)
inputs_per_item = []
for key, value in with_items_inputs.items():
for index, item in enumerate(value):
iter_context = {key: item}
if index >= len(inputs_per_item):
inputs_per_item.append(iter_context)
else:
inputs_per_item[index].update(iter_context)
action_inputs = []
for item_input in inputs_per_item:
new_ctx = utils.merge_dicts(item_input, ctx)
action_inputs.append(_get_workflow_or_action_input(
wf_spec, task_ex, task_spec, new_ctx
))
with_items.prepare_runtime_context(task_ex, task_spec, action_inputs)
indices = with_items.get_indices_for_loop(task_ex)
with_items.decrease_capacity(task_ex, len(indices))
if indices:
current_inputs = operator.itemgetter(*indices)(action_inputs)
return zip(
indices,
current_inputs if isinstance(current_inputs, tuple)
else [current_inputs]
)
return []
def _get_action_input(wf_spec, task_ex, task_spec, ctx):
input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx)
action_spec_name = task_spec.get_action_name()
input_dict = utils.merge_dicts(
input_dict,
_get_action_defaults(task_ex, task_spec),
overwrite=False
)
return action_handler.get_action_input(
action_spec_name,
input_dict,
task_ex.workflow_name,
wf_spec
)
def _get_workflow_input(task_spec, ctx):
return expr.evaluate_recursively(task_spec.get_input(), ctx)
def _run_action_or_workflow(task_ex, task_spec, input_dict, index, wf_spec):
t_name = task_ex.name
if task_spec.get_action_name():
wf_trace.info(
task_ex,
"Task '%s' is RUNNING [action_name = %s]" %
(t_name, task_spec.get_action_name())
)
_schedule_run_action(task_ex, task_spec, input_dict, index, wf_spec)
elif task_spec.get_workflow_name():
wf_trace.info(
task_ex,
"Task '%s' is RUNNING [workflow_name = %s]" %
(t_name, task_spec.get_workflow_name()))
_schedule_run_workflow(task_ex, task_spec, input_dict, index, wf_spec)
def _get_action_defaults(task_ex, task_spec):
actions = task_ex.in_context.get('__env', {}).get('__actions', {})
return actions.get(task_spec.get_action_name(), {})
def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec):
action_spec_name = task_spec.get_action_name()
action_def = action_handler.resolve_definition(
action_spec_name,
task_ex,
wf_spec
)
action_ex = action_handler.create_action_execution(
action_def,
action_input,
task_ex,
index
)
target = expr.evaluate_recursively(
task_spec.get_target(),
utils.merge_dicts(
copy.deepcopy(action_input),
copy.deepcopy(task_ex.in_context)
)
)
scheduler.schedule_call(
None,
'mistral.engine.action_handler.run_existing_action',
0,
action_ex_id=action_ex.id,
target=target
)
def _schedule_noop_action(task_ex, task_spec, wf_spec):
wf_ex = task_ex.workflow_execution wf_ex = task_ex.workflow_execution
action_def = action_handler.resolve_action_definition( task = _create_task(
'std.noop', wf_ex,
wf_ex.workflow_name, task_spec,
wf_spec.get_name() task_ex.in_context,
task_ex
) )
action_ex = action_handler.create_action_execution(action_def, {}, task_ex)
target = expr.evaluate_recursively(
task_spec.get_target(),
task_ex.in_context
)
scheduler.schedule_call(
None,
'mistral.engine.action_handler.run_existing_action',
0,
action_ex_id=action_ex.id,
target=target
)
def _schedule_run_workflow(task_ex, task_spec, wf_input, index,
parent_wf_spec):
parent_wf_ex = task_ex.workflow_execution
wf_spec_name = task_spec.get_workflow_name()
wf_def = e_utils.resolve_workflow_definition(
parent_wf_ex.workflow_name,
parent_wf_spec.get_name(),
wf_spec_name
)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
wf_params = {
'task_execution_id': task_ex.id,
'with_items_index': index
}
if 'env' in parent_wf_ex.params:
wf_params['env'] = parent_wf_ex.params['env']
for k, v in list(wf_input.items()):
if k not in wf_spec.get_input():
wf_params[k] = v
del wf_input[k]
wf_ex_id, _ = wf_ex_service.create_workflow_execution(
wf_def.name,
wf_input,
"sub-workflow execution",
wf_params,
wf_spec
)
scheduler.schedule_call(
None,
'mistral.engine.task_handler.resume_workflow',
0,
wf_ex_id=wf_ex_id,
env=None
)
def resume_workflow(wf_ex_id, env):
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
def _complete_task(task_ex, task_spec, state, state_info=None):
# Ignore if task already completed.
if states.is_completed(task_ex.state):
return []
set_task_state(task_ex, state, state_info)
try: try:
data_flow.publish_variables(task_ex, task_spec) task.on_action_complete(action_ex)
except exc.MistralException as e: except exc.MistralException as e:
LOG.error( task_ex = action_ex.task_execution
'An error while publishing task variables' wf_ex = task_ex.workflow_execution
' [task_execution_id=%s]: %s',
task_ex.id, str(e)
)
set_task_state(task_ex, states.ERROR, str(e)) msg = ("Failed to handle action completion [wf=%s, task=%s,"
" action=%s]: %s\n%s" %
(wf_ex.name, task_ex.name, action_ex.name, e, tb.format_exc()))
if not task_spec.get_keep_result(): LOG.error(msg)
data_flow.destroy_task_result(task_ex)
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
return
if task.is_completed():
wf_handler.check_workflow_completion(wf_ex)
def set_task_state(task_ex, state, state_info, processed=None): def fail_task(task_ex, msg):
wf_trace.info( task = _build_task_from_execution(task_ex)
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(task_ex.workflow_execution, msg)
def continue_task(task_ex):
task = _build_task_from_execution(task_ex)
# TODO(rakhmerov): Error handling.
task.run()
if task.is_completed():
wf_handler.check_workflow_completion(task_ex.workflow_execution)
def complete_task(task_ex, state, state_info):
task = _build_task_from_execution(task_ex)
# TODO(rakhmerov): Error handling.
task.complete(state, state_info)
if task.is_completed():
wf_handler.check_workflow_completion(task_ex.workflow_execution)
def _build_task_from_execution(task_ex, task_spec=None):
return _create_task(
task_ex.workflow_execution, task_ex.workflow_execution,
"Task execution '%s' [%s -> %s]" % task_spec or spec_parser.get_task_spec(task_ex.spec),
(task_ex.name, task_ex.state, state) task_ex.in_context,
task_ex
) )
task_ex.state = state
task_ex.state_info = state_info
if processed is not None: def _build_task_from_command(cmd):
task_ex.processed = processed if isinstance(cmd, wf_cmds.RunExistingTask):
task = _create_task(
cmd.wf_ex,
spec_parser.get_task_spec(cmd.task_ex.spec),
cmd.ctx,
cmd.task_ex
)
if cmd.reset:
task.reset()
return task
if isinstance(cmd, wf_cmds.RunTask):
task = _create_task(cmd.wf_ex, cmd.task_spec, cmd.ctx)
if cmd.is_waiting():
task.defer()
return task
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
def is_task_completed(task_ex, task_spec): def _create_task(wf_ex, task_spec, ctx, task_ex=None):
if task_spec.get_with_items(): if task_spec.get_with_items():
return with_items.is_completed(task_ex) return tasks.WithItemsTask(wf_ex, task_spec, ctx, task_ex)
return states.is_completed(task_ex.state) return tasks.RegularTask(wf_ex, task_spec, ctx, task_ex)
def need_to_continue(task_ex, task_spec):
# For now continue is available only for with-items.
if task_spec.get_with_items():
return (with_items.has_more_iterations(task_ex)
and with_items.get_concurrency(task_ex))
return False

456
mistral/engine/tasks.py Normal file
View File

@ -0,0 +1,456 @@
# Copyright 2016 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import copy
import operator
from oslo_log import log as logging
import six
from mistral.db.v2 import api as db_api
from mistral.engine import actions
from mistral.engine import dispatcher
from mistral.engine import policies
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
from mistral.workflow import with_items
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Task(object):
"""Task.
Represents a workflow task and defines interface that can be used by
Mistral engine or its components in order to manipulate with tasks.
"""
def __init__(self, wf_ex, task_spec, ctx, task_ex=None):
self.wf_ex = wf_ex
self.task_spec = task_spec
self.ctx = ctx
self.task_ex = task_ex
self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self.waiting = False
self.reset_flag = False
@abc.abstractmethod
def on_action_complete(self, action_ex):
"""Handle action completion.
:param action_ex: Action execution.
"""
raise NotImplementedError
@abc.abstractmethod
def run(self):
"""Runs task."""
raise NotImplementedError
def defer(self):
"""Defers task.
This methods finds task execution or creates new and puts task
to a waiting state.
"""
if not self.task_ex:
self.task_ex = wf_utils.find_task_executions_by_spec(
self.wf_ex,
self.task_spec
)
if not self.task_ex:
self._create_task_execution()
self.set_state(states.WAITING, 'Task execution is deferred.')
self.waiting = True
def reset(self):
self.reset_flag = True
def set_state(self, state, state_info, processed=None):
"""Sets task state without executing post completion logic.
:param state: New task state.
:param state_info: New state information (i.e. error message).
:param processed: New "processed" flag value.
"""
wf_trace.info(
self.task_ex.workflow_execution,
"Task execution '%s' [%s -> %s]: %s" %
(self.task_ex.id, self.task_ex.state, state, state_info)
)
self.task_ex.state = state
self.task_ex.state_info = state_info
if processed is not None:
self.task_ex.processed = processed
def complete(self, state, state_info=None):
"""Complete task and set specified state.
Method sets specified task state and runs all necessary post
completion logic such as publishing workflow variables and
scheduling new workflow commands.
:param state: New task state.
:param state_info: New state information (i.e. error message).
"""
# Ignore if task already completed.
if states.is_completed(self.task_ex.state):
return
self.set_state(state, state_info)
data_flow.publish_variables(self.task_ex, self.task_spec)
if not self.task_spec.get_keep_result():
# Destroy task result.
for ex in self.task_ex.executions:
if hasattr(ex, 'output'):
ex.output = {}
self._after_task_complete()
# Ignore DELAYED state.
if self.task_ex.state == states.RUNNING_DELAYED:
return
# If workflow is paused we shouldn't schedule new commands
# and mark task as processed.
if states.is_paused(self.wf_ex.state):
return
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow()
# Mark task as processed after all decisions have been made
# upon its completion.
self.task_ex.processed = True
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
def _before_task_start(self):
policies_spec = self.task_spec.get_policies()
for p in policies.build_policies(policies_spec, self.wf_spec):
p.before_task_start(self.task_ex, self.task_spec)
def _after_task_complete(self):
policies_spec = self.task_spec.get_policies()
for p in policies.build_policies(policies_spec, self.wf_spec):
p.after_task_complete(self.task_ex, self.task_spec)
def _create_task_execution(self, state=states.RUNNING):
self.task_ex = db_api.create_task_execution({
'name': self.task_spec.get_name(),
'workflow_execution_id': self.wf_ex.id,
'workflow_name': self.wf_ex.workflow_name,
'workflow_id': self.wf_ex.workflow_id,
'state': state,
'spec': self.task_spec.to_dict(),
'in_context': self.ctx,
'published': {},
'runtime_context': {},
'project_id': self.wf_ex.project_id
})
# Add to collection explicitly so that it's in a proper
# state within the current session.
self.wf_ex.task_executions.append(self.task_ex)
def _get_action_defaults(self):
action_name = self.task_spec.get_action_name()
if not action_name:
return {}
env = self.task_ex.in_context.get('__env', {})
return env.get('__actions', {}).get(action_name, {})
class RegularTask(Task):
"""Regular task.
Takes care of processing regular tasks with one action.
"""
def on_action_complete(self, action_ex):
state = action_ex.state
# TODO(rakhmerov): Here we can define more informative messages
# cases when action is successful and when it's not. For example,
# in state_info we can specify the cause action.
state_info = (None if state == states.SUCCESS
else action_ex.output.get('result'))
self.complete(state, state_info)
def is_completed(self):
return self.task_ex and states.is_completed(self.task_ex.state)
def run(self):
if not self.task_ex:
self._run_new()
else:
self._run_existing()
def _run_new(self):
# NOTE(xylan): Need to think how to get rid of this weird judgment
# to keep it more consistent with the function name.
self.task_ex = wf_utils.find_task_execution_with_state(
self.wf_ex,
self.task_spec,
states.WAITING
)
if self.task_ex:
self.set_state(states.RUNNING, None)
self.task_ex.in_context = self.ctx
else:
self._create_task_execution()
LOG.debug(
'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %
(self.wf_ex.name, self.task_spec, self.task_ex.state)
)
self._before_task_start()
# Policies could possibly change task state.
if self.task_ex.state != states.RUNNING:
return
self._schedule_actions()
def _run_existing(self):
if self.waiting:
return
# Explicitly change task state to RUNNING.
# Throw exception if the existing task already succeeded.
if self.task_ex.state == states.SUCCESS:
raise exc.MistralError(
'Rerunning succeeded tasks is not supported.'
)
self.set_state(states.RUNNING, None, processed=False)
self._reset_actions()
self._schedule_actions()
def _reset_actions(self):
"""Resets task state.
Depending on task type this method may reset task state. For example,
delete all task actions etc.
"""
# Reset state of processed task and related action executions.
if self.reset_flag:
action_exs = self.task_ex.executions
else:
action_exs = db_api.get_action_executions(
task_execution_id=self.task_ex.id,
state=states.ERROR,
accepted=True
)
for action_ex in action_exs:
action_ex.accepted = False
def _schedule_actions(self):
# Regular task schedules just one action.
input_dict = self._get_action_input()
target = self._get_target(input_dict)
action = self._build_action()
action.validate_input(input_dict)
action.schedule(input_dict, target)
def _get_target(self, input_dict):
return expr.evaluate_recursively(
self.task_spec.get_target(),
utils.merge_dicts(
copy.deepcopy(input_dict),
copy.deepcopy(self.ctx)
)
)
def _get_action_input(self, ctx=None):
ctx = ctx or self.ctx
input_dict = expr.evaluate_recursively(self.task_spec.get_input(), ctx)
return utils.merge_dicts(
input_dict,
self._get_action_defaults(),
overwrite=False
)
def _build_action(self):
action_name = self.task_spec.get_action_name()
wf_name = self.task_spec.get_workflow_name()
if wf_name:
return actions.WorkflowAction(wf_name, task_ex=self.task_ex)
if not action_name:
action_name = 'std.noop'
action_def = actions.resolve_action_definition(
action_name,
self.wf_ex.name,
self.wf_spec.get_name()
)
if action_def.spec:
return actions.AdHocAction(action_def, task_ex=self.task_ex)
return actions.PythonAction(action_def, task_ex=self.task_ex)
class WithItemsTask(RegularTask):
"""With-items task.
Takes care of processing "with-items" tasks.
"""
def on_action_complete(self, action_ex):
state = action_ex.state
# TODO(rakhmerov): Here we can define more informative messages
# cases when action is successful and when it's not. For example,
# in state_info we can specify the cause action.
state_info = (None if state == states.SUCCESS
else action_ex.output.get('result'))
with_items.increase_capacity(self.task_ex)
if with_items.is_completed(self.task_ex):
self.complete(
with_items.get_final_state(self.task_ex),
state_info
)
return
if (with_items.has_more_iterations(self.task_ex)
and with_items.get_concurrency(self.task_ex)):
self._schedule_actions()
def _schedule_actions(self):
input_dicts = self._get_with_items_input()
if not input_dicts:
self.complete(states.SUCCESS)
return
for idx, input_dict in input_dicts:
target = self._get_target(input_dict)
action = self._build_action()
action.schedule(input_dict, target, index=idx)
def _get_with_items_input(self):
"""Calculate input array for separating each action input.
Example:
DSL:
with_items:
- itemX in <% $.arrayI %>
- itemY in <% $.arrayJ %>
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
with_items_input = {
"itemX": [1, 2],
"itemY": ['a', 'b']
}
Then we get separated input:
inputs_per_item = [
{'itemX': 1, 'itemY': 'a'},
{'itemX': 2, 'itemY': 'b'}
]
:return: the list of tuples containing indexes
and the corresponding input dict.
"""
with_items_inputs = expr.evaluate_recursively(
self.task_spec.get_with_items(),
self.ctx
)
with_items.validate_input(with_items_inputs)
inputs_per_item = []
for key, value in with_items_inputs.items():
for index, item in enumerate(value):
iter_context = {key: item}
if index >= len(inputs_per_item):
inputs_per_item.append(iter_context)
else:
inputs_per_item[index].update(iter_context)
action_inputs = []
for item_input in inputs_per_item:
new_ctx = utils.merge_dicts(item_input, self.ctx)
action_inputs.append(self._get_action_input(new_ctx))
with_items.prepare_runtime_context(
self.task_ex,
self.task_spec,
action_inputs
)
indices = with_items.get_indices_for_loop(self.task_ex)
with_items.decrease_capacity(self.task_ex, len(indices))
if indices:
current_inputs = operator.itemgetter(*indices)(action_inputs)
return zip(
indices,
current_inputs if isinstance(current_inputs, tuple)
else [current_inputs]
)
return []

View File

@ -21,6 +21,9 @@ from mistral import exceptions as exc
from mistral import utils from mistral import utils
# TODO(rakhmerov): This method is too abstract, validation rules may vary
# depending on object type (action, wf), it's not clear what it can be
# applied to.
def validate_input(definition, input, spec=None): def validate_input(definition, input, spec=None):
input_param_names = copy.deepcopy(list((input or {}).keys())) input_param_names = copy.deepcopy(list((input or {}).keys()))
missing_param_names = [] missing_param_names = []

View File

@ -1,4 +1,4 @@
# Copyright 2015 - Mirantis, Inc. # Copyright 2016 - Nokia Networks.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,18 +12,89 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import rpc from mistral.engine import rpc
from mistral.engine import task_handler
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.services import scheduler from mistral.services import scheduler
from mistral import utils
from mistral.utils import wf_trace from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
def on_task_complete(task_ex):
wf_ex = task_ex.workflow_execution
check_workflow_completion(wf_ex)
def check_workflow_completion(wf_ex):
if states.is_paused_or_completed(wf_ex.state):
return
# Workflow is not completed if there are any incomplete task
# executions that are not in WAITING state. If all incomplete
# tasks are waiting and there are no unhandled errors, then these
# tasks will not reach completion. In this case, mark the
# workflow complete.
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
return
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
if wf_ctrl.all_errors_handled():
succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
else:
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
fail_workflow(wf_ex, state_info)
def stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS:
wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for %s: %s' % (wf_ex, e)
)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
return succeed_workflow(
wf_ex,
final_context,
wf_spec,
message
)
elif state == states.ERROR:
return fail_workflow(wf_ex, message)
return wf_ex
def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
# Fail workflow if output is not successfully evaluated. # Fail workflow if output is not successfully evaluated.
try: try:
@ -35,7 +106,7 @@ def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
return fail_workflow(wf_ex, e.message) return fail_workflow(wf_ex, e.message)
# Set workflow execution to success until after output is evaluated. # Set workflow execution to success until after output is evaluated.
set_execution_state(wf_ex, states.SUCCESS, state_info) set_workflow_state(wf_ex, states.SUCCESS, state_info)
if wf_ex.task_execution_id: if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex) _schedule_send_result_to_parent_workflow(wf_ex)
@ -47,7 +118,16 @@ def fail_workflow(wf_ex, state_info):
if states.is_paused_or_completed(wf_ex.state): if states.is_paused_or_completed(wf_ex.state):
return wf_ex return wf_ex
set_execution_state(wf_ex, states.ERROR, state_info) set_workflow_state(wf_ex, states.ERROR, state_info)
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
state_info = utils.cut_by_kb(
state_info,
cfg.CONF.engine.execution_field_size_limit_kb
)
wf_ex.output = {'result': state_info}
if wf_ex.task_execution_id: if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex) _schedule_send_result_to_parent_workflow(wf_ex)
@ -84,7 +164,9 @@ def send_result_to_parent_workflow(wf_ex_id):
) )
def set_execution_state(wf_ex, state, state_info=None, set_upstream=False): # TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow
# abstraction.
def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False):
cur_state = wf_ex.state cur_state = wf_ex.state
if states.is_valid_transition(cur_state, state): if states.is_valid_transition(cur_state, state):
@ -109,24 +191,28 @@ def set_execution_state(wf_ex, state, state_info=None, set_upstream=False):
# If specified, then recursively set the state of the parent workflow # If specified, then recursively set the state of the parent workflow
# executions to the same state. Only changing state to RUNNING is # executions to the same state. Only changing state to RUNNING is
# supported. # supported.
# TODO(rakhmerov): I don't like this hardcoded special case. It's
# used only to continue the workflow (rerun) but at the first glance
# seems like a generic behavior. Need to handle it differently.
if set_upstream and state == states.RUNNING and wf_ex.task_execution_id: if set_upstream and state == states.RUNNING and wf_ex.task_execution_id:
task_ex = db_api.get_task_execution(wf_ex.task_execution_id) task_ex = db_api.get_task_execution(wf_ex.task_execution_id)
parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id) parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id)
set_execution_state( set_workflow_state(
parent_wf_ex, parent_wf_ex,
state, state,
state_info=state_info, state_info=state_info,
set_upstream=set_upstream set_upstream=set_upstream
) )
task_handler.set_task_state( # TODO(rakhmerov): How do we need to set task state properly?
task_ex, # It doesn't seem right to intervene into the parent workflow
state, # internals. We just need to communicate changes back to parent
state_info=None, # worklfow and it should do what's needed itself.
processed=False task_ex.state = state
) task_ex.state_info = None
task_ex.processed = False
def lock_workflow_execution(wf_ex_id): def lock_workflow_execution(wf_ex_id):

View File

@ -59,7 +59,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
'context': copy.deepcopy(wf_input) or {}, 'context': copy.deepcopy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'), 'task_execution_id': params.get('task_execution_id'),
'runtime_context': { 'runtime_context': {
'with_items_index': params.get('with_items_index', 0) 'index': params.get('index', 0)
}, },
}) })
@ -92,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params,
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier) wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
return wf_ex.id, wf_spec return wf_ex, wf_spec

View File

@ -162,9 +162,9 @@ class CallScheduler(periodic_task.PeriodicTasks):
) )
for (target_auth_context, target_method, method_args) in delayed_calls: for (target_auth_context, target_method, method_args) in delayed_calls:
# TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521
# Transaction is needed here because some of the # Transaction is needed here because some of the
# target_method can use the DB # target_method can use the DB.
with db_api.transaction(): with db_api.transaction():
try: try:
# Set the correct context for the method. # Set the correct context for the method.
@ -175,9 +175,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
# Call the method. # Call the method.
target_method(**method_args) target_method(**method_args)
except Exception as e: except Exception as e:
LOG.error( LOG.exception("Delayed call failed [exception=%s]: %s", e)
"Delayed call failed [exception=%s]", e
)
finally: finally:
# Remove context. # Remove context.
context.set_ctx(None) context.set_ctx(None)

View File

@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__)
# Default delay and timeout in seconds for await_xxx() functions. # Default delay and timeout in seconds for await_xxx() functions.
DEFAULT_DELAY = 1 DEFAULT_DELAY = 1
DEFAULT_TIMEOUT = 60 DEFAULT_TIMEOUT = 20
def launch_engine_server(transport, engine): def launch_engine_server(transport, engine):
@ -135,18 +135,38 @@ class EngineTestCase(base.DbTestCase):
wf_execs = db_api.get_workflow_executions() wf_execs = db_api.get_workflow_executions()
for wf_ex in wf_execs: for w in wf_execs:
print( print(
"\n%s [state=%s, output=%s]" % "\n%s [state=%s, state_info=%s, output=%s]" %
(wf_ex.name, wf_ex.state, wf_ex.output) (w.name, w.state, w.state_info, w.output)
) )
for t_ex in wf_ex.task_executions: for t in w.task_executions:
print( print(
"\t%s [id=%s, state=%s, published=%s]" % "\t%s [id=%s, state=%s, state_info=%s, processed=%s,"
(t_ex.name, t_ex.id, t_ex.state, t_ex.published) " published=%s]" %
(t.name,
t.id,
t.state,
t.state_info,
t.processed,
t.published)
) )
a_execs = db_api.get_action_executions(task_execution_id=t.id)
for a in a_execs:
print(
"\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s,"
" output=%s]" %
(a.name,
a.id,
a.state,
a.state_info,
a.accepted,
a.output)
)
# Various methods for abstract execution objects. # Various methods for abstract execution objects.
def is_execution_in_state(self, ex_id, state): def is_execution_in_state(self, ex_id, state):

View File

@ -486,10 +486,10 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Note: We need to reread execution to access related tasks. # Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions task1 = self._assert_single_item(wf_ex.task_executions, name='task1')
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1) result = data_flow.get_task_execution_result(task1)
self.assertListEqual([], result) self.assertListEqual([], result)
@ -512,7 +512,7 @@ class DataFlowTest(test_base.BaseTest):
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=True, accepted=True,
runtime_context={'with_items_index': 0} runtime_context={'index': 0}
)] )]
with mock.patch.object(db_api, 'get_action_executions', with mock.patch.object(db_api, 'get_action_executions',
@ -523,14 +523,14 @@ class DataFlowTest(test_base.BaseTest):
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=True, accepted=True,
runtime_context={'with_items_index': 0} runtime_context={'index': 0}
)) ))
action_exs.append(models.ActionExecution( action_exs.append(models.ActionExecution(
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=False, accepted=False,
runtime_context={'with_items_index': 0} runtime_context={'index': 0}
)) ))
with mock.patch.object(db_api, 'get_action_executions', with mock.patch.object(db_api, 'get_action_executions',

View File

@ -143,6 +143,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
""" """
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {}) wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id) self.await_execution_success(wf_ex.id)
@ -411,6 +412,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf: wf:
input: input:
- var - var
tasks: tasks:
task1: task1:
action: std.echo output=<% $.var + $.var2 %> action: std.echo output=<% $.var + $.var2 %>
@ -484,7 +486,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
name='task1' name='task1'
) )
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_1_ex.state)
# Assert that there is only one action execution and it's SUCCESS. # Assert that there is only one action execution and it's SUCCESS.
task_1_action_exs = db_api.get_action_executions( task_1_action_exs = db_api.get_action_executions(
@ -550,7 +552,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
name='task1' name='task1'
) )
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions( task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id task_execution_id=task_1_ex.id

View File

@ -460,18 +460,27 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info) self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') task_execs = wf_ex.task_executions
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(
task_execs,
name='t1',
state=states.SUCCESS
)
task_2_ex = self._assert_single_item(
task_execs,
name='t2',
state=states.ERROR
)
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info) self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task. # Resume workflow and re-run failed task.
e = self.assertRaises( e = self.assertRaises(
exc.EngineException, exc.MistralError,
self.engine.rerun_workflow, self.engine.rerun_workflow,
wf_ex.id, wf_ex.id,
task_1_ex.id task_1_ex.id
@ -505,9 +514,12 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info) self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
self.assertEqual(states.ERROR, task_1_ex.state) self.assertEqual(states.ERROR, task_1_ex.state)
self.assertIsNotNone(task_1_ex.state_info) self.assertIsNotNone(task_1_ex.state_info)
@ -532,10 +544,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state) self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info) self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') task_execs = wf_ex.task_executions
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
# Check action executions of task 1. # Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -547,8 +562,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# The single action execution that succeeded should not re-run. # The single action execution that succeeded should not re-run.
self.assertEqual(5, len(task_1_action_exs)) self.assertEqual(5, len(task_1_action_exs))
self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'], self.assertListEqual(
task_1_ex.published.get('v1')) ['Task 1.0', 'Task 1.1', 'Task 1.2'],
task_1_ex.published.get('v1')
)
# Check action executions of task 2. # Check action executions of task 2.
self.assertEqual(states.SUCCESS, task_2_ex.state) self.assertEqual(states.SUCCESS, task_2_ex.state)

View File

@ -172,8 +172,7 @@ class EnvironmentTest(base.EngineTestCase):
'mistral.actions.std_actions.EchoAction', 'mistral.actions.std_actions.EchoAction',
{}, {},
a_ex.input, a_ex.input,
TARGET, TARGET
True
) )
def test_subworkflow_env_task_input(self): def test_subworkflow_env_task_input(self):

View File

@ -0,0 +1,61 @@
# Copyright 2016 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class DirectWorkflowEngineTest(base.EngineTestCase):
def test_action_error(self):
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.fail
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(task_execs, name='task1', state=states.ERROR)
# TODO(rakhmerov): Finish. Need to check more complicated cases.
def test_task_error(self):
# TODO(rakhmerov): Implement.
pass
def test_workflow_error(self):
# TODO(rakhmerov): Implement.
pass

View File

@ -178,13 +178,13 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn( self.assertIn(
'Failure caused by error in tasks: task1', 'Failed to handle action completion [wf=wf, task=task1',
wf_ex.state_info wf_ex.state_info
) )
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
self.assertEqual( self.assertIn(
"Size of 'published' is 1KB which exceeds the limit of 0KB", "Size of 'published' is 1KB which exceeds the limit of 0KB",
task_ex.state_info task_ex.state_info
) )

View File

@ -576,6 +576,7 @@ class JoinEngineTest(base.EngineTestCase):
action: std.noop action: std.noop
on-success: on-success:
- task22 - task22
task22: task22:
action: std.noop action: std.noop
on-success: on-success:
@ -585,6 +586,7 @@ class JoinEngineTest(base.EngineTestCase):
action: std.fail action: std.fail
on-success: on-success:
- task32 - task32
task32: task32:
action: std.noop action: std.noop
on-success: on-success:

View File

@ -303,7 +303,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task. # Resume workflow and re-run failed task.
e = self.assertRaises( e = self.assertRaises(
exc.EngineException, exc.MistralError,
self.engine.rerun_workflow, self.engine.rerun_workflow,
wf_ex.id, wf_ex.id,
task_1_ex.id task_1_ex.id

View File

@ -65,6 +65,7 @@ class RunActionEngineTest(base.EngineTestCase):
# Start action and see the result. # Start action and see the result.
action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'}) action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'})
self.assertIsNotNone(action_ex.output)
self.assertIn('some error', action_ex.output['result']) self.assertIn('some error', action_ex.output['result'])
def test_run_action_save_result(self): def test_run_action_save_result(self):
@ -148,10 +149,15 @@ class RunActionEngineTest(base.EngineTestCase):
self.assertIn('concat', exception.message) self.assertIn('concat', exception.message)
@mock.patch('mistral.engine.action_handler.resolve_action_definition') # TODO(rakhmerov): This is an example of a bad test. It pins to
# implementation details too much and prevents from making refactoring
# easily. When writing tests we should make assertions about
# consequences, not about how internal machinery works, i.e. we need to
# follow "black box" testing paradigm.
@mock.patch('mistral.engine.actions.resolve_action_definition')
@mock.patch('mistral.engine.utils.validate_input') @mock.patch('mistral.engine.utils.validate_input')
@mock.patch('mistral.services.action_manager.get_action_class') @mock.patch('mistral.services.action_manager.get_action_class')
@mock.patch('mistral.engine.action_handler.run_action') @mock.patch('mistral.engine.actions.PythonAction.run')
def test_run_action_with_kwargs_input(self, run_mock, class_mock, def test_run_action_with_kwargs_input(self, run_mock, class_mock,
validate_mock, def_mock): validate_mock, def_mock):
action_def = models.ActionDefinition() action_def = models.ActionDefinition()
@ -172,16 +178,15 @@ class RunActionEngineTest(base.EngineTestCase):
self.engine.start_action('fake_action', {'input': 'Hello'}) self.engine.start_action('fake_action', {'input': 'Hello'})
self.assertEqual(2, def_mock.call_count) self.assertEqual(1, def_mock.call_count)
def_mock.assert_called_with('fake_action', None, None) def_mock.assert_called_with('fake_action')
self.assertEqual(0, validate_mock.call_count) self.assertEqual(0, validate_mock.call_count)
class_ret.assert_called_once_with(input='Hello') class_ret.assert_called_once_with(input='Hello')
run_mock.assert_called_once_with( run_mock.assert_called_once_with(
action_def,
{'input': 'Hello'}, {'input': 'Hello'},
target=None, None,
async=False save=None
) )

View File

@ -181,7 +181,7 @@ class SubworkflowsTest(base.EngineTestCase):
@mock.patch.object(std_actions.EchoAction, 'run', @mock.patch.object(std_actions.EchoAction, 'run',
mock.MagicMock(side_effect=exc.ActionException)) mock.MagicMock(side_effect=exc.ActionException))
def test_subworkflow_error(self): def test_subworkflow_error(self):
wf2_ex = self.engine.start_workflow('wb1.wf2', None) self.engine.start_workflow('wb1.wf2', None)
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
@ -208,11 +208,13 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(2, len(wf_execs)) self.assertEqual(2, len(wf_execs))
wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2') wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2')
self.assertEqual(states.ERROR, wf2_ex.state) self.assertEqual(states.ERROR, wf2_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info) self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info)
# Ensure error message is bubbled up to the main workflow. # Ensure error message is bubbled up to the main workflow.
wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1') wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1')
self.assertEqual(states.ERROR, wf1_ex.state) self.assertEqual(states.ERROR, wf1_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info) self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info)

View File

@ -16,7 +16,6 @@ import mock
from oslo_config import cfg from oslo_config import cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as de
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.services import workbooks as wb_service from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
@ -139,7 +138,7 @@ workflows:
tasks: tasks:
task1: task1:
action: std.echo output="Hi!" action: std.echo output="Task 1"
on-complete: on-complete:
- task3 - task3
- pause - pause
@ -299,7 +298,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.engine.resume_workflow(wf_ex.id) self.engine.resume_workflow(wf_ex.id)
self.await_execution_success(wf_ex.id, 1, 5) self.await_execution_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -351,8 +350,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info) self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info)
self.assertEqual(4, len(wf_ex.task_executions)) self.assertEqual(4, len(wf_ex.task_executions))
@mock.patch.object(de.DefaultEngine, '_fail_workflow') def test_resume_fails(self):
def test_resume_fails(self, mock_fw):
# Start and pause workflow. # Start and pause workflow.
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES) wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)
@ -365,7 +363,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.PAUSED, wf_ex.state) self.assertEqual(states.PAUSED, wf_ex.state)
# Simulate failure and check if it is handled. # Simulate failure and check if it is handled.
err = exc.MistralException('foo') err = exc.MistralError('foo')
with mock.patch.object( with mock.patch.object(
db_api, db_api,
@ -373,13 +371,11 @@ class WorkflowResumeTest(base.EngineTestCase):
side_effect=err): side_effect=err):
self.assertRaises( self.assertRaises(
exc.MistralException, exc.MistralError,
self.engine.resume_workflow, self.engine.resume_workflow,
wf_ex.id wf_ex.id
) )
mock_fw.assert_called_once_with(wf_ex.id, err)
def test_resume_diff_env_vars(self): def test_resume_diff_env_vars(self):
wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR) wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR)

View File

@ -103,7 +103,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
state=states.SUCCESS, state=states.SUCCESS,
output={'result': 'Hey'}, output={'result': 'Hey'},
accepted=True, accepted=True,
runtime_context={'with_items_index': 0} runtime_context={'index': 0}
) )
) )

View File

@ -25,9 +25,7 @@ class WithItemsTest(base.BaseTest):
return models.ActionExecution( return models.ActionExecution(
accepted=accepted, accepted=accepted,
state=state, state=state,
runtime_context={ runtime_context={'index': index}
'with_items_index': index
}
) )
def test_get_indices(self): def test_get_indices(self):

View File

@ -23,6 +23,7 @@ from os import path
import shutil import shutil
import six import six
import socket import socket
import sys
import tempfile import tempfile
import threading import threading
import uuid import uuid
@ -169,6 +170,16 @@ def cut(data, length=100):
return string return string
def cut_by_kb(data, kilobytes):
if kilobytes <= 0:
return cut(data)
bytes_per_char = sys.getsizeof('s') - sys.getsizeof('')
length = int(kilobytes * 1024 / bytes_per_char)
return cut(data, length)
def iter_subclasses(cls, _seen=None): def iter_subclasses(cls, _seen=None):
"""Generator over all subclasses of a given class in depth first order.""" """Generator over all subclasses of a given class in depth first order."""

View File

@ -83,7 +83,8 @@ class WorkflowController(object):
self.wf_spec = wf_spec self.wf_spec = wf_spec
def _update_task_ex_env(self, task_ex, env): @staticmethod
def _update_task_ex_env(task_ex, env):
if not env: if not env:
return task_ex return task_ex

View File

@ -45,17 +45,18 @@ class RunTask(WorkflowCommand):
def __init__(self, wf_ex, task_spec, ctx): def __init__(self, wf_ex, task_spec, ctx):
super(RunTask, self).__init__(wf_ex, task_spec, ctx) super(RunTask, self).__init__(wf_ex, task_spec, ctx)
self.wait_flag = False
self.wait = False
def is_waiting(self): def is_waiting(self):
return (self.wait_flag and return (self.wait and
isinstance(self.task_spec, tasks.DirectWorkflowTaskSpec) and isinstance(self.task_spec, tasks.DirectWorkflowTaskSpec) and
self.task_spec.get_join()) self.task_spec.get_join())
def __repr__(self): def __repr__(self):
return ( return (
"Run task [workflow=%s, task=%s, waif_flag=%s]" "Run task [workflow=%s, task=%s, waif_flag=%s]"
% (self.wf_ex.name, self.task_spec.get_name(), self.wait_flag) % (self.wf_ex.name, self.task_spec.get_name(), self.wait)
) )

View File

@ -73,7 +73,7 @@ def get_task_execution_result(task_ex):
# use db_api.get_action_executions here to avoid session-less use cases. # use db_api.get_action_executions here to avoid session-less use cases.
action_execs = db_api.get_action_executions(task_execution_id=task_ex.id) action_execs = db_api.get_action_executions(task_execution_id=task_ex.id)
action_execs.sort( action_execs.sort(
key=lambda x: x.runtime_context.get('with_items_index') key=lambda x: x.runtime_context.get('index')
) )
results = [ results = [
@ -111,12 +111,6 @@ def publish_variables(task_ex, task_spec):
) )
def destroy_task_result(task_ex):
for ex in task_ex.executions:
if hasattr(ex, 'output'):
ex.output = {}
def evaluate_task_outbound_context(task_ex): def evaluate_task_outbound_context(task_ex):
"""Evaluates task outbound Data Flow context. """Evaluates task outbound Data Flow context.

View File

@ -121,7 +121,7 @@ class DirectWorkflowController(base.WorkflowController):
# NOTE(xylan): Decide whether or not a join task should run # NOTE(xylan): Decide whether or not a join task should run
# immediately. # immediately.
if self._is_unsatisfied_join(cmd): if self._is_unsatisfied_join(cmd):
cmd.wait_flag = True cmd.wait = True
cmds.append(cmd) cmds.append(cmd)

View File

@ -20,6 +20,9 @@ from mistral import exceptions as exc
from mistral.workflow import states from mistral.workflow import states
# TODO(rakhmerov): Seems like it makes sense to get rid of this module in favor
# of implementing all the needed logic in engine.tasks.WithItemsTask directly.
_CAPACITY = 'capacity' _CAPACITY = 'capacity'
_CONCURRENCY = 'concurrency' _CONCURRENCY = 'concurrency'
_COUNT = 'count' _COUNT = 'count'
@ -73,7 +76,7 @@ def _get_with_item_indices(exs):
:param exs: List of executions. :param exs: List of executions.
:return: a list of numbers. :return: a list of numbers.
""" """
return sorted(set([ex.runtime_context['with_items_index'] for ex in exs])) return sorted(set([ex.runtime_context['index'] for ex in exs]))
def _get_accepted_act_exs(task_ex): def _get_accepted_act_exs(task_ex):

View File

@ -597,18 +597,14 @@ class ExecutionTestsV2(base.TestCase):
@test.attr(type='negative') @test.attr(type='negative')
def test_create_execution_for_reverse_wf_invalid_start_task(self): def test_create_execution_for_reverse_wf_invalid_start_task(self):
_, wf_ex = self.client.create_execution( self.assertRaises(
exceptions.BadRequest,
self.client.create_execution,
self.reverse_wf['name'], self.reverse_wf['name'],
{ {self.reverse_wf['input']: "Bye"},
self.reverse_wf['input']: "Bye"}, {"task_name": "nonexist"}
{
"task_name": "nonexist"
}
) )
self.assertEqual("ERROR", wf_ex['state'])
self.assertIn("Invalid task name", wf_ex['state_info'])
@test.attr(type='negative') @test.attr(type='negative')
def test_create_execution_forgot_input_params(self): def test_create_execution_forgot_input_params(self):
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,