Files
deb-python-taskflow/taskflow/tests/unit/test_suspend_flow.py
Ivan A. Melnikov 958308b081 Use test utils in test_suspend_flow
Use tasks from taskflow.tests.utils in test_suspend_flow instead
of having an outdated copy of them.

Change-Id: I53fad3ffd132830f806836af663ebe3804d47131
2014-02-05 15:47:15 +04:00

196 lines
7.1 KiB
Python

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow as lf
from taskflow import states
from taskflow import test
from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu
class AutoSuspendingTask(utils.SaveOrderTask):
    """A ``utils.SaveOrderTask`` that suspends the engine after executing."""

    def execute(self, engine):
        """Delegate to the parent ``execute`` and then suspend the engine.

        :param engine: object exposing ``suspend()``; the tests in this
                       module inject the running engine into storage
                       under the name ``engine``.
        :returns: the value returned by the parent ``execute``.
        """
        outcome = super(AutoSuspendingTask, self).execute()
        # Suspension is requested only after the parent work has finished.
        engine.suspend()
        return outcome
class AutoSuspendingTaskOnRevert(utils.SaveOrderTask):
    """A ``utils.SaveOrderTask`` that suspends the engine when reverted."""

    def execute(self, engine):
        """Delegate straight to the parent ``execute``.

        ``engine`` is accepted but not used here.
        NOTE(review): presumably it is listed so that the task declares
        ``engine`` as a requirement for ``revert`` -- confirm.

        :returns: the value returned by the parent ``execute``.
        """
        parent = super(AutoSuspendingTaskOnRevert, self)
        return parent.execute()

    def revert(self, engine, result, flow_failures):
        """Run the parent ``revert`` and then suspend the engine.

        :param engine: object exposing ``suspend()``.
        :param result: forwarded unchanged to the parent ``revert``.
        :param flow_failures: forwarded unchanged to the parent ``revert``.
        """
        parent = super(AutoSuspendingTaskOnRevert, self)
        parent.revert(result=result, flow_failures=flow_failures)
        # Suspend only once the parent revert has completed.
        engine.suspend()
class SuspendFlowTest(utils.EngineTestBase):
    """Suspend/resume scenarios shared by every engine flavour.

    Subclasses in this module provide ``_make_engine``. ``self.values`` is
    not defined here (presumably supplied by ``utils.EngineTestBase`` --
    confirm).
    """

    def test_suspend_one_task(self):
        """A single auto-suspending task finishes in SUCCESS, even re-run."""
        engine = self._make_engine(AutoSuspendingTask('a'))
        engine.storage.inject({'engine': engine})
        # Two identical passes: the second run must not change anything.
        for _ in range(2):
            engine.run()
            self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
            self.assertEqual(self.values, ['a'])

    def test_suspend_linear_flow(self):
        """Suspending mid-flow stops after 'b'; a second run finishes 'c'."""
        linear = lf.Flow('linear')
        flow = linear.add(
            utils.SaveOrderTask('a'),
            AutoSuspendingTask('b'),
            utils.SaveOrderTask('c'),
        )
        engine = self._make_engine(flow)
        engine.storage.inject({'engine': engine})

        engine.run()
        state_after_first_run = engine.storage.get_flow_state()
        self.assertEqual(state_after_first_run, states.SUSPENDED)
        self.assertEqual(self.values, ['a', 'b'])

        engine.run()
        state_after_second_run = engine.storage.get_flow_state()
        self.assertEqual(state_after_second_run, states.SUCCESS)
        self.assertEqual(self.values, ['a', 'b', 'c'])

    def test_suspend_linear_flow_on_revert(self):
        """Suspending during revert pauses it; re-running completes it."""
        linear = lf.Flow('linear')
        flow = linear.add(
            utils.SaveOrderTask('a'),
            AutoSuspendingTaskOnRevert('b'),
            utils.FailingTask('c'),
        )
        engine = self._make_engine(flow)
        engine.storage.inject({'engine': engine})

        values_when_suspended = [
            'a',
            'b',
            'c reverted(Failure: RuntimeError: Woot!)',
            'b reverted(5)',
        ]
        values_when_reverted = values_when_suspended + ['a reverted(5)']

        engine.run()
        self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
        self.assertEqual(self.values, values_when_suspended)

        # The second run finishes the revert and surfaces the failure.
        self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
        self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
        self.assertEqual(self.values, values_when_reverted)

    def test_suspend_and_resume_linear_flow_on_revert(self):
        """A second engine built on the same flow detail ends the revert."""
        linear = lf.Flow('linear')
        flow = linear.add(
            utils.SaveOrderTask('a'),
            AutoSuspendingTaskOnRevert('b'),
            utils.FailingTask('c'),
        )
        engine = self._make_engine(flow)
        engine.storage.inject({'engine': engine})
        engine.run()

        # pretend we are resuming
        saved_detail = engine.storage._flowdetail
        engine2 = self._make_engine(flow, saved_detail)
        self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
        self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)

        values_when_reverted = [
            'a',
            'b',
            'c reverted(Failure: RuntimeError: Woot!)',
            'b reverted(5)',
            'a reverted(5)',
        ]
        self.assertEqual(self.values, values_when_reverted)

    def test_suspend_and_revert_even_if_task_is_gone(self):
        """Resuming with a flow that no longer has 'c' still reverts."""
        linear = lf.Flow('linear')
        flow = linear.add(
            utils.SaveOrderTask('a'),
            AutoSuspendingTaskOnRevert('b'),
            utils.FailingTask('c'),
        )
        engine = self._make_engine(flow)
        engine.storage.inject({'engine': engine})
        engine.run()

        # pretend we are resuming, but task 'c' gone when flow got updated
        shorter = lf.Flow('linear')
        flow2 = shorter.add(
            utils.SaveOrderTask('a'),
            AutoSuspendingTaskOnRevert('b'),
        )
        saved_detail = engine.storage._flowdetail
        engine2 = self._make_engine(flow2, saved_detail)
        self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
        self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)

        values_when_reverted = [
            'a',
            'b',
            'c reverted(Failure: RuntimeError: Woot!)',
            'b reverted(5)',
            'a reverted(5)',
        ]
        self.assertEqual(self.values, values_when_reverted)

    def test_storage_is_rechecked(self):
        """Running again after the injected data is wiped must fail."""
        linear = lf.Flow('linear')
        flow = linear.add(
            AutoSuspendingTask('b'),
            utils.SaveOrderTask(name='c'),
        )
        engine = self._make_engine(flow)
        engine.storage.inject({'engine': engine, 'boo': True})
        engine.run()
        self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)

        # uninject engine
        injector = engine.storage.injector_name
        engine.storage.save(injector, None, states.FAILURE)
        self.assertRaises(exc.MissingDependencies, engine.run)
class SingleThreadedEngineTest(SuspendFlowTest, test.TestCase):
    """Runs the shared suspend scenarios with ``engine_conf='serial'``."""

    def _make_engine(self, flow, flow_detail=None):
        """Load an engine for ``flow`` using the 'serial' configuration.

        :param flow: flow (or single task) to load.
        :param flow_detail: optional flow detail to load from; the tests
                            pass a previous engine's detail to resume.
        :returns: the result of ``taskflow.engines.load``.
        """
        load_kwargs = {
            'flow_detail': flow_detail,
            'engine_conf': 'serial',
            'backend': self.backend,
        }
        return taskflow.engines.load(flow, **load_kwargs)
class MultiThreadedEngineTest(SuspendFlowTest, test.TestCase):
    """Runs the shared suspend scenarios with the 'parallel' engine."""

    def _make_engine(self, flow, flow_detail=None, executor=None):
        """Load a 'parallel' engine for ``flow``.

        :param flow: flow (or single task) to load.
        :param flow_detail: optional flow detail to load from.
        :param executor: forwarded as-is in the engine configuration;
                         ``None`` is passed through unchanged.
        :returns: the result of ``taskflow.engines.load``.
        """
        engine_conf = {'engine': 'parallel', 'executor': executor}
        return taskflow.engines.load(
            flow,
            flow_detail=flow_detail,
            engine_conf=engine_conf,
            backend=self.backend,
        )
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(SuspendFlowTest, test.TestCase):
    """Runs the shared suspend scenarios on a 'parallel' engine that
    defaults to ``eu.GreenExecutor``; skipped when eventlet is unavailable.
    """

    def _make_engine(self, flow, flow_detail=None, executor=None):
        """Load a 'parallel' engine, defaulting to a green executor.

        :param flow: flow (or single task) to load.
        :param flow_detail: optional flow detail to load from.
        :param executor: executor to use; when ``None`` a fresh
                         ``eu.GreenExecutor()`` is created.
        :returns: the result of ``taskflow.engines.load``.
        """
        chosen = eu.GreenExecutor() if executor is None else executor
        engine_conf = {'engine': 'parallel', 'executor': chosen}
        return taskflow.engines.load(
            flow,
            flow_detail=flow_detail,
            engine_conf=engine_conf,
            backend=self.backend,
        )