Fixing wf execution creation at initial stage

* Wf execution is not created when first
   task(s) failed.
 * Fixed some unit-tests.

Closes-Bug: #1506470

Change-Id: Id34b11efe15fcff0a9360e2966d1a85c29325c62
This commit is contained in:
Nikolay Mahotkin 2015-10-27 13:23:25 +03:00
parent 46b00683c0
commit e0c00aa516
10 changed files with 83 additions and 70 deletions

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import six
from oslo_db.sqlalchemy import models as oslo_models from oslo_db.sqlalchemy import models as oslo_models
import sqlalchemy as sa import sqlalchemy as sa
@ -100,7 +101,8 @@ class _MistralModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
def datetime_to_str(dct, attr_name): def datetime_to_str(dct, attr_name):
if dct.get(attr_name) is not None: if (dct.get(attr_name) is not None
and not isinstance(dct.get(attr_name), six.string_types)):
dct[attr_name] = dct[attr_name].isoformat(' ') dct[attr_name] = dct[attr_name].isoformat(' ')

View File

@ -55,9 +55,9 @@ class DefaultEngine(base.Engine, coordination.Service):
def start_workflow(self, wf_name, wf_input, description='', **params): def start_workflow(self, wf_name, wf_input, description='', **params):
wf_exec_id = None wf_exec_id = None
try: params = self._canonize_workflow_params(params)
params = self._canonize_workflow_params(params)
try:
with db_api.transaction(): with db_api.transaction():
wf_def = db_api.get_workflow_definition(wf_name) wf_def = db_api.get_workflow_definition(wf_name)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec) wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
@ -75,11 +75,17 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
# Separate workflow execution creation and dispatching command
# transactions in order to be able to return workflow execution
# with corresponding error message in state_info when error occurs
# at dispatching commands.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_exec_id)
wf_ctrl = wf_base.WorkflowController.get_controller( wf_ctrl = wf_base.WorkflowController.get_controller(
wf_ex, wf_ex,
wf_spec wf_spec
) )
self._dispatch_workflow_commands( self._dispatch_workflow_commands(
wf_ex, wf_ex,
wf_ctrl.continue_workflow() wf_ctrl.continue_workflow()
@ -91,7 +97,11 @@ class DefaultEngine(base.Engine, coordination.Service):
"Failed to start workflow '%s' id=%s: %s\n%s", "Failed to start workflow '%s' id=%s: %s\n%s",
wf_name, wf_exec_id, e, traceback.format_exc() wf_name, wf_exec_id, e, traceback.format_exc()
) )
self._fail_workflow(wf_exec_id, e) wf_ex = self._fail_workflow(wf_exec_id, e)
if wf_ex:
return wf_ex.get_clone()
raise e raise e
@u.log_exec(LOG) @u.log_exec(LOG)
@ -255,7 +265,9 @@ class DefaultEngine(base.Engine, coordination.Service):
"Failed to handle action execution result [id=%s]: %s\n%s", "Failed to handle action execution result [id=%s]: %s\n%s",
action_ex_id, e, traceback.format_exc() action_ex_id, e, traceback.format_exc()
) )
self._fail_workflow(wf_ex_id, e) self._fail_workflow(wf_ex_id, e)
raise e raise e
@u.log_exec(LOG) @u.log_exec(LOG)
@ -422,9 +434,9 @@ class DefaultEngine(base.Engine, coordination.Service):
@staticmethod @staticmethod
def _fail_workflow(wf_ex_id, err, action_ex_id=None): def _fail_workflow(wf_ex_id, err, action_ex_id=None):
"""Private helper to fail workflow on exceptions.""" """Private helper to fail workflow on exceptions."""
with db_api.transaction(): err_msg = str(err)
err_msg = str(err)
with db_api.transaction():
wf_ex = db_api.load_workflow_execution(wf_ex_id) wf_ex = db_api.load_workflow_execution(wf_ex_id)
if wf_ex is None: if wf_ex is None:
@ -447,6 +459,8 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_utils.Result(error=err_msg) wf_utils.Result(error=err_msg)
) )
return wf_ex
@staticmethod @staticmethod
def _canonize_workflow_params(params): def _canonize_workflow_params(params):
# Resolve environment parameter. # Resolve environment parameter.

View File

@ -538,11 +538,17 @@ class ExecutionTestsV2(base.TestCase):
@test.attr(type='negative') @test.attr(type='negative')
def test_create_execution_for_reverse_wf_invalid_start_task(self): def test_create_execution_for_reverse_wf_invalid_start_task(self):
self.assertRaises(exceptions.BadRequest, _, wf_ex = self.client.create_execution(
self.client.create_execution, self.reverse_wf['name'],
self.reverse_wf['name'], {
{self.reverse_wf['input']: "Bye"}, self.reverse_wf['input']: "Bye"},
{"task_name": "nonexist"}) {
"task_name": "nonexist"
}
)
self.assertEqual("ERROR", wf_ex['state'])
self.assertIn("Invalid task name", wf_ex['state_info'])
@test.attr(type='negative') @test.attr(type='negative')
def test_create_execution_forgot_input_params(self): def test_create_execution_forgot_input_params(self):

View File

@ -16,9 +16,9 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
from mistral.workflow import states
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -133,9 +133,10 @@ class AdhocActionsTest(base.EngineTestCase):
) )
def test_run_adhoc_action_without_sufficient_input_value(self): def test_run_adhoc_action_without_sufficient_input_value(self):
self.assertRaises( wf_ex = self.engine.start_workflow(
exc.InputException,
self.engine.start_workflow,
'my_wb.wf3', 'my_wb.wf3',
{'str1': 'a', 'str2': 'b'} {'str1': 'a', 'str2': 'b'}
) )
self.assertIn("Invalid input", wf_ex.state_info)
self.assertEqual(states.ERROR, wf_ex.state)

View File

@ -13,12 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as de
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.services import workflows as wf_service from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
@ -165,11 +163,10 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
action: std.echo wrong_input="Ha-ha" action: std.echo wrong_input="Ha-ha"
""" """
self.assertRaises( wf_ex = self._run_workflow(wf_text)
exc.InputException,
self._run_workflow, self.assertIn("Invalid input", wf_ex.state_info)
wf_text self.assertEqual(states.ERROR, wf_ex.state)
)
def test_wrong_action(self): def test_wrong_action(self):
wf_text = """ wf_text = """
@ -210,20 +207,13 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
with mock.patch.object(de.DefaultEngine, '_fail_workflow') as mock_fw: wf_ex = self.engine.start_workflow('wf', None)
self.assertRaises(
exc.InvalidActionException,
self.engine.start_workflow, 'wf', None)
self.assertEqual(1, mock_fw.call_count) self.assertIn(
"Failed to find action [action_name=wrong.task]",
self.assertTrue( wf_ex.state_info
issubclass( )
type(mock_fw.call_args[0][1]), self.assertEqual(states.ERROR, wf_ex.state)
exc.InvalidActionException
),
"Called with a right exception"
)
def test_next_task_with_input_yaql_error(self): def test_next_task_with_input_yaql_error(self):
wf_text = """ wf_text = """
@ -348,20 +338,13 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
with mock.patch.object(de.DefaultEngine, '_fail_workflow') as mock_fw: wf_ex = self.engine.start_workflow('wf', None)
self.assertRaises(
exc.YaqlEvaluationException,
self.engine.start_workflow, 'wf', None
)
self.assertEqual(1, mock_fw.call_count) self.assertIn(
self.assertTrue( "Can not evaluate YAQL expression: wrong(yaql)",
issubclass( wf_ex.state_info
type(mock_fw.call_args[0][1]), )
exc.YaqlEvaluationException self.assertEqual(states.ERROR, wf_ex.state)
),
"Called with a right exception"
)
def test_mismatched_yaql_in_first_task(self): def test_mismatched_yaql_in_first_task(self):
wf_text = """ wf_text = """
@ -377,12 +360,10 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
exception = self.assertRaises( wf_ex = self.engine.start_workflow('wf', {'var': 2})
exc.YaqlEvaluationException,
self.engine.start_workflow, 'wf', {'var': 2}
)
self.assertIn("Can not evaluate YAQL expression", exception.message) self.assertIn("Can not evaluate YAQL expression", wf_ex.state_info)
self.assertEqual(states.ERROR, wf_ex.state)
def test_one_line_syntax_in_on_clauses(self): def test_one_line_syntax_in_on_clauses(self):
wf_text = """ wf_text = """

View File

@ -458,6 +458,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
self._await(lambda: self.is_task_in_state(task_1_ex.id, states.ERROR)) self._await(lambda: self.is_task_in_state(task_1_ex.id, states.ERROR))
self._await(lambda: self.is_task_in_state(task_2_ex.id, states.ERROR)) self._await(lambda: self.is_task_in_state(task_2_ex.id, states.ERROR))
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)

View File

@ -22,6 +22,7 @@ from mistral import exceptions as exc
from mistral.services import workflows as wf_service from mistral.services import workflows as wf_service
from mistral.tests import base as test_base from mistral.tests import base as test_base
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -129,14 +130,19 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
} }
) )
@expect_size_limit_exception('input')
def test_action_input_limit(self): def test_action_input_limit(self):
new_wf = generate_workflow(['__ACTION_INPUT__']) new_wf = generate_workflow(['__ACTION_INPUT__'])
wf_service.create_workflows(new_wf) wf_service.create_workflows(new_wf)
# Start workflow. # Start workflow.
self.engine.start_workflow('wf', {}) wf_ex = self.engine.start_workflow('wf', {})
self.assertIn(
"Size of 'input' is 1KB which exceeds the limit of 0KB",
wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)
def test_action_output_limit(self): def test_action_output_limit(self):
wf_service.create_workflows(WF) wf_service.create_workflows(WF)
@ -151,9 +157,11 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks. # Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual("Size of 'output' is 1KB which exceeds " self.assertIn(
"the limit of 0KB", "Size of 'output' is 1KB which exceeds the limit of 0KB",
wf_ex.state_info) wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)
def test_task_published_limit(self): def test_task_published_limit(self):
new_wf = generate_workflow(['__TASK_PUBLISHED__']) new_wf = generate_workflow(['__TASK_PUBLISHED__'])

View File

@ -888,13 +888,13 @@ class PoliciesTest(base.EngineTestCase):
wb_service.create_workbook_v2(wb) wb_service.create_workbook_v2(wb)
# Start workflow. # Start workflow.
exception = self.assertRaises( wf_ex = self.engine.start_workflow('wb.wf1', {'wait_before': '1'})
exc.InvalidModelException,
self.engine.start_workflow,
'wb.wf1', {'wait_before': '1'}
)
self.assertIn('Invalid data type in WaitBeforePolicy', str(exception)) self.assertIn(
'Invalid data type in WaitBeforePolicy',
wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)
def test_delayed_task_and_correct_finish_workflow(self): def test_delayed_task_and_correct_finish_workflow(self):
wf_delayed_state = """--- wf_delayed_state = """---

View File

@ -656,17 +656,16 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(workflow_with_concurrency_yaql) wf_service.create_workflows(workflow_with_concurrency_yaql)
# Start workflow. # Start workflow.
exception = self.assertRaises( wf_ex = self.engine.start_workflow(
exc.InvalidModelException,
self.engine.start_workflow,
'concurrency_test', 'concurrency_test',
{'concurrency': '2'} {'concurrency': '2'}
) )
self.assertIn( self.assertIn(
"Invalid data type in ConcurrencyPolicy", "Invalid data type in ConcurrencyPolicy",
exception.message wf_ex.state_info
) )
self.assertEqual(states.ERROR, wf_ex.state)
def test_with_items_concurrency_2(self): def test_with_items_concurrency_2(self):
workflow_with_concurrency_2 = """--- workflow_with_concurrency_2 = """---

View File

@ -66,7 +66,7 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {}) wf_ex = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_success(wf_ex.id), timeout=5) self._await(lambda: self.is_execution_success(wf_ex.id))
# Reread execution to access related tasks. # Reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)