Merge "Handle action inputs properly to prevent tasks stuck in RUNNING state"
This commit is contained in:
commit
fbbb9790c7
|
@ -323,11 +323,16 @@ class PythonAction(Action):
|
|||
return self._prepare_output(result)
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
prepared_input_dict = self._prepare_input(input_dict)
|
||||
try:
|
||||
prepared_input_dict = self._prepare_input(input_dict)
|
||||
|
||||
a = a_m.get_action_class(self.action_def.name)(**prepared_input_dict)
|
||||
a = a_m.get_action_class(self.action_def.name)(
|
||||
**prepared_input_dict
|
||||
)
|
||||
|
||||
return a.is_sync()
|
||||
return a.is_sync()
|
||||
except BaseException as e:
|
||||
raise exc.InputException(str(e))
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
# NOTE(kong): Don't validate action input if action initialization
|
||||
|
|
|
@ -18,9 +18,11 @@ import testtools
|
|||
from mistral.db.v2 import api as db_api
|
||||
from mistral.lang.v2 import tasks as tasks_lang
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral import utils
|
||||
from mistral.workflow import states
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
|
@ -28,6 +30,24 @@ from mistral.workflow import states
|
|||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
class ActionWithExceptionInInit(actions_base.Action):
|
||||
def __init__(self, aaa):
|
||||
super(ActionWithExceptionInInit, self).__init__()
|
||||
|
||||
if aaa != "bbb":
|
||||
raise Exception("Aaa doesn't equal bbb")
|
||||
|
||||
self.aaa = aaa
|
||||
|
||||
def run(self, context):
|
||||
return actions_base.Result(
|
||||
data=self.aaa
|
||||
)
|
||||
|
||||
def test(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class JoinEngineTest(base.EngineTestCase):
|
||||
def test_full_join_simple(self):
|
||||
wf_text = """---
|
||||
|
@ -1348,3 +1368,43 @@ class JoinEngineTest(base.EngineTestCase):
|
|||
name='join_task',
|
||||
state=states.ERROR
|
||||
)
|
||||
|
||||
def test_join_task_with_input_error(self):
|
||||
test_base.register_action_class(
|
||||
'my_action',
|
||||
ActionWithExceptionInInit
|
||||
)
|
||||
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
type: direct
|
||||
|
||||
tasks:
|
||||
join_task:
|
||||
action: my_action aaa="aaa"
|
||||
join: all
|
||||
|
||||
task1:
|
||||
on-success: join_task
|
||||
|
||||
task2:
|
||||
on-success: join_task
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
t_execs = wf_ex.task_executions
|
||||
|
||||
self._assert_single_item(t_execs, name='task1', state=states.SUCCESS)
|
||||
self._assert_single_item(t_execs, name='task2', state=states.SUCCESS)
|
||||
self._assert_single_item(t_execs, name='join_task',
|
||||
state=states.ERROR)
|
||||
|
|
Loading…
Reference in New Issue