Adding tests for order of engine instructions

* Additional testing
* Fixed direct workflow handler
* Added logging decorators to engine methods
* Changed default logging level of @log_exec decorator to DEBUG

Change-Id: Ibd0008cf43bbf57407f610513b904386bdb74dde
This commit is contained in:
Renat Akhmerov 2014-09-18 15:06:45 -07:00
parent 395813e7b8
commit 321f95151c
4 changed files with 191 additions and 53 deletions

View File

@ -21,13 +21,13 @@ from mistral.engine1 import commands
from mistral.engine1 import policies
from mistral.engine1 import utils
from mistral.openstack.common import log as logging
from mistral import utils as u
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
from mistral.workflow import workflow_handler_factory as wfh_factory
LOG = logging.getLogger(__name__)
# Submodules of mistral.engine will throw NoSuchOptError if configuration
@ -45,6 +45,7 @@ class DefaultEngine(base.Engine):
self._engine_client = engine_client
self._executor_client = executor_client
@u.log_exec(LOG)
def start_workflow(self, workflow_name, workflow_input, **params):
with db_api.transaction():
wf_db = db_api.get_workflow(workflow_name)
@ -67,7 +68,9 @@ class DefaultEngine(base.Engine):
return exec_db
@u.log_exec(LOG)
def on_task_result(self, task_id, raw_result):
with db_api.transaction():
task_db = db_api.get_task(task_id)
exec_db = db_api.get_execution(task_db.execution_id)
@ -94,6 +97,7 @@ class DefaultEngine(base.Engine):
return task_db
@u.log_exec(LOG)
def pause_workflow(self, execution_id):
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
@ -104,6 +108,7 @@ class DefaultEngine(base.Engine):
return exec_db
@u.log_exec(LOG)
def resume_workflow(self, execution_id):
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
@ -153,6 +158,7 @@ class DefaultEngine(base.Engine):
for p in policies.build_policies(task_spec.get_policies()):
p.after_task_complete(task_db, task_spec, raw_result)
@u.log_exec(LOG)
def run_task(self, task_id):
with db_api.transaction():
task_db = db_api.update_task(task_id, {'state': states.RUNNING})

View File

@ -27,7 +27,7 @@ LOG = logging.getLogger(__name__)
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
WORKBOOK1 = """
---
version: '2.0'
@ -44,51 +44,19 @@ workflows:
- fail: $.my_var = 1
- succeed: $.my_var = 2
- pause: $.my_var = 3
- rollback: $.my_var = 3
- task2: $.my_var = 4 # (Never happens in this test)
task2:
action: std.echo output='2'
"""
WORKBOOK1 = """
---
version: '2.0'
workflows:
wf:
type: direct
parameters:
- my_var
on-task-complete:
- fail: $.my_var = 1
- succeed: $.my_var = 2
- pause: $.my_var = 3
- rollback: $.my_var = 3
- task2: $.my_var = 4 # (Never happens in this test)
tasks:
task1:
action: std.echo output='1'
- task2
task2:
action: std.echo output='2'
"""
class EngineInstructionsTest(base.EngineTestCase):
class SimpleEngineInstructionsTest(base.EngineTestCase):
def setUp(self):
super(EngineInstructionsTest, self).setUp()
super(SimpleEngineInstructionsTest, self).setUp()
wb_service.create_workbook_v2({
'name': 'my_wb',
'definition': WORKBOOK,
'tags': ['test']
})
wb_service.create_workbook_v2({
'name': 'my_wb1',
'definition': WORKBOOK1,
'tags': ['test']
})
@ -135,12 +103,45 @@ class EngineInstructionsTest(base.EngineTestCase):
state=states.SUCCESS
)
def test_rollback(self):
# TODO(rakhmerov): Implement.
pass
def test_fail_wf_level(self):
exec_db = self.engine.start_workflow('my_wb1.wf', {'my_var': 1})
WORKBOOK2 = """
---
version: '2.0'
workflows:
wf:
type: direct
parameters:
- my_var
on-task-complete:
- fail: $.my_var = 1
- succeed: $.my_var = 2
- pause: $.my_var = 3
- rollback: $.my_var = 3
- task2: $.my_var = 4 # (Never happens in this test)
tasks:
task1:
action: std.echo output='1'
task2:
action: std.echo output='2'
"""
class SimpleEngineWorkflowLevelInstructionsTest(base.EngineTestCase):
def setUp(self):
super(SimpleEngineWorkflowLevelInstructionsTest, self).setUp()
wb_service.create_workbook_v2({
'name': 'my_wb',
'definition': WORKBOOK2,
'tags': ['test']
})
def test_fail(self):
exec_db = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self._await(lambda: self.is_execution_error(exec_db.id))
@ -153,8 +154,8 @@ class EngineInstructionsTest(base.EngineTestCase):
state=states.SUCCESS
)
def test_succeed_wf_level(self):
exec_db = self.engine.start_workflow('my_wb1.wf', {'my_var': 2})
def test_succeed(self):
exec_db = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self._await(lambda: self.is_execution_success(exec_db.id))
@ -167,8 +168,8 @@ class EngineInstructionsTest(base.EngineTestCase):
state=states.SUCCESS
)
def test_pause_wf_level(self):
exec_db = self.engine.start_workflow('my_wb1.wf', {'my_var': 3})
def test_pause(self):
exec_db = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self._await(lambda: self.is_execution_paused(exec_db.id))
@ -181,6 +182,136 @@ class EngineInstructionsTest(base.EngineTestCase):
state=states.SUCCESS
)
def test_rollback_wf_level(self):
# TODO(rakhmerov): Implement.
pass
WORKBOOK3 = """
---
version: '2.0'
workflows:
fail_first_wf:
type: direct
tasks:
task1:
action: std.echo output='1'
on-complete:
- fail
- task2
task2:
action: std.echo output='2'
fail_second_wf:
type: direct
tasks:
task1:
action: std.echo output='1'
on-complete:
- task2
- fail
task2:
action: std.echo output='2'
succeed_first_wf:
type: direct
tasks:
task1:
action: std.echo output='1'
on-complete:
- succeed
- task2
task2:
action: std.echo output='2'
succeed_second_wf:
type: direct
tasks:
task1:
action: std.echo output='1'
on-complete:
- task2
- succeed
task2:
action: std.http url='some.not.existing.url'
"""
class OrderEngineInstructionsTest(base.EngineTestCase):
def setUp(self):
super(OrderEngineInstructionsTest, self).setUp()
wb_service.create_workbook_v2({
'name': 'my_wb',
'definition': WORKBOOK3,
'tags': ['test']
})
def test_fail_first(self):
exec_db = self.engine.start_workflow('my_wb.fail_first_wf', None)
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self._assert_single_item(
exec_db.tasks,
name='task1',
state=states.SUCCESS
)
def test_fail_second(self):
exec_db = self.engine.start_workflow('my_wb.fail_second_wf', None)
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual(2, len(exec_db.tasks))
self._assert_single_item(
exec_db.tasks,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(exec_db.tasks, name='task2')
self._await(lambda: self.is_task_success(task2_db.id))
self._await(lambda: self.is_execution_error(exec_db.id))
def test_succeed_first(self):
exec_db = self.engine.start_workflow('my_wb.succeed_first_wf', None)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self._assert_single_item(
exec_db.tasks,
name='task1',
state=states.SUCCESS
)
def test_succeed_second(self):
exec_db = self.engine.start_workflow('my_wb.succeed_second_wf', None)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual(2, len(exec_db.tasks))
self._assert_single_item(
exec_db.tasks,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(exec_db.tasks, name='task2')
self._await(lambda: self.is_task_error(task2_db.id))
self._await(lambda: self.is_execution_success(exec_db.id))

View File

@ -72,7 +72,7 @@ def set_thread_local(var_name, val):
gl_storage[var_name] = val
def log_exec(logger, level=logging.INFO):
def log_exec(logger, level=logging.DEBUG):
"""Decorator for logging function execution.
By default, target function execution is logged with INFO level.
"""

View File

@ -72,8 +72,8 @@ class WorkflowHandler(object):
data_flow.evaluate_task_output(task_spec, raw_result)
if task_db.state == states.ERROR and not task_spec.get_on_error():
# TODO(rakhmerov): Temporary hack, need to use policies.
self._set_execution_state(states.ERROR)
if not self.is_paused_or_finished():
self._set_execution_state(states.ERROR)
return []
@ -83,7 +83,8 @@ class WorkflowHandler(object):
# If there are no running tasks at this point we can conclude that
# the workflow has finished.
if not self._find_running_tasks():
self._set_execution_state(states.SUCCESS)
if not self.is_paused_or_finished():
self._set_execution_state(states.SUCCESS)
task_out_ctx = data_flow.evaluate_outbound_context(task_db)