diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 923149dd..1c925678 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -16,7 +16,6 @@ import abc import collections -import multiprocessing from multiprocessing import managers import os import pickle @@ -94,6 +93,16 @@ def _revert_task(task, arguments, result, failures, progress_callback=None): return (REVERTED, result) +class _ViewableSyncManager(managers.SyncManager): + """Manager that exposes its state as methods.""" + + def is_shutdown(self): + return self._state.value == managers.State.SHUTDOWN + + def is_running(self): + return self._state.value == managers.State.STARTED + + class _Channel(object): """Helper wrapper around a multiprocessing queue used by a worker.""" @@ -439,7 +448,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): dispatch_periodicity=None): super(ParallelProcessTaskExecutor, self).__init__( executor=executor, max_workers=max_workers) - self._manager = multiprocessing.Manager() + self._manager = _ViewableSyncManager() self._dispatcher = _Dispatcher( dispatch_periodicity=dispatch_periodicity) # Only created after starting... @@ -454,12 +463,10 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): raise RuntimeError("Worker thread must be stopped via stop()" " before starting/restarting") super(ParallelProcessTaskExecutor, self).start() - # TODO(harlowja): do something else here besides accessing a state - # of the manager internals (it doesn't seem to expose any way to know - # this information)... - if self._manager._state.value == managers.State.SHUTDOWN: - self._manager = multiprocessing.Manager() - if self._manager._state.value == managers.State.INITIAL: + # These don't seem restartable; make a new one... + if self._manager.is_shutdown(): + self._manager = _ViewableSyncManager() + if not self._manager.is_running(): self._manager.start() self._dispatcher.reset() self._queue = self._manager.Queue()