Using count() instead of all() for getting incompleted tasks

* Workflow completion check logic used heavy select query which
  actually returned objects whereas all we need is to count them.
  This patch adds get_incompleted_task_executions_count() methods
  on DB API to address this.

Change-Id: I796ba175210f41ded7877f310efeb30bd5045531
This commit is contained in:
Renat Akhmerov 2016-09-12 16:18:03 +03:00
parent 33e0ee5ea8
commit 698248363c
5 changed files with 65 additions and 3 deletions

View File

@ -312,6 +312,10 @@ def get_incomplete_task_executions(**kwargs):
return IMPL.get_incomplete_task_executions(**kwargs)
def get_incomplete_task_executions_count(**kwargs):
return IMPL.get_incomplete_task_executions_count(**kwargs)
def create_task_execution(values):
return IMPL.create_task_execution(values)

View File

@ -797,7 +797,7 @@ def get_task_executions(**kwargs):
return _get_task_executions(**kwargs)
def get_incomplete_task_executions(**kwargs):
def _get_incomplete_task_executions_query(kwargs):
query = b.model_query(models.TaskExecution)
query = query.filter_by(**kwargs)
@ -811,9 +811,21 @@ def get_incomplete_task_executions(**kwargs):
)
)
return query
def get_incomplete_task_executions(**kwargs):
query = _get_incomplete_task_executions_query(kwargs)
return query.all()
def get_incomplete_task_executions_count(**kwargs):
query = _get_incomplete_task_executions_query(kwargs)
return query.count()
@b.session_aware()
def create_task_execution(values, session=None):
task_ex = models.TaskExecution()

View File

@ -52,6 +52,7 @@ def on_action_complete(action_ex, result):
task_handler.schedule_on_action_complete(action_ex)
@profiler.trace('action-handler-build-action')
def _build_action(action_ex):
if isinstance(action_ex, models.WorkflowExecution):
return actions.WorkflowAction(None, action_ex=action_ex)

View File

@ -280,11 +280,11 @@ class Workflow(object):
# Workflow is not completed if there are any incomplete task
# executions.
incomplete_tasks = db_api.get_incomplete_task_executions(
incomplete_tasks_count = db_api.get_incomplete_task_executions_count(
workflow_execution_id=self.wf_ex.id,
)
if incomplete_tasks:
if incomplete_tasks_count > 0:
return
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)

View File

@ -1401,6 +1401,51 @@ class TaskExecutionTest(SQLAlchemyTest):
created.id
)
def test_get_incomplete_task_executions(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'RUNNING'
task_ex1 = db_api.create_task_execution(values)
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
# Add one more task.
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'SUCCESS'
db_api.create_task_execution(values)
# It should be still one incompleted task.
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
def test_task_execution_repr(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])