Adds a Twisted implementation of a process pool
Meant for use instead of utils.execute()
This commit is contained in:
parent
3b594c7cd3
commit
b07af87974
113
nova/process.py
113
nova/process.py
@ -19,15 +19,41 @@ Process pool, still buggy right now.
|
||||
|
||||
import logging
|
||||
import multiprocessing
|
||||
import StringIO
|
||||
|
||||
from nova import vendor
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet import error
|
||||
from twisted.internet import process
|
||||
from twisted.internet import protocol
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet import threads
|
||||
from twisted.python import failure
|
||||
|
||||
from nova import flags
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('process_pool_size', 4,
|
||||
'Number of processes to use in the process pool')
|
||||
|
||||
|
||||
# NOTE(termie): this is copied from twisted.internet.utils but since
|
||||
# they don't export it I've copied.
|
||||
# they don't export it I've copied and modified
|
||||
class UnexpectedErrorOutput(IOError):
|
||||
"""
|
||||
Standard error data was received where it was not expected. This is a
|
||||
subclass of L{IOError} to preserve backward compatibility with the previous
|
||||
error behavior of L{getProcessOutput}.
|
||||
|
||||
@ivar processEnded: A L{Deferred} which will fire when the process which
|
||||
produced the data on stderr has ended (exited and all file descriptors
|
||||
closed).
|
||||
"""
|
||||
def __init__(self, stdout=None, stderr=None):
|
||||
IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
|
||||
|
||||
|
||||
# NOTE(termie): this too
|
||||
class _BackRelay(protocol.ProcessProtocol):
|
||||
"""
|
||||
Trivial protocol for communicating with a process and turning its output
|
||||
@ -77,28 +103,103 @@ class _BackRelay(protocol.ProcessProtocol):
|
||||
|
||||
|
||||
class BackRelayWithInput(_BackRelay):
|
||||
def __init__(self, deferred, errortoo=0, input=None):
|
||||
super(BackRelayWithInput, self).__init__(deferred, errortoo)
|
||||
def __init__(self, deferred, startedDeferred=None, error_ok=0,
|
||||
input=None):
|
||||
# Twisted doesn't use new-style classes in most places :(
|
||||
_BackRelay.__init__(self, deferred, errortoo=error_ok)
|
||||
self.error_ok = error_ok
|
||||
self.input = input
|
||||
self.stderr = StringIO.StringIO()
|
||||
self.startedDeferred = startedDeferred
|
||||
|
||||
def errReceivedIsBad(self, text):
|
||||
self.stderr.write(text)
|
||||
self.transport.loseConnection()
|
||||
|
||||
def errReceivedIsGood(self, text):
|
||||
self.stderr.write(text)
|
||||
|
||||
def connectionMade(self):
|
||||
if self.startedDeferred:
|
||||
self.startedDeferred.callback(self)
|
||||
if self.input:
|
||||
self.transport.write(self.input)
|
||||
self.transport.closeStdin()
|
||||
|
||||
def processEnded(self, reason):
|
||||
if self.deferred is not None:
|
||||
stdout, stderr = self.s.getvalue(), self.stderr.getvalue()
|
||||
try:
|
||||
# NOTE(termie): current behavior means if error_ok is True
|
||||
# we won't throw an error even if the process
|
||||
# exited with a non-0 status, so you can't be
|
||||
# okay with stderr output and not with bad exit
|
||||
# codes.
|
||||
if not self.error_ok:
|
||||
reason.trap(error.ProcessDone)
|
||||
self.deferred.callback((stdout, stderr))
|
||||
except:
|
||||
self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
|
||||
|
||||
|
||||
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
|
||||
errortoo=0, input=None):
|
||||
error_ok=0, input=None, startedDeferred=None):
|
||||
if reactor is None:
|
||||
from twisted.internet import reactor
|
||||
args = args and args or ()
|
||||
env = env and env and {}
|
||||
d = defer.Deferred()
|
||||
p = BackRelayWithInput(d, errortoo=errortoo, input=input)
|
||||
p = BackRelayWithInput(
|
||||
d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
|
||||
reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
|
||||
return d
|
||||
|
||||
|
||||
class ProcessPool(object):
|
||||
""" A simple process pool implementation using Twisted's Process bits.
|
||||
|
||||
This is pretty basic right now, but hopefully the API will be the correct
|
||||
one so that it can be optimized later.
|
||||
"""
|
||||
def __init__(self, size=None):
|
||||
self.size = size and size or FLAGS.process_pool_size
|
||||
self._pool = defer.DeferredSemaphore(self.size)
|
||||
|
||||
def simpleExecute(self, cmd, **kw):
|
||||
""" Weak emulation of the old utils.execute() function.
|
||||
|
||||
This only exists as a way to quickly move old execute methods to
|
||||
this new style of code.
|
||||
|
||||
NOTE(termie): This will break on args with spaces in them.
|
||||
"""
|
||||
parsed = cmd.split(' ')
|
||||
executable, args = parsed[0], parsed[1:]
|
||||
return self.execute(executable, args, **kw)
|
||||
|
||||
def execute(self, *args, **kw):
|
||||
d = self._pool.acquire()
|
||||
|
||||
def _associateProcess(proto):
|
||||
d.process = proto.transport
|
||||
return proto.transport
|
||||
|
||||
started = defer.Deferred()
|
||||
started.addCallback(_associateProcess)
|
||||
kw.setdefault('startedDeferred', started)
|
||||
|
||||
d.process = None
|
||||
d.started = started
|
||||
|
||||
d.addCallback(lambda _: getProcessOutput(*args, **kw))
|
||||
d.addBoth(self._release)
|
||||
return d
|
||||
|
||||
def _release(self, rv=None):
|
||||
self._pool.release()
|
||||
return rv
|
||||
|
||||
|
||||
class Pool(object):
|
||||
""" A simple process pool implementation around mutliprocessing.
|
||||
|
||||
|
115
nova/tests/process_unittest.py
Normal file
115
nova/tests/process_unittest.py
Normal file
@ -0,0 +1,115 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
# Copyright [2010] [Anso Labs, LLC]
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from xml.etree import ElementTree
|
||||
|
||||
from nova import vendor
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import process
|
||||
from nova import test
|
||||
from nova import utils
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
|
||||
class ProcessTestCase(test.TrialTestCase):
|
||||
def setUp(self):
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
super(ProcessTestCase, self).setUp()
|
||||
|
||||
def test_execute_stdout(self):
|
||||
pool = process.ProcessPool(2)
|
||||
d = pool.simpleExecute('echo test')
|
||||
def _check(rv):
|
||||
self.assertEqual(rv[0], 'test\n')
|
||||
self.assertEqual(rv[1], '')
|
||||
|
||||
d.addCallback(_check)
|
||||
d.addErrback(self.fail)
|
||||
return d
|
||||
|
||||
def test_execute_stderr(self):
|
||||
pool = process.ProcessPool(2)
|
||||
d = pool.simpleExecute('cat BAD_FILE', error_ok=1)
|
||||
def _check(rv):
|
||||
self.assertEqual(rv[0], '')
|
||||
self.assert_('No such file' in rv[1])
|
||||
|
||||
d.addCallback(_check)
|
||||
d.addErrback(self.fail)
|
||||
return d
|
||||
|
||||
def test_execute_unexpected_stderr(self):
|
||||
pool = process.ProcessPool(2)
|
||||
d = pool.simpleExecute('cat BAD_FILE')
|
||||
d.addCallback(lambda x: self.fail('should have raised an error'))
|
||||
d.addErrback(lambda failure: failure.trap(IOError))
|
||||
return d
|
||||
|
||||
def test_max_processes(self):
|
||||
pool = process.ProcessPool(2)
|
||||
d1 = pool.simpleExecute('sleep 0.01')
|
||||
d2 = pool.simpleExecute('sleep 0.01')
|
||||
d3 = pool.simpleExecute('sleep 0.005')
|
||||
d4 = pool.simpleExecute('sleep 0.005')
|
||||
|
||||
called = []
|
||||
def _called(rv, name):
|
||||
called.append(name)
|
||||
|
||||
d1.addCallback(_called, 'd1')
|
||||
d2.addCallback(_called, 'd2')
|
||||
d3.addCallback(_called, 'd3')
|
||||
d4.addCallback(_called, 'd4')
|
||||
|
||||
# Make sure that d3 and d4 had to wait on the other two and were called
|
||||
# in order
|
||||
# NOTE(termie): there may be a race condition in this test if for some
|
||||
# reason one of the sleeps takes longer to complete
|
||||
# than it should
|
||||
d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
|
||||
d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
|
||||
d4.addErrback(self.fail)
|
||||
return d4
|
||||
|
||||
def test_kill_long_process(self):
|
||||
pool = process.ProcessPool(2)
|
||||
|
||||
d1 = pool.simpleExecute('sleep 1')
|
||||
d2 = pool.simpleExecute('sleep 0.005')
|
||||
|
||||
timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
|
||||
|
||||
# kill d1 and wait on it to end then cancel the timeout
|
||||
d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
|
||||
d2.addCallback(lambda _: d1)
|
||||
d2.addBoth(lambda _: timeout.active() and timeout.cancel())
|
||||
d2.addErrback(self.fail)
|
||||
return d2
|
||||
|
||||
def test_process_exit_is_contained(self):
|
||||
pool = process.ProcessPool(2)
|
||||
|
||||
d1 = pool.simpleExecute('sleep 1')
|
||||
d1.addCallback(lambda x: self.fail('should have errbacked'))
|
||||
d1.addErrback(lambda fail: fail.trap(IOError))
|
||||
reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
|
||||
|
||||
return d1
|
@ -50,6 +50,7 @@ from nova.tests.keeper_unittest import *
|
||||
from nova.tests.network_unittest import *
|
||||
from nova.tests.node_unittest import *
|
||||
from nova.tests.objectstore_unittest import *
|
||||
from nova.tests.process_unittest import *
|
||||
from nova.tests.storage_unittest import *
|
||||
from nova.tests.users_unittest import *
|
||||
from nova.tests.datastore_unittest import *
|
||||
|
Loading…
x
Reference in New Issue
Block a user