Add fixes for celery executor and unit test

This commit is contained in:
Dmitry Shulyak
2015-07-23 16:30:27 +03:00
parent 54502c72b9
commit e1a83b9855
3 changed files with 30 additions and 7 deletions

View File

@@ -5,15 +5,17 @@ from celery import group
def celery_executor(dg, tasks, control_tasks=()):
to_execute = []
for task in tasks:
for task_name in tasks:
# task_id needs to be unique, so for each plan we will use
# generated uid of this plan and task_name
task_id = '{}.{}'.format(dg.graph['uid'], task)
task = app.tasks[dg.node[task]['type']]
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task = app.tasks[dg.node[task_name]['type']]
if all_success(dg, dg.predecessors(task)) or task in control_tasks:
dg.node[task]['status'] = 'INPROGRESS'
for t in generate_task(task, dg.node[task], task_id):
if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS'
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
return group(to_execute)

View File

@@ -111,7 +111,6 @@ def schedule(plan_uid, dg):
tasks)
execution = executor.celery_executor(
dg, limit_chain, control_tasks=('fault_tolerance',))
graph.save_graph(plan_uid, dg)
execution()

View File

@@ -0,0 +1,22 @@
import networkx as nx
from pytest import fixture
from mock import patch
from solar.orchestration import executor
@fixture
def dg():
ex = nx.DiGraph()
ex.add_node('t1', args=['t'], status='PENDING', type='echo')
ex.graph['uid'] = 'some_string'
return ex
@patch.object(executor, 'app')
def test_celery_executor(mapp, dg):
"""Just check that it doesnt fail for now.
"""
assert executor.celery_executor(dg, ['t1'])
assert dg.node['t1']['status'] == 'INPROGRESS'