Support subclass iteration for Workflow controller

Just as task specification list implementation, implement workflow
controllers class hierarchy, this patch includes:

* add two static methods in WorkflowController, _get_class() and
get_controller()
* add a variable for DirectWorkflowController and ReverseWorkflowController
respectively
* use subclasses iteration for WorkflowController
* make use of get_controller() when getting WorkflowController instance
* delete workflow_controller_factory.py
* import DirectWorkflowController and ReverseWorkflowController earlier
for WorkflowController's subclasses iteration
* add some unit tests

Change-Id: I34e61abbbd592f5252a69bd2803041dee1e49768
Implements: bp workflow-handler-design-improvements
This commit is contained in:
LingxianKong 2015-03-20 13:30:43 +08:00
parent cbab3c1e31
commit e84f091e1e
8 changed files with 113 additions and 51 deletions

View File

@ -0,0 +1,20 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_utils import importutils
# NOTE(xylan): import modules for WorkflowHandler subclasses iteration
importutils.import_module('mistral.workflow.direct_workflow')
importutils.import_module('mistral.workflow.reverse_workflow')

View File

@ -26,11 +26,11 @@ from mistral.openstack.common import log as logging
from mistral import utils as u from mistral import utils as u
from mistral.utils import wf_trace from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import commands from mistral.workflow import commands
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
from mistral.workflow import workflow_controller_factory as wfc_factory
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -67,7 +67,7 @@ class DefaultEngine(base.Engine):
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
wf_ctrl = wfc_factory.create_workflow_controller( wf_ctrl = wf_base.WorkflowController.get_controller(
wf_ex, wf_ex,
wf_spec wf_spec
) )
@ -119,7 +119,7 @@ class DefaultEngine(base.Engine):
if task_ex.state == states.DELAYED: if task_ex.state == states.DELAYED:
return return
wf_ctrl = wfc_factory.create_workflow_controller(wf_ex) wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
# Calculate commands to process next. # Calculate commands to process next.
cmds = wf_ctrl.continue_workflow() cmds = wf_ctrl.continue_workflow()
@ -214,7 +214,7 @@ class DefaultEngine(base.Engine):
wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wfc_factory.create_workflow_controller(wf_ex) wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
# Calculate commands to process next. # Calculate commands to process next.
cmds = wf_ctrl.continue_workflow() cmds = wf_ctrl.continue_workflow()

View File

@ -0,0 +1,59 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mistral import exceptions
from mistral.tests import base
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow
from mistral.workflow import reverse_workflow
class WorkflowControllerTest(base.BaseTest):
def test_get_class_direct(self):
wf_handler_cls = wf_base.WorkflowController._get_class("direct")
self.assertIs(wf_handler_cls, direct_workflow.DirectWorkflowController)
def test_get_class_reverse(self):
wf_handler_cls = wf_base.WorkflowController._get_class("reverse")
self.assertIs(wf_handler_cls,
reverse_workflow.ReverseWorkflowController)
def test_get_class_notfound(self):
exc = self.assertRaises(
exceptions.NotFoundException,
wf_base.WorkflowController._get_class,
"invalid"
)
self.assertIn("Failed to find a workflow controller", str(exc))
@mock.patch("mistral.workbook.parser.get_workflow_spec")
@mock.patch("mistral.workflow.base.WorkflowController._get_class")
def test_get_handler(self, mock_get_class, mock_get_spec):
mock_wf_spec = mock.MagicMock()
mock_wf_spec.get_type.return_value = "direct"
mock_get_spec.return_value = mock_wf_spec
mock_handler_cls = mock.MagicMock()
mock_get_class.return_value = mock_handler_cls
wf_ex = {"spec": "spec"}
wf_base.WorkflowController.get_controller(wf_ex)
mock_get_spec.assert_called_once_with("spec")
mock_get_class.assert_called_once_with("direct")
mock_handler_cls.assert_called_once_with(wf_ex)

View File

@ -1,5 +1,6 @@
# Copyright 2014 - Mirantis, Inc. # Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc. # Copyright 2015 - StackStorm, Inc.
# Copyright 2015 - Huawei Technologies Co. Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -16,6 +17,7 @@
import abc import abc
import copy import copy
from mistral import exceptions as exc
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral import utils as u from mistral import utils as u
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
@ -108,3 +110,26 @@ class WorkflowController(object):
def _is_paused_or_completed(self): def _is_paused_or_completed(self):
return states.is_paused_or_completed(self.wf_ex.state) return states.is_paused_or_completed(self.wf_ex.state)
@staticmethod
def _get_class(wf_type):
"""Gets a workflow controller class by given workflow type.
:param wf_type: Workflow type.
:returns: Workflow controller class.
"""
for wf_ctrl_cls in u.iter_subclasses(WorkflowController):
if wf_type == wf_ctrl_cls.__workflow_type__:
return wf_ctrl_cls
msg = 'Failed to find a workflow controller [type=%s]' % wf_type
raise exc.NotFoundException(msg)
@staticmethod
def get_controller(wf_ex, wf_spec=None):
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
ctrl_cls = WorkflowController._get_class(wf_spec.get_type())
return ctrl_cls(wf_ex)

View File

@ -38,6 +38,8 @@ class DirectWorkflowController(base.WorkflowController):
'A'->'B' and 'A'->'C' evaluate to true. 'A'->'B' and 'A'->'C' evaluate to true.
""" """
__workflow_type__ = "direct"
def _get_upstream_task_executions(self, task_spec): def _get_upstream_task_executions(self, task_spec):
return filter( return filter(
lambda t_e: self._is_upstream_task_execution(task_spec, t_e), lambda t_e: self._is_upstream_task_execution(task_spec, t_e),

View File

@ -38,6 +38,8 @@ class ReverseWorkflowController(base.WorkflowController):
when a dependency of 'A' is resolved, will run task 'A'. when a dependency of 'A' is resolved, will run task 'A'.
""" """
__workflow_type__ = "reverse"
def _find_next_commands(self): def _find_next_commands(self):
"""Finds all tasks with resolved dependencies and return them """Finds all tasks with resolved dependencies and return them
in the form of workflow commands. in the form of workflow commands.

View File

@ -1,47 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import exceptions as exc
from mistral.workbook import parser as spec_parser
from mistral.workflow import direct_workflow
from mistral.workflow import reverse_workflow
def create_workflow_controller(wf_ex, wf_spec=None):
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
cls = _select_workflow_controller(wf_spec)
if not cls:
msg = 'Failed to find a workflow controller [wf_spec=%s]' % wf_spec
raise exc.WorkflowException(msg)
return cls(wf_ex)
def _select_workflow_controller(wf_spec):
# TODO(rakhmerov): This algorithm is actually for DSL v2.
# TODO(rakhmerov): Take DSL versions into account.
wf_type = wf_spec.get_type() or 'direct'
if wf_type == 'reverse':
return reverse_workflow.ReverseWorkflowController
if wf_type == 'direct':
return direct_workflow.DirectWorkflowController
return None

View File

@ -14,6 +14,7 @@ kombu>=2.4.8
oslo.config>=1.4.0 # Apache-2.0 oslo.config>=1.4.0 # Apache-2.0
oslo.db>=1.0.0 # Apache-2.0 oslo.db>=1.0.0 # Apache-2.0
oslo.messaging>=1.4.0 oslo.messaging>=1.4.0
oslo.utils>=1.2.0 # Apache-2.0
paramiko>=1.13.0 paramiko>=1.13.0
python-cinderclient>=1.1.0 python-cinderclient>=1.1.0
python-heatclient>=0.2.9 python-heatclient>=0.2.9