Merge "Refactor workflow controller and fix a bug in _fail_workflow()" into stable/mitaka

This commit is contained in:
Jenkins 2016-03-31 04:47:49 +00:00 committed by Gerrit Code Review
commit 4ebf957d9d
5 changed files with 104 additions and 92 deletions

View File

@ -73,10 +73,7 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller(
wf_ex,
wf_spec
)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
self._dispatch_workflow_commands(
wf_ex,
@ -178,7 +175,7 @@ class DefaultEngine(base.Engine, coordination.Service):
if task_ex.state == states.RUNNING_DELAYED:
return
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex, wf_spec)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow()
@ -302,7 +299,7 @@ class DefaultEngine(base.Engine, coordination.Service):
set_upstream=True
)
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
wf_ctrl = wf_base.get_controller(wf_ex)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
@ -390,7 +387,7 @@ class DefaultEngine(base.Engine, coordination.Service):
@staticmethod
def _stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS:
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {}
try:
@ -469,6 +466,7 @@ class DefaultEngine(base.Engine, coordination.Service):
task_handler.on_action_complete(
action_ex,
spec_parser.get_workflow_spec(wf_ex.spec),
wf_utils.Result(error=err_msg)
)

View File

@ -162,7 +162,7 @@ def on_action_complete(action_ex, wf_spec, result):
scheduled for execution.
:param action_ex: Action execution objects the result belongs to.
:param wf_spec: Worflow specification.
:param wf_spec: Workflow specification.
:param result: Task action/workflow output wrapped into
mistral.workflow.utils.Result instance.
:return List of engine commands that need to be performed.

View File

@ -1,59 +0,0 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mistral import exceptions
from mistral.tests.unit import base
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow
from mistral.workflow import reverse_workflow
class WorkflowControllerTest(base.BaseTest):
def test_get_class_direct(self):
wf_handler_cls = wf_base.WorkflowController._get_class("direct")
self.assertIs(wf_handler_cls, direct_workflow.DirectWorkflowController)
def test_get_class_reverse(self):
wf_handler_cls = wf_base.WorkflowController._get_class("reverse")
self.assertIs(wf_handler_cls,
reverse_workflow.ReverseWorkflowController)
def test_get_class_notfound(self):
exc = self.assertRaises(
exceptions.NotFoundException,
wf_base.WorkflowController._get_class,
"invalid"
)
self.assertIn("Failed to find a workflow controller", str(exc))
@mock.patch("mistral.workbook.parser.get_workflow_spec")
@mock.patch("mistral.workflow.base.WorkflowController._get_class")
def test_get_handler(self, mock_get_class, mock_get_spec):
mock_wf_spec = mock.MagicMock()
mock_wf_spec.get_type.return_value = "direct"
mock_get_spec.return_value = mock_wf_spec
mock_handler_cls = mock.MagicMock()
mock_get_class.return_value = mock_handler_cls
wf_ex = {"spec": "spec"}
wf_base.WorkflowController.get_controller(wf_ex)
mock_get_spec.assert_called_once_with("spec")
mock_get_class.assert_called_once_with("direct")
mock_handler_cls.assert_called_once_with(wf_ex, mock_wf_spec)

View File

@ -0,0 +1,67 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow as direct_wf
from mistral.workflow import reverse_workflow as reverse_wf
from mistral.db.v2.sqlalchemy import models as db_models
DIRECT_WF = """
---
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="Hey"
"""
REVERSE_WF = """
---
version: '2.0'
wf:
type: reverse
tasks:
task1:
action: std.echo output="Hey"
"""
class WorkflowControllerTest(base.BaseTest):
def test_get_controller_direct(self):
wf_spec = spec_parser.get_workflow_list_spec_from_yaml(DIRECT_WF)[0]
wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict())
self.assertIsInstance(
wf_base.get_controller(wf_ex, wf_spec),
direct_wf.DirectWorkflowController
)
def test_get_controller_reverse(self):
wf_spec = spec_parser.get_workflow_list_spec_from_yaml(REVERSE_WF)[0]
wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict())
self.assertIsInstance(
wf_base.get_controller(wf_ex, wf_spec),
reverse_wf.ReverseWorkflowController
)

View File

@ -31,6 +31,35 @@ from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
def get_controller(wf_ex, wf_spec=None):
"""Gets a workflow controller instance by given workflow execution object.
:param wf_ex: Workflow execution object.
:param wf_spec: Workflow specification object. If passed, the method works
faster.
:returns: Workflow controller class.
"""
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
wf_type = wf_spec.get_type()
ctrl_cls = None
for cls in u.iter_subclasses(WorkflowController):
if cls.__workflow_type__ == wf_type:
ctrl_cls = cls
break
if not ctrl_cls:
raise exc.NotFoundException(
'Failed to find a workflow controller [type=%s]' % wf_type
)
return ctrl_cls(wf_ex, wf_spec)
class WorkflowController(object):
"""Workflow Controller base class.
@ -48,8 +77,10 @@ class WorkflowController(object):
:param wf_spec: Workflow specification.
"""
self.wf_ex = wf_ex
if wf_spec is None:
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self.wf_spec = wf_spec
def _update_task_ex_env(self, task_ex, env):
@ -178,28 +209,3 @@ class WorkflowController(object):
def _is_paused_or_completed(self):
return states.is_paused_or_completed(self.wf_ex.state)
@staticmethod
def _get_class(wf_type):
"""Gets a workflow controller class by given workflow type.
:param wf_type: Workflow type.
:returns: Workflow controller class.
"""
for wf_ctrl_cls in u.iter_subclasses(WorkflowController):
if wf_type == wf_ctrl_cls.__workflow_type__:
return wf_ctrl_cls
raise exc.NotFoundException(
'Failed to find a workflow controller [type=%s]' % wf_type
)
@staticmethod
def get_controller(wf_ex, wf_spec=None):
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
return WorkflowController._get_class(wf_spec.get_type())(
wf_ex,
wf_spec
)