Merge "Create and use a multiprocessing sync manager subclass"
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import multiprocessing
|
||||
from multiprocessing import managers
|
||||
import os
|
||||
import pickle
|
||||
@@ -94,6 +93,16 @@ def _revert_task(task, arguments, result, failures, progress_callback=None):
|
||||
return (REVERTED, result)
|
||||
|
||||
|
||||
class _ViewableSyncManager(managers.SyncManager):
|
||||
"""Manager that exposes its state as methods."""
|
||||
|
||||
def is_shutdown(self):
|
||||
return self._state.value == managers.State.SHUTDOWN
|
||||
|
||||
def is_running(self):
|
||||
return self._state.value == managers.State.STARTED
|
||||
|
||||
|
||||
class _Channel(object):
|
||||
"""Helper wrapper around a multiprocessing queue used by a worker."""
|
||||
|
||||
@@ -439,7 +448,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
|
||||
dispatch_periodicity=None):
|
||||
super(ParallelProcessTaskExecutor, self).__init__(
|
||||
executor=executor, max_workers=max_workers)
|
||||
self._manager = multiprocessing.Manager()
|
||||
self._manager = _ViewableSyncManager()
|
||||
self._dispatcher = _Dispatcher(
|
||||
dispatch_periodicity=dispatch_periodicity)
|
||||
# Only created after starting...
|
||||
@@ -454,12 +463,10 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
|
||||
raise RuntimeError("Worker thread must be stopped via stop()"
|
||||
" before starting/restarting")
|
||||
super(ParallelProcessTaskExecutor, self).start()
|
||||
# TODO(harlowja): do something else here besides accessing a state
|
||||
# of the manager internals (it doesn't seem to expose any way to know
|
||||
# this information)...
|
||||
if self._manager._state.value == managers.State.SHUTDOWN:
|
||||
self._manager = multiprocessing.Manager()
|
||||
if self._manager._state.value == managers.State.INITIAL:
|
||||
# These don't seem restartable; make a new one...
|
||||
if self._manager.is_shutdown():
|
||||
self._manager = _ViewableSyncManager()
|
||||
if not self._manager.is_running():
|
||||
self._manager.start()
|
||||
self._dispatcher.reset()
|
||||
self._queue = self._manager.Queue()
|
||||
|
||||
Reference in New Issue
Block a user