Adds a Twisted implementation of a process pool
Meant for use instead of utils.execute()
This commit is contained in:
		
							
								
								
									
										113
									
								
								nova/process.py
									
									
									
									
									
								
							
							
						
						
									
										113
									
								
								nova/process.py
									
									
									
									
									
								
							@@ -19,15 +19,41 @@ Process pool, still buggy right now.
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import multiprocessing
 | 
			
		||||
import StringIO
 | 
			
		||||
 | 
			
		||||
from nova import vendor
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
from twisted.internet import reactor
 | 
			
		||||
from twisted.internet import error
 | 
			
		||||
from twisted.internet import process
 | 
			
		||||
from twisted.internet import protocol
 | 
			
		||||
from twisted.internet import reactor
 | 
			
		||||
from twisted.internet import threads
 | 
			
		||||
from twisted.python import failure
 | 
			
		||||
 | 
			
		||||
from nova import flags
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
flags.DEFINE_integer('process_pool_size', 4,
 | 
			
		||||
                     'Number of processes to use in the process pool')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# NOTE(termie): this is copied from twisted.internet.utils but since
 | 
			
		||||
#               they don't export it I've copied.
 | 
			
		||||
#               they don't export it I've copied and modified
 | 
			
		||||
class UnexpectedErrorOutput(IOError):
 | 
			
		||||
    """
 | 
			
		||||
    Standard error data was received where it was not expected.  This is a
 | 
			
		||||
    subclass of L{IOError} to preserve backward compatibility with the previous
 | 
			
		||||
    error behavior of L{getProcessOutput}.
 | 
			
		||||
 | 
			
		||||
    @ivar processEnded: A L{Deferred} which will fire when the process which
 | 
			
		||||
        produced the data on stderr has ended (exited and all file descriptors
 | 
			
		||||
        closed).
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, stdout=None, stderr=None):
 | 
			
		||||
        IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# NOTE(termie): this too
 | 
			
		||||
class _BackRelay(protocol.ProcessProtocol):
 | 
			
		||||
    """
 | 
			
		||||
    Trivial protocol for communicating with a process and turning its output
 | 
			
		||||
@@ -77,28 +103,103 @@ class _BackRelay(protocol.ProcessProtocol):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BackRelayWithInput(_BackRelay):
 | 
			
		||||
    def __init__(self, deferred, errortoo=0, input=None):
 | 
			
		||||
        super(BackRelayWithInput, self).__init__(deferred, errortoo)
 | 
			
		||||
    def __init__(self, deferred, startedDeferred=None, error_ok=0,
 | 
			
		||||
                 input=None):
 | 
			
		||||
        # Twisted doesn't use new-style classes in most places :(
 | 
			
		||||
        _BackRelay.__init__(self, deferred, errortoo=error_ok)
 | 
			
		||||
        self.error_ok = error_ok
 | 
			
		||||
        self.input = input
 | 
			
		||||
        self.stderr = StringIO.StringIO()
 | 
			
		||||
        self.startedDeferred = startedDeferred
 | 
			
		||||
 | 
			
		||||
    def errReceivedIsBad(self, text):
 | 
			
		||||
        self.stderr.write(text)
 | 
			
		||||
        self.transport.loseConnection()
 | 
			
		||||
 | 
			
		||||
    def errReceivedIsGood(self, text):
 | 
			
		||||
        self.stderr.write(text)
 | 
			
		||||
    
 | 
			
		||||
    def connectionMade(self):
 | 
			
		||||
        if self.startedDeferred:
 | 
			
		||||
            self.startedDeferred.callback(self)
 | 
			
		||||
        if self.input:
 | 
			
		||||
            self.transport.write(self.input)
 | 
			
		||||
        self.transport.closeStdin()
 | 
			
		||||
 | 
			
		||||
    def processEnded(self, reason):
 | 
			
		||||
        if self.deferred is not None:
 | 
			
		||||
            stdout, stderr = self.s.getvalue(), self.stderr.getvalue()
 | 
			
		||||
            try:
 | 
			
		||||
                # NOTE(termie): current behavior means if error_ok is True
 | 
			
		||||
                #               we won't throw an error even if the process
 | 
			
		||||
                #               exited with a non-0 status, so you can't be
 | 
			
		||||
                #               okay with stderr output and not with bad exit
 | 
			
		||||
                #               codes.
 | 
			
		||||
                if not self.error_ok:
 | 
			
		||||
                    reason.trap(error.ProcessDone)
 | 
			
		||||
                self.deferred.callback((stdout, stderr))
 | 
			
		||||
            except:
 | 
			
		||||
                self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
 | 
			
		||||
                     errortoo=0, input=None):
 | 
			
		||||
                     error_ok=0, input=None, startedDeferred=None):
 | 
			
		||||
    if reactor is None:
 | 
			
		||||
        from twisted.internet import reactor
 | 
			
		||||
    args = args and args or ()
 | 
			
		||||
    env = env and env and {}
 | 
			
		||||
    d = defer.Deferred()
 | 
			
		||||
    p = BackRelayWithInput(d, errortoo=errortoo, input=input)
 | 
			
		||||
    p = BackRelayWithInput(
 | 
			
		||||
            d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
 | 
			
		||||
    reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
 | 
			
		||||
    return d
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ProcessPool(object):
 | 
			
		||||
    """ A simple process pool implementation using Twisted's Process bits.
 | 
			
		||||
 | 
			
		||||
    This is pretty basic right now, but hopefully the API will be the correct
 | 
			
		||||
    one so that it can be optimized later.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, size=None):
 | 
			
		||||
        self.size = size and size or FLAGS.process_pool_size
 | 
			
		||||
        self._pool = defer.DeferredSemaphore(self.size)
 | 
			
		||||
 | 
			
		||||
    def simpleExecute(self, cmd, **kw):
 | 
			
		||||
        """ Weak emulation of the old utils.execute() function.
 | 
			
		||||
        
 | 
			
		||||
        This only exists as a way to quickly move old execute methods to
 | 
			
		||||
        this new style of code.
 | 
			
		||||
 | 
			
		||||
        NOTE(termie): This will break on args with spaces in them.
 | 
			
		||||
        """
 | 
			
		||||
        parsed = cmd.split(' ')
 | 
			
		||||
        executable, args = parsed[0], parsed[1:]
 | 
			
		||||
        return self.execute(executable, args, **kw)
 | 
			
		||||
 | 
			
		||||
    def execute(self, *args, **kw):
 | 
			
		||||
        d = self._pool.acquire()
 | 
			
		||||
 | 
			
		||||
        def _associateProcess(proto):
 | 
			
		||||
            d.process = proto.transport
 | 
			
		||||
            return proto.transport
 | 
			
		||||
 | 
			
		||||
        started = defer.Deferred()
 | 
			
		||||
        started.addCallback(_associateProcess)
 | 
			
		||||
        kw.setdefault('startedDeferred', started)
 | 
			
		||||
 | 
			
		||||
        d.process = None
 | 
			
		||||
        d.started = started
 | 
			
		||||
        
 | 
			
		||||
        d.addCallback(lambda _: getProcessOutput(*args, **kw))
 | 
			
		||||
        d.addBoth(self._release)
 | 
			
		||||
        return d
 | 
			
		||||
 | 
			
		||||
    def _release(self, rv=None):
 | 
			
		||||
        self._pool.release()
 | 
			
		||||
        return rv
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Pool(object):
 | 
			
		||||
    """ A simple process pool implementation around mutliprocessing.
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										115
									
								
								nova/tests/process_unittest.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								nova/tests/process_unittest.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,115 @@
 | 
			
		||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 | 
			
		||||
# Copyright [2010] [Anso Labs, LLC]
 | 
			
		||||
#
 | 
			
		||||
#    Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
#    you may not use this file except in compliance with the License.
 | 
			
		||||
#    You may obtain a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#        http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
#    Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
#    distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
#    See the License for the specific language governing permissions and
 | 
			
		||||
#    limitations under the License.
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
from xml.etree import ElementTree
 | 
			
		||||
 | 
			
		||||
from nova import vendor
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
from twisted.internet import reactor
 | 
			
		||||
 | 
			
		||||
from nova import exception
 | 
			
		||||
from nova import flags
 | 
			
		||||
from nova import process
 | 
			
		||||
from nova import test
 | 
			
		||||
from nova import utils
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ProcessTestCase(test.TrialTestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        logging.getLogger().setLevel(logging.DEBUG)
 | 
			
		||||
        super(ProcessTestCase, self).setUp()
 | 
			
		||||
 | 
			
		||||
    def test_execute_stdout(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        d = pool.simpleExecute('echo test')
 | 
			
		||||
        def _check(rv):
 | 
			
		||||
            self.assertEqual(rv[0], 'test\n')
 | 
			
		||||
            self.assertEqual(rv[1], '')
 | 
			
		||||
 | 
			
		||||
        d.addCallback(_check)
 | 
			
		||||
        d.addErrback(self.fail)
 | 
			
		||||
        return d
 | 
			
		||||
 | 
			
		||||
    def test_execute_stderr(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        d = pool.simpleExecute('cat BAD_FILE', error_ok=1)
 | 
			
		||||
        def _check(rv):
 | 
			
		||||
            self.assertEqual(rv[0], '')
 | 
			
		||||
            self.assert_('No such file' in rv[1])
 | 
			
		||||
        
 | 
			
		||||
        d.addCallback(_check)
 | 
			
		||||
        d.addErrback(self.fail)
 | 
			
		||||
        return d
 | 
			
		||||
 | 
			
		||||
    def test_execute_unexpected_stderr(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        d = pool.simpleExecute('cat BAD_FILE')
 | 
			
		||||
        d.addCallback(lambda x: self.fail('should have raised an error'))
 | 
			
		||||
        d.addErrback(lambda failure: failure.trap(IOError))
 | 
			
		||||
        return d
 | 
			
		||||
    
 | 
			
		||||
    def test_max_processes(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        d1 = pool.simpleExecute('sleep 0.01')
 | 
			
		||||
        d2 = pool.simpleExecute('sleep 0.01')
 | 
			
		||||
        d3 = pool.simpleExecute('sleep 0.005')
 | 
			
		||||
        d4 = pool.simpleExecute('sleep 0.005')
 | 
			
		||||
 | 
			
		||||
        called = []
 | 
			
		||||
        def _called(rv, name):
 | 
			
		||||
            called.append(name)
 | 
			
		||||
    
 | 
			
		||||
        d1.addCallback(_called, 'd1')
 | 
			
		||||
        d2.addCallback(_called, 'd2')
 | 
			
		||||
        d3.addCallback(_called, 'd3')
 | 
			
		||||
        d4.addCallback(_called, 'd4')
 | 
			
		||||
        
 | 
			
		||||
        # Make sure that d3 and d4 had to wait on the other two and were called
 | 
			
		||||
        # in order
 | 
			
		||||
        # NOTE(termie): there may be a race condition in this test if for some
 | 
			
		||||
        #               reason one of the sleeps takes longer to complete
 | 
			
		||||
        #               than it should
 | 
			
		||||
        d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
 | 
			
		||||
        d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
 | 
			
		||||
        d4.addErrback(self.fail)
 | 
			
		||||
        return d4
 | 
			
		||||
 | 
			
		||||
    def test_kill_long_process(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        
 | 
			
		||||
        d1 = pool.simpleExecute('sleep 1')
 | 
			
		||||
        d2 = pool.simpleExecute('sleep 0.005')
 | 
			
		||||
 | 
			
		||||
        timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
 | 
			
		||||
         
 | 
			
		||||
        # kill d1 and wait on it to end then cancel the timeout
 | 
			
		||||
        d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
 | 
			
		||||
        d2.addCallback(lambda _: d1)
 | 
			
		||||
        d2.addBoth(lambda _: timeout.active() and timeout.cancel())
 | 
			
		||||
        d2.addErrback(self.fail)
 | 
			
		||||
        return d2
 | 
			
		||||
        
 | 
			
		||||
    def test_process_exit_is_contained(self):
 | 
			
		||||
        pool = process.ProcessPool(2)
 | 
			
		||||
        
 | 
			
		||||
        d1 = pool.simpleExecute('sleep 1')
 | 
			
		||||
        d1.addCallback(lambda x: self.fail('should have errbacked'))
 | 
			
		||||
        d1.addErrback(lambda fail: fail.trap(IOError))
 | 
			
		||||
        reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
 | 
			
		||||
        
 | 
			
		||||
        return d1
 | 
			
		||||
@@ -50,6 +50,7 @@ from nova.tests.keeper_unittest import *
 | 
			
		||||
from nova.tests.network_unittest import *
 | 
			
		||||
from nova.tests.node_unittest import *
 | 
			
		||||
from nova.tests.objectstore_unittest import *
 | 
			
		||||
from nova.tests.process_unittest import *
 | 
			
		||||
from nova.tests.storage_unittest import *
 | 
			
		||||
from nova.tests.users_unittest import *
 | 
			
		||||
from nova.tests.datastore_unittest import *
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user