From 9a1a157274663cf812d577d3702f3b5a12c0ca01 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 23 Mar 2016 14:17:31 +0600 Subject: [PATCH] Refactor workflow controller and fix a bug in _fail_workflow() * Method get_controller is moved out from WorkflowController class because it's not related with its functionality directly * Fixed tests accordingly * "not found" test has been removed because there's no way now to make "not found" exceptin get raised. In order to make it happen we need to have a new workflow specification class w/o corresponding WorkflowController implementation. So that exception is just left just to check ourselves when we're working on a new WorkflowController implementation. Change-Id: I0330870e4382f01c4519b5c48e43ac50a08db338 --- mistral/engine/default_engine.py | 12 ++-- mistral/engine/task_handler.py | 2 +- mistral/tests/unit/workflow/test_base.py | 59 ---------------- .../tests/unit/workflow/test_workflow_base.py | 67 +++++++++++++++++++ mistral/workflow/base.py | 56 +++++++++------- 5 files changed, 104 insertions(+), 92 deletions(-) delete mode 100644 mistral/tests/unit/workflow/test_base.py create mode 100644 mistral/tests/unit/workflow/test_workflow_base.py diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 46cdcb37..6c70c050 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -73,10 +73,7 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_handler.set_execution_state(wf_ex, states.RUNNING) - wf_ctrl = wf_base.WorkflowController.get_controller( - wf_ex, - wf_spec - ) + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) self._dispatch_workflow_commands( wf_ex, @@ -178,7 +175,7 @@ class DefaultEngine(base.Engine, coordination.Service): if task_ex.state == states.RUNNING_DELAYED: return - wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex, wf_spec) + wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) # Calculate commands to process next. cmds = wf_ctrl.continue_workflow() @@ -302,7 +299,7 @@ class DefaultEngine(base.Engine, coordination.Service): set_upstream=True ) - wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) + wf_ctrl = wf_base.get_controller(wf_ex) # Calculate commands to process next. cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) @@ -390,7 +387,7 @@ class DefaultEngine(base.Engine, coordination.Service): @staticmethod def _stop_workflow(wf_ex, state, message=None): if state == states.SUCCESS: - wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) + wf_ctrl = wf_base.get_controller(wf_ex) final_context = {} try: @@ -469,6 +466,7 @@ class DefaultEngine(base.Engine, coordination.Service): task_handler.on_action_complete( action_ex, + spec_parser.get_workflow_spec(wf_ex.spec), wf_utils.Result(error=err_msg) ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index f9d10094..7985f245 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -162,7 +162,7 @@ def on_action_complete(action_ex, wf_spec, result): scheduled for execution. :param action_ex: Action execution objects the result belongs to. - :param wf_spec: Worflow specification. + :param wf_spec: Workflow specification. :param result: Task action/workflow output wrapped into mistral.workflow.utils.Result instance. :return List of engine commands that need to be performed. diff --git a/mistral/tests/unit/workflow/test_base.py b/mistral/tests/unit/workflow/test_base.py deleted file mode 100644 index e2b41f0c..00000000 --- a/mistral/tests/unit/workflow/test_base.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2015 - Huawei Technologies Co. Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock - -from mistral import exceptions -from mistral.tests.unit import base -from mistral.workflow import base as wf_base -from mistral.workflow import direct_workflow -from mistral.workflow import reverse_workflow - - -class WorkflowControllerTest(base.BaseTest): - def test_get_class_direct(self): - wf_handler_cls = wf_base.WorkflowController._get_class("direct") - - self.assertIs(wf_handler_cls, direct_workflow.DirectWorkflowController) - - def test_get_class_reverse(self): - wf_handler_cls = wf_base.WorkflowController._get_class("reverse") - - self.assertIs(wf_handler_cls, - reverse_workflow.ReverseWorkflowController) - - def test_get_class_notfound(self): - exc = self.assertRaises( - exceptions.NotFoundException, - wf_base.WorkflowController._get_class, - "invalid" - ) - - self.assertIn("Failed to find a workflow controller", str(exc)) - - @mock.patch("mistral.workbook.parser.get_workflow_spec") - @mock.patch("mistral.workflow.base.WorkflowController._get_class") - def test_get_handler(self, mock_get_class, mock_get_spec): - mock_wf_spec = mock.MagicMock() - mock_wf_spec.get_type.return_value = "direct" - mock_get_spec.return_value = mock_wf_spec - mock_handler_cls = mock.MagicMock() - mock_get_class.return_value = mock_handler_cls - wf_ex = {"spec": "spec"} - - wf_base.WorkflowController.get_controller(wf_ex) - - mock_get_spec.assert_called_once_with("spec") - mock_get_class.assert_called_once_with("direct") - mock_handler_cls.assert_called_once_with(wf_ex, mock_wf_spec) diff --git a/mistral/tests/unit/workflow/test_workflow_base.py b/mistral/tests/unit/workflow/test_workflow_base.py new file mode 100644 index 00000000..b2b7d2ea --- /dev/null +++ b/mistral/tests/unit/workflow/test_workflow_base.py @@ -0,0 +1,67 @@ +# Copyright 2015 - Huawei Technologies Co. Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from mistral.tests.unit import base +from mistral.workbook import parser as spec_parser +from mistral.workflow import base as wf_base +from mistral.workflow import direct_workflow as direct_wf +from mistral.workflow import reverse_workflow as reverse_wf + +from mistral.db.v2.sqlalchemy import models as db_models + + +DIRECT_WF = """ +--- +version: '2.0' + +wf: + type: direct + + tasks: + task1: + action: std.echo output="Hey" +""" + +REVERSE_WF = """ +--- +version: '2.0' + +wf: + type: reverse + + tasks: + task1: + action: std.echo output="Hey" +""" + + +class WorkflowControllerTest(base.BaseTest): + def test_get_controller_direct(self): + wf_spec = spec_parser.get_workflow_list_spec_from_yaml(DIRECT_WF)[0] + wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict()) + + self.assertIsInstance( + wf_base.get_controller(wf_ex, wf_spec), + direct_wf.DirectWorkflowController + ) + + def test_get_controller_reverse(self): + wf_spec = spec_parser.get_workflow_list_spec_from_yaml(REVERSE_WF)[0] + wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict()) + + self.assertIsInstance( + wf_base.get_controller(wf_ex, wf_spec), + reverse_wf.ReverseWorkflowController + ) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 21cceccc..ee1b9fe6 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -31,6 +31,35 @@ from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) +def get_controller(wf_ex, wf_spec=None): + """Gets a workflow controller instance by given workflow execution object. + + :param wf_ex: Workflow execution object. + :param wf_spec: Workflow specification object. If passed, the method works + faster. + :returns: Workflow controller class. + """ + + if not wf_spec: + wf_spec = spec_parser.get_workflow_spec(wf_ex['spec']) + + wf_type = wf_spec.get_type() + + ctrl_cls = None + + for cls in u.iter_subclasses(WorkflowController): + if cls.__workflow_type__ == wf_type: + ctrl_cls = cls + break + + if not ctrl_cls: + raise exc.NotFoundException( + 'Failed to find a workflow controller [type=%s]' % wf_type + ) + + return ctrl_cls(wf_ex, wf_spec) + + class WorkflowController(object): """Workflow Controller base class. @@ -48,8 +77,10 @@ class WorkflowController(object): :param wf_spec: Workflow specification. """ self.wf_ex = wf_ex + if wf_spec is None: wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + self.wf_spec = wf_spec def _update_task_ex_env(self, task_ex, env): @@ -178,28 +209,3 @@ class WorkflowController(object): def _is_paused_or_completed(self): return states.is_paused_or_completed(self.wf_ex.state) - - @staticmethod - def _get_class(wf_type): - """Gets a workflow controller class by given workflow type. - - :param wf_type: Workflow type. - :returns: Workflow controller class. - """ - for wf_ctrl_cls in u.iter_subclasses(WorkflowController): - if wf_type == wf_ctrl_cls.__workflow_type__: - return wf_ctrl_cls - - raise exc.NotFoundException( - 'Failed to find a workflow controller [type=%s]' % wf_type - ) - - @staticmethod - def get_controller(wf_ex, wf_spec=None): - if not wf_spec: - wf_spec = spec_parser.get_workflow_spec(wf_ex['spec']) - - return WorkflowController._get_class(wf_spec.get_type())( - wf_ex, - wf_spec - )