diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index a0b99a56..c041a886 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -102,9 +102,8 @@ def _get_task_resources_with_results(wf_ex_id=None): if wf_ex_id: filters['workflow_execution_id'] = wf_ex_id - with db_api.transaction(): - task_exs = db_api.get_task_executions(**filters) - tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs] + task_exs = db_api.get_task_executions(**filters) + tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs] return Tasks(tasks=tasks) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index f9ab346f..f4ae215c 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -272,6 +272,14 @@ class RetryPolicy(base.TaskPolicy): """ super(RetryPolicy, self).after_task_complete(task_ex, task_spec) + # TODO(m4dcoder): If the task_ex.executions collection is not called, + # then the retry_no in the runtime_context of the task_ex will not + # be updated accurately. To be exact, the retry_no will be one + # iteration behind. task_ex.executions was originally called in + # get_task_execution_result but it was refactored to use + # db_api.get_action_executions to support session-less use cases. + action_ex = task_ex.executions # noqa + context_key = 'retry_task_policy' runtime_context = _ensure_context_has_key( diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py index ebe8dbd0..c92275e4 100644 --- a/mistral/tests/unit/engine/test_dataflow.py +++ b/mistral/tests/unit/engine/test_dataflow.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock + from oslo_config import cfg from oslo_log import log as logging @@ -472,26 +474,36 @@ class DataFlowTest(test_base.BaseTest): } ) - task_ex.executions.append(models.ActionExecution( + action_exs = [] + + action_exs.append(models.ActionExecution( name='my_action', output={'result': 1}, accepted=True, runtime_context={'with_items_index': 0} )) - self.assertEqual([1], data_flow.get_task_execution_result(task_ex)) + with mock.patch.object(db_api, 'get_action_executions', + return_value=action_exs): + self.assertEqual([1], data_flow.get_task_execution_result(task_ex)) - task_ex.executions.append(models.ActionExecution( + action_exs.append(models.ActionExecution( name='my_action', output={'result': 1}, accepted=True, runtime_context={'with_items_index': 0} )) - task_ex.executions.append(models.ActionExecution( + + action_exs.append(models.ActionExecution( name='my_action', output={'result': 1}, accepted=False, runtime_context={'with_items_index': 0} )) - self.assertEqual([1, 1], data_flow.get_task_execution_result(task_ex)) + with mock.patch.object(db_api, 'get_action_executions', + return_value=action_exs): + self.assertEqual( + [1, 1], + data_flow.get_task_execution_result(task_ex) + ) diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 65853434..b7be6c91 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -715,8 +715,8 @@ class PoliciesTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] - self.assertEqual( - {}, + self.assertDictEqual( + {'retry_no': 1}, task_ex.runtime_context['retry_task_policy'] ) diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 8f6c6551..06c0bbf5 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -87,14 +87,20 @@ def invalidate_task_execution_result(task_ex): def get_task_execution_result(task_ex): - action_execs = task_ex.executions + # Use of task_ex.executions requires a session to lazy load the action + # executions. This get_task_execution_result method is also invoked + # from get_all in the task execution API controller. If there is a lot of + # read against the API, it will lead to a lot of unnecessary DB locks + # which result in possible deadlocks and WF execution failures. Therefore, + # use db_api.get_action_executions here to avoid session-less use cases. + action_execs = db_api.get_action_executions(task_execution_id=task_ex.id) action_execs.sort( key=lambda x: x.runtime_context.get('with_items_index') ) results = [ _extract_execution_result(ex) - for ex in task_ex.executions + for ex in action_execs if hasattr(ex, 'output') and ex.accepted ]