From e2c89f777d378e91c904a7e2bfc7e126bc80dde2 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 1 Jun 2016 13:33:20 +0700 Subject: [PATCH] Refactoring workflow handler * Introduced new class Workflow that manages life-cycle of running workflows and is responsible for managing workflow persistent state * Moved all workflow level logic to workflow handler and Workflow class * Changed semantics if how workflows start errors are handled. Previously, in case of invalid user input Mistral engine would store information about error in "state_info" field of workflow execution and bubble up an exception to the user. This approach was incorrect for a number of reasons including broken semantics: if an exception was raised due to invalid input it's normal to expect that system state has not changed. After this refactoring, engine only raises an exception in case of bad user input. That way behavior is consistent with the idea of exceptional situations. * Fixed unit tests in according to the previous point * Fixed a number of logical issues in tests. For example, in test_default_engine.py we expected one type of errors (e.g. env not found) but effectively received another one (invalid input). Partially implements: blueprint mistral-engine-error-handling Change-Id: I09070411fd833df8284cb80db69b8401a40eb6fe --- mistral/db/v2/sqlalchemy/api.py | 104 ++--- mistral/engine/actions.py | 24 +- mistral/engine/base.py | 6 +- mistral/engine/default_engine.py | 93 +---- mistral/engine/dispatcher.py | 6 +- mistral/engine/task_handler.py | 8 +- mistral/engine/utils.py | 13 +- mistral/engine/workflow_handler.py | 238 ++++------- mistral/engine/workflows.py | 371 ++++++++++++++++++ mistral/exceptions.py | 51 ++- mistral/services/action_manager.py | 2 +- mistral/services/executions.py | 95 ----- mistral/services/periodic.py | 2 +- mistral/services/workflows.py | 2 +- .../unit/api/v2/test_action_executions.py | 2 +- mistral/tests/unit/api/v2/test_actions.py | 4 +- .../tests/unit/api/v2/test_cron_triggers.py | 4 +- mistral/tests/unit/api/v2/test_environment.py | 4 +- mistral/tests/unit/api/v2/test_executions.py | 2 +- mistral/tests/unit/api/v2/test_tasks.py | 2 +- mistral/tests/unit/api/v2/test_workbooks.py | 4 +- mistral/tests/unit/api/v2/test_workflows.py | 4 +- .../unit/db/v2/test_sqlalchemy_db_api.py | 56 +-- .../tests/unit/engine/test_default_engine.py | 64 ++- .../unit/engine/test_direct_workflow_rerun.py | 28 +- .../test_execution_fields_size_limitation.py | 99 +++-- .../engine/test_reverse_workflow_rerun.py | 5 +- mistral/tests/unit/services/test_scheduler.py | 2 +- .../unit/services/test_trigger_service.py | 2 +- .../unit/services/test_workflow_service.py | 2 +- mistral/tests/unit/test_exception_base.py | 6 +- mistral/utils/rest_utils.py | 38 +- mistral/workflow/base.py | 6 + mistral/workflow/states.py | 4 + mistral/workflow/utils.py | 30 -- .../tests/api/v2/test_mistral_basic_v2.py | 10 +- 36 files changed, 773 insertions(+), 620 deletions(-) create mode 100644 mistral/engine/workflows.py delete mode 100644 mistral/services/executions.py diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index e08097d3..c2cd0466 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -48,7 +48,7 @@ def setup_db(): try: models.Workbook.metadata.create_all(b.get_engine()) except sa.exc.OperationalError as e: - raise exc.DBException("Failed to setup database: %s" % e) + raise exc.DBError("Failed to setup database: %s" % e) def drop_db(): @@ -58,7 +58,7 @@ def drop_db(): models.Workbook.metadata.drop_all(b.get_engine()) _facade = None except Exception as e: - raise exc.DBException("Failed to drop database: %s" % e) + raise exc.DBError("Failed to drop database: %s" % e) # Transaction management. @@ -183,7 +183,7 @@ def get_workbook(name): wb = _get_workbook(name) if not wb: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Workbook not found [workbook_name=%s]" % name ) @@ -207,7 +207,7 @@ def create_workbook(values, session=None): try: wb.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for WorkbookDefinition: %s" % e.columns ) @@ -219,7 +219,7 @@ def update_workbook(name, values, session=None): wb = _get_workbook(name) if not wb: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Workbook not found [workbook_name=%s]" % name ) @@ -241,7 +241,7 @@ def delete_workbook(name, session=None): wb = _get_workbook(name) if not wb: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Workbook not found [workbook_name=%s]" % name ) @@ -284,7 +284,7 @@ def get_workflow_definition(identifier): else _get_workflow_definition(identifier)) if not wf_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Workflow not found [workflow_identifier=%s]" % identifier ) @@ -295,7 +295,7 @@ def get_workflow_definition_by_id(id): wf_def = _get_workflow_definition_by_id(id) if not wf_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Workflow not found [workflow_id=%s]" % id ) @@ -324,7 +324,7 @@ def get_workflow_definitions(limit=None, marker=None, sort_keys=None, query ) except Exception as e: - raise exc.DBQueryEntryException( + raise exc.DBQueryEntryError( "Failed when querying database, error type: %s, " "error message: %s" % (e.__class__.__name__, e.message) ) @@ -339,7 +339,7 @@ def create_workflow_definition(values, session=None): try: wf_def.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for WorkflowDefinition: %s" % e.columns ) @@ -366,7 +366,7 @@ def update_workflow_definition(identifier, values, session=None): try: [get_cron_trigger(name) for name in cron_triggers] - except exc.DBEntityNotFoundException: + except exc.DBEntityNotFoundError: raise exc.NotAllowedException( "Can not update scope of workflow that has triggers " "associated in other tenants." @@ -403,7 +403,7 @@ def delete_workflow_definition(identifier, session=None): cron_triggers = _get_associated_cron_triggers(identifier) if cron_triggers: - raise exc.DBException( + raise exc.DBError( "Can't delete workflow that has triggers associated. " "[workflow_identifier=%s], [cron_trigger_name(s)=%s]" % (identifier, ', '.join(cron_triggers)) @@ -449,7 +449,7 @@ def get_action_definition_by_id(id): action_def = _get_db_object_by_id(models.ActionDefinition, id) if not action_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Action not found [action_id=%s]" % id ) @@ -460,7 +460,7 @@ def get_action_definition(name): a_def = _get_action_definition(name) if not a_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Action definition not found [action_name=%s]" % name ) @@ -485,7 +485,7 @@ def get_action_definitions(limit=None, marker=None, sort_keys=['name'], query ) except Exception as e: - raise exc.DBQueryEntryException( + raise exc.DBQueryEntryError( "Failed when querying database, error type: %s, " "error message: %s" % (e.__class__.__name__, e.message) ) @@ -500,7 +500,7 @@ def create_action_definition(values, session=None): try: a_def.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for action %s: %s" % (a_def.name, e.columns) ) @@ -512,7 +512,7 @@ def update_action_definition(name, values, session=None): a_def = _get_action_definition(name) if not a_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Action definition not found [action_name=%s]" % name ) @@ -534,7 +534,7 @@ def delete_action_definition(name, session=None): a_def = _get_action_definition(name) if not a_def: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Action definition not found [action_name=%s]" % name ) @@ -556,7 +556,7 @@ def get_execution(id): ex = _get_execution(id) if not ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Execution not found [execution_id=%s]" % id ) @@ -584,7 +584,7 @@ def create_execution(values, session=None): try: ex.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for Execution: %s" % e.columns ) @@ -596,7 +596,7 @@ def update_execution(id, values, session=None): ex = _get_execution(id) if not ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Execution not found [execution_id=%s]" % id ) @@ -618,7 +618,7 @@ def delete_execution(id, session=None): ex = _get_execution(id) if not ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Execution not found [execution_id=%s]" % id ) @@ -644,7 +644,7 @@ def get_action_execution(id): a_ex = _get_action_execution(id) if not a_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "ActionExecution not found [id=%s]" % id ) @@ -672,7 +672,7 @@ def create_action_execution(values, session=None): try: a_ex.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for ActionExecution: %s" % e.columns ) @@ -684,7 +684,7 @@ def update_action_execution(id, values, session=None): a_ex = _get_action_execution(id) if not a_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "ActionExecution not found [id=%s]" % id ) @@ -706,7 +706,7 @@ def delete_action_execution(id, session=None): a_ex = _get_action_execution(id) if not a_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "ActionExecution not found [id=%s]" % id ) @@ -732,7 +732,7 @@ def get_workflow_execution(id): wf_ex = _get_workflow_execution(id) if not wf_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "WorkflowExecution not found [id=%s]" % id ) @@ -761,7 +761,7 @@ def get_workflow_executions(limit=None, marker=None, sort_keys=['created_at'], query ) except Exception as e: - raise exc.DBQueryEntryException( + raise exc.DBQueryEntryError( "Failed when quering database, error type: %s, " "error message: %s" % (e.__class__.__name__, e.message) ) @@ -776,7 +776,7 @@ def create_workflow_execution(values, session=None): try: wf_ex.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for WorkflowExecution: %s" % e.columns ) @@ -788,7 +788,7 @@ def update_workflow_execution(id, values, session=None): wf_ex = _get_workflow_execution(id) if not wf_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "WorkflowExecution not found [id=%s]" % id ) @@ -810,7 +810,7 @@ def delete_workflow_execution(id, session=None): wf_ex = _get_workflow_execution(id) if not wf_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "WorkflowExecution not found [id=%s]" % id ) @@ -832,7 +832,7 @@ def get_task_execution(id): task_ex = _get_task_execution(id) if not task_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Task execution not found [id=%s]" % id ) @@ -856,7 +856,7 @@ def create_task_execution(values, session=None): try: task_ex.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for TaskExecution: %s" % e.columns ) @@ -868,7 +868,7 @@ def update_task_execution(id, values, session=None): task_ex = _get_task_execution(id) if not task_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "TaskExecution not found [id=%s]" % id ) @@ -890,7 +890,7 @@ def delete_task_execution(id, session=None): task_ex = _get_task_execution(id) if not task_ex: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "TaskExecution not found [id=%s]" % id ) @@ -920,7 +920,7 @@ def create_delayed_call(values, session=None): try: delayed_call.save(session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for DelayedCall: %s" % e.columns ) @@ -932,7 +932,7 @@ def delete_delayed_call(id, session=None): delayed_call = _get_delayed_call(id) if not delayed_call: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "DelayedCall not found [id=%s]" % id ) @@ -982,7 +982,7 @@ def get_delayed_call(id, session=None): delayed_call = _get_delayed_call(id=id, session=session) if not delayed_call: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Delayed Call not found [id=%s]" % id ) @@ -1020,7 +1020,7 @@ def get_cron_trigger(name): cron_trigger = _get_cron_trigger(name) if not cron_trigger: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Cron trigger not found [name=%s]" % name ) @@ -1054,14 +1054,14 @@ def create_cron_trigger(values, session=None): try: cron_trigger.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for cron trigger %s: %s" % (cron_trigger.name, e.columns) ) # TODO(nmakhotkin): Remove this 'except' after fixing # https://bugs.launchpad.net/oslo.db/+bug/1458583. except db_exc.DBError as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for cron trigger: %s" % e ) @@ -1073,7 +1073,7 @@ def update_cron_trigger(name, values, session=None, query_filter=None): cron_trigger = _get_cron_trigger(name) if not cron_trigger: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Cron trigger not found [name=%s]" % name ) @@ -1122,7 +1122,7 @@ def delete_cron_trigger(name, session=None): cron_trigger = _get_cron_trigger(name) if not cron_trigger: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Cron trigger not found [name=%s]" % name ) @@ -1156,7 +1156,7 @@ def get_environment(name): env = _get_environment(name) if not env: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Environment not found [name=%s]" % name ) @@ -1180,7 +1180,7 @@ def create_environment(values, session=None): try: env.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for Environment: %s" % e.columns ) @@ -1192,7 +1192,7 @@ def update_environment(name, values, session=None): env = _get_environment(name) if not env: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Environment not found [name=%s]" % name ) @@ -1216,7 +1216,7 @@ def delete_environment(name, session=None): env = _get_environment(name) if not env: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Environment not found [name=%s]" % name ) @@ -1278,7 +1278,7 @@ def create_resource_member(values, session=None): try: res_member.save(session=session) except db_exc.DBDuplicateEntry as e: - raise exc.DBDuplicateEntryException( + raise exc.DBDuplicateEntryError( "Duplicate entry for ResourceMember: %s" % e.columns ) @@ -1299,7 +1299,7 @@ def get_resource_member(resource_id, res_type, member_id): ).first() if not res_member: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Resource member not found [resource_id=%s, member_id=%s]" % (resource_id, member_id) ) @@ -1329,7 +1329,7 @@ def update_resource_member(resource_id, res_type, member_id, values, # Only member who is not the owner of the resource can update the # membership status. if member_id != security.get_project_id(): - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Resource member not found [resource_id=%s, member_id=%s]" % (resource_id, member_id) ) @@ -1343,7 +1343,7 @@ def update_resource_member(resource_id, res_type, member_id, values, ).first() if not res_member: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Resource member not found [resource_id=%s, member_id=%s]" % (resource_id, member_id) ) @@ -1362,7 +1362,7 @@ def delete_resource_member(resource_id, res_type, member_id, session=None): res_member = query.filter(_get_criterion(resource_id, member_id)).first() if not res_member: - raise exc.DBEntityNotFoundException( + raise exc.DBEntityNotFoundError( "Resource member not found [resource_id=%s, member_id=%s]" % (resource_id, member_id) ) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index c209f10a..a66f588c 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -20,10 +20,10 @@ import six from mistral.db.v2 import api as db_api from mistral.engine import rpc from mistral.engine import utils as e_utils +from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc from mistral import expressions as expr from mistral.services import action_manager as a_m -from mistral.services import executions as wf_ex_service from mistral.services import scheduler from mistral.services import security from mistral import utils @@ -36,7 +36,6 @@ from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) _RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action' -_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow' @six.add_metaclass(abc.ABCMeta) @@ -381,24 +380,13 @@ class WorkflowAction(Action): wf_params[k] = v del input_dict[k] - wf_ex, _ = wf_ex_service.create_workflow_execution( - wf_def.name, + wf_handler.start_workflow( + wf_def.id, input_dict, "sub-workflow execution", - wf_params, - wf_spec + wf_params ) - scheduler.schedule_call( - None, - _RESUME_WORKFLOW_PATH, - 0, - wf_ex_id=wf_ex.id, - env=None - ) - - # TODO(rakhmerov): Add info logging. - def run(self, input_dict, target, index=0, desc='', save=True): raise NotImplemented('Does not apply to this WorkflowAction.') @@ -411,10 +399,6 @@ class WorkflowAction(Action): pass -def _resume_workflow(wf_ex_id, env): - rpc.get_engine_client().resume_workflow(wf_ex_id, env=env) - - def _run_existing_action(action_ex_id, target): action_ex = db_api.get_action_execution(action_ex_id) action_def = db_api.get_action_definition(action_ex.name) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index 63e0f857..66f4fe92 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -78,19 +78,19 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def resume_workflow(self, wf_ex_id): + def resume_workflow(self, wf_ex_id, env=None): """Resumes workflow. :param wf_ex_id: Execution id. + :param env: Workflow environment. :return: Workflow execution object. """ raise NotImplementedError @abc.abstractmethod - def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): + def rerun_workflow(self, task_ex_id, reset=True, env=None): """Rerun workflow from the specified task. - :param wf_ex_id: Workflow execution id. :param task_ex_id: Task execution id. :param reset: If True, reset task state including deleting its action executions. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 34d4334c..3a9cc0f9 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -20,14 +20,8 @@ from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import base -from mistral.engine import dispatcher from mistral.engine import workflow_handler as wf_handler -from mistral.services import executions as wf_ex_service -from mistral.services import workflows as wf_service from mistral import utils as u -from mistral.workflow import base as wf_base -from mistral.workflow import commands -from mistral.workflow import states LOG = logging.getLogger(__name__) @@ -47,23 +41,12 @@ class DefaultEngine(base.Engine, coordination.Service): def start_workflow(self, wf_identifier, wf_input, description='', **params): with db_api.transaction(): - # TODO(rakhmerov): It needs to be hidden in workflow_handler and - # Workflow abstraction. - # The new workflow execution will be in an IDLE - # state on initial record creation. - wf_ex, wf_spec = wf_ex_service.create_workflow_execution( + wf_ex = wf_handler.start_workflow( wf_identifier, wf_input, description, params ) - wf_handler.set_workflow_state(wf_ex, states.RUNNING) - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - cmds = wf_ctrl.continue_workflow() - - dispatcher.dispatch_workflow_commands(wf_ex, cmds) return wf_ex.get_clone() @@ -115,88 +98,40 @@ class DefaultEngine(base.Engine, coordination.Service): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - wf_handler.set_workflow_state(wf_ex, states.PAUSED) + wf_handler.pause_workflow(wf_ex) - return wf_ex - - @staticmethod - def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None): - wf_ex = wf_service.update_workflow_execution_env(wf_ex, env) - - wf_handler.set_workflow_state( - wf_ex, - states.RUNNING, - set_upstream=True - ) - - wf_ctrl = wf_base.get_controller(wf_ex) - - # TODO(rakhmerov): Add error handling. - # Calculate commands to process next. - cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) - - # When resuming a workflow we need to ignore all 'pause' - # commands because workflow controller takes tasks that - # completed within the period when the workflow was paused. - # TODO(rakhmerov): This all should be in workflow handler, it's too - # specific for engine level. - cmds = list( - filter( - lambda c: not isinstance(c, commands.PauseWorkflow), - cmds - ) - ) - - # Since there's no explicit task causing the operation - # we need to mark all not processed tasks as processed - # because workflow controller takes only completed tasks - # with flag 'processed' equal to False. - for t_ex in wf_ex.task_executions: - if states.is_completed(t_ex.state) and not t_ex.processed: - t_ex.processed = True - - dispatcher.dispatch_workflow_commands(wf_ex, cmds) - - if not cmds: - wf_handler.check_workflow_completion(wf_ex) - - return wf_ex.get_clone() + return wf_ex.get_clone() @u.log_exec(LOG) - def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): - # TODO(rakhmerov): Rewrite this functionality with Task abstraction. + def rerun_workflow(self, task_ex_id, reset=True, env=None): with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - task_ex = db_api.get_task_execution(task_ex_id) - if task_ex.workflow_execution.id != wf_ex_id: - raise ValueError('Workflow execution ID does not match.') + wf_ex = wf_handler.lock_workflow_execution( + task_ex.workflow_execution_id + ) - if wf_ex.state == states.PAUSED: - return wf_ex.get_clone() + wf_handler.rerun_workflow(wf_ex, task_ex, reset=reset, env=env) - # TODO(rakhmerov): This should be a call to workflow handler. - return self._continue_workflow(wf_ex, task_ex, reset, env=env) + return wf_ex.get_clone() @u.log_exec(LOG) def resume_workflow(self, wf_ex_id, env=None): - # TODO(rakhmerov): Rewrite this functionality with Task abstraction. with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if (not states.is_paused(wf_ex.state) and - not states.is_idle(wf_ex.state)): - return wf_ex.get_clone() + wf_handler.resume_workflow(wf_ex, env=env) - return self._continue_workflow(wf_ex, env=env) + return wf_ex.get_clone() @u.log_exec(LOG) def stop_workflow(self, wf_ex_id, state, message=None): with db_api.transaction(): wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - return wf_handler.stop_workflow(wf_ex, state, message) + wf_handler.stop_workflow(wf_ex, state, message) + + return wf_ex.get_clone() @u.log_exec(LOG) def rollback_workflow(self, wf_ex_id): diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py index cd4c8ead..778cb64e 100644 --- a/mistral/engine/dispatcher.py +++ b/mistral/engine/dispatcher.py @@ -31,11 +31,7 @@ def dispatch_workflow_commands(wf_ex, wf_cmds): if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): task_handler.run_task(cmd) elif isinstance(cmd, commands.SetWorkflowState): - # TODO(rakhmerov): Make just a single call to workflow_handler - if states.is_completed(cmd.new_state): - wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) - else: - wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) + wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) elif isinstance(cmd, commands.Noop): # Do nothing. pass diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index c566dd85..b61ef0a2 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -58,7 +58,7 @@ def run_task(wf_cmd): return if task.is_completed(): - wf_handler.check_workflow_completion(wf_cmd.wf_ex) + wf_handler.on_task_complete(task.task_ex) def on_action_complete(action_ex): @@ -102,7 +102,7 @@ def on_action_complete(action_ex): return if task.is_completed(): - wf_handler.check_workflow_completion(wf_ex) + wf_handler.on_task_complete(task_ex) def fail_task(task_ex, msg): @@ -120,7 +120,7 @@ def continue_task(task_ex): task.run() if task.is_completed(): - wf_handler.check_workflow_completion(task_ex.workflow_execution) + wf_handler.on_task_complete(task_ex) def complete_task(task_ex, state, state_info): @@ -130,7 +130,7 @@ def complete_task(task_ex, state, state_info): task.complete(state, state_info) if task.is_completed(): - wf_handler.check_workflow_completion(task_ex.workflow_execution) + wf_handler.on_task_complete(task_ex) def _build_task_from_execution(task_ex, task_spec=None): diff --git a/mistral/engine/utils.py b/mistral/engine/utils.py index 556e3889..c31f2731 100644 --- a/mistral/engine/utils.py +++ b/mistral/engine/utils.py @@ -22,10 +22,12 @@ from mistral import utils # TODO(rakhmerov): This method is too abstract, validation rules may vary -# depending on object type (action, wf), it's not clear what it can be +# depending on object type (action, wf), it's not clear what it can be # applied to. -def validate_input(definition, input, spec=None): - input_param_names = copy.deepcopy(list((input or {}).keys())) +# TODO(rakhmerov): It must not do any manipulations with parameters +# (input_dict)! +def validate_input(definition, input_dict, spec=None): + input_param_names = copy.deepcopy(list((input_dict or {}).keys())) missing_param_names = [] spec_input = (spec.get_input() if spec else @@ -33,7 +35,8 @@ def validate_input(definition, input, spec=None): for p_name, p_value in six.iteritems(spec_input): if p_value is utils.NotDefined and p_name not in input_param_names: - missing_param_names.append(p_name) + missing_param_names.append(str(p_name)) + if p_name in input_param_names: input_param_names.remove(p_name) @@ -55,7 +58,7 @@ def validate_input(definition, input, spec=None): msg % tuple(msg_props) ) else: - utils.merge_dicts(input, spec_input, overwrite=False) + utils.merge_dicts(input_dict, spec_input, overwrite=False) def resolve_workflow_definition(parent_wf_name, parent_wf_spec_name, diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 41c0f6f3..06c8e2c7 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -12,208 +12,110 @@ # See the License for the specific language governing permissions and # limitations under the License. -from oslo_config import cfg from oslo_log import log as logging +import traceback as tb from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models -from mistral.engine import rpc +from mistral.engine import workflows from mistral import exceptions as exc -from mistral.services import scheduler -from mistral import utils -from mistral.utils import wf_trace -from mistral.workbook import parser as spec_parser -from mistral.workflow import base as wf_base -from mistral.workflow import data_flow from mistral.workflow import states -from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) +def start_workflow(wf_identifier, wf_input, desc, params): + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_identifier) + ) + + wf.start(wf_input, desc=desc, params=params) + + return wf.wf_ex + + +def stop_workflow(wf_ex, state, msg=None): + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_ex.workflow_id), + wf_ex=wf_ex + ) + + # In this case we should not try to handle possible errors. Instead, + # we need to let them pop up since the typical way of failing objects + # doesn't work here. Failing a workflow is the same as stopping it + # with ERROR state. + wf.stop(state, msg) + + +def fail_workflow(wf_ex, msg=None): + stop_workflow(wf_ex, states.ERROR, msg) + + def on_task_complete(task_ex): wf_ex = task_ex.workflow_execution - check_workflow_completion(wf_ex) + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_ex.workflow_id), + wf_ex=wf_ex + ) - -def check_workflow_completion(wf_ex): - if states.is_paused_or_completed(wf_ex.state): - return - - # Workflow is not completed if there are any incomplete task - # executions that are not in WAITING state. If all incomplete - # tasks are waiting and there are no unhandled errors, then these - # tasks will not reach completion. In this case, mark the - # workflow complete. - incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex) - - if any(not states.is_waiting(t.state) for t in incomplete_tasks): - return - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - - if wf_ctrl.all_errors_handled(): - succeed_workflow( - wf_ex, - wf_ctrl.evaluate_workflow_final_context(), - wf_spec - ) - else: - state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) - - fail_workflow(wf_ex, state_info) - - -def stop_workflow(wf_ex, state, message=None): - if state == states.SUCCESS: - wf_ctrl = wf_base.get_controller(wf_ex) - - final_context = {} - - try: - final_context = wf_ctrl.evaluate_workflow_final_context() - except Exception as e: - LOG.warning( - 'Failed to get final context for %s: %s' % (wf_ex, e) - ) - - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - - return succeed_workflow( - wf_ex, - final_context, - wf_spec, - message - ) - elif state == states.ERROR: - return fail_workflow(wf_ex, message) - - return wf_ex - - -def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None): - # Fail workflow if output is not successfully evaluated. try: - wf_ex.output = data_flow.evaluate_workflow_output( - wf_spec, - final_context + wf.on_task_complete(task_ex) + except exc.MistralException as e: + msg = ( + "Failed to handle task completion [wf_ex=%s, task_ex=%s]: %s\n%s" + % (wf_ex, task_ex, e, tb.format_exc()) ) - except Exception as e: - return fail_workflow(wf_ex, e.message) - # Set workflow execution to success until after output is evaluated. - set_workflow_state(wf_ex, states.SUCCESS, state_info) + LOG.error(msg) - if wf_ex.task_execution_id: - _schedule_send_result_to_parent_workflow(wf_ex) - - return wf_ex + fail_workflow(wf.wf_ex, msg) -def fail_workflow(wf_ex, state_info): - if states.is_paused_or_completed(wf_ex.state): - return wf_ex - - set_workflow_state(wf_ex, states.ERROR, state_info) - - # When we set an ERROR state we should safely set output value getting - # w/o exceptions due to field size limitations. - state_info = utils.cut_by_kb( - state_info, - cfg.CONF.engine.execution_field_size_limit_kb +def pause_workflow(wf_ex, msg=None): + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_ex.workflow_id), + wf_ex=wf_ex ) - wf_ex.output = {'result': state_info} - - if wf_ex.task_execution_id: - _schedule_send_result_to_parent_workflow(wf_ex) - - return wf_ex + wf.set_state(states.PAUSED, msg) -def _schedule_send_result_to_parent_workflow(wf_ex): - scheduler.schedule_call( - None, - 'mistral.engine.workflow_handler.send_result_to_parent_workflow', - 0, - wf_ex_id=wf_ex.id +def rerun_workflow(wf_ex, task_ex, reset=True, env=None): + if wf_ex.state == states.PAUSED: + return wf_ex.get_clone() + + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_ex.workflow_id), + wf_ex=wf_ex ) - -def send_result_to_parent_workflow(wf_ex_id): - wf_ex = db_api.get_workflow_execution(wf_ex_id) - - if wf_ex.state == states.SUCCESS: - rpc.get_engine_client().on_action_complete( - wf_ex.id, - wf_utils.Result(data=wf_ex.output) - ) - elif wf_ex.state == states.ERROR: - err_msg = ( - wf_ex.state_info or - 'Failed subworkflow [execution_id=%s]' % wf_ex.id - ) - - rpc.get_engine_client().on_action_complete( - wf_ex.id, - wf_utils.Result(error=err_msg) - ) + wf.rerun(task_ex, reset=reset, env=env) -# TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow -# abstraction. -def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False): - cur_state = wf_ex.state +def resume_workflow(wf_ex, env=None): + if not states.is_paused_or_idle(wf_ex.state): + return wf_ex.get_clone() - if states.is_valid_transition(cur_state, state): - wf_ex.state = state - wf_ex.state_info = state_info + wf = workflows.Workflow( + db_api.get_workflow_definition(wf_ex.workflow_id), + wf_ex=wf_ex + ) - wf_trace.info( - wf_ex, - "Execution of workflow '%s' [%s -> %s]" - % (wf_ex.workflow_name, cur_state, state) - ) + wf.resume(env=env) + + +def set_workflow_state(wf_ex, state, msg=None): + if states.is_completed(state): + stop_workflow(wf_ex, state, msg) + elif states.is_paused(state): + pause_workflow(wf_ex, msg) else: - msg = ("Can't change workflow execution state from %s to %s. " - "[workflow=%s, execution_id=%s]" % - (cur_state, state, wf_ex.name, wf_ex.id)) - raise exc.WorkflowException(msg) - - # Workflow result should be accepted by parent workflows (if any) - # only if it completed successfully or failed. - wf_ex.accepted = wf_ex.state in (states.SUCCESS, states.ERROR) - - # If specified, then recursively set the state of the parent workflow - # executions to the same state. Only changing state to RUNNING is - # supported. - # TODO(rakhmerov): I don't like this hardcoded special case. It's - # used only to continue the workflow (rerun) but at the first glance - # seems like a generic behavior. Need to handle it differently. - if set_upstream and state == states.RUNNING and wf_ex.task_execution_id: - task_ex = db_api.get_task_execution(wf_ex.task_execution_id) - - parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id) - - set_workflow_state( - parent_wf_ex, - state, - state_info=state_info, - set_upstream=set_upstream + raise exc.MistralError( + 'Invalid workflow state [wf_ex=%s, state=%s]' % (wf_ex, state) ) - # TODO(rakhmerov): How do we need to set task state properly? - # It doesn't seem right to intervene into the parent workflow - # internals. We just need to communicate changes back to parent - # worklfow and it should do what's needed itself. - task_ex.state = state - task_ex.state_info = None - task_ex.processed = False - def lock_workflow_execution(wf_ex_id): # Locks a workflow execution using the db_api.acquire_lock function. diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py new file mode 100644 index 00000000..34dc2478 --- /dev/null +++ b/mistral/engine/workflows.py @@ -0,0 +1,371 @@ +# Copyright 2016 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import copy +from oslo_config import cfg +from oslo_log import log as logging +import six + +from mistral.db.v2 import api as db_api +from mistral.db.v2.sqlalchemy import models as db_models +from mistral.engine import dispatcher +from mistral.engine import rpc +from mistral.engine import utils as eng_utils +from mistral import exceptions as exc +from mistral.services import scheduler +from mistral.services import workflows as wf_service +from mistral import utils +from mistral.utils import wf_trace +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base +from mistral.workflow import commands +from mistral.workflow import data_flow +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + + +LOG = logging.getLogger(__name__) + +_SEND_RESULT_TO_PARENT_WORKFLOW_PATH = ( + 'mistral.engine.workflows._send_result_to_parent_workflow' +) + + +@six.add_metaclass(abc.ABCMeta) +class Workflow(object): + """Workflow. + + Represents a workflow and defines interface that can be used by + Mistral engine or its components in order to manipulate with workflows. + """ + + def __init__(self, wf_def, wf_ex=None): + self.wf_def = wf_def + self.wf_ex = wf_ex + self.wf_spec = spec_parser.get_workflow_spec(wf_def.spec) + + def start(self, input_dict, desc='', params=None): + """Start workflow. + + :param input_dict: Workflow input. + :param desc: Workflow execution description. + :param params: Workflow type specific parameters. + """ + + wf_trace.info(self.wf_ex, "Starting workflow: %s" % self.wf_def) + + # TODO(rakhmerov): This call implicitly changes input_dict! Fix it! + # After fix we need to move validation after adding risky fields. + eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec) + + self._create_execution(input_dict, desc, params) + + self.set_state(states.RUNNING) + + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) + + cmds = wf_ctrl.continue_workflow() + + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + + def stop(self, state, msg=None): + """Stop workflow. + + :param state: New workflow state. + :param msg: Additional explaining message. + """ + + if state == states.SUCCESS: + wf_ctrl = wf_base.get_controller(self.wf_ex) + + final_context = {} + + try: + final_context = wf_ctrl.evaluate_workflow_final_context() + except Exception as e: + LOG.warning( + 'Failed to get final context for %s: %s' % (self.wf_ex, e) + ) + + return self._succeed_workflow(final_context, msg) + elif state == states.ERROR: + return self._fail_workflow(msg) + + def on_task_complete(self, task_ex): + """Handle task completion event. + + :param task_ex: Task execution that's completed. + """ + + self._check_and_complete() + + def resume(self, env=None): + """Resume workflow. + + :param env: Environment. + """ + + wf_service.update_workflow_execution_env(self.wf_ex, env) + + self.set_state(states.RUNNING, recursive=True) + + self._continue_workflow(env=env) + + def rerun(self, task_ex, reset=True, env=None): + """Rerun workflow from the given task. + + :param task_ex: Task execution that the workflow needs to rerun from. + :param reset: If True, reset task state including deleting its action + executions. + :param env: Environment. + """ + + wf_service.update_workflow_execution_env(self.wf_ex, env) + + self.set_state(states.RUNNING, recursive=True) + + self._continue_workflow(task_ex, reset, env=env) + + def lock(self): + return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id) + + def _create_execution(self, input_dict, desc, params): + self.wf_ex = db_api.create_workflow_execution({ + 'name': self.wf_def.name, + 'description': desc, + 'workflow_name': self.wf_def.name, + 'workflow_id': self.wf_def.id, + 'spec': self.wf_spec.to_dict(), + 'state': states.IDLE, + 'output': {}, + 'task_execution_id': params.get('task_execution_id'), + 'runtime_context': { + 'index': params.get('index', 0) + }, + }) + + self.wf_ex.input = input_dict or {} + self.wf_ex.context = copy.deepcopy(input_dict) or {} + + env = _get_environment(params) + + if env: + params['env'] = env + + self.wf_ex.params = params + + data_flow.add_openstack_data_to_context(self.wf_ex) + data_flow.add_execution_to_context(self.wf_ex) + data_flow.add_environment_to_context(self.wf_ex) + data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec) + + def set_state(self, state, state_info=None, recursive=False): + cur_state = self.wf_ex.state + + if states.is_valid_transition(cur_state, state): + self.wf_ex.state = state + self.wf_ex.state_info = state_info + + wf_trace.info( + self.wf_ex, + "Execution of workflow '%s' [%s -> %s]" + % (self.wf_ex.workflow_name, cur_state, state) + ) + else: + msg = ("Can't change workflow execution state from %s to %s. " + "[workflow=%s, execution_id=%s]" % + (cur_state, state, self.wf_ex.name, self.wf_ex.id)) + + raise exc.WorkflowException(msg) + + # Workflow result should be accepted by parent workflows (if any) + # only if it completed successfully or failed. + self.wf_ex.accepted = state in (states.SUCCESS, states.ERROR) + + if recursive and self.wf_ex.task_execution_id: + parent_task_ex = db_api.get_task_execution( + self.wf_ex.task_execution_id + ) + + parent_wf = Workflow( + db_api.get_workflow_definition(parent_task_ex.workflow_id), + parent_task_ex.workflow_execution + ) + + parent_wf.lock() + + parent_wf.set_state(state, recursive=recursive) + + # TODO(rakhmerov): It'd be better to use instance of Task here. + parent_task_ex.state = state + parent_task_ex.state_info = None + parent_task_ex.processed = False + + def _continue_workflow(self, task_ex=None, reset=True, env=None): + wf_ctrl = wf_base.get_controller(self.wf_ex) + + # Calculate commands to process next. + cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) + + # When resuming a workflow we need to ignore all 'pause' + # commands because workflow controller takes tasks that + # completed within the period when the workflow was paused. + cmds = list( + filter(lambda c: not isinstance(c, commands.PauseWorkflow), cmds) + ) + + # Since there's no explicit task causing the operation + # we need to mark all not processed tasks as processed + # because workflow controller takes only completed tasks + # with flag 'processed' equal to False. + for t_ex in self.wf_ex.task_executions: + if states.is_completed(t_ex.state) and not t_ex.processed: + t_ex.processed = True + + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + + if not cmds: + self._check_and_complete() + + def _check_and_complete(self): + if states.is_paused_or_completed(self.wf_ex.state): + return + + # Workflow is not completed if there are any incomplete task + # executions that are not in WAITING state. If all incomplete + # tasks are waiting and there are unhandled errors, then these + # tasks will not reach completion. In this case, mark the + # workflow complete. + incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex) + + if any(not states.is_waiting(t.state) for t in incomplete_tasks): + return + + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) + + if wf_ctrl.all_errors_handled(): + self._succeed_workflow(wf_ctrl.evaluate_workflow_final_context()) + else: + self._fail_workflow(_build_fail_info_message(wf_ctrl, self.wf_ex)) + + def _succeed_workflow(self, final_context, msg=None): + self.wf_ex.output = data_flow.evaluate_workflow_output( + self.wf_spec, + final_context + ) + + # Set workflow execution to success until after output is evaluated. + self.set_state(states.SUCCESS, msg) + + if self.wf_ex.task_execution_id: + self._schedule_send_result_to_parent_workflow() + + def _fail_workflow(self, msg): + if states.is_paused_or_completed(self.wf_ex.state): + return + + self.set_state(states.ERROR, state_info=msg) + + # When we set an ERROR state we should safely set output value getting + # w/o exceptions due to field size limitations. + msg = utils.cut_by_kb( + msg, + cfg.CONF.engine.execution_field_size_limit_kb + ) + + self.wf_ex.output = {'result': msg} + + if self.wf_ex.task_execution_id: + self._schedule_send_result_to_parent_workflow() + + def _schedule_send_result_to_parent_workflow(self): + scheduler.schedule_call( + None, + _SEND_RESULT_TO_PARENT_WORKFLOW_PATH, + 0, + wf_ex_id=self.wf_ex.id + ) + + +def _get_environment(params): + env = params.get('env', {}) + + if isinstance(env, dict): + return env + + if isinstance(env, six.string_types): + env_db = db_api.load_environment(env) + + if not env_db: + raise exc.InputException( + 'Environment is not found: %s' % env + ) + + return env_db.variables + + raise exc.InputException( + 'Unexpected value type for environment [env=%s, type=%s]' + % (env, type(env)) + ) + + +def _send_result_to_parent_workflow(wf_ex_id): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + if wf_ex.state == states.SUCCESS: + rpc.get_engine_client().on_action_complete( + wf_ex.id, + wf_utils.Result(data=wf_ex.output) + ) + elif wf_ex.state == states.ERROR: + err_msg = ( + wf_ex.state_info or + 'Failed subworkflow [execution_id=%s]' % wf_ex.id + ) + + rpc.get_engine_client().on_action_complete( + wf_ex.id, + wf_utils.Result(error=err_msg) + ) + + +def _build_fail_info_message(wf_ctrl, wf_ex): + # Try to find where error is exactly. + failed_tasks = sorted( + filter( + lambda t: not wf_ctrl.is_error_handled_for(t), + wf_utils.find_error_task_executions(wf_ex) + ), + key=lambda t: t.name + ) + + msg = ('Failure caused by error in tasks: %s\n' % + ', '.join([t.name for t in failed_tasks])) + + for t in failed_tasks: + msg += '\n %s [task_ex_id=%s] -> %s\n' % (t.name, t.id, t.state_info) + + for i, ex in enumerate(t.executions): + if ex.state == states.ERROR: + output = (ex.output or dict()).get('result', 'Unknown') + msg += ( + ' [action_ex_id=%s, idx=%s]: %s\n' % ( + ex.id, + i, + str(output) + ) + ) + + return msg diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 57550c24..4b35252e 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -14,18 +14,39 @@ # limitations under the License. +# TODO(rakhmerov): Can we make one parent for errors and exceptions? + class MistralError(Exception): """Mistral specific error. - Reserved for situations that can't automatically handled. When it occurs + Reserved for situations that can't be automatically handled. When it occurs it signals that there is a major environmental problem like invalid startup configuration or implementation problem (e.g. some code doesn't take care of certain corner cases). From architectural perspective it's pointless to try to handle this type of problems except doing some finalization work like transaction rollback, deleting temporary files etc. """ + + message = "An unknown error occurred" + http_code = 500 + def __init__(self, message=None): - super(MistralError, self).__init__(message) + if message is not None: + self.message = message + + super(MistralError, self).__init__( + '%d: %s' % (self.http_code, self.message)) + + @property + def code(self): + """This is here for webob to read. + + https://github.com/Pylons/webob/blob/master/webob/exc.py + """ + return self.http_code + + def __str__(self): + return self.message class MistralException(Exception): @@ -46,6 +67,13 @@ class MistralException(Exception): message = "An unknown exception occurred" http_code = 500 + def __init__(self, message=None): + if message is not None: + self.message = message + + super(MistralException, self).__init__( + '%d: %s' % (self.http_code, self.message)) + @property def code(self): """This is here for webob to read. @@ -57,30 +85,23 @@ class MistralException(Exception): def __str__(self): return self.message - def __init__(self, message=None): - if message is not None: - self.message = message - super(MistralException, self).__init__( - '%d: %s' % (self.http_code, self.message)) +# Database errors. - -# Database exceptions. - -class DBException(MistralException): +class DBError(MistralError): http_code = 400 -class DBDuplicateEntryException(DBException): +class DBDuplicateEntryError(DBError): http_code = 409 message = "Database object already exists" -class DBQueryEntryException(DBException): +class DBQueryEntryError(DBError): http_code = 400 -class DBEntityNotFoundException(DBException): +class DBEntityNotFoundError(DBError): http_code = 404 message = "Object not found" @@ -101,7 +122,7 @@ class InvalidModelException(DSLParsingException): message = "Wrong entity definition" -# Various common exceptions. +# Various common exceptions and errors. class YaqlEvaluationException(MistralException): http_code = 400 diff --git a/mistral/services/action_manager.py b/mistral/services/action_manager.py index 8e67b9ae..6be34f03 100644 --- a/mistral/services/action_manager.py +++ b/mistral/services/action_manager.py @@ -69,7 +69,7 @@ def register_action_class(name, action_class_str, attributes, LOG.debug("Registering action in DB: %s" % name) db_api.create_action_definition(values) - except exc.DBDuplicateEntryException: + except exc.DBDuplicateEntryError: LOG.debug("Action %s already exists in DB." % name) diff --git a/mistral/services/executions.py b/mistral/services/executions.py deleted file mode 100644 index 20d849ec..00000000 --- a/mistral/services/executions.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2016 - StackStorm, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -import six - -from oslo_log import log as logging - -from mistral.db.v2 import api as db_api -from mistral.engine import utils as eng_utils -from mistral.utils import wf_trace -from mistral.workbook import parser as spec_parser -from mistral.workflow import data_flow -from mistral.workflow import states - - -LOG = logging.getLogger(__name__) - - -def canonize_workflow_params(params): - # Resolve environment parameter. - env = params.get('env', {}) - - if not isinstance(env, dict) and not isinstance(env, six.string_types): - raise ValueError( - 'Unexpected type for environment [environment=%s]' % str(env) - ) - - if isinstance(env, six.string_types): - env_db = db_api.get_environment(env) - env = env_db.variables - params['env'] = env - - return params - - -def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): - wf_ex = db_api.create_workflow_execution({ - 'name': wf_def.name, - 'description': desc, - 'workflow_name': wf_def.name, - 'workflow_id': wf_def.id, - 'spec': wf_spec.to_dict(), - 'params': params or {}, - 'state': states.IDLE, - 'input': wf_input or {}, - 'output': {}, - 'context': copy.deepcopy(wf_input) or {}, - 'task_execution_id': params.get('task_execution_id'), - 'runtime_context': { - 'index': params.get('index', 0) - }, - }) - - data_flow.add_openstack_data_to_context(wf_ex) - data_flow.add_execution_to_context(wf_ex) - data_flow.add_environment_to_context(wf_ex) - data_flow.add_workflow_variables_to_context(wf_ex, wf_spec) - - return wf_ex - - -def create_workflow_execution(wf_identifier, wf_input, description, params, - wf_spec=None): - params = canonize_workflow_params(params) - - wf_def = db_api.get_workflow_definition(wf_identifier) - - if wf_spec is None: - wf_spec = spec_parser.get_workflow_spec(wf_def.spec) - - eng_utils.validate_input(wf_def, wf_input, wf_spec) - - wf_ex = _create_workflow_execution( - wf_def, - wf_spec, - wf_input, - description, - params - ) - - wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier) - - return wf_ex, wf_spec diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index bd427ba3..32568cf0 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -102,7 +102,7 @@ def advance_cron_trigger(t): 'next_execution_time': t.next_execution_time } ) - except exc.DBEntityNotFoundException as e: + except exc.DBEntityNotFoundError as e: # Cron trigger was probably already deleted by a different process. LOG.debug( "Cron trigger named '%s' does not exist anymore: %s", diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 657e5831..25e375b3 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -111,7 +111,7 @@ def update_workflow_execution_env(wf_ex, env): if wf_ex.state not in [states.IDLE, states.PAUSED, states.ERROR]: raise exc.NotAllowedException( 'Updating env to workflow execution is only permitted if ' - 'it is in idle, paused, or re-runnable state.' + 'it is in IDLE, PAUSED, or ERROR state.' ) wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env) diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index c0b91e65..11aa0cb6 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -142,7 +142,7 @@ MOCK_ACTION_NOT_COMPLETE = mock.MagicMock( MOCK_AD_HOC_ACTION = mock.MagicMock(return_value=AD_HOC_ACTION_EX_DB) MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_EX_DB]) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_DELETE = mock.MagicMock(return_value=None) diff --git a/mistral/tests/unit/api/v2/test_actions.py b/mistral/tests/unit/api/v2/test_actions.py index 7b072453..62324a50 100644 --- a/mistral/tests/unit/api/v2/test_actions.py +++ b/mistral/tests/unit/api/v2/test_actions.py @@ -119,8 +119,8 @@ MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_DB]) MOCK_UPDATED_ACTION = mock.MagicMock(return_value=UPDATED_ACTION_DB) MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) -MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError()) class TestActionsController(base.APITest): diff --git a/mistral/tests/unit/api/v2/test_cron_triggers.py b/mistral/tests/unit/api/v2/test_cron_triggers.py index a2f71102..1a1de672 100644 --- a/mistral/tests/unit/api/v2/test_cron_triggers.py +++ b/mistral/tests/unit/api/v2/test_cron_triggers.py @@ -63,8 +63,8 @@ MOCK_TRIGGER = mock.MagicMock(return_value=TRIGGER_DB) MOCK_TRIGGERS = mock.MagicMock(return_value=[TRIGGER_DB]) MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) -MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError()) class TestCronTriggerController(base.APITest): diff --git a/mistral/tests/unit/api/v2/test_environment.py b/mistral/tests/unit/api/v2/test_environment.py index 77c11dd3..92b823fc 100644 --- a/mistral/tests/unit/api/v2/test_environment.py +++ b/mistral/tests/unit/api/v2/test_environment.py @@ -107,8 +107,8 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB) MOCK_ENVIRONMENTS = mock.MagicMock(return_value=[ENVIRONMENT_DB]) MOCK_UPDATED_ENVIRONMENT = mock.MagicMock(return_value=UPDATED_ENVIRONMENT_DB) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) -MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError()) MOCK_DELETE = mock.MagicMock(return_value=None) diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index c9510d64..23cc19cc 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -111,7 +111,7 @@ MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX]) MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX) MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException()) diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index 623e3c52..3074d51d 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -127,7 +127,7 @@ MOCK_WF_EX = mock.MagicMock(return_value=WF_EX) MOCK_TASK = mock.MagicMock(return_value=TASK_EX) MOCK_TASKS = mock.MagicMock(return_value=[TASK_EX]) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_ERROR_TASK = mock.MagicMock(return_value=ERROR_TASK_EX) MOCK_ERROR_ITEMS_TASK = mock.MagicMock(return_value=ERROR_ITEMS_TASK_EX) diff --git a/mistral/tests/unit/api/v2/test_workbooks.py b/mistral/tests/unit/api/v2/test_workbooks.py index 80b07900..83e0b37e 100644 --- a/mistral/tests/unit/api/v2/test_workbooks.py +++ b/mistral/tests/unit/api/v2/test_workbooks.py @@ -97,8 +97,8 @@ MOCK_WORKBOOKS = mock.MagicMock(return_value=[WORKBOOK_DB]) MOCK_UPDATED_WORKBOOK = mock.MagicMock(return_value=UPDATED_WORKBOOK_DB) MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) -MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError()) class TestWorkbooksController(base.APITest): diff --git a/mistral/tests/unit/api/v2/test_workflows.py b/mistral/tests/unit/api/v2/test_workflows.py index 43aa336e..19eef730 100644 --- a/mistral/tests/unit/api/v2/test_workflows.py +++ b/mistral/tests/unit/api/v2/test_workflows.py @@ -160,8 +160,8 @@ MOCK_WFS = mock.MagicMock(return_value=[WF_DB]) MOCK_UPDATED_WF = mock.MagicMock(return_value=UPDATED_WF_DB) MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_EMPTY = mock.MagicMock(return_value=[]) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) -MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) +MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError()) class TestWorkflowsController(base.APITest): diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py index 7777fc5a..d4138149 100644 --- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py @@ -83,7 +83,7 @@ class WorkbookTest(SQLAlchemyTest): db_api.create_workbook(WORKBOOKS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_workbook, WORKBOOKS[0] ) @@ -153,7 +153,7 @@ class WorkbookTest(SQLAlchemyTest): db_api.delete_workbook(created.name) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workbook, created.name ) @@ -284,7 +284,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest): db_api.create_workflow_definition(WF_DEFINITIONS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_workflow_definition, WF_DEFINITIONS[0] ) @@ -424,7 +424,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest): db_api.delete_workflow_definition(identifier) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workflow_definition, identifier ) @@ -536,7 +536,7 @@ class ActionDefinitionTest(SQLAlchemyTest): db_api.create_action_definition(ACTION_DEFINITIONS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_action_definition, ACTION_DEFINITIONS[0] ) @@ -606,7 +606,7 @@ class ActionDefinitionTest(SQLAlchemyTest): db_api.delete_action_definition(created.name) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_action_definition, created.name ) @@ -726,7 +726,7 @@ class ActionExecutionTest(SQLAlchemyTest): db_api.delete_action_execution(created.id) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_action_execution, created.id ) @@ -738,7 +738,7 @@ class ActionExecutionTest(SQLAlchemyTest): auth_context.set_ctx(test_base.get_context(default=False)) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.delete_action_execution, created.id ) @@ -880,7 +880,7 @@ class WorkflowExecutionTest(SQLAlchemyTest): db_api.delete_workflow_execution(created.id) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workflow_execution, created.id ) @@ -1130,7 +1130,7 @@ class TaskExecutionTest(SQLAlchemyTest): db_api.delete_task_execution(created.id) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_task_execution, created.id ) @@ -1203,7 +1203,7 @@ class CronTriggerTest(SQLAlchemyTest): db_api.create_cron_trigger(CRON_TRIGGERS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_cron_trigger, CRON_TRIGGERS[0] ) @@ -1302,7 +1302,7 @@ class CronTriggerTest(SQLAlchemyTest): self.assertEqual(1, rowcount) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_cron_trigger, created.name ) @@ -1365,7 +1365,7 @@ class EnvironmentTest(SQLAlchemyTest): db_api.create_environment(ENVIRONMENTS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_environment, ENVIRONMENTS[0] ) @@ -1432,7 +1432,7 @@ class EnvironmentTest(SQLAlchemyTest): db_api.delete_environment(created.name) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_environment, created.name ) @@ -1462,7 +1462,7 @@ class TXTest(SQLAlchemyTest): self.assertFalse(self.is_db_session_open()) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workbook, created['id'] ) @@ -1525,12 +1525,12 @@ class TXTest(SQLAlchemyTest): self.assertFalse(self.is_db_session_open()) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workflow_execution, created.id ) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workbook, created_wb.name ) @@ -1548,12 +1548,12 @@ class TXTest(SQLAlchemyTest): db_api.create_workbook(WORKBOOKS[0]) - except exc.DBDuplicateEntryException: + except exc.DBDuplicateEntryError: pass self.assertFalse(self.is_db_session_open()) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workbook, created.name ) @@ -1633,7 +1633,7 @@ class ResourceMemberTest(SQLAlchemyTest): # Tenant A can not see membership of resource shared to Tenant B. self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_resource_member, '123e4567-e89b-12d3-a456-426655440000', 'workflow', @@ -1644,7 +1644,7 @@ class ResourceMemberTest(SQLAlchemyTest): db_api.create_resource_member(RESOURCE_MEMBERS[0]) self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, db_api.create_resource_member, RESOURCE_MEMBERS[0] ) @@ -1695,7 +1695,7 @@ class ResourceMemberTest(SQLAlchemyTest): created = db_api.create_resource_member(RESOURCE_MEMBERS[0]) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.update_resource_member, created.resource_id, 'workflow', @@ -1726,7 +1726,7 @@ class ResourceMemberTest(SQLAlchemyTest): auth_context.set_ctx(user_context) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.delete_resource_member, created.resource_id, 'workflow', @@ -1743,7 +1743,7 @@ class ResourceMemberTest(SQLAlchemyTest): ) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.delete_resource_member, created.resource_id, 'workflow', @@ -1752,7 +1752,7 @@ class ResourceMemberTest(SQLAlchemyTest): def test_delete_nonexistent_resource_member(self): self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.delete_resource_member, 'nonexitent_resource', 'workflow', @@ -1768,7 +1768,7 @@ class WorkflowSharingTest(SQLAlchemyTest): auth_context.set_ctx(user_context) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workflow_definition, wf.id ) @@ -1836,7 +1836,7 @@ class WorkflowSharingTest(SQLAlchemyTest): auth_context.set_ctx(user_context) self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, db_api.get_workflow_definition, wf.id ) @@ -1872,7 +1872,7 @@ class WorkflowSharingTest(SQLAlchemyTest): auth_context.set_ctx(test_base.get_context()) self.assertRaises( - exc.DBException, + exc.DBError, db_api.delete_workflow_definition, wf.id ) diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 7ee9d568..375f55d5 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -89,7 +89,7 @@ ENVIRONMENT_DB = models.Environment( ) MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB) -MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException()) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) class DefaultEngineTest(base.DbTestCase): @@ -210,7 +210,7 @@ class DefaultEngineTest(base.DbTestCase): self.assertDictEqual(wf_ex.params.get('env', {}), env) - @mock.patch.object(db_api, "get_environment", MOCK_ENVIRONMENT) + @mock.patch.object(db_api, "load_environment", MOCK_ENVIRONMENT) def test_start_workflow_with_saved_env(self): wf_input = { 'param1': '<% env().key1 %>', @@ -223,7 +223,8 @@ class DefaultEngineTest(base.DbTestCase): 'wb.wf', wf_input, env='test', - task_name='task2') + task_name='task2' + ) self.assertIsNotNone(wf_ex) @@ -233,23 +234,40 @@ class DefaultEngineTest(base.DbTestCase): @mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND) def test_start_workflow_env_not_found(self): - self.assertRaises(exc.DBEntityNotFoundException, - self.engine.start_workflow, - 'wb.wf', - {'param1': '<% env().key1 %>'}, - env='foo', - task_name='task2') + e = self.assertRaises( + exc.InputException, + self.engine.start_workflow, + 'wb.wf', + { + 'param1': '<% env().key1 %>', + 'param2': 'some value' + }, + env='foo', + task_name='task2' + ) + + self.assertEqual("Environment is not found: foo", e.message) def test_start_workflow_with_env_type_error(self): - self.assertRaises(ValueError, - self.engine.start_workflow, - 'wb.wf', - {'param1': '<% env().key1 %>'}, - env=True, - task_name='task2') + e = self.assertRaises( + exc.InputException, + self.engine.start_workflow, + 'wb.wf', + { + 'param1': '<% env().key1 %>', + 'param2': 'some value' + }, + env=True, + task_name='task2' + ) + + self.assertIn( + 'Unexpected value type for environment', + e.message + ) def test_start_workflow_missing_parameters(self): - self.assertRaises( + e = self.assertRaises( exc.InputException, self.engine.start_workflow, 'wb.wf', @@ -257,15 +275,25 @@ class DefaultEngineTest(base.DbTestCase): task_name='task2' ) + self.assertIn("Invalid input", e.message) + self.assertIn("missing=['param2']", e.message) + def test_start_workflow_unexpected_parameters(self): - self.assertRaises( + e = self.assertRaises( exc.InputException, self.engine.start_workflow, 'wb.wf', - {'param1': 'Hey', 'param2': 'Hi', 'unexpected_param': 'val'}, + { + 'param1': 'Hey', + 'param2': 'Hi', + 'unexpected_param': 'val' + }, task_name='task2' ) + self.assertIn("Invalid input", e.message) + self.assertIn("unexpected=['unexpected_param']", e.message) + def test_on_action_complete(self): wf_input = {'param1': 'Hey', 'param2': 'Hi'} diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 658294b1..427523bf 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -241,7 +241,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) + self.engine.rerun_workflow(task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -347,7 +347,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): } # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_21_ex.id, env=updated_env) + self.engine.rerun_workflow(task_21_ex.id, env=updated_env) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -482,7 +482,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase): e = self.assertRaises( exc.MistralError, self.engine.rerun_workflow, - wf_ex.id, task_1_ex.id ) @@ -531,7 +530,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(3, len(task_1_action_exs)) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False) + self.engine.rerun_workflow(task_1_ex.id, reset=False) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -616,7 +615,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(4, len(task_1_action_exs)) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False) + self.engine.rerun_workflow(task_1_ex.id, reset=False) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -705,7 +704,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Resume workflow and re-run failed task. self.engine.rerun_workflow( - wf_ex.id, task_1_ex.id, reset=False, env=updated_env @@ -798,7 +796,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(task_3_ex.state_info) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_3_ex.id) + self.engine.rerun_workflow(task_3_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -894,7 +892,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id) + self.engine.rerun_workflow(task_1_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -921,7 +919,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.WAITING, task_3_ex.state) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) + self.engine.rerun_workflow(task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1022,7 +1020,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(3, len(task_1_action_exs)) # Resume workflow and re-run failed task. Re-run #1 with no reset. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False) + self.engine.rerun_workflow(task_1_ex.id, reset=False) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1043,7 +1041,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(5, len(task_1_action_exs)) # Resume workflow and re-run failed task. Re-run #2 with reset. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=True) + self.engine.rerun_workflow(task_1_ex.id, reset=True) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1064,7 +1062,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(8, len(task_1_action_exs)) # Resume workflow and re-run failed task. Re-run #3 with no reset. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False) + self.engine.rerun_workflow(task_1_ex.id, reset=False) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1085,7 +1083,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(10, len(task_1_action_exs)) # Resume workflow and re-run failed task. Re-run #4 with no reset. - self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False) + self.engine.rerun_workflow(task_1_ex.id, reset=False) wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1160,7 +1158,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) + self.engine.rerun_workflow(task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) @@ -1261,7 +1259,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(sub_wf_task_ex.state_info) # Resume workflow and re-run failed subworkflow task. - self.engine.rerun_workflow(sub_wf_ex.id, sub_wf_task_ex.id) + self.engine.rerun_workflow(sub_wf_task_ex.id) sub_wf_ex = db_api.get_workflow_execution(sub_wf_ex.id) diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index f493a66d..9d855075 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -13,7 +13,6 @@ # limitations under the License. from oslo_config import cfg -import testtools from mistral.actions import base as actions_base from mistral.db.v2 import api as db_api @@ -63,20 +62,6 @@ class MyAction(actions_base.Action): raise NotImplementedError -def expect_size_limit_exception(field_name): - def logger(test_func): - def wrapped(*args, **kwargs): - with testtools.ExpectedException(exc.SizeLimitExceededException, - value_re="Size of '%s' is 1KB " - "which exceeds the limit" - " of 0KB" % field_name): - return test_func(*args, **kwargs) - - return wrapped - - return logger - - def generate_workflow(tokens): new_wf = WF long_string = ''.join('A' for _ in range(1024)) @@ -91,17 +76,35 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): def setUp(self): """Resets the size limit config between tests""" super(ExecutionFieldsSizeLimitTest, self).setUp() - cfg.CONF.set_default('execution_field_size_limit_kb', 0, - group='engine') + + cfg.CONF.set_default( + 'execution_field_size_limit_kb', + 0, + group='engine' + ) + test_base.register_action_class('my_action', MyAction) + def tearDown(self): + """Restores the size limit config to default""" + super(ExecutionFieldsSizeLimitTest, self).tearDown() + + cfg.CONF.set_default( + 'execution_field_size_limit_kb', + 1024, + group='engine' + ) + def test_default_limit(self): - cfg.CONF.set_default('execution_field_size_limit_kb', -1, - group='engine') + cfg.CONF.set_default( + 'execution_field_size_limit_kb', + -1, + group='engine' + ) new_wf = generate_workflow( - ['__ACTION_INPUT_', '__WORKFLOW_INPUT__', - '__TASK_PUBLISHED__']) + ['__ACTION_INPUT_', '__WORKFLOW_INPUT__', '__TASK_PUBLISHED__'] + ) wf_service.create_workflows(new_wf) @@ -110,25 +113,38 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): self.await_execution_success(wf_ex.id) - @expect_size_limit_exception('input') def test_workflow_input_default_value_limit(self): new_wf = generate_workflow(['__WORKFLOW_INPUT__']) wf_service.create_workflows(new_wf) # Start workflow. - self.engine.start_workflow('wf', {}) + e = self.assertRaises( + exc.SizeLimitExceededException, + self.engine.start_workflow, + 'wf', + {} + ) + + self.assertEqual( + "Size of 'input' is 1KB which exceeds the limit of 0KB", + e.message + ) - @expect_size_limit_exception('input') def test_workflow_input_limit(self): wf_service.create_workflows(WF) # Start workflow. - self.engine.start_workflow( + e = self.assertRaises( + exc.SizeLimitExceededException, + self.engine.start_workflow, 'wf', - { - 'workflow_input': ''.join('A' for _ in range(1024)) - } + {'workflow_input': ''.join('A' for _ in range(1024))} + ) + + self.assertEqual( + "Size of 'input' is 1KB which exceeds the limit of 0KB", + e.message ) def test_action_input_limit(self): @@ -149,9 +165,10 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): wf_service.create_workflows(WF) # Start workflow. - wf_ex = self.engine.start_workflow('wf', { - 'action_output_length': 1024 - }) + wf_ex = self.engine.start_workflow( + 'wf', + {'action_output_length': 1024} + ) self.await_execution_error(wf_ex.id) @@ -189,16 +206,22 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): task_ex.state_info ) - @expect_size_limit_exception('params') def test_workflow_params_limit(self): wf_service.create_workflows(WF) # Start workflow. long_string = ''.join('A' for _ in range(1024)) - self.engine.start_workflow('wf', {}, '', env={'param': long_string}) - def tearDown(self): - """Restores the size limit config to default""" - super(ExecutionFieldsSizeLimitTest, self).tearDown() - cfg.CONF.set_default('execution_field_size_limit_kb', 1024, - group='engine') + e = self.assertRaises( + exc.SizeLimitExceededException, + self.engine.start_workflow, + 'wf', + {}, + '', + env={'param': long_string} + ) + + self.assertIn( + "Size of 'params' is 1KB which exceeds the limit of 0KB", + e.message + ) diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index 3aedd7bd..5210e742 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -104,7 +104,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) + self.engine.rerun_workflow(task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) @@ -203,7 +203,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): } # Resume workflow and re-run failed task. - self.engine.rerun_workflow(wf_ex.id, task_2_ex.id, env=updated_env) + self.engine.rerun_workflow(task_2_ex.id, env=updated_env) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) @@ -305,7 +305,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): e = self.assertRaises( exc.MistralError, self.engine.rerun_workflow, - wf_ex.id, task_1_ex.id ) diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index f08ea363..d2973027 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -289,7 +289,7 @@ class SchedulerServiceTest(base.DbTestCase): eventlet.sleep(WAIT) - self.assertRaises(exc.DBEntityNotFoundException, + self.assertRaises(exc.DBEntityNotFoundError, db_api.get_delayed_call, calls[0].id ) diff --git a/mistral/tests/unit/services/test_trigger_service.py b/mistral/tests/unit/services/test_trigger_service.py index 44b59f44..bc25aea3 100644 --- a/mistral/tests/unit/services/test_trigger_service.py +++ b/mistral/tests/unit/services/test_trigger_service.py @@ -145,7 +145,7 @@ class TriggerServiceV2Test(base.DbTestCase): # But creation with the same count and first time # simultaneously leads to error. self.assertRaises( - exc.DBDuplicateEntryException, + exc.DBDuplicateEntryError, t_s.create_cron_trigger, 'trigger-%s' % utils.generate_unicode_uuid(), self.wf.name, diff --git a/mistral/tests/unit/services/test_workflow_service.py b/mistral/tests/unit/services/test_workflow_service.py index 225069e3..6c0284eb 100644 --- a/mistral/tests/unit/services/test_workflow_service.py +++ b/mistral/tests/unit/services/test_workflow_service.py @@ -165,7 +165,7 @@ class WorkflowServiceTest(base.DbTestCase): def test_update_non_existing_workflow_failed(self): exception = self.assertRaises( - exc.DBEntityNotFoundException, + exc.DBEntityNotFoundError, wf_service.update_workflows, WORKFLOW ) diff --git a/mistral/tests/unit/test_exception_base.py b/mistral/tests/unit/test_exception_base.py index 07160354..1296b434 100644 --- a/mistral/tests/unit/test_exception_base.py +++ b/mistral/tests/unit/test_exception_base.py @@ -24,19 +24,19 @@ class ExceptionTestCase(base.BaseTest): """Test cases for exception code.""" def test_nf_with_message(self): - exc = exceptions.DBEntityNotFoundException('check_for_this') + exc = exceptions.DBEntityNotFoundError('check_for_this') self.assertIn('check_for_this', six.text_type(exc)) self.assertEqual(404, exc.http_code) def test_nf_with_no_message(self): - exc = exceptions.DBEntityNotFoundException() + exc = exceptions.DBEntityNotFoundError() self.assertIn("Object not found", six.text_type(exc)) self.assertEqual(404, exc.http_code,) def test_duplicate_obj_code(self): - exc = exceptions.DBDuplicateEntryException() + exc = exceptions.DBDuplicateEntryError() self.assertIn("Database object already exists", six.text_type(exc)) self.assertEqual(409, exc.http_code,) diff --git a/mistral/utils/rest_utils.py b/mistral/utils/rest_utils.py index 24306694..b14bf798 100644 --- a/mistral/utils/rest_utils.py +++ b/mistral/utils/rest_utils.py @@ -20,9 +20,9 @@ import json import pecan import six from webob import Response -from wsme import exc +from wsme import exc as wsme_exc -from mistral import exceptions as ex +from mistral import exceptions as exc def wrap_wsme_controller_exception(func): @@ -35,10 +35,14 @@ def wrap_wsme_controller_exception(func): def wrapped(*args, **kwargs): try: return func(*args, **kwargs) - except ex.MistralException as excp: - pecan.response.translatable_error = excp - raise exc.ClientSideError(msg=six.text_type(excp), - status_code=excp.http_code) + except (exc.MistralException, exc.MistralError) as e: + pecan.response.translatable_error = e + + raise wsme_exc.ClientSideError( + msg=six.text_type(e), + status_code=e.http_code + ) + return wrapped @@ -52,31 +56,33 @@ def wrap_pecan_controller_exception(func): def wrapped(*args, **kwargs): try: return func(*args, **kwargs) - except ex.MistralException as excp: + except (exc.MistralException, exc.MistralError) as e: return Response( - status=excp.http_code, + status=e.http_code, content_type='application/json', - body=json.dumps(dict( - faultstring=six.text_type(excp)))) + body=json.dumps(dict(faultstring=six.text_type(e))) + ) return wrapped def validate_query_params(limit, sort_keys, sort_dirs): if limit is not None and limit <= 0: - raise exc.ClientSideError("Limit must be positive.") + raise wsme_exc.ClientSideError("Limit must be positive.") if len(sort_keys) < len(sort_dirs): - raise exc.ClientSideError("Length of sort_keys must be equal or " - "greater than sort_dirs.") + raise wsme_exc.ClientSideError( + "Length of sort_keys must be equal or greater than sort_dirs." + ) if len(sort_keys) > len(sort_dirs): sort_dirs.extend(['asc'] * (len(sort_keys) - len(sort_dirs))) for sort_dir in sort_dirs: if sort_dir not in ['asc', 'desc']: - raise exc.ClientSideError("Unknown sort direction, must be 'desc' " - "or 'asc'.") + raise wsme_exc.ClientSideError( + "Unknown sort direction, must be 'desc' or 'asc'." + ) def validate_fields(fields, object_fields): @@ -93,6 +99,6 @@ def validate_fields(fields, object_fields): invalid_fields = set(fields) - set(object_fields) if invalid_fields: - raise exc.ClientSideError( + raise wsme_exc.ClientSideError( 'Field(s) %s are invalid.' % ', '.join(invalid_fields) ) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index da8092d4..46a770d0 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -111,6 +111,8 @@ class WorkflowController(object): if self._is_paused_or_completed(): return [] + # TODO(rakhmerov): I think it should rather be a new method + # rerun_task() because it covers a different use case. if task_ex: return self._get_rerun_commands([task_ex], reset, env=env) @@ -199,7 +201,11 @@ class WorkflowController(object): :param env: A set of environment variables to overwrite. :return: List of workflow commands. """ + for task_ex in task_exs: + # TODO(rakhmerov): It is wrong that we update something in + # workflow controller, by design it should not change system + # state. Fix it, it should happen outside. self._update_task_ex_env(task_ex, env) cmds = [commands.RunExistingTask(t_e, reset) for t_e in task_exs] diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index dc0ab611..f528a432 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -69,6 +69,10 @@ def is_paused_or_completed(state): return is_paused(state) or is_completed(state) +def is_paused_or_idle(state): + return is_paused(state) or is_idle(state) + + def is_valid_transition(from_state, to_state): if is_invalid(from_state) or is_invalid(to_state): return False diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index 411e3a5f..ba8a125d 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -106,33 +106,3 @@ def find_incomplete_task_executions(wf_ex): def find_error_task_executions(wf_ex): return find_task_executions_with_state(wf_ex, states.ERROR) - - -def construct_fail_info_message(wf_ctrl, wf_ex): - # Try to find where error is exactly. - failed_tasks = sorted( - filter( - lambda t: not wf_ctrl.is_error_handled_for(t), - find_error_task_executions(wf_ex) - ), - key=lambda t: t.name - ) - - msg = ('Failure caused by error in tasks: %s\n' % - ', '.join([t.name for t in failed_tasks])) - - for t in failed_tasks: - msg += '\n %s [task_ex_id=%s] -> %s\n' % (t.name, t.id, t.state_info) - - for i, ex in enumerate(t.executions): - if ex.state == states.ERROR: - output = (ex.output or dict()).get('result', 'Unknown') - msg += ( - ' [action_ex_id=%s, idx=%s]: %s\n' % ( - ex.id, - i, - str(output) - ) - ) - - return msg diff --git a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py index 05918cbd..6aa90225 100644 --- a/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py +++ b/mistral_tempest_tests/tests/api/v2/test_mistral_basic_v2.py @@ -607,10 +607,12 @@ class ExecutionTestsV2(base.TestCase): @test.attr(type='negative') def test_create_execution_forgot_input_params(self): - self.assertRaises(exceptions.BadRequest, - self.client.create_execution, - self.reverse_wf['name'], - params={"task_name": "nonexist"}) + self.assertRaises( + exceptions.BadRequest, + self.client.create_execution, + self.reverse_wf['name'], + params={"task_name": "nonexist"} + ) @test.attr(type='sanity') def test_action_ex_concurrency(self):