From 261826674b6d556f0b23b0431fcae6f5202863a9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 25 Jan 2016 13:28:50 -0800 Subject: [PATCH] Allow for specifying green threaded to parallel engine Currently when a string is passed to the parallel engine it will only know how to create a process or a native thread based executor. The futurist library also supports making a green thread based executor, so support creating it. This will save glance some code that they have to create a executor based on different options (one of those is a green option). Change-Id: I15c164a38b4445d28eb6062aed6c56cce0e0364b --- taskflow/engines/action_engine/engine.py | 9 ++++++ taskflow/engines/action_engine/executor.py | 18 ++++++++++++ taskflow/examples/hello_world.py | 34 +++++++++------------- taskflow/tests/unit/test_engines.py | 3 +- taskflow/tests/unit/test_retries.py | 4 +-- 5 files changed, 42 insertions(+), 26 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 721f64377..e31229b3c 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -519,6 +519,12 @@ String (case insensitive) Executor used ``thread`` :class:`~.executor.ParallelThreadTaskExecutor` ``threaded`` :class:`~.executor.ParallelThreadTaskExecutor` ``threads`` :class:`~.executor.ParallelThreadTaskExecutor` +``greenthread`` :class:`~.executor.ParallelThreadTaskExecutor` + (greened version) +``greedthreaded`` :class:`~.executor.ParallelThreadTaskExecutor` + (greened version) +``greenthreads`` :class:`~.executor.ParallelThreadTaskExecutor` + (greened version) =========================== =============================================== * ``max_workers``: a integer that will affect the number of parallel @@ -562,6 +568,9 @@ String (case insensitive) Executor used executor.ParallelProcessTaskExecutor), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), executor.ParallelThreadTaskExecutor), + _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', + 'greenthreaded']), + executor.ParallelGreenThreadTaskExecutor), ] # Used when no executor is provided (either a string or object)... diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 1f7ec0b75..ff4f400d4 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -476,6 +476,24 @@ class ParallelThreadTaskExecutor(ParallelTaskExecutor): return futurist.ThreadPoolExecutor(max_workers=max_workers) +class ParallelGreenThreadTaskExecutor(ParallelThreadTaskExecutor): + """Executes tasks in parallel using a greenthread pool executor.""" + + DEFAULT_WORKERS = 1000 + """ + Default number of workers when ``None`` is passed; being that + greenthreads don't map to native threads or processors very well this + is more of a guess/somewhat arbitrary, but it does match what the eventlet + greenpool default size is (so at least it's consistent with what eventlet + does). + """ + + def _create_executor(self, max_workers=None): + if max_workers is None: + max_workers = self.DEFAULT_WORKERS + return futurist.GreenThreadPoolExecutor(max_workers=max_workers) + + class ParallelProcessTaskExecutor(ParallelTaskExecutor): """Executes tasks in parallel using a process pool executor. diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py index 73fe5aa39..9f6f4dce3 100644 --- a/taskflow/examples/hello_world.py +++ b/taskflow/examples/hello_world.py @@ -25,8 +25,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) -import futurist - from taskflow import engines from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf @@ -82,35 +80,29 @@ song.add(PrinterTask("conductor@begin", # Run in parallel using eventlet green threads... try: - executor = futurist.GreenThreadPoolExecutor() -except RuntimeError: + import eventlet as _eventlet # noqa +except ImportError: # No eventlet currently active, skip running with it... pass else: print("-- Running in parallel using eventlet --") - with executor: - e = engines.load(song, executor=executor, engine='parallel') - e.run() - print("-- Statistics gathered --") - print(e.statistics) + e = engines.load(song, executor='greenthreaded', engine='parallel', + max_workers=1) + e.run() # Run in parallel using real threads... -with futurist.ThreadPoolExecutor(max_workers=1) as executor: - print("-- Running in parallel using threads --") - e = engines.load(song, executor=executor, engine='parallel') - e.run() - print("-- Statistics gathered --") - print(e.statistics) +print("-- Running in parallel using threads --") +e = engines.load(song, executor='threaded', engine='parallel', + max_workers=1) +e.run() # Run in parallel using external processes... -with futurist.ProcessPoolExecutor(max_workers=1) as executor: - print("-- Running in parallel using processes --") - e = engines.load(song, executor=executor, engine='parallel') - e.run() - print("-- Statistics gathered --") - print(e.statistics) +print("-- Running in parallel using processes --") +e = engines.load(song, executor='processes', engine='parallel', + max_workers=1) +e.run() # Run serially (aka, if the workflow could have been ran in parallel, it will diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 4eb1fb549..d16ce5741 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -1449,8 +1449,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, flow_detail=None, executor=None, store=None, **kwargs): if executor is None: - executor = futurist.GreenThreadPoolExecutor() - self.addCleanup(executor.shutdown) + executor = 'greenthreads' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor, diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 5f6a22e16..3d1cbb628 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist import testtools import taskflow.engines @@ -1305,8 +1304,7 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase): def _make_engine(self, flow, defer_reverts=None, flow_detail=None, executor=None): if executor is None: - executor = futurist.GreenThreadPoolExecutor() - self.addCleanup(executor.shutdown) + executor = 'greenthreads' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend,