From 43194f554e4951ca400d1bea3cf61adfa19292ca Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 22 Apr 2015 14:38:16 -0700 Subject: [PATCH] Test more engine types in argument passing unit test Change-Id: I8cdca39b488fecef495a501640a1b2c7efaf6b79 --- taskflow/tests/unit/test_arguments_passing.py | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index c84d8534..a1ba2ac8 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -14,10 +14,14 @@ # License for the specific language governing permissions and limitations # under the License. +import testtools + import taskflow.engines from taskflow import exceptions as exc from taskflow import test from taskflow.tests import utils +from taskflow.types import futures +from taskflow.utils import eventlet_utils as eu class ArgumentsPassingTest(utils.EngineTestBase): @@ -160,8 +164,8 @@ class ArgumentsPassingTest(utils.EngineTestBase): }) -class SingleThreadedEngineTest(ArgumentsPassingTest, - test.TestCase): +class SerialEngineTest(ArgumentsPassingTest, test.TestCase): + def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -169,10 +173,43 @@ class SingleThreadedEngineTest(ArgumentsPassingTest, backend=self.backend) -class MultiThreadedEngineTest(ArgumentsPassingTest, - test.TestCase): +class ParallelEngineWithThreadsTest(ArgumentsPassingTest, test.TestCase): + _EXECUTOR_WORKERS = 2 + def _make_engine(self, flow, flow_detail=None, executor=None): - return taskflow.engines.load(flow, flow_detail=flow_detail, + if executor is None: + executor = 'threads' + return taskflow.engines.load(flow, + flow_detail=flow_detail, engine='parallel', backend=self.backend, + executor=executor, + max_workers=self._EXECUTOR_WORKERS) + + +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = futures.GreenThreadPoolExecutor() + self.addCleanup(executor.shutdown) + return taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=self.backend, + engine='parallel', executor=executor) + + +class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase): + _EXECUTOR_WORKERS = 2 + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'processes' + return taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=self.backend, + engine='parallel', + executor=executor, + max_workers=self._EXECUTOR_WORKERS)