Merge "Remove the transaction scope from task executions API"

This commit is contained in:
Jenkins 2015-10-07 08:30:31 +00:00 committed by Gerrit Code Review
commit 107cd6c5cb
5 changed files with 37 additions and 12 deletions

View File

@ -102,9 +102,8 @@ def _get_task_resources_with_results(wf_ex_id=None):
if wf_ex_id: if wf_ex_id:
filters['workflow_execution_id'] = wf_ex_id filters['workflow_execution_id'] = wf_ex_id
with db_api.transaction(): task_exs = db_api.get_task_executions(**filters)
task_exs = db_api.get_task_executions(**filters) tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs]
tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs]
return Tasks(tasks=tasks) return Tasks(tasks=tasks)

View File

@ -272,6 +272,14 @@ class RetryPolicy(base.TaskPolicy):
""" """
super(RetryPolicy, self).after_task_complete(task_ex, task_spec) super(RetryPolicy, self).after_task_complete(task_ex, task_spec)
# TODO(m4dcoder): If the task_ex.executions collection is not called,
# then the retry_no in the runtime_context of the task_ex will not
# be updated accurately. To be exact, the retry_no will be one
# iteration behind. task_ex.executions was originally called in
# get_task_execution_result but it was refactored to use
# db_api.get_action_executions to support session-less use cases.
action_ex = task_ex.executions # noqa
context_key = 'retry_task_policy' context_key = 'retry_task_policy'
runtime_context = _ensure_context_has_key( runtime_context = _ensure_context_has_key(

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
@ -472,26 +474,36 @@ class DataFlowTest(test_base.BaseTest):
} }
) )
task_ex.executions.append(models.ActionExecution( action_exs = []
action_exs.append(models.ActionExecution(
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=True, accepted=True,
runtime_context={'with_items_index': 0} runtime_context={'with_items_index': 0}
)) ))
self.assertEqual([1], data_flow.get_task_execution_result(task_ex)) with mock.patch.object(db_api, 'get_action_executions',
return_value=action_exs):
self.assertEqual([1], data_flow.get_task_execution_result(task_ex))
task_ex.executions.append(models.ActionExecution( action_exs.append(models.ActionExecution(
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=True, accepted=True,
runtime_context={'with_items_index': 0} runtime_context={'with_items_index': 0}
)) ))
task_ex.executions.append(models.ActionExecution(
action_exs.append(models.ActionExecution(
name='my_action', name='my_action',
output={'result': 1}, output={'result': 1},
accepted=False, accepted=False,
runtime_context={'with_items_index': 0} runtime_context={'with_items_index': 0}
)) ))
self.assertEqual([1, 1], data_flow.get_task_execution_result(task_ex)) with mock.patch.object(db_api, 'get_action_executions',
return_value=action_exs):
self.assertEqual(
[1, 1],
data_flow.get_task_execution_result(task_ex)
)

View File

@ -715,8 +715,8 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0] task_ex = wf_ex.task_executions[0]
self.assertEqual( self.assertDictEqual(
{}, {'retry_no': 1},
task_ex.runtime_context['retry_task_policy'] task_ex.runtime_context['retry_task_policy']
) )

View File

@ -87,14 +87,20 @@ def invalidate_task_execution_result(task_ex):
def get_task_execution_result(task_ex): def get_task_execution_result(task_ex):
action_execs = task_ex.executions # Use of task_ex.executions requires a session to lazy load the action
# executions. This get_task_execution_result method is also invoked
# from get_all in the task execution API controller. If there is a lot of
# read against the API, it will lead to a lot of unnecessary DB locks
# which result in possible deadlocks and WF execution failures. Therefore,
# use db_api.get_action_executions here to avoid session-less use cases.
action_execs = db_api.get_action_executions(task_execution_id=task_ex.id)
action_execs.sort( action_execs.sort(
key=lambda x: x.runtime_context.get('with_items_index') key=lambda x: x.runtime_context.get('with_items_index')
) )
results = [ results = [
_extract_execution_result(ex) _extract_execution_result(ex)
for ex in task_ex.executions for ex in action_execs
if hasattr(ex, 'output') and ex.accepted if hasattr(ex, 'output') and ex.accepted
] ]