Use @db_utils.retry_on_deadlock to retry scheduler transactions

* Since the scheduler has transactional logic we need to account for
  cases when these transactions hit DB deadlocks. Deadlocks can occur
  simply due to the way MySQL handles locking, so it's recommended to
  always design with them in mind. This patch decomposes one big
  scheduler method that processes delayed calls into smaller methods
  so that we can apply the @db_utils.retry_on_deadlock decorator to
  repeat transactions if they fail because of a deadlock in MySQL.
* Fixed the task state update when assigning it to RUNNING. We can't
  set a task to RUNNING if it has any paused child executions.
* Fixed the test for cascaded pausing, which didn't account for the
  non-atomicity of this operation.

Closes-Bug: #1715589
Change-Id: Iffa0fb540a5705c587d71d30af6ab913b26d3952
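For context, a deadlock-retry decorator of this kind is typically a thin
wrapper that re-runs the decorated function whenever the database reports
a deadlock. The sketch below is illustrative only (it is not the actual
mistral.db.utils implementation) and assumes an oslo.db-style DBDeadlock
exception:

    import functools
    import time

    from oslo_db import exception as db_exc

    # Hypothetical retry budget; the real decorator may use other values.
    _RETRY_COUNT = 5
    _RETRY_DELAY = 0.1

    def retry_on_deadlock(func):
        """Re-run 'func' if it fails because of a DB deadlock."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(_RETRY_COUNT):
                try:
                    return func(*args, **kwargs)
                except db_exc.DBDeadlock:
                    if attempt == _RETRY_COUNT - 1:
                        raise
                    # Give the competing transaction a moment to finish
                    # before retrying.
                    time.sleep(_RETRY_DELAY)
        return wrapper

The important property is that the decorated function must enclose a
complete transaction so that re-running it is safe, which is exactly why
this patch decomposes the scheduler logic into per-transaction methods.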

@@ -77,7 +77,7 @@ def retry_on_deadlock(func):

 def check_db_obj_access(db_obj):
-    """Check accessbility to db object."""
+    """Check accessibility to db object."""
     ctx = context.ctx()

     is_admin = ctx.is_admin

@@ -216,8 +216,17 @@ class Task(object):
             return

         # Update only if state transition is valid.
-        if states.is_valid_transition(self.task_ex.state, state):
-            self.set_state(state, state_info)
+        if not states.is_valid_transition(self.task_ex.state, state):
+            return
+
+        # We can't set the task state to RUNNING if some other
+        # child executions are paused.
+        child_states = [a_ex.state for a_ex in self.task_ex.executions]
+
+        if state == states.RUNNING and states.PAUSED in child_states:
+            return
+
+        self.set_state(state, state_info)

     def _before_task_start(self):
         policies_spec = self.task_spec.get_policies()

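The guard added above reduces to a simple rule: a task may only move to
RUNNING when the state transition is valid and none of its child
executions is paused. A minimal standalone restatement (with the states
module simplified to plain string constants for illustration):

    RUNNING = 'RUNNING'
    PAUSED = 'PAUSED'

    def should_set_state(transition_is_valid, new_state, child_states):
        """Pure-function restatement of the guard added above."""
        if not transition_is_valid:
            return False

        # A task must not flip to RUNNING while any of its child
        # executions is still paused.
        if new_state == RUNNING and PAUSED in child_states:
            return False

        return True

    # For example:
    # should_set_state(True, RUNNING, [PAUSED, 'SUCCESS'])  ->  False
    # should_set_state(True, RUNNING, ['SUCCESS'])          ->  True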
@@ -24,6 +24,7 @@ from oslo_service import threadgroup
 from oslo_utils import importutils

 from mistral import context
+from mistral.db import utils as db_utils
 from mistral.db.v2 import api as db_api
 from mistral import exceptions as exc
@@ -102,37 +103,76 @@ class CallScheduler(periodic_task.PeriodicTasks):
     # TODO(rakhmerov): Think how to make 'spacing' configurable.
     @periodic_task.periodic_task(spacing=1, run_immediately=True)
     def run_delayed_calls(self, ctx=None):
-        time_filter = datetime.datetime.now() + datetime.timedelta(
-            seconds=1)
+        """Run delayed required calls.
+
+        This algorithm should work with transactions having at least
+        'READ-COMMITTED' isolation mode.
+
+        :param ctx: Auth context.
+        """

-        # Wrap delayed calls processing in transaction to guarantee that calls
-        # will be processed just once. Do delete query to DB first to force
-        # hanging up all parallel transactions.
-        # It should work with transactions which run at least 'READ-COMMITTED'
-        # mode.
-        delayed_calls = []
+        # Select and capture calls matching time criteria.
+        db_calls = self._capture_calls()
+
+        # Determine target methods, deserialize arguments etc.
+        prepared_calls = self._prepare_calls(db_calls)
+
+        # Invoke prepared calls.
+        self._invoke_calls(prepared_calls)
+
+        # Delete invoked calls from DB.
+        self.delete_calls(db_calls)
+
+    @staticmethod
+    @db_utils.retry_on_deadlock
+    def _capture_calls():
+        """Captures delayed calls eligible for processing (based on time).
+
+        The intention of this method is to select delayed calls based on
+        time criteria and mark them in DB as being processed so that no
+        other threads could process them in parallel.
+
+        :return: A list of delayed calls captured for further processing.
+        """
+        result = []
+
+        time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)

         with db_api.transaction():
-            candidate_calls = db_api.get_delayed_calls_to_start(
-                time_filter
-            )
-            calls_to_make = []
+            candidates = db_api.get_delayed_calls_to_start(time_filter)

-            for call in candidate_calls:
+            for call in candidates:
                 # Mark this delayed call has been processed in order to
                 # prevent calling from parallel transaction.
-                result, number_of_updated = db_api.update_delayed_call(
+                db_call, updated_cnt = db_api.update_delayed_call(
                     id=call.id,
                     values={'processing': True},
                     query_filter={'processing': False}
                 )

-                # If number_of_updated != 1 other scheduler already
-                # updated.
-                if number_of_updated == 1:
-                    calls_to_make.append(result)
+                # If updated_cnt != 1 then another scheduler
+                # has already updated it.
+                if updated_cnt == 1:
+                    result.append(db_call)

-        for call in calls_to_make:
+        return result
+
+    @staticmethod
+    def _prepare_calls(raw_calls):
+        """Prepares delayed calls for invocation.
+
+        After delayed calls were selected from DB they still need to be
+        prepared for further usage: we need to build final target methods
+        and deserialize arguments, if needed.
+
+        :param raw_calls: Delayed calls fetched from DB (DB models).
+        :return: A list of tuples (target_auth_context, target_method,
+            method_args) where all data is properly deserialized.
+        """
+        result = []
+
+        for call in raw_calls:
             LOG.debug(
                 'Processing next delayed call. '
                 '[ID=%s, factory_method_path=%s, target_method_name=%s, '
@@ -143,9 +183,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
             target_auth_context = copy.deepcopy(call.auth_context)

             if call.factory_method_path:
-                factory = importutils.import_class(
-                    call.factory_method_path
-                )
+                factory = importutils.import_class(call.factory_method_path)

                 target_method = getattr(factory(), call.target_method_name)
             else:
@@ -166,9 +204,17 @@ class CallScheduler(periodic_task.PeriodicTasks):
                     method_args[arg_name] = deserialized

-            delayed_calls.append(
-                (target_auth_context, target_method, method_args)
-            )
+            result.append((target_auth_context, target_method, method_args))
+
+        return result
+
+    @staticmethod
+    def _invoke_calls(delayed_calls):
+        """Invokes prepared delayed calls.
+
+        :param delayed_calls: Prepared delayed calls represented as tuples
+            (target_auth_context, target_method, method_args).
+        """

         ctx_serializer = context.RpcContextSerializer()
@@ -177,7 +223,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
                 # Set the correct context for the method.
                 ctx_serializer.deserialize_context(target_auth_context)

-                # Call the method.
+                # Invoke the method.
                 target_method(**method_args)
             except Exception as e:
                 LOG.exception(
@@ -189,14 +235,21 @@ class CallScheduler(periodic_task.PeriodicTasks):
                 # Remove context.
                 context.set_ctx(None)

+    @staticmethod
+    @db_utils.retry_on_deadlock
+    def delete_calls(db_calls):
+        """Deletes delayed calls.
+
+        :param db_calls: Delayed calls to delete from DB.
+        """
         with db_api.transaction():
-            for call in calls_to_make:
+            for call in db_calls:
                 try:
                     # Delete calls that were processed.
                     db_api.delete_delayed_call(call.id)
                 except Exception as e:
                     LOG.error(
-                        "failed to delete call [call=%s, "
+                        "Failed to delete delayed call [call=%s, "
                         "exception=%s]", call, e
                     )
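The crucial piece above is the conditional update in _capture_calls:
setting 'processing' to True with query_filter={'processing': False}
works as a compare-and-swap, so only one scheduler instance can claim a
given call even when several schedulers poll the table concurrently. In
raw SQL terms the claim looks roughly like the sketch below (connection
URL and table name are assumptions for illustration; the real model
lives under mistral.db.v2):

    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@localhost/mistral")

    def claim_delayed_call(call_id):
        """Atomically claim one delayed call; True means we won the race."""
        with engine.begin() as conn:
            result = conn.execute(
                text(
                    "UPDATE delayed_calls_v2 "
                    "SET processing = TRUE "
                    "WHERE id = :id AND processing = FALSE"
                ),
                {"id": call_id},
            )

            # rowcount == 1 means this transaction claimed the call;
            # rowcount == 0 means another scheduler got there first.
            return result.rowcount == 1

Under READ-COMMITTED isolation this is sufficient: two transactions
racing for the same row serialize on the row lock, and the loser then
sees 'processing' already set to TRUE.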

@@ -1575,6 +1575,7 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
         # Pause one of the subworkflows in the with-items task.
         self.engine.pause_workflow(wf_2_ex_1.id)

+        self.await_workflow_paused(wf_2_ex_1.id)
         self.await_workflow_paused(wf_2_ex_2.id)
         self.await_workflow_paused(wf_2_ex_3.id)
@@ -1589,8 +1590,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
         # Get objects for the parent workflow execution.
         wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')

-        wf_1_task_execs = wf_1_ex.task_executions
-
         wf_1_task_1_ex = self._assert_single_item(
             wf_1_ex.task_executions,
             name='task1'
@@ -1613,8 +1612,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
             wf_1_task_1_action_exs[0].id
         )

-        wf_2_ex_1_task_execs = wf_2_ex_1.task_executions
-
         wf_2_ex_1_task_1_ex = self._assert_single_item(
             wf_2_ex_1.task_executions,
             name='task1'
@@ -1628,8 +1625,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
             wf_1_task_1_action_exs[1].id
         )

-        wf_2_ex_2_task_execs = wf_2_ex_2.task_executions
-
         wf_2_ex_2_task_1_ex = self._assert_single_item(
             wf_2_ex_2.task_executions,
             name='task1'
@@ -1643,8 +1638,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
             wf_1_task_1_action_exs[2].id
         )

-        wf_2_ex_3_task_execs = wf_2_ex_3.task_executions
-
         wf_2_ex_3_task_1_ex = self._assert_single_item(
             wf_2_ex_3.task_executions,
             name='task1'
@@ -1657,8 +1650,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
         # Get objects for the wf3 subworkflow execution.
         wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')

-        wf_3_task_execs = wf_3_ex.task_executions
-
         wf_3_task_1_ex = self._assert_single_item(
             wf_3_ex.task_executions,
             name='task1'
@@ -1697,8 +1688,24 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
         self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
         self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)

+        # NOTE(rakhmerov): Since cascade pausing is not atomic we need
+        # to make sure that all internal operations related to pausing
+        # one of the workflow executions of 'wb.wf2' have completed. So
+        # we have to check whether any "_on_action_update" calls are
+        # still scheduled.
+        def _predicate():
+            return all(
+                [
+                    '_on_action_update' not in c.target_method_name
+                    for c in db_api.get_delayed_calls()
+                ]
+            )
+
+        self._await(_predicate)
+
         # Resume one of the subworkflows in the with-items task.
         self.engine.resume_workflow(wf_2_ex_1.id)

         self.await_workflow_running(wf_2_ex_1.id)
         self.await_workflow_paused(wf_2_ex_2.id)
         self.await_workflow_paused(wf_2_ex_3.id)
@@ -1727,8 +1734,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
         # Get objects for the parent workflow execution.
         wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')

-        wf_1_task_execs = wf_1_ex.task_executions
-
         wf_1_task_1_ex = self._assert_single_item(
             wf_1_ex.task_executions,
             name='task1'
@@ -1751,8 +1756,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
             wf_1_task_1_action_exs[0].id
         )

-        wf_2_ex_1_task_execs = wf_2_ex_1.task_executions
-
         wf_2_ex_1_task_1_ex = self._assert_single_item(
             wf_2_ex_1.task_executions,
             name='task1'
@@ -1766,8 +1769,6 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
             wf_1_task_1_action_exs[1].id
         )

-        wf_2_ex_2_task_execs = wf_2_ex_2.task_executions
-
         wf_2_ex_2_task_1_ex = self._assert_single_item(
             wf_2_ex_2.task_executions,
             name='task1'
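A note on the _await(_predicate) idiom used above: since cascaded
pausing is not atomic, the test cannot assert the final state right
away and instead polls until no '_on_action_update' calls remain in the
delayed-calls table. The base test class helper presumably works along
these lines (a minimal sketch; the timeout and poll interval values are
assumptions):

    import time

    def await_predicate(predicate, timeout=10, interval=0.1):
        """Poll 'predicate' until it returns True or 'timeout' expires."""
        deadline = time.time() + timeout

        while time.time() < deadline:
            if predicate():
                return

            time.sleep(interval)

        raise AssertionError(
            "Predicate was not satisfied within %s seconds." % timeout
        )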