Treat NOOP or SUCCESS as visited and ERROR as blocker

This commit is contained in:
Dmitry Shulyak 2015-11-18 14:33:02 +02:00
parent a9d1de432e
commit d0c20cb5f2
2 changed files with 9 additions and 10 deletions

View File

@ -29,12 +29,10 @@ def celery_executor(dg, tasks, control_tasks=()):
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task = app.tasks[dg.node[task_name]['type']]
all_ok = all_success(dg, dg.predecessors(task_name))
if all_ok or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
return group(to_execute)

View File

@ -32,8 +32,8 @@ from enum import Enum
states = Enum('States', 'SUCCESS ERROR NOOP INPROGRESS SKIPPED PENDING')
VISITED = (states.SUCCESS.name, states.ERROR.name, states.NOOP.name)
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name)
VISITED = (states.SUCCESS.name, states.NOOP.name)
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name, states.ERROR.name)
def traverse(dg):
@ -43,7 +43,7 @@ def traverse(dg):
data = dg.node[node]
if data['status'] in VISITED:
visited.add(node)
rst = []
for node in dg:
data = dg.node[node]
@ -51,4 +51,5 @@ def traverse(dg):
continue
if set(dg.predecessors(node)) <= visited:
yield node
rst.append(node)
return rst