Allow worker count to be specified when no executor provided

When a multi-threaded engine is used it is nice to be able
to inform the engine how many workers should be created when
it creates its own executor. To allow this to be possible accept
a new keywork argument that can be used to set this value.

Change-Id: I0095d548249372440abbcf9b5c3b8fa841ca0ea9
This commit is contained in:
Joshua Harlow
2014-08-17 15:07:07 -07:00
parent 8109207bae
commit 16d80b5236
2 changed files with 17 additions and 10 deletions

View File

@@ -216,9 +216,12 @@ class MultiThreadedActionEngine(ActionEngine):
_storage_factory = atom_storage.MultiThreadedStorage
def _task_executor_factory(self):
return executor.ParallelTaskExecutor(self._executor)
return executor.ParallelTaskExecutor(executor=self._executor,
max_workers=self._max_workers)
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
def __init__(self, flow, flow_detail, backend, conf,
executor=None, max_workers=None):
super(MultiThreadedActionEngine, self).__init__(
flow, flow_detail, backend, conf)
self._executor = kwargs.get('executor')
self._executor = executor
self._max_workers = max_workers

View File

@@ -111,13 +111,14 @@ class SerialTaskExecutor(TaskExecutorBase):
class ParallelTaskExecutor(TaskExecutorBase):
"""Executes tasks in parallel.
Submits tasks to executor which should provide interface similar
Submits tasks to an executor which should provide an interface similar
to concurrent.Futures.Executor.
"""
def __init__(self, executor=None):
def __init__(self, executor=None, max_workers=None):
self._executor = executor
self._own_executor = executor is None
self._max_workers = max_workers
self._create_executor = executor is None
def execute_task(self, task, task_uuid, arguments, progress_callback=None):
return self._executor.submit(
@@ -133,11 +134,14 @@ class ParallelTaskExecutor(TaskExecutorBase):
return async_utils.wait_for_any(fs, timeout)
def start(self):
if self._own_executor:
thread_count = threading_utils.get_optimal_thread_count()
self._executor = futures.ThreadPoolExecutor(thread_count)
if self._create_executor:
if self._max_workers is not None:
max_workers = self._max_workers
else:
max_workers = threading_utils.get_optimal_thread_count()
self._executor = futures.ThreadPoolExecutor(max_workers)
def stop(self):
if self._own_executor:
if self._create_executor:
self._executor.shutdown(wait=True)
self._executor = None