Implement pause-before

Implements blueprint mistral-pause-before-policy

Change-Id: Ibf0ca7b17ce100467e4243b411f94ce481ec4962
This commit is contained in:
bhavenst 2014-12-09 22:27:02 -05:00 committed by Bryan Havenstein
parent 4513b446ff
commit 9c06dca52c
7 changed files with 126 additions and 25 deletions

View File

@ -175,25 +175,34 @@ YAML example
| ``  policies:`` | ``  policies:``
| ``    wait-before: 2`` | ``    wait-before: 2``
| ``    wait-after: 4`` | ``    wait-after: 4``
| `` pause-before: $.my_expr``
| ``    timeout: 30`` | ``    timeout: 30``
| ``    retry:`` | ``    retry:``
| ``      count: 10`` | ``      count: 10``
| ``      delay: 20`` | ``      delay: 20``
| ``      break-on: $.my_var = true`` | ``      break-on: $.my_var = True``
'wait-before' 'wait-before'
Defines a delay in seconds that Mistral Engine should wait before Defines a delay in seconds that Mistral Engine should wait before
starting a task. starting a task.
'wait-after' 'wait-after'
Defines a delay in seconds that Mistral Engine should wait after a task Defines a delay in seconds that Mistral Engine should wait after a task
has completed before starting next tasks defined in 'on-success', has completed before starting next tasks defined in 'on-success',
'on-error' or 'on-complete'. 'on-error' or 'on-complete'.
'pause-before'
The Mistral Engine will pause the workflow and its task with the 'pause-before'
policy before executing it. The workflow and task will be paused until a
resume signal is received. This policy accepts a YAQL expression which will
cause the policy to be applied only if the expression evaluates to 'True'.
'timeout' 'timeout'
@ -201,7 +210,7 @@ Defines a period of time in seconds after which a task will be failed
automatically by engine if hasn't completed. automatically by engine if hasn't completed.
'retry' 'retry'
Defines a pattern how task should be repeated in case of an error. Defines a pattern how task should be repeated in case of an error.
@ -210,7 +219,7 @@ Defines a pattern how task should be repeated in case of an error.
- **delay** - Defines a delay in seconds between subsequent task - **delay** - Defines a delay in seconds between subsequent task
iterations. iterations.
- **break-on** - Defines a YAQL expression that will break iteration - **break-on** - Defines a YAQL expression that will break iteration
loop if it evaluates to 'true'. If it fires then the task is loop if it evaluates to 'True'. If it fires then the task is
considered successful. considered successful.
Simplified Input Syntax Simplified Input Syntax
@ -333,7 +342,7 @@ task'. When Mistral Engine starts a workflow it recursively identifies
all the dependencies that need to be completed first. all the dependencies that need to be completed first.
| |Figure 2. Mistral Reverse Workflow.| | |Figure 2. Mistral Reverse Workflow.|
Figure 2. Mistral Reverse Workflow. Figure 2. Mistral Reverse Workflow.
Figure 2 explains how reverse workflow works. In the example, task Figure 2 explains how reverse workflow works. In the example, task

View File

@ -97,12 +97,21 @@ class RunTask(EngineCommand):
self.exec_db = task_db.execution self.exec_db = task_db.execution
def run_local(self, exec_db, wf_handler, cause_task_db=None): def run_local(self, exec_db, wf_handler, cause_task_db=None):
if self.task_db and self.task_db.state == states.IDLE:
LOG.debug('Resuming workflow task: %s' % self.task_spec)
self.task_db.state = states.RUNNING
return True
LOG.debug('Running workflow task: %s' % self.task_spec) LOG.debug('Running workflow task: %s' % self.task_spec)
self._prepare_task(exec_db, wf_handler, cause_task_db) self._prepare_task(exec_db, wf_handler, cause_task_db)
self._before_task_start(wf_handler.wf_spec) self._before_task_start(wf_handler.wf_spec)
return True if exec_db.state == states.RUNNING:
return True
return False
def _prepare_task(self, exec_db, wf_handler, cause_task_db): def _prepare_task(self, exec_db, wf_handler, cause_task_db):
if self.task_db: if self.task_db:

View File

@ -17,12 +17,12 @@
from oslo.config import cfg from oslo.config import cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import states
from mistral.engine1 import base from mistral.engine1 import base
from mistral.engine1 import rpc from mistral.engine1 import rpc
from mistral import expressions from mistral import expressions
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral.services import scheduler from mistral.services import scheduler
from mistral.workflow import states
from mistral.workflow import utils from mistral.workflow import utils
@ -53,7 +53,8 @@ def get_policy_factories():
build_wait_before_policy, build_wait_before_policy,
build_wait_after_policy, build_wait_after_policy,
build_retry_policy, build_retry_policy,
build_timeout_policy build_timeout_policy,
build_pause_before_policy
] ]
@ -115,6 +116,16 @@ def build_timeout_policy(policies_spec):
return TimeoutPolicy(timeout_policy) if timeout_policy > 0 else None return TimeoutPolicy(timeout_policy) if timeout_policy > 0 else None
def build_pause_before_policy(policies_spec):
if not policies_spec:
return None
pause_before_policy = policies_spec.get_pause_before()
return PauseBeforePolicy(pause_before_policy) \
if pause_before_policy else None
def _ensure_context_has_key(runtime_context, key): def _ensure_context_has_key(runtime_context, key):
if not runtime_context: if not runtime_context:
runtime_context = {} runtime_context = {}
@ -184,7 +195,7 @@ class WaitAfterPolicy(base.TaskPolicy):
if policy_context.get('skip'): if policy_context.get('skip'):
# Need to avoid terminal states. # Need to avoid terminal states.
if not states.is_finished(task_db.state): if not states.is_completed(task_db.state):
# Unset state 'DELAYED'. # Unset state 'DELAYED'.
WF_TRACE.info( WF_TRACE.info(
@ -304,10 +315,27 @@ class TimeoutPolicy(base.TaskPolicy):
% (task_db.id, self.delay)) % (task_db.id, self.delay))
class PauseBeforePolicy(base.TaskPolicy):
def __init__(self, expression):
self.expr = expression
def before_task_start(self, task_db, task_spec):
if not expressions.evaluate(self.expr, task_db.in_context):
return
WF_TRACE.info(
"Worflow paused before task '%s' [%s -> %s]" %
(task_db.name, task_db.execution.state, states.PAUSED)
)
task_db.execution.state = states.PAUSED
task_db.state = states.IDLE
def fail_task_if_incomplete(task_id, timeout): def fail_task_if_incomplete(task_id, timeout):
task_db = db_api.get_task(task_id) task_db = db_api.get_task(task_id)
if not states.is_finished(task_db.state): if not states.is_completed(task_db.state):
msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout) msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout)
WF_TRACE.info(msg) WF_TRACE.info(msg)

View File

@ -182,6 +182,26 @@ workflows:
""" """
PAUSE_BEFORE_WB = """
---
version: '2.0'
name: wb
workflows:
wf1:
type: direct
tasks:
task1:
action: std.echo output="Hi!"
policies:
pause-before: True
on-success:
- task2
task2:
action: std.echo output="Bye!"
"""
class PoliciesTest(base.EngineTestCase): class PoliciesTest(base.EngineTestCase):
def setUp(self): def setUp(self):
super(PoliciesTest, self).setUp() super(PoliciesTest, self).setUp()
@ -354,3 +374,30 @@ class PoliciesTest(base.EngineTestCase):
# Make sure that engine did not create extra tasks. # Make sure that engine did not create extra tasks.
self.assertEqual(1, len(tasks_db)) self.assertEqual(1, len(tasks_db))
def test_pause_before_policy(self):
wb_service.create_workbook_v2(PAUSE_BEFORE_WB)
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {})
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
self.assertEqual(states.IDLE, task_db.state)
self._await(lambda: self.is_execution_paused(exec_db.id))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
next_task_db = self._assert_single_item(exec_db.tasks, name="task2")
self.assertEqual(states.SUCCESS, task_db.state)
self.assertEqual(states.SUCCESS, next_task_db.state)

View File

@ -25,6 +25,7 @@ class TaskPoliciesSpec(base.BaseSpec):
"wait-before": {"type": "integer"}, "wait-before": {"type": "integer"},
"wait-after": {"type": "integer"}, "wait-after": {"type": "integer"},
"timeout": {"type": "integer"}, "timeout": {"type": "integer"},
"pause-before": {"type": "boolean"},
}, },
"additionalProperties": False "additionalProperties": False
} }
@ -36,6 +37,7 @@ class TaskPoliciesSpec(base.BaseSpec):
self._wait_before = data.get("wait-before", 0) self._wait_before = data.get("wait-before", 0)
self._wait_after = data.get("wait-after", 0) self._wait_after = data.get("wait-after", 0)
self._timeout = data.get("timeout", 0) self._timeout = data.get("timeout", 0)
self._pause_before = data.get("pause-before", False)
def get_retry(self): def get_retry(self):
return self._retry return self._retry
@ -48,3 +50,6 @@ class TaskPoliciesSpec(base.BaseSpec):
def get_timeout(self): def get_timeout(self):
return self._timeout return self._timeout
def get_pause_before(self):
return self._pause_before

View File

@ -177,12 +177,12 @@ class WorkflowHandler(object):
def filter_task_cmds(cmds): def filter_task_cmds(cmds):
return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)] return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)]
def get_tasks_to_schedule(task_db, schedule_tasks): def get_tasks_to_schedule(task_db, schedule_tasks, schedule_cmds):
"""Finds tasks that should run after given task and searches them """Finds tasks that should run after given task and searches them
in DB. If there are no tasks in the DB, it should be scheduled in DB. If there are no tasks in the DB, it should be scheduled
now. If there are tasks in the DB, continue search to next tasks now. If there are tasks in the DB, continue search to next tasks
in workflow if this task is finished. If this task is not in workflow if this task is finished. If this task is in IDLE
finished - do nothing. state, schedule it for resume.
:param task_db: Task DB. :param task_db: Task DB.
:param schedule_tasks: Task names from previous iteration. :param schedule_tasks: Task names from previous iteration.
@ -198,32 +198,31 @@ class WorkflowHandler(object):
if not t_db: if not t_db:
schedule_tasks += [task_name] schedule_tasks += [task_name]
task_spec = self.wf_spec.get_tasks()[task_name]
schedule_cmds += [commands.RunTask(task_spec)]
else: else:
schedule_tasks += get_tasks_to_schedule( schedule_tasks += get_tasks_to_schedule(
t_db, t_db,
schedule_tasks schedule_tasks,
schedule_cmds
) )
elif states.is_idle(task_db.state):
idle_task_spec = self.wf_spec.get_tasks()[task_db.name]
schedule_cmds += [commands.RunTask(idle_task_spec, task_db)]
return schedule_tasks return schedule_cmds
params = self.exec_db.start_params params = self.exec_db.start_params
start_task_cmds = filter_task_cmds( start_task_cmds = filter_task_cmds(
self.start_workflow(**params if params else {}) self.start_workflow(**params if params else {})
) )
task_names = [] schedule_cmds = []
for cmd in start_task_cmds: for cmd in start_task_cmds:
task_db = [t for t in tasks task_db = [t for t in tasks
if t.name == cmd.task_spec.get_name()][0] if t.name == cmd.task_spec.get_name()][0]
task_names += get_tasks_to_schedule(task_db, []) schedule_cmds += get_tasks_to_schedule(task_db, [], [])
schedule_cmds = []
for t_name in task_names:
schedule_cmds += [commands.RunTask(
self.wf_spec.get_tasks()[t_name]
)]
return schedule_cmds return schedule_cmds

View File

@ -47,6 +47,10 @@ def is_completed(state):
return state in [SUCCESS, ERROR] return state in [SUCCESS, ERROR]
def is_idle(state):
return state == IDLE
def is_paused_or_completed(state): def is_paused_or_completed(state):
return state == PAUSED or is_completed(state) return state == PAUSED or is_completed(state)