diff --git a/mistral/tests/resources/workbook/v2/my_workbook.yaml b/mistral/tests/resources/workbook/v2/my_workbook.yaml index 8021abe4..38a78ece 100644 --- a/mistral/tests/resources/workbook/v2/my_workbook.yaml +++ b/mistral/tests/resources/workbook/v2/my_workbook.yaml @@ -95,6 +95,7 @@ workflows: on-complete: - task9 - task10 + - task11 task9: join: all diff --git a/mistral/tests/unit/workbook/v2/test_tasks.py b/mistral/tests/unit/workbook/v2/test_tasks.py index 777f06a9..964970a8 100644 --- a/mistral/tests/unit/workbook/v2/test_tasks.py +++ b/mistral/tests/unit/workbook/v2/test_tasks.py @@ -72,9 +72,12 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for task, expect_error in tests: overlay = {'test': {'tasks': {'task1': task}}} - self._parse_dsl_spec(add_tasks=False, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=False, + changes=overlay, + expect_error=expect_error + ) def test_inputs(self): tests = [ @@ -89,10 +92,14 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for task_input, expect_error in tests: overlay = {'test': {'tasks': {'task1': {'action': 'test.mock'}}}} + utils.merge_dicts(overlay['test']['tasks']['task1'], task_input) - self._parse_dsl_spec(add_tasks=False, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=False, + changes=overlay, + expect_error=expect_error + ) def test_with_items(self): tests = [ @@ -114,9 +121,12 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for with_item, expect_error in tests: overlay = {'test': {'tasks': {'get': with_item}}} - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_publish(self): tests = [ @@ -131,10 +141,14 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for output, expect_error in tests: overlay = {'test': {'tasks': {'task1': {'action': 'test.mock'}}}} + utils.merge_dicts(overlay['test']['tasks']['task1'], output) - self._parse_dsl_spec(add_tasks=False, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=False, + changes=overlay, + expect_error=expect_error + ) def test_policies(self): tests = [ @@ -191,9 +205,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for policy, expect_error in tests: overlay = {'test': {'tasks': {'get': policy}}} - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_direct_transition(self): tests = [ @@ -237,9 +253,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): utils.merge_dicts(overlay['test']['tasks'], {'get': transition}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_join(self): tests = [ @@ -248,7 +266,8 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): ({'join': 'all'}, False), ({'join': 'one'}, False), ({'join': 0}, False), - ({'join': 3}, False), + ({'join': 2}, False), + ({'join': 3}, True), ({'join': '3'}, True), ({'join': -3}, True) ] @@ -257,12 +276,16 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for join, expect_error in tests: overlay = {'test': {'tasks': {}}} + utils.merge_dicts(overlay['test']['tasks'], {'get': on_success}) utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success}) utils.merge_dicts(overlay['test']['tasks'], {'email': join}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_requires(self): tests = [ @@ -278,11 +301,15 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for require, expect_error in tests: overlay = {'test': {'tasks': {}}} + utils.merge_dicts(overlay['test'], {'type': 'reverse'}) utils.merge_dicts(overlay['test']['tasks'], {'email': require}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_keep_result(self): tests = [ @@ -300,7 +327,11 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase): for keep_result, expect_error in tests: overlay = {'test': {'tasks': {}}} + utils.merge_dicts(overlay['test']['tasks'], {'email': keep_result}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) diff --git a/mistral/tests/unit/workbook/v2/test_workflows.py b/mistral/tests/unit/workbook/v2/test_workflows.py index 12263765..034b2435 100644 --- a/mistral/tests/unit/workbook/v2/test_workflows.py +++ b/mistral/tests/unit/workbook/v2/test_workflows.py @@ -35,9 +35,12 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): for wf_type, expect_error in tests: overlay = {'test': wf_type} - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=expect_error) + + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=expect_error + ) def test_direct_workflow(self): overlay = {'test': {'type': 'direct', 'tasks': {}}} @@ -48,9 +51,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success}) utils.merge_dicts(overlay['test']['tasks'], {'email': join}) - wfs_spec = self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=False) + wfs_spec = self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=False + ) self.assertEqual(1, len(wfs_spec.get_workflows())) self.assertEqual('test', wfs_spec.get_workflows()[0].get_name()) @@ -67,9 +72,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): utils.merge_dicts(overlay['test']['tasks'], {'email': requires}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=True) + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=True + ) def test_direct_workflow_no_start_tasks(self): overlay = { @@ -82,9 +89,42 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): } } - self._parse_dsl_spec(add_tasks=False, - changes=overlay, - expect_error=True) + self._parse_dsl_spec( + add_tasks=False, + changes=overlay, + expect_error=True + ) + + def test_direct_workflow_invalid_join(self): + tests = [ + ({'task3': {'join': 2}}, False), + ({'task3': {'join': 5}}, True), + ({'task3': {'join': 1}}, False), + ({'task3': {'join': 'one'}}, False), + ({'task3': {'join': 'all'}}, False), + ({'task4': {'join': 'all'}}, True), + ({'task4': {'join': 1}}, True), + ({'task4': {'join': 'one'}}, True) + ] + + for test in tests: + overlay = { + 'test': { + 'type': 'direct', + 'tasks': { + 'task1': {'on-complete': 'task3'}, + 'task2': {'on-complete': 'task3'} + } + } + } + + utils.merge_dicts(overlay['test']['tasks'], test[0]) + + self._parse_dsl_spec( + add_tasks=False, + changes=overlay, + expect_error=test[1] + ) def test_reverse_workflow(self): overlay = {'test': {'type': 'reverse', 'tasks': {}}} @@ -92,9 +132,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): utils.merge_dicts(overlay['test']['tasks'], {'email': require}) - wfs_spec = self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=False) + wfs_spec = self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=False + ) self.assertEqual(1, len(wfs_spec.get_workflows())) self.assertEqual('test', wfs_spec.get_workflows()[0].get_name()) @@ -109,17 +151,21 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): utils.merge_dicts(overlay['test']['tasks'], {'echo': on_success}) utils.merge_dicts(overlay['test']['tasks'], {'email': join}) - self._parse_dsl_spec(add_tasks=True, - changes=overlay, - expect_error=True) + self._parse_dsl_spec( + add_tasks=True, + changes=overlay, + expect_error=True + ) def test_version_required(self): dsl_dict = copy.deepcopy(self._dsl_blank) dsl_dict.pop('version', None) - exception = self.assertRaises(exc.DSLParsingException, - self._spec_parser, - yaml.safe_dump(dsl_dict)) + exception = self.assertRaises( + exc.DSLParsingException, + self._spec_parser, + yaml.safe_dump(dsl_dict) + ) self.assertIn("'version' is a required property", exception.message) @@ -133,9 +179,11 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): ] for version, expect_error in tests: - self._parse_dsl_spec(add_tasks=True, - changes=version, - expect_error=expect_error) + self._parse_dsl_spec( + add_tasks=True, + changes=version, + expect_error=expect_error + ) def test_inputs(self): tests = [ diff --git a/mistral/workbook/v2/tasks.py b/mistral/workbook/v2/tasks.py index 3a89610d..f27759af 100644 --- a/mistral/workbook/v2/tasks.py +++ b/mistral/workbook/v2/tasks.py @@ -253,14 +253,6 @@ class DirectWorkflowTaskSpec(TaskSpec): def validate_schema(self): super(DirectWorkflowTaskSpec, self).validate_schema() - if 'join' in self._data: - join = self._data.get('join') - - if not (isinstance(join, int) or join in ['all', 'one']): - msg = ("Task property 'join' is only allowed to be an" - " integer, 'all' or 'one': %s" % self._data) - raise exc.InvalidModelException(msg) - # Validate YAQL expressions. self._validate_transitions('on-complete') self._validate_transitions('on-success') diff --git a/mistral/workbook/v2/workflows.py b/mistral/workbook/v2/workflows.py index 01fbc810..ed98daea 100644 --- a/mistral/workbook/v2/workflows.py +++ b/mistral/workbook/v2/workflows.py @@ -149,6 +149,7 @@ class DirectWorkflowSpec(WorkflowSpec): ) self._check_workflow_integrity() + self._check_join_tasks() def _check_workflow_integrity(self): for t_s in self.get_tasks(): @@ -157,6 +158,39 @@ class DirectWorkflowSpec(WorkflowSpec): for out_t_name in out_task_names: self._validate_task_link(out_t_name) + def _check_join_tasks(self): + join_tasks = [t for t in self.get_tasks() if t.get_join()] + + err_msgs = [] + + for join_t in join_tasks: + t_name = join_t.get_name() + join_val = join_t.get_join() + + in_tasks = self.find_inbound_task_specs(join_t) + + if join_val == 'all': + if len(in_tasks) == 0: + err_msgs.append( + "No inbound tasks for task with 'join: all'" + " [task_name=%s]" % t_name + ) + + continue + + if join_val == 'one': + join_val = 1 + + if len(in_tasks) < join_val: + err_msgs.append( + "Not enough inbound tasks for task with 'join'" + " [task_name=%s, join=%s, inbound_tasks=%s]" % + (t_name, join_val, len(in_tasks)) + ) + + if len(err_msgs) > 0: + raise exc.InvalidModelException('\n'.join(err_msgs)) + def find_start_tasks(self): return [ t_s for t_s in self.get_tasks()