diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 00660c6f..ac88a1b9 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -18,6 +18,7 @@ import contextlib import networkx +import testtools from concurrent import futures @@ -34,6 +35,8 @@ from taskflow import states from taskflow import task from taskflow import test from taskflow.tests import utils + +from taskflow.utils import eventlet_utils as eu from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -606,3 +609,20 @@ class MultiThreadedEngineTest(EngineTaskTest, self.assertIs(e1.executor, e2.executor) finally: executor.shutdown(wait=True) + + +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineGraphFlowTest, + test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = eu.GreenExecutor() + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index 1d8226f7..72d53ca5 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -16,9 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. +import testtools import time from taskflow.patterns import linear_flow as lf +from taskflow.utils import eventlet_utils as eu import taskflow.engines @@ -222,3 +224,17 @@ class MultiThreadedEngineTest(SuspendFlowTest, return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, backend=self.backend) + + +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(SuspendFlowTest, + test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = eu.GreenExecutor() + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend)