Merge "Add 'triggered_by' into task execution runtime context"

This commit is contained in:
Jenkins 2017-05-22 15:00:08 +00:00 committed by Gerrit Code Review
commit 81c51cb483
8 changed files with 189 additions and 67 deletions

View File

@ -232,7 +232,8 @@ def _build_task_from_command(cmd):
cmd.task_spec,
cmd.ctx,
unique_key=cmd.unique_key,
waiting=cmd.is_waiting()
waiting=cmd.is_waiting(),
triggered_by=cmd.triggered_by
)
return task
@ -241,13 +242,22 @@ def _build_task_from_command(cmd):
def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
unique_key=None, waiting=False):
unique_key=None, waiting=False, triggered_by=None):
if task_spec.get_with_items():
cls = tasks.WithItemsTask
else:
cls = tasks.RegularTask
return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting)
return cls(
wf_ex,
wf_spec,
task_spec,
ctx,
task_ex=task_ex,
unique_key=unique_key,
waiting=waiting,
triggered_by=triggered_by
)
@action_queue.process

View File

@ -44,7 +44,7 @@ class Task(object):
"""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
unique_key=None, waiting=False):
unique_key=None, waiting=False, triggered_by=None):
self.wf_ex = wf_ex
self.task_spec = task_spec
self.ctx = ctx
@ -52,6 +52,7 @@ class Task(object):
self.wf_spec = wf_spec
self.unique_key = unique_key
self.waiting = waiting
self.triggered_by = triggered_by
self.reset_flag = False
self.created = False
self.state_changed = False
@ -227,6 +228,14 @@ class Task(object):
'type': task_type
}
if self.triggered_by:
values['runtime_context']['triggered_by'] = [
{
'task_id': self.triggered_by[0].id,
'event': self.triggered_by[1]
}
]
self.task_ex = db_api.create_task_execution(values)
# Add to collection explicitly so that it's in a proper

View File

@ -38,7 +38,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
return db_api.get_workflow_execution(wf_ex.id)
def test_direct_workflow_on_closures(self):
def test_on_closures(self):
wf_text = """
version: '2.0'
@ -48,7 +48,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
tasks:
task1:
description: |
Explicit 'fail' command should lead to workflow failure.
Explicit 'succeed' command should lead to workflow success.
action: std.echo output="Echo"
on-success:
- task2
@ -72,7 +72,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
action: std.noop
"""
wf_ex = self._run_workflow(wf_text)
wf_ex = self._run_workflow(wf_text, expected_state=states.SUCCESS)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -80,18 +80,16 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task3 = self._assert_single_item(tasks, name='task3')
task4 = self._assert_single_item(tasks, name='task4')
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(3, len(tasks))
self.assertEqual(2, len(tasks))
self.await_task_success(task1.id)
self.await_task_success(task3.id)
self.await_task_success(task4.id)
self.await_task_success(task2.id)
self.assertTrue(wf_ex.state, states.ERROR)
def test_direct_workflow_condition_transition_not_triggering(self):
def test_condition_transition_not_triggering(self):
wf_text = """---
version: '2.0'
@ -132,7 +130,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertTrue(wf_ex.state, states.ERROR)
def test_direct_workflow_change_state_after_success(self):
def test_change_state_after_success(self):
wf_text = """
version: '2.0'
@ -656,7 +654,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0
)
def test_direct_workfow_output(self):
def test_output(self):
wf_text = """---
version: '2.0'
@ -680,3 +678,77 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual({}, wf_ex.output)
def test_triggered_by(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
action: std.fail
on-error: task3
task3:
action: std.fail
on-error: noop
on-success: task4
on-complete: task4
task4:
action: std.noop
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task1 = self._assert_single_item(task_execs, name='task1')
task2 = self._assert_single_item(task_execs, name='task2')
task3 = self._assert_single_item(task_execs, name='task3')
task4 = self._assert_single_item(task_execs, name='task4')
key = 'triggered_by'
self.assertIsNone(task1.runtime_context.get(key))
self.assertListEqual(
[
{
"task_id": task1.id,
"event": "on-success"
}
],
task2.runtime_context.get(key)
)
self.assertListEqual(
[
{
"task_id": task2.id,
"event": "on-error"
}
],
task3.runtime_context.get(key)
)
self.assertListEqual(
[
{
"task_id": task3.id,
"event": "on-complete"
}
],
task4.runtime_context.get(key)
)

View File

@ -32,7 +32,6 @@ class TaskCancelTest(base.EngineTestCase):
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.async_noop
@ -119,13 +118,11 @@ class TaskCancelTest(base.EngineTestCase):
workflows:
wf:
type: direct
tasks:
taskx:
workflow: subwf
subwf:
type: direct
tasks:
task1:
action: std.async_noop
@ -207,7 +204,6 @@ class TaskCancelTest(base.EngineTestCase):
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.async_noop
@ -294,10 +290,11 @@ class TaskCancelTest(base.EngineTestCase):
def test_cancel_with_items_concurrency(self):
wb_def = """
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 4)) %>

View File

@ -47,7 +47,10 @@ class InspectUtilsTest(base.BaseTest):
clazz = commands.RunTask
parameters_str = i_u.get_arg_list_as_str(clazz.__init__)
self.assertEqual('wf_ex, wf_spec, task_spec, ctx', parameters_str)
self.assertEqual(
'wf_ex, wf_spec, task_spec, ctx, triggered_by=null',
parameters_str
)
def test_get_parameters_str_with_function_parameter(self):

View File

@ -22,16 +22,17 @@ class WorkflowCommand(object):
"""Workflow command.
A set of workflow commands form a communication protocol between workflow
handler and its clients. When workflow handler makes a decision about
controller and its clients. When workflow controller makes a decision about
how to continue a workflow it returns a set of commands so that a caller
knows what to do next.
"""
def __init__(self, wf_ex, wf_spec, task_spec, ctx):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None):
self.wf_ex = wf_ex
self.wf_spec = wf_spec
self.task_spec = task_spec
self.ctx = ctx or {}
self.triggered_by = triggered_by
class Noop(WorkflowCommand):
@ -44,8 +45,14 @@ class Noop(WorkflowCommand):
class RunTask(WorkflowCommand):
"""Instruction to run a workflow task."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx):
super(RunTask, self).__init__(wf_ex, wf_spec, task_spec, ctx)
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None):
super(RunTask, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
)
self.wait = False
self.unique_key = None
@ -82,8 +89,15 @@ class RunExistingTask(WorkflowCommand):
class SetWorkflowState(WorkflowCommand):
"""Instruction to change a workflow state."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg):
super(SetWorkflowState, self).__init__(wf_ex, wf_spec, task_spec, ctx)
def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg=None,
triggered_by=None):
super(SetWorkflowState, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
)
self.new_state = new_state
self.msg = msg
@ -92,14 +106,16 @@ class SetWorkflowState(WorkflowCommand):
class FailWorkflow(SetWorkflowState):
"""Instruction to fail a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
super(FailWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.ERROR,
msg
msg=msg,
triggered_by=triggered_by
)
def __repr__(self):
@ -109,14 +125,16 @@ class FailWorkflow(SetWorkflowState):
class SucceedWorkflow(SetWorkflowState):
"""Instruction to succeed a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
super(SucceedWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.SUCCESS,
msg
msg=msg,
triggered_by=triggered_by
)
def __repr__(self):
@ -126,14 +144,16 @@ class SucceedWorkflow(SetWorkflowState):
class PauseWorkflow(SetWorkflowState):
"""Instruction to pause a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
super(PauseWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.PAUSED,
msg
msg=msg,
triggered_by=triggered_by
)
def __repr__(self):
@ -155,7 +175,7 @@ def get_command_class(cmd_name):
def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
explicit_params=None):
params=None, triggered_by=None):
cmd_cls = get_command_class(cmd_name) or RunTask
if issubclass(cmd_cls, SetWorkflowState):
@ -164,7 +184,14 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
wf_spec,
task_spec,
ctx,
explicit_params.get('msg')
msg=params.get('msg'),
triggered_by=triggered_by
)
else:
return cmd_cls(wf_ex, wf_spec, task_spec, ctx)
return cmd_cls(
wf_ex,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
)

View File

@ -116,7 +116,7 @@ class DirectWorkflowController(base.WorkflowController):
ctx = data_flow.evaluate_task_outbound_context(task_ex)
for t_n, params in self._find_next_tasks(task_ex, ctx=ctx):
for t_n, params, event_name in self._find_next_tasks(task_ex, ctx=ctx):
t_s = self.wf_spec.get_tasks()[t_n]
if not (t_s or t_n in commands.RESERVED_CMDS):
@ -132,7 +132,8 @@ class DirectWorkflowController(base.WorkflowController):
self.wf_spec,
t_s,
ctx,
params
params=params,
triggered_by=(task_ex, event_name)
)
self._configure_if_join(cmd)
@ -161,11 +162,12 @@ class DirectWorkflowController(base.WorkflowController):
def evaluate_workflow_final_context(self):
ctx = {}
for t_ex in self._find_end_tasks():
for t_ex in self._find_end_task_executions():
ctx = utils.merge_dicts(
ctx,
data_flow.evaluate_task_outbound_context(t_ex)
)
data_flow.remove_internal_data_from_context(ctx)
return ctx
@ -202,7 +204,7 @@ class DirectWorkflowController(base.WorkflowController):
return True
def _find_end_tasks(self):
def _find_end_task_executions(self):
def is_end_task(t_ex):
try:
return not self._has_outbound_tasks(t_ex)
@ -214,8 +216,10 @@ class DirectWorkflowController(base.WorkflowController):
return True
return list(
filter(is_end_task,
lookup_utils.find_completed_tasks(self.wf_ex.id))
filter(
is_end_task,
lookup_utils.find_completed_task_executions(self.wf_ex.id)
)
)
def _has_outbound_tasks(self, task_ex):
@ -241,33 +245,33 @@ class DirectWorkflowController(base.WorkflowController):
self.wf_ex.input
)
t_names_and_params = []
# [(task_name, 'on-success'|'on-error'|'on-complete', params), ...]
result = []
def process_clause(clause, event_name):
task_tuples = self._find_next_tasks_for_clause(clause, ctx_view)
for t in task_tuples:
result.append((t[0], t[1], event_name))
if t_state == states.SUCCESS:
process_clause(
self.wf_spec.get_on_success_clause(t_name),
'on-success'
)
elif t_state == states.ERROR:
process_clause(
self.wf_spec.get_on_error_clause(t_name),
'on-error'
)
if states.is_completed(t_state) and not states.is_cancelled(t_state):
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_complete_clause(t_name),
ctx_view
)
process_clause(
self.wf_spec.get_on_complete_clause(t_name),
'on-complete'
)
if t_state == states.ERROR:
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_error_clause(t_name),
ctx_view
)
)
elif t_state == states.SUCCESS:
t_names_and_params += (
self._find_next_tasks_for_clause(
self.wf_spec.get_on_success_clause(t_name),
ctx_view
)
)
return t_names_and_params
return result
@staticmethod
def _find_next_tasks_for_clause(clause, ctx):
@ -276,7 +280,7 @@ class DirectWorkflowController(base.WorkflowController):
This method finds next task(command) base on given {name: condition}
dictionary.
:param clause: Dictionary {task_name: condition} taken from
:param clause: Tuple (task_name, condition, parameters) taken from
'on-complete', 'on-success' or 'on-error' clause.
:param ctx: Context that clause expressions should be evaluated
against of.

View File

@ -115,7 +115,7 @@ def find_cancelled_task_executions(wf_ex_id):
return find_task_executions_with_state(wf_ex_id, states.CANCELLED)
def find_completed_tasks(wf_ex_id):
def find_completed_task_executions(wf_ex_id):
return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)