diff --git a/doc/source/conf.py b/doc/source/conf.py index 2fb1e7ec..9dec3b69 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -56,6 +56,7 @@ modindex_common_prefix = ['taskflow.'] # Shortened external links. extlinks = { 'example': (source_tree + '/taskflow/examples/%s.py', ''), + 'pybug': ('http://bugs.python.org/issue%s', ''), } # -- Options for HTML output -------------------------------------------------- diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 0c4b822f..474fa666 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -160,7 +160,8 @@ Parallel **Engine type**: ``'parallel'`` -Parallel engine schedules tasks onto different threads to run them in parallel. +A parallel engine schedules tasks onto different threads/processes to allow for +running non-dependent tasks simultaneously. Additional supported keyword arguments: @@ -168,17 +169,24 @@ Additional supported keyword arguments: interface; it will be used for scheduling tasks. You can use instances of a `thread pool executor`_ or a :py:class:`green executor ` (which internally uses - `eventlet `_ and greenthread pools). + `eventlet `_ and greenthread pools) or a + `process pool executor`_. .. tip:: - Sharing executor between engine instances provides better - scalability by reducing thread creation and teardown as well as by reusing - existing pools (which is a good practice in general). + Sharing an executor between engine instances provides better + scalability by reducing thread/process creation and teardown as well as by + reusing existing pools (which is a good practice in general). .. note:: - Running tasks with a `process pool executor`_ is not currently supported. + Running tasks with a `process pool executor`_ is **experimentally** + supported. This is mainly due to the `futures backport`_ and + the `multiprocessing`_ module that exist in older versions of python not + being as up to date (with important fixes such as :pybug:`4892`, + :pybug:`6721`, :pybug:`9205`, :pybug:`11635`, :pybug:`16284`, + :pybug:`22393` and others...) as the most recent python version (which + themselves have a variety of ongoing/recent bugs). Worker-based ------------ @@ -347,8 +355,10 @@ Hierarchy taskflow.engines.worker_based.engine.WorkerBasedActionEngine :parts: 1 +.. _multiprocessing: https://docs.python.org/2/library/multiprocessing.html .. _future: https://docs.python.org/dev/library/concurrent.futures.html#future-objects .. _executor: https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.Executor .. _networkx: https://networkx.github.io/ +.. _futures backport: https://pypi.python.org/pypi/futures .. _thread pool executor: https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor .. _process pool executor: https://docs.python.org/dev/library/concurrent.futures.html#processpoolexecutor diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ffc3a80a..118d9dd3 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -14,9 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import contextlib import threading +from concurrent import futures from oslo.utils import excutils from taskflow.engines.action_engine import compiler @@ -58,7 +60,6 @@ class ActionEngine(base.Engine): the tasks and flow being ran can go through. """ _compiler_factory = compiler.PatternCompiler - _task_executor_factory = executor.SerialTaskExecutor def __init__(self, flow, flow_detail, backend, options): super(ActionEngine, self).__init__(flow, flow_detail, backend, options) @@ -202,9 +203,10 @@ class ActionEngine(base.Engine): self._runtime.reset_all() self._change_state(states.PENDING) - @misc.cachedproperty + @abc.abstractproperty def _task_executor(self): return self._task_executor_factory() + pass @misc.cachedproperty def _compiler(self): @@ -226,12 +228,26 @@ class SerialActionEngine(ActionEngine): """Engine that runs tasks in serial manner.""" _storage_factory = atom_storage.SingleThreadedStorage + @misc.cachedproperty + def _task_executor(self): + return executor.SerialTaskExecutor() + class ParallelActionEngine(ActionEngine): """Engine that runs tasks in parallel manner.""" _storage_factory = atom_storage.MultiThreadedStorage - def _task_executor_factory(self): - return executor.ParallelTaskExecutor( - executor=self._options.get('executor'), - max_workers=self._options.get('max_workers')) + @misc.cachedproperty + def _task_executor(self): + kwargs = { + 'executor': self._options.get('executor'), + 'max_workers': self._options.get('max_workers'), + } + # The reason we use the library/built-in futures is to allow for + # instances of that to be detected and handled correctly, instead of + # forcing everyone to use our derivatives... + if isinstance(kwargs['executor'], futures.ProcessPoolExecutor): + executor_cls = executor.ParallelProcessTaskExecutor + else: + executor_cls = executor.ParallelThreadTaskExecutor + return executor_cls(**kwargs) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 002068b3..97756f54 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -75,7 +75,8 @@ class TaskExecutor(object): """ @abc.abstractmethod - def execute_task(self, task, task_uuid, arguments, progress_callback=None): + def execute_task(self, task, task_uuid, arguments, + progress_callback=None): """Schedules task execution.""" @abc.abstractmethod @@ -128,32 +129,69 @@ class ParallelTaskExecutor(TaskExecutor): def __init__(self, executor=None, max_workers=None): self._executor = executor self._max_workers = max_workers - self._create_executor = executor is None + self._own_executor = executor is None - def execute_task(self, task, task_uuid, arguments, progress_callback=None): - fut = self._executor.submit(_execute_task, - task, arguments, - progress_callback=progress_callback) + @abc.abstractmethod + def _create_executor(self, max_workers=None): + """Called when an executor has not been provided to make one.""" + + def _submit_task(self, func, task, *args, **kwargs): + fut = self._executor.submit(func, task, *args, **kwargs) fut.atom = task return fut + def execute_task(self, task, task_uuid, arguments, progress_callback=None): + return self._submit_task(_execute_task, task, arguments, + progress_callback=progress_callback) + def revert_task(self, task, task_uuid, arguments, result, failures, progress_callback=None): - fut = self._executor.submit(_revert_task, - task, arguments, result, failures, - progress_callback=progress_callback) - fut.atom = task - return fut + return self._submit_task(_revert_task, task, arguments, result, + failures, progress_callback=progress_callback) def start(self): - if self._create_executor: + if self._own_executor: if self._max_workers is not None: max_workers = self._max_workers else: max_workers = threading_utils.get_optimal_thread_count() - self._executor = futures.ThreadPoolExecutor(max_workers) + self._executor = self._create_executor(max_workers=max_workers) def stop(self): - if self._create_executor: + if self._own_executor: self._executor.shutdown(wait=True) self._executor = None + + +class ParallelThreadTaskExecutor(ParallelTaskExecutor): + """Executes tasks in parallel using a thread pool executor.""" + + def _create_executor(self, max_workers=None): + return futures.ThreadPoolExecutor(max_workers=max_workers) + + +class ParallelProcessTaskExecutor(ParallelTaskExecutor): + """Executes tasks in parallel using a process pool executor. + + NOTE(harlowja): this executor executes tasks in external processes, so that + implies that tasks that are sent to that external process are pickleable + since this is how the multiprocessing works (sending pickled objects back + and forth). + """ + + def _create_executor(self, max_workers=None): + return futures.ProcessPoolExecutor(max_workers=max_workers) + + def _submit_task(self, func, task, *args, **kwargs): + """Submit a function to run the given task (with given args/kwargs). + + NOTE(harlowja): task callbacks/notifications will not currently + work (they will be removed before being sent to the target process + for execution). + """ + kwargs.pop('progress_callback', None) + clone = task.copy(retain_listeners=False) + fut = super(ParallelProcessTaskExecutor, self)._submit_task( + func, clone, *args, **kwargs) + fut.atom = task + return fut diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index aefce23f..df915fc9 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -18,6 +18,7 @@ from taskflow.engines.action_engine import engine from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr from taskflow import storage as t_storage +from taskflow.utils import misc class WorkerBasedActionEngine(engine.ActionEngine): @@ -44,7 +45,8 @@ class WorkerBasedActionEngine(engine.ActionEngine): _storage_factory = t_storage.SingleThreadedStorage - def _task_executor_factory(self): + @misc.cachedproperty + def _task_executor(self): try: return self._options['executor'] except KeyError: diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index bdef7bff..2827fb5f 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -34,12 +34,6 @@ from taskflow.utils import threading_utils as tu LOG = logging.getLogger(__name__) -def _is_alive(thread): - if not thread: - return False - return thread.is_alive() - - class PeriodicWorker(object): """Calls a set of functions when activated periodically. @@ -181,7 +175,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._requests_cache.cleanup(self._handle_expired_request) def _submit_task(self, task, task_uuid, action, arguments, - progress_callback, **kwargs): + progress_callback=None, **kwargs): """Submit task request to a worker.""" request = pr.Request(task, task_uuid, action, arguments, self._transition_timeout, **kwargs) @@ -239,13 +233,13 @@ class WorkerTaskExecutor(executor.TaskExecutor): def execute_task(self, task, task_uuid, arguments, progress_callback=None): return self._submit_task(task, task_uuid, pr.EXECUTE, arguments, - progress_callback) + progress_callback=progress_callback) def revert_task(self, task, task_uuid, arguments, result, failures, progress_callback=None): return self._submit_task(task, task_uuid, pr.REVERT, arguments, - progress_callback, result=result, - failures=failures) + progress_callback=progress_callback, + result=result, failures=failures) def wait_for_workers(self, workers=1, timeout=None): """Waits for geq workers to notify they are ready to do work. @@ -273,11 +267,11 @@ class WorkerTaskExecutor(executor.TaskExecutor): def start(self): """Starts proxy thread and associated topic notification thread.""" - if not _is_alive(self._proxy_thread): + if not tu.is_alive(self._proxy_thread): self._proxy_thread = tu.daemon_thread(self._proxy.start) self._proxy_thread.start() self._proxy.wait() - if not _is_alive(self._periodic_thread): + if not tu.is_alive(self._periodic_thread): self._periodic.reset() self._periodic_thread = tu.daemon_thread(self._periodic.start) self._periodic_thread.start() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 1ff81353..baa5e81f 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -17,6 +17,7 @@ import contextlib import testtools +from testtools import testcase import taskflow.engines from taskflow.engines.action_engine import engine as eng @@ -602,11 +603,50 @@ class ParallelEngineWithEventletTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: executor = futures.GreenThreadPoolExecutor() + self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor) +class ParallelEngineWithProcessTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineLinearAndUnorderedExceptionsTest, + EngineGraphFlowTest, + EngineCheckingTaskTest, + test.TestCase): + _SKIP_TYPES = (utils.SaveOrderTask,) + + def test_correct_load(self): + engine = self._make_engine(utils.TaskNoRequiresNoReturns) + self.assertIsInstance(engine, eng.ParallelActionEngine) + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = futures.ProcessPoolExecutor(1) + self.addCleanup(executor.shutdown) + e = taskflow.engines.load(flow, flow_detail=flow_detail, + backend=self.backend, engine='parallel', + executor=executor) + # FIXME(harlowja): fix this so that we can actually tests these + # testcases, without having task/global test state that is retained + # and inspected; this doesn't work in a multi-process situation since + # the tasks execute in another process with its own memory/heap + # which this process later can't view/introspect... + try: + e.compile() + for a in e.compilation.execution_graph: + if isinstance(a, self._SKIP_TYPES): + baddies = [a.__name__ for a in self._SKIP_TYPES] + raise testcase.TestSkipped("Process engines can not" + " run flows that contain" + " %s tasks" % baddies) + except (TypeError, exc.TaskFlowException): + pass + return e + + class WorkerBasedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index b3749bca..5048401c 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -40,6 +40,13 @@ else: Event = threading.Event +def is_alive(thread): + """Helper to determine if a thread is alive (handles none safely).""" + if not thread: + return False + return thread.is_alive() + + def get_ident(): """Return the 'thread identifier' of the current thread.""" return _thread.get_ident()