Merge "Use listener instead of AutoSuspendTask in test_suspend_flow"

This commit is contained in:
Jenkins
2014-02-06 03:11:07 +00:00
committed by Gerrit Code Review

View File

@@ -20,6 +20,7 @@ import testtools
import taskflow.engines import taskflow.engines
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow.listeners import base as lbase
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow import states from taskflow import states
from taskflow import test from taskflow import test
@@ -27,32 +28,26 @@ from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu from taskflow.utils import eventlet_utils as eu
class AutoSuspendingTask(utils.SaveOrderTask): class SuspendingListener(lbase.ListenerBase):
def execute(self, engine): def __init__(self, engine, task_name, task_state):
result = super(AutoSuspendingTask, self).execute() super(SuspendingListener, self).__init__(
engine.suspend() engine, task_listen_for=(task_state,))
return result self._task_name = task_name
def _task_receiver(self, state, details):
class AutoSuspendingTaskOnRevert(utils.SaveOrderTask): if details['task_name'] == self._task_name:
self._engine.suspend()
def execute(self, engine):
return super(AutoSuspendingTaskOnRevert, self).execute()
def revert(self, engine, result, flow_failures):
super(AutoSuspendingTaskOnRevert, self).revert(
result=result, flow_failures=flow_failures)
engine.suspend()
class SuspendFlowTest(utils.EngineTestBase): class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_one_task(self): def test_suspend_one_task(self):
flow = AutoSuspendingTask('a') flow = utils.SaveOrderTask('a')
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine}) with SuspendingListener(engine, task_name='b',
engine.run() task_state=states.SUCCESS):
engine.run()
self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
self.assertEqual(self.values, ['a']) self.assertEqual(self.values, ['a'])
engine.run() engine.run()
@@ -62,12 +57,13 @@ class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_linear_flow(self): def test_suspend_linear_flow(self):
flow = lf.Flow('linear').add( flow = lf.Flow('linear').add(
utils.SaveOrderTask('a'), utils.SaveOrderTask('a'),
AutoSuspendingTask('b'), utils.SaveOrderTask('b'),
utils.SaveOrderTask('c') utils.SaveOrderTask('c')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine}) with SuspendingListener(engine, task_name='b',
engine.run() task_state=states.SUCCESS):
engine.run()
self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEqual(self.values, ['a', 'b']) self.assertEqual(self.values, ['a', 'b'])
engine.run() engine.run()
@@ -77,12 +73,13 @@ class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_linear_flow_on_revert(self): def test_suspend_linear_flow_on_revert(self):
flow = lf.Flow('linear').add( flow = lf.Flow('linear').add(
utils.SaveOrderTask('a'), utils.SaveOrderTask('a'),
AutoSuspendingTaskOnRevert('b'), utils.SaveOrderTask('b'),
utils.FailingTask('c') utils.FailingTask('c')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine}) with SuspendingListener(engine, task_name='b',
engine.run() task_state=states.REVERTED):
engine.run()
self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
self.assertEqual( self.assertEqual(
self.values, self.values,
@@ -102,12 +99,14 @@ class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_and_resume_linear_flow_on_revert(self): def test_suspend_and_resume_linear_flow_on_revert(self):
flow = lf.Flow('linear').add( flow = lf.Flow('linear').add(
utils.SaveOrderTask('a'), utils.SaveOrderTask('a'),
AutoSuspendingTaskOnRevert('b'), utils.SaveOrderTask('b'),
utils.FailingTask('c') utils.FailingTask('c')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run() with SuspendingListener(engine, task_name='b',
task_state=states.REVERTED):
engine.run()
# pretend we are resuming # pretend we are resuming
engine2 = self._make_engine(flow, engine.storage._flowdetail) engine2 = self._make_engine(flow, engine.storage._flowdetail)
@@ -124,41 +123,45 @@ class SuspendFlowTest(utils.EngineTestBase):
def test_suspend_and_revert_even_if_task_is_gone(self): def test_suspend_and_revert_even_if_task_is_gone(self):
flow = lf.Flow('linear').add( flow = lf.Flow('linear').add(
utils.SaveOrderTask('a'), utils.SaveOrderTask('a'),
AutoSuspendingTaskOnRevert('b'), utils.SaveOrderTask('b'),
utils.FailingTask('c') utils.FailingTask('c')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine})
engine.run() with SuspendingListener(engine, task_name='b',
task_state=states.REVERTED):
engine.run()
expected_values = ['a', 'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)']
self.assertEqual(self.values, expected_values)
# pretend we are resuming, but task 'c' gone when flow got updated # pretend we are resuming, but task 'c' gone when flow got updated
flow2 = lf.Flow('linear').add( flow2 = lf.Flow('linear').add(
utils.SaveOrderTask('a'), utils.SaveOrderTask('a'),
AutoSuspendingTaskOnRevert('b') utils.SaveOrderTask('b')
) )
engine2 = self._make_engine(flow2, engine.storage._flowdetail) engine2 = self._make_engine(flow2, engine.storage._flowdetail)
self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
self.assertEqual( expected_values.append('a reverted(5)')
self.values, self.assertEqual(self.values, expected_values)
['a',
'b',
'c reverted(Failure: RuntimeError: Woot!)',
'b reverted(5)',
'a reverted(5)'])
def test_storage_is_rechecked(self): def test_storage_is_rechecked(self):
flow = lf.Flow('linear').add( flow = lf.Flow('linear').add(
AutoSuspendingTask('b'), utils.SaveOrderTask('b', requires=['foo']),
utils.SaveOrderTask(name='c') utils.SaveOrderTask('c')
) )
engine = self._make_engine(flow) engine = self._make_engine(flow)
engine.storage.inject({'engine': engine, 'boo': True}) engine.storage.inject({'foo': 'bar'})
engine.run() with SuspendingListener(engine, task_name='b',
task_state=states.SUCCESS):
engine.run()
self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
# uninject engine # uninject everything:
engine.storage.save(engine.storage.injector_name, engine.storage.save(engine.storage.injector_name,
None, states.FAILURE) {}, states.SUCCESS)
self.assertRaises(exc.MissingDependencies, engine.run) self.assertRaises(exc.MissingDependencies, engine.run)