Fix race conditions in policies
* Fixed race conditions in wait-after and timeout policies * Unit test Change-Id: I64d08e7645b55fabdb97cd82849685c33603614d
This commit is contained in:
parent
0cf68559f3
commit
cb2e06f583
@ -145,9 +145,10 @@ class WaitAfterPolicy(base.TaskPolicy):
|
||||
policy_context = runtime_context[context_key]
|
||||
|
||||
if policy_context.get('skip'):
|
||||
# Need to avoid terminal states.
|
||||
if not states.is_finished(task_db.state):
|
||||
# Unset state 'DELAYED'.
|
||||
task_db.state = \
|
||||
states.ERROR if raw_result.is_error() else states.SUCCESS
|
||||
task_db.state = states.RUNNING
|
||||
|
||||
return
|
||||
|
||||
|
@ -167,6 +167,9 @@ class BaseTest(base.BaseTestCase):
|
||||
|
||||
time.sleep(delay)
|
||||
|
||||
def _sleep(self, seconds):
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
class DbTestCase(BaseTest):
|
||||
is_heavy_init_called = False
|
||||
|
@ -142,6 +142,23 @@ workflows:
|
||||
"""
|
||||
|
||||
|
||||
TIMEOUT_WB2 = """
|
||||
---
|
||||
version: '2.0'
|
||||
name: wb
|
||||
workflows:
|
||||
wf1:
|
||||
type: direct
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="Hi!"
|
||||
policies:
|
||||
wait-after: 2
|
||||
timeout: 1
|
||||
"""
|
||||
|
||||
|
||||
class PoliciesTest(base.EngineTestCase):
|
||||
def setUp(self):
|
||||
super(PoliciesTest, self).setUp()
|
||||
@ -288,3 +305,29 @@ class PoliciesTest(base.EngineTestCase):
|
||||
|
||||
exec_db = db_api.get_execution(exec_db.id)
|
||||
self.assertEqual(states.SUCCESS, exec_db.state)
|
||||
|
||||
def test_timeout_policy_success_after_timeout(self):
|
||||
wb_service.create_workbook_v2({'definition': TIMEOUT_WB2})
|
||||
|
||||
# Start workflow.
|
||||
exec_db = self.engine.start_workflow('wb.wf1', {})
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
exec_db = db_api.get_execution(exec_db.id)
|
||||
task_db = exec_db.tasks[0]
|
||||
|
||||
self.assertEqual(states.RUNNING, task_db.state)
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_error(exec_db.id),
|
||||
)
|
||||
|
||||
# Wait until timeout exceeds.
|
||||
self._sleep(2)
|
||||
|
||||
exec_db = db_api.get_execution(exec_db.id)
|
||||
tasks_db = exec_db.tasks
|
||||
|
||||
# Make sure that engine did not create extra tasks.
|
||||
self.assertEqual(1, len(tasks_db))
|
||||
self.assertEqual(states.ERROR, tasks_db[0].state)
|
||||
|
@ -63,11 +63,13 @@ class WorkflowHandler(object):
|
||||
(before publisher). Instance of mistral.workflow.utils.TaskResult
|
||||
:return List of engine commands that needs to be performed.
|
||||
"""
|
||||
# TODO(rakhmerov): need to ignore result if task is complete.
|
||||
|
||||
# Ignore if task already completed.
|
||||
if states.is_finished(task_db.state):
|
||||
return []
|
||||
|
||||
task_db.state = \
|
||||
states.ERROR if raw_result.is_error() else states.SUCCESS
|
||||
|
||||
task_spec = self.wf_spec.get_tasks()[task_db.name]
|
||||
|
||||
task_db.output =\
|
||||
|
Loading…
Reference in New Issue
Block a user