Fixing finishing workflow in case DELAYED task state
* If we have 2 parallel tasks and one of these in the DELAYED state, workflow incorrectly finishes and DELAYED state is not taken to account. Closes-Bug: #1422621 Change-Id: I187a61117f5ad94c6f4ab33fbc750350f8adef0b
This commit is contained in:
@@ -21,6 +21,7 @@ from mistral import exceptions as exc
|
|||||||
from mistral.openstack.common import log as logging
|
from mistral.openstack.common import log as logging
|
||||||
from mistral.services import scheduler
|
from mistral.services import scheduler
|
||||||
from mistral.services import workbooks as wb_service
|
from mistral.services import workbooks as wb_service
|
||||||
|
from mistral.services import workflows as wf_service
|
||||||
from mistral.tests.unit.engine1 import base
|
from mistral.tests.unit.engine1 import base
|
||||||
from mistral.workbook import parser as spec_parser
|
from mistral.workbook import parser as spec_parser
|
||||||
|
|
||||||
@@ -535,3 +536,30 @@ class PoliciesTest(base.EngineTestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.assertIn('Invalid data type in WaitBeforePolicy', str(exception))
|
self.assertIn('Invalid data type in WaitBeforePolicy', str(exception))
|
||||||
|
|
||||||
|
def test_delayed_task_and_correct_finish_workflow(self):
|
||||||
|
wf_delayed_state = """---
|
||||||
|
version: "2.0"
|
||||||
|
wf:
|
||||||
|
type: direct
|
||||||
|
tasks:
|
||||||
|
|
||||||
|
task1:
|
||||||
|
action: std.noop
|
||||||
|
policies:
|
||||||
|
wait-before: 1
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.noop
|
||||||
|
"""
|
||||||
|
wf_service.create_workflows(wf_delayed_state)
|
||||||
|
|
||||||
|
# Start workflow.
|
||||||
|
wf_ex = self.engine.start_workflow('wf', {})
|
||||||
|
|
||||||
|
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||||
|
|
||||||
|
# Note: We need to reread execution to access related tasks.
|
||||||
|
wf_ex = db_api.get_execution(wf_ex.id)
|
||||||
|
|
||||||
|
self.assertEqual(2, len(wf_ex.task_executions))
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ class WorkflowHandler(object):
|
|||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
if not cmds and not wf_utils.find_running_tasks(self.wf_ex):
|
if not cmds and not wf_utils.find_incomplete_tasks(self.wf_ex):
|
||||||
# If there are no running tasks at this point we can conclude that
|
# If there are no running tasks at this point we can conclude that
|
||||||
# the workflow has finished.
|
# the workflow has finished.
|
||||||
if not self.is_paused_or_completed():
|
if not self.is_paused_or_completed():
|
||||||
|
|||||||
@@ -72,5 +72,10 @@ def find_successful_tasks(wf_ex):
|
|||||||
return [t for t in wf_ex.task_executions if t.state == states.SUCCESS]
|
return [t for t in wf_ex.task_executions if t.state == states.SUCCESS]
|
||||||
|
|
||||||
|
|
||||||
|
def find_incomplete_tasks(wf_ex):
|
||||||
|
return [t for t in wf_ex.task_executions
|
||||||
|
if not states.is_completed(t.state)]
|
||||||
|
|
||||||
|
|
||||||
def find_error_tasks(wf_ex):
|
def find_error_tasks(wf_ex):
|
||||||
return [t for t in wf_ex.task_executions if t.state == states.ERROR]
|
return [t for t in wf_ex.task_executions if t.state == states.ERROR]
|
||||||
|
|||||||
Reference in New Issue
Block a user