Create and use a multiprocessing sync manager subclass

Instead of accessing private variables of the manager
base class just create a subclass and more nicely expose
methods that can be used to introspect the managers state
and perform actions based on that state.

Change-Id: Ied570a25e52de94370b59d844ecdcc6d3551fd3d
This commit is contained in:
Joshua Harlow
2015-01-12 18:25:14 -08:00
parent 4c756ef852
commit eb536daa0e

View File

@@ -16,7 +16,6 @@
import abc
import collections
import multiprocessing
from multiprocessing import managers
import os
import pickle
@@ -94,6 +93,16 @@ def _revert_task(task, arguments, result, failures, progress_callback=None):
return (REVERTED, result)
class _ViewableSyncManager(managers.SyncManager):
"""Manager that exposes its state as methods."""
def is_shutdown(self):
return self._state.value == managers.State.SHUTDOWN
def is_running(self):
return self._state.value == managers.State.STARTED
class _Channel(object):
"""Helper wrapper around a multiprocessing queue used by a worker."""
@@ -437,7 +446,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
dispatch_periodicity=None):
super(ParallelProcessTaskExecutor, self).__init__(
executor=executor, max_workers=max_workers)
self._manager = multiprocessing.Manager()
self._manager = _ViewableSyncManager()
self._dispatcher = _Dispatcher(
dispatch_periodicity=dispatch_periodicity)
# Only created after starting...
@@ -452,12 +461,10 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
raise RuntimeError("Worker thread must be stopped via stop()"
" before starting/restarting")
super(ParallelProcessTaskExecutor, self).start()
# TODO(harlowja): do something else here besides accessing a state
# of the manager internals (it doesn't seem to expose any way to know
# this information)...
if self._manager._state.value == managers.State.SHUTDOWN:
self._manager = multiprocessing.Manager()
if self._manager._state.value == managers.State.INITIAL:
# These don't seem restartable; make a new one...
if self._manager.is_shutdown():
self._manager = _ViewableSyncManager()
if not self._manager.is_running():
self._manager.start()
self._dispatcher.reset()
self._queue = self._manager.Queue()