diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index c1106cf28..500464d7e 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from osprofiler import profiler import traceback as tb +from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral.engine import action_queue @@ -402,6 +403,7 @@ def _schedule_refresh_task_state(task_ex, delay=0): ) +@db_utils.retry_on_deadlock @action_queue.process def _scheduled_on_action_complete(action_ex_id, wf_action): with db_api.transaction(): diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 31cb8580a..390a53eaf 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -492,52 +492,41 @@ class WithItemsTask(RegularTask): def on_action_complete(self, action_ex): assert self.task_ex - if (not self._get_concurrency() and - not self.task_spec.get_policies().get_retry()): - self._on_action_complete(action_ex) - else: - # If we need to control 'concurrency' we need to do atomic - # reads/writes to task runtime context. Locking prevents us - # from modifying runtime context simultaneously by multiple - # transactions. - with db_api.named_lock('with-items-%s' % self.task_ex.id): - # NOTE: We need to refresh task execution object right - # after the lock is acquired to make sure that we're - # working with a fresh state of its runtime context. - # Otherwise, SQLAlchemy session can contain a stale - # cached version of it so that we don't modify actual - # values (i.e. capacity). - db_api.refresh(self.task_ex) + with db_api.named_lock('with-items-%s' % self.task_ex.id): + # NOTE: We need to refresh task execution object right + # after the lock is acquired to make sure that we're + # working with a fresh state of its runtime context. + # Otherwise, SQLAlchemy session can contain a stale + # cached version of it so that we don't modify actual + # values (i.e. capacity). + db_api.refresh(self.task_ex) - self._on_action_complete(action_ex) + if self.is_completed(): + return - def _on_action_complete(self, action_ex): - if self.is_completed(): - return + self._increase_capacity() - self._increase_capacity() + if self.is_with_items_completed(): + state = self._get_final_state() - if self.is_with_items_completed(): - state = self._get_final_state() + # TODO(rakhmerov): Here we can define more informative messages + # in cases when action is successful and when it's not. + # For example, in state_info we can specify the cause action. + # The use of action_ex.output.get('result') for state_info is + # not accurate because there could be action executions that + # had failed or was cancelled prior to this action execution. + state_info = { + states.SUCCESS: None, + states.ERROR: 'One or more actions had failed.', + states.CANCELLED: 'One or more actions was cancelled.' + } - # TODO(rakhmerov): Here we can define more informative messages - # in cases when action is successful and when it's not. - # For example, in state_info we can specify the cause action. - # The use of action_ex.output.get('result') for state_info is not - # accurate because there could be action executions that had - # failed or was cancelled prior to this action execution. - state_info = { - states.SUCCESS: None, - states.ERROR: 'One or more actions had failed.', - states.CANCELLED: 'One or more actions was cancelled.' - } + self.complete(state, state_info[state]) - self.complete(state, state_info[state]) + return - return - - if self._has_more_iterations() and self._get_concurrency(): - self._schedule_actions() + if self._has_more_iterations() and self._get_concurrency(): + self._schedule_actions() def _schedule_actions(self): with_items_values = self._get_with_items_values()