Merge "Cleanup obvious issues in 'with-items' tests"

This commit is contained in:
Jenkins 2016-12-07 11:26:25 +00:00 committed by Gerrit Code Review
commit 6a7c195cf7
2 changed files with 112 additions and 92 deletions

View File

@ -425,7 +425,7 @@ class WithItemsTask(RegularTask):
assert self.task_ex
# TODO(rakhmerov): Here we can define more informative messages
# cases when action is successful and when it's not. For example,
# in cases when action is successful and when it's not. For example,
# in state_info we can specify the cause action.
# The use of action_ex.output.get('result') for state_info is not
# accurate because there could be action executions that had

View File

@ -167,20 +167,21 @@ class RandomSleepEchoAction(action_base.Action):
class WithItemsEngineTest(base.EngineTestCase):
def assert_capacity(self, capacity, task_ex):
def _assert_capacity(self, capacity, task_ex):
self.assertEqual(
capacity,
task_ex.runtime_context['with_items_context']['capacity']
)
@staticmethod
def get_incomplete_action_ex(task_ex):
return [ex for ex in task_ex.executions if not ex.accepted][0]
def _get_incomplete_action(task_ex):
return [e for e in task_ex.executions if not e.accepted][0]
@staticmethod
def get_running_action_exs_number(task_ex):
return len([ex for ex in task_ex.executions
if ex.state == states.RUNNING])
def _get_running_actions_count(task_ex):
return len(
[e for e in task_ex.executions if e.state == states.RUNNING]
)
def test_with_items_simple(self):
wb_service.create_workbook_v2(WB)
@ -627,50 +628,53 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
task_ex = db_api.get_task_execution(task_ex.id)
# Also initialize a lazy collections.
task_ex = wf_ex.task_executions[0]
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
self._assert_capacity(0, task_ex)
self.assertEqual(1, self._get_running_actions_count(task_ex))
# 1st iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("John")
)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
self._assert_capacity(0, task_ex)
self.assertEqual(1, self._get_running_actions_count(task_ex))
# 2nd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Ivan")
)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
self._assert_capacity(0, task_ex)
self.assertEqual(1, self._get_running_actions_count(task_ex))
# 3rd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Mistral")
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
self._assert_capacity(1, task_ex)
self.await_workflow_success(wf_ex.id)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
# the check does not depend on order of items.
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
@ -684,12 +688,13 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_ex.state)
@testtools.skip('Restore concurrency support.')
def test_with_items_concurrency_yaql(self):
wf_with_concurrency_yaql = """---
# TODO(rakhmerov): This test passes even with broken 'concurrency'.
# The idea of the test is not fully clear.
wf_text = """---
version: "2.0"
concurrency_test:
wf:
type: direct
input:
@ -703,41 +708,35 @@ class WithItemsEngineTest(base.EngineTestCase):
concurrency: <% $.concurrency %>
"""
wf_service.create_workflows(wf_with_concurrency_yaql)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow(
'concurrency_test',
{'concurrency': 2}
)
wf_ex = self.engine.start_workflow('wf', {'concurrency': 2})
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.SUCCESS, task_ex.state)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.SUCCESS, task_ex.state)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
# Since we know that we can receive results in random order,
# the check does not depend on order of items.
self.assertIn('John', result)
self.assertIn('Ivan', result)
self.assertIn('Mistral', result)
@testtools.skip('Restore concurrency support.')
def test_with_items_concurrency_yaql_wrong_type(self):
wf_with_concurrency_yaql = """---
version: "2.0"
concurrency_test:
wf:
type: direct
input:
@ -754,10 +753,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_with_concurrency_yaql)
# Start workflow.
wf_ex = self.engine.start_workflow(
'concurrency_test',
{'concurrency': '2'}
)
wf_ex = self.engine.start_workflow('wf', {'concurrency': '2'})
self.assertIn(
'Invalid data type in ConcurrencyPolicy',
@ -788,53 +784,64 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
task_ex = wf_ex.task_executions[0]
running_cnt = self._get_running_actions_count(task_ex)
self._assert_capacity(0, task_ex)
self.assertEqual(2, running_cnt)
# 1st iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("John")
)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
running_cnt = self._get_running_actions_count(task_ex)
self._assert_capacity(0, task_ex)
self.assertEqual(2, running_cnt)
# 2nd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Ivan")
)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
running_cnt = self._get_running_actions_count(task_ex)
self._assert_capacity(0, task_ex)
self.assertEqual(2, running_cnt)
# 3rd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Mistral")
)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
self._assert_capacity(1, task_ex)
# 4th iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Hello")
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(2, task_ex)
self._assert_capacity(2, task_ex)
self.await_workflow_success(wf_ex.id)
@ -880,9 +887,10 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
task_exs = wf_ex.task_executions
self.assertEqual(2, len(task_exs))
@ -919,41 +927,44 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(3, self.get_running_action_exs_number(task_ex))
task_ex = wf_ex.task_executions[0]
running_cnt = self._get_running_actions_count(task_ex)
self._assert_capacity(0, task_ex)
self.assertEqual(3, running_cnt)
# 1st iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("John")
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
self._assert_capacity(1, task_ex)
# 2nd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Ivan")
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(2, task_ex)
self._assert_capacity(2, task_ex)
# 3rd iteration complete.
self.engine.on_action_complete(
self.get_incomplete_action_ex(task_ex).id,
self._get_incomplete_action(task_ex).id,
wf_utils.Result("Mistral")
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(3, task_ex)
self._assert_capacity(3, task_ex)
self.await_workflow_success(wf_ex.id)
@ -972,8 +983,9 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn('Ivan', result)
self.assertIn('Mistral', result)
@testtools.skip('Restore concurrency support.')
def test_with_items_concurrency_gt_list_length(self):
# TODO(rakhmerov): This test passes even with disabled 'concurrency'
# support. Make sure it's valid.
wf_definition = """---
version: "2.0"
@ -997,16 +1009,16 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(
task_execs,
name='task1',
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task_ex)
@ -1085,17 +1097,22 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
task1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(12, len(task1_ex.executions))
self._assert_multiple_items(task1_ex.executions, 4, accepted=True)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
task1_execs = task1_ex.executions
self.assertEqual(12, len(task1_execs))
self._assert_multiple_items(task1_execs, 4, accepted=True)
def test_with_items_env(self):
wf_text = """---
@ -1240,10 +1257,13 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)