Removed the unused Pool class from process.py, added a singleton pool called SharedPool, and changed calls in node to use the singleton pool

Vishvananda Ishaya
2010-07-16 19:12:36 +00:00
parent e0c2a51a5c
commit d6eb6e5895
2 changed files with 36 additions and 54 deletions


@@ -85,7 +85,7 @@ class _BackRelay(protocol.ProcessProtocol):
     def errReceivedIsBad(self, text):
         if self.deferred is not None:
             self.onProcessEnded = defer.Deferred()
-            err = _UnexpectedErrorOutput(text, self.onProcessEnded)
+            err = UnexpectedErrorOutput(text, self.onProcessEnded)
             self.deferred.errback(failure.Failure(err))
             self.deferred = None
             self.transport.loseConnection()
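For context: the errback path above hands unexpected stderr to the caller as a failure, and the unit tests later in this commit trap it as an IOError. A minimal, hypothetical caller sketch, not part of the diff; pool is assumed to be a ProcessPool instance:

# Hypothetical caller: not part of this commit. Assumes pool is a
# ProcessPool and that UnexpectedErrorOutput behaves as an IOError,
# which is what the tests below rely on.
d = pool.simple_execute('cat BAD_FILE')

def _on_ok(rv):
    return rv[0]                     # rv[0] is stdout, rv[1] is stderr

def _on_unexpected_stderr(failure):
    failure.trap(IOError)            # unexpected stderr output lands here
    return ''

d.addCallbacks(_on_ok, _on_unexpected_stderr)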
@@ -152,8 +152,8 @@ def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
     d = defer.Deferred()
     p = BackRelayWithInput(
             d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
-    # VISH: commands come in as unicode, but self.executes needs
-    # strings or process.spawn raises a deprecation warning
+    # NOTE(vish): commands come in as unicode, but self.executes needs
+    # strings or process.spawn raises a deprecation warning
     executable = str(executable)
     if not args is None:
         args = [str(x) for x in args]
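The NOTE(vish) comment documents why the two coercions below it exist: commands arrive as unicode, but the process machinery wants plain strings on Python 2 and warns otherwise. A trivial illustration (the values are made up):

# Illustration only: coerce a unicode command and its args to plain str
# before they reach process.spawn, as the comment above explains.
executable = u'/bin/cat'
args = [u'/etc/hostname']
executable = str(executable)       # u'/bin/cat' -> '/bin/cat'
args = [str(x) for x in args]      # each argument becomes a plain str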
@@ -171,7 +171,7 @@ class ProcessPool(object):
         self.size = size and size or FLAGS.process_pool_size
         self._pool = defer.DeferredSemaphore(self.size)
 
-    def simpleExecute(self, cmd, **kw):
+    def simple_execute(self, cmd, **kw):
         """ Weak emulation of the old utils.execute() function.
 
         This only exists as a way to quickly move old execute methods to
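The rename from simpleExecute to simple_execute keeps the method a thin, utils.execute-style wrapper that returns a Deferred firing with the command's output. A usage sketch modeled on the unit tests below (pool size and command are illustrative):

# Sketch of the simple_execute calling convention, mirroring the tests.
pool = process.ProcessPool(2)             # allow two concurrent processes
d = pool.simple_execute('echo test')      # returns a Twisted Deferred

def _done(rv):
    stdout, stderr = rv[0], rv[1]         # rv[0] is stdout, rv[1] is stderr
    return stdout

d.addCallback(_done)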
@@ -205,34 +205,10 @@ class ProcessPool(object):
         self._pool.release()
         return rv
 
-class Pool(object):
-    """ A simple process pool implementation around multiprocessing.
-
-    Allows up to `size` processes at a time and queues the rest.
-
-    Using workarounds for multiprocessing behavior described in:
-    http://pypi.python.org/pypi/twisted.internet.processes/1.0b1
-
-    """
-
-    def __init__(self, size=None):
-        self._size = size
-        self._pool = multiprocessing.Pool(size)
-        self._registerShutdown()
-
-    def _registerShutdown(self):
-        reactor.addSystemEventTrigger(
-            'during', 'shutdown', self.shutdown, reactor)
-
-    def shutdown(self, reactor=None):
-        if not self._pool:
-            return
-        self._pool.close()
-        # wait for workers to finish
-        self._pool.terminate()
-        self._pool = None
-
-    def apply(self, f, *args, **kw):
-        """ Add a task to the pool and return a deferred. """
-        result = self._pool.apply_async(f, args, kw)
-        return threads.deferToThread(result.get)
+class SharedPool(ProcessPool):
+    _instance = None
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            cls._instance = super(SharedPool, cls).__new__(
+                cls, *args, **kwargs)
+        return cls._instance
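The commit message mentions that calls in node were switched to the singleton pool; those call sites are not part of this diff, but given the SharedPool definition above they would look roughly like the sketch below (the import path and command are assumptions, not taken from the commit):

# Hypothetical call site: every caller constructs SharedPool() and gets
# the same underlying ProcessPool, so all commands share one
# DeferredSemaphore-limited pool of processes.
from nova import process

d = process.SharedPool().simple_execute('echo test')
d.addCallback(lambda rv: rv[0])    # rv[0] is stdout, as in the tests

Because SharedPool only overrides __new__, repeated SharedPool() calls hand back the same instance, which is exactly what the new test_shared_pool_is_singleton test at the bottom of this commit asserts.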


@@ -37,7 +37,7 @@ class ProcessTestCase(test.TrialTestCase):
     def test_execute_stdout(self):
         pool = process.ProcessPool(2)
-        d = pool.simpleExecute('echo test')
+        d = pool.simple_execute('echo test')
 
         def _check(rv):
             self.assertEqual(rv[0], 'test\n')
             self.assertEqual(rv[1], '')
@@ -48,38 +48,38 @@ class ProcessTestCase(test.TrialTestCase):
     def test_execute_stderr(self):
         pool = process.ProcessPool(2)
-        d = pool.simpleExecute('cat BAD_FILE', error_ok=1)
+        d = pool.simple_execute('cat BAD_FILE', error_ok=1)
 
         def _check(rv):
             self.assertEqual(rv[0], '')
             self.assert_('No such file' in rv[1])
         d.addCallback(_check)
         d.addErrback(self.fail)
         return d
 
     def test_execute_unexpected_stderr(self):
         pool = process.ProcessPool(2)
-        d = pool.simpleExecute('cat BAD_FILE')
+        d = pool.simple_execute('cat BAD_FILE')
         d.addCallback(lambda x: self.fail('should have raised an error'))
         d.addErrback(lambda failure: failure.trap(IOError))
         return d
 
     def test_max_processes(self):
         pool = process.ProcessPool(2)
-        d1 = pool.simpleExecute('sleep 0.01')
-        d2 = pool.simpleExecute('sleep 0.01')
-        d3 = pool.simpleExecute('sleep 0.005')
-        d4 = pool.simpleExecute('sleep 0.005')
+        d1 = pool.simple_execute('sleep 0.01')
+        d2 = pool.simple_execute('sleep 0.01')
+        d3 = pool.simple_execute('sleep 0.005')
+        d4 = pool.simple_execute('sleep 0.005')
         called = []
 
         def _called(rv, name):
             called.append(name)
         d1.addCallback(_called, 'd1')
         d2.addCallback(_called, 'd2')
         d3.addCallback(_called, 'd3')
         d4.addCallback(_called, 'd4')
 
         # Make sure that d3 and d4 had to wait on the other two and were called
         # in order
         # NOTE(termie): there may be a race condition in this test if for some
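test_max_processes depends on the DeferredSemaphore inside ProcessPool queueing any work beyond the pool size. A standalone sketch of that behavior, using only twisted.internet.defer and independent of this commit:

# Standalone illustration: a DeferredSemaphore sized at 2 fires the first
# two acquires immediately and queues the third until a release.
from twisted.internet import defer

sem = defer.DeferredSemaphore(2)
order = []

sem.acquire().addCallback(lambda _: order.append('first'))
sem.acquire().addCallback(lambda _: order.append('second'))
sem.acquire().addCallback(lambda _: order.append('third'))   # queued

print order      # ['first', 'second'] -- the third acquire is still waiting
sem.release()    # returning a token lets the queued acquire fire
print order      # ['first', 'second', 'third']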
@@ -92,25 +92,31 @@ class ProcessTestCase(test.TrialTestCase):
     def test_kill_long_process(self):
         pool = process.ProcessPool(2)
-        d1 = pool.simpleExecute('sleep 1')
-        d2 = pool.simpleExecute('sleep 0.005')
+        d1 = pool.simple_execute('sleep 1')
+        d2 = pool.simple_execute('sleep 0.005')
         timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
 
         # kill d1 and wait on it to end then cancel the timeout
         d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
         d2.addCallback(lambda _: d1)
         d2.addBoth(lambda _: timeout.active() and timeout.cancel())
         d2.addErrback(self.fail)
         return d2
 
     def test_process_exit_is_contained(self):
         pool = process.ProcessPool(2)
-        d1 = pool.simpleExecute('sleep 1')
+        d1 = pool.simple_execute('sleep 1')
         d1.addCallback(lambda x: self.fail('should have errbacked'))
         d1.addErrback(lambda fail: fail.trap(IOError))
         reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
         return d1
 
+    def test_shared_pool_is_singleton(self):
+        pool1 = process.SharedPool()
+        pool2 = process.SharedPool()
+        self.assert_(id(pool1) == id(pool2))