Merge "Add semantics validation of direct workflow 'join' tasks"
This commit is contained in:
commit
b67d9f814d
@ -95,6 +95,7 @@ workflows:
|
||||
on-complete:
|
||||
- task9
|
||||
- task10
|
||||
- task11
|
||||
|
||||
task9:
|
||||
join: all
|
||||
|
@ -72,9 +72,12 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for task, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {'task1': task}}}
|
||||
self._parse_dsl_spec(add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_inputs(self):
|
||||
tests = [
|
||||
@ -89,10 +92,14 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for task_input, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {'task1': {'action': 'test.mock'}}}}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks']['task1'], task_input)
|
||||
self._parse_dsl_spec(add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_with_items(self):
|
||||
tests = [
|
||||
@ -114,9 +121,12 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for with_item, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {'get': with_item}}}
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_publish(self):
|
||||
tests = [
|
||||
@ -131,10 +141,14 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for output, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {'task1': {'action': 'test.mock'}}}}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks']['task1'], output)
|
||||
self._parse_dsl_spec(add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_policies(self):
|
||||
tests = [
|
||||
@ -191,9 +205,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
for policy, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {'get': policy}}}
|
||||
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_direct_transition(self):
|
||||
tests = [
|
||||
@ -237,9 +253,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'get': transition})
|
||||
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_join(self):
|
||||
tests = [
|
||||
@ -248,7 +266,8 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
({'join': 'all'}, False),
|
||||
({'join': 'one'}, False),
|
||||
({'join': 0}, False),
|
||||
({'join': 3}, False),
|
||||
({'join': 2}, False),
|
||||
({'join': 3}, True),
|
||||
({'join': '3'}, True),
|
||||
({'join': -3}, True)
|
||||
]
|
||||
@ -257,12 +276,16 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for join, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {}}}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'get': on_success})
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success})
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': join})
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_requires(self):
|
||||
tests = [
|
||||
@ -278,11 +301,15 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for require, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {}}}
|
||||
|
||||
utils.merge_dicts(overlay['test'], {'type': 'reverse'})
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': require})
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_keep_result(self):
|
||||
tests = [
|
||||
@ -300,7 +327,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for keep_result, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {}}}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': keep_result})
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
@ -35,9 +35,12 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
|
||||
for wf_type, expect_error in tests:
|
||||
overlay = {'test': wf_type}
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error)
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_direct_workflow(self):
|
||||
overlay = {'test': {'type': 'direct', 'tasks': {}}}
|
||||
@ -48,9 +51,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success})
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': join})
|
||||
|
||||
wfs_spec = self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=False)
|
||||
wfs_spec = self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=False
|
||||
)
|
||||
|
||||
self.assertEqual(1, len(wfs_spec.get_workflows()))
|
||||
self.assertEqual('test', wfs_spec.get_workflows()[0].get_name())
|
||||
@ -67,9 +72,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': requires})
|
||||
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=True)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=True
|
||||
)
|
||||
|
||||
def test_direct_workflow_no_start_tasks(self):
|
||||
overlay = {
|
||||
@ -82,9 +89,42 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
}
|
||||
}
|
||||
|
||||
self._parse_dsl_spec(add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=True)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=True
|
||||
)
|
||||
|
||||
def test_direct_workflow_invalid_join(self):
|
||||
tests = [
|
||||
({'task3': {'join': 2}}, False),
|
||||
({'task3': {'join': 5}}, True),
|
||||
({'task3': {'join': 1}}, False),
|
||||
({'task3': {'join': 'one'}}, False),
|
||||
({'task3': {'join': 'all'}}, False),
|
||||
({'task4': {'join': 'all'}}, True),
|
||||
({'task4': {'join': 1}}, True),
|
||||
({'task4': {'join': 'one'}}, True)
|
||||
]
|
||||
|
||||
for test in tests:
|
||||
overlay = {
|
||||
'test': {
|
||||
'type': 'direct',
|
||||
'tasks': {
|
||||
'task1': {'on-complete': 'task3'},
|
||||
'task2': {'on-complete': 'task3'}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], test[0])
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=False,
|
||||
changes=overlay,
|
||||
expect_error=test[1]
|
||||
)
|
||||
|
||||
def test_reverse_workflow(self):
|
||||
overlay = {'test': {'type': 'reverse', 'tasks': {}}}
|
||||
@ -92,9 +132,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': require})
|
||||
|
||||
wfs_spec = self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=False)
|
||||
wfs_spec = self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=False
|
||||
)
|
||||
|
||||
self.assertEqual(1, len(wfs_spec.get_workflows()))
|
||||
self.assertEqual('test', wfs_spec.get_workflows()[0].get_name())
|
||||
@ -109,17 +151,21 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success})
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'email': join})
|
||||
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=True)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=True
|
||||
)
|
||||
|
||||
def test_version_required(self):
|
||||
dsl_dict = copy.deepcopy(self._dsl_blank)
|
||||
dsl_dict.pop('version', None)
|
||||
|
||||
exception = self.assertRaises(exc.DSLParsingException,
|
||||
self._spec_parser,
|
||||
yaml.safe_dump(dsl_dict))
|
||||
exception = self.assertRaises(
|
||||
exc.DSLParsingException,
|
||||
self._spec_parser,
|
||||
yaml.safe_dump(dsl_dict)
|
||||
)
|
||||
|
||||
self.assertIn("'version' is a required property", exception.message)
|
||||
|
||||
@ -133,9 +179,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase):
|
||||
]
|
||||
|
||||
for version, expect_error in tests:
|
||||
self._parse_dsl_spec(add_tasks=True,
|
||||
changes=version,
|
||||
expect_error=expect_error)
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=version,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_inputs(self):
|
||||
tests = [
|
||||
|
@ -253,14 +253,6 @@ class DirectWorkflowTaskSpec(TaskSpec):
|
||||
def validate_schema(self):
|
||||
super(DirectWorkflowTaskSpec, self).validate_schema()
|
||||
|
||||
if 'join' in self._data:
|
||||
join = self._data.get('join')
|
||||
|
||||
if not (isinstance(join, int) or join in ['all', 'one']):
|
||||
msg = ("Task property 'join' is only allowed to be an"
|
||||
" integer, 'all' or 'one': %s" % self._data)
|
||||
raise exc.InvalidModelException(msg)
|
||||
|
||||
# Validate YAQL expressions.
|
||||
self._validate_transitions('on-complete')
|
||||
self._validate_transitions('on-success')
|
||||
|
@ -149,6 +149,7 @@ class DirectWorkflowSpec(WorkflowSpec):
|
||||
)
|
||||
|
||||
self._check_workflow_integrity()
|
||||
self._check_join_tasks()
|
||||
|
||||
def _check_workflow_integrity(self):
|
||||
for t_s in self.get_tasks():
|
||||
@ -157,6 +158,39 @@ class DirectWorkflowSpec(WorkflowSpec):
|
||||
for out_t_name in out_task_names:
|
||||
self._validate_task_link(out_t_name)
|
||||
|
||||
def _check_join_tasks(self):
|
||||
join_tasks = [t for t in self.get_tasks() if t.get_join()]
|
||||
|
||||
err_msgs = []
|
||||
|
||||
for join_t in join_tasks:
|
||||
t_name = join_t.get_name()
|
||||
join_val = join_t.get_join()
|
||||
|
||||
in_tasks = self.find_inbound_task_specs(join_t)
|
||||
|
||||
if join_val == 'all':
|
||||
if len(in_tasks) == 0:
|
||||
err_msgs.append(
|
||||
"No inbound tasks for task with 'join: all'"
|
||||
" [task_name=%s]" % t_name
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
if join_val == 'one':
|
||||
join_val = 1
|
||||
|
||||
if len(in_tasks) < join_val:
|
||||
err_msgs.append(
|
||||
"Not enough inbound tasks for task with 'join'"
|
||||
" [task_name=%s, join=%s, inbound_tasks=%s]" %
|
||||
(t_name, join_val, len(in_tasks))
|
||||
)
|
||||
|
||||
if len(err_msgs) > 0:
|
||||
raise exc.InvalidModelException('\n'.join(err_msgs))
|
||||
|
||||
def find_start_tasks(self):
|
||||
return [
|
||||
t_s for t_s in self.get_tasks()
|
||||
|
Loading…
Reference in New Issue
Block a user