Fixing workflow handlers to return all possible commands

Change-Id: I0f2f7746d7071c5eded30e75e41e80dfbe792af7
This commit is contained in:
Renat Akhmerov 2014-09-15 16:10:13 -07:00
parent e458708b73
commit 3015931b7f
6 changed files with 40 additions and 29 deletions

View File

@ -56,6 +56,8 @@ class RunTask(EngineCommand):
self._before_task_start()
self._run_task()
return True
def _prepare_task(self, exec_db, wf_handler):
if self.task_db:
return
@ -195,10 +197,16 @@ class RollbackWorkflow(EngineCommand):
pass
CMD_MAP = {
'run_task': RunTask,
RESERVED_COMMANDS = {
'fail': FailWorkflow,
'succeed': SucceedWorkflow,
'pause': PauseWorkflow,
'rollback': PauseWorkflow
}
def get_reserved_command(cmd_name):
if cmd_name not in RESERVED_COMMANDS:
return None
return RESERVED_COMMANDS[cmd_name]()

View File

@ -127,7 +127,8 @@ class DefaultEngine(base.Engine):
return
for cmd in commands:
cmd.run(exec_db, wf_handler)
if not cmd.run(exec_db, wf_handler):
break
@staticmethod
def _create_db_execution(wf_db, wf_spec, wf_input, params):

View File

@ -13,7 +13,6 @@
# limitations under the License.
from oslo.config import cfg
import testtools
from mistral.db.v2 import api as db_api
from mistral.openstack.common import log as logging
@ -53,8 +52,6 @@ workflows:
"""
# TODO(rakhmerov): Delete this decorator when tests pass.
@testtools.skip('')
class EngineInstructionsTest(base.EngineTestCase):
def setUp(self):
super(EngineInstructionsTest, self).setUp()

View File

@ -14,7 +14,6 @@
import abc
from mistral.engine1 import commands
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral import utils
@ -78,9 +77,9 @@ class WorkflowHandler(object):
return []
task_specs = self._find_next_tasks(task_db)
commands = self._find_next_commands(task_db)
if len(task_specs) == 0:
if len(commands) == 0:
# If there are no running tasks at this point we can conclude that
# the workflow has finished.
if not self._find_running_tasks():
@ -98,16 +97,16 @@ class WorkflowHandler(object):
task_out_ctx
)
return [commands.RunTask(t_s) for t_s in task_specs]
return commands
@abc.abstractmethod
def _find_next_tasks(self, task_db):
"""Finds tasks that should run next.
def _find_next_commands(self, task_db):
"""Finds commands that should run next.
A concrete algorithm of finding such tasks depends on a concrete
workflow handler.
:param task_db: Task DB model causing the operation (completed).
:return: List of task specifications.
:return: List of engine commands.
"""
raise NotImplementedError

View File

@ -46,15 +46,15 @@ class DirectWorkflowHandler(base.WorkflowHandler):
# so we may need to get rid of it at all.
return []
def _find_next_tasks(self, task_db):
"""Finds tasks that should run after completing given task.
def _find_next_commands(self, task_db):
"""Finds commands that should run after completing given task.
Expression 'on_complete' is not mutually exclusive to 'on_success'
and 'on_error'.
:param task_db: Task DB model.
:return: List of task specifications.
"""
task_specs = []
commands = []
t_name = task_db.name
t_state = task_db.state
@ -67,29 +67,34 @@ class DirectWorkflowHandler(base.WorkflowHandler):
on_error = tasks_spec[t_name].get_on_error()
if on_error:
task_specs = self._get_tasks_to_schedule(on_error, ctx)
commands = self._get_next_commands(on_error, ctx)
elif t_state == states.SUCCESS:
on_success = tasks_spec[t_name].get_on_success()
if on_success:
task_specs = self._get_tasks_to_schedule(on_success, ctx)
commands = self._get_next_commands(on_success, ctx)
if states.is_finished(t_state):
on_complete = tasks_spec[t_name].get_on_complete()
if on_complete:
task_specs += self._get_tasks_to_schedule(on_complete, ctx)
commands += self._get_next_commands(on_complete, ctx)
LOG.debug("Found tasks: %s" % task_specs)
LOG.debug("Found commands: %s" % commands)
return task_specs
return commands
def _get_tasks_to_schedule(self, task_conditions, ctx):
task_specs = []
def _get_next_commands(self, cmd_conditions, ctx):
commands = []
for t_name, condition in task_conditions.iteritems():
for t_name, condition in cmd_conditions.iteritems():
if not condition or expr.evaluate(condition, ctx):
task_specs.append(self.wf_spec.get_tasks()[t_name])
commands.append(self.build_command(t_name))
return task_specs
return commands
def build_command(self, cmd_name):
cmd = commands.get_reserved_command(cmd_name)
return cmd or commands.RunTask(self.wf_spec.get_tasks()[cmd_name])

View File

@ -57,8 +57,9 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
return [self.wf_spec.get_tasks()[t_name]
for t_name in task_spec.get_requires() or []]
def _find_next_tasks(self, task_db):
"""Finds all tasks with resolved dependencies.
def _find_next_commands(self, task_db):
"""Finds all tasks with resolved dependencies and return them
in the form of engine commands.
:param task_db: Task DB model causing the operation.
:return: Tasks with resolved dependencies.
@ -89,7 +90,7 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
if not t_db or t_db.state == states.IDLE:
resolved_task_specs.append(t_spec)
return resolved_task_specs
return [commands.RunTask(t_s) for t_s in resolved_task_specs]
def _find_tasks_without_dependencies(self, task_spec):
"""Given a target task name finds tasks with no dependencies.