Enable workflow to be resumable from errors
Allow ERROR->RUNNING as a valid transition state. Enable workflow to rerun task in failed state. For with-items task, if reset options not set, then only items not successfully completed are rerun. Change-Id: If64644c18dd9bf7d6a1f528fc13acf1ce287eff3 Partial-Implements: blueprint mistral-workflow-pause-resume-with-intervention
This commit is contained in:
parent
54bad0b1e7
commit
63cf937691
@ -1,4 +1,5 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -22,6 +23,7 @@ from mistral.engine import action_handler
|
||||
from mistral.engine import policies
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
@ -38,7 +40,7 @@ from mistral.workflow import with_items
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_existing_task(task_ex_id):
|
||||
def run_existing_task(task_ex_id, reset=True):
|
||||
"""This function runs existing task execution.
|
||||
|
||||
It is needed mostly by scheduler.
|
||||
@ -48,8 +50,34 @@ def run_existing_task(task_ex_id):
|
||||
wf_def = db_api.get_workflow_definition(task_ex.workflow_name)
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
|
||||
|
||||
# Throw exception if the existing task already succeeded.
|
||||
if task_ex.state == states.SUCCESS:
|
||||
raise exc.EngineException('Reruning existing task that already '
|
||||
'succeeded is not supported.')
|
||||
|
||||
# Exit if the existing task failed and reset is not instructed.
|
||||
# For a with-items task without reset, re-running the existing
|
||||
# task will re-run the failed and unstarted items.
|
||||
if (task_ex.state == states.ERROR and not reset and
|
||||
not task_spec.get_with_items()):
|
||||
return
|
||||
|
||||
# Reset state of processed task and related action executions.
|
||||
if reset:
|
||||
action_exs = task_ex.executions
|
||||
else:
|
||||
action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id,
|
||||
state=states.ERROR,
|
||||
accepted=True
|
||||
)
|
||||
|
||||
for action_ex in action_exs:
|
||||
action_ex.accepted = False
|
||||
|
||||
# Explicitly change task state to RUNNING.
|
||||
task_ex.state = states.RUNNING
|
||||
task_ex.processed = False
|
||||
|
||||
_run_existing_task(task_ex, task_spec, wf_spec)
|
||||
|
||||
@ -66,9 +94,22 @@ def _run_existing_task(task_ex, task_spec, wf_spec):
|
||||
if task_spec.get_with_items():
|
||||
with_items.prepare_runtime_context(task_ex, task_spec, input_dicts)
|
||||
|
||||
action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id,
|
||||
state=states.SUCCESS,
|
||||
accepted=True
|
||||
)
|
||||
|
||||
with_items_indices = [
|
||||
action_ex.runtime_context['with_items_index']
|
||||
for action_ex in action_exs
|
||||
if 'with_items_index' in action_ex.runtime_context
|
||||
]
|
||||
|
||||
# In some cases we can have no input, e.g. in case of 'with-items'.
|
||||
if input_dicts:
|
||||
for index, input_d in enumerate(input_dicts):
|
||||
if index not in with_items_indices:
|
||||
_run_action_or_workflow(task_ex, task_spec, input_d, index)
|
||||
else:
|
||||
_schedule_noop_action(task_ex, task_spec)
|
||||
|
30
mistral/tests/actions.py
Normal file
30
mistral/tests/actions.py
Normal file
@ -0,0 +1,30 @@
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral.actions import base as base_actions
|
||||
from mistral.actions import std_actions
|
||||
|
||||
|
||||
class MockEchoAction(base_actions.Action):
|
||||
mock_failure = True
|
||||
mock_which = []
|
||||
|
||||
def __init__(self, output):
|
||||
self.output = output
|
||||
|
||||
def run(self):
|
||||
if self.mock_failure and self.output in self.mock_which:
|
||||
raise Exception('Test action error for output="%s".', self.output)
|
||||
|
||||
return std_actions.EchoAction(self.output).run()
|
438
mistral/tests/unit/engine/test_direct_workflow_rerun.py
Normal file
438
mistral/tests/unit/engine/test_direct_workflow_rerun.py
Normal file
@ -0,0 +1,438 @@
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import workflow_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.tests import actions as test_actions
|
||||
from mistral.tests import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
SIMPLE_WORKBOOK = """
|
||||
---
|
||||
version: '2.0'
|
||||
name: wb1
|
||||
workflows:
|
||||
wf1:
|
||||
type: direct
|
||||
tasks:
|
||||
t1:
|
||||
action: mock.echo output="Task 1"
|
||||
on-success:
|
||||
- t2
|
||||
t2:
|
||||
action: mock.echo output="Task 2"
|
||||
on-success:
|
||||
- t3
|
||||
t3:
|
||||
action: mock.echo output="Task 3"
|
||||
"""
|
||||
|
||||
WITH_ITEMS_WORKBOOK = """
|
||||
---
|
||||
version: '2.0'
|
||||
name: wb3
|
||||
workflows:
|
||||
wf1:
|
||||
type: direct
|
||||
tasks:
|
||||
t1:
|
||||
with-items: i in <% range(0, 3).list() %>
|
||||
action: mock.echo output="Task 1.<% $.i %>"
|
||||
publish:
|
||||
v1: <% $.t1 %>
|
||||
on-success:
|
||||
- t2
|
||||
t2:
|
||||
action: std.echo output="Task 2"
|
||||
"""
|
||||
|
||||
JOIN_WORKBOOK = """
|
||||
---
|
||||
version: '2.0'
|
||||
name: wb1
|
||||
workflows:
|
||||
wf1:
|
||||
type: direct
|
||||
tasks:
|
||||
t1:
|
||||
action: mock.echo output="Task 1"
|
||||
on-success:
|
||||
- t3
|
||||
t2:
|
||||
action: mock.echo output="Task 2"
|
||||
on-success:
|
||||
- t3
|
||||
t3:
|
||||
action: mock.echo output="Task 3"
|
||||
join: all
|
||||
"""
|
||||
|
||||
|
||||
class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DirectWorkflowRerunTest, self).setUp()
|
||||
|
||||
test_base.register_action_class(
|
||||
'mock.echo',
|
||||
test_actions.MockEchoAction,
|
||||
desc='Mock of std.echo for unit testing.'
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
super(DirectWorkflowRerunTest, self).tearDown()
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = []
|
||||
|
||||
def _rerun(self, wf_ex, task_name, reset=True):
|
||||
with db_api.transaction():
|
||||
db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions,
|
||||
name=task_name)
|
||||
workflow_handler.set_execution_state(wf_ex, states.RUNNING)
|
||||
task_handler.run_existing_task(task_ex.id, reset=reset)
|
||||
|
||||
def test_rerun(self):
|
||||
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {})
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't2')
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
# Check action executions of task 1.
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_1_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
|
||||
|
||||
# Check action executions of task 2.
|
||||
self.assertEqual(states.SUCCESS, task_2_ex.state)
|
||||
|
||||
task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_2_ex.id)
|
||||
|
||||
self.assertEqual(2, len(task_2_action_exs))
|
||||
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, task_2_action_exs[1].state)
|
||||
|
||||
# Check action executions of task 3.
|
||||
self.assertEqual(states.SUCCESS, task_3_ex.state)
|
||||
|
||||
task_3_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_3_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_3_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
|
||||
|
||||
def test_rerun_from_prev_step(self):
|
||||
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {})
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
e = self.assertRaises(exc.EngineException, self._rerun, wf_ex, 't1')
|
||||
self.assertIn('not supported', str(e))
|
||||
|
||||
def test_rerun_with_items(self):
|
||||
wb_service.create_workbook_v2(WITH_ITEMS_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 1.0', 'Task 1.2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb3.wf1', {})
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(1, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
self.assertEqual(3, len(task_1_action_exs))
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't1', reset=False)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
# Check action executions of task 1.
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
# The single action execution that succeeded should not re-run.
|
||||
self.assertEqual(5, len(task_1_action_exs))
|
||||
|
||||
self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'],
|
||||
task_1_ex.published.get('v1'))
|
||||
|
||||
# Check action executions of task 2.
|
||||
self.assertEqual(states.SUCCESS, task_2_ex.state)
|
||||
|
||||
task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_2_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_2_action_exs))
|
||||
|
||||
def test_rerun_on_join_task(self):
|
||||
wb_service.create_workbook_v2(JOIN_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 3']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {})
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, task_2_ex.state)
|
||||
self.assertEqual(states.ERROR, task_3_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't3')
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
# Check action executions of task 1.
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_1_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
|
||||
|
||||
# Check action executions of task 2.
|
||||
task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_2_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_2_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_2_action_exs[0].state)
|
||||
|
||||
# Check action executions of task 3.
|
||||
task_3_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_ex.task_executions[2].id)
|
||||
|
||||
self.assertEqual(2, len(task_3_action_exs))
|
||||
self.assertEqual(states.ERROR, task_3_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, task_3_action_exs[1].state)
|
||||
|
||||
def test_rerun_join_with_branch_errors(self):
|
||||
wb_service.create_workbook_v2(JOIN_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 1', 'Task 2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {})
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
self._await(lambda: self.is_task_in_state(task_1_ex.id, states.ERROR))
|
||||
self._await(lambda: self.is_task_in_state(task_2_ex.id, states.ERROR))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 2']
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't1')
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
# Wait for the task to succeed.
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
self._await(lambda: self.is_task_in_state(task_1_ex.id,
|
||||
states.SUCCESS))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
self.assertEqual(states.WAITING, task_3_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't2')
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
# Check action executions of task 1.
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
self.assertEqual(2, len(task_1_action_exs))
|
||||
self.assertEqual(states.ERROR, task_1_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[1].state)
|
||||
|
||||
# Check action executions of task 2.
|
||||
task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_2_ex.id)
|
||||
|
||||
self.assertEqual(2, len(task_2_action_exs))
|
||||
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, task_2_action_exs[1].state)
|
||||
|
||||
# Check action executions of task 3.
|
||||
task_3_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_ex.task_executions[2].id)
|
||||
|
||||
self.assertEqual(1, len(task_3_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
|
175
mistral/tests/unit/engine/test_reverse_workflow_rerun.py
Normal file
175
mistral/tests/unit/engine/test_reverse_workflow_rerun.py
Normal file
@ -0,0 +1,175 @@
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import workflow_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.tests import actions as test_actions
|
||||
from mistral.tests import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
SIMPLE_WORKBOOK = """
|
||||
---
|
||||
version: '2.0'
|
||||
name: wb1
|
||||
workflows:
|
||||
wf1:
|
||||
type: reverse
|
||||
tasks:
|
||||
t1:
|
||||
action: mock.echo output="Task 1"
|
||||
t2:
|
||||
action: mock.echo output="Task 2"
|
||||
requires:
|
||||
- t1
|
||||
t3:
|
||||
action: mock.echo output="Task 3"
|
||||
requires:
|
||||
- t2
|
||||
"""
|
||||
|
||||
|
||||
class ReverseWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ReverseWorkflowRerunTest, self).setUp()
|
||||
|
||||
test_base.register_action_class(
|
||||
'mock.echo',
|
||||
test_actions.MockEchoAction,
|
||||
desc='Mock of std.echo for unit testing.'
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
super(ReverseWorkflowRerunTest, self).tearDown()
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = []
|
||||
|
||||
def _rerun(self, wf_ex, task_name, reset=True):
|
||||
with db_api.transaction():
|
||||
db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions,
|
||||
name=task_name)
|
||||
workflow_handler.set_execution_state(wf_ex, states.RUNNING)
|
||||
task_handler.run_existing_task(task_ex.id, reset=reset)
|
||||
|
||||
def test_rerun(self):
|
||||
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
self._rerun(wf_ex, 't2')
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
self._await(lambda: self.is_execution_success(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
|
||||
|
||||
# Check action executions of task 1.
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_1_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_1_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
|
||||
|
||||
# Check action executions of task 2.
|
||||
self.assertEqual(states.SUCCESS, task_2_ex.state)
|
||||
|
||||
task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_2_ex.id)
|
||||
|
||||
self.assertEqual(2, len(task_2_action_exs))
|
||||
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, task_2_action_exs[1].state)
|
||||
|
||||
# Check action executions of task 3.
|
||||
self.assertEqual(states.SUCCESS, task_3_ex.state)
|
||||
|
||||
task_3_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=task_3_ex.id)
|
||||
|
||||
self.assertEqual(1, len(task_3_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
|
||||
|
||||
def test_rerun_from_prev_step(self):
|
||||
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
|
||||
|
||||
# Setup mock action.
|
||||
test_actions.MockEchoAction.mock_failure = True
|
||||
test_actions.MockEchoAction.mock_which = ['Task 2']
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.ERROR, task_2_ex.state)
|
||||
|
||||
# Flag the mock action to not raise exception.
|
||||
test_actions.MockEchoAction.mock_failure = False
|
||||
|
||||
# Resume workflow and re-run failed task.
|
||||
e = self.assertRaises(exc.EngineException, self._rerun, wf_ex, 't1')
|
||||
self.assertIn('not supported', str(e))
|
@ -66,7 +66,7 @@ class StatesModuleTest(base.BaseTest):
|
||||
|
||||
# From ERROR
|
||||
self.assertTrue(s.is_valid_transition(s.ERROR, s.ERROR))
|
||||
self.assertFalse(s.is_valid_transition(s.ERROR, s.RUNNING))
|
||||
self.assertTrue(s.is_valid_transition(s.ERROR, s.RUNNING))
|
||||
self.assertFalse(s.is_valid_transition(s.ERROR, s.PAUSED))
|
||||
self.assertFalse(s.is_valid_transition(s.ERROR, s.DELAYED))
|
||||
self.assertFalse(s.is_valid_transition(s.ERROR, s.SUCCESS))
|
||||
|
@ -33,7 +33,7 @@ _VALID_TRANSITIONS = {
|
||||
PAUSED: [RUNNING, ERROR],
|
||||
DELAYED: [RUNNING, ERROR],
|
||||
SUCCESS: [],
|
||||
ERROR: []
|
||||
ERROR: [RUNNING]
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user