diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 78a42e5c..4f52ea4c 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -199,7 +199,7 @@ def validate_long_type_length(cls, field_name, value): size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb # If the size is unlimited. - if (size_limit_kb < 0): + if size_limit_kb < 0: return size_kb = int(sys.getsizeof(str(value)) / 1024) @@ -394,6 +394,8 @@ class CronTrigger(mb.MistralSecureModelBase): # Register all hooks related to secure models. mb.register_secure_model_hooks() +# TODO(rakhmerov): This is a bad solution. It's hard to find in the code, +# configure flexibly etc. Fix it. # Register an event listener to verify that the size of all the long columns # affected by the user do not exceed the limit configuration. for attr_name in ['input', 'output', 'params', 'published']: diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index aa5e152f..118722fa 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -12,302 +12,81 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_log import log as logging +import traceback as tb -from mistral.db.v2 import api as db_api -from mistral.engine import rpc -from mistral.engine import utils as e_utils +from mistral.db.v2.sqlalchemy import models +from mistral.engine import actions +from mistral.engine import task_handler from mistral import exceptions as exc -from mistral import expressions as expr -from mistral.services import action_manager as a_m -from mistral.services import security -from mistral import utils -from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser -from mistral.workflow import states -from mistral.workflow import utils as wf_utils -def create_action_execution(action_def, action_input, task_ex=None, - index=0, description=''): - # TODO(rakhmerov): We can avoid hitting DB at all when calling things like - # create_action_execution(), these operations can be just done using - # SQLAlchemy session (1-level cache) and session flush (on TX commit) would - # send necessary SQL queries to DB. Currently, session flush happens - # on every operation which may not be optimal. The problem with using just - # session level cache is in generating ids. Ids are generated only on - # session flush. And now we have a lot places where we need to have ids - # before TX completion. +LOG = logging.getLogger(__name__) - # Assign the action execution ID here to minimize database calls. - # Otherwise, the input property of the action execution DB object needs - # to be updated with the action execution ID after the action execution - # DB object is created. - action_ex_id = utils.generate_unicode_uuid() - if a_m.has_action_context( - action_def.action_class, action_def.attributes or {}) and task_ex: - action_input.update(a_m.get_action_context(task_ex, action_ex_id)) +def on_action_complete(action_ex, result): + task_ex = action_ex.task_execution - values = { - 'id': action_ex_id, - 'name': action_def.name, - 'spec': action_def.spec, - 'state': states.RUNNING, - 'input': action_input, - 'runtime_context': {'with_items_index': index}, - 'description': description - } + action = _build_action(action_ex) + + try: + action.complete(result) + except exc.MistralException as e: + msg = ("Failed to complete action [action=%s, task=%s]: %s\n%s" % + (action_ex.name, task_ex.name, e, tb.format_exc())) + + LOG.error(msg) + + action.fail(msg) + + if task_ex: + task_handler.fail_task(task_ex, msg) + + return if task_ex: - values.update({ - 'task_execution_id': task_ex.id, - 'workflow_name': task_ex.workflow_name, - 'workflow_id': task_ex.workflow_id, - 'project_id': task_ex.project_id, - }) - else: - values.update({ - 'project_id': security.get_project_id(), - }) - - action_ex = db_api.create_action_execution(values) - - if task_ex: - # Add to collection explicitly so that it's in a proper - # state within the current session. - task_ex.executions.append(action_ex) - - return action_ex + task_handler.on_action_complete(action_ex) -def _inject_action_ctx_for_validating(action_def, input_dict): - if a_m.has_action_context(action_def.action_class, action_def.attributes): - input_dict.update(a_m.get_empty_action_context()) +def _build_action(action_ex): + if isinstance(action_ex, models.WorkflowExecution): + return actions.WorkflowAction(None, action_ex=action_ex) + wf_name = None + wf_spec_name = None -def get_action_input(action_name, input_dict, wf_name=None, wf_spec=None): - action_def = resolve_action_definition( - action_name, - wf_name, - wf_spec.get_name() if wf_spec else None - ) + if action_ex.workflow_name: + wf_name = action_ex.workflow_name + wf_spec = spec_parser.get_workflow_spec( + action_ex.task_execution.workflow_execution.spec + ) + wf_spec_name = wf_spec.get_name() - if action_def.action_class: - _inject_action_ctx_for_validating(action_def, input_dict) + adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name') - # NOTE(xylan): Don't validate action input if action initialization method - # contains ** argument. - if '**' not in action_def.input: - e_utils.validate_input(action_def, input_dict) - - if action_def.spec: - # Ad-hoc action. - return _get_adhoc_action_input( - action_def, - input_dict, + if adhoc_action_name: + action_def = actions.resolve_action_definition( + adhoc_action_name, wf_name, - wf_spec + wf_spec_name ) - return input_dict + return actions.AdHocAction(action_def, action_ex=action_ex) - -def _get_adhoc_action_input(action_def, input_dict, - wf_name=None, wf_spec=None): - action_spec = spec_parser.get_action_spec(action_def.spec) - - base_name = action_spec.get_base() - - action_def = resolve_action_definition( - base_name, - wf_name if wf_name else None, - wf_spec.get_name() if wf_spec else None - ) - - _inject_action_ctx_for_validating(action_def, input_dict) - e_utils.validate_input(action_def, input_dict, action_spec) - - base_input = action_spec.get_base_input() - - if base_input: - input_dict = expr.evaluate_recursively( - base_input, - input_dict - ) - else: - input_dict = {} - - return input_dict - - -def run_action(action_def, action_input, - action_ex_id=None, target=None, async=True): - action_result = rpc.get_executor_client().run_action( - action_ex_id, - action_def.action_class, - action_def.attributes or {}, - action_input, - target, - async - ) - - if action_result: - return _get_action_output(action_result) - - -def _get_action_output(result): - """Returns action output. - - :param result: ActionResult instance or ActionResult dict - :return: dict containing result. - """ - if isinstance(result, dict): - result = wf_utils.Result(result.get('data'), result.get('error')) - - return ({'result': result.data} - if result.is_success() else {'result': result.error}) - - -def store_action_result(action_ex, result): - prev_state = action_ex.state - - action_ex.state = states.SUCCESS if result.is_success() else states.ERROR - action_ex.output = _get_action_output(result) - - action_ex.accepted = True - - _log_action_result(action_ex, prev_state, action_ex.state, result) - - return action_ex - - -def _log_action_result(action_ex, from_state, to_state, result): - def _result_msg(): - if action_ex.state == states.ERROR: - return "error = %s" % utils.cut(result.error) - - return "result = %s" % utils.cut(result.data) - - wf_trace.info( - None, - "Action execution '%s' [%s -> %s, %s]" % - (action_ex.name, from_state, to_state, _result_msg()) - ) - - -def run_existing_action(action_ex_id, target): - action_ex = db_api.get_action_execution(action_ex_id) - action_def = db_api.get_action_definition(action_ex.name) - - return run_action( - action_def, - action_ex.input, - action_ex_id, - target - ) - - -def resolve_definition(action_name, task_ex=None, wf_spec=None): - if task_ex and wf_spec: - wf_ex = task_ex.workflow_execution - - action_def = resolve_action_definition( - action_name, - wf_ex.workflow_name, - wf_spec.get_name() - ) - else: - action_def = resolve_action_definition(action_name) - - if action_def.spec: - # Ad-hoc action. - action_spec = spec_parser.get_action_spec(action_def.spec) - - base_name = action_spec.get_base() - - action_def = resolve_action_definition( - base_name, - task_ex.workflow_name if task_ex else None, - wf_spec.get_name() if wf_spec else None - ) - - return action_def - - -def resolve_action_definition(action_spec_name, wf_name=None, - wf_spec_name=None): - action_db = None - - if wf_name and wf_name != wf_spec_name: - # If workflow belongs to a workbook then check - # action within the same workbook (to be able to - # use short names within workbooks). - # If it doesn't exist then use a name from spec - # to find an action in DB. - wb_name = wf_name.rstrip(wf_spec_name)[:-1] - - action_full_name = "%s.%s" % (wb_name, action_spec_name) - - action_db = db_api.load_action_definition(action_full_name) - - if not action_db: - action_db = db_api.load_action_definition(action_spec_name) - - if not action_db: - raise exc.InvalidActionException( - "Failed to find action [action_name=%s]" % action_spec_name - ) - - return action_db - - -def transform_result(result, task_ex, task_spec): - """Transforms task result accounting for ad-hoc actions. - - In case if the given result is an action result and action is - an ad-hoc action the method transforms the result according to - ad-hoc action configuration. - - :param result: Result of task action/workflow. - :param task_ex: Task DB model. - :param task_spec: Task specification. - """ - if result.is_error(): - return result - - action_spec_name = task_spec.get_action_name() - - if action_spec_name: - wf_ex = task_ex.workflow_execution - wf_spec_name = wf_ex.spec['name'] - - return transform_action_result( - action_spec_name, - result, - wf_ex.workflow_name, - wf_spec_name, - ) - - return result - - -def transform_action_result(action_spec_name, result, - wf_name=None, wf_spec_name=None): - action_def = resolve_action_definition( - action_spec_name, + action_def = actions.resolve_action_definition( + action_ex.name, wf_name, wf_spec_name ) - if not action_def.spec: - return result + return actions.PythonAction(action_def, action_ex=action_ex) - transformer = spec_parser.get_action_spec(action_def.spec).get_output() - if transformer is None: - return result +def build_action_by_name(action_name): + action_def = actions.resolve_action_definition(action_name) - return wf_utils.Result( - data=expr.evaluate_recursively(transformer, result.data), - error=result.error - ) + action_cls = (actions.PythonAction if not action_def.spec + else actions.AdHocAction) + + return action_cls(action_def) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py new file mode 100644 index 00000000..c209f10a --- /dev/null +++ b/mistral/engine/actions.py @@ -0,0 +1,477 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from oslo_config import cfg +from oslo_log import log as logging +import six + +from mistral.db.v2 import api as db_api +from mistral.engine import rpc +from mistral.engine import utils as e_utils +from mistral import exceptions as exc +from mistral import expressions as expr +from mistral.services import action_manager as a_m +from mistral.services import executions as wf_ex_service +from mistral.services import scheduler +from mistral.services import security +from mistral import utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + + +LOG = logging.getLogger(__name__) + +_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action' +_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow' + + +@six.add_metaclass(abc.ABCMeta) +class Action(object): + """Action. + + Represents a workflow action and defines interface that can be used by + Mistral engine or its components in order to manipulate with actions. + """ + + def __init__(self, action_def, action_ex=None, task_ex=None): + self.action_def = action_def + self.action_ex = action_ex + self.task_ex = action_ex.task_execution if action_ex else task_ex + + @abc.abstractmethod + def complete(self, result): + """Complete action and process its result. + + :param result: Action result. + """ + raise NotImplementedError + + def fail(self, msg): + # When we set an ERROR state we should safely set output value getting + # w/o exceptions due to field size limitations. + msg = utils.cut_by_kb( + msg, + cfg.CONF.engine.execution_field_size_limit_kb + ) + + self.action_ex.state = states.ERROR + self.action_ex.output = {'result': msg} + + @abc.abstractmethod + def schedule(self, input_dict, target, index=0, desc=''): + """Schedule action run. + + This method is needed to schedule action run so its result can + be received later by engine. In this sense it will be running in + asynchronous mode from engine perspective (don't confuse with + executor asynchrony when executor doesn't immediately send a + result). + + :param input_dict: Action input. + :param target: Target (group of action executors). + :param index: Action execution index. Makes sense for some types. + :param desc: Action execution description. + """ + raise NotImplementedError + + @abc.abstractmethod + def run(self, input_dict, target, index=0, desc='', save=True): + """Immediately run action. + + This method runs method w/o scheduling its run for a later time. + From engine perspective action will be processed in synchronous + mode. + + :param input_dict: Action input. + :param target: Target (group of action executors). + :param index: Action execution index. Makes sense for some types. + :param desc: Action execution description. + :param save: True if action execution object needs to be saved. + :return: Action output. + """ + raise NotImplementedError + + def validate_input(self, input_dict): + """Validates action input parameters. + + :param input_dict: Dictionary with input parameters. + """ + raise NotImplementedError + + def is_sync(self, input_dict): + """Determines if action is synchronous. + + :param input_dict: Dictionary with input parameters. + """ + return True + + def _create_action_execution(self, input_dict, runtime_ctx, desc=''): + # Assign the action execution ID here to minimize database calls. + # Otherwise, the input property of the action execution DB object needs + # to be updated with the action execution ID after the action execution + # DB object is created. + action_ex_id = utils.generate_unicode_uuid() + + # TODO(rakhmerov): Bad place, we probably need to push action context + # to all actions. It's related to + # https://blueprints.launchpad.net/mistral/+spec/mistral-custom-actions-api + if a_m.has_action_context( + self.action_def.action_class, + self.action_def.attributes or {}) and self.task_ex: + input_dict.update( + a_m.get_action_context(self.task_ex, action_ex_id) + ) + + values = { + 'id': action_ex_id, + 'name': self.action_def.name, + 'spec': self.action_def.spec, + 'state': states.RUNNING, + 'input': input_dict, + 'runtime_context': runtime_ctx, + 'description': desc + } + + if self.task_ex: + values.update({ + 'task_execution_id': self.task_ex.id, + 'workflow_name': self.task_ex.workflow_name, + 'workflow_id': self.task_ex.workflow_id, + 'project_id': self.task_ex.project_id, + }) + else: + values.update({ + 'project_id': security.get_project_id(), + }) + + self.action_ex = db_api.create_action_execution(values) + + if self.task_ex: + # Add to collection explicitly so that it's in a proper + # state within the current session. + self.task_ex.executions.append(self.action_ex) + + def _inject_action_ctx_for_validating(self, input_dict): + if a_m.has_action_context( + self.action_def.action_class, self.action_def.attributes): + input_dict.update(a_m.get_empty_action_context()) + + def _log_result(self, prev_state, result): + state = self.action_ex.state + + def _result_msg(): + if state == states.ERROR: + return "error = %s" % utils.cut(result.error) + + return "result = %s" % utils.cut(result.data) + + wf_trace.info( + None, + "Action execution '%s' [%s -> %s, %s]" % + (self.action_ex.name, prev_state, state, _result_msg()) + ) + + +class PythonAction(Action): + """Regular Python action.""" + + def complete(self, result): + if states.is_completed(self.action_ex.state): + return + + prev_state = self.action_ex.state + + self.action_ex.state = (states.SUCCESS if result.is_success() + else states.ERROR) + self.action_ex.output = self._prepare_output(result) + self.action_ex.accepted = True + + self._log_result(prev_state, result) + + def schedule(self, input_dict, target, index=0, desc=''): + self._create_action_execution( + self._prepare_input(input_dict), + self._prepare_runtime_context(index), + desc=desc + ) + + scheduler.schedule_call( + None, + _RUN_EXISTING_ACTION_PATH, + 0, + action_ex_id=self.action_ex.id, + target=target + ) + + def run(self, input_dict, target, index=0, desc='', save=True): + input_dict = self._prepare_input(input_dict) + runtime_ctx = self._prepare_runtime_context(index) + + if save: + self._create_action_execution(input_dict, runtime_ctx, desc=desc) + + result = rpc.get_executor_client().run_action( + self.action_ex.id if self.action_ex else None, + self.action_def.action_class, + self.action_def.attributes or {}, + input_dict, + target, + async=False + ) + + return self._prepare_output(result) + + def is_sync(self, input_dict): + input_dict = self._prepare_input(input_dict) + + a = a_m.get_action_class(self.action_def.name)(**input_dict) + + return a.is_sync() + + def validate_input(self, input_dict): + if self.action_def.action_class: + self._inject_action_ctx_for_validating(input_dict) + + # TODO(rakhmerov): I'm not sure what this is for. + # NOTE(xylan): Don't validate action input if action initialization + # method contains ** argument. + if '**' not in self.action_def.input: + e_utils.validate_input(self.action_def, input_dict) + + def _prepare_input(self, input_dict): + """Template method to do manipulations with input parameters. + + Python action doesn't do anything specific with initial input. + """ + return input_dict + + def _prepare_output(self, result): + """Template method to do manipulations with action result. + + Python action just wraps action result into dict that can + be stored in DB. + """ + return _get_action_output(result) if result else None + + def _prepare_runtime_context(self, index): + """Template method to prepare action runtime context. + + Python action inserts index into runtime context. + """ + return {'index': index} + + +class AdHocAction(PythonAction): + """Ad-hoc action.""" + + def __init__(self, action_def, action_ex=None, task_ex=None): + self.action_spec = spec_parser.get_action_spec(action_def.spec) + + base_action_def = db_api.get_action_definition( + self.action_spec.get_base() + ) + + super(AdHocAction, self).__init__( + base_action_def, + action_ex, + task_ex + ) + + self.adhoc_action_def = action_def + + def validate_input(self, input_dict): + e_utils.validate_input( + self.adhoc_action_def, + input_dict, + self.action_spec + ) + + super(AdHocAction, self).validate_input( + self._prepare_input(input_dict) + ) + + def _prepare_input(self, input_dict): + base_input_expr = self.action_spec.get_base_input() + + if base_input_expr: + base_input_dict = expr.evaluate_recursively( + base_input_expr, + input_dict + ) + else: + base_input_dict = {} + + return super(AdHocAction, self)._prepare_input(base_input_dict) + + def _prepare_output(self, result): + # In case of error, we don't transform a result. + if not result.is_error(): + adhoc_action_spec = spec_parser.get_action_spec( + self.adhoc_action_def.spec + ) + + transformer = adhoc_action_spec.get_output() + + if transformer is not None: + result = wf_utils.Result( + data=expr.evaluate_recursively(transformer, result.data), + error=result.error + ) + + return _get_action_output(result) if result else None + + def _prepare_runtime_context(self, index): + ctx = super(AdHocAction, self)._prepare_runtime_context(index) + + # Insert special field into runtime context so that we track + # a relationship between python action and adhoc action. + return utils.merge_dicts( + ctx, + {'adhoc_action_name': self.adhoc_action_def.name} + ) + + +class WorkflowAction(Action): + """Workflow action.""" + + def complete(self, result): + # No-op because in case of workflow result is already processed. + pass + + def schedule(self, input_dict, target, index=0, desc=''): + parent_wf_ex = self.task_ex.workflow_execution + parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec) + + task_spec = spec_parser.get_task_spec(self.task_ex.spec) + + wf_spec_name = task_spec.get_workflow_name() + + wf_def = e_utils.resolve_workflow_definition( + parent_wf_ex.workflow_name, + parent_wf_spec.get_name(), + wf_spec_name + ) + + wf_spec = spec_parser.get_workflow_spec(wf_def.spec) + + wf_params = { + 'task_execution_id': self.task_ex.id, + 'index': index + } + + if 'env' in parent_wf_ex.params: + wf_params['env'] = parent_wf_ex.params['env'] + + for k, v in list(input_dict.items()): + if k not in wf_spec.get_input(): + wf_params[k] = v + del input_dict[k] + + wf_ex, _ = wf_ex_service.create_workflow_execution( + wf_def.name, + input_dict, + "sub-workflow execution", + wf_params, + wf_spec + ) + + scheduler.schedule_call( + None, + _RESUME_WORKFLOW_PATH, + 0, + wf_ex_id=wf_ex.id, + env=None + ) + + # TODO(rakhmerov): Add info logging. + + def run(self, input_dict, target, index=0, desc='', save=True): + raise NotImplemented('Does not apply to this WorkflowAction.') + + def is_sync(self, input_dict): + # Workflow action is always asynchronous. + return False + + def validate_input(self, input_dict): + # TODO(rakhmerov): Implement. + pass + + +def _resume_workflow(wf_ex_id, env): + rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) + + +def _run_existing_action(action_ex_id, target): + action_ex = db_api.get_action_execution(action_ex_id) + action_def = db_api.get_action_definition(action_ex.name) + + result = rpc.get_executor_client().run_action( + action_ex_id, + action_def.action_class, + action_def.attributes or {}, + action_ex.input, + target + ) + + return _get_action_output(result) if result else None + + +def _get_action_output(result): + """Returns action output. + + :param result: ActionResult instance or ActionResult dict + :return: dict containing result. + """ + if isinstance(result, dict): + result = wf_utils.Result(result.get('data'), result.get('error')) + + return ({'result': result.data} + if result.is_success() else {'result': result.error}) + + +def resolve_action_definition(action_spec_name, wf_name=None, + wf_spec_name=None): + """Resolve action definition accounting for ad-hoc action namespacing. + + :param action_spec_name: Action name according to a spec. + :param wf_name: Workflow name. + :param wf_spec_name: Workflow name according to a spec. + :return: Action definition (python or ad-hoc). + """ + action_db = None + + if wf_name and wf_name != wf_spec_name: + # If workflow belongs to a workbook then check + # action within the same workbook (to be able to + # use short names within workbooks). + # If it doesn't exist then use a name from spec + # to find an action in DB. + wb_name = wf_name.rstrip(wf_spec_name)[:-1] + + action_full_name = "%s.%s" % (wb_name, action_spec_name) + + action_db = db_api.load_action_definition(action_full_name) + + if not action_db: + action_db = db_api.load_action_definition(action_spec_name) + + if not action_db: + raise exc.InvalidActionException( + "Failed to find action [action_name=%s]" % action_spec_name + ) + + return action_db diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 944e25e7..34d4334c 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import traceback - from oslo_log import log as logging from mistral import coordination @@ -22,19 +20,14 @@ from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base -from mistral.engine import task_handler +from mistral.engine import dispatcher from mistral.engine import workflow_handler as wf_handler -from mistral import exceptions as exc -from mistral.services import action_manager as a_m from mistral.services import executions as wf_ex_service from mistral.services import workflows as wf_service from mistral import utils as u -from mistral.utils import wf_trace -from mistral.workbook import parser as spec_parser from mistral.workflow import base as wf_base from mistral.workflow import commands from mistral.workflow import states -from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) @@ -53,240 +46,84 @@ class DefaultEngine(base.Engine, coordination.Service): @u.log_exec(LOG) def start_workflow(self, wf_identifier, wf_input, description='', **params): - wf_ex_id = None - - try: - # Create a persistent workflow execution in a separate transaction - # so that we can return it even in case of unexpected errors that - # lead to transaction rollback. - with db_api.transaction(): - # The new workflow execution will be in an IDLE - # state on initial record creation. - wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution( - wf_identifier, - wf_input, - description, - params - ) - - with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_ex_id) - wf_handler.set_execution_state(wf_ex, states.RUNNING) - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - self._dispatch_workflow_commands( - wf_ex, - wf_ctrl.continue_workflow(), - wf_spec - ) - - return wf_ex.get_clone() - except Exception as e: - LOG.error( - "Failed to start workflow '%s' id=%s: %s\n%s", - wf_identifier, wf_ex_id, e, traceback.format_exc() + with db_api.transaction(): + # TODO(rakhmerov): It needs to be hidden in workflow_handler and + # Workflow abstraction. + # The new workflow execution will be in an IDLE + # state on initial record creation. + wf_ex, wf_spec = wf_ex_service.create_workflow_execution( + wf_identifier, + wf_input, + description, + params ) + wf_handler.set_workflow_state(wf_ex, states.RUNNING) - wf_ex = self._fail_workflow(wf_ex_id, e) + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - if wf_ex: - return wf_ex.get_clone() + cmds = wf_ctrl.continue_workflow() - raise e + dispatcher.dispatch_workflow_commands(wf_ex, cmds) + + return wf_ex.get_clone() @u.log_exec(LOG) def start_action(self, action_name, action_input, description=None, **params): with db_api.transaction(): - action_def = action_handler.resolve_definition(action_name) - resolved_action_input = action_handler.get_action_input( - action_name, - action_input + action = action_handler.build_action_by_name(action_name) + + action.validate_input(action_input) + + save = params.get('save_result') + target = params.get('target') + + if save or not action.is_sync(action_input): + action.schedule(action_input, target) + + return action.action_ex.get_clone() + + output = action.run(action_input, target, save=save) + + # Action execution is not created but we need to return similar + # object to a client anyway. + return db_models.ActionExecution( + name=action_name, + description=description, + input=action_input, + output=output ) - action = a_m.get_action_class(action_def.name)( - **resolved_action_input - ) - - # If we see action is asynchronous, then we enforce 'save_result'. - if params.get('save_result') or not action.is_sync(): - action_ex = action_handler.create_action_execution( - action_def, - resolved_action_input, - description=description - ) - - action_handler.run_action( - action_def, - resolved_action_input, - action_ex.id, - params.get('target') - ) - - return action_ex.get_clone() - else: - output = action_handler.run_action( - action_def, - resolved_action_input, - target=params.get('target'), - async=False - ) - - return db_models.ActionExecution( - name=action_name, - description=description, - input=action_input, - output=output - ) - - def on_task_state_change(self, task_ex_id, state, state_info=None): - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex_id) - # TODO(rakhmerov): The method is mostly needed for policy and - # we are supposed to get the same action execution as when the - # policy worked. - - wf_ex_id = task_ex.workflow_execution_id - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - wf_trace.info( - task_ex, - "Task '%s' [%s -> %s] state_info : %s" - % (task_ex.name, task_ex.state, state, state_info) - ) - - task_ex.state = state - task_ex.state_info = state_info - - self._on_task_state_change(task_ex, wf_ex, wf_spec) - - def _on_task_state_change(self, task_ex, wf_ex, wf_spec): - task_spec = wf_spec.get_tasks()[task_ex.name] - - if task_handler.is_task_completed(task_ex, task_spec): - task_handler.after_task_complete(task_ex, task_spec, wf_spec) - - # Ignore DELAYED state. - if task_ex.state == states.RUNNING_DELAYED: - return - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - # Calculate commands to process next. - try: - cmds = wf_ctrl.continue_workflow() - except exc.YaqlEvaluationException as e: - LOG.error( - 'YAQL error occurred while calculating next workflow ' - 'commands [wf_ex_id=%s, task_ex_id=%s]: %s', - wf_ex.id, task_ex.id, e - ) - - wf_handler.fail_workflow(wf_ex, str(e)) - - return - - # Mark task as processed after all decisions have been made - # upon its completion. - task_ex.processed = True - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) - - self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec) - elif task_handler.need_to_continue(task_ex, task_spec): - # Re-run existing task. - cmds = [commands.RunExistingTask(task_ex, reset=False)] - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) - - @staticmethod - def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec): - if states.is_paused_or_completed(wf_ex.state): - return - - # Workflow is not completed if there are any incomplete task - # executions that are not in WAITING state. If all incomplete - # tasks are waiting and there are unhandled errors, then these - # tasks will not reach completion. In this case, mark the - # workflow complete. - incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex) - - if any(not states.is_waiting(t.state) for t in incomplete_tasks): - return - - if wf_ctrl.all_errors_handled(): - wf_handler.succeed_workflow( - wf_ex, - wf_ctrl.evaluate_workflow_final_context(), - wf_spec - ) - else: - state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) - - wf_handler.fail_workflow(wf_ex, state_info) @u.log_exec(LOG) def on_action_complete(self, action_ex_id, result): - wf_ex_id = None + with db_api.transaction(): + action_ex = db_api.get_action_execution(action_ex_id) - try: - with db_api.transaction(): - action_ex = db_api.get_action_execution(action_ex_id) + task_ex = action_ex.task_execution - # In case of single action execution there is no - # assigned task execution. - if not action_ex.task_execution: - return action_handler.store_action_result( - action_ex, - result - ).get_clone() - - wf_ex_id = action_ex.task_execution.workflow_execution_id - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - task_ex = task_handler.on_action_complete( - action_ex, - wf_spec, - result + if task_ex: + wf_handler.lock_workflow_execution( + task_ex.workflow_execution_id ) - # If workflow is on pause or completed then there's no - # need to continue workflow. - if states.is_paused_or_completed(wf_ex.state): - return action_ex.get_clone() + action_handler.on_action_complete(action_ex, result) - self._on_task_state_change(task_ex, wf_ex, wf_spec) - - return action_ex.get_clone() - except Exception as e: - # TODO(rakhmerov): Need to refactor logging in a more elegant way. - LOG.error( - 'Failed to handle action execution result [id=%s]: %s\n%s', - action_ex_id, e, traceback.format_exc() - ) - - # If an exception was thrown after we got the wf_ex_id - if wf_ex_id: - self._fail_workflow(wf_ex_id, e) - - raise e + return action_ex.get_clone() @u.log_exec(LOG) def pause_workflow(self, wf_ex_id): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - wf_handler.set_execution_state(wf_ex, states.PAUSED) + wf_handler.set_workflow_state(wf_ex, states.PAUSED) return wf_ex - def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None): + @staticmethod + def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None): wf_ex = wf_service.update_workflow_execution_env(wf_ex, env) - wf_handler.set_execution_state( + wf_handler.set_workflow_state( wf_ex, states.RUNNING, set_upstream=True @@ -294,13 +131,15 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ctrl = wf_base.get_controller(wf_ex) - # TODO(rakhmerov): Add YAQL error handling. + # TODO(rakhmerov): Add error handling. # Calculate commands to process next. cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that # completed within the period when the workflow was paused. + # TODO(rakhmerov): This all should be in workflow handler, it's too + # specific for engine level. cmds = list( filter( lambda c: not isinstance(c, commands.PauseWorkflow), @@ -316,165 +155,50 @@ class DefaultEngine(base.Engine, coordination.Service): if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) + dispatcher.dispatch_workflow_commands(wf_ex, cmds) if not cmds: - if not wf_utils.find_incomplete_task_executions(wf_ex): - wf_handler.succeed_workflow( - wf_ex, - wf_ctrl.evaluate_workflow_final_context(), - wf_spec - ) + wf_handler.check_workflow_completion(wf_ex) return wf_ex.get_clone() @u.log_exec(LOG) def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): - try: - with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) + # TODO(rakhmerov): Rewrite this functionality with Task abstraction. + with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - task_ex = db_api.get_task_execution(task_ex_id) + task_ex = db_api.get_task_execution(task_ex_id) - if task_ex.workflow_execution.id != wf_ex_id: - raise ValueError('Workflow execution ID does not match.') + if task_ex.workflow_execution.id != wf_ex_id: + raise ValueError('Workflow execution ID does not match.') - if wf_ex.state == states.PAUSED: - return wf_ex.get_clone() + if wf_ex.state == states.PAUSED: + return wf_ex.get_clone() - return self._continue_workflow(wf_ex, task_ex, reset, env=env) - except Exception as e: - LOG.error( - "Failed to rerun execution id=%s at task=%s: %s\n%s", - wf_ex_id, task_ex_id, e, traceback.format_exc() - ) - self._fail_workflow(wf_ex_id, e) - raise e + # TODO(rakhmerov): This should be a call to workflow handler. + return self._continue_workflow(wf_ex, task_ex, reset, env=env) @u.log_exec(LOG) def resume_workflow(self, wf_ex_id, env=None): - try: - with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) + # TODO(rakhmerov): Rewrite this functionality with Task abstraction. + with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if (not states.is_paused(wf_ex.state) and - not states.is_idle(wf_ex.state)): - return wf_ex.get_clone() + if (not states.is_paused(wf_ex.state) and + not states.is_idle(wf_ex.state)): + return wf_ex.get_clone() - return self._continue_workflow(wf_ex, env=env) - except Exception as e: - LOG.error( - "Failed to resume execution id=%s: %s\n%s", - wf_ex_id, e, traceback.format_exc() - ) - self._fail_workflow(wf_ex_id, e) - raise e + return self._continue_workflow(wf_ex, env=env) @u.log_exec(LOG) def stop_workflow(self, wf_ex_id, state, message=None): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - return self._stop_workflow(wf_ex, state, message) - - @staticmethod - def _stop_workflow(wf_ex, state, message=None): - if state == states.SUCCESS: - wf_ctrl = wf_base.get_controller(wf_ex) - - final_context = {} - - try: - final_context = wf_ctrl.evaluate_workflow_final_context() - except Exception as e: - LOG.warning( - 'Failed to get final context for %s: %s' % (wf_ex, e) - ) - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - return wf_handler.succeed_workflow( - wf_ex, - final_context, - wf_spec, - message - ) - elif state == states.ERROR: - return wf_handler.fail_workflow(wf_ex, message) - - return wf_ex + return wf_handler.stop_workflow(wf_ex, state, message) @u.log_exec(LOG) def rollback_workflow(self, wf_ex_id): # TODO(rakhmerov): Implement. raise NotImplementedError - - def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec): - if not wf_cmds: - return - - for cmd in wf_cmds: - if isinstance(cmd, commands.RunTask) and cmd.is_waiting(): - task_handler.defer_task(cmd) - elif isinstance(cmd, commands.RunTask): - task_ex = task_handler.run_new_task(cmd, wf_spec) - - if task_ex.state == states.ERROR: - wf_handler.fail_workflow( - wf_ex, - 'Failed to start task [task_ex=%s]: %s' % - (task_ex, task_ex.state_info) - ) - elif isinstance(cmd, commands.RunExistingTask): - task_ex = task_handler.run_existing_task( - cmd.task_ex.id, - reset=cmd.reset - ) - - if task_ex.state == states.ERROR: - wf_handler.fail_workflow( - wf_ex, - 'Failed to start task [task_ex=%s]: %s' % - (task_ex, task_ex.state_info) - ) - elif isinstance(cmd, commands.SetWorkflowState): - if states.is_completed(cmd.new_state): - self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) - else: - wf_handler.set_execution_state(wf_ex, cmd.new_state) - elif isinstance(cmd, commands.Noop): - # Do nothing. - pass - else: - raise RuntimeError('Unsupported workflow command: %s' % cmd) - - if wf_ex.state != states.RUNNING: - break - - # TODO(rakhmerov): This method may not be needed at all because error - # handling is now implemented too roughly w/o distinguishing different - # errors. On most errors (like YAQLException) we shouldn't rollback - # transactions, we just need to fail corresponding execution objects - # where a problem happened (action, task or workflow). - @staticmethod - def _fail_workflow(wf_ex_id, exc): - """Private helper to fail workflow on exceptions.""" - - with db_api.transaction(): - wf_ex = db_api.load_workflow_execution(wf_ex_id) - - if wf_ex is None: - LOG.error( - "Can't fail workflow execution with id='%s': not found.", - wf_ex_id - ) - return None - - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - - if not states.is_paused_or_completed(wf_ex.state): - wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc)) - - return wf_ex diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py new file mode 100644 index 00000000..e1723308 --- /dev/null +++ b/mistral/engine/dispatcher.py @@ -0,0 +1,46 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from mistral import exceptions as exc +from mistral.workflow import commands +from mistral.workflow import states + + +def dispatch_workflow_commands(wf_ex, wf_cmds): + # TODO(rakhmerov): I don't like these imports but otherwise we have + # import cycles. + from mistral.engine import task_handler + from mistral.engine import workflow_handler as wf_handler + + if not wf_cmds: + return + + for cmd in wf_cmds: + if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): + task_handler.run_task(cmd) + elif isinstance(cmd, commands.SetWorkflowState): + # TODO(rakhmerov): Make just a single call to workflow_handler + if states.is_completed(cmd.new_state): + wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) + else: + wf_handler.set_workflow_state(wf_ex, cmd.new_state) + elif isinstance(cmd, commands.Noop): + # Do nothing. + pass + else: + raise exc.MistralError('Unsupported workflow command: %s' % cmd) + + if wf_ex.state != states.RUNNING: + break diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index dafa9dac..47c87bc2 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -15,7 +15,6 @@ from mistral.db.v2 import api as db_api from mistral.engine import base -from mistral.engine import rpc from mistral import expressions from mistral.services import scheduler from mistral.utils import wf_trace @@ -24,8 +23,8 @@ from mistral.workflow import states import six -_ENGINE_CLIENT_PATH = 'mistral.engine.rpc.get_engine_client' -_RUN_EXISTING_TASK_PATH = 'mistral.engine.task_handler.run_existing_task' +_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task' +_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task' def _log_task_delay(task_ex, delay_sec): @@ -180,7 +179,7 @@ class WaitBeforePolicy(base.TaskPolicy): policy_context = runtime_context[context_key] if policy_context.get('skip'): - # Unset state 'DELAYED'. + # Unset state 'RUNNING_DELAYED'. wf_trace.info( task_ex, "Task '%s' [%s -> %s]" @@ -193,13 +192,16 @@ class WaitBeforePolicy(base.TaskPolicy): if task_ex.state != states.IDLE: policy_context.update({'skip': True}) + _log_task_delay(task_ex, self.delay) task_ex.state = states.RUNNING_DELAYED + # TODO(rakhmerov): This is wrong as task handler doesn't manage + # transactions and hence it can't be called explicitly. scheduler.schedule_call( None, - _RUN_EXISTING_TASK_PATH, + _CONTINUE_TASK_PATH, self.delay, task_ex_id=task_ex.id, ) @@ -228,6 +230,7 @@ class WaitAfterPolicy(base.TaskPolicy): task_ex.runtime_context = runtime_context policy_context = runtime_context[context_key] + if policy_context.get('skip'): # Skip, already processed. return @@ -236,17 +239,25 @@ class WaitAfterPolicy(base.TaskPolicy): _log_task_delay(task_ex, self.delay) - state = task_ex.state + end_state = task_ex.state + end_state_info = task_ex.state_info + + # TODO(rakhmerov): Policies probably needs to have tasks.Task + # interface in order to change manage task state safely. # Set task state to 'DELAYED'. task_ex.state = states.RUNNING_DELAYED + task_ex.state_info = ( + 'Suspended by wait-after policy for %s seconds' % self.delay + ) # Schedule to change task state to RUNNING again. scheduler.schedule_call( - _ENGINE_CLIENT_PATH, - 'on_task_state_change', + None, + _COMPLETE_TASK_PATH, self.delay, - state=state, task_ex_id=task_ex.id, + state=end_state, + state_info=end_state_info ) @@ -339,7 +350,7 @@ class RetryPolicy(base.TaskPolicy): scheduler.schedule_call( None, - _RUN_EXISTING_TASK_PATH, + _CONTINUE_TASK_PATH, self.delay, task_ex_id=task_ex.id, ) @@ -360,7 +371,7 @@ class TimeoutPolicy(base.TaskPolicy): scheduler.schedule_call( None, - 'mistral.engine.policies.fail_task_if_incomplete', + 'mistral.engine.policies._fail_task_if_incomplete', self.delay, task_ex_id=task_ex.id, timeout=self.delay @@ -424,21 +435,35 @@ class ConcurrencyPolicy(base.TaskPolicy): task_ex.runtime_context = runtime_context -def fail_task_if_incomplete(task_ex_id, timeout): +def _continue_task(task_ex_id): + from mistral.engine import task_handler + + # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. + task_handler.continue_task(db_api.get_task_execution(task_ex_id)) + + +def _complete_task(task_ex_id, state, state_info): + from mistral.engine import task_handler + + # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. + task_handler.complete_task( + db_api.get_task_execution(task_ex_id), + state, + state_info + ) + + +def _fail_task_if_incomplete(task_ex_id, timeout): + from mistral.engine import task_handler + + # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. task_ex = db_api.get_task_execution(task_ex_id) if not states.is_completed(task_ex.state): - msg = "Task timed out [id=%s, timeout(s)=%s]." % (task_ex_id, timeout) + msg = 'Task timed out [timeout(s)=%s].' % timeout - wf_trace.info(task_ex, msg) - - wf_trace.info( - task_ex, - "Task '%s' [%s -> ERROR]" % (task_ex.name, task_ex.state) - ) - - rpc.get_engine_client().on_task_state_change( - task_ex_id, + task_handler.complete_task( + db_api.get_task_execution(task_ex_id), states.ERROR, msg ) diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index 913d580f..eea140cd 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -503,11 +503,12 @@ class ExecutorClient(base.Executor): :param transport: Messaging transport. :type transport: Transport. """ + self.topic = cfg.CONF.executor.topic + serializer = auth_ctx.RpcContextSerializer( auth_ctx.JsonPayloadSerializer() ) - self.topic = cfg.CONF.executor.topic self._client = messaging.RPCClient( transport, messaging.Target(), @@ -539,8 +540,15 @@ class ExecutorClient(base.Executor): rpc_client_method = call_ctx.cast if async else call_ctx.call - return rpc_client_method( - auth_ctx.ctx(), - 'run_action', - **kwargs + res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) + + # TODO(rakhmerov): It doesn't seem a good approach since we have + # a serializer for Result class. A better solution would be to + # use a composite serializer that dispatches serialization and + # deserialization to concrete serializers depending on object + # type. + + return ( + wf_utils.Result(data=res['data'], error=res['error']) + if res else None ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 996f9f75..c566dd85 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -1,5 +1,6 @@ # Copyright 2015 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. +# Copyright 2016 - Nokia Networks. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,28 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import operator - from oslo_log import log as logging +import traceback as tb -from mistral.db.v2 import api as db_api -from mistral.db.v2.sqlalchemy import models -from mistral.engine import action_handler -from mistral.engine import policies -from mistral.engine import rpc -from mistral.engine import utils as e_utils +from mistral.engine import tasks +from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc -from mistral import expressions as expr -from mistral.services import executions as wf_ex_service -from mistral.services import scheduler -from mistral import utils -from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser -from mistral.workflow import data_flow +from mistral.workflow import commands as wf_cmds from mistral.workflow import states -from mistral.workflow import utils as wf_utils -from mistral.workflow import with_items """Responsible for running tasks and handling results.""" @@ -42,548 +30,145 @@ from mistral.workflow import with_items LOG = logging.getLogger(__name__) -def run_existing_task(task_ex_id, reset=True): - """This function runs existing task execution. +def run_task(wf_cmd): + """Runs workflow task. - It is needed mostly by scheduler. - - :param task_ex_id: Task execution id. - :param reset: Reset action executions for the task. + :param wf_cmd: Workflow command. """ - task_ex = db_api.get_task_execution(task_ex_id) - task_spec = spec_parser.get_task_spec(task_ex.spec) - wf_def = db_api.get_workflow_definition(task_ex.workflow_name) - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - # Throw exception if the existing task already succeeded. - if task_ex.state == states.SUCCESS: - raise exc.EngineException( - 'Rerunning existing task that already succeeded is not supported.' - ) + task = _build_task_from_command(wf_cmd) - # Exit if the existing task failed and reset is not instructed. - # For a with-items task without reset, re-running the existing - # task will re-run the failed and unstarted items. - if (task_ex.state == states.ERROR and not reset and - not task_spec.get_with_items()): - return task_ex - - # Reset nested executions only if task is not already RUNNING. - if task_ex.state != states.RUNNING: - # Reset state of processed task and related action executions. - if reset: - action_exs = task_ex.executions - else: - action_exs = db_api.get_action_executions( - task_execution_id=task_ex.id, - state=states.ERROR, - accepted=True - ) - - for action_ex in action_exs: - action_ex.accepted = False - - # Explicitly change task state to RUNNING. - set_task_state(task_ex, states.RUNNING, None, processed=False) - - _run_existing_task(task_ex, task_spec, wf_spec) - - return task_ex - - -def _run_existing_task(task_ex, task_spec, wf_spec): try: - input_dicts = _get_input_dictionaries( - wf_spec, - task_ex, - task_spec, - task_ex.in_context - ) + task.run() except exc.MistralException as e: - LOG.error( - 'An error while calculating task action inputs' - ' [task_execution_id=%s]: %s', - task_ex.id, e + wf_ex = wf_cmd.wf_ex + task_spec = wf_cmd.task_spec + + msg = ( + "Failed to run task [wf=%s, task=%s]: %s\n%s" % + (wf_ex, task_spec.get_name(), e, tb.format_exc()) ) - set_task_state(task_ex, states.ERROR, str(e)) + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) return - # In some cases we can have no input, e.g. in case of 'with-items'. - if input_dicts: - for index, input_d in input_dicts: - _run_action_or_workflow( - task_ex, - task_spec, - input_d, - index, - wf_spec - ) - else: - _schedule_noop_action(task_ex, task_spec, wf_spec) + if task.is_completed(): + wf_handler.check_workflow_completion(wf_cmd.wf_ex) -def defer_task(wf_cmd): - """Defers a task""" - ctx = wf_cmd.ctx - wf_ex = wf_cmd.wf_ex - task_spec = wf_cmd.task_spec +def on_action_complete(action_ex): + """Handles action completion event. - if wf_utils.find_task_executions_by_spec(wf_ex, task_spec): - return None - - return _create_task_execution( - wf_ex, - task_spec, - ctx, - state=states.WAITING - ) - - -def run_new_task(wf_cmd, wf_spec): - """Runs a task.""" - ctx = wf_cmd.ctx - wf_ex = wf_cmd.wf_ex - task_spec = wf_cmd.task_spec - - # NOTE(xylan): Need to think how to get rid of this weird judgment to keep - # it more consistent with the function name. - task_ex = wf_utils.find_task_execution_with_state( - wf_ex, - task_spec, - states.WAITING - ) - - if task_ex: - set_task_state(task_ex, states.RUNNING, None) - task_ex.in_context = ctx - else: - task_ex = _create_task_execution(wf_ex, task_spec, ctx) - - LOG.debug( - 'Starting workflow task [workflow=%s, task_spec=%s, init_state=%s]' % - (wf_ex.name, task_spec, task_ex.state) - ) - - # TODO(rakhmerov): 'concurrency' policy should keep a number of running - # actions/workflows under control so it can't be implemented if it runs - # before any action executions are created. - before_task_start(task_ex, task_spec, wf_spec) - - # Policies could possibly change task state. - if task_ex.state != states.RUNNING: - return task_ex - - _run_existing_task(task_ex, task_spec, wf_spec) - - return task_ex - - -def on_action_complete(action_ex, wf_spec, result): - """Handles event of action result arrival. - - Given action result this method changes corresponding task execution - object. This method must never be called for the case of individual - action which is not associated with any tasks. - - :param action_ex: Action execution objects the result belongs to. - :param wf_spec: Workflow specification. - :param result: Task action/workflow output wrapped into - mistral.workflow.utils.Result instance. - :return Task execution object. + :param action_ex: Action execution. """ task_ex = action_ex.task_execution - # Ignore if action already completed. - if (states.is_completed(action_ex.state) and not - isinstance(action_ex, models.WorkflowExecution)): - return task_ex + if not task_ex: + return - task_spec = wf_spec.get_tasks()[task_ex.name] + task_spec = spec_parser.get_task_spec(task_ex.spec) - try: - result = action_handler.transform_result(result, task_ex, task_spec) - except exc.YaqlEvaluationException as e: - err_msg = str(e) - - LOG.error( - 'YAQL error while transforming action result' - ' [action_execution_id=%s, result=%s]: %s', - action_ex.id, result, err_msg - ) - - result = wf_utils.Result(error=err_msg) - - # Ignore workflow executions because they're handled during - # workflow completion. - if not isinstance(action_ex, models.WorkflowExecution): - action_handler.store_action_result(action_ex, result) - - if result.is_success(): - task_state = states.SUCCESS - task_state_info = None - else: - task_state = states.ERROR - task_state_info = result.error - - if not task_spec.get_with_items(): - _complete_task(task_ex, task_spec, task_state, task_state_info) - else: - with_items.increase_capacity(task_ex) - - if with_items.is_completed(task_ex): - _complete_task( - task_ex, - task_spec, - with_items.get_final_state(task_ex), - task_state_info - ) - - return task_ex - - -def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING): - task_ex = db_api.create_task_execution({ - 'name': task_spec.get_name(), - 'workflow_execution_id': wf_ex.id, - 'workflow_name': wf_ex.workflow_name, - 'workflow_id': wf_ex.workflow_id, - 'state': state, - 'spec': task_spec.to_dict(), - 'in_context': ctx, - 'published': {}, - 'runtime_context': {}, - 'project_id': wf_ex.project_id - }) - - # Add to collection explicitly so that it's in a proper - # state within the current session. - wf_ex.task_executions.append(task_ex) - - return task_ex - - -def before_task_start(task_ex, task_spec, wf_spec): - for p in policies.build_policies(task_spec.get_policies(), wf_spec): - p.before_task_start(task_ex, task_spec) - - -def after_task_complete(task_ex, task_spec, wf_spec): - for p in policies.build_policies(task_spec.get_policies(), wf_spec): - p.after_task_complete(task_ex, task_spec) - - -def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx): - """Calculates a collection of inputs for task action/workflow. - - If the given task is not configured as 'with-items' then return list - will consist of one dictionary containing input that task action/workflow - should run with. - In case of 'with-items' the result list will contain input dictionaries - for all 'with-items' iterations correspondingly. - - :return the list of tuples containing indexes - and the corresponding input dict. - """ - - if not task_spec.get_with_items(): - input_dict = _get_workflow_or_action_input( - wf_spec, - task_ex, - task_spec, - ctx - ) - - return enumerate([input_dict]) - else: - return _get_with_items_input(wf_spec, task_ex, task_spec, ctx) - - -def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx): - if task_spec.get_action_name(): - return _get_action_input( - wf_spec, - task_ex, - task_spec, - ctx - ) - elif task_spec.get_workflow_name(): - return _get_workflow_input(task_spec, ctx) - else: - raise RuntimeError('Must never happen.') - - -def _get_with_items_input(wf_spec, task_ex, task_spec, ctx): - """Calculate input array for separating each action input. - - Example: - DSL: - with_items: - - itemX in <% $.arrayI %> - - itemY in <% $.arrayJ %> - - Assume arrayI = [1, 2], arrayJ = ['a', 'b']. - with_items_input = { - "itemX": [1, 2], - "itemY": ['a', 'b'] - } - - Then we get separated input: - inputs_per_item = [ - {'itemX': 1, 'itemY': 'a'}, - {'itemX': 2, 'itemY': 'b'} - ] - - :return: the list of tuples containing indexes - and the corresponding input dict. - """ - with_items_inputs = expr.evaluate_recursively( - task_spec.get_with_items(), ctx - ) - - with_items.validate_input(with_items_inputs) - - inputs_per_item = [] - - for key, value in with_items_inputs.items(): - for index, item in enumerate(value): - iter_context = {key: item} - - if index >= len(inputs_per_item): - inputs_per_item.append(iter_context) - else: - inputs_per_item[index].update(iter_context) - - action_inputs = [] - - for item_input in inputs_per_item: - new_ctx = utils.merge_dicts(item_input, ctx) - - action_inputs.append(_get_workflow_or_action_input( - wf_spec, task_ex, task_spec, new_ctx - )) - - with_items.prepare_runtime_context(task_ex, task_spec, action_inputs) - - indices = with_items.get_indices_for_loop(task_ex) - with_items.decrease_capacity(task_ex, len(indices)) - - if indices: - current_inputs = operator.itemgetter(*indices)(action_inputs) - - return zip( - indices, - current_inputs if isinstance(current_inputs, tuple) - else [current_inputs] - ) - - return [] - - -def _get_action_input(wf_spec, task_ex, task_spec, ctx): - input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx) - - action_spec_name = task_spec.get_action_name() - - input_dict = utils.merge_dicts( - input_dict, - _get_action_defaults(task_ex, task_spec), - overwrite=False - ) - - return action_handler.get_action_input( - action_spec_name, - input_dict, - task_ex.workflow_name, - wf_spec - ) - - -def _get_workflow_input(task_spec, ctx): - return expr.evaluate_recursively(task_spec.get_input(), ctx) - - -def _run_action_or_workflow(task_ex, task_spec, input_dict, index, wf_spec): - t_name = task_ex.name - - if task_spec.get_action_name(): - wf_trace.info( - task_ex, - "Task '%s' is RUNNING [action_name = %s]" % - (t_name, task_spec.get_action_name()) - ) - - _schedule_run_action(task_ex, task_spec, input_dict, index, wf_spec) - elif task_spec.get_workflow_name(): - wf_trace.info( - task_ex, - "Task '%s' is RUNNING [workflow_name = %s]" % - (t_name, task_spec.get_workflow_name())) - - _schedule_run_workflow(task_ex, task_spec, input_dict, index, wf_spec) - - -def _get_action_defaults(task_ex, task_spec): - actions = task_ex.in_context.get('__env', {}).get('__actions', {}) - - return actions.get(task_spec.get_action_name(), {}) - - -def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec): - action_spec_name = task_spec.get_action_name() - - action_def = action_handler.resolve_definition( - action_spec_name, - task_ex, - wf_spec - ) - - action_ex = action_handler.create_action_execution( - action_def, - action_input, - task_ex, - index - ) - - target = expr.evaluate_recursively( - task_spec.get_target(), - utils.merge_dicts( - copy.deepcopy(action_input), - copy.deepcopy(task_ex.in_context) - ) - ) - - scheduler.schedule_call( - None, - 'mistral.engine.action_handler.run_existing_action', - 0, - action_ex_id=action_ex.id, - target=target - ) - - -def _schedule_noop_action(task_ex, task_spec, wf_spec): wf_ex = task_ex.workflow_execution - action_def = action_handler.resolve_action_definition( - 'std.noop', - wf_ex.workflow_name, - wf_spec.get_name() + task = _create_task( + wf_ex, + task_spec, + task_ex.in_context, + task_ex ) - action_ex = action_handler.create_action_execution(action_def, {}, task_ex) - - target = expr.evaluate_recursively( - task_spec.get_target(), - task_ex.in_context - ) - - scheduler.schedule_call( - None, - 'mistral.engine.action_handler.run_existing_action', - 0, - action_ex_id=action_ex.id, - target=target - ) - - -def _schedule_run_workflow(task_ex, task_spec, wf_input, index, - parent_wf_spec): - parent_wf_ex = task_ex.workflow_execution - - wf_spec_name = task_spec.get_workflow_name() - - wf_def = e_utils.resolve_workflow_definition( - parent_wf_ex.workflow_name, - parent_wf_spec.get_name(), - wf_spec_name - ) - - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - - wf_params = { - 'task_execution_id': task_ex.id, - 'with_items_index': index - } - - if 'env' in parent_wf_ex.params: - wf_params['env'] = parent_wf_ex.params['env'] - - for k, v in list(wf_input.items()): - if k not in wf_spec.get_input(): - wf_params[k] = v - del wf_input[k] - - wf_ex_id, _ = wf_ex_service.create_workflow_execution( - wf_def.name, - wf_input, - "sub-workflow execution", - wf_params, - wf_spec - ) - - scheduler.schedule_call( - None, - 'mistral.engine.task_handler.resume_workflow', - 0, - wf_ex_id=wf_ex_id, - env=None - ) - - -def resume_workflow(wf_ex_id, env): - rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) - - -def _complete_task(task_ex, task_spec, state, state_info=None): - # Ignore if task already completed. - if states.is_completed(task_ex.state): - return [] - - set_task_state(task_ex, state, state_info) - try: - data_flow.publish_variables(task_ex, task_spec) + task.on_action_complete(action_ex) except exc.MistralException as e: - LOG.error( - 'An error while publishing task variables' - ' [task_execution_id=%s]: %s', - task_ex.id, str(e) - ) + task_ex = action_ex.task_execution + wf_ex = task_ex.workflow_execution - set_task_state(task_ex, states.ERROR, str(e)) + msg = ("Failed to handle action completion [wf=%s, task=%s," + " action=%s]: %s\n%s" % + (wf_ex.name, task_ex.name, action_ex.name, e, tb.format_exc())) - if not task_spec.get_keep_result(): - data_flow.destroy_task_result(task_ex) + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) + + return + + if task.is_completed(): + wf_handler.check_workflow_completion(wf_ex) -def set_task_state(task_ex, state, state_info, processed=None): - wf_trace.info( +def fail_task(task_ex, msg): + task = _build_task_from_execution(task_ex) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(task_ex.workflow_execution, msg) + + +def continue_task(task_ex): + task = _build_task_from_execution(task_ex) + + # TODO(rakhmerov): Error handling. + task.run() + + if task.is_completed(): + wf_handler.check_workflow_completion(task_ex.workflow_execution) + + +def complete_task(task_ex, state, state_info): + task = _build_task_from_execution(task_ex) + + # TODO(rakhmerov): Error handling. + task.complete(state, state_info) + + if task.is_completed(): + wf_handler.check_workflow_completion(task_ex.workflow_execution) + + +def _build_task_from_execution(task_ex, task_spec=None): + return _create_task( task_ex.workflow_execution, - "Task execution '%s' [%s -> %s]" % - (task_ex.name, task_ex.state, state) + task_spec or spec_parser.get_task_spec(task_ex.spec), + task_ex.in_context, + task_ex ) - task_ex.state = state - task_ex.state_info = state_info - if processed is not None: - task_ex.processed = processed +def _build_task_from_command(cmd): + if isinstance(cmd, wf_cmds.RunExistingTask): + task = _create_task( + cmd.wf_ex, + spec_parser.get_task_spec(cmd.task_ex.spec), + cmd.ctx, + cmd.task_ex + ) + + if cmd.reset: + task.reset() + + return task + + if isinstance(cmd, wf_cmds.RunTask): + task = _create_task(cmd.wf_ex, cmd.task_spec, cmd.ctx) + + if cmd.is_waiting(): + task.defer() + + return task + + raise exc.MistralError('Unsupported workflow command: %s' % cmd) -def is_task_completed(task_ex, task_spec): +def _create_task(wf_ex, task_spec, ctx, task_ex=None): if task_spec.get_with_items(): - return with_items.is_completed(task_ex) + return tasks.WithItemsTask(wf_ex, task_spec, ctx, task_ex) - return states.is_completed(task_ex.state) - - -def need_to_continue(task_ex, task_spec): - # For now continue is available only for with-items. - if task_spec.get_with_items(): - return (with_items.has_more_iterations(task_ex) - and with_items.get_concurrency(task_ex)) - - return False + return tasks.RegularTask(wf_ex, task_spec, ctx, task_ex) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py new file mode 100644 index 00000000..0197fada --- /dev/null +++ b/mistral/engine/tasks.py @@ -0,0 +1,456 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import copy +import operator +from oslo_log import log as logging +import six + +from mistral.db.v2 import api as db_api +from mistral.engine import actions +from mistral.engine import dispatcher +from mistral.engine import policies +from mistral import exceptions as exc +from mistral import expressions as expr +from mistral import utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base +from mistral.workflow import data_flow +from mistral.workflow import states +from mistral.workflow import utils as wf_utils +from mistral.workflow import with_items + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class Task(object): + """Task. + + Represents a workflow task and defines interface that can be used by + Mistral engine or its components in order to manipulate with tasks. + """ + + def __init__(self, wf_ex, task_spec, ctx, task_ex=None): + self.wf_ex = wf_ex + self.task_spec = task_spec + self.ctx = ctx + self.task_ex = task_ex + self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + self.waiting = False + self.reset_flag = False + + @abc.abstractmethod + def on_action_complete(self, action_ex): + """Handle action completion. + + :param action_ex: Action execution. + """ + raise NotImplementedError + + @abc.abstractmethod + def run(self): + """Runs task.""" + raise NotImplementedError + + def defer(self): + """Defers task. + + This methods finds task execution or creates new and puts task + to a waiting state. + """ + + if not self.task_ex: + self.task_ex = wf_utils.find_task_executions_by_spec( + self.wf_ex, + self.task_spec + ) + + if not self.task_ex: + self._create_task_execution() + + self.set_state(states.WAITING, 'Task execution is deferred.') + + self.waiting = True + + def reset(self): + self.reset_flag = True + + def set_state(self, state, state_info, processed=None): + """Sets task state without executing post completion logic. + + :param state: New task state. + :param state_info: New state information (i.e. error message). + :param processed: New "processed" flag value. + """ + + wf_trace.info( + self.task_ex.workflow_execution, + "Task execution '%s' [%s -> %s]: %s" % + (self.task_ex.id, self.task_ex.state, state, state_info) + ) + + self.task_ex.state = state + self.task_ex.state_info = state_info + + if processed is not None: + self.task_ex.processed = processed + + def complete(self, state, state_info=None): + """Complete task and set specified state. + + Method sets specified task state and runs all necessary post + completion logic such as publishing workflow variables and + scheduling new workflow commands. + + :param state: New task state. + :param state_info: New state information (i.e. error message). + """ + + # Ignore if task already completed. + if states.is_completed(self.task_ex.state): + return + + self.set_state(state, state_info) + + data_flow.publish_variables(self.task_ex, self.task_spec) + + if not self.task_spec.get_keep_result(): + # Destroy task result. + for ex in self.task_ex.executions: + if hasattr(ex, 'output'): + ex.output = {} + + self._after_task_complete() + + # Ignore DELAYED state. + if self.task_ex.state == states.RUNNING_DELAYED: + return + + # If workflow is paused we shouldn't schedule new commands + # and mark task as processed. + if states.is_paused(self.wf_ex.state): + return + + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) + + # Calculate commands to process next. + cmds = wf_ctrl.continue_workflow() + + # Mark task as processed after all decisions have been made + # upon its completion. + self.task_ex.processed = True + + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + + def _before_task_start(self): + policies_spec = self.task_spec.get_policies() + + for p in policies.build_policies(policies_spec, self.wf_spec): + p.before_task_start(self.task_ex, self.task_spec) + + def _after_task_complete(self): + policies_spec = self.task_spec.get_policies() + + for p in policies.build_policies(policies_spec, self.wf_spec): + p.after_task_complete(self.task_ex, self.task_spec) + + def _create_task_execution(self, state=states.RUNNING): + self.task_ex = db_api.create_task_execution({ + 'name': self.task_spec.get_name(), + 'workflow_execution_id': self.wf_ex.id, + 'workflow_name': self.wf_ex.workflow_name, + 'workflow_id': self.wf_ex.workflow_id, + 'state': state, + 'spec': self.task_spec.to_dict(), + 'in_context': self.ctx, + 'published': {}, + 'runtime_context': {}, + 'project_id': self.wf_ex.project_id + }) + + # Add to collection explicitly so that it's in a proper + # state within the current session. + self.wf_ex.task_executions.append(self.task_ex) + + def _get_action_defaults(self): + action_name = self.task_spec.get_action_name() + + if not action_name: + return {} + + env = self.task_ex.in_context.get('__env', {}) + + return env.get('__actions', {}).get(action_name, {}) + + +class RegularTask(Task): + """Regular task. + + Takes care of processing regular tasks with one action. + """ + + def on_action_complete(self, action_ex): + state = action_ex.state + # TODO(rakhmerov): Here we can define more informative messages + # cases when action is successful and when it's not. For example, + # in state_info we can specify the cause action. + state_info = (None if state == states.SUCCESS + else action_ex.output.get('result')) + + self.complete(state, state_info) + + def is_completed(self): + return self.task_ex and states.is_completed(self.task_ex.state) + + def run(self): + if not self.task_ex: + self._run_new() + else: + self._run_existing() + + def _run_new(self): + # NOTE(xylan): Need to think how to get rid of this weird judgment + # to keep it more consistent with the function name. + self.task_ex = wf_utils.find_task_execution_with_state( + self.wf_ex, + self.task_spec, + states.WAITING + ) + + if self.task_ex: + self.set_state(states.RUNNING, None) + + self.task_ex.in_context = self.ctx + else: + self._create_task_execution() + + LOG.debug( + 'Starting task [workflow=%s, task_spec=%s, init_state=%s]' % + (self.wf_ex.name, self.task_spec, self.task_ex.state) + ) + + self._before_task_start() + + # Policies could possibly change task state. + if self.task_ex.state != states.RUNNING: + return + + self._schedule_actions() + + def _run_existing(self): + if self.waiting: + return + + # Explicitly change task state to RUNNING. + # Throw exception if the existing task already succeeded. + if self.task_ex.state == states.SUCCESS: + raise exc.MistralError( + 'Rerunning succeeded tasks is not supported.' + ) + + self.set_state(states.RUNNING, None, processed=False) + + self._reset_actions() + + self._schedule_actions() + + def _reset_actions(self): + """Resets task state. + + Depending on task type this method may reset task state. For example, + delete all task actions etc. + """ + + # Reset state of processed task and related action executions. + if self.reset_flag: + action_exs = self.task_ex.executions + else: + action_exs = db_api.get_action_executions( + task_execution_id=self.task_ex.id, + state=states.ERROR, + accepted=True + ) + + for action_ex in action_exs: + action_ex.accepted = False + + def _schedule_actions(self): + # Regular task schedules just one action. + input_dict = self._get_action_input() + target = self._get_target(input_dict) + + action = self._build_action() + + action.validate_input(input_dict) + + action.schedule(input_dict, target) + + def _get_target(self, input_dict): + return expr.evaluate_recursively( + self.task_spec.get_target(), + utils.merge_dicts( + copy.deepcopy(input_dict), + copy.deepcopy(self.ctx) + ) + ) + + def _get_action_input(self, ctx=None): + ctx = ctx or self.ctx + + input_dict = expr.evaluate_recursively(self.task_spec.get_input(), ctx) + + return utils.merge_dicts( + input_dict, + self._get_action_defaults(), + overwrite=False + ) + + def _build_action(self): + action_name = self.task_spec.get_action_name() + wf_name = self.task_spec.get_workflow_name() + + if wf_name: + return actions.WorkflowAction(wf_name, task_ex=self.task_ex) + + if not action_name: + action_name = 'std.noop' + + action_def = actions.resolve_action_definition( + action_name, + self.wf_ex.name, + self.wf_spec.get_name() + ) + + if action_def.spec: + return actions.AdHocAction(action_def, task_ex=self.task_ex) + + return actions.PythonAction(action_def, task_ex=self.task_ex) + + +class WithItemsTask(RegularTask): + """With-items task. + + Takes care of processing "with-items" tasks. + """ + + def on_action_complete(self, action_ex): + state = action_ex.state + # TODO(rakhmerov): Here we can define more informative messages + # cases when action is successful and when it's not. For example, + # in state_info we can specify the cause action. + state_info = (None if state == states.SUCCESS + else action_ex.output.get('result')) + + with_items.increase_capacity(self.task_ex) + + if with_items.is_completed(self.task_ex): + self.complete( + with_items.get_final_state(self.task_ex), + state_info + ) + + return + + if (with_items.has_more_iterations(self.task_ex) + and with_items.get_concurrency(self.task_ex)): + self._schedule_actions() + + def _schedule_actions(self): + input_dicts = self._get_with_items_input() + + if not input_dicts: + self.complete(states.SUCCESS) + + return + + for idx, input_dict in input_dicts: + target = self._get_target(input_dict) + + action = self._build_action() + + action.schedule(input_dict, target, index=idx) + + def _get_with_items_input(self): + """Calculate input array for separating each action input. + + Example: + DSL: + with_items: + - itemX in <% $.arrayI %> + - itemY in <% $.arrayJ %> + + Assume arrayI = [1, 2], arrayJ = ['a', 'b']. + with_items_input = { + "itemX": [1, 2], + "itemY": ['a', 'b'] + } + + Then we get separated input: + inputs_per_item = [ + {'itemX': 1, 'itemY': 'a'}, + {'itemX': 2, 'itemY': 'b'} + ] + + :return: the list of tuples containing indexes + and the corresponding input dict. + """ + with_items_inputs = expr.evaluate_recursively( + self.task_spec.get_with_items(), + self.ctx + ) + + with_items.validate_input(with_items_inputs) + + inputs_per_item = [] + + for key, value in with_items_inputs.items(): + for index, item in enumerate(value): + iter_context = {key: item} + + if index >= len(inputs_per_item): + inputs_per_item.append(iter_context) + else: + inputs_per_item[index].update(iter_context) + + action_inputs = [] + + for item_input in inputs_per_item: + new_ctx = utils.merge_dicts(item_input, self.ctx) + + action_inputs.append(self._get_action_input(new_ctx)) + + with_items.prepare_runtime_context( + self.task_ex, + self.task_spec, + action_inputs + ) + + indices = with_items.get_indices_for_loop(self.task_ex) + + with_items.decrease_capacity(self.task_ex, len(indices)) + + if indices: + current_inputs = operator.itemgetter(*indices)(action_inputs) + + return zip( + indices, + current_inputs if isinstance(current_inputs, tuple) + else [current_inputs] + ) + + return [] diff --git a/mistral/engine/utils.py b/mistral/engine/utils.py index 8d910c78..556e3889 100644 --- a/mistral/engine/utils.py +++ b/mistral/engine/utils.py @@ -21,6 +21,9 @@ from mistral import exceptions as exc from mistral import utils +# TODO(rakhmerov): This method is too abstract, validation rules may vary +# depending on object type (action, wf), it's not clear what it can be +# applied to. def validate_input(definition, input, spec=None): input_param_names = copy.deepcopy(list((input or {}).keys())) missing_param_names = [] diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 2255677b..41c0f6f3 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -1,4 +1,4 @@ -# Copyright 2015 - Mirantis, Inc. +# Copyright 2016 - Nokia Networks. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,18 +12,89 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_config import cfg +from oslo_log import log as logging + from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import rpc -from mistral.engine import task_handler from mistral import exceptions as exc from mistral.services import scheduler +from mistral import utils from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils +LOG = logging.getLogger(__name__) + + +def on_task_complete(task_ex): + wf_ex = task_ex.workflow_execution + + check_workflow_completion(wf_ex) + + +def check_workflow_completion(wf_ex): + if states.is_paused_or_completed(wf_ex.state): + return + + # Workflow is not completed if there are any incomplete task + # executions that are not in WAITING state. If all incomplete + # tasks are waiting and there are no unhandled errors, then these + # tasks will not reach completion. In this case, mark the + # workflow complete. + incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex) + + if any(not states.is_waiting(t.state) for t in incomplete_tasks): + return + + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) + + if wf_ctrl.all_errors_handled(): + succeed_workflow( + wf_ex, + wf_ctrl.evaluate_workflow_final_context(), + wf_spec + ) + else: + state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) + + fail_workflow(wf_ex, state_info) + + +def stop_workflow(wf_ex, state, message=None): + if state == states.SUCCESS: + wf_ctrl = wf_base.get_controller(wf_ex) + + final_context = {} + + try: + final_context = wf_ctrl.evaluate_workflow_final_context() + except Exception as e: + LOG.warning( + 'Failed to get final context for %s: %s' % (wf_ex, e) + ) + + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + + return succeed_workflow( + wf_ex, + final_context, + wf_spec, + message + ) + elif state == states.ERROR: + return fail_workflow(wf_ex, message) + + return wf_ex + + def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): # Fail workflow if output is not successfully evaluated. try: @@ -35,7 +106,7 @@ def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): return fail_workflow(wf_ex, e.message) # Set workflow execution to success until after output is evaluated. - set_execution_state(wf_ex, states.SUCCESS, state_info) + set_workflow_state(wf_ex, states.SUCCESS, state_info) if wf_ex.task_execution_id: _schedule_send_result_to_parent_workflow(wf_ex) @@ -47,7 +118,16 @@ def fail_workflow(wf_ex, state_info): if states.is_paused_or_completed(wf_ex.state): return wf_ex - set_execution_state(wf_ex, states.ERROR, state_info) + set_workflow_state(wf_ex, states.ERROR, state_info) + + # When we set an ERROR state we should safely set output value getting + # w/o exceptions due to field size limitations. + state_info = utils.cut_by_kb( + state_info, + cfg.CONF.engine.execution_field_size_limit_kb + ) + + wf_ex.output = {'result': state_info} if wf_ex.task_execution_id: _schedule_send_result_to_parent_workflow(wf_ex) @@ -84,7 +164,9 @@ def send_result_to_parent_workflow(wf_ex_id): ) -def set_execution_state(wf_ex, state, state_info=None, set_upstream=False): +# TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow +# abstraction. +def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False): cur_state = wf_ex.state if states.is_valid_transition(cur_state, state): @@ -109,24 +191,28 @@ def set_execution_state(wf_ex, state, state_info=None, set_upstream=False): # If specified, then recursively set the state of the parent workflow # executions to the same state. Only changing state to RUNNING is # supported. + # TODO(rakhmerov): I don't like this hardcoded special case. It's + # used only to continue the workflow (rerun) but at the first glance + # seems like a generic behavior. Need to handle it differently. if set_upstream and state == states.RUNNING and wf_ex.task_execution_id: task_ex = db_api.get_task_execution(wf_ex.task_execution_id) parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id) - set_execution_state( + set_workflow_state( parent_wf_ex, state, state_info=state_info, set_upstream=set_upstream ) - task_handler.set_task_state( - task_ex, - state, - state_info=None, - processed=False - ) + # TODO(rakhmerov): How do we need to set task state properly? + # It doesn't seem right to intervene into the parent workflow + # internals. We just need to communicate changes back to parent + # worklfow and it should do what's needed itself. + task_ex.state = state + task_ex.state_info = None + task_ex.processed = False def lock_workflow_execution(wf_ex_id): diff --git a/mistral/services/executions.py b/mistral/services/executions.py index 89aae84b..20d849ec 100644 --- a/mistral/services/executions.py +++ b/mistral/services/executions.py @@ -59,7 +59,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): 'context': copy.deepcopy(wf_input) or {}, 'task_execution_id': params.get('task_execution_id'), 'runtime_context': { - 'with_items_index': params.get('with_items_index', 0) + 'index': params.get('index', 0) }, }) @@ -92,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params, wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier) - return wf_ex.id, wf_spec + return wf_ex, wf_spec diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index ec60742f..cc00f3dc 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -162,9 +162,9 @@ class CallScheduler(periodic_task.PeriodicTasks): ) for (target_auth_context, target_method, method_args) in delayed_calls: - + # TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521 # Transaction is needed here because some of the - # target_method can use the DB + # target_method can use the DB. with db_api.transaction(): try: # Set the correct context for the method. @@ -175,9 +175,7 @@ class CallScheduler(periodic_task.PeriodicTasks): # Call the method. target_method(**method_args) except Exception as e: - LOG.error( - "Delayed call failed [exception=%s]", e - ) + LOG.exception("Delayed call failed [exception=%s]: %s", e) finally: # Remove context. context.set_ctx(None) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 97597836..45619f00 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__) # Default delay and timeout in seconds for await_xxx() functions. DEFAULT_DELAY = 1 -DEFAULT_TIMEOUT = 60 +DEFAULT_TIMEOUT = 20 def launch_engine_server(transport, engine): @@ -135,18 +135,38 @@ class EngineTestCase(base.DbTestCase): wf_execs = db_api.get_workflow_executions() - for wf_ex in wf_execs: + for w in wf_execs: print( - "\n%s [state=%s, output=%s]" % - (wf_ex.name, wf_ex.state, wf_ex.output) + "\n%s [state=%s, state_info=%s, output=%s]" % + (w.name, w.state, w.state_info, w.output) ) - for t_ex in wf_ex.task_executions: + for t in w.task_executions: print( - "\t%s [id=%s, state=%s, published=%s]" % - (t_ex.name, t_ex.id, t_ex.state, t_ex.published) + "\t%s [id=%s, state=%s, state_info=%s, processed=%s," + " published=%s]" % + (t.name, + t.id, + t.state, + t.state_info, + t.processed, + t.published) ) + a_execs = db_api.get_action_executions(task_execution_id=t.id) + + for a in a_execs: + print( + "\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s," + " output=%s]" % + (a.name, + a.id, + a.state, + a.state_info, + a.accepted, + a.output) + ) + # Various methods for abstract execution objects. def is_execution_in_state(self, ex_id, state): diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py index 5c0c907c..230c8417 100644 --- a/mistral/tests/unit/engine/test_dataflow.py +++ b/mistral/tests/unit/engine/test_dataflow.py @@ -486,10 +486,10 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + task1 = self._assert_single_item(wf_ex.task_executions, name='task1') result = data_flow.get_task_execution_result(task1) + self.assertListEqual([], result) @@ -512,7 +512,7 @@ class DataFlowTest(test_base.BaseTest): name='my_action', output={'result': 1}, accepted=True, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )] with mock.patch.object(db_api, 'get_action_executions', @@ -523,14 +523,14 @@ class DataFlowTest(test_base.BaseTest): name='my_action', output={'result': 1}, accepted=True, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )) action_exs.append(models.ActionExecution( name='my_action', output={'result': 1}, accepted=False, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )) with mock.patch.object(db_api, 'get_action_executions', diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 07c673ae..9bf0a7a0 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -143,6 +143,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): """ wf_service.create_workflows(wf_text) + wf_ex = self.engine.start_workflow('wf', {}) self.await_execution_success(wf_ex.id) @@ -411,6 +412,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf: input: - var + tasks: task1: action: std.echo output=<% $.var + $.var2 %> @@ -484,7 +486,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): name='task1' ) - self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.ERROR, task_1_ex.state) # Assert that there is only one action execution and it's SUCCESS. task_1_action_exs = db_api.get_action_executions( @@ -550,7 +552,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): name='task1' ) - self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.ERROR, task_1_ex.state) task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index d9ed0d75..658294b1 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -460,18 +460,27 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(2, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') - task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + + task_1_ex = self._assert_single_item( + task_execs, + name='t1', + state=states.SUCCESS + ) + task_2_ex = self._assert_single_item( + task_execs, + name='t2', + state=states.ERROR + ) - self.assertEqual(states.SUCCESS, task_1_ex.state) - self.assertEqual(states.ERROR, task_2_ex.state) self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. e = self.assertRaises( - exc.EngineException, + exc.MistralError, self.engine.rerun_workflow, wf_ex.id, task_1_ex.id @@ -505,9 +514,12 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(1, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') self.assertEqual(states.ERROR, task_1_ex.state) self.assertIsNotNone(task_1_ex.state_info) @@ -532,10 +544,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsNone(wf_ex.state_info) - self.assertEqual(2, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') - task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') + task_2_ex = self._assert_single_item(task_execs, name='t2') # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) @@ -547,8 +562,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # The single action execution that succeeded should not re-run. self.assertEqual(5, len(task_1_action_exs)) - self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'], - task_1_ex.published.get('v1')) + self.assertListEqual( + ['Task 1.0', 'Task 1.1', 'Task 1.2'], + task_1_ex.published.get('v1') + ) # Check action executions of task 2. self.assertEqual(states.SUCCESS, task_2_ex.state) diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 9625dfdf..29a2bcb6 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -172,8 +172,7 @@ class EnvironmentTest(base.EngineTestCase): 'mistral.actions.std_actions.EchoAction', {}, a_ex.input, - TARGET, - True + TARGET ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_error_handling.py b/mistral/tests/unit/engine/test_error_handling.py new file mode 100644 index 00000000..7696729a --- /dev/null +++ b/mistral/tests/unit/engine/test_error_handling.py @@ -0,0 +1,61 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base +from mistral.workflow import states + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class DirectWorkflowEngineTest(base.EngineTestCase): + def test_action_error(self): + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.fail + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + self._assert_single_item(task_execs, name='task1', state=states.ERROR) + + # TODO(rakhmerov): Finish. Need to check more complicated cases. + + def test_task_error(self): + # TODO(rakhmerov): Implement. + pass + + def test_workflow_error(self): + # TODO(rakhmerov): Implement. + pass diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index 827faf63..f493a66d 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -178,13 +178,13 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertIn( - 'Failure caused by error in tasks: task1', + 'Failed to handle action completion [wf=wf, task=task1', wf_ex.state_info ) task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') - self.assertEqual( + self.assertIn( "Size of 'published' is 1KB which exceeds the limit of 0KB", task_ex.state_info ) diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 1809de4b..b150eff5 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -576,6 +576,7 @@ class JoinEngineTest(base.EngineTestCase): action: std.noop on-success: - task22 + task22: action: std.noop on-success: @@ -585,6 +586,7 @@ class JoinEngineTest(base.EngineTestCase): action: std.fail on-success: - task32 + task32: action: std.noop on-success: diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index e8685148..3aedd7bd 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -303,7 +303,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): # Resume workflow and re-run failed task. e = self.assertRaises( - exc.EngineException, + exc.MistralError, self.engine.rerun_workflow, wf_ex.id, task_1_ex.id diff --git a/mistral/tests/unit/engine/test_run_action.py b/mistral/tests/unit/engine/test_run_action.py index 8404fa01..2f5c36fc 100644 --- a/mistral/tests/unit/engine/test_run_action.py +++ b/mistral/tests/unit/engine/test_run_action.py @@ -65,6 +65,7 @@ class RunActionEngineTest(base.EngineTestCase): # Start action and see the result. action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'}) + self.assertIsNotNone(action_ex.output) self.assertIn('some error', action_ex.output['result']) def test_run_action_save_result(self): @@ -148,10 +149,15 @@ class RunActionEngineTest(base.EngineTestCase): self.assertIn('concat', exception.message) - @mock.patch('mistral.engine.action_handler.resolve_action_definition') + # TODO(rakhmerov): This is an example of a bad test. It pins to + # implementation details too much and prevents from making refactoring + # easily. When writing tests we should make assertions about + # consequences, not about how internal machinery works, i.e. we need to + # follow "black box" testing paradigm. + @mock.patch('mistral.engine.actions.resolve_action_definition') @mock.patch('mistral.engine.utils.validate_input') @mock.patch('mistral.services.action_manager.get_action_class') - @mock.patch('mistral.engine.action_handler.run_action') + @mock.patch('mistral.engine.actions.PythonAction.run') def test_run_action_with_kwargs_input(self, run_mock, class_mock, validate_mock, def_mock): action_def = models.ActionDefinition() @@ -172,16 +178,15 @@ class RunActionEngineTest(base.EngineTestCase): self.engine.start_action('fake_action', {'input': 'Hello'}) - self.assertEqual(2, def_mock.call_count) - def_mock.assert_called_with('fake_action', None, None) + self.assertEqual(1, def_mock.call_count) + def_mock.assert_called_with('fake_action') self.assertEqual(0, validate_mock.call_count) class_ret.assert_called_once_with(input='Hello') run_mock.assert_called_once_with( - action_def, {'input': 'Hello'}, - target=None, - async=False + None, + save=None ) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index d00f5ef8..c18434c4 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -181,7 +181,7 @@ class SubworkflowsTest(base.EngineTestCase): @mock.patch.object(std_actions.EchoAction, 'run', mock.MagicMock(side_effect=exc.ActionException)) def test_subworkflow_error(self): - wf2_ex = self.engine.start_workflow('wb1.wf2', None) + self.engine.start_workflow('wb1.wf2', None) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) @@ -208,11 +208,13 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(2, len(wf_execs)) wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2') + self.assertEqual(states.ERROR, wf2_ex.state) self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info) # Ensure error message is bubbled up to the main workflow. wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1') + self.assertEqual(states.ERROR, wf1_ex.state) self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info) diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index b55d7f90..4384fdfd 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -16,7 +16,6 @@ import mock from oslo_config import cfg from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as de from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base @@ -139,7 +138,7 @@ workflows: tasks: task1: - action: std.echo output="Hi!" + action: std.echo output="Task 1" on-complete: - task3 - pause @@ -299,7 +298,7 @@ class WorkflowResumeTest(base.EngineTestCase): self.engine.resume_workflow(wf_ex.id) - self.await_execution_success(wf_ex.id, 1, 5) + self.await_execution_success(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -351,8 +350,7 @@ class WorkflowResumeTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info) self.assertEqual(4, len(wf_ex.task_executions)) - @mock.patch.object(de.DefaultEngine, '_fail_workflow') - def test_resume_fails(self, mock_fw): + def test_resume_fails(self): # Start and pause workflow. wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES) @@ -365,7 +363,7 @@ class WorkflowResumeTest(base.EngineTestCase): self.assertEqual(states.PAUSED, wf_ex.state) # Simulate failure and check if it is handled. - err = exc.MistralException('foo') + err = exc.MistralError('foo') with mock.patch.object( db_api, @@ -373,13 +371,11 @@ class WorkflowResumeTest(base.EngineTestCase): side_effect=err): self.assertRaises( - exc.MistralException, + exc.MistralError, self.engine.resume_workflow, wf_ex.id ) - mock_fw.assert_called_once_with(wf_ex.id, err) - def test_resume_diff_env_vars(self): wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR) diff --git a/mistral/tests/unit/workflow/test_direct_workflow.py b/mistral/tests/unit/workflow/test_direct_workflow.py index 8e864a0e..541a1dc2 100644 --- a/mistral/tests/unit/workflow/test_direct_workflow.py +++ b/mistral/tests/unit/workflow/test_direct_workflow.py @@ -103,7 +103,7 @@ class DirectWorkflowControllerTest(base.DbTestCase): state=states.SUCCESS, output={'result': 'Hey'}, accepted=True, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} ) ) diff --git a/mistral/tests/unit/workflow/test_with_items.py b/mistral/tests/unit/workflow/test_with_items.py index 90edfd5a..eeab387b 100644 --- a/mistral/tests/unit/workflow/test_with_items.py +++ b/mistral/tests/unit/workflow/test_with_items.py @@ -25,9 +25,7 @@ class WithItemsTest(base.BaseTest): return models.ActionExecution( accepted=accepted, state=state, - runtime_context={ - 'with_items_index': index - } + runtime_context={'index': index} ) def test_get_indices(self): diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index fd6b761e..c13923f0 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -23,6 +23,7 @@ from os import path import shutil import six import socket +import sys import tempfile import threading import uuid @@ -169,6 +170,16 @@ def cut(data, length=100): return string +def cut_by_kb(data, kilobytes): + if kilobytes <= 0: + return cut(data) + + bytes_per_char = sys.getsizeof('s') - sys.getsizeof('') + length = int(kilobytes * 1024 / bytes_per_char) + + return cut(data, length) + + def iter_subclasses(cls, _seen=None): """Generator over all subclasses of a given class in depth first order.""" diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 23a0e117..da8092d4 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -83,7 +83,8 @@ class WorkflowController(object): self.wf_spec = wf_spec - def _update_task_ex_env(self, task_ex, env): + @staticmethod + def _update_task_ex_env(task_ex, env): if not env: return task_ex diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index ad7b9727..739818ce 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -45,17 +45,18 @@ class RunTask(WorkflowCommand): def __init__(self, wf_ex, task_spec, ctx): super(RunTask, self).__init__(wf_ex, task_spec, ctx) - self.wait_flag = False + + self.wait = False def is_waiting(self): - return (self.wait_flag and + return (self.wait and isinstance(self.task_spec, tasks.DirectWorkflowTaskSpec) and self.task_spec.get_join()) def __repr__(self): return ( "Run task [workflow=%s, task=%s, waif_flag=%s]" - % (self.wf_ex.name, self.task_spec.get_name(), self.wait_flag) + % (self.wf_ex.name, self.task_spec.get_name(), self.wait) ) diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 62c3bf9d..d0851e5b 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -73,7 +73,7 @@ def get_task_execution_result(task_ex): # use db_api.get_action_executions here to avoid session-less use cases. action_execs = db_api.get_action_executions(task_execution_id=task_ex.id) action_execs.sort( - key=lambda x: x.runtime_context.get('with_items_index') + key=lambda x: x.runtime_context.get('index') ) results = [ @@ -111,12 +111,6 @@ def publish_variables(task_ex, task_spec): ) -def destroy_task_result(task_ex): - for ex in task_ex.executions: - if hasattr(ex, 'output'): - ex.output = {} - - def evaluate_task_outbound_context(task_ex): """Evaluates task outbound Data Flow context. diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index d4aef7c8..fad7fdab 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -121,7 +121,7 @@ class DirectWorkflowController(base.WorkflowController): # NOTE(xylan): Decide whether or not a join task should run # immediately. if self._is_unsatisfied_join(cmd): - cmd.wait_flag = True + cmd.wait = True cmds.append(cmd) diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index 282bb5a0..345bc010 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -20,6 +20,9 @@ from mistral import exceptions as exc from mistral.workflow import states +# TODO(rakhmerov): Seems like it makes sense to get rid of this module in favor +# of implementing all the needed logic in engine.tasks.WithItemsTask directly. + _CAPACITY = 'capacity' _CONCURRENCY = 'concurrency' _COUNT = 'count' @@ -73,7 +76,7 @@ def _get_with_item_indices(exs): :param exs: List of executions. :return: a list of numbers. """ - return sorted(set([ex.runtime_context['with_items_index'] for ex in exs])) + return sorted(set([ex.runtime_context['index'] for ex in exs])) def _get_accepted_act_exs(task_ex): diff --git a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py index b05cc440..05918cbd 100644 --- a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py +++ b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py @@ -597,18 +597,14 @@ class ExecutionTestsV2(base.TestCase): @test.attr(type='negative') def test_create_execution_for_reverse_wf_invalid_start_task(self): - _, wf_ex = self.client.create_execution( + self.assertRaises( + exceptions.BadRequest, + self.client.create_execution, self.reverse_wf['name'], - { - self.reverse_wf['input']: "Bye"}, - { - "task_name": "nonexist" - } + {self.reverse_wf['input']: "Bye"}, + {"task_name": "nonexist"} ) - self.assertEqual("ERROR", wf_ex['state']) - self.assertIn("Invalid task name", wf_ex['state_info']) - @test.attr(type='negative') def test_create_execution_forgot_input_params(self): self.assertRaises(exceptions.BadRequest,