Run some engine tests with eventlet if it's available

This change adds test cases that run some of engine tests against
parallel engine with GreenExecutor if eventlet is available, in addition
to current tests for single-threaded engine and parallel engine with
native threading executor.

Change-Id: I7be6de9a80034b7363fb9e0680d63090a05d1c12
This commit is contained in:
Ivan A. Melnikov
2013-12-12 13:07:31 +04:00
parent b56afe66a2
commit 6a2aad5e1e
2 changed files with 36 additions and 0 deletions

View File

@@ -18,6 +18,7 @@
import contextlib
import networkx
import testtools
from concurrent import futures
@@ -34,6 +35,8 @@ from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@@ -606,3 +609,20 @@ class MultiThreadedEngineTest(EngineTaskTest,
self.assertIs(e1.executor, e2.executor)
finally:
executor.shutdown(wait=True)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineGraphFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = eu.GreenExecutor()
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)

View File

@@ -16,9 +16,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import time
from taskflow.patterns import linear_flow as lf
from taskflow.utils import eventlet_utils as eu
import taskflow.engines
@@ -222,3 +224,17 @@ class MultiThreadedEngineTest(SuspendFlowTest,
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(SuspendFlowTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = eu.GreenExecutor()
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)