From fa0102354b6fd5f865fc33eb2f703f658bfe98b1 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 30 Apr 2015 16:56:18 +0600 Subject: [PATCH] Adding "requires" to "task-defaults" clause * Reverse workflows can now use "requires" declared in "task-defaults", for example, to have all tasks depending on a configured task (except this task itself) * Fixed a bug with "processed" flag, it shouldn't be taken into account for reverse workflows because if say task3 depends on task2 then these tasks can't depend on a task1 at the same time Change-Id: Iad113e405f43125b37d16b91fcf448c21cd354e6 --- .../tests/unit/engine/test_task_defaults.py | 209 ++++++++++-------- .../tests/unit/workbook/v2/test_workflows.py | 7 + mistral/workbook/v2/task_defaults.py | 7 +- mistral/workbook/v2/tasks.py | 3 - mistral/workflow/reverse_workflow.py | 29 ++- 5 files changed, 155 insertions(+), 100 deletions(-) diff --git a/mistral/tests/unit/engine/test_task_defaults.py b/mistral/tests/unit/engine/test_task_defaults.py index 057f2a0a..4441e5c4 100644 --- a/mistral/tests/unit/engine/test_task_defaults.py +++ b/mistral/tests/unit/engine/test_task_defaults.py @@ -29,35 +29,34 @@ LOG = logging.getLogger(__name__) # the change in value is not permanent. cfg.CONF.set_default('auth_enable', False, group='pecan') -DIRECT_WF_ON_ERROR = """ ---- -version: '2.0' - -wf: - type: direct - - task-defaults: - on-error: - - task3 - - tasks: - task1: - description: That should lead to transition to task3. - action: std.http url="http://some_url" - on-success: - - task2 - - task2: - action: std.echo output="Morpheus" - - task3: - action: std.echo output="output" -""" - class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase): def test_task_defaults_on_error(self): - wf_service.create_workflows(DIRECT_WF_ON_ERROR) + wf_text = """--- + version: '2.0' + + wf: + type: direct + + task-defaults: + on-error: + - task3 + + tasks: + task1: + description: That should lead to transition to task3. + action: std.http url="http://some_url" + on-success: + - task2 + + task2: + action: std.echo output="Morpheus" + + task3: + action: std.echo output="output" + """ + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow('wf', {}) @@ -77,64 +76,6 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task3.state) -REVERSE_WF_RETRY = """ ---- -version: '2.0' - -wf: - type: reverse - - task-defaults: - retry: - count: 2 - delay: 1 - - tasks: - task1: - action: std.fail - - task2: - action: std.echo output=2 - requires: [task1] -""" - - -REVERSE_WF_TIMEOUT = """ ---- -version: '2.0' - -wf: - type: reverse - - task-defaults: - timeout: 1 - - tasks: - task1: - action: std.async_noop - - task2: - action: std.echo output=2 - requires: [task1] -""" - -REVERSE_WF_WAIT = """ ---- -version: '2.0' - -wf: - type: reverse - - task-defaults: - wait-before: 1 - wait-after: 1 - - tasks: - task1: - action: std.echo output=1 -""" - - class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): def setUp(self): super(TaskDefaultsReverseWorkflowEngineTest, self).setUp() @@ -144,7 +85,27 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): self.addCleanup(thread_group.stop) def test_task_defaults_retry_policy(self): - wf_service.create_workflows(REVERSE_WF_RETRY) + wf_text = """--- + version: '2.0' + + wf: + type: reverse + + task-defaults: + retry: + count: 2 + delay: 1 + + tasks: + task1: + action: std.fail + + task2: + action: std.echo output=2 + requires: [task1] + """ + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow('wf', {}, task_name='task2') @@ -170,7 +131,25 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): @testtools.skip("Fix 'timeout' policy.") def test_task_defaults_timeout_policy(self): - wf_service.create_workflows(REVERSE_WF_TIMEOUT) + wf_text = """--- + version: '2.0' + + wf: + type: reverse + + task-defaults: + timeout: 1 + + tasks: + task1: + action: std.async_noop + + task2: + action: std.echo output=2 + requires: [task1] + """ + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow('wf', {}, task_name='task2') @@ -187,7 +166,22 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): self._assert_single_item(tasks, name='task1', state=states.ERROR) def test_task_defaults_wait_policies(self): - wf_service.create_workflows(REVERSE_WF_WAIT) + wf_text = """--- + version: '2.0' + + wf: + type: reverse + + task-defaults: + wait-before: 1 + wait-after: 1 + + tasks: + task1: + action: std.echo output=1 + """ + + wf_service.create_workflows(wf_text) time_before = dt.datetime.now() @@ -210,3 +204,44 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): self.assertEqual(1, len(tasks)) self._assert_single_item(tasks, name='task1', state=states.SUCCESS) + + def test_task_defaults_requires(self): + wf_text = """--- + version: '2.0' + + wf: + type: reverse + + task-defaults: + requires: [always_do] + + tasks: + task1: + action: std.echo output=1 + + task2: + action: std.echo output=1 + requires: [task1] + + always_do: + action: std.echo output="Do something" + + """ + + wf_service.create_workflows(wf_text) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}, task_name='task2') + + self._await(lambda: self.is_execution_success(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + + self.assertEqual(3, len(tasks)) + + self._assert_single_item(tasks, name='task1', state=states.SUCCESS) + self._assert_single_item(tasks, name='task2', state=states.SUCCESS) + self._assert_single_item(tasks, name='always_do', state=states.SUCCESS) diff --git a/mistral/tests/unit/workbook/v2/test_workflows.py b/mistral/tests/unit/workbook/v2/test_workflows.py index 0ff27028..8ea1815a 100644 --- a/mistral/tests/unit/workbook/v2/test_workflows.py +++ b/mistral/tests/unit/workbook/v2/test_workflows.py @@ -218,6 +218,13 @@ class WorkflowSpecValidation(base.WorkflowSpecValidationTestCase): ({'on-complete': []}, True), ({'on-complete': ['email', 'email']}, True), ({'on-complete': ['email', 12345]}, True), + ({'requires': ''}, True), + ({'requires': []}, True), + ({'requires': ['']}, True), + ({'requires': None}, True), + ({'requires': 12345}, True), + ({'requires': ['echo']}, False), + ({'requires': ['echo', 'get']}, False), ({'retry': {'count': 3, 'delay': 1}}, False), ({'retry': {'count': '<% 3 %>', 'delay': 1}}, False), ({'retry': {'count': '<% * %>', 'delay': 1}}, True), diff --git a/mistral/workbook/v2/task_defaults.py b/mistral/workbook/v2/task_defaults.py index 26e36b44..27786c8e 100644 --- a/mistral/workbook/v2/task_defaults.py +++ b/mistral/workbook/v2/task_defaults.py @@ -43,7 +43,8 @@ class TaskDefaultsSpec(base.BaseSpec): "concurrency": policies.CONCURRENCY_SCHEMA, "on-complete": _on_clause_type, "on-success": _on_clause_type, - "on-error": _on_clause_type + "on-error": _on_clause_type, + "requires": types.UNIQUE_STRING_LIST }, "additionalProperties": False } @@ -67,6 +68,7 @@ class TaskDefaultsSpec(base.BaseSpec): self._on_complete = self._as_list_of_tuples("on-complete") self._on_success = self._as_list_of_tuples("on-success") self._on_error = self._as_list_of_tuples("on-error") + self._requires = data.get('requires', []) def validate(self): super(TaskDefaultsSpec, self).validate() @@ -93,3 +95,6 @@ class TaskDefaultsSpec(base.BaseSpec): def get_on_error(self): return self._on_error + + def get_requires(self): + return self._requires diff --git a/mistral/workbook/v2/tasks.py b/mistral/workbook/v2/tasks.py index 9e852181..8c748ecb 100644 --- a/mistral/workbook/v2/tasks.py +++ b/mistral/workbook/v2/tasks.py @@ -289,9 +289,6 @@ class ReverseWorkflowTaskSpec(TaskSpec): self._requires = data.get('requires', []) def get_requires(self): - if isinstance(self._requires, six.string_types): - return [self._requires] - return self._requires diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 80a89ab8..4d99cd3f 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -73,7 +73,7 @@ class ReverseWorkflowController(base.WorkflowController): def _get_upstream_task_executions(self, task_spec): t_specs = [ self.wf_spec.get_tasks()[t_name] - for t_name in task_spec.get_requires() + for t_name in self._get_task_requires(task_spec) or [] ] @@ -119,16 +119,16 @@ class ReverseWorkflowController(base.WorkflowController): if task_ex: return False - if not task_spec.get_requires(): + if not self._get_task_requires(task_spec): return True - success_task_names = set() + success_t_names = set() for t_ex in self.wf_ex.task_executions: - if t_ex.state == states.SUCCESS and not t_ex.processed: - success_task_names.add(t_ex.name) + if t_ex.state == states.SUCCESS: + success_t_names.add(t_ex.name) - return not (set(task_spec.get_requires()) - success_task_names) + return not (set(self._get_task_requires(task_spec)) - success_t_names) def _build_graph(self, tasks_spec): graph = nx.DiGraph() @@ -144,9 +144,8 @@ class ReverseWorkflowController(base.WorkflowController): return graph - @staticmethod - def _get_dependency_tasks(tasks_spec, task_spec): - dep_task_names = tasks_spec[task_spec.get_name()].get_requires() + def _get_dependency_tasks(self, tasks_spec, task_spec): + dep_task_names = self._get_task_requires(task_spec) if len(dep_task_names) == 0: return [] @@ -159,3 +158,15 @@ class ReverseWorkflowController(base.WorkflowController): dep_t_specs.add(t_spec) return dep_t_specs + + def _get_task_requires(self, task_spec): + requires = set(task_spec.get_requires()) + + task_defaults = self.wf_spec.get_task_defaults() + + if task_defaults: + requires |= set(task_defaults.get_requires()) + + requires.discard(task_spec.get_name()) + + return list(requires)