diff --git a/doc/source/dsl/dsl_v2.rst b/doc/source/dsl/dsl_v2.rst index 04822b87..fd9e381e 100644 --- a/doc/source/dsl/dsl_v2.rst +++ b/doc/source/dsl/dsl_v2.rst @@ -175,25 +175,34 @@ YAML example | ``  policies:`` | ``    wait-before: 2`` | ``    wait-after: 4`` +| `` pause-before: $.my_expr`` | ``    timeout: 30`` | ``    retry:`` | ``      count: 10`` | ``      delay: 20`` -| ``      break-on: $.my_var = true`` +| ``      break-on: $.my_var = True`` 'wait-before' - + Defines a delay in seconds that Mistral Engine should wait before starting a task. 'wait-after' - + Defines a delay in seconds that Mistral Engine should wait after a task has completed before starting next tasks defined in 'on-success', 'on-error' or 'on-complete'. +'pause-before' + + +The Mistral Engine will pause the workflow and its task with the 'pause-before' +policy before executing it. The workflow and task will be paused until a +resume signal is received. This policy accepts a YAQL expression which will +cause the policy to be applied only if the expression evaluates to 'True'. + 'timeout' @@ -201,7 +210,7 @@ Defines a period of time in seconds after which a task will be failed automatically by engine if hasn't completed. 'retry' - + Defines a pattern how task should be repeated in case of an error. @@ -210,7 +219,7 @@ Defines a pattern how task should be repeated in case of an error. - **delay** - Defines a delay in seconds between subsequent task iterations. - **break-on** - Defines a YAQL expression that will break iteration - loop if it evaluates to 'true'. If it fires then the task is + loop if it evaluates to 'True'. If it fires then the task is considered successful. Simplified Input Syntax @@ -333,7 +342,7 @@ task'. When Mistral Engine starts a workflow it recursively identifies all the dependencies that need to be completed first. | |Figure 2. Mistral Reverse Workflow.| - + Figure 2. Mistral Reverse Workflow. Figure 2 explains how reverse workflow works. In the example, task diff --git a/mistral/engine1/commands.py b/mistral/engine1/commands.py index f1d6def1..c964cf57 100644 --- a/mistral/engine1/commands.py +++ b/mistral/engine1/commands.py @@ -97,12 +97,21 @@ class RunTask(EngineCommand): self.exec_db = task_db.execution def run_local(self, exec_db, wf_handler, cause_task_db=None): + if self.task_db and self.task_db.state == states.IDLE: + LOG.debug('Resuming workflow task: %s' % self.task_spec) + self.task_db.state = states.RUNNING + + return True + LOG.debug('Running workflow task: %s' % self.task_spec) self._prepare_task(exec_db, wf_handler, cause_task_db) self._before_task_start(wf_handler.wf_spec) - return True + if exec_db.state == states.RUNNING: + return True + + return False def _prepare_task(self, exec_db, wf_handler, cause_task_db): if self.task_db: diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py index cea95d5b..680a51d9 100644 --- a/mistral/engine1/policies.py +++ b/mistral/engine1/policies.py @@ -17,12 +17,12 @@ from oslo.config import cfg from mistral.db.v2 import api as db_api -from mistral.engine import states from mistral.engine1 import base from mistral.engine1 import rpc from mistral import expressions from mistral.openstack.common import log as logging from mistral.services import scheduler +from mistral.workflow import states from mistral.workflow import utils @@ -53,7 +53,8 @@ def get_policy_factories(): build_wait_before_policy, build_wait_after_policy, build_retry_policy, - build_timeout_policy + build_timeout_policy, + build_pause_before_policy ] @@ -115,6 +116,16 @@ def build_timeout_policy(policies_spec): return TimeoutPolicy(timeout_policy) if timeout_policy > 0 else None +def build_pause_before_policy(policies_spec): + if not policies_spec: + return None + + pause_before_policy = policies_spec.get_pause_before() + + return PauseBeforePolicy(pause_before_policy) \ + if pause_before_policy else None + + def _ensure_context_has_key(runtime_context, key): if not runtime_context: runtime_context = {} @@ -184,7 +195,7 @@ class WaitAfterPolicy(base.TaskPolicy): if policy_context.get('skip'): # Need to avoid terminal states. - if not states.is_finished(task_db.state): + if not states.is_completed(task_db.state): # Unset state 'DELAYED'. WF_TRACE.info( @@ -304,10 +315,27 @@ class TimeoutPolicy(base.TaskPolicy): % (task_db.id, self.delay)) +class PauseBeforePolicy(base.TaskPolicy): + def __init__(self, expression): + self.expr = expression + + def before_task_start(self, task_db, task_spec): + if not expressions.evaluate(self.expr, task_db.in_context): + return + + WF_TRACE.info( + "Worflow paused before task '%s' [%s -> %s]" % + (task_db.name, task_db.execution.state, states.PAUSED) + ) + + task_db.execution.state = states.PAUSED + task_db.state = states.IDLE + + def fail_task_if_incomplete(task_id, timeout): task_db = db_api.get_task(task_id) - if not states.is_finished(task_db.state): + if not states.is_completed(task_db.state): msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout) WF_TRACE.info(msg) diff --git a/mistral/tests/unit/engine1/test_policies.py b/mistral/tests/unit/engine1/test_policies.py index 64e9753b..7b1b90a5 100644 --- a/mistral/tests/unit/engine1/test_policies.py +++ b/mistral/tests/unit/engine1/test_policies.py @@ -182,6 +182,26 @@ workflows: """ +PAUSE_BEFORE_WB = """ +--- +version: '2.0' +name: wb +workflows: + wf1: + type: direct + + tasks: + task1: + action: std.echo output="Hi!" + policies: + pause-before: True + on-success: + - task2 + task2: + action: std.echo output="Bye!" +""" + + class PoliciesTest(base.EngineTestCase): def setUp(self): super(PoliciesTest, self).setUp() @@ -354,3 +374,30 @@ class PoliciesTest(base.EngineTestCase): # Make sure that engine did not create extra tasks. self.assertEqual(1, len(tasks_db)) + + def test_pause_before_policy(self): + wb_service.create_workbook_v2(PAUSE_BEFORE_WB) + + # Start workflow. + exec_db = self.engine.start_workflow('wb.wf1', {}) + + exec_db = db_api.get_execution(exec_db.id) + task_db = self._assert_single_item(exec_db.tasks, name="task1") + + self.assertEqual(states.IDLE, task_db.state) + + self._await(lambda: self.is_execution_paused(exec_db.id)) + + self.engine.resume_workflow(exec_db.id) + + exec_db = db_api.get_execution(exec_db.id) + task_db = self._assert_single_item(exec_db.tasks, name="task1") + + self._await(lambda: self.is_execution_success(exec_db.id)) + + exec_db = db_api.get_execution(exec_db.id) + task_db = self._assert_single_item(exec_db.tasks, name="task1") + next_task_db = self._assert_single_item(exec_db.tasks, name="task2") + + self.assertEqual(states.SUCCESS, task_db.state) + self.assertEqual(states.SUCCESS, next_task_db.state) diff --git a/mistral/workbook/v2/task_policies.py b/mistral/workbook/v2/task_policies.py index bfff9389..19392029 100644 --- a/mistral/workbook/v2/task_policies.py +++ b/mistral/workbook/v2/task_policies.py @@ -25,6 +25,7 @@ class TaskPoliciesSpec(base.BaseSpec): "wait-before": {"type": "integer"}, "wait-after": {"type": "integer"}, "timeout": {"type": "integer"}, + "pause-before": {"type": "boolean"}, }, "additionalProperties": False } @@ -36,6 +37,7 @@ class TaskPoliciesSpec(base.BaseSpec): self._wait_before = data.get("wait-before", 0) self._wait_after = data.get("wait-after", 0) self._timeout = data.get("timeout", 0) + self._pause_before = data.get("pause-before", False) def get_retry(self): return self._retry @@ -48,3 +50,6 @@ class TaskPoliciesSpec(base.BaseSpec): def get_timeout(self): return self._timeout + + def get_pause_before(self): + return self._pause_before diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 701966fe..6eced6f0 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -177,12 +177,12 @@ class WorkflowHandler(object): def filter_task_cmds(cmds): return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)] - def get_tasks_to_schedule(task_db, schedule_tasks): + def get_tasks_to_schedule(task_db, schedule_tasks, schedule_cmds): """Finds tasks that should run after given task and searches them in DB. If there are no tasks in the DB, it should be scheduled now. If there are tasks in the DB, continue search to next tasks - in workflow if this task is finished. If this task is not - finished - do nothing. + in workflow if this task is finished. If this task is in IDLE + state, schedule it for resume. :param task_db: Task DB. :param schedule_tasks: Task names from previous iteration. @@ -198,32 +198,31 @@ class WorkflowHandler(object): if not t_db: schedule_tasks += [task_name] + task_spec = self.wf_spec.get_tasks()[task_name] + schedule_cmds += [commands.RunTask(task_spec)] else: schedule_tasks += get_tasks_to_schedule( t_db, - schedule_tasks + schedule_tasks, + schedule_cmds ) + elif states.is_idle(task_db.state): + idle_task_spec = self.wf_spec.get_tasks()[task_db.name] + schedule_cmds += [commands.RunTask(idle_task_spec, task_db)] - return schedule_tasks + return schedule_cmds params = self.exec_db.start_params start_task_cmds = filter_task_cmds( self.start_workflow(**params if params else {}) ) - task_names = [] + schedule_cmds = [] for cmd in start_task_cmds: task_db = [t for t in tasks if t.name == cmd.task_spec.get_name()][0] - task_names += get_tasks_to_schedule(task_db, []) - - schedule_cmds = [] - - for t_name in task_names: - schedule_cmds += [commands.RunTask( - self.wf_spec.get_tasks()[t_name] - )] + schedule_cmds += get_tasks_to_schedule(task_db, [], []) return schedule_cmds diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index 7b942297..1adf5499 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -47,6 +47,10 @@ def is_completed(state): return state in [SUCCESS, ERROR] +def is_idle(state): + return state == IDLE + + def is_paused_or_completed(state): return state == PAUSED or is_completed(state)