diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index 97178a83..17ae6322 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import logging import os import sys @@ -34,6 +33,7 @@ import taskflow.engines from taskflow import exceptions from taskflow.patterns import unordered_flow as uf from taskflow import task +from taskflow.tests import utils from taskflow.utils import misc # INTRO: In this example we create two tasks which can trigger exceptions @@ -60,20 +60,6 @@ def print_wrapped(text): print("-" * (len(text))) -@contextlib.contextmanager -def wrap_all_failures(): - """Convert any exceptions to WrappedFailure. - - When you expect several failures, it may be convenient - to wrap any exception with WrappedFailure in order to - unify error handling. - """ - try: - yield - except Exception: - raise exceptions.WrappedFailure([misc.Failure()]) - - class FirstException(Exception): """Exception that first task raises.""" @@ -110,7 +96,7 @@ def run(**store): SecondTask() ) try: - with wrap_all_failures(): + with utils.wrap_all_failures(): taskflow.engines.run(flow, store=store, engine_conf='parallel') except exceptions.WrappedFailure as ex: diff --git a/taskflow/test.py b/taskflow/test.py index fb27fd0a..fe69ffcd 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -20,6 +20,9 @@ from testtools import compat from testtools import matchers from testtools import testcase +from taskflow import exceptions +from taskflow.tests import utils + class GreaterThanEqual(object): """Matches if the item is geq than the matchers reference object.""" @@ -33,6 +36,24 @@ class GreaterThanEqual(object): return matchers.Mismatch("%s was not >= %s" % (other, self.source)) +class FailureRegexpMatcher(object): + """Matches if the failure was caused by the given exception and its string + matches to the given pattern. + """ + + def __init__(self, exc_class, pattern): + self.exc_class = exc_class + self.pattern = pattern + + def match(self, failure): + for cause in failure: + if cause.check(self.exc_class) is not None: + return matchers.MatchesRegex( + self.pattern).match(cause.exception_str) + return matchers.Mismatch("The `%s` wasn't caused by the `%s`" % + (failure, self.exc_class)) + + class TestCase(testcase.TestCase): """Test case base class for all taskflow unit tests.""" @@ -89,6 +110,17 @@ class TestCase(testcase.TestCase): else: current_tail = current_tail[super_index + 1:] + def assertFailuresRegexp(self, exc_class, pattern, callable_obj, *args, + **kwargs): + """Assert that the callable failed with the given exception and its + string matches to the given pattern. + """ + try: + with utils.wrap_all_failures(): + callable_obj(*args, **kwargs) + except exceptions.WrappedFailure as e: + self.assertThat(e, FailureRegexpMatcher(exc_class, pattern)) + class MockTestCase(TestCase): diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 901384f8..fc937098 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -17,6 +17,7 @@ import contextlib import networkx import testtools +import threading from concurrent import futures @@ -27,6 +28,8 @@ from taskflow.patterns import unordered_flow as uf import taskflow.engines from taskflow.engines.action_engine import engine as eng +from taskflow.engines.worker_based import engine as w_eng +from taskflow.engines.worker_based import worker as wkr from taskflow import exceptions as exc from taskflow.persistence import logbook from taskflow import states @@ -87,11 +90,11 @@ class EngineTaskTest(utils.EngineTestBase): 'fail reverted(Failure: RuntimeError: Woot!)', 'fail REVERTED', 'flow REVERTED'] - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(self.values, expected) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected self.assertEqual(self.values, now_expected) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) @@ -119,7 +122,7 @@ class EngineTaskTest(utils.EngineTestBase): def test_nasty_failing_task_exception_reraised(self): flow = utils.NastyFailingTask() engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) class EngineLinearFlowTest(utils.EngineTestBase): @@ -152,7 +155,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): utils.FailingTask(name='fail') ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.fetch_all(), {}) def test_sequential_flow_nested_blocks(self): @@ -171,7 +174,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): utils.FailingTask(name='fail') ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) def test_revert_not_run_task_is_not_reverted(self): flow = lf.Flow('revert-not-run').add( @@ -179,7 +182,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): utils.NeverRunningTask(), ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual( self.values, ['fail reverted(Failure: RuntimeError: Woot!)']) @@ -193,32 +196,13 @@ class EngineLinearFlowTest(utils.EngineTestBase): ) ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual( self.values, ['task1', 'task2', 'fail reverted(Failure: RuntimeError: Woot!)', 'task2 reverted(5)', 'task1 reverted(5)']) - def test_flow_failures_are_passed_to_revert(self): - class CheckingTask(task.Task): - def execute(m_self): - return 'RESULT' - - def revert(m_self, result, flow_failures): - self.assertEqual(result, 'RESULT') - self.assertEqual(list(flow_failures.keys()), ['fail1']) - fail = flow_failures['fail1'] - self.assertIsInstance(fail, misc.Failure) - self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') - - flow = lf.Flow('test').add( - CheckingTask(), - utils.FailingTask('fail1') - ) - engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - class EngineParallelFlowTest(utils.EngineTestBase): @@ -252,7 +236,7 @@ class EngineParallelFlowTest(utils.EngineTestBase): utils.TaskNoRequiresNoReturns(name='task2') ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertIn('fail reverted(Failure: RuntimeError: Woot!)', self.values) @@ -269,7 +253,7 @@ class EngineParallelFlowTest(utils.EngineTestBase): utils.FailingTask() ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) def test_sequential_flow_two_tasks_with_resumption(self): flow = lf.Flow('lf-2-r').add( @@ -307,7 +291,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): ) ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, # it should have been reverted in correct order. @@ -336,7 +320,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): ) ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, # it should have been reverted in correct order. @@ -356,7 +340,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): ) ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertIn('fail reverted(Failure: RuntimeError: Woot!)', self.values) @@ -377,7 +361,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): ) ) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) self.assertNotIn('task2 reverted(5)', self.values) @@ -443,7 +427,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): utils.SaveOrderTask(name='task1', provides='a')) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual( self.values, ['task1', 'task2', @@ -458,7 +442,7 @@ class EngineGraphFlowTest(utils.EngineTestBase): utils.SaveOrderTask(name='task1', provides='a')) engine = self._make_engine(flow) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) self.assertEqual(engine.storage.get_flow_state(), states.FAILURE) def test_graph_flow_with_multireturn_and_multiargs_tasks(self): @@ -498,11 +482,34 @@ class EngineGraphFlowTest(utils.EngineTestBase): self.assertIsInstance(graph, networkx.DiGraph) +class EngineCheckingTaskTest(utils.EngineTestBase): + + def test_flow_failures_are_passed_to_revert(self): + class CheckingTask(task.Task): + def execute(m_self): + return 'RESULT' + + def revert(m_self, result, flow_failures): + self.assertEqual(result, 'RESULT') + self.assertEqual(list(flow_failures.keys()), ['fail1']) + fail = flow_failures['fail1'] + self.assertIsInstance(fail, misc.Failure) + self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') + + flow = lf.Flow('test').add( + CheckingTask(), + utils.FailingTask('fail1') + ) + engine = self._make_engine(flow) + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + + class SingleThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, EngineGraphFlowTest, + EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, @@ -524,6 +531,7 @@ class MultiThreadedEngineTest(EngineTaskTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, EngineGraphFlowTest, + EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): engine_conf = dict(engine='parallel', @@ -554,6 +562,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, EngineGraphFlowTest, + EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): @@ -564,3 +573,63 @@ class ParallelEngineWithEventletTest(EngineTaskTest, return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, backend=self.backend) + + +class WorkerBasedEngineTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineLinearAndUnorderedExceptionsTest, + EngineGraphFlowTest, + test.TestCase): + + def setUp(self): + super(WorkerBasedEngineTest, self).setUp() + self.exchange = 'test' + self.topic = 'topic' + self.transport = 'memory' + worker_conf = { + 'exchange': self.exchange, + 'topic': self.topic, + 'tasks': [ + 'taskflow.tests.utils' + ], + 'transport': self.transport + } + self.worker = wkr.Worker(**worker_conf) + self.worker_thread = threading.Thread(target=self.worker.run) + self.worker_thread.daemon = True + self.worker_thread.start() + # make sure worker is started before we can continue + self.worker.wait() + + def tearDown(self): + self.worker.stop() + self.worker_thread.join() + super(WorkerBasedEngineTest, self).tearDown() + + def _make_engine(self, flow, flow_detail=None): + engine_conf = { + 'engine': 'worker-based', + 'exchange': self.exchange, + 'workers_info': { + self.topic: [ + 'taskflow.tests.utils.SaveOrderTask', + 'taskflow.tests.utils.FailingTask', + 'taskflow.tests.utils.TaskOneReturn', + 'taskflow.tests.utils.TaskMultiReturn', + 'taskflow.tests.utils.TaskMultiArgOneReturn', + 'taskflow.tests.utils.NastyTask', + 'taskflow.tests.utils.NastyFailingTask', + 'taskflow.tests.utils.NeverRunningTask', + 'taskflow.tests.utils.TaskNoRequiresNoReturns' + ] + }, + 'transport': self.transport + } + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) + + def test_correct_load(self): + engine = self._make_engine(utils.TaskNoRequiresNoReturns) + self.assertIsInstance(engine, w_eng.WorkerBasedActionEngine) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 08cba065..0ce356aa 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -18,14 +18,30 @@ import contextlib import six +from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import task +from taskflow.utils import misc ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' +@contextlib.contextmanager +def wrap_all_failures(): + """Convert any exceptions to WrappedFailure. + + When you expect several failures, it may be convenient + to wrap any exception with WrappedFailure in order to + unify error handling. + """ + try: + yield + except Exception: + raise exceptions.WrappedFailure([misc.Failure()]) + + def make_reverting_task(token, blowup=False): def do_revert(context, *args, **kwargs):