From eb536daa0e63aa2e110ac48ff5da65b27b48473c Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 12 Jan 2015 18:25:14 -0800 Subject: [PATCH] Create and use a multiprocessing sync manager subclass Instead of accessing private variables of the manager base class just create a subclass and more nicely expose methods that can be used to introspect the managers state and perform actions based on that state. Change-Id: Ied570a25e52de94370b59d844ecdcc6d3551fd3d --- taskflow/engines/action_engine/executor.py | 23 ++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index dafc4f58..3af95999 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -16,7 +16,6 @@ import abc import collections -import multiprocessing from multiprocessing import managers import os import pickle @@ -94,6 +93,16 @@ def _revert_task(task, arguments, result, failures, progress_callback=None): return (REVERTED, result) +class _ViewableSyncManager(managers.SyncManager): + """Manager that exposes its state as methods.""" + + def is_shutdown(self): + return self._state.value == managers.State.SHUTDOWN + + def is_running(self): + return self._state.value == managers.State.STARTED + + class _Channel(object): """Helper wrapper around a multiprocessing queue used by a worker.""" @@ -437,7 +446,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): dispatch_periodicity=None): super(ParallelProcessTaskExecutor, self).__init__( executor=executor, max_workers=max_workers) - self._manager = multiprocessing.Manager() + self._manager = _ViewableSyncManager() self._dispatcher = _Dispatcher( dispatch_periodicity=dispatch_periodicity) # Only created after starting... @@ -452,12 +461,10 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): raise RuntimeError("Worker thread must be stopped via stop()" " before starting/restarting") super(ParallelProcessTaskExecutor, self).start() - # TODO(harlowja): do something else here besides accessing a state - # of the manager internals (it doesn't seem to expose any way to know - # this information)... - if self._manager._state.value == managers.State.SHUTDOWN: - self._manager = multiprocessing.Manager() - if self._manager._state.value == managers.State.INITIAL: + # These don't seem restartable; make a new one... + if self._manager.is_shutdown(): + self._manager = _ViewableSyncManager() + if not self._manager.is_running(): self._manager.start() self._dispatcher.reset() self._queue = self._manager.Queue()