diff --git a/nova/process.py b/nova/process.py index ff789a08..ebfb2f4b 100644 --- a/nova/process.py +++ b/nova/process.py @@ -85,7 +85,7 @@ class _BackRelay(protocol.ProcessProtocol): def errReceivedIsBad(self, text): if self.deferred is not None: self.onProcessEnded = defer.Deferred() - err = _UnexpectedErrorOutput(text, self.onProcessEnded) + err = UnexpectedErrorOutput(text, self.onProcessEnded) self.deferred.errback(failure.Failure(err)) self.deferred = None self.transport.loseConnection() @@ -152,8 +152,8 @@ def getProcessOutput(executable, args=None, env=None, path=None, reactor=None, d = defer.Deferred() p = BackRelayWithInput( d, startedDeferred=startedDeferred, error_ok=error_ok, input=input) - # VISH: commands come in as unicode, but self.executes needs - # strings or process.spawn raises a deprecation warning + # NOTE(vish): commands come in as unicode, but self.executes needs + # strings or process.spawn raises a deprecation warning executable = str(executable) if not args is None: args = [str(x) for x in args] @@ -171,7 +171,7 @@ class ProcessPool(object): self.size = size and size or FLAGS.process_pool_size self._pool = defer.DeferredSemaphore(self.size) - def simpleExecute(self, cmd, **kw): + def simple_execute(self, cmd, **kw): """ Weak emulation of the old utils.execute() function. This only exists as a way to quickly move old execute methods to @@ -205,34 +205,10 @@ class ProcessPool(object): self._pool.release() return rv - -class Pool(object): - """ A simple process pool implementation around mutliprocessing. - - Allows up to `size` processes at a time and queues the rest. - - Using workarounds for multiprocessing behavior described in: - http://pypi.python.org/pypi/twisted.internet.processes/1.0b1 - """ - - def __init__(self, size=None): - self._size = size - self._pool = multiprocessing.Pool(size) - self._registerShutdown() - - def _registerShutdown(self): - reactor.addSystemEventTrigger( - 'during', 'shutdown', self.shutdown, reactor) - - def shutdown(self, reactor=None): - if not self._pool: - return - self._pool.close() - # wait for workers to finish - self._pool.terminate() - self._pool = None - - def apply(self, f, *args, **kw): - """ Add a task to the pool and return a deferred. """ - result = self._pool.apply_async(f, args, kw) - return threads.deferToThread(result.get) +class SharedPool(ProcessPool): + _instance = None + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(SharedPool, cls).__new__( + cls, *args, **kwargs) + return cls._instance diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py index 01648961..1c15b69a 100644 --- a/nova/tests/process_unittest.py +++ b/nova/tests/process_unittest.py @@ -37,7 +37,7 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stdout(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('echo test') + d = pool.simple_execute('echo test') def _check(rv): self.assertEqual(rv[0], 'test\n') self.assertEqual(rv[1], '') @@ -48,38 +48,38 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stderr(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('cat BAD_FILE', error_ok=1) + d = pool.simple_execute('cat BAD_FILE', error_ok=1) def _check(rv): self.assertEqual(rv[0], '') self.assert_('No such file' in rv[1]) - + d.addCallback(_check) d.addErrback(self.fail) return d def test_execute_unexpected_stderr(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('cat BAD_FILE') + d = pool.simple_execute('cat BAD_FILE') d.addCallback(lambda x: self.fail('should have raised an error')) d.addErrback(lambda failure: failure.trap(IOError)) return d - + def test_max_processes(self): pool = process.ProcessPool(2) - d1 = pool.simpleExecute('sleep 0.01') - d2 = pool.simpleExecute('sleep 0.01') - d3 = pool.simpleExecute('sleep 0.005') - d4 = pool.simpleExecute('sleep 0.005') + d1 = pool.simple_execute('sleep 0.01') + d2 = pool.simple_execute('sleep 0.01') + d3 = pool.simple_execute('sleep 0.005') + d4 = pool.simple_execute('sleep 0.005') called = [] def _called(rv, name): called.append(name) - + d1.addCallback(_called, 'd1') d2.addCallback(_called, 'd2') d3.addCallback(_called, 'd3') d4.addCallback(_called, 'd4') - + # Make sure that d3 and d4 had to wait on the other two and were called # in order # NOTE(termie): there may be a race condition in this test if for some @@ -92,25 +92,31 @@ class ProcessTestCase(test.TrialTestCase): def test_kill_long_process(self): pool = process.ProcessPool(2) - - d1 = pool.simpleExecute('sleep 1') - d2 = pool.simpleExecute('sleep 0.005') + + d1 = pool.simple_execute('sleep 1') + d2 = pool.simple_execute('sleep 0.005') timeout = reactor.callLater(0.1, self.fail, 'should have been killed') - + # kill d1 and wait on it to end then cancel the timeout d2.addCallback(lambda _: d1.process.signalProcess('KILL')) d2.addCallback(lambda _: d1) d2.addBoth(lambda _: timeout.active() and timeout.cancel()) d2.addErrback(self.fail) return d2 - + def test_process_exit_is_contained(self): pool = process.ProcessPool(2) - - d1 = pool.simpleExecute('sleep 1') + + d1 = pool.simple_execute('sleep 1') d1.addCallback(lambda x: self.fail('should have errbacked')) d1.addErrback(lambda fail: fail.trap(IOError)) reactor.callLater(0.05, d1.process.signalProcess, 'KILL') - + return d1 + + def test_shared_pool_is_singleton(self): + pool1 = process.SharedPool() + pool2 = process.SharedPool() + self.assert_(id(pool1) == id(pool2)) +