Get the basics of a process executor working

Since we support various executors (threaded and distributed), it would be
great to also support a local process-based executor where we can: it is the
natural middle ground when a threaded executor will not perform well enough
and a distributed one requires too much setup.

Things that currently do not (and likely never will) work (see the sketch below):

 * Non-pickleable/non-copyable tasks
 * Tasks that return non-pickleable/non-copyable results
 * Tasks that use non-pickleable/non-copyable args/kwargs
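
A minimal sketch of the pickling constraint (plain classes for illustration,
not actual taskflow tasks):

    import pickle


    class EchoTask(object):
        """Plain-data state; survives the pickle round trip to a child process."""

        def __init__(self, message):
            self.message = message


    class BadTask(object):
        """Holds a lambda, which the pickle module refuses to serialize."""

        def __init__(self, message):
            self.render = lambda: message.upper()


    pickle.dumps(EchoTask('hi'))  # works
    pickle.dumps(BadTask('hi'))   # raises a pickling error (lambdas do not pickle)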

Part of blueprint process-executor

Change-Id: I966ae01d390c7217b858db3feb2db949ce5c08d1
Joshua Harlow, 2014-09-12 18:49:29 -07:00 (committed by Joshua Harlow)
parent 1467772a7b
commit 2a8fde1798
8 changed files with 147 additions and 39 deletions

@@ -56,6 +56,7 @@ modindex_common_prefix = ['taskflow.']
# Shortened external links.
extlinks = {
    'example': (source_tree + '/taskflow/examples/%s.py', ''),
    'pybug': ('http://bugs.python.org/issue%s', ''),
}
# -- Options for HTML output --------------------------------------------------
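
With this shortcut in place the docs can write :pybug:`4892` (for example) and
have it render as a link to http://bugs.python.org/issue4892; the engine
documentation below uses exactly that form.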

@@ -160,7 +160,8 @@ Parallel
**Engine type**: ``'parallel'``
Parallel engine schedules tasks onto different threads to run them in parallel.
A parallel engine schedules tasks onto different threads/processes to allow for
running non-dependent tasks simultaneously.
Additional supported keyword arguments:
@@ -168,17 +169,24 @@ Additional supported keyword arguments:
  interface; it will be used for scheduling tasks. You can use instances of a
  `thread pool executor`_ or a :py:class:`green executor
  <taskflow.types.futures.GreenThreadPoolExecutor>` (which internally uses
  `eventlet <http://eventlet.net/>`_ and greenthread pools).
  `eventlet <http://eventlet.net/>`_ and greenthread pools) or a
  `process pool executor`_.

.. tip::

    Sharing executor between engine instances provides better
    scalability by reducing thread creation and teardown as well as by reusing
    existing pools (which is a good practice in general).
    Sharing an executor between engine instances provides better
    scalability by reducing thread/process creation and teardown as well as by
    reusing existing pools (which is a good practice in general).

.. note::

    Running tasks with a `process pool executor`_ is not currently supported.
    Running tasks with a `process pool executor`_ is **experimentally**
    supported. This is mainly because the `futures backport`_ and the
    `multiprocessing`_ module that ship with older versions of python are not
    as up to date as the most recent python version, lacking important fixes
    such as :pybug:`4892`, :pybug:`6721`, :pybug:`9205`, :pybug:`11635`,
    :pybug:`16284`, :pybug:`22393` and others... (and even recent versions
    have a variety of ongoing/recent bugs).
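
For example, loading a parallel engine backed by a process pool (a sketch; the
flow and task here are illustrative only and must be pickleable, module-level
definitions)::

    from concurrent import futures

    import taskflow.engines
    from taskflow.patterns import unordered_flow
    from taskflow import task


    class EchoTask(task.Task):
        def execute(self):
            print('hello from a child process')


    flow = unordered_flow.Flow('demo').add(EchoTask('t1'), EchoTask('t2'))
    executor = futures.ProcessPoolExecutor(max_workers=2)
    try:
        engine = taskflow.engines.load(flow, engine='parallel',
                                       executor=executor)
        engine.run()
    finally:
        executor.shutdown(wait=True)
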
Worker-based
------------
@@ -347,8 +355,10 @@ Hierarchy
    taskflow.engines.worker_based.engine.WorkerBasedActionEngine
    :parts: 1
.. _multiprocessing: https://docs.python.org/2/library/multiprocessing.html
.. _future: https://docs.python.org/dev/library/concurrent.futures.html#future-objects
.. _executor: https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.Executor
.. _networkx: https://networkx.github.io/
.. _futures backport: https://pypi.python.org/pypi/futures
.. _thread pool executor: https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor
.. _process pool executor: https://docs.python.org/dev/library/concurrent.futures.html#processpoolexecutor

@@ -14,9 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
import threading
from concurrent import futures
from oslo.utils import excutils
from taskflow.engines.action_engine import compiler
@@ -58,7 +60,6 @@ class ActionEngine(base.Engine):
    the tasks and flow being run can go through.
    """
    _compiler_factory = compiler.PatternCompiler
    _task_executor_factory = executor.SerialTaskExecutor

    def __init__(self, flow, flow_detail, backend, options):
        super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
@@ -202,9 +203,10 @@ class ActionEngine(base.Engine):
        self._runtime.reset_all()
        self._change_state(states.PENDING)

    @misc.cachedproperty
    @abc.abstractproperty
    def _task_executor(self):
        return self._task_executor_factory()
        pass

    @misc.cachedproperty
    def _compiler(self):
@@ -226,12 +228,26 @@ class SerialActionEngine(ActionEngine):
    """Engine that runs tasks in serial manner."""
    _storage_factory = atom_storage.SingleThreadedStorage

    @misc.cachedproperty
    def _task_executor(self):
        return executor.SerialTaskExecutor()


class ParallelActionEngine(ActionEngine):
    """Engine that runs tasks in parallel manner."""
    _storage_factory = atom_storage.MultiThreadedStorage

    def _task_executor_factory(self):
        return executor.ParallelTaskExecutor(
            executor=self._options.get('executor'),
            max_workers=self._options.get('max_workers'))

    @misc.cachedproperty
    def _task_executor(self):
        kwargs = {
            'executor': self._options.get('executor'),
            'max_workers': self._options.get('max_workers'),
        }
        # The reason we use the library/built-in futures is to allow for
        # instances of that to be detected and handled correctly, instead of
        # forcing everyone to use our derivatives...
        if isinstance(kwargs['executor'], futures.ProcessPoolExecutor):
            executor_cls = executor.ParallelProcessTaskExecutor
        else:
            executor_cls = executor.ParallelThreadTaskExecutor
        return executor_cls(**kwargs)
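
In other words (an illustrative helper that mirrors the dispatch above; not
actual taskflow API):

    from concurrent import futures

    from taskflow.engines.action_engine import executor


    def pick_task_executor(options):
        # Only a stock (or backported) ProcessPoolExecutor instance selects
        # the process-aware task executor; anything else (None, thread pools,
        # green pools) falls back to the thread-based one.
        if isinstance(options.get('executor'), futures.ProcessPoolExecutor):
            return executor.ParallelProcessTaskExecutor
        return executor.ParallelThreadTaskExecutor


    pick_task_executor({'executor': futures.ProcessPoolExecutor(1)})
    # -> ParallelProcessTaskExecutor
    pick_task_executor({'executor': futures.ThreadPoolExecutor(1)})
    # -> ParallelThreadTaskExecutor
    pick_task_executor({})
    # -> ParallelThreadTaskExecutor (the engine will create its own pool)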

@@ -75,7 +75,8 @@ class TaskExecutor(object):
    """

    @abc.abstractmethod
    def execute_task(self, task, task_uuid, arguments, progress_callback=None):
    def execute_task(self, task, task_uuid, arguments,
                     progress_callback=None):
        """Schedules task execution."""

    @abc.abstractmethod
@@ -128,32 +129,69 @@ class ParallelTaskExecutor(TaskExecutor):
    def __init__(self, executor=None, max_workers=None):
        self._executor = executor
        self._max_workers = max_workers
        self._create_executor = executor is None
        self._own_executor = executor is None

    def execute_task(self, task, task_uuid, arguments, progress_callback=None):
        fut = self._executor.submit(_execute_task,
                                    task, arguments,
                                    progress_callback=progress_callback)

    @abc.abstractmethod
    def _create_executor(self, max_workers=None):
        """Called when an executor has not been provided to make one."""

    def _submit_task(self, func, task, *args, **kwargs):
        fut = self._executor.submit(func, task, *args, **kwargs)
        fut.atom = task
        return fut

    def execute_task(self, task, task_uuid, arguments, progress_callback=None):
        return self._submit_task(_execute_task, task, arguments,
                                 progress_callback=progress_callback)

    def revert_task(self, task, task_uuid, arguments, result, failures,
                    progress_callback=None):
        fut = self._executor.submit(_revert_task,
                                    task, arguments, result, failures,
                                    progress_callback=progress_callback)
        fut.atom = task
        return fut
        return self._submit_task(_revert_task, task, arguments, result,
                                 failures, progress_callback=progress_callback)

    def start(self):
        if self._create_executor:
        if self._own_executor:
            if self._max_workers is not None:
                max_workers = self._max_workers
            else:
                max_workers = threading_utils.get_optimal_thread_count()
            self._executor = futures.ThreadPoolExecutor(max_workers)
            self._executor = self._create_executor(max_workers=max_workers)

    def stop(self):
        if self._create_executor:
        if self._own_executor:
            self._executor.shutdown(wait=True)
            self._executor = None


class ParallelThreadTaskExecutor(ParallelTaskExecutor):
    """Executes tasks in parallel using a thread pool executor."""

    def _create_executor(self, max_workers=None):
        return futures.ThreadPoolExecutor(max_workers=max_workers)


class ParallelProcessTaskExecutor(ParallelTaskExecutor):
    """Executes tasks in parallel using a process pool executor.

    NOTE(harlowja): this executor executes tasks in external processes, which
    implies that any task sent to such a process must be pickleable, since
    that is how multiprocessing communicates (by sending pickled objects back
    and forth).
    """

    def _create_executor(self, max_workers=None):
        return futures.ProcessPoolExecutor(max_workers=max_workers)

    def _submit_task(self, func, task, *args, **kwargs):
        """Submit a function to run the given task (with given args/kwargs).

        NOTE(harlowja): task callbacks/notifications will not currently
        work (they will be removed before being sent to the target process
        for execution).
        """
        kwargs.pop('progress_callback', None)
        clone = task.copy(retain_listeners=False)
        fut = super(ParallelProcessTaskExecutor, self)._submit_task(
            func, clone, *args, **kwargs)
        fut.atom = task
        return fut
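
In practice this means a progress_callback passed to execute_task or
revert_task (or any listeners attached to the task) will simply never fire
under this executor, since the submitted object is a listener-free copy; only
the task's eventual result or failure makes the round trip back from the
child process.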

@@ -18,6 +18,7 @@ from taskflow.engines.action_engine import engine
from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
from taskflow import storage as t_storage
from taskflow.utils import misc
class WorkerBasedActionEngine(engine.ActionEngine):
@@ -44,7 +45,8 @@ class WorkerBasedActionEngine(engine.ActionEngine):
    _storage_factory = t_storage.SingleThreadedStorage

    def _task_executor_factory(self):
    @misc.cachedproperty
    def _task_executor(self):
        try:
            return self._options['executor']
        except KeyError:

@@ -34,12 +34,6 @@ from taskflow.utils import threading_utils as tu
LOG = logging.getLogger(__name__)


def _is_alive(thread):
    if not thread:
        return False
    return thread.is_alive()


class PeriodicWorker(object):
    """Calls a set of functions when activated periodically.
@@ -181,7 +175,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
        self._requests_cache.cleanup(self._handle_expired_request)

    def _submit_task(self, task, task_uuid, action, arguments,
                     progress_callback, **kwargs):
                     progress_callback=None, **kwargs):
        """Submit task request to a worker."""
        request = pr.Request(task, task_uuid, action, arguments,
                             self._transition_timeout, **kwargs)
@@ -239,13 +233,13 @@ class WorkerTaskExecutor(executor.TaskExecutor):
    def execute_task(self, task, task_uuid, arguments,
                     progress_callback=None):
        return self._submit_task(task, task_uuid, pr.EXECUTE, arguments,
                                 progress_callback)
                                 progress_callback=progress_callback)

    def revert_task(self, task, task_uuid, arguments, result, failures,
                    progress_callback=None):
        return self._submit_task(task, task_uuid, pr.REVERT, arguments,
                                 progress_callback, result=result,
                                 failures=failures)
                                 progress_callback=progress_callback,
                                 result=result, failures=failures)

    def wait_for_workers(self, workers=1, timeout=None):
        """Waits for geq workers to notify they are ready to do work.
@@ -273,11 +267,11 @@ class WorkerTaskExecutor(executor.TaskExecutor):
    def start(self):
        """Starts proxy thread and associated topic notification thread."""
        if not _is_alive(self._proxy_thread):
        if not tu.is_alive(self._proxy_thread):
            self._proxy_thread = tu.daemon_thread(self._proxy.start)
            self._proxy_thread.start()
            self._proxy.wait()
        if not _is_alive(self._periodic_thread):
        if not tu.is_alive(self._periodic_thread):
            self._periodic.reset()
            self._periodic_thread = tu.daemon_thread(self._periodic.start)
            self._periodic_thread.start()

@@ -17,6 +17,7 @@
import contextlib
import testtools
from testtools import testcase
import taskflow.engines
from taskflow.engines.action_engine import engine as eng
@@ -602,11 +603,50 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
    def _make_engine(self, flow, flow_detail=None, executor=None):
        if executor is None:
            executor = futures.GreenThreadPoolExecutor()
            self.addCleanup(executor.shutdown)
        return taskflow.engines.load(flow, flow_detail=flow_detail,
                                     backend=self.backend, engine='parallel',
                                     executor=executor)


class ParallelEngineWithProcessTest(EngineTaskTest,
                                    EngineLinearFlowTest,
                                    EngineParallelFlowTest,
                                    EngineLinearAndUnorderedExceptionsTest,
                                    EngineGraphFlowTest,
                                    EngineCheckingTaskTest,
                                    test.TestCase):
    _SKIP_TYPES = (utils.SaveOrderTask,)

    def test_correct_load(self):
        engine = self._make_engine(utils.TaskNoRequiresNoReturns)
        self.assertIsInstance(engine, eng.ParallelActionEngine)

    def _make_engine(self, flow, flow_detail=None, executor=None):
        if executor is None:
            executor = futures.ProcessPoolExecutor(1)
            self.addCleanup(executor.shutdown)
        e = taskflow.engines.load(flow, flow_detail=flow_detail,
                                  backend=self.backend, engine='parallel',
                                  executor=executor)
        # FIXME(harlowja): fix this so that we can actually test these
        # testcases, without having task/global test state that is retained
        # and inspected; this doesn't work in a multi-process situation since
        # the tasks execute in another process with its own memory/heap
        # which this process later can't view/introspect...
        try:
            e.compile()
            for a in e.compilation.execution_graph:
                if isinstance(a, self._SKIP_TYPES):
                    baddies = [a.__name__ for a in self._SKIP_TYPES]
                    raise testcase.TestSkipped("Process engines can not"
                                               " run flows that contain"
                                               " %s tasks" % baddies)
        except (TypeError, exc.TaskFlowException):
            pass
        return e


class WorkerBasedEngineTest(EngineTaskTest,
                            EngineLinearFlowTest,
                            EngineParallelFlowTest,

@@ -40,6 +40,13 @@ else:
    Event = threading.Event


def is_alive(thread):
    """Helper to determine if a thread is alive (handles none safely)."""
    if not thread:
        return False
    return thread.is_alive()


def get_ident():
    """Return the 'thread identifier' of the current thread."""
    return _thread.get_ident()