Fix "with-items" locking
* Previously we thought that we can avoid locking task execution when we process completion of an individual action inside WithItemsTask. But that was wrong because even when we don't need to control "concurrency" property we still can get a situation when two different action completions will be processed via scheduler and both will see all actions in state SUCCESS (because scheduler handles it in a different later transaction) and hence both will complete the task and run tasks from "on-xxx" clauses. This patch makes WithItemsTask always use locking. * Added db_utils.retry_on_deadlock for _scheduled_on_action_complete() method since it opens a new DB transaction and can potentially end up in a dead lock (due to MySQL nature) Closes-Bug: #1715116 Change-Id: I6f34409e7182af3ca5b13c17e6d6fb4302f9efed
This commit is contained in:
parent
9f5160f0dc
commit
7b45a50a42
@ -19,6 +19,7 @@ from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import action_queue
|
||||
@ -402,6 +403,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
|
||||
)
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def _scheduled_on_action_complete(action_ex_id, wf_action):
|
||||
with db_api.transaction():
|
||||
|
@ -492,52 +492,41 @@ class WithItemsTask(RegularTask):
|
||||
def on_action_complete(self, action_ex):
|
||||
assert self.task_ex
|
||||
|
||||
if (not self._get_concurrency() and
|
||||
not self.task_spec.get_policies().get_retry()):
|
||||
self._on_action_complete(action_ex)
|
||||
else:
|
||||
# If we need to control 'concurrency' we need to do atomic
|
||||
# reads/writes to task runtime context. Locking prevents us
|
||||
# from modifying runtime context simultaneously by multiple
|
||||
# transactions.
|
||||
with db_api.named_lock('with-items-%s' % self.task_ex.id):
|
||||
# NOTE: We need to refresh task execution object right
|
||||
# after the lock is acquired to make sure that we're
|
||||
# working with a fresh state of its runtime context.
|
||||
# Otherwise, SQLAlchemy session can contain a stale
|
||||
# cached version of it so that we don't modify actual
|
||||
# values (i.e. capacity).
|
||||
db_api.refresh(self.task_ex)
|
||||
with db_api.named_lock('with-items-%s' % self.task_ex.id):
|
||||
# NOTE: We need to refresh task execution object right
|
||||
# after the lock is acquired to make sure that we're
|
||||
# working with a fresh state of its runtime context.
|
||||
# Otherwise, SQLAlchemy session can contain a stale
|
||||
# cached version of it so that we don't modify actual
|
||||
# values (i.e. capacity).
|
||||
db_api.refresh(self.task_ex)
|
||||
|
||||
self._on_action_complete(action_ex)
|
||||
if self.is_completed():
|
||||
return
|
||||
|
||||
def _on_action_complete(self, action_ex):
|
||||
if self.is_completed():
|
||||
return
|
||||
self._increase_capacity()
|
||||
|
||||
self._increase_capacity()
|
||||
if self.is_with_items_completed():
|
||||
state = self._get_final_state()
|
||||
|
||||
if self.is_with_items_completed():
|
||||
state = self._get_final_state()
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
# in cases when action is successful and when it's not.
|
||||
# For example, in state_info we can specify the cause action.
|
||||
# The use of action_ex.output.get('result') for state_info is
|
||||
# not accurate because there could be action executions that
|
||||
# had failed or was cancelled prior to this action execution.
|
||||
state_info = {
|
||||
states.SUCCESS: None,
|
||||
states.ERROR: 'One or more actions had failed.',
|
||||
states.CANCELLED: 'One or more actions was cancelled.'
|
||||
}
|
||||
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
# in cases when action is successful and when it's not.
|
||||
# For example, in state_info we can specify the cause action.
|
||||
# The use of action_ex.output.get('result') for state_info is not
|
||||
# accurate because there could be action executions that had
|
||||
# failed or was cancelled prior to this action execution.
|
||||
state_info = {
|
||||
states.SUCCESS: None,
|
||||
states.ERROR: 'One or more actions had failed.',
|
||||
states.CANCELLED: 'One or more actions was cancelled.'
|
||||
}
|
||||
self.complete(state, state_info[state])
|
||||
|
||||
self.complete(state, state_info[state])
|
||||
return
|
||||
|
||||
return
|
||||
|
||||
if self._has_more_iterations() and self._get_concurrency():
|
||||
self._schedule_actions()
|
||||
if self._has_more_iterations() and self._get_concurrency():
|
||||
self._schedule_actions()
|
||||
|
||||
def _schedule_actions(self):
|
||||
with_items_values = self._get_with_items_values()
|
||||
|
Loading…
Reference in New Issue
Block a user