Refactor task output: 'db_tasks'->'task_execs', 'db_execs'->'wf_execs'

Change-Id: I921d5ecb480e360160717be2cc56d57e0fbfa18b
This commit is contained in:
Renat Akhmerov 2015-02-26 17:45:23 +06:00
parent 848d35c64b
commit 78e6acd35f
7 changed files with 41 additions and 34 deletions

View File

@ -347,9 +347,14 @@ class Engine(object):
def _create_next_tasks(cls, task, workbook):
tasks = workflow.find_tasks_after_completion(task, workbook)
db_tasks = cls._create_tasks(tasks, workbook, task.workbook_name,
task.execution_id)
return workflow.find_resolved_tasks(db_tasks)
task_execs = cls._create_tasks(
tasks,
workbook,
task.workbook_name,
task.execution_id
)
return workflow.find_resolved_tasks(task_execs)
@classmethod
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):

View File

@ -115,15 +115,15 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({'env': env}, exec1_db.start_params)
db_execs = db_api.get_workflow_executions()
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
self.assertEqual(2, len(wf_execs))
# Execution 2.
if db_execs[0].id != exec1_db.id:
exec2_db = db_execs[0]
if wf_execs[0].id != exec1_db.id:
exec2_db = wf_execs[0]
else:
exec2_db = db_execs[1]
exec2_db = wf_execs[1]
expected_start_params = {
'task_name': 'task2',

View File

@ -87,15 +87,15 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({}, exec1_db.start_params)
db_execs = db_api.get_workflow_executions()
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
self.assertEqual(2, len(wf_execs))
# Execution 2.
if db_execs[0].id != exec1_db.id:
exec2_db = db_execs[0]
if wf_execs[0].id != exec1_db.id:
exec2_db = wf_execs[0]
else:
exec2_db = db_execs[1]
exec2_db = wf_execs[1]
self.assertEqual(project_id, exec2_db.project_id)
self.assertIsNotNone(exec2_db.task_execution_id)
@ -159,14 +159,14 @@ class SubworkflowsTest(base.EngineTestCase):
def test_subworkflow_error(self):
exec1_db = self.engine.start_workflow('my_wb.wf2', None)
db_execs = db_api.get_workflow_executions()
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
self.assertEqual(2, len(wf_execs))
if db_execs[0].id != exec1_db.id:
exec2_db = db_execs[0]
if wf_execs[0].id != exec1_db.id:
exec2_db = wf_execs[0]
else:
exec2_db = db_execs[1]
exec2_db = wf_execs[1]
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_error(exec2_db.id))
@ -185,15 +185,15 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({'env': env}, exec1_db.start_params)
db_execs = db_api.get_workflow_executions()
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
self.assertEqual(2, len(wf_execs))
# Execution 2.
if db_execs[0].id != exec1_db.id:
exec2_db = db_execs[0]
if wf_execs[0].id != exec1_db.id:
exec2_db = wf_execs[0]
else:
exec2_db = db_execs[1]
exec2_db = wf_execs[1]
expected_start_params = {
'task_name': 'task2',

View File

@ -40,14 +40,14 @@ def prepare_db_task(task_ex, task_spec, upstream_task_specs, wf_ex,
:param wf_ex: Execution DB model.
"""
upstream_db_tasks = wf_utils.find_db_tasks(
upstream_task_execs = wf_utils.find_task_executions(
wf_ex,
upstream_task_specs
)
task_ex.in_context = utils.merge_dicts(
copy.copy(wf_ex.context),
_evaluate_upstream_context(upstream_db_tasks)
_evaluate_upstream_context(upstream_task_execs)
)
task_ex.input = evaluate_task_input(
@ -75,13 +75,13 @@ def evaluate_task_input(task_spec, context):
return expr.evaluate_recursively(task_spec.get_input(), context)
def _evaluate_upstream_context(upstream_db_tasks):
def _evaluate_upstream_context(upstream_task_execs):
task_result_ctx = {}
ctx = {}
for t_db in upstream_db_tasks:
task_result_ctx = utils.merge_dicts(task_result_ctx, t_db.result)
utils.merge_dicts(ctx, evaluate_task_outbound_context(t_db))
for t_ex in upstream_task_execs:
task_result_ctx = utils.merge_dicts(task_result_ctx, t_ex.result)
utils.merge_dicts(ctx, evaluate_task_outbound_context(t_ex))
return utils.merge_dicts(ctx, task_result_ctx)

View File

@ -89,6 +89,8 @@ class DirectWorkflowHandler(base.WorkflowHandler):
return to_task_name in t_names
# TODO(rakhmerov): Need to refactor this method to be able to pass tasks
# whose contexts need to be merged.
def _evaluate_workflow_final_context(self, cause_task_ex):
ctx = {}

View File

@ -145,8 +145,8 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
return dep_t_specs
def _find_db_task(self, name):
db_tasks = filter(
task_execs = filter(
lambda t: t.name == name, self.wf_ex.task_executions
)
return db_tasks[0] if db_tasks else None
return task_execs[0] if task_execs else None

View File

@ -46,15 +46,15 @@ class TaskResultSerializer(serializer.Serializer):
def find_db_task(wf_ex, task_spec):
db_tasks = [
task_execs = [
t for t in wf_ex.task_executions
if t.name == task_spec.get_name()
]
return db_tasks[0] if len(db_tasks) > 0 else None
return task_execs[0] if len(task_execs) > 0 else None
def find_db_tasks(wf_ex, task_specs):
def find_task_executions(wf_ex, task_specs):
return filter(None, [find_db_task(wf_ex, t_s) for t_s in task_specs])