Add a periodic job to check workflow execution integrity
* Added a method called via scheduler to check and fix workflow execution integrity. Specifically, this method finds all task executions in RUNNING state and for each of them checks whether all its action/workflow executions are finished. If yes, it most likely means that the task got stuck (i.e. an RPC was lost) and we need to repair it. * Also added atomic update of a task execution state using the well-known approach with "UPDATE ... WHERE" SQL query which returns a number of actually updated rows. This prevents from possible races between concurrent processes that try to update a task execution in overlapping transactions. * Other minor style changes Change-Id: If4f2efb05d959d1ffdb16515f08a811c5ce597e6
This commit is contained in:
parent
dd1bcc1ea1
commit
0d2ebb7b01
@ -161,6 +161,17 @@ engine_opts = [
|
||||
default=1024,
|
||||
help=_('The default maximum size in KB of large text fields '
|
||||
'of runtime execution objects. Use -1 for no limit.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'execution_integrity_check_delay',
|
||||
default=20,
|
||||
help=_('A number of seconds since the last update of a task'
|
||||
' execution in RUNNING state after which Mistral will'
|
||||
' start checking its integrity, meaning that if all'
|
||||
' associated actions/workflows are finished its state'
|
||||
' will be restored automatically. If this property is'
|
||||
' set to a negative value Mistral will never be doing '
|
||||
' this check.')
|
||||
)
|
||||
]
|
||||
|
||||
|
@ -340,6 +340,10 @@ def delete_task_executions(**kwargs):
|
||||
return IMPL.delete_task_executions(**kwargs)
|
||||
|
||||
|
||||
def update_task_execution_state(**kwargs):
|
||||
return IMPL.update_task_execution_state(**kwargs)
|
||||
|
||||
|
||||
# Delayed calls.
|
||||
|
||||
def get_delayed_calls_to_start(time):
|
||||
|
@ -847,7 +847,6 @@ def delete_workflow_executions(session=None, **kwargs):
|
||||
|
||||
@b.session_aware()
|
||||
def update_workflow_execution_state(id, cur_state, state, session=None):
|
||||
|
||||
wf_ex = None
|
||||
|
||||
# Use WHERE clause to exclude possible conflicts if the state has
|
||||
@ -1002,6 +1001,34 @@ def _get_task_executions(**kwargs):
|
||||
return _get_collection_sorted_by_time(models.TaskExecution, **kwargs)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_task_execution_state(id, cur_state, state, session=None):
|
||||
wf_ex = None
|
||||
|
||||
# Use WHERE clause to exclude possible conflicts if the state has
|
||||
# already been changed.
|
||||
try:
|
||||
specimen = models.TaskExecution(
|
||||
id=id,
|
||||
state=cur_state
|
||||
)
|
||||
|
||||
wf_ex = b.model_query(
|
||||
models.TaskExecution).update_on_match(
|
||||
specimen=specimen,
|
||||
surrogate_key='id',
|
||||
values={'state': state}
|
||||
)
|
||||
except oslo_sqlalchemy.update_match.NoRowsMatched:
|
||||
LOG.info(
|
||||
"Can't change task execution state from %s to %s, "
|
||||
"because it has already been changed. [execution_id=%s]",
|
||||
cur_state, state, id
|
||||
)
|
||||
|
||||
return wf_ex
|
||||
|
||||
|
||||
# Delayed calls.
|
||||
|
||||
@b.session_aware()
|
||||
|
@ -125,29 +125,43 @@ class Task(object):
|
||||
:param state: New task state.
|
||||
:param state_info: New state information (i.e. error message).
|
||||
:param processed: New "processed" flag value.
|
||||
:return True if the state was changed as a result of this call,
|
||||
False otherwise.
|
||||
"""
|
||||
|
||||
assert self.task_ex
|
||||
|
||||
if (self.task_ex.state != state or
|
||||
self.task_ex.state_info != state_info):
|
||||
cur_state = self.task_ex.state
|
||||
|
||||
if cur_state != state or self.task_ex.state_info != state_info:
|
||||
task_ex = db_api.update_task_execution_state(
|
||||
id=self.task_ex.id,
|
||||
cur_state=cur_state,
|
||||
state=state
|
||||
)
|
||||
|
||||
if task_ex is None:
|
||||
# Do nothing because the update query did not change the DB.
|
||||
return False
|
||||
|
||||
self.task_ex = task_ex
|
||||
self.task_ex.state_info = state_info
|
||||
self.state_changed = True
|
||||
|
||||
if processed is not None:
|
||||
self.task_ex.processed = processed
|
||||
|
||||
wf_trace.info(
|
||||
self.task_ex.workflow_execution,
|
||||
"Task '%s' (%s) [%s -> %s, msg=%s]" %
|
||||
(self.task_ex.name,
|
||||
self.task_ex.id,
|
||||
self.task_ex.state,
|
||||
cur_state,
|
||||
state,
|
||||
state_info)
|
||||
)
|
||||
|
||||
self.state_changed = True
|
||||
|
||||
self.task_ex.state = state
|
||||
self.task_ex.state_info = state_info
|
||||
|
||||
if processed is not None:
|
||||
self.task_ex.processed = processed
|
||||
return True
|
||||
|
||||
@profiler.trace('task-complete')
|
||||
def complete(self, state, state_info=None):
|
||||
@ -164,10 +178,15 @@ class Task(object):
|
||||
assert self.task_ex
|
||||
|
||||
# Ignore if task already completed.
|
||||
if states.is_completed(self.task_ex.state):
|
||||
if self.is_completed():
|
||||
return
|
||||
|
||||
self.set_state(state, state_info)
|
||||
# If we were unable to change the task state it means that it was
|
||||
# already changed by a concurrent process. In this case we need to
|
||||
# skip all regular completion logic like scheduling new tasks,
|
||||
# running engine commands and publishing.
|
||||
if not self.set_state(state, state_info):
|
||||
return
|
||||
|
||||
data_flow.publish_variables(self.task_ex, self.task_spec)
|
||||
|
||||
|
@ -13,7 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
@ -27,6 +29,7 @@ from mistral.workflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
_CHECK_AND_COMPLETE_PATH = (
|
||||
'mistral.engine.workflow_handler._check_and_complete'
|
||||
@ -96,7 +99,11 @@ def _check_and_complete(wf_ex_id):
|
||||
|
||||
wf = workflows.Workflow(wf_ex=wf_ex)
|
||||
|
||||
incomplete_tasks_count = 0
|
||||
|
||||
try:
|
||||
check_and_fix_integrity(wf_ex)
|
||||
|
||||
incomplete_tasks_count = wf.check_and_complete()
|
||||
except exc.MistralException as e:
|
||||
msg = (
|
||||
@ -109,22 +116,78 @@ def _check_and_complete(wf_ex_id):
|
||||
force_fail_workflow(wf.wf_ex, msg)
|
||||
|
||||
return
|
||||
finally:
|
||||
if states.is_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
if not states.is_completed(wf_ex.state):
|
||||
# Let's assume that a task takes 0.01 sec in average to complete
|
||||
# and based on this assumption calculate a time of the next check.
|
||||
# The estimation is very rough but this delay will be decreasing
|
||||
# as tasks will be completing which will give a decent
|
||||
# approximation.
|
||||
# For example, if a workflow has 100 incomplete tasks then the
|
||||
# next check call will happen in 10 seconds. For 500 tasks it will
|
||||
# be 50 seconds. The larger the workflow is, the more beneficial
|
||||
# next check call will happen in 1 second. For 500 tasks it will
|
||||
# be 5 seconds. The larger the workflow is, the more beneficial
|
||||
# this mechanism will be.
|
||||
delay = int(incomplete_tasks_count * 0.01)
|
||||
delay = (
|
||||
int(incomplete_tasks_count * 0.01) if incomplete_tasks_count
|
||||
else 4
|
||||
)
|
||||
|
||||
_schedule_check_and_complete(wf_ex, delay)
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-check-and-fix-integrity')
|
||||
def check_and_fix_integrity(wf_ex):
|
||||
check_after_seconds = CONF.engine.execution_integrity_check_delay
|
||||
|
||||
if check_after_seconds < 0:
|
||||
# Never check integrity if it's a negative value.
|
||||
return
|
||||
|
||||
# To break cyclic dependency.
|
||||
from mistral.engine import task_handler
|
||||
|
||||
running_task_execs = db_api.get_task_executions(
|
||||
workflow_execution_id=wf_ex.id,
|
||||
state=states.RUNNING
|
||||
)
|
||||
|
||||
for t_ex in running_task_execs:
|
||||
# The idea is that we take the latest known timestamp of the task
|
||||
# execution and consider it eligible for checking and fixing only
|
||||
# if some minimum period of time elapsed since the last update.
|
||||
timestamp = t_ex.updated_at or t_ex.created_at
|
||||
|
||||
delta = timeutils.delta_seconds(timestamp, timeutils.utcnow())
|
||||
|
||||
if delta < check_after_seconds:
|
||||
continue
|
||||
|
||||
child_executions = t_ex.executions
|
||||
|
||||
if not child_executions:
|
||||
continue
|
||||
|
||||
all_finished = all(
|
||||
[states.is_completed(c_ex.state) for c_ex in child_executions]
|
||||
)
|
||||
|
||||
if all_finished:
|
||||
# We found a task execution in RUNNING state for which all
|
||||
# child executions are finished. We need to call
|
||||
# "schedule_on_action_complete" on the task handler for any of
|
||||
# the child executions so that the task state is calculated and
|
||||
# updated properly.
|
||||
LOG.warning(
|
||||
"Found a task execution that is likely stuck in RUNNING state"
|
||||
" because all child executions are finished,"
|
||||
" will try to recover [task_execution=%s]", t_ex.id
|
||||
)
|
||||
|
||||
task_handler.schedule_on_action_complete(child_executions[-1])
|
||||
|
||||
|
||||
def pause_workflow(wf_ex, msg=None):
|
||||
# Pause subworkflows first.
|
||||
for task_ex in wf_ex.task_executions:
|
||||
|
@ -308,6 +308,7 @@ class Workflow(object):
|
||||
|
||||
self.wf_ex = wf_ex
|
||||
self.wf_ex.state_info = state_info
|
||||
|
||||
wf_trace.info(
|
||||
self.wf_ex,
|
||||
"Workflow '%s' [%s -> %s, msg=%s]"
|
||||
|
@ -49,8 +49,10 @@ def validate_cron_trigger_input(pattern, first_time, count):
|
||||
raise exc.InvalidModelException(
|
||||
'Pattern or first_execution_time must be specified.'
|
||||
)
|
||||
|
||||
if first_time:
|
||||
valid_min_time = datetime.datetime.utcnow() + datetime.timedelta(0, 60)
|
||||
|
||||
if valid_min_time > first_time:
|
||||
raise exc.InvalidModelException(
|
||||
'first_execution_time must be at least 1 minute in the future.'
|
||||
@ -59,6 +61,7 @@ def validate_cron_trigger_input(pattern, first_time, count):
|
||||
raise exc.InvalidModelException(
|
||||
'Pattern must be provided if count is superior to 1.'
|
||||
)
|
||||
|
||||
if pattern:
|
||||
try:
|
||||
croniter.croniter(pattern)
|
||||
|
85
mistral/tests/unit/engine/test_integrity_check.py
Normal file
85
mistral/tests/unit/engine/test_integrity_check.py
Normal file
@ -0,0 +1,85 @@
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
class IntegrityCheckTest(base.EngineTestCase):
|
||||
def setUp(self):
|
||||
super(IntegrityCheckTest, self).setUp()
|
||||
|
||||
self.override_config('auth_enable', False, group='pecan')
|
||||
self.override_config(
|
||||
'execution_integrity_check_delay',
|
||||
2,
|
||||
group='engine'
|
||||
)
|
||||
|
||||
def test_task_execution_integrity(self):
|
||||
# The idea of the test is that we use the no-op asynchronous action
|
||||
# so that action and task execution state is not automatically set
|
||||
# to SUCCESS after we start the workflow. We'll update the action
|
||||
# execution state to SUCCESS directly through the DB and will wait
|
||||
# till task execution integrity is checked and fixed automatically
|
||||
# by a periodic job after about 2 seconds.
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
on-success: task2
|
||||
|
||||
task2:
|
||||
action: std.async_noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', '', {})
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
self.await_task_success(task1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task2_ex = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task2',
|
||||
state=states.RUNNING
|
||||
)
|
||||
action2_ex = self._assert_single_item(
|
||||
task2_ex.executions,
|
||||
state=states.RUNNING
|
||||
)
|
||||
|
||||
db_api.update_action_execution(
|
||||
action2_ex.id,
|
||||
{'state': states.SUCCESS}
|
||||
)
|
||||
|
||||
self.await_task_success(task2_ex.id)
|
||||
self.await_workflow_success(wf_ex.id)
|
Loading…
Reference in New Issue
Block a user