From 816bfd9dcc63dad735e6429c6f952147c88c159b Mon Sep 17 00:00:00 2001
From: Renat Akhmerov
Date: Wed, 11 May 2016 13:36:50 +0700
Subject: [PATCH] Refactor Mistral Engine

* Introduced the class hierarchies Task and Action used by the Mistral
  engine. Note: Action here is different from the executor Action; it
  rather represents actions of different types: regular Python action,
  ad-hoc action and workflow action (since, from a task's perspective,
  action and workflow are polymorphic)
* Refactored task_handler.py and action_handler.py with the Task and
  Action hierarchies
* Rebuilt the call chain so that the entire action processing looks like
  a chain of calls Action -> Task -> Workflow, where each level knows
  only about the next level and can influence it (e.g. if an ad-hoc
  action fails due to a YAQL error in its 'output' transformer, the
  action itself fails its task)
* Refactored policies according to the new object model
* Fixed some of the tests to match the idea of having two types of
  exceptions, MistralException and MistralError, where the latter is
  considered either a harsh environmental problem or a logical issue in
  the system itself, so it must not be handled anywhere in the code

TODO (in subsequent patches):

* Refactor WithItemsTask w/o using with_items.py
* Remove the DB transaction in Scheduler when making a delayed call, and
  helper policy methods like 'continue_workflow'
* Refactor the policies tests so that workflow definitions live right in
  the test methods
* Refactor workflow_handler with the Workflow abstraction
* Get rid of the RunExistingTask workflow command; it should be just one
  command with various properties
* Refactor resume and rerun with the Task abstraction (the same way as
  other methods, e.g. on_action_complete())
* Add error handling to all required places such as
  task_handler.continue_task()
* More tests for error handling

P.S. This patch is very big, but it was nearly impossible to split it
into multiple smaller patches simply because of how entangled everything
was in the Mistral Engine.
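For reviewers, the new result-processing pipeline can be sketched roughly
as follows (a simplified illustration of the call chain based on the new
action_handler.on_action_complete(), not the literal engine code):

    # Engine receives an action result (simplified sketch).
    def on_action_complete(action_ex, result):
        # Action level: build a concrete Action (PythonAction,
        # AdHocAction or WorkflowAction) and let it process the result,
        # e.g. apply the ad-hoc 'output' transformer.
        action = _build_action(action_ex)
        action.complete(result)

        # Task level: if the action belongs to a task, the task may now
        # complete, which in turn lets the Workflow level check whether
        # the whole workflow execution can be completed.
        if action_ex.task_execution:
            task_handler.on_action_complete(action_ex)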
Partially implements: blueprint mistral-engine-error-handling Implements: blueprint mistral-action-result-processing-pipeline Implements: blueprint mistral-refactor-task-handler Closes-Bug: #1568909 Change-Id: I0668e695c60dde31efc690563fc891387d44d6ba --- mistral/db/v2/sqlalchemy/models.py | 4 +- mistral/engine/action_handler.py | 323 ++------- mistral/engine/actions.py | 477 +++++++++++++ mistral/engine/default_engine.py | 422 ++---------- mistral/engine/dispatcher.py | 46 ++ mistral/engine/policies.py | 69 +- mistral/engine/rpc.py | 18 +- mistral/engine/task_handler.py | 633 +++--------------- mistral/engine/tasks.py | 456 +++++++++++++ mistral/engine/utils.py | 3 + mistral/engine/workflow_handler.py | 110 ++- mistral/services/executions.py | 4 +- mistral/services/scheduler.py | 8 +- mistral/tests/unit/engine/base.py | 34 +- mistral/tests/unit/engine/test_dataflow.py | 10 +- .../tests/unit/engine/test_direct_workflow.py | 6 +- .../unit/engine/test_direct_workflow_rerun.py | 43 +- mistral/tests/unit/engine/test_environment.py | 3 +- .../tests/unit/engine/test_error_handling.py | 61 ++ .../test_execution_fields_size_limitation.py | 4 +- mistral/tests/unit/engine/test_join.py | 2 + .../engine/test_reverse_workflow_rerun.py | 2 +- mistral/tests/unit/engine/test_run_action.py | 19 +- .../tests/unit/engine/test_subworkflows.py | 4 +- .../tests/unit/engine/test_workflow_resume.py | 14 +- .../unit/workflow/test_direct_workflow.py | 2 +- .../tests/unit/workflow/test_with_items.py | 4 +- mistral/utils/__init__.py | 11 + mistral/workflow/base.py | 3 +- mistral/workflow/commands.py | 7 +- mistral/workflow/data_flow.py | 8 +- mistral/workflow/direct_workflow.py | 2 +- mistral/workflow/with_items.py | 5 +- .../tests/api/v2/test_mistral_basic_v2.py | 14 +- 34 files changed, 1564 insertions(+), 1267 deletions(-) create mode 100644 mistral/engine/actions.py create mode 100644 mistral/engine/dispatcher.py create mode 100644 mistral/engine/tasks.py create mode 100644 mistral/tests/unit/engine/test_error_handling.py diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 78a42e5c..4f52ea4c 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -199,7 +199,7 @@ def validate_long_type_length(cls, field_name, value): size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb # If the size is unlimited. - if (size_limit_kb < 0): + if size_limit_kb < 0: return size_kb = int(sys.getsizeof(str(value)) / 1024) @@ -394,6 +394,8 @@ class CronTrigger(mb.MistralSecureModelBase): # Register all hooks related to secure models. mb.register_secure_model_hooks() +# TODO(rakhmerov): This is a bad solution. It's hard to find in the code, +# configure flexibly etc. Fix it. # Register an event listener to verify that the size of all the long columns # affected by the user do not exceed the limit configuration. for attr_name in ['input', 'output', 'params', 'published']: diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index aa5e152f..118722fa 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -12,302 +12,81 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from oslo_log import log as logging +import traceback as tb -from mistral.db.v2 import api as db_api -from mistral.engine import rpc -from mistral.engine import utils as e_utils +from mistral.db.v2.sqlalchemy import models +from mistral.engine import actions +from mistral.engine import task_handler from mistral import exceptions as exc -from mistral import expressions as expr -from mistral.services import action_manager as a_m -from mistral.services import security -from mistral import utils -from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser -from mistral.workflow import states -from mistral.workflow import utils as wf_utils -def create_action_execution(action_def, action_input, task_ex=None, - index=0, description=''): - # TODO(rakhmerov): We can avoid hitting DB at all when calling things like - # create_action_execution(), these operations can be just done using - # SQLAlchemy session (1-level cache) and session flush (on TX commit) would - # send necessary SQL queries to DB. Currently, session flush happens - # on every operation which may not be optimal. The problem with using just - # session level cache is in generating ids. Ids are generated only on - # session flush. And now we have a lot places where we need to have ids - # before TX completion. +LOG = logging.getLogger(__name__) - # Assign the action execution ID here to minimize database calls. - # Otherwise, the input property of the action execution DB object needs - # to be updated with the action execution ID after the action execution - # DB object is created. - action_ex_id = utils.generate_unicode_uuid() - if a_m.has_action_context( - action_def.action_class, action_def.attributes or {}) and task_ex: - action_input.update(a_m.get_action_context(task_ex, action_ex_id)) +def on_action_complete(action_ex, result): + task_ex = action_ex.task_execution - values = { - 'id': action_ex_id, - 'name': action_def.name, - 'spec': action_def.spec, - 'state': states.RUNNING, - 'input': action_input, - 'runtime_context': {'with_items_index': index}, - 'description': description - } + action = _build_action(action_ex) + + try: + action.complete(result) + except exc.MistralException as e: + msg = ("Failed to complete action [action=%s, task=%s]: %s\n%s" % + (action_ex.name, task_ex.name, e, tb.format_exc())) + + LOG.error(msg) + + action.fail(msg) + + if task_ex: + task_handler.fail_task(task_ex, msg) + + return if task_ex: - values.update({ - 'task_execution_id': task_ex.id, - 'workflow_name': task_ex.workflow_name, - 'workflow_id': task_ex.workflow_id, - 'project_id': task_ex.project_id, - }) - else: - values.update({ - 'project_id': security.get_project_id(), - }) - - action_ex = db_api.create_action_execution(values) - - if task_ex: - # Add to collection explicitly so that it's in a proper - # state within the current session. 
- task_ex.executions.append(action_ex) - - return action_ex + task_handler.on_action_complete(action_ex) -def _inject_action_ctx_for_validating(action_def, input_dict): - if a_m.has_action_context(action_def.action_class, action_def.attributes): - input_dict.update(a_m.get_empty_action_context()) +def _build_action(action_ex): + if isinstance(action_ex, models.WorkflowExecution): + return actions.WorkflowAction(None, action_ex=action_ex) + wf_name = None + wf_spec_name = None -def get_action_input(action_name, input_dict, wf_name=None, wf_spec=None): - action_def = resolve_action_definition( - action_name, - wf_name, - wf_spec.get_name() if wf_spec else None - ) + if action_ex.workflow_name: + wf_name = action_ex.workflow_name + wf_spec = spec_parser.get_workflow_spec( + action_ex.task_execution.workflow_execution.spec + ) + wf_spec_name = wf_spec.get_name() - if action_def.action_class: - _inject_action_ctx_for_validating(action_def, input_dict) + adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name') - # NOTE(xylan): Don't validate action input if action initialization method - # contains ** argument. - if '**' not in action_def.input: - e_utils.validate_input(action_def, input_dict) - - if action_def.spec: - # Ad-hoc action. - return _get_adhoc_action_input( - action_def, - input_dict, + if adhoc_action_name: + action_def = actions.resolve_action_definition( + adhoc_action_name, wf_name, - wf_spec + wf_spec_name ) - return input_dict + return actions.AdHocAction(action_def, action_ex=action_ex) - -def _get_adhoc_action_input(action_def, input_dict, - wf_name=None, wf_spec=None): - action_spec = spec_parser.get_action_spec(action_def.spec) - - base_name = action_spec.get_base() - - action_def = resolve_action_definition( - base_name, - wf_name if wf_name else None, - wf_spec.get_name() if wf_spec else None - ) - - _inject_action_ctx_for_validating(action_def, input_dict) - e_utils.validate_input(action_def, input_dict, action_spec) - - base_input = action_spec.get_base_input() - - if base_input: - input_dict = expr.evaluate_recursively( - base_input, - input_dict - ) - else: - input_dict = {} - - return input_dict - - -def run_action(action_def, action_input, - action_ex_id=None, target=None, async=True): - action_result = rpc.get_executor_client().run_action( - action_ex_id, - action_def.action_class, - action_def.attributes or {}, - action_input, - target, - async - ) - - if action_result: - return _get_action_output(action_result) - - -def _get_action_output(result): - """Returns action output. - - :param result: ActionResult instance or ActionResult dict - :return: dict containing result. 
- """ - if isinstance(result, dict): - result = wf_utils.Result(result.get('data'), result.get('error')) - - return ({'result': result.data} - if result.is_success() else {'result': result.error}) - - -def store_action_result(action_ex, result): - prev_state = action_ex.state - - action_ex.state = states.SUCCESS if result.is_success() else states.ERROR - action_ex.output = _get_action_output(result) - - action_ex.accepted = True - - _log_action_result(action_ex, prev_state, action_ex.state, result) - - return action_ex - - -def _log_action_result(action_ex, from_state, to_state, result): - def _result_msg(): - if action_ex.state == states.ERROR: - return "error = %s" % utils.cut(result.error) - - return "result = %s" % utils.cut(result.data) - - wf_trace.info( - None, - "Action execution '%s' [%s -> %s, %s]" % - (action_ex.name, from_state, to_state, _result_msg()) - ) - - -def run_existing_action(action_ex_id, target): - action_ex = db_api.get_action_execution(action_ex_id) - action_def = db_api.get_action_definition(action_ex.name) - - return run_action( - action_def, - action_ex.input, - action_ex_id, - target - ) - - -def resolve_definition(action_name, task_ex=None, wf_spec=None): - if task_ex and wf_spec: - wf_ex = task_ex.workflow_execution - - action_def = resolve_action_definition( - action_name, - wf_ex.workflow_name, - wf_spec.get_name() - ) - else: - action_def = resolve_action_definition(action_name) - - if action_def.spec: - # Ad-hoc action. - action_spec = spec_parser.get_action_spec(action_def.spec) - - base_name = action_spec.get_base() - - action_def = resolve_action_definition( - base_name, - task_ex.workflow_name if task_ex else None, - wf_spec.get_name() if wf_spec else None - ) - - return action_def - - -def resolve_action_definition(action_spec_name, wf_name=None, - wf_spec_name=None): - action_db = None - - if wf_name and wf_name != wf_spec_name: - # If workflow belongs to a workbook then check - # action within the same workbook (to be able to - # use short names within workbooks). - # If it doesn't exist then use a name from spec - # to find an action in DB. - wb_name = wf_name.rstrip(wf_spec_name)[:-1] - - action_full_name = "%s.%s" % (wb_name, action_spec_name) - - action_db = db_api.load_action_definition(action_full_name) - - if not action_db: - action_db = db_api.load_action_definition(action_spec_name) - - if not action_db: - raise exc.InvalidActionException( - "Failed to find action [action_name=%s]" % action_spec_name - ) - - return action_db - - -def transform_result(result, task_ex, task_spec): - """Transforms task result accounting for ad-hoc actions. - - In case if the given result is an action result and action is - an ad-hoc action the method transforms the result according to - ad-hoc action configuration. - - :param result: Result of task action/workflow. - :param task_ex: Task DB model. - :param task_spec: Task specification. 
- """ - if result.is_error(): - return result - - action_spec_name = task_spec.get_action_name() - - if action_spec_name: - wf_ex = task_ex.workflow_execution - wf_spec_name = wf_ex.spec['name'] - - return transform_action_result( - action_spec_name, - result, - wf_ex.workflow_name, - wf_spec_name, - ) - - return result - - -def transform_action_result(action_spec_name, result, - wf_name=None, wf_spec_name=None): - action_def = resolve_action_definition( - action_spec_name, + action_def = actions.resolve_action_definition( + action_ex.name, wf_name, wf_spec_name ) - if not action_def.spec: - return result + return actions.PythonAction(action_def, action_ex=action_ex) - transformer = spec_parser.get_action_spec(action_def.spec).get_output() - if transformer is None: - return result +def build_action_by_name(action_name): + action_def = actions.resolve_action_definition(action_name) - return wf_utils.Result( - data=expr.evaluate_recursively(transformer, result.data), - error=result.error - ) + action_cls = (actions.PythonAction if not action_def.spec + else actions.AdHocAction) + + return action_cls(action_def) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py new file mode 100644 index 00000000..c209f10a --- /dev/null +++ b/mistral/engine/actions.py @@ -0,0 +1,477 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from oslo_config import cfg +from oslo_log import log as logging +import six + +from mistral.db.v2 import api as db_api +from mistral.engine import rpc +from mistral.engine import utils as e_utils +from mistral import exceptions as exc +from mistral import expressions as expr +from mistral.services import action_manager as a_m +from mistral.services import executions as wf_ex_service +from mistral.services import scheduler +from mistral.services import security +from mistral import utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + + +LOG = logging.getLogger(__name__) + +_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action' +_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow' + + +@six.add_metaclass(abc.ABCMeta) +class Action(object): + """Action. + + Represents a workflow action and defines interface that can be used by + Mistral engine or its components in order to manipulate with actions. + """ + + def __init__(self, action_def, action_ex=None, task_ex=None): + self.action_def = action_def + self.action_ex = action_ex + self.task_ex = action_ex.task_execution if action_ex else task_ex + + @abc.abstractmethod + def complete(self, result): + """Complete action and process its result. + + :param result: Action result. + """ + raise NotImplementedError + + def fail(self, msg): + # When we set an ERROR state we should safely set output value getting + # w/o exceptions due to field size limitations. 
+        msg = utils.cut_by_kb(
+            msg,
+            cfg.CONF.engine.execution_field_size_limit_kb
+        )
+
+        self.action_ex.state = states.ERROR
+        self.action_ex.output = {'result': msg}
+
+    @abc.abstractmethod
+    def schedule(self, input_dict, target, index=0, desc=''):
+        """Schedule action run.
+
+        This method is needed to schedule an action run so that its result
+        can be received later by the engine. In this sense the action will
+        be running in asynchronous mode from the engine perspective (don't
+        confuse this with executor asynchrony, when the executor doesn't
+        immediately send a result).
+
+        :param input_dict: Action input.
+        :param target: Target (group of action executors).
+        :param index: Action execution index. Makes sense for some types.
+        :param desc: Action execution description.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def run(self, input_dict, target, index=0, desc='', save=True):
+        """Immediately run action.
+
+        This method runs the action right away, w/o scheduling its run for
+        a later time. From the engine perspective the action will be
+        processed in synchronous mode.
+
+        :param input_dict: Action input.
+        :param target: Target (group of action executors).
+        :param index: Action execution index. Makes sense for some types.
+        :param desc: Action execution description.
+        :param save: True if action execution object needs to be saved.
+        :return: Action output.
+        """
+        raise NotImplementedError
+
+    def validate_input(self, input_dict):
+        """Validates action input parameters.
+
+        :param input_dict: Dictionary with input parameters.
+        """
+        raise NotImplementedError
+
+    def is_sync(self, input_dict):
+        """Determines if action is synchronous.
+
+        :param input_dict: Dictionary with input parameters.
+        """
+        return True
+
+    def _create_action_execution(self, input_dict, runtime_ctx, desc=''):
+        # Assign the action execution ID here to minimize database calls.
+        # Otherwise, the input property of the action execution DB object
+        # needs to be updated with the action execution ID after the
+        # action execution DB object is created.
+        action_ex_id = utils.generate_unicode_uuid()
+
+        # TODO(rakhmerov): Bad place, we probably need to push action
+        # context to all actions. It's related to
+        # https://blueprints.launchpad.net/mistral/+spec/mistral-custom-actions-api
+        if a_m.has_action_context(
+                self.action_def.action_class,
+                self.action_def.attributes or {}) and self.task_ex:
+            input_dict.update(
+                a_m.get_action_context(self.task_ex, action_ex_id)
+            )
+
+        values = {
+            'id': action_ex_id,
+            'name': self.action_def.name,
+            'spec': self.action_def.spec,
+            'state': states.RUNNING,
+            'input': input_dict,
+            'runtime_context': runtime_ctx,
+            'description': desc
+        }
+
+        if self.task_ex:
+            values.update({
+                'task_execution_id': self.task_ex.id,
+                'workflow_name': self.task_ex.workflow_name,
+                'workflow_id': self.task_ex.workflow_id,
+                'project_id': self.task_ex.project_id,
+            })
+        else:
+            values.update({
+                'project_id': security.get_project_id(),
+            })
+
+        self.action_ex = db_api.create_action_execution(values)
+
+        if self.task_ex:
+            # Add to collection explicitly so that it's in a proper
+            # state within the current session.
+ self.task_ex.executions.append(self.action_ex) + + def _inject_action_ctx_for_validating(self, input_dict): + if a_m.has_action_context( + self.action_def.action_class, self.action_def.attributes): + input_dict.update(a_m.get_empty_action_context()) + + def _log_result(self, prev_state, result): + state = self.action_ex.state + + def _result_msg(): + if state == states.ERROR: + return "error = %s" % utils.cut(result.error) + + return "result = %s" % utils.cut(result.data) + + wf_trace.info( + None, + "Action execution '%s' [%s -> %s, %s]" % + (self.action_ex.name, prev_state, state, _result_msg()) + ) + + +class PythonAction(Action): + """Regular Python action.""" + + def complete(self, result): + if states.is_completed(self.action_ex.state): + return + + prev_state = self.action_ex.state + + self.action_ex.state = (states.SUCCESS if result.is_success() + else states.ERROR) + self.action_ex.output = self._prepare_output(result) + self.action_ex.accepted = True + + self._log_result(prev_state, result) + + def schedule(self, input_dict, target, index=0, desc=''): + self._create_action_execution( + self._prepare_input(input_dict), + self._prepare_runtime_context(index), + desc=desc + ) + + scheduler.schedule_call( + None, + _RUN_EXISTING_ACTION_PATH, + 0, + action_ex_id=self.action_ex.id, + target=target + ) + + def run(self, input_dict, target, index=0, desc='', save=True): + input_dict = self._prepare_input(input_dict) + runtime_ctx = self._prepare_runtime_context(index) + + if save: + self._create_action_execution(input_dict, runtime_ctx, desc=desc) + + result = rpc.get_executor_client().run_action( + self.action_ex.id if self.action_ex else None, + self.action_def.action_class, + self.action_def.attributes or {}, + input_dict, + target, + async=False + ) + + return self._prepare_output(result) + + def is_sync(self, input_dict): + input_dict = self._prepare_input(input_dict) + + a = a_m.get_action_class(self.action_def.name)(**input_dict) + + return a.is_sync() + + def validate_input(self, input_dict): + if self.action_def.action_class: + self._inject_action_ctx_for_validating(input_dict) + + # TODO(rakhmerov): I'm not sure what this is for. + # NOTE(xylan): Don't validate action input if action initialization + # method contains ** argument. + if '**' not in self.action_def.input: + e_utils.validate_input(self.action_def, input_dict) + + def _prepare_input(self, input_dict): + """Template method to do manipulations with input parameters. + + Python action doesn't do anything specific with initial input. + """ + return input_dict + + def _prepare_output(self, result): + """Template method to do manipulations with action result. + + Python action just wraps action result into dict that can + be stored in DB. + """ + return _get_action_output(result) if result else None + + def _prepare_runtime_context(self, index): + """Template method to prepare action runtime context. + + Python action inserts index into runtime context. 
+ """ + return {'index': index} + + +class AdHocAction(PythonAction): + """Ad-hoc action.""" + + def __init__(self, action_def, action_ex=None, task_ex=None): + self.action_spec = spec_parser.get_action_spec(action_def.spec) + + base_action_def = db_api.get_action_definition( + self.action_spec.get_base() + ) + + super(AdHocAction, self).__init__( + base_action_def, + action_ex, + task_ex + ) + + self.adhoc_action_def = action_def + + def validate_input(self, input_dict): + e_utils.validate_input( + self.adhoc_action_def, + input_dict, + self.action_spec + ) + + super(AdHocAction, self).validate_input( + self._prepare_input(input_dict) + ) + + def _prepare_input(self, input_dict): + base_input_expr = self.action_spec.get_base_input() + + if base_input_expr: + base_input_dict = expr.evaluate_recursively( + base_input_expr, + input_dict + ) + else: + base_input_dict = {} + + return super(AdHocAction, self)._prepare_input(base_input_dict) + + def _prepare_output(self, result): + # In case of error, we don't transform a result. + if not result.is_error(): + adhoc_action_spec = spec_parser.get_action_spec( + self.adhoc_action_def.spec + ) + + transformer = adhoc_action_spec.get_output() + + if transformer is not None: + result = wf_utils.Result( + data=expr.evaluate_recursively(transformer, result.data), + error=result.error + ) + + return _get_action_output(result) if result else None + + def _prepare_runtime_context(self, index): + ctx = super(AdHocAction, self)._prepare_runtime_context(index) + + # Insert special field into runtime context so that we track + # a relationship between python action and adhoc action. + return utils.merge_dicts( + ctx, + {'adhoc_action_name': self.adhoc_action_def.name} + ) + + +class WorkflowAction(Action): + """Workflow action.""" + + def complete(self, result): + # No-op because in case of workflow result is already processed. + pass + + def schedule(self, input_dict, target, index=0, desc=''): + parent_wf_ex = self.task_ex.workflow_execution + parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec) + + task_spec = spec_parser.get_task_spec(self.task_ex.spec) + + wf_spec_name = task_spec.get_workflow_name() + + wf_def = e_utils.resolve_workflow_definition( + parent_wf_ex.workflow_name, + parent_wf_spec.get_name(), + wf_spec_name + ) + + wf_spec = spec_parser.get_workflow_spec(wf_def.spec) + + wf_params = { + 'task_execution_id': self.task_ex.id, + 'index': index + } + + if 'env' in parent_wf_ex.params: + wf_params['env'] = parent_wf_ex.params['env'] + + for k, v in list(input_dict.items()): + if k not in wf_spec.get_input(): + wf_params[k] = v + del input_dict[k] + + wf_ex, _ = wf_ex_service.create_workflow_execution( + wf_def.name, + input_dict, + "sub-workflow execution", + wf_params, + wf_spec + ) + + scheduler.schedule_call( + None, + _RESUME_WORKFLOW_PATH, + 0, + wf_ex_id=wf_ex.id, + env=None + ) + + # TODO(rakhmerov): Add info logging. + + def run(self, input_dict, target, index=0, desc='', save=True): + raise NotImplemented('Does not apply to this WorkflowAction.') + + def is_sync(self, input_dict): + # Workflow action is always asynchronous. + return False + + def validate_input(self, input_dict): + # TODO(rakhmerov): Implement. 
+ pass + + +def _resume_workflow(wf_ex_id, env): + rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) + + +def _run_existing_action(action_ex_id, target): + action_ex = db_api.get_action_execution(action_ex_id) + action_def = db_api.get_action_definition(action_ex.name) + + result = rpc.get_executor_client().run_action( + action_ex_id, + action_def.action_class, + action_def.attributes or {}, + action_ex.input, + target + ) + + return _get_action_output(result) if result else None + + +def _get_action_output(result): + """Returns action output. + + :param result: ActionResult instance or ActionResult dict + :return: dict containing result. + """ + if isinstance(result, dict): + result = wf_utils.Result(result.get('data'), result.get('error')) + + return ({'result': result.data} + if result.is_success() else {'result': result.error}) + + +def resolve_action_definition(action_spec_name, wf_name=None, + wf_spec_name=None): + """Resolve action definition accounting for ad-hoc action namespacing. + + :param action_spec_name: Action name according to a spec. + :param wf_name: Workflow name. + :param wf_spec_name: Workflow name according to a spec. + :return: Action definition (python or ad-hoc). + """ + action_db = None + + if wf_name and wf_name != wf_spec_name: + # If workflow belongs to a workbook then check + # action within the same workbook (to be able to + # use short names within workbooks). + # If it doesn't exist then use a name from spec + # to find an action in DB. + wb_name = wf_name.rstrip(wf_spec_name)[:-1] + + action_full_name = "%s.%s" % (wb_name, action_spec_name) + + action_db = db_api.load_action_definition(action_full_name) + + if not action_db: + action_db = db_api.load_action_definition(action_spec_name) + + if not action_db: + raise exc.InvalidActionException( + "Failed to find action [action_name=%s]" % action_spec_name + ) + + return action_db diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 944e25e7..34d4334c 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import traceback - from oslo_log import log as logging from mistral import coordination @@ -22,19 +20,14 @@ from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base -from mistral.engine import task_handler +from mistral.engine import dispatcher from mistral.engine import workflow_handler as wf_handler -from mistral import exceptions as exc -from mistral.services import action_manager as a_m from mistral.services import executions as wf_ex_service from mistral.services import workflows as wf_service from mistral import utils as u -from mistral.utils import wf_trace -from mistral.workbook import parser as spec_parser from mistral.workflow import base as wf_base from mistral.workflow import commands from mistral.workflow import states -from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) @@ -53,240 +46,84 @@ class DefaultEngine(base.Engine, coordination.Service): @u.log_exec(LOG) def start_workflow(self, wf_identifier, wf_input, description='', **params): - wf_ex_id = None - - try: - # Create a persistent workflow execution in a separate transaction - # so that we can return it even in case of unexpected errors that - # lead to transaction rollback. 
- with db_api.transaction(): - # The new workflow execution will be in an IDLE - # state on initial record creation. - wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution( - wf_identifier, - wf_input, - description, - params - ) - - with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_ex_id) - wf_handler.set_execution_state(wf_ex, states.RUNNING) - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - self._dispatch_workflow_commands( - wf_ex, - wf_ctrl.continue_workflow(), - wf_spec - ) - - return wf_ex.get_clone() - except Exception as e: - LOG.error( - "Failed to start workflow '%s' id=%s: %s\n%s", - wf_identifier, wf_ex_id, e, traceback.format_exc() + with db_api.transaction(): + # TODO(rakhmerov): It needs to be hidden in workflow_handler and + # Workflow abstraction. + # The new workflow execution will be in an IDLE + # state on initial record creation. + wf_ex, wf_spec = wf_ex_service.create_workflow_execution( + wf_identifier, + wf_input, + description, + params ) + wf_handler.set_workflow_state(wf_ex, states.RUNNING) - wf_ex = self._fail_workflow(wf_ex_id, e) + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - if wf_ex: - return wf_ex.get_clone() + cmds = wf_ctrl.continue_workflow() - raise e + dispatcher.dispatch_workflow_commands(wf_ex, cmds) + + return wf_ex.get_clone() @u.log_exec(LOG) def start_action(self, action_name, action_input, description=None, **params): with db_api.transaction(): - action_def = action_handler.resolve_definition(action_name) - resolved_action_input = action_handler.get_action_input( - action_name, - action_input + action = action_handler.build_action_by_name(action_name) + + action.validate_input(action_input) + + save = params.get('save_result') + target = params.get('target') + + if save or not action.is_sync(action_input): + action.schedule(action_input, target) + + return action.action_ex.get_clone() + + output = action.run(action_input, target, save=save) + + # Action execution is not created but we need to return similar + # object to a client anyway. + return db_models.ActionExecution( + name=action_name, + description=description, + input=action_input, + output=output ) - action = a_m.get_action_class(action_def.name)( - **resolved_action_input - ) - - # If we see action is asynchronous, then we enforce 'save_result'. - if params.get('save_result') or not action.is_sync(): - action_ex = action_handler.create_action_execution( - action_def, - resolved_action_input, - description=description - ) - - action_handler.run_action( - action_def, - resolved_action_input, - action_ex.id, - params.get('target') - ) - - return action_ex.get_clone() - else: - output = action_handler.run_action( - action_def, - resolved_action_input, - target=params.get('target'), - async=False - ) - - return db_models.ActionExecution( - name=action_name, - description=description, - input=action_input, - output=output - ) - - def on_task_state_change(self, task_ex_id, state, state_info=None): - with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex_id) - # TODO(rakhmerov): The method is mostly needed for policy and - # we are supposed to get the same action execution as when the - # policy worked. 
- - wf_ex_id = task_ex.workflow_execution_id - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - wf_trace.info( - task_ex, - "Task '%s' [%s -> %s] state_info : %s" - % (task_ex.name, task_ex.state, state, state_info) - ) - - task_ex.state = state - task_ex.state_info = state_info - - self._on_task_state_change(task_ex, wf_ex, wf_spec) - - def _on_task_state_change(self, task_ex, wf_ex, wf_spec): - task_spec = wf_spec.get_tasks()[task_ex.name] - - if task_handler.is_task_completed(task_ex, task_spec): - task_handler.after_task_complete(task_ex, task_spec, wf_spec) - - # Ignore DELAYED state. - if task_ex.state == states.RUNNING_DELAYED: - return - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - # Calculate commands to process next. - try: - cmds = wf_ctrl.continue_workflow() - except exc.YaqlEvaluationException as e: - LOG.error( - 'YAQL error occurred while calculating next workflow ' - 'commands [wf_ex_id=%s, task_ex_id=%s]: %s', - wf_ex.id, task_ex.id, e - ) - - wf_handler.fail_workflow(wf_ex, str(e)) - - return - - # Mark task as processed after all decisions have been made - # upon its completion. - task_ex.processed = True - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) - - self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec) - elif task_handler.need_to_continue(task_ex, task_spec): - # Re-run existing task. - cmds = [commands.RunExistingTask(task_ex, reset=False)] - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) - - @staticmethod - def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec): - if states.is_paused_or_completed(wf_ex.state): - return - - # Workflow is not completed if there are any incomplete task - # executions that are not in WAITING state. If all incomplete - # tasks are waiting and there are unhandled errors, then these - # tasks will not reach completion. In this case, mark the - # workflow complete. - incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex) - - if any(not states.is_waiting(t.state) for t in incomplete_tasks): - return - - if wf_ctrl.all_errors_handled(): - wf_handler.succeed_workflow( - wf_ex, - wf_ctrl.evaluate_workflow_final_context(), - wf_spec - ) - else: - state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) - - wf_handler.fail_workflow(wf_ex, state_info) @u.log_exec(LOG) def on_action_complete(self, action_ex_id, result): - wf_ex_id = None + with db_api.transaction(): + action_ex = db_api.get_action_execution(action_ex_id) - try: - with db_api.transaction(): - action_ex = db_api.get_action_execution(action_ex_id) + task_ex = action_ex.task_execution - # In case of single action execution there is no - # assigned task execution. - if not action_ex.task_execution: - return action_handler.store_action_result( - action_ex, - result - ).get_clone() - - wf_ex_id = action_ex.task_execution.workflow_execution_id - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - task_ex = task_handler.on_action_complete( - action_ex, - wf_spec, - result + if task_ex: + wf_handler.lock_workflow_execution( + task_ex.workflow_execution_id ) - # If workflow is on pause or completed then there's no - # need to continue workflow. 
- if states.is_paused_or_completed(wf_ex.state): - return action_ex.get_clone() + action_handler.on_action_complete(action_ex, result) - self._on_task_state_change(task_ex, wf_ex, wf_spec) - - return action_ex.get_clone() - except Exception as e: - # TODO(rakhmerov): Need to refactor logging in a more elegant way. - LOG.error( - 'Failed to handle action execution result [id=%s]: %s\n%s', - action_ex_id, e, traceback.format_exc() - ) - - # If an exception was thrown after we got the wf_ex_id - if wf_ex_id: - self._fail_workflow(wf_ex_id, e) - - raise e + return action_ex.get_clone() @u.log_exec(LOG) def pause_workflow(self, wf_ex_id): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - wf_handler.set_execution_state(wf_ex, states.PAUSED) + wf_handler.set_workflow_state(wf_ex, states.PAUSED) return wf_ex - def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None): + @staticmethod + def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None): wf_ex = wf_service.update_workflow_execution_env(wf_ex, env) - wf_handler.set_execution_state( + wf_handler.set_workflow_state( wf_ex, states.RUNNING, set_upstream=True @@ -294,13 +131,15 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ctrl = wf_base.get_controller(wf_ex) - # TODO(rakhmerov): Add YAQL error handling. + # TODO(rakhmerov): Add error handling. # Calculate commands to process next. cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that # completed within the period when the workflow was paused. + # TODO(rakhmerov): This all should be in workflow handler, it's too + # specific for engine level. cmds = list( filter( lambda c: not isinstance(c, commands.PauseWorkflow), @@ -316,165 +155,50 @@ class DefaultEngine(base.Engine, coordination.Service): if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) + dispatcher.dispatch_workflow_commands(wf_ex, cmds) if not cmds: - if not wf_utils.find_incomplete_task_executions(wf_ex): - wf_handler.succeed_workflow( - wf_ex, - wf_ctrl.evaluate_workflow_final_context(), - wf_spec - ) + wf_handler.check_workflow_completion(wf_ex) return wf_ex.get_clone() @u.log_exec(LOG) def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): - try: - with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) + # TODO(rakhmerov): Rewrite this functionality with Task abstraction. + with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - task_ex = db_api.get_task_execution(task_ex_id) + task_ex = db_api.get_task_execution(task_ex_id) - if task_ex.workflow_execution.id != wf_ex_id: - raise ValueError('Workflow execution ID does not match.') + if task_ex.workflow_execution.id != wf_ex_id: + raise ValueError('Workflow execution ID does not match.') - if wf_ex.state == states.PAUSED: - return wf_ex.get_clone() + if wf_ex.state == states.PAUSED: + return wf_ex.get_clone() - return self._continue_workflow(wf_ex, task_ex, reset, env=env) - except Exception as e: - LOG.error( - "Failed to rerun execution id=%s at task=%s: %s\n%s", - wf_ex_id, task_ex_id, e, traceback.format_exc() - ) - self._fail_workflow(wf_ex_id, e) - raise e + # TODO(rakhmerov): This should be a call to workflow handler. 
+ return self._continue_workflow(wf_ex, task_ex, reset, env=env) @u.log_exec(LOG) def resume_workflow(self, wf_ex_id, env=None): - try: - with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) + # TODO(rakhmerov): Rewrite this functionality with Task abstraction. + with db_api.transaction(): + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if (not states.is_paused(wf_ex.state) and - not states.is_idle(wf_ex.state)): - return wf_ex.get_clone() + if (not states.is_paused(wf_ex.state) and + not states.is_idle(wf_ex.state)): + return wf_ex.get_clone() - return self._continue_workflow(wf_ex, env=env) - except Exception as e: - LOG.error( - "Failed to resume execution id=%s: %s\n%s", - wf_ex_id, e, traceback.format_exc() - ) - self._fail_workflow(wf_ex_id, e) - raise e + return self._continue_workflow(wf_ex, env=env) @u.log_exec(LOG) def stop_workflow(self, wf_ex_id, state, message=None): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - return self._stop_workflow(wf_ex, state, message) - - @staticmethod - def _stop_workflow(wf_ex, state, message=None): - if state == states.SUCCESS: - wf_ctrl = wf_base.get_controller(wf_ex) - - final_context = {} - - try: - final_context = wf_ctrl.evaluate_workflow_final_context() - except Exception as e: - LOG.warning( - 'Failed to get final context for %s: %s' % (wf_ex, e) - ) - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - return wf_handler.succeed_workflow( - wf_ex, - final_context, - wf_spec, - message - ) - elif state == states.ERROR: - return wf_handler.fail_workflow(wf_ex, message) - - return wf_ex + return wf_handler.stop_workflow(wf_ex, state, message) @u.log_exec(LOG) def rollback_workflow(self, wf_ex_id): # TODO(rakhmerov): Implement. raise NotImplementedError - - def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec): - if not wf_cmds: - return - - for cmd in wf_cmds: - if isinstance(cmd, commands.RunTask) and cmd.is_waiting(): - task_handler.defer_task(cmd) - elif isinstance(cmd, commands.RunTask): - task_ex = task_handler.run_new_task(cmd, wf_spec) - - if task_ex.state == states.ERROR: - wf_handler.fail_workflow( - wf_ex, - 'Failed to start task [task_ex=%s]: %s' % - (task_ex, task_ex.state_info) - ) - elif isinstance(cmd, commands.RunExistingTask): - task_ex = task_handler.run_existing_task( - cmd.task_ex.id, - reset=cmd.reset - ) - - if task_ex.state == states.ERROR: - wf_handler.fail_workflow( - wf_ex, - 'Failed to start task [task_ex=%s]: %s' % - (task_ex, task_ex.state_info) - ) - elif isinstance(cmd, commands.SetWorkflowState): - if states.is_completed(cmd.new_state): - self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) - else: - wf_handler.set_execution_state(wf_ex, cmd.new_state) - elif isinstance(cmd, commands.Noop): - # Do nothing. - pass - else: - raise RuntimeError('Unsupported workflow command: %s' % cmd) - - if wf_ex.state != states.RUNNING: - break - - # TODO(rakhmerov): This method may not be needed at all because error - # handling is now implemented too roughly w/o distinguishing different - # errors. On most errors (like YAQLException) we shouldn't rollback - # transactions, we just need to fail corresponding execution objects - # where a problem happened (action, task or workflow). 
- @staticmethod - def _fail_workflow(wf_ex_id, exc): - """Private helper to fail workflow on exceptions.""" - - with db_api.transaction(): - wf_ex = db_api.load_workflow_execution(wf_ex_id) - - if wf_ex is None: - LOG.error( - "Can't fail workflow execution with id='%s': not found.", - wf_ex_id - ) - return None - - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - - if not states.is_paused_or_completed(wf_ex.state): - wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc)) - - return wf_ex diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py new file mode 100644 index 00000000..e1723308 --- /dev/null +++ b/mistral/engine/dispatcher.py @@ -0,0 +1,46 @@ +# Copyright 2016 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from mistral import exceptions as exc +from mistral.workflow import commands +from mistral.workflow import states + + +def dispatch_workflow_commands(wf_ex, wf_cmds): + # TODO(rakhmerov): I don't like these imports but otherwise we have + # import cycles. + from mistral.engine import task_handler + from mistral.engine import workflow_handler as wf_handler + + if not wf_cmds: + return + + for cmd in wf_cmds: + if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): + task_handler.run_task(cmd) + elif isinstance(cmd, commands.SetWorkflowState): + # TODO(rakhmerov): Make just a single call to workflow_handler + if states.is_completed(cmd.new_state): + wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) + else: + wf_handler.set_workflow_state(wf_ex, cmd.new_state) + elif isinstance(cmd, commands.Noop): + # Do nothing. + pass + else: + raise exc.MistralError('Unsupported workflow command: %s' % cmd) + + if wf_ex.state != states.RUNNING: + break diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index dafa9dac..47c87bc2 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -15,7 +15,6 @@ from mistral.db.v2 import api as db_api from mistral.engine import base -from mistral.engine import rpc from mistral import expressions from mistral.services import scheduler from mistral.utils import wf_trace @@ -24,8 +23,8 @@ from mistral.workflow import states import six -_ENGINE_CLIENT_PATH = 'mistral.engine.rpc.get_engine_client' -_RUN_EXISTING_TASK_PATH = 'mistral.engine.task_handler.run_existing_task' +_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task' +_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task' def _log_task_delay(task_ex, delay_sec): @@ -180,7 +179,7 @@ class WaitBeforePolicy(base.TaskPolicy): policy_context = runtime_context[context_key] if policy_context.get('skip'): - # Unset state 'DELAYED'. + # Unset state 'RUNNING_DELAYED'. 
             wf_trace.info(
                 task_ex,
                 "Task '%s' [%s -> %s]"
@@ -193,13 +192,16 @@
         if task_ex.state != states.IDLE:
             policy_context.update({'skip': True})
+            _log_task_delay(task_ex, self.delay)
 
             task_ex.state = states.RUNNING_DELAYED
 
+            # TODO(rakhmerov): This is wrong as task handler doesn't manage
+            # transactions and hence it can't be called explicitly.
             scheduler.schedule_call(
                 None,
-                _RUN_EXISTING_TASK_PATH,
+                _CONTINUE_TASK_PATH,
                 self.delay,
                 task_ex_id=task_ex.id,
             )
@@ -228,6 +230,7 @@
         task_ex.runtime_context = runtime_context
 
         policy_context = runtime_context[context_key]
+
         if policy_context.get('skip'):
             # Skip, already processed.
             return
@@ -236,17 +239,25 @@
 
         _log_task_delay(task_ex, self.delay)
 
-        state = task_ex.state
+        end_state = task_ex.state
+        end_state_info = task_ex.state_info
+
+        # TODO(rakhmerov): Policies probably need to have the tasks.Task
+        # interface in order to safely manage task state.
 
         # Set task state to 'DELAYED'.
         task_ex.state = states.RUNNING_DELAYED
+        task_ex.state_info = (
+            'Suspended by wait-after policy for %s seconds' % self.delay
+        )
 
         # Schedule to change task state to RUNNING again.
         scheduler.schedule_call(
-            _ENGINE_CLIENT_PATH,
-            'on_task_state_change',
+            None,
+            _COMPLETE_TASK_PATH,
             self.delay,
-            state=state,
             task_ex_id=task_ex.id,
+            state=end_state,
+            state_info=end_state_info
         )
@@ -339,7 +350,7 @@
 
         scheduler.schedule_call(
             None,
-            _RUN_EXISTING_TASK_PATH,
+            _CONTINUE_TASK_PATH,
             self.delay,
             task_ex_id=task_ex.id,
         )
@@ -360,7 +371,7 @@
         scheduler.schedule_call(
             None,
-            'mistral.engine.policies.fail_task_if_incomplete',
+            'mistral.engine.policies._fail_task_if_incomplete',
             self.delay,
             task_ex_id=task_ex.id,
             timeout=self.delay
@@ -424,21 +435,35 @@
     task_ex.runtime_context = runtime_context
 
 
-def fail_task_if_incomplete(task_ex_id, timeout):
+def _continue_task(task_ex_id):
+    from mistral.engine import task_handler
+
+    # TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
+    task_handler.continue_task(db_api.get_task_execution(task_ex_id))
+
+
+def _complete_task(task_ex_id, state, state_info):
+    from mistral.engine import task_handler
+
+    # TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
+    task_handler.complete_task(
+        db_api.get_task_execution(task_ex_id),
+        state,
+        state_info
+    )
+
+
+def _fail_task_if_incomplete(task_ex_id, timeout):
+    from mistral.engine import task_handler
+
+    # TODO(rakhmerov): It must be done in TX after Scheduler is fixed.
     task_ex = db_api.get_task_execution(task_ex_id)
 
     if not states.is_completed(task_ex.state):
-        msg = "Task timed out [id=%s, timeout(s)=%s]." % (task_ex_id, timeout)
+        msg = 'Task timed out [timeout(s)=%s].' % timeout
 
-        wf_trace.info(task_ex, msg)
-
-        wf_trace.info(
-            task_ex,
-            "Task '%s' [%s -> ERROR]" % (task_ex.name, task_ex.state)
-        )
-
-        rpc.get_engine_client().on_task_state_change(
-            task_ex_id,
+        task_handler.complete_task(
+            task_ex,
             states.ERROR,
             msg
         )
diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py
index 913d580f..eea140cd 100644
--- a/mistral/engine/rpc.py
+++ b/mistral/engine/rpc.py
@@ -503,11 +503,12 @@
         :param transport: Messaging transport.
         :type transport: Transport.
""" + self.topic = cfg.CONF.executor.topic + serializer = auth_ctx.RpcContextSerializer( auth_ctx.JsonPayloadSerializer() ) - self.topic = cfg.CONF.executor.topic self._client = messaging.RPCClient( transport, messaging.Target(), @@ -539,8 +540,15 @@ class ExecutorClient(base.Executor): rpc_client_method = call_ctx.cast if async else call_ctx.call - return rpc_client_method( - auth_ctx.ctx(), - 'run_action', - **kwargs + res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) + + # TODO(rakhmerov): It doesn't seem a good approach since we have + # a serializer for Result class. A better solution would be to + # use a composite serializer that dispatches serialization and + # deserialization to concrete serializers depending on object + # type. + + return ( + wf_utils.Result(data=res['data'], error=res['error']) + if res else None ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 996f9f75..c566dd85 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -1,5 +1,6 @@ # Copyright 2015 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. +# Copyright 2016 - Nokia Networks. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,28 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import operator - from oslo_log import log as logging +import traceback as tb -from mistral.db.v2 import api as db_api -from mistral.db.v2.sqlalchemy import models -from mistral.engine import action_handler -from mistral.engine import policies -from mistral.engine import rpc -from mistral.engine import utils as e_utils +from mistral.engine import tasks +from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc -from mistral import expressions as expr -from mistral.services import executions as wf_ex_service -from mistral.services import scheduler -from mistral import utils -from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser -from mistral.workflow import data_flow +from mistral.workflow import commands as wf_cmds from mistral.workflow import states -from mistral.workflow import utils as wf_utils -from mistral.workflow import with_items """Responsible for running tasks and handling results.""" @@ -42,548 +30,145 @@ from mistral.workflow import with_items LOG = logging.getLogger(__name__) -def run_existing_task(task_ex_id, reset=True): - """This function runs existing task execution. +def run_task(wf_cmd): + """Runs workflow task. - It is needed mostly by scheduler. - - :param task_ex_id: Task execution id. - :param reset: Reset action executions for the task. + :param wf_cmd: Workflow command. """ - task_ex = db_api.get_task_execution(task_ex_id) - task_spec = spec_parser.get_task_spec(task_ex.spec) - wf_def = db_api.get_workflow_definition(task_ex.workflow_name) - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - # Throw exception if the existing task already succeeded. - if task_ex.state == states.SUCCESS: - raise exc.EngineException( - 'Rerunning existing task that already succeeded is not supported.' - ) + task = _build_task_from_command(wf_cmd) - # Exit if the existing task failed and reset is not instructed. - # For a with-items task without reset, re-running the existing - # task will re-run the failed and unstarted items. 
- if (task_ex.state == states.ERROR and not reset and - not task_spec.get_with_items()): - return task_ex - - # Reset nested executions only if task is not already RUNNING. - if task_ex.state != states.RUNNING: - # Reset state of processed task and related action executions. - if reset: - action_exs = task_ex.executions - else: - action_exs = db_api.get_action_executions( - task_execution_id=task_ex.id, - state=states.ERROR, - accepted=True - ) - - for action_ex in action_exs: - action_ex.accepted = False - - # Explicitly change task state to RUNNING. - set_task_state(task_ex, states.RUNNING, None, processed=False) - - _run_existing_task(task_ex, task_spec, wf_spec) - - return task_ex - - -def _run_existing_task(task_ex, task_spec, wf_spec): try: - input_dicts = _get_input_dictionaries( - wf_spec, - task_ex, - task_spec, - task_ex.in_context - ) + task.run() except exc.MistralException as e: - LOG.error( - 'An error while calculating task action inputs' - ' [task_execution_id=%s]: %s', - task_ex.id, e + wf_ex = wf_cmd.wf_ex + task_spec = wf_cmd.task_spec + + msg = ( + "Failed to run task [wf=%s, task=%s]: %s\n%s" % + (wf_ex, task_spec.get_name(), e, tb.format_exc()) ) - set_task_state(task_ex, states.ERROR, str(e)) + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) return - # In some cases we can have no input, e.g. in case of 'with-items'. - if input_dicts: - for index, input_d in input_dicts: - _run_action_or_workflow( - task_ex, - task_spec, - input_d, - index, - wf_spec - ) - else: - _schedule_noop_action(task_ex, task_spec, wf_spec) + if task.is_completed(): + wf_handler.check_workflow_completion(wf_cmd.wf_ex) -def defer_task(wf_cmd): - """Defers a task""" - ctx = wf_cmd.ctx - wf_ex = wf_cmd.wf_ex - task_spec = wf_cmd.task_spec +def on_action_complete(action_ex): + """Handles action completion event. - if wf_utils.find_task_executions_by_spec(wf_ex, task_spec): - return None - - return _create_task_execution( - wf_ex, - task_spec, - ctx, - state=states.WAITING - ) - - -def run_new_task(wf_cmd, wf_spec): - """Runs a task.""" - ctx = wf_cmd.ctx - wf_ex = wf_cmd.wf_ex - task_spec = wf_cmd.task_spec - - # NOTE(xylan): Need to think how to get rid of this weird judgment to keep - # it more consistent with the function name. - task_ex = wf_utils.find_task_execution_with_state( - wf_ex, - task_spec, - states.WAITING - ) - - if task_ex: - set_task_state(task_ex, states.RUNNING, None) - task_ex.in_context = ctx - else: - task_ex = _create_task_execution(wf_ex, task_spec, ctx) - - LOG.debug( - 'Starting workflow task [workflow=%s, task_spec=%s, init_state=%s]' % - (wf_ex.name, task_spec, task_ex.state) - ) - - # TODO(rakhmerov): 'concurrency' policy should keep a number of running - # actions/workflows under control so it can't be implemented if it runs - # before any action executions are created. - before_task_start(task_ex, task_spec, wf_spec) - - # Policies could possibly change task state. - if task_ex.state != states.RUNNING: - return task_ex - - _run_existing_task(task_ex, task_spec, wf_spec) - - return task_ex - - -def on_action_complete(action_ex, wf_spec, result): - """Handles event of action result arrival. - - Given action result this method changes corresponding task execution - object. This method must never be called for the case of individual - action which is not associated with any tasks. - - :param action_ex: Action execution objects the result belongs to. - :param wf_spec: Workflow specification. 
- :param result: Task action/workflow output wrapped into - mistral.workflow.utils.Result instance. - :return Task execution object. + :param action_ex: Action execution. """ task_ex = action_ex.task_execution - # Ignore if action already completed. - if (states.is_completed(action_ex.state) and not - isinstance(action_ex, models.WorkflowExecution)): - return task_ex + if not task_ex: + return - task_spec = wf_spec.get_tasks()[task_ex.name] + task_spec = spec_parser.get_task_spec(task_ex.spec) - try: - result = action_handler.transform_result(result, task_ex, task_spec) - except exc.YaqlEvaluationException as e: - err_msg = str(e) - - LOG.error( - 'YAQL error while transforming action result' - ' [action_execution_id=%s, result=%s]: %s', - action_ex.id, result, err_msg - ) - - result = wf_utils.Result(error=err_msg) - - # Ignore workflow executions because they're handled during - # workflow completion. - if not isinstance(action_ex, models.WorkflowExecution): - action_handler.store_action_result(action_ex, result) - - if result.is_success(): - task_state = states.SUCCESS - task_state_info = None - else: - task_state = states.ERROR - task_state_info = result.error - - if not task_spec.get_with_items(): - _complete_task(task_ex, task_spec, task_state, task_state_info) - else: - with_items.increase_capacity(task_ex) - - if with_items.is_completed(task_ex): - _complete_task( - task_ex, - task_spec, - with_items.get_final_state(task_ex), - task_state_info - ) - - return task_ex - - -def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING): - task_ex = db_api.create_task_execution({ - 'name': task_spec.get_name(), - 'workflow_execution_id': wf_ex.id, - 'workflow_name': wf_ex.workflow_name, - 'workflow_id': wf_ex.workflow_id, - 'state': state, - 'spec': task_spec.to_dict(), - 'in_context': ctx, - 'published': {}, - 'runtime_context': {}, - 'project_id': wf_ex.project_id - }) - - # Add to collection explicitly so that it's in a proper - # state within the current session. - wf_ex.task_executions.append(task_ex) - - return task_ex - - -def before_task_start(task_ex, task_spec, wf_spec): - for p in policies.build_policies(task_spec.get_policies(), wf_spec): - p.before_task_start(task_ex, task_spec) - - -def after_task_complete(task_ex, task_spec, wf_spec): - for p in policies.build_policies(task_spec.get_policies(), wf_spec): - p.after_task_complete(task_ex, task_spec) - - -def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx): - """Calculates a collection of inputs for task action/workflow. - - If the given task is not configured as 'with-items' then return list - will consist of one dictionary containing input that task action/workflow - should run with. - In case of 'with-items' the result list will contain input dictionaries - for all 'with-items' iterations correspondingly. - - :return the list of tuples containing indexes - and the corresponding input dict. 
- """ - - if not task_spec.get_with_items(): - input_dict = _get_workflow_or_action_input( - wf_spec, - task_ex, - task_spec, - ctx - ) - - return enumerate([input_dict]) - else: - return _get_with_items_input(wf_spec, task_ex, task_spec, ctx) - - -def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx): - if task_spec.get_action_name(): - return _get_action_input( - wf_spec, - task_ex, - task_spec, - ctx - ) - elif task_spec.get_workflow_name(): - return _get_workflow_input(task_spec, ctx) - else: - raise RuntimeError('Must never happen.') - - -def _get_with_items_input(wf_spec, task_ex, task_spec, ctx): - """Calculate input array for separating each action input. - - Example: - DSL: - with_items: - - itemX in <% $.arrayI %> - - itemY in <% $.arrayJ %> - - Assume arrayI = [1, 2], arrayJ = ['a', 'b']. - with_items_input = { - "itemX": [1, 2], - "itemY": ['a', 'b'] - } - - Then we get separated input: - inputs_per_item = [ - {'itemX': 1, 'itemY': 'a'}, - {'itemX': 2, 'itemY': 'b'} - ] - - :return: the list of tuples containing indexes - and the corresponding input dict. - """ - with_items_inputs = expr.evaluate_recursively( - task_spec.get_with_items(), ctx - ) - - with_items.validate_input(with_items_inputs) - - inputs_per_item = [] - - for key, value in with_items_inputs.items(): - for index, item in enumerate(value): - iter_context = {key: item} - - if index >= len(inputs_per_item): - inputs_per_item.append(iter_context) - else: - inputs_per_item[index].update(iter_context) - - action_inputs = [] - - for item_input in inputs_per_item: - new_ctx = utils.merge_dicts(item_input, ctx) - - action_inputs.append(_get_workflow_or_action_input( - wf_spec, task_ex, task_spec, new_ctx - )) - - with_items.prepare_runtime_context(task_ex, task_spec, action_inputs) - - indices = with_items.get_indices_for_loop(task_ex) - with_items.decrease_capacity(task_ex, len(indices)) - - if indices: - current_inputs = operator.itemgetter(*indices)(action_inputs) - - return zip( - indices, - current_inputs if isinstance(current_inputs, tuple) - else [current_inputs] - ) - - return [] - - -def _get_action_input(wf_spec, task_ex, task_spec, ctx): - input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx) - - action_spec_name = task_spec.get_action_name() - - input_dict = utils.merge_dicts( - input_dict, - _get_action_defaults(task_ex, task_spec), - overwrite=False - ) - - return action_handler.get_action_input( - action_spec_name, - input_dict, - task_ex.workflow_name, - wf_spec - ) - - -def _get_workflow_input(task_spec, ctx): - return expr.evaluate_recursively(task_spec.get_input(), ctx) - - -def _run_action_or_workflow(task_ex, task_spec, input_dict, index, wf_spec): - t_name = task_ex.name - - if task_spec.get_action_name(): - wf_trace.info( - task_ex, - "Task '%s' is RUNNING [action_name = %s]" % - (t_name, task_spec.get_action_name()) - ) - - _schedule_run_action(task_ex, task_spec, input_dict, index, wf_spec) - elif task_spec.get_workflow_name(): - wf_trace.info( - task_ex, - "Task '%s' is RUNNING [workflow_name = %s]" % - (t_name, task_spec.get_workflow_name())) - - _schedule_run_workflow(task_ex, task_spec, input_dict, index, wf_spec) - - -def _get_action_defaults(task_ex, task_spec): - actions = task_ex.in_context.get('__env', {}).get('__actions', {}) - - return actions.get(task_spec.get_action_name(), {}) - - -def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec): - action_spec_name = task_spec.get_action_name() - - action_def = action_handler.resolve_definition( 
- action_spec_name, - task_ex, - wf_spec - ) - - action_ex = action_handler.create_action_execution( - action_def, - action_input, - task_ex, - index - ) - - target = expr.evaluate_recursively( - task_spec.get_target(), - utils.merge_dicts( - copy.deepcopy(action_input), - copy.deepcopy(task_ex.in_context) - ) - ) - - scheduler.schedule_call( - None, - 'mistral.engine.action_handler.run_existing_action', - 0, - action_ex_id=action_ex.id, - target=target - ) - - -def _schedule_noop_action(task_ex, task_spec, wf_spec): wf_ex = task_ex.workflow_execution - action_def = action_handler.resolve_action_definition( - 'std.noop', - wf_ex.workflow_name, - wf_spec.get_name() + task = _create_task( + wf_ex, + task_spec, + task_ex.in_context, + task_ex ) - action_ex = action_handler.create_action_execution(action_def, {}, task_ex) - - target = expr.evaluate_recursively( - task_spec.get_target(), - task_ex.in_context - ) - - scheduler.schedule_call( - None, - 'mistral.engine.action_handler.run_existing_action', - 0, - action_ex_id=action_ex.id, - target=target - ) - - -def _schedule_run_workflow(task_ex, task_spec, wf_input, index, - parent_wf_spec): - parent_wf_ex = task_ex.workflow_execution - - wf_spec_name = task_spec.get_workflow_name() - - wf_def = e_utils.resolve_workflow_definition( - parent_wf_ex.workflow_name, - parent_wf_spec.get_name(), - wf_spec_name - ) - - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - - wf_params = { - 'task_execution_id': task_ex.id, - 'with_items_index': index - } - - if 'env' in parent_wf_ex.params: - wf_params['env'] = parent_wf_ex.params['env'] - - for k, v in list(wf_input.items()): - if k not in wf_spec.get_input(): - wf_params[k] = v - del wf_input[k] - - wf_ex_id, _ = wf_ex_service.create_workflow_execution( - wf_def.name, - wf_input, - "sub-workflow execution", - wf_params, - wf_spec - ) - - scheduler.schedule_call( - None, - 'mistral.engine.task_handler.resume_workflow', - 0, - wf_ex_id=wf_ex_id, - env=None - ) - - -def resume_workflow(wf_ex_id, env): - rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) - - -def _complete_task(task_ex, task_spec, state, state_info=None): - # Ignore if task already completed. - if states.is_completed(task_ex.state): - return [] - - set_task_state(task_ex, state, state_info) - try: - data_flow.publish_variables(task_ex, task_spec) + task.on_action_complete(action_ex) except exc.MistralException as e: - LOG.error( - 'An error while publishing task variables' - ' [task_execution_id=%s]: %s', - task_ex.id, str(e) - ) + task_ex = action_ex.task_execution + wf_ex = task_ex.workflow_execution - set_task_state(task_ex, states.ERROR, str(e)) + msg = ("Failed to handle action completion [wf=%s, task=%s," + " action=%s]: %s\n%s" % + (wf_ex.name, task_ex.name, action_ex.name, e, tb.format_exc())) - if not task_spec.get_keep_result(): - data_flow.destroy_task_result(task_ex) + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) + + return + + if task.is_completed(): + wf_handler.check_workflow_completion(wf_ex) -def set_task_state(task_ex, state, state_info, processed=None): - wf_trace.info( +def fail_task(task_ex, msg): + task = _build_task_from_execution(task_ex) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(task_ex.workflow_execution, msg) + + +def continue_task(task_ex): + task = _build_task_from_execution(task_ex) + + # TODO(rakhmerov): Error handling. 
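+    # (A sketch of what that handling could mirror: like run_task()
+    # above, catch exc.MistralException around run(), set the task to
+    # ERROR and fail the workflow. Not implemented in this patch.)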
+ task.run() + + if task.is_completed(): + wf_handler.check_workflow_completion(task_ex.workflow_execution) + + +def complete_task(task_ex, state, state_info): + task = _build_task_from_execution(task_ex) + + # TODO(rakhmerov): Error handling. + task.complete(state, state_info) + + if task.is_completed(): + wf_handler.check_workflow_completion(task_ex.workflow_execution) + + +def _build_task_from_execution(task_ex, task_spec=None): + return _create_task( task_ex.workflow_execution, - "Task execution '%s' [%s -> %s]" % - (task_ex.name, task_ex.state, state) + task_spec or spec_parser.get_task_spec(task_ex.spec), + task_ex.in_context, + task_ex ) - task_ex.state = state - task_ex.state_info = state_info - if processed is not None: - task_ex.processed = processed +def _build_task_from_command(cmd): + if isinstance(cmd, wf_cmds.RunExistingTask): + task = _create_task( + cmd.wf_ex, + spec_parser.get_task_spec(cmd.task_ex.spec), + cmd.ctx, + cmd.task_ex + ) + + if cmd.reset: + task.reset() + + return task + + if isinstance(cmd, wf_cmds.RunTask): + task = _create_task(cmd.wf_ex, cmd.task_spec, cmd.ctx) + + if cmd.is_waiting(): + task.defer() + + return task + + raise exc.MistralError('Unsupported workflow command: %s' % cmd) -def is_task_completed(task_ex, task_spec): +def _create_task(wf_ex, task_spec, ctx, task_ex=None): if task_spec.get_with_items(): - return with_items.is_completed(task_ex) + return tasks.WithItemsTask(wf_ex, task_spec, ctx, task_ex) - return states.is_completed(task_ex.state) - - -def need_to_continue(task_ex, task_spec): - # For now continue is available only for with-items. - if task_spec.get_with_items(): - return (with_items.has_more_iterations(task_ex) - and with_items.get_concurrency(task_ex)) - - return False + return tasks.RegularTask(wf_ex, task_spec, ctx, task_ex) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py new file mode 100644 index 00000000..0197fada --- /dev/null +++ b/mistral/engine/tasks.py @@ -0,0 +1,456 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import copy +import operator +from oslo_log import log as logging +import six + +from mistral.db.v2 import api as db_api +from mistral.engine import actions +from mistral.engine import dispatcher +from mistral.engine import policies +from mistral import exceptions as exc +from mistral import expressions as expr +from mistral import utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base +from mistral.workflow import data_flow +from mistral.workflow import states +from mistral.workflow import utils as wf_utils +from mistral.workflow import with_items + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class Task(object): + """Task. + + Represents a workflow task and defines interface that can be used by + Mistral engine or its components in order to manipulate with tasks. 
+ """ + + def __init__(self, wf_ex, task_spec, ctx, task_ex=None): + self.wf_ex = wf_ex + self.task_spec = task_spec + self.ctx = ctx + self.task_ex = task_ex + self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + self.waiting = False + self.reset_flag = False + + @abc.abstractmethod + def on_action_complete(self, action_ex): + """Handle action completion. + + :param action_ex: Action execution. + """ + raise NotImplementedError + + @abc.abstractmethod + def run(self): + """Runs task.""" + raise NotImplementedError + + def defer(self): + """Defers task. + + This methods finds task execution or creates new and puts task + to a waiting state. + """ + + if not self.task_ex: + self.task_ex = wf_utils.find_task_executions_by_spec( + self.wf_ex, + self.task_spec + ) + + if not self.task_ex: + self._create_task_execution() + + self.set_state(states.WAITING, 'Task execution is deferred.') + + self.waiting = True + + def reset(self): + self.reset_flag = True + + def set_state(self, state, state_info, processed=None): + """Sets task state without executing post completion logic. + + :param state: New task state. + :param state_info: New state information (i.e. error message). + :param processed: New "processed" flag value. + """ + + wf_trace.info( + self.task_ex.workflow_execution, + "Task execution '%s' [%s -> %s]: %s" % + (self.task_ex.id, self.task_ex.state, state, state_info) + ) + + self.task_ex.state = state + self.task_ex.state_info = state_info + + if processed is not None: + self.task_ex.processed = processed + + def complete(self, state, state_info=None): + """Complete task and set specified state. + + Method sets specified task state and runs all necessary post + completion logic such as publishing workflow variables and + scheduling new workflow commands. + + :param state: New task state. + :param state_info: New state information (i.e. error message). + """ + + # Ignore if task already completed. + if states.is_completed(self.task_ex.state): + return + + self.set_state(state, state_info) + + data_flow.publish_variables(self.task_ex, self.task_spec) + + if not self.task_spec.get_keep_result(): + # Destroy task result. + for ex in self.task_ex.executions: + if hasattr(ex, 'output'): + ex.output = {} + + self._after_task_complete() + + # Ignore DELAYED state. + if self.task_ex.state == states.RUNNING_DELAYED: + return + + # If workflow is paused we shouldn't schedule new commands + # and mark task as processed. + if states.is_paused(self.wf_ex.state): + return + + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) + + # Calculate commands to process next. + cmds = wf_ctrl.continue_workflow() + + # Mark task as processed after all decisions have been made + # upon its completion. 
+        self.task_ex.processed = True
+
+        dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
+
+    def _before_task_start(self):
+        policies_spec = self.task_spec.get_policies()
+
+        for p in policies.build_policies(policies_spec, self.wf_spec):
+            p.before_task_start(self.task_ex, self.task_spec)
+
+    def _after_task_complete(self):
+        policies_spec = self.task_spec.get_policies()
+
+        for p in policies.build_policies(policies_spec, self.wf_spec):
+            p.after_task_complete(self.task_ex, self.task_spec)
+
+    def _create_task_execution(self, state=states.RUNNING):
+        self.task_ex = db_api.create_task_execution({
+            'name': self.task_spec.get_name(),
+            'workflow_execution_id': self.wf_ex.id,
+            'workflow_name': self.wf_ex.workflow_name,
+            'workflow_id': self.wf_ex.workflow_id,
+            'state': state,
+            'spec': self.task_spec.to_dict(),
+            'in_context': self.ctx,
+            'published': {},
+            'runtime_context': {},
+            'project_id': self.wf_ex.project_id
+        })
+
+        # Add to collection explicitly so that it's in a proper
+        # state within the current session.
+        self.wf_ex.task_executions.append(self.task_ex)
+
+    def _get_action_defaults(self):
+        action_name = self.task_spec.get_action_name()
+
+        if not action_name:
+            return {}
+
+        env = self.task_ex.in_context.get('__env', {})
+
+        return env.get('__actions', {}).get(action_name, {})
+
+
+class RegularTask(Task):
+    """Regular task.
+
+    Takes care of processing regular tasks with one action.
+    """
+
+    def on_action_complete(self, action_ex):
+        state = action_ex.state
+        # TODO(rakhmerov): Here we can define more informative messages
+        # cases when action is successful and when it's not. For example,
+        # in state_info we can specify the cause action.
+        state_info = (None if state == states.SUCCESS
+                      else action_ex.output.get('result'))
+
+        self.complete(state, state_info)
+
+    def is_completed(self):
+        return self.task_ex and states.is_completed(self.task_ex.state)
+
+    def run(self):
+        if not self.task_ex:
+            self._run_new()
+        else:
+            self._run_existing()
+
+    def _run_new(self):
+        # NOTE(xylan): Need to think how to get rid of this weird judgment
+        # to keep it more consistent with the function name.
+        self.task_ex = wf_utils.find_task_execution_with_state(
+            self.wf_ex,
+            self.task_spec,
+            states.WAITING
+        )
+
+        if self.task_ex:
+            self.set_state(states.RUNNING, None)
+
+            self.task_ex.in_context = self.ctx
+        else:
+            self._create_task_execution()
+
+        LOG.debug(
+            'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %
+            (self.wf_ex.name, self.task_spec, self.task_ex.state)
+        )
+
+        self._before_task_start()
+
+        # Policies could possibly change task state.
+        if self.task_ex.state != states.RUNNING:
+            return
+
+        self._schedule_actions()
+
+    def _run_existing(self):
+        if self.waiting:
+            return
+
+        # Throw an exception if the existing task already succeeded.
+        if self.task_ex.state == states.SUCCESS:
+            raise exc.MistralError(
+                'Rerunning succeeded tasks is not supported.'
+            )
+
+        # Explicitly change the task state to RUNNING.
+        self.set_state(states.RUNNING, None, processed=False)
+
+        self._reset_actions()
+
+        self._schedule_actions()
+
+    def _reset_actions(self):
+        """Resets the task's action executions so they can be re-run.
+
+        Depending on the 'reset' flag this affects either all action
+        executions of the task or only the failed ones.
+        """
+
+        # Reset state of processed task and related action executions.
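+        # With 'reset' all action executions of the task are affected;
+        # otherwise only the previously accepted ERROR ones.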
+ if self.reset_flag: + action_exs = self.task_ex.executions + else: + action_exs = db_api.get_action_executions( + task_execution_id=self.task_ex.id, + state=states.ERROR, + accepted=True + ) + + for action_ex in action_exs: + action_ex.accepted = False + + def _schedule_actions(self): + # Regular task schedules just one action. + input_dict = self._get_action_input() + target = self._get_target(input_dict) + + action = self._build_action() + + action.validate_input(input_dict) + + action.schedule(input_dict, target) + + def _get_target(self, input_dict): + return expr.evaluate_recursively( + self.task_spec.get_target(), + utils.merge_dicts( + copy.deepcopy(input_dict), + copy.deepcopy(self.ctx) + ) + ) + + def _get_action_input(self, ctx=None): + ctx = ctx or self.ctx + + input_dict = expr.evaluate_recursively(self.task_spec.get_input(), ctx) + + return utils.merge_dicts( + input_dict, + self._get_action_defaults(), + overwrite=False + ) + + def _build_action(self): + action_name = self.task_spec.get_action_name() + wf_name = self.task_spec.get_workflow_name() + + if wf_name: + return actions.WorkflowAction(wf_name, task_ex=self.task_ex) + + if not action_name: + action_name = 'std.noop' + + action_def = actions.resolve_action_definition( + action_name, + self.wf_ex.name, + self.wf_spec.get_name() + ) + + if action_def.spec: + return actions.AdHocAction(action_def, task_ex=self.task_ex) + + return actions.PythonAction(action_def, task_ex=self.task_ex) + + +class WithItemsTask(RegularTask): + """With-items task. + + Takes care of processing "with-items" tasks. + """ + + def on_action_complete(self, action_ex): + state = action_ex.state + # TODO(rakhmerov): Here we can define more informative messages + # cases when action is successful and when it's not. For example, + # in state_info we can specify the cause action. + state_info = (None if state == states.SUCCESS + else action_ex.output.get('result')) + + with_items.increase_capacity(self.task_ex) + + if with_items.is_completed(self.task_ex): + self.complete( + with_items.get_final_state(self.task_ex), + state_info + ) + + return + + if (with_items.has_more_iterations(self.task_ex) + and with_items.get_concurrency(self.task_ex)): + self._schedule_actions() + + def _schedule_actions(self): + input_dicts = self._get_with_items_input() + + if not input_dicts: + self.complete(states.SUCCESS) + + return + + for idx, input_dict in input_dicts: + target = self._get_target(input_dict) + + action = self._build_action() + + action.schedule(input_dict, target, index=idx) + + def _get_with_items_input(self): + """Calculate input array for separating each action input. + + Example: + DSL: + with_items: + - itemX in <% $.arrayI %> + - itemY in <% $.arrayJ %> + + Assume arrayI = [1, 2], arrayJ = ['a', 'b']. + with_items_input = { + "itemX": [1, 2], + "itemY": ['a', 'b'] + } + + Then we get separated input: + inputs_per_item = [ + {'itemX': 1, 'itemY': 'a'}, + {'itemX': 2, 'itemY': 'b'} + ] + + :return: the list of tuples containing indexes + and the corresponding input dict. 
+ """ + with_items_inputs = expr.evaluate_recursively( + self.task_spec.get_with_items(), + self.ctx + ) + + with_items.validate_input(with_items_inputs) + + inputs_per_item = [] + + for key, value in with_items_inputs.items(): + for index, item in enumerate(value): + iter_context = {key: item} + + if index >= len(inputs_per_item): + inputs_per_item.append(iter_context) + else: + inputs_per_item[index].update(iter_context) + + action_inputs = [] + + for item_input in inputs_per_item: + new_ctx = utils.merge_dicts(item_input, self.ctx) + + action_inputs.append(self._get_action_input(new_ctx)) + + with_items.prepare_runtime_context( + self.task_ex, + self.task_spec, + action_inputs + ) + + indices = with_items.get_indices_for_loop(self.task_ex) + + with_items.decrease_capacity(self.task_ex, len(indices)) + + if indices: + current_inputs = operator.itemgetter(*indices)(action_inputs) + + return zip( + indices, + current_inputs if isinstance(current_inputs, tuple) + else [current_inputs] + ) + + return [] diff --git a/mistral/engine/utils.py b/mistral/engine/utils.py index 8d910c78..556e3889 100644 --- a/mistral/engine/utils.py +++ b/mistral/engine/utils.py @@ -21,6 +21,9 @@ from mistral import exceptions as exc from mistral import utils +# TODO(rakhmerov): This method is too abstract, validation rules may vary +# depending on object type (action, wf), it's not clear what it can be +# applied to. def validate_input(definition, input, spec=None): input_param_names = copy.deepcopy(list((input or {}).keys())) missing_param_names = [] diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 2255677b..41c0f6f3 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -1,4 +1,4 @@ -# Copyright 2015 - Mirantis, Inc. +# Copyright 2016 - Nokia Networks. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,18 +12,89 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_config import cfg +from oslo_log import log as logging + from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import rpc -from mistral.engine import task_handler from mistral import exceptions as exc from mistral.services import scheduler +from mistral import utils from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils +LOG = logging.getLogger(__name__) + + +def on_task_complete(task_ex): + wf_ex = task_ex.workflow_execution + + check_workflow_completion(wf_ex) + + +def check_workflow_completion(wf_ex): + if states.is_paused_or_completed(wf_ex.state): + return + + # Workflow is not completed if there are any incomplete task + # executions that are not in WAITING state. If all incomplete + # tasks are waiting and there are no unhandled errors, then these + # tasks will not reach completion. In this case, mark the + # workflow complete. 
+ incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex) + + if any(not states.is_waiting(t.state) for t in incomplete_tasks): + return + + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) + + if wf_ctrl.all_errors_handled(): + succeed_workflow( + wf_ex, + wf_ctrl.evaluate_workflow_final_context(), + wf_spec + ) + else: + state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) + + fail_workflow(wf_ex, state_info) + + +def stop_workflow(wf_ex, state, message=None): + if state == states.SUCCESS: + wf_ctrl = wf_base.get_controller(wf_ex) + + final_context = {} + + try: + final_context = wf_ctrl.evaluate_workflow_final_context() + except Exception as e: + LOG.warning( + 'Failed to get final context for %s: %s' % (wf_ex, e) + ) + + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + + return succeed_workflow( + wf_ex, + final_context, + wf_spec, + message + ) + elif state == states.ERROR: + return fail_workflow(wf_ex, message) + + return wf_ex + + def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): # Fail workflow if output is not successfully evaluated. try: @@ -35,7 +106,7 @@ def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): return fail_workflow(wf_ex, e.message) # Set workflow execution to success until after output is evaluated. - set_execution_state(wf_ex, states.SUCCESS, state_info) + set_workflow_state(wf_ex, states.SUCCESS, state_info) if wf_ex.task_execution_id: _schedule_send_result_to_parent_workflow(wf_ex) @@ -47,7 +118,16 @@ def fail_workflow(wf_ex, state_info): if states.is_paused_or_completed(wf_ex.state): return wf_ex - set_execution_state(wf_ex, states.ERROR, state_info) + set_workflow_state(wf_ex, states.ERROR, state_info) + + # When we set an ERROR state we should safely set output value getting + # w/o exceptions due to field size limitations. + state_info = utils.cut_by_kb( + state_info, + cfg.CONF.engine.execution_field_size_limit_kb + ) + + wf_ex.output = {'result': state_info} if wf_ex.task_execution_id: _schedule_send_result_to_parent_workflow(wf_ex) @@ -84,7 +164,9 @@ def send_result_to_parent_workflow(wf_ex_id): ) -def set_execution_state(wf_ex, state, state_info=None, set_upstream=False): +# TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow +# abstraction. +def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False): cur_state = wf_ex.state if states.is_valid_transition(cur_state, state): @@ -109,24 +191,28 @@ def set_execution_state(wf_ex, state, state_info=None, set_upstream=False): # If specified, then recursively set the state of the parent workflow # executions to the same state. Only changing state to RUNNING is # supported. + # TODO(rakhmerov): I don't like this hardcoded special case. It's + # used only to continue the workflow (rerun) but at the first glance + # seems like a generic behavior. Need to handle it differently. if set_upstream and state == states.RUNNING and wf_ex.task_execution_id: task_ex = db_api.get_task_execution(wf_ex.task_execution_id) parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id) - set_execution_state( + set_workflow_state( parent_wf_ex, state, state_info=state_info, set_upstream=set_upstream ) - task_handler.set_task_state( - task_ex, - state, - state_info=None, - processed=False - ) + # TODO(rakhmerov): How do we need to set task state properly? + # It doesn't seem right to intervene into the parent workflow + # internals. 
We just need to communicate changes back to parent
+            # workflow and it should do what's needed itself.
+            task_ex.state = state
+            task_ex.state_info = None
+            task_ex.processed = False


 def lock_workflow_execution(wf_ex_id):
diff --git a/mistral/services/executions.py b/mistral/services/executions.py
index 89aae84b..20d849ec 100644
--- a/mistral/services/executions.py
+++ b/mistral/services/executions.py
@@ -59,7 +59,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
         'context': copy.deepcopy(wf_input) or {},
         'task_execution_id': params.get('task_execution_id'),
         'runtime_context': {
-            'with_items_index': params.get('with_items_index', 0)
+            'index': params.get('index', 0)
         },
     })

@@ -92,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params,

     wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)

-    return wf_ex.id, wf_spec
+    return wf_ex, wf_spec
diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py
index ec60742f..cc00f3dc 100644
--- a/mistral/services/scheduler.py
+++ b/mistral/services/scheduler.py
@@ -162,9 +162,9 @@ class CallScheduler(periodic_task.PeriodicTasks):
         )

         for (target_auth_context, target_method, method_args) in delayed_calls:
-
+            # TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521
             # Transaction is needed here because some of the
-            # target_method can use the DB
+            # target_method can use the DB.
             with db_api.transaction():
                 try:
                     # Set the correct context for the method.
@@ -175,9 +175,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
                     # Call the method.
                     target_method(**method_args)
                 except Exception as e:
-                    LOG.error(
-                        "Delayed call failed [exception=%s]", e
-                    )
+                    LOG.exception("Delayed call failed [exception=%s]", e)
                 finally:
                     # Remove context.
                     context.set_ctx(None)
diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py
index 97597836..45619f00 100644
--- a/mistral/tests/unit/engine/base.py
+++ b/mistral/tests/unit/engine/base.py
@@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__)

 # Default delay and timeout in seconds for await_xxx() functions.
 DEFAULT_DELAY = 1
-DEFAULT_TIMEOUT = 60
+DEFAULT_TIMEOUT = 20


 def launch_engine_server(transport, engine):
@@ -135,18 +135,38 @@ class EngineTestCase(base.DbTestCase):

         wf_execs = db_api.get_workflow_executions()

-        for wf_ex in wf_execs:
+        for w in wf_execs:
             print(
-                "\n%s [state=%s, output=%s]" %
-                (wf_ex.name, wf_ex.state, wf_ex.output)
+                "\n%s [state=%s, state_info=%s, output=%s]" %
+                (w.name, w.state, w.state_info, w.output)
             )

-            for t_ex in wf_ex.task_executions:
+            for t in w.task_executions:
                 print(
-                    "\t%s [id=%s, state=%s, published=%s]" %
-                    (t_ex.name, t_ex.id, t_ex.state, t_ex.published)
+                    "\t%s [id=%s, state=%s, state_info=%s, processed=%s,"
+                    " published=%s]" %
+                    (t.name,
+                     t.id,
+                     t.state,
+                     t.state_info,
+                     t.processed,
+                     t.published)
                 )

+                a_execs = db_api.get_action_executions(task_execution_id=t.id)
+
+                for a in a_execs:
+                    print(
+                        "\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s,"
+                        " output=%s]" %
+                        (a.name,
+                         a.id,
+                         a.state,
+                         a.state_info,
+                         a.accepted,
+                         a.output)
+                    )
+
     # Various methods for abstract execution objects.
def is_execution_in_state(self, ex_id, state): diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py index 5c0c907c..230c8417 100644 --- a/mistral/tests/unit/engine/test_dataflow.py +++ b/mistral/tests/unit/engine/test_dataflow.py @@ -486,10 +486,10 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + task1 = self._assert_single_item(wf_ex.task_executions, name='task1') result = data_flow.get_task_execution_result(task1) + self.assertListEqual([], result) @@ -512,7 +512,7 @@ class DataFlowTest(test_base.BaseTest): name='my_action', output={'result': 1}, accepted=True, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )] with mock.patch.object(db_api, 'get_action_executions', @@ -523,14 +523,14 @@ class DataFlowTest(test_base.BaseTest): name='my_action', output={'result': 1}, accepted=True, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )) action_exs.append(models.ActionExecution( name='my_action', output={'result': 1}, accepted=False, - runtime_context={'with_items_index': 0} + runtime_context={'index': 0} )) with mock.patch.object(db_api, 'get_action_executions', diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 07c673ae..9bf0a7a0 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -143,6 +143,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): """ wf_service.create_workflows(wf_text) + wf_ex = self.engine.start_workflow('wf', {}) self.await_execution_success(wf_ex.id) @@ -411,6 +412,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf: input: - var + tasks: task1: action: std.echo output=<% $.var + $.var2 %> @@ -484,7 +486,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): name='task1' ) - self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.ERROR, task_1_ex.state) # Assert that there is only one action execution and it's SUCCESS. 
task_1_action_exs = db_api.get_action_executions( @@ -550,7 +552,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): name='task1' ) - self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.ERROR, task_1_ex.state) task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index d9ed0d75..658294b1 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -460,18 +460,27 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(2, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') - task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + + task_1_ex = self._assert_single_item( + task_execs, + name='t1', + state=states.SUCCESS + ) + task_2_ex = self._assert_single_item( + task_execs, + name='t2', + state=states.ERROR + ) - self.assertEqual(states.SUCCESS, task_1_ex.state) - self.assertEqual(states.ERROR, task_2_ex.state) self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. e = self.assertRaises( - exc.EngineException, + exc.MistralError, self.engine.rerun_workflow, wf_ex.id, task_1_ex.id @@ -505,9 +514,12 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(1, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') self.assertEqual(states.ERROR, task_1_ex.state) self.assertIsNotNone(task_1_ex.state_info) @@ -532,10 +544,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsNone(wf_ex.state_info) - self.assertEqual(2, len(wf_ex.task_executions)) - task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') - task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') + task_2_ex = self._assert_single_item(task_execs, name='t2') # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) @@ -547,8 +562,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # The single action execution that succeeded should not re-run. self.assertEqual(5, len(task_1_action_exs)) - self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'], - task_1_ex.published.get('v1')) + self.assertListEqual( + ['Task 1.0', 'Task 1.1', 'Task 1.2'], + task_1_ex.published.get('v1') + ) # Check action executions of task 2. 
self.assertEqual(states.SUCCESS, task_2_ex.state) diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 9625dfdf..29a2bcb6 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -172,8 +172,7 @@ class EnvironmentTest(base.EngineTestCase): 'mistral.actions.std_actions.EchoAction', {}, a_ex.input, - TARGET, - True + TARGET ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_error_handling.py b/mistral/tests/unit/engine/test_error_handling.py new file mode 100644 index 00000000..7696729a --- /dev/null +++ b/mistral/tests/unit/engine/test_error_handling.py @@ -0,0 +1,61 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base +from mistral.workflow import states + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class DirectWorkflowEngineTest(base.EngineTestCase): + def test_action_error(self): + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.fail + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + self._assert_single_item(task_execs, name='task1', state=states.ERROR) + + # TODO(rakhmerov): Finish. Need to check more complicated cases. + + def test_task_error(self): + # TODO(rakhmerov): Implement. + pass + + def test_workflow_error(self): + # TODO(rakhmerov): Implement. 
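+        # (One candidate scenario, as a suggestion only: make evaluation
+        # of the workflow 'output' fail so that succeed_workflow() in
+        # workflow_handler turns it into a workflow-level error.)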
+ pass diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index 827faf63..f493a66d 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -178,13 +178,13 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertIn( - 'Failure caused by error in tasks: task1', + 'Failed to handle action completion [wf=wf, task=task1', wf_ex.state_info ) task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') - self.assertEqual( + self.assertIn( "Size of 'published' is 1KB which exceeds the limit of 0KB", task_ex.state_info ) diff --git a/mistral/tests/unit/engine/test_join.py b/mistral/tests/unit/engine/test_join.py index 1809de4b..b150eff5 100644 --- a/mistral/tests/unit/engine/test_join.py +++ b/mistral/tests/unit/engine/test_join.py @@ -576,6 +576,7 @@ class JoinEngineTest(base.EngineTestCase): action: std.noop on-success: - task22 + task22: action: std.noop on-success: @@ -585,6 +586,7 @@ class JoinEngineTest(base.EngineTestCase): action: std.fail on-success: - task32 + task32: action: std.noop on-success: diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index e8685148..3aedd7bd 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -303,7 +303,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): # Resume workflow and re-run failed task. e = self.assertRaises( - exc.EngineException, + exc.MistralError, self.engine.rerun_workflow, wf_ex.id, task_1_ex.id diff --git a/mistral/tests/unit/engine/test_run_action.py b/mistral/tests/unit/engine/test_run_action.py index 8404fa01..2f5c36fc 100644 --- a/mistral/tests/unit/engine/test_run_action.py +++ b/mistral/tests/unit/engine/test_run_action.py @@ -65,6 +65,7 @@ class RunActionEngineTest(base.EngineTestCase): # Start action and see the result. action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'}) + self.assertIsNotNone(action_ex.output) self.assertIn('some error', action_ex.output['result']) def test_run_action_save_result(self): @@ -148,10 +149,15 @@ class RunActionEngineTest(base.EngineTestCase): self.assertIn('concat', exception.message) - @mock.patch('mistral.engine.action_handler.resolve_action_definition') + # TODO(rakhmerov): This is an example of a bad test. It pins to + # implementation details too much and prevents from making refactoring + # easily. When writing tests we should make assertions about + # consequences, not about how internal machinery works, i.e. we need to + # follow "black box" testing paradigm. 
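+    # For illustration, a more "black box" variant of the test below
+    # would drive only the public API with a real action and assert on
+    # the observable result, along these lines (sketch only, reusing
+    # calls that appear elsewhere in this class):
+    #
+    #     action_ex = self.engine.start_action(
+    #         'std.echo',
+    #         {'output': 'Hello!'}
+    #     )
+    #
+    #     self.assertEqual('Hello!', action_ex.output['result'])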
+ @mock.patch('mistral.engine.actions.resolve_action_definition') @mock.patch('mistral.engine.utils.validate_input') @mock.patch('mistral.services.action_manager.get_action_class') - @mock.patch('mistral.engine.action_handler.run_action') + @mock.patch('mistral.engine.actions.PythonAction.run') def test_run_action_with_kwargs_input(self, run_mock, class_mock, validate_mock, def_mock): action_def = models.ActionDefinition() @@ -172,16 +178,15 @@ class RunActionEngineTest(base.EngineTestCase): self.engine.start_action('fake_action', {'input': 'Hello'}) - self.assertEqual(2, def_mock.call_count) - def_mock.assert_called_with('fake_action', None, None) + self.assertEqual(1, def_mock.call_count) + def_mock.assert_called_with('fake_action') self.assertEqual(0, validate_mock.call_count) class_ret.assert_called_once_with(input='Hello') run_mock.assert_called_once_with( - action_def, {'input': 'Hello'}, - target=None, - async=False + None, + save=None ) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index d00f5ef8..c18434c4 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -181,7 +181,7 @@ class SubworkflowsTest(base.EngineTestCase): @mock.patch.object(std_actions.EchoAction, 'run', mock.MagicMock(side_effect=exc.ActionException)) def test_subworkflow_error(self): - wf2_ex = self.engine.start_workflow('wb1.wf2', None) + self.engine.start_workflow('wb1.wf2', None) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) @@ -208,11 +208,13 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(2, len(wf_execs)) wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2') + self.assertEqual(states.ERROR, wf2_ex.state) self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info) # Ensure error message is bubbled up to the main workflow. wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1') + self.assertEqual(states.ERROR, wf1_ex.state) self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info) diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index b55d7f90..4384fdfd 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -16,7 +16,6 @@ import mock from oslo_config import cfg from mistral.db.v2 import api as db_api -from mistral.engine import default_engine as de from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base @@ -139,7 +138,7 @@ workflows: tasks: task1: - action: std.echo output="Hi!" + action: std.echo output="Task 1" on-complete: - task3 - pause @@ -299,7 +298,7 @@ class WorkflowResumeTest(base.EngineTestCase): self.engine.resume_workflow(wf_ex.id) - self.await_execution_success(wf_ex.id, 1, 5) + self.await_execution_success(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -351,8 +350,7 @@ class WorkflowResumeTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info) self.assertEqual(4, len(wf_ex.task_executions)) - @mock.patch.object(de.DefaultEngine, '_fail_workflow') - def test_resume_fails(self, mock_fw): + def test_resume_fails(self): # Start and pause workflow. 
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)

@@ -365,7 +363,7 @@ class WorkflowResumeTest(base.EngineTestCase):
         self.assertEqual(states.PAUSED, wf_ex.state)

         # Simulate failure and check if it is handled.
-        err = exc.MistralException('foo')
+        err = exc.MistralError('foo')

         with mock.patch.object(
                 db_api,
@@ -373,13 +371,11 @@ class WorkflowResumeTest(base.EngineTestCase):
                 side_effect=err):

             self.assertRaises(
-                exc.MistralException,
+                exc.MistralError,
                 self.engine.resume_workflow,
                 wf_ex.id
             )

-        mock_fw.assert_called_once_with(wf_ex.id, err)
-
     def test_resume_diff_env_vars(self):
         wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR)

diff --git a/mistral/tests/unit/workflow/test_direct_workflow.py b/mistral/tests/unit/workflow/test_direct_workflow.py
index 8e864a0e..541a1dc2 100644
--- a/mistral/tests/unit/workflow/test_direct_workflow.py
+++ b/mistral/tests/unit/workflow/test_direct_workflow.py
@@ -103,7 +103,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
                 state=states.SUCCESS,
                 output={'result': 'Hey'},
                 accepted=True,
-                runtime_context={'with_items_index': 0}
+                runtime_context={'index': 0}
             )
         )

diff --git a/mistral/tests/unit/workflow/test_with_items.py b/mistral/tests/unit/workflow/test_with_items.py
index 90edfd5a..eeab387b 100644
--- a/mistral/tests/unit/workflow/test_with_items.py
+++ b/mistral/tests/unit/workflow/test_with_items.py
@@ -25,9 +25,7 @@ class WithItemsTest(base.BaseTest):
         return models.ActionExecution(
             accepted=accepted,
             state=state,
-            runtime_context={
-                'with_items_index': index
-            }
+            runtime_context={'index': index}
         )

     def test_get_indices(self):
diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py
index fd6b761e..c13923f0 100644
--- a/mistral/utils/__init__.py
+++ b/mistral/utils/__init__.py
@@ -23,6 +23,7 @@ from os import path
 import shutil
 import six
 import socket
+import sys
 import tempfile
 import threading
 import uuid
@@ -169,6 +170,16 @@ def cut(data, length=100):
     return string


+def cut_by_kb(data, kilobytes):
+    if kilobytes <= 0:
+        return cut(data)
+
+    bytes_per_char = sys.getsizeof('s') - sys.getsizeof('')
+    length = int(kilobytes * 1024 / bytes_per_char)
+
+    return cut(data, length)
+
+
 def iter_subclasses(cls, _seen=None):
     """Generator over all subclasses of a given class in depth first order."""

diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py
index 23a0e117..da8092d4 100644
--- a/mistral/workflow/base.py
+++ b/mistral/workflow/base.py
@@ -83,7 +83,8 @@ class WorkflowController(object):

         self.wf_spec = wf_spec

-    def _update_task_ex_env(self, task_ex, env):
+    @staticmethod
+    def _update_task_ex_env(task_ex, env):
         if not env:
             return task_ex

diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py
index ad7b9727..739818ce 100644
--- a/mistral/workflow/commands.py
+++ b/mistral/workflow/commands.py
@@ -45,17 +45,18 @@ class RunTask(WorkflowCommand):
     def __init__(self, wf_ex, task_spec, ctx):
         super(RunTask, self).__init__(wf_ex, task_spec, ctx)
-        self.wait_flag = False
+
+        self.wait = False

     def is_waiting(self):
-        return (self.wait_flag and
+        return (self.wait and
                 isinstance(self.task_spec, tasks.DirectWorkflowTaskSpec) and
                 self.task_spec.get_join())

     def __repr__(self):
         return (
-            "Run task [workflow=%s, task=%s, waif_flag=%s]"
-            % (self.wf_ex.name, self.task_spec.get_name(), self.wait_flag)
+            "Run task [workflow=%s, task=%s, wait=%s]"
+            % (self.wf_ex.name, self.task_spec.get_name(), self.wait)
         )

diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py
index 62c3bf9d..d0851e5b 100644
--- a/mistral/workflow/data_flow.py
+++ b/mistral/workflow/data_flow.py @@ -73,7 +73,7 @@ def get_task_execution_result(task_ex): # use db_api.get_action_executions here to avoid session-less use cases. action_execs = db_api.get_action_executions(task_execution_id=task_ex.id) action_execs.sort( - key=lambda x: x.runtime_context.get('with_items_index') + key=lambda x: x.runtime_context.get('index') ) results = [ @@ -111,12 +111,6 @@ def publish_variables(task_ex, task_spec): ) -def destroy_task_result(task_ex): - for ex in task_ex.executions: - if hasattr(ex, 'output'): - ex.output = {} - - def evaluate_task_outbound_context(task_ex): """Evaluates task outbound Data Flow context. diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index d4aef7c8..fad7fdab 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -121,7 +121,7 @@ class DirectWorkflowController(base.WorkflowController): # NOTE(xylan): Decide whether or not a join task should run # immediately. if self._is_unsatisfied_join(cmd): - cmd.wait_flag = True + cmd.wait = True cmds.append(cmd) diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index 282bb5a0..345bc010 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -20,6 +20,9 @@ from mistral import exceptions as exc from mistral.workflow import states +# TODO(rakhmerov): Seems like it makes sense to get rid of this module in favor +# of implementing all the needed logic in engine.tasks.WithItemsTask directly. + _CAPACITY = 'capacity' _CONCURRENCY = 'concurrency' _COUNT = 'count' @@ -73,7 +76,7 @@ def _get_with_item_indices(exs): :param exs: List of executions. :return: a list of numbers. """ - return sorted(set([ex.runtime_context['with_items_index'] for ex in exs])) + return sorted(set([ex.runtime_context['index'] for ex in exs])) def _get_accepted_act_exs(task_ex): diff --git a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py index b05cc440..05918cbd 100644 --- a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py +++ b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py @@ -597,18 +597,14 @@ class ExecutionTestsV2(base.TestCase): @test.attr(type='negative') def test_create_execution_for_reverse_wf_invalid_start_task(self): - _, wf_ex = self.client.create_execution( + self.assertRaises( + exceptions.BadRequest, + self.client.create_execution, self.reverse_wf['name'], - { - self.reverse_wf['input']: "Bye"}, - { - "task_name": "nonexist" - } + {self.reverse_wf['input']: "Bye"}, + {"task_name": "nonexist"} ) - self.assertEqual("ERROR", wf_ex['state']) - self.assertIn("Invalid task name", wf_ex['state_info']) - @test.attr(type='negative') def test_create_execution_forgot_input_params(self): self.assertRaises(exceptions.BadRequest,