Run action-engine tests with worker-based engine
Change-Id: I00398fd2387b261c577f5d3eba9e2aebae3ba165
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -36,6 +35,7 @@ import taskflow.engines
|
||||
from taskflow import exceptions
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow import task
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
# INTRO: In this example we create two tasks which can trigger exceptions
|
||||
@@ -62,20 +62,6 @@ def print_wrapped(text):
|
||||
print("-" * (len(text)))
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wrap_all_failures():
|
||||
"""Convert any exceptions to WrappedFailure.
|
||||
|
||||
When you expect several failures, it may be convenient
|
||||
to wrap any exception with WrappedFailure in order to
|
||||
unify error handling.
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
raise exceptions.WrappedFailure([misc.Failure()])
|
||||
|
||||
|
||||
class FirstException(Exception):
|
||||
"""Exception that first task raises."""
|
||||
|
||||
@@ -112,7 +98,7 @@ def run(**store):
|
||||
SecondTask()
|
||||
)
|
||||
try:
|
||||
with wrap_all_failures():
|
||||
with utils.wrap_all_failures():
|
||||
taskflow.engines.run(flow, store=store,
|
||||
engine_conf='parallel')
|
||||
except exceptions.WrappedFailure as ex:
|
||||
|
||||
@@ -22,6 +22,9 @@ from testtools import compat
|
||||
from testtools import matchers
|
||||
from testtools import testcase
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.tests import utils
|
||||
|
||||
|
||||
class GreaterThanEqual(object):
|
||||
"""Matches if the item is geq than the matchers reference object."""
|
||||
@@ -35,6 +38,24 @@ class GreaterThanEqual(object):
|
||||
return matchers.Mismatch("%s was not >= %s" % (other, self.source))
|
||||
|
||||
|
||||
class FailureRegexpMatcher(object):
|
||||
"""Matches if the failure was caused by the given exception and its string
|
||||
matches to the given pattern.
|
||||
"""
|
||||
|
||||
def __init__(self, exc_class, pattern):
|
||||
self.exc_class = exc_class
|
||||
self.pattern = pattern
|
||||
|
||||
def match(self, failure):
|
||||
for cause in failure:
|
||||
if cause.check(self.exc_class) is not None:
|
||||
return matchers.MatchesRegex(
|
||||
self.pattern).match(cause.exception_str)
|
||||
return matchers.Mismatch("The `%s` wasn't caused by the `%s`" %
|
||||
(failure, self.exc_class))
|
||||
|
||||
|
||||
class TestCase(testcase.TestCase):
|
||||
"""Test case base class for all taskflow unit tests."""
|
||||
|
||||
@@ -91,6 +112,17 @@ class TestCase(testcase.TestCase):
|
||||
else:
|
||||
current_tail = current_tail[super_index + 1:]
|
||||
|
||||
def assertFailuresRegexp(self, exc_class, pattern, callable_obj, *args,
|
||||
**kwargs):
|
||||
"""Assert that the callable failed with the given exception and its
|
||||
string matches to the given pattern.
|
||||
"""
|
||||
try:
|
||||
with utils.wrap_all_failures():
|
||||
callable_obj(*args, **kwargs)
|
||||
except exceptions.WrappedFailure as e:
|
||||
self.assertThat(e, FailureRegexpMatcher(exc_class, pattern))
|
||||
|
||||
|
||||
class MockTestCase(TestCase):
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
import contextlib
|
||||
import networkx
|
||||
import testtools
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
|
||||
@@ -29,6 +30,8 @@ from taskflow.patterns import unordered_flow as uf
|
||||
import taskflow.engines
|
||||
|
||||
from taskflow.engines.action_engine import engine as eng
|
||||
from taskflow.engines.worker_based import engine as w_eng
|
||||
from taskflow.engines.worker_based import worker as wkr
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
@@ -89,11 +92,11 @@ class EngineTaskTest(utils.EngineTestBase):
|
||||
'fail reverted(Failure: RuntimeError: Woot!)',
|
||||
'fail REVERTED',
|
||||
'flow REVERTED']
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertEqual(self.values, expected)
|
||||
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
|
||||
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected
|
||||
self.assertEqual(self.values, now_expected)
|
||||
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
|
||||
@@ -121,7 +124,7 @@ class EngineTaskTest(utils.EngineTestBase):
|
||||
def test_nasty_failing_task_exception_reraised(self):
|
||||
flow = utils.NastyFailingTask()
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
|
||||
|
||||
class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
@@ -154,7 +157,7 @@ class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
utils.FailingTask(name='fail')
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertEqual(engine.storage.fetch_all(), {})
|
||||
|
||||
def test_sequential_flow_nested_blocks(self):
|
||||
@@ -173,7 +176,7 @@ class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
utils.FailingTask(name='fail')
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
|
||||
def test_revert_not_run_task_is_not_reverted(self):
|
||||
flow = lf.Flow('revert-not-run').add(
|
||||
@@ -181,7 +184,7 @@ class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
utils.NeverRunningTask(),
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertEqual(
|
||||
self.values,
|
||||
['fail reverted(Failure: RuntimeError: Woot!)'])
|
||||
@@ -195,32 +198,13 @@ class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
)
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertEqual(
|
||||
self.values,
|
||||
['task1', 'task2',
|
||||
'fail reverted(Failure: RuntimeError: Woot!)',
|
||||
'task2 reverted(5)', 'task1 reverted(5)'])
|
||||
|
||||
def test_flow_failures_are_passed_to_revert(self):
|
||||
class CheckingTask(task.Task):
|
||||
def execute(m_self):
|
||||
return 'RESULT'
|
||||
|
||||
def revert(m_self, result, flow_failures):
|
||||
self.assertEqual(result, 'RESULT')
|
||||
self.assertEqual(list(flow_failures.keys()), ['fail1'])
|
||||
fail = flow_failures['fail1']
|
||||
self.assertIsInstance(fail, misc.Failure)
|
||||
self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')
|
||||
|
||||
flow = lf.Flow('test').add(
|
||||
CheckingTask(),
|
||||
utils.FailingTask('fail1')
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
|
||||
|
||||
class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
|
||||
@@ -254,7 +238,7 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
utils.TaskNoRequiresNoReturns(name='task2')
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertIn('fail reverted(Failure: RuntimeError: Woot!)',
|
||||
self.values)
|
||||
|
||||
@@ -271,7 +255,7 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
utils.FailingTask()
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
|
||||
def test_sequential_flow_two_tasks_with_resumption(self):
|
||||
flow = lf.Flow('lf-2-r').add(
|
||||
@@ -309,7 +293,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
|
||||
)
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
|
||||
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
|
||||
# it should have been reverted in correct order.
|
||||
@@ -338,7 +322,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
|
||||
)
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
|
||||
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
|
||||
# it should have been reverted in correct order.
|
||||
@@ -358,7 +342,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
|
||||
)
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertIn('fail reverted(Failure: RuntimeError: Woot!)',
|
||||
self.values)
|
||||
|
||||
@@ -379,7 +363,7 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
|
||||
)
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertNotIn('task2 reverted(5)', self.values)
|
||||
|
||||
|
||||
@@ -445,7 +429,7 @@ class EngineGraphFlowTest(utils.EngineTestBase):
|
||||
utils.SaveOrderTask(name='task1', provides='a'))
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
|
||||
self.assertEqual(
|
||||
self.values,
|
||||
['task1', 'task2',
|
||||
@@ -460,7 +444,7 @@ class EngineGraphFlowTest(utils.EngineTestBase):
|
||||
utils.SaveOrderTask(name='task1', provides='a'))
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
|
||||
self.assertEqual(engine.storage.get_flow_state(), states.FAILURE)
|
||||
|
||||
def test_graph_flow_with_multireturn_and_multiargs_tasks(self):
|
||||
@@ -500,11 +484,34 @@ class EngineGraphFlowTest(utils.EngineTestBase):
|
||||
self.assertIsInstance(graph, networkx.DiGraph)
|
||||
|
||||
|
||||
class EngineCheckingTaskTest(utils.EngineTestBase):
|
||||
|
||||
def test_flow_failures_are_passed_to_revert(self):
|
||||
class CheckingTask(task.Task):
|
||||
def execute(m_self):
|
||||
return 'RESULT'
|
||||
|
||||
def revert(m_self, result, flow_failures):
|
||||
self.assertEqual(result, 'RESULT')
|
||||
self.assertEqual(list(flow_failures.keys()), ['fail1'])
|
||||
fail = flow_failures['fail1']
|
||||
self.assertIsInstance(fail, misc.Failure)
|
||||
self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')
|
||||
|
||||
flow = lf.Flow('test').add(
|
||||
CheckingTask(),
|
||||
utils.FailingTask('fail1')
|
||||
)
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
|
||||
|
||||
|
||||
class SingleThreadedEngineTest(EngineTaskTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
return taskflow.engines.load(flow,
|
||||
@@ -526,6 +533,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel',
|
||||
@@ -556,6 +564,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
@@ -566,3 +575,63 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
|
||||
|
||||
class WorkerBasedEngineTest(EngineTaskTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineGraphFlowTest,
|
||||
test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WorkerBasedEngineTest, self).setUp()
|
||||
self.exchange = 'test'
|
||||
self.topic = 'topic'
|
||||
self.transport = 'memory'
|
||||
worker_conf = {
|
||||
'exchange': self.exchange,
|
||||
'topic': self.topic,
|
||||
'tasks': [
|
||||
'taskflow.tests.utils'
|
||||
],
|
||||
'transport': self.transport
|
||||
}
|
||||
self.worker = wkr.Worker(**worker_conf)
|
||||
self.worker_thread = threading.Thread(target=self.worker.run)
|
||||
self.worker_thread.daemon = True
|
||||
self.worker_thread.start()
|
||||
# make sure worker is started before we can continue
|
||||
self.worker.wait()
|
||||
|
||||
def tearDown(self):
|
||||
self.worker.stop()
|
||||
self.worker_thread.join()
|
||||
super(WorkerBasedEngineTest, self).tearDown()
|
||||
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
engine_conf = {
|
||||
'engine': 'worker-based',
|
||||
'exchange': self.exchange,
|
||||
'workers_info': {
|
||||
self.topic: [
|
||||
'taskflow.tests.utils.SaveOrderTask',
|
||||
'taskflow.tests.utils.FailingTask',
|
||||
'taskflow.tests.utils.TaskOneReturn',
|
||||
'taskflow.tests.utils.TaskMultiReturn',
|
||||
'taskflow.tests.utils.TaskMultiArgOneReturn',
|
||||
'taskflow.tests.utils.NastyTask',
|
||||
'taskflow.tests.utils.NastyFailingTask',
|
||||
'taskflow.tests.utils.NeverRunningTask',
|
||||
'taskflow.tests.utils.TaskNoRequiresNoReturns'
|
||||
]
|
||||
},
|
||||
'transport': self.transport
|
||||
}
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
|
||||
def test_correct_load(self):
|
||||
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
|
||||
self.assertIsInstance(engine, w_eng.WorkerBasedActionEngine)
|
||||
|
||||
@@ -20,14 +20,30 @@ import contextlib
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.persistence.backends import impl_memory
|
||||
from taskflow import task
|
||||
from taskflow.utils import misc
|
||||
|
||||
ARGS_KEY = '__args__'
|
||||
KWARGS_KEY = '__kwargs__'
|
||||
ORDER_KEY = '__order__'
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wrap_all_failures():
|
||||
"""Convert any exceptions to WrappedFailure.
|
||||
|
||||
When you expect several failures, it may be convenient
|
||||
to wrap any exception with WrappedFailure in order to
|
||||
unify error handling.
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
raise exceptions.WrappedFailure([misc.Failure()])
|
||||
|
||||
|
||||
def make_reverting_task(token, blowup=False):
|
||||
|
||||
def do_revert(context, *args, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user