Adding "requires" to "task-defaults" clause
* Reverse workflows can now use "requires" declared in "task-defaults", for example, to have all tasks depending on a configured task (except this task itself) * Fixed a bug with "processed" flag, it shouldn't be taken into account for reverse workflows because if say task3 depends on task2 then these tasks can't depend on a task1 at the same time Change-Id: Iad113e405f43125b37d16b91fcf448c21cd354e6
This commit is contained in:
parent
03ff68c2d6
commit
fa0102354b
@ -29,35 +29,34 @@ LOG = logging.getLogger(__name__)
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
DIRECT_WF_ON_ERROR = """
|
||||
---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: direct
|
||||
|
||||
task-defaults:
|
||||
on-error:
|
||||
- task3
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
description: That should lead to transition to task3.
|
||||
action: std.http url="http://some_url"
|
||||
on-success:
|
||||
- task2
|
||||
|
||||
task2:
|
||||
action: std.echo output="Morpheus"
|
||||
|
||||
task3:
|
||||
action: std.echo output="output"
|
||||
"""
|
||||
|
||||
|
||||
class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
|
||||
def test_task_defaults_on_error(self):
|
||||
wf_service.create_workflows(DIRECT_WF_ON_ERROR)
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: direct
|
||||
|
||||
task-defaults:
|
||||
on-error:
|
||||
- task3
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
description: That should lead to transition to task3.
|
||||
action: std.http url="http://some_url"
|
||||
on-success:
|
||||
- task2
|
||||
|
||||
task2:
|
||||
action: std.echo output="Morpheus"
|
||||
|
||||
task3:
|
||||
action: std.echo output="output"
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
@ -77,64 +76,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
|
||||
self.assertEqual(states.SUCCESS, task3.state)
|
||||
|
||||
|
||||
REVERSE_WF_RETRY = """
|
||||
---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
retry:
|
||||
count: 2
|
||||
delay: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
|
||||
task2:
|
||||
action: std.echo output=2
|
||||
requires: [task1]
|
||||
"""
|
||||
|
||||
|
||||
REVERSE_WF_TIMEOUT = """
|
||||
---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
timeout: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.async_noop
|
||||
|
||||
task2:
|
||||
action: std.echo output=2
|
||||
requires: [task1]
|
||||
"""
|
||||
|
||||
REVERSE_WF_WAIT = """
|
||||
---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
wait-before: 1
|
||||
wait-after: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output=1
|
||||
"""
|
||||
|
||||
|
||||
class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
def setUp(self):
|
||||
super(TaskDefaultsReverseWorkflowEngineTest, self).setUp()
|
||||
@ -144,7 +85,27 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self.addCleanup(thread_group.stop)
|
||||
|
||||
def test_task_defaults_retry_policy(self):
|
||||
wf_service.create_workflows(REVERSE_WF_RETRY)
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
retry:
|
||||
count: 2
|
||||
delay: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
|
||||
task2:
|
||||
action: std.echo output=2
|
||||
requires: [task1]
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
|
||||
@ -170,7 +131,25 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
|
||||
@testtools.skip("Fix 'timeout' policy.")
|
||||
def test_task_defaults_timeout_policy(self):
|
||||
wf_service.create_workflows(REVERSE_WF_TIMEOUT)
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
timeout: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.async_noop
|
||||
|
||||
task2:
|
||||
action: std.echo output=2
|
||||
requires: [task1]
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
|
||||
@ -187,7 +166,22 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self._assert_single_item(tasks, name='task1', state=states.ERROR)
|
||||
|
||||
def test_task_defaults_wait_policies(self):
|
||||
wf_service.create_workflows(REVERSE_WF_WAIT)
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
wait-before: 1
|
||||
wait-after: 1
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output=1
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
time_before = dt.datetime.now()
|
||||
|
||||
@ -210,3 +204,44 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self.assertEqual(1, len(tasks))
|
||||
|
||||
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
|
||||
|
||||
def test_task_defaults_requires(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: reverse
|
||||
|
||||
task-defaults:
|
||||
requires: [always_do]
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output=1
|
||||
|
||||
task2:
|
||||
action: std.echo output=1
|
||||
requires: [task1]
|
||||
|
||||
always_do:
|
||||
action: std.echo output="Do something"
|
||||
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
|
||||
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(3, len(tasks))
|
||||
|
||||
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
|
||||
self._assert_single_item(tasks, name='task2', state=states.SUCCESS)
|
||||
self._assert_single_item(tasks, name='always_do', state=states.SUCCESS)
|
||||
|
@ -218,6 +218,13 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
({'on-complete': []}, True),
|
||||
({'on-complete': ['email', 'email']}, True),
|
||||
({'on-complete': ['email', 12345]}, True),
|
||||
({'requires': ''}, True),
|
||||
({'requires': []}, True),
|
||||
({'requires': ['']}, True),
|
||||
({'requires': None}, True),
|
||||
({'requires': 12345}, True),
|
||||
({'requires': ['echo']}, False),
|
||||
({'requires': ['echo', 'get']}, False),
|
||||
({'retry': {'count': 3, 'delay': 1}}, False),
|
||||
({'retry': {'count': '<% 3 %>', 'delay': 1}}, False),
|
||||
({'retry': {'count': '<% * %>', 'delay': 1}}, True),
|
||||
|
@ -43,7 +43,8 @@ class TaskDefaultsSpec(base.BaseSpec):
|
||||
"concurrency": policies.CONCURRENCY_SCHEMA,
|
||||
"on-complete": _on_clause_type,
|
||||
"on-success": _on_clause_type,
|
||||
"on-error": _on_clause_type
|
||||
"on-error": _on_clause_type,
|
||||
"requires": types.UNIQUE_STRING_LIST
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
@ -67,6 +68,7 @@ class TaskDefaultsSpec(base.BaseSpec):
|
||||
self._on_complete = self._as_list_of_tuples("on-complete")
|
||||
self._on_success = self._as_list_of_tuples("on-success")
|
||||
self._on_error = self._as_list_of_tuples("on-error")
|
||||
self._requires = data.get('requires', [])
|
||||
|
||||
def validate(self):
|
||||
super(TaskDefaultsSpec, self).validate()
|
||||
@ -93,3 +95,6 @@ class TaskDefaultsSpec(base.BaseSpec):
|
||||
|
||||
def get_on_error(self):
|
||||
return self._on_error
|
||||
|
||||
def get_requires(self):
|
||||
return self._requires
|
||||
|
@ -289,9 +289,6 @@ class ReverseWorkflowTaskSpec(TaskSpec):
|
||||
self._requires = data.get('requires', [])
|
||||
|
||||
def get_requires(self):
|
||||
if isinstance(self._requires, six.string_types):
|
||||
return [self._requires]
|
||||
|
||||
return self._requires
|
||||
|
||||
|
||||
|
@ -73,7 +73,7 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
t_specs = [
|
||||
self.wf_spec.get_tasks()[t_name]
|
||||
for t_name in task_spec.get_requires()
|
||||
for t_name in self._get_task_requires(task_spec)
|
||||
or []
|
||||
]
|
||||
|
||||
@ -119,16 +119,16 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
if task_ex:
|
||||
return False
|
||||
|
||||
if not task_spec.get_requires():
|
||||
if not self._get_task_requires(task_spec):
|
||||
return True
|
||||
|
||||
success_task_names = set()
|
||||
success_t_names = set()
|
||||
|
||||
for t_ex in self.wf_ex.task_executions:
|
||||
if t_ex.state == states.SUCCESS and not t_ex.processed:
|
||||
success_task_names.add(t_ex.name)
|
||||
if t_ex.state == states.SUCCESS:
|
||||
success_t_names.add(t_ex.name)
|
||||
|
||||
return not (set(task_spec.get_requires()) - success_task_names)
|
||||
return not (set(self._get_task_requires(task_spec)) - success_t_names)
|
||||
|
||||
def _build_graph(self, tasks_spec):
|
||||
graph = nx.DiGraph()
|
||||
@ -144,9 +144,8 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
|
||||
return graph
|
||||
|
||||
@staticmethod
|
||||
def _get_dependency_tasks(tasks_spec, task_spec):
|
||||
dep_task_names = tasks_spec[task_spec.get_name()].get_requires()
|
||||
def _get_dependency_tasks(self, tasks_spec, task_spec):
|
||||
dep_task_names = self._get_task_requires(task_spec)
|
||||
|
||||
if len(dep_task_names) == 0:
|
||||
return []
|
||||
@ -159,3 +158,15 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
dep_t_specs.add(t_spec)
|
||||
|
||||
return dep_t_specs
|
||||
|
||||
def _get_task_requires(self, task_spec):
|
||||
requires = set(task_spec.get_requires())
|
||||
|
||||
task_defaults = self.wf_spec.get_task_defaults()
|
||||
|
||||
if task_defaults:
|
||||
requires |= set(task_defaults.get_requires())
|
||||
|
||||
requires.discard(task_spec.get_name())
|
||||
|
||||
return list(requires)
|
||||
|
Loading…
Reference in New Issue
Block a user