Test more engine types in argument passing unit test

Change-Id: I8cdca39b488fecef495a501640a1b2c7efaf6b79
This commit is contained in:
Joshua Harlow
2015-04-22 14:38:16 -07:00
parent f734467ddc
commit 43194f554e

View File

@@ -14,10 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import taskflow.engines
from taskflow import exceptions as exc
from taskflow import test
from taskflow.tests import utils
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
class ArgumentsPassingTest(utils.EngineTestBase):
@@ -160,8 +164,8 @@ class ArgumentsPassingTest(utils.EngineTestBase):
})
class SingleThreadedEngineTest(ArgumentsPassingTest,
test.TestCase):
class SerialEngineTest(ArgumentsPassingTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
@@ -169,10 +173,43 @@ class SingleThreadedEngineTest(ArgumentsPassingTest,
backend=self.backend)
class MultiThreadedEngineTest(ArgumentsPassingTest,
test.TestCase):
class ParallelEngineWithThreadsTest(ArgumentsPassingTest, test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow, flow_detail=None, executor=None):
return taskflow.engines.load(flow, flow_detail=flow_detail,
if executor is None:
executor = 'threads'
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='parallel',
backend=self.backend,
executor=executor,
max_workers=self._EXECUTOR_WORKERS)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=self.backend,
engine='parallel',
executor=executor)
class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = 'processes'
return taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=self.backend,
engine='parallel',
executor=executor,
max_workers=self._EXECUTOR_WORKERS)