Use executors instead of pools

An executor is a more generic concept than a pool and
also seems to work better with eventlet. It is also a
better supported model for performing concurrent operations
than the relatively unknown multiprocessing thread pool
implementation.

Fixes: bug 1225275

Change-Id: I09e9a9000bc88cc57d51342b83b31f97790a62e9
Author: Joshua Harlow
Date:   2013-09-13 20:21:03 -07:00
Parent: 15b2af47ae
Commit: 86c60dfa60
4 changed files with 43 additions and 30 deletions
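
As a rough illustration of the commit message above (not part of the patch; the
work() function below is a made-up placeholder), the shift is from the
multiprocessing thread pool API to the executor interface in concurrent.futures:

    from concurrent import futures  # stdlib on Python 3.2+, the `futures` backport elsewhere


    def work(item):
        # Stand-in for whatever a task/action would actually do.
        return item * 2


    # Previously: from multiprocessing import pool; pool.ThreadPool().imap_unordered(work, ...)
    # Now: the executor model from concurrent.futures.
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(work, range(4))))  # [0, 2, 4, 6]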


@@ -16,9 +16,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import multiprocessing
 import threading
-from multiprocessing import pool
+from concurrent import futures
 
 from taskflow.engines.action_engine import parallel_action
 from taskflow.engines.action_engine import seq_action
@@ -175,7 +176,7 @@ class MultiThreadedActionEngine(ActionEngine):
     translator_cls = MultiThreadedTranslator
 
     def __init__(self, flow, flow_detail=None, book=None, backend=None,
-                 thread_pool=None):
+                 executor=None):
         if flow_detail is None:
             flow_detail = p_utils.create_flow_detail(flow,
                                                      book=book,
@@ -183,31 +184,40 @@ class MultiThreadedActionEngine(ActionEngine):
         ActionEngine.__init__(self, flow,
                               storage=t_storage.ThreadSafeStorage(flow_detail,
                                                                   backend))
-        if thread_pool:
-            self._thread_pool = thread_pool
-            self._owns_thread_pool = False
+        if executor is not None:
+            self._executor = executor
+            self._owns_executor = False
+            self._thread_count = -1
         else:
-            self._thread_pool = None
-            self._owns_thread_pool = True
-
-    @decorators.locked
-    def compile(self):
-        ActionEngine.compile(self)
-        if self._thread_pool is None:
-            self._thread_pool = pool.ThreadPool()
+            self._executor = None
+            self._owns_executor = True
+            # TODO(harlowja): allow this to be configurable??
+            try:
+                self._thread_count = multiprocessing.cpu_count() + 1
+            except NotImplementedError:
+                # NOTE(harlowja): apparently may raise so in this case we will
+                # just setup two threads since its hard to know what else we
+                # should do in this situation.
+                self._thread_count = 2
 
     @decorators.locked
     def run(self):
+        if self._owns_executor:
+            if self._executor is not None:
+                # The previous shutdown failed, something is very wrong.
+                raise exc.InvalidStateException("The previous shutdown() of"
+                                                " the executor powering this"
+                                                " engine failed. Something is"
+                                                " very very wrong.")
+            self._executor = futures.ThreadPoolExecutor(self._thread_count)
         try:
             ActionEngine.run(self)
         finally:
-            # Ensure we close then join on the thread pool to make sure its
-            # resources get cleaned up correctly.
-            if self._owns_thread_pool and self._thread_pool:
-                self._thread_pool.close()
-                self._thread_pool.join()
-                self._thread_pool = None
+            # Don't forget to shutdown the executor!!
+            if self._owns_executor and self._executor is not None:
+                self._executor.shutdown(wait=True)
+                self._executor = None
 
     @property
-    def thread_pool(self):
-        return self._thread_pool
+    def executor(self):
+        return self._executor
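
The engine changes above boil down to an own-versus-borrowed executor lifecycle:
a caller-supplied executor is reused as-is, while an engine-owned one is created
per run() and shut down in the finally block. A standalone sketch of that pattern
(class and method names are illustrative, not taken from taskflow):

    from concurrent import futures


    class Runner(object):
        # Hypothetical stand-in for an engine that may own its executor.
        def __init__(self, executor=None):
            self._executor = executor
            self._owns_executor = executor is None

        def run(self, fn, items):
            executor = self._executor
            if self._owns_executor:
                # Created fresh for this run, torn down in the finally block.
                executor = futures.ThreadPoolExecutor(max_workers=2)
            try:
                return list(executor.map(fn, items))
            finally:
                if self._owns_executor:
                    executor.shutdown(wait=True)


    print(Runner().run(len, ['a', 'bb', 'ccc']))   # owns (and shuts down) its executor
    shared = futures.ThreadPoolExecutor(max_workers=2)
    print(Runner(shared).run(len, ['dddd']))       # borrows; the caller cleans up
    shared.shutdown(wait=True)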


@@ -29,7 +29,7 @@ class ParallelAction(base.Action):
         self._actions.append(action)
 
     def _map(self, engine, fn):
-        pool = engine.thread_pool
+        executor = engine.executor
 
         def call_fn(action):
             try:
@@ -40,7 +40,7 @@ class ParallelAction(base.Action):
                 return None
 
         failures = []
-        result_iter = pool.imap_unordered(call_fn, self._actions)
+        result_iter = executor.map(call_fn, self._actions)
         for result in result_iter:
             if isinstance(result, misc.Failure):
                 failures.append(result)
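
One behavioral nuance of the swap above: ThreadPool.imap_unordered yields results
as they finish, while Executor.map yields them in submission order. For
ParallelAction the difference is harmless since the loop only collects Failure
results, but it is worth knowing. A tiny standalone sketch (illustrative only):

    import time

    from concurrent import futures


    def identity_after(delay):
        time.sleep(delay)
        return delay


    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Even though the 0.05s job finishes first, map() preserves input order.
        print(list(executor.map(identity_after, [0.2, 0.05])))  # [0.2, 0.05]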


@@ -17,9 +17,10 @@
 # under the License.
 
 import contextlib
-from multiprocessing import pool
 import time
 
+from concurrent import futures
+
 from taskflow.patterns import linear_flow as lf
 from taskflow.patterns import unordered_flow as uf
@@ -447,20 +448,20 @@ class MultiThreadedEngineTest(EngineTaskTest,
                               EngineLinearFlowTest,
                               EngineParallelFlowTest,
                               test.TestCase):
-    def _make_engine(self, flow, flow_detail=None, thread_pool=None):
+    def _make_engine(self, flow, flow_detail=None, executor=None):
         if flow_detail is None:
             flow_detail = p_utils.create_flow_detail(flow, self.book,
                                                      self.backend)
         return eng.MultiThreadedActionEngine(flow, backend=self.backend,
                                              flow_detail=flow_detail,
-                                             thread_pool=thread_pool)
+                                             executor=executor)
 
     def test_using_common_pool(self):
         flow = TestTask(self.values, name='task1')
-        thread_pool = pool.ThreadPool()
-        e1 = self._make_engine(flow, thread_pool=thread_pool)
-        e2 = self._make_engine(flow, thread_pool=thread_pool)
-        self.assertIs(e1.thread_pool, e2.thread_pool)
+        executor = futures.ThreadPoolExecutor(2)
+        e1 = self._make_engine(flow, executor=executor)
+        e2 = self._make_engine(flow, executor=executor)
+        self.assertIs(e1.executor, e2.executor)
 
     def test_parallel_revert_specific(self):
         flow = uf.Flow('p-r-r').add(
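
The test_using_common_pool change exercises handing one executor to several
engines. Outside the test harness the pattern looks roughly like this (the
engine lines are comments because they need a real flow and backend):

    from concurrent import futures

    shared = futures.ThreadPoolExecutor(max_workers=2)
    try:
        # Both engines would reuse the caller-owned executor and therefore skip
        # the per-run creation/shutdown in MultiThreadedActionEngine.run(), e.g.
        # (hypothetical wiring):
        #   e1 = eng.MultiThreadedActionEngine(flow1, executor=shared)
        #   e2 = eng.MultiThreadedActionEngine(flow2, executor=shared)
        #   assert e1.executor is e2.executor
        pass
    finally:
        # Cleanup stays with whoever created the executor.
        shared.shutdown(wait=True)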


@@ -11,3 +11,5 @@ threading2
 Babel>=0.9.6
 # Used for backend storage engine loading
 stevedore>=0.10
+# Backport for concurrent.futures which exists in 3.2+
+futures>=2.1.3
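
The new requirement exists so that `from concurrent import futures` resolves on
Python 2 as well; the module is in the standard library only from Python 3.2
onward. A quick sanity check, assuming the backport (or Python 3.2+) is installed:

    from concurrent import futures

    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        # submit() returns a Future; result() blocks until the call completes.
        assert executor.submit(sum, [1, 2, 3]).result() == 6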