Working on "join": making "full join" work with conditional transitions

Change-Id: I465b80f261df56a7cb957522cac21841b9298ef5
This commit is contained in:
Renat Akhmerov 2014-12-05 15:32:26 +06:00
parent d6f1f6f8eb
commit 7b2c7c170e
4 changed files with 91 additions and 21 deletions

View File

@ -83,9 +83,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {})
self._await(
lambda: self.is_execution_error(exec_db.id)
)
self._await(lambda: self.is_execution_error(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
@ -96,12 +94,8 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(3, len(tasks))
self._await(
lambda: self.is_task_success(task3.id)
)
self._await(
lambda: self.is_task_success(task4.id)
)
self._await(lambda: self.is_task_success(task3.id))
self._await(lambda: self.is_task_success(task4.id))
def test_wrong_task_input(self):
wb_service.create_workbook_v2(WORKBOOK_WRONG_TASK_INPUT)
@ -109,9 +103,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf', {})
self._await(
lambda: self.is_execution_error(exec_db.id)
)
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]

View File

@ -27,7 +27,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
WF_FULL_JOIN = """
---
version: "2.0"
version: 2.0
wf:
type: direct
@ -60,7 +60,7 @@ wf:
WF_FULL_JOIN_WITH_ERRORS = """
---
version: "2.0"
version: 2.0
wf:
type: direct
@ -88,6 +88,44 @@ wf:
result3: $
"""
WF_FULL_JOIN_WITH_CONDITIONS = """
---
version: 2.0
wf:
type: direct
output:
result: $.result4
tasks:
task1:
action: std.echo output=1
publish:
result1: $
on-complete:
- task3
task2:
action: std.echo output=2
publish:
result2: $
on-complete:
- task3: $.result2 = 11111
- task4: $.result2 = 2
task3:
join: all
action: std.echo output="{$.result1}-{$.result1}"
publish:
result3: $
task4:
action: std.echo output=4
publish:
result4: $
"""
class JoinEngineTest(base.EngineTestCase):
def test_full_join_without_errors(self):
@ -136,6 +174,31 @@ class JoinEngineTest(base.EngineTestCase):
self.assertDictEqual({'result': '1-1'}, exec_db.output)
def test_full_join_with_conditions(self):
wf_service.create_workflows(WF_FULL_JOIN_WITH_CONDITIONS)
# Start workflow.
exec_db = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
tasks = exec_db.tasks
self.assertEqual(3, len(tasks))
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task4 = self._assert_single_item(tasks, name='task4')
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
self.assertEqual(states.SUCCESS, task4.state)
self.assertDictEqual({'result': 4}, exec_db.output)
def test_partial_join(self):
# TODO(rakhmerov): Implement.
pass

View File

@ -207,8 +207,7 @@ class WorkflowHandler(object):
:return: List of engine commands.
"""
def filter_task_cmds(cmds):
return [cmd for cmd in cmds
if isinstance(cmd, commands.RunTask)]
return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)]
def get_tasks_to_schedule(task_db, schedule_tasks):
"""Finds tasks that should run after given task and searches them

View File

@ -82,7 +82,7 @@ class DirectWorkflowHandler(base.WorkflowHandler):
return to_task_name in t_names
def _find_next_commands(self, task_db):
def _find_next_commands(self, task_db, remove_incomplete_joins=True):
"""Finds commands that should run after completing given task.
Expression 'on_complete' is not mutually exclusive to 'on_success'
@ -117,7 +117,10 @@ class DirectWorkflowHandler(base.WorkflowHandler):
LOG.debug("Found commands: %s" % cmds)
return self._remove_incomplete_joins(cmds)
if remove_incomplete_joins:
return self._remove_incomplete_joins(cmds)
else:
return cmds
def _is_error_handled(self, task_db):
return self.get_on_error_clause(task_db.name)
@ -203,9 +206,22 @@ class DirectWorkflowHandler(base.WorkflowHandler):
return False
for in_t_s in in_task_specs:
in_t_db = wf_utils.find_db_task(self.exec_db, in_t_s)
if not in_t_db or not states.is_completed(in_t_db.state):
if not self._triggers_join(task_spec, in_t_s):
return True
return False
def _triggers_join(self, join_task_spec, inbound_task_spec):
in_t_db = wf_utils.find_db_task(self.exec_db, inbound_task_spec)
if not in_t_db or not states.is_completed(in_t_db.state):
return False
def is_join_task(cmd):
return (isinstance(cmd, commands.RunTask)
and cmd.task_spec == join_task_spec)
return filter(
lambda cmd: is_join_task(cmd),
self._find_next_commands(in_t_db, False)
)