210 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			210 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # vim: tabstop=4 shiftwidth=4 softtabstop=4
 | |
| 
 | |
| # Copyright 2010 United States Government as represented by the
 | |
| # Administrator of the National Aeronautics and Space Administration.
 | |
| # Copyright 2010 FathomDB Inc.
 | |
| # All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| """
 | |
| Process pool using twisted threading
 | |
| """
 | |
| 
 | |
| import logging
 | |
| import StringIO
 | |
| 
 | |
| from twisted.internet import defer
 | |
| from twisted.internet import error
 | |
| from twisted.internet import protocol
 | |
| from twisted.internet import reactor
 | |
| 
 | |
| from nova import flags
 | |
| from nova.exception import ProcessExecutionError
 | |
| 
 | |
| FLAGS = flags.FLAGS
 | |
| flags.DEFINE_integer('process_pool_size', 4,
 | |
|                      'Number of processes to use in the process pool')
 | |
| 
 | |
| 
 | |
| # This is based on _BackRelay from twister.internal.utils, but modified to
 | |
| #  capture both stdout and stderr, without odd stderr handling, and also to
 | |
| #  handle stdin
 | |
| class BackRelayWithInput(protocol.ProcessProtocol):
 | |
|     """
 | |
|     Trivial protocol for communicating with a process and turning its output
 | |
|     into the result of a L{Deferred}.
 | |
| 
 | |
|     @ivar deferred: A L{Deferred} which will be called back with all of stdout
 | |
|         and all of stderr as well (as a tuple).  C{terminate_on_stderr} is true
 | |
|         and any bytes are received over stderr, this will fire with an
 | |
|         L{_ProcessExecutionError} instance and the attribute will be set to
 | |
|         C{None}.
 | |
| 
 | |
|     @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
 | |
|         received over stderr, this attribute will refer to a L{Deferred} which
 | |
|         will be called back when the process ends.  This C{Deferred} is also
 | |
|         associated with the L{_ProcessExecutionError} which C{deferred} fires
 | |
|         with earlier in this case so that users can determine when the process
 | |
|         has actually ended, in addition to knowing when bytes have been
 | |
|         received via stderr.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, deferred, cmd, started_deferred=None,
 | |
|                  terminate_on_stderr=False, check_exit_code=True,
 | |
|                  process_input=None):
 | |
|         self.deferred = deferred
 | |
|         self.cmd = cmd
 | |
|         self.stdout = StringIO.StringIO()
 | |
|         self.stderr = StringIO.StringIO()
 | |
|         self.started_deferred = started_deferred
 | |
|         self.terminate_on_stderr = terminate_on_stderr
 | |
|         self.check_exit_code = check_exit_code
 | |
|         self.process_input = process_input
 | |
|         self.on_process_ended = None
 | |
| 
 | |
|     def _build_execution_error(self, exit_code=None):
 | |
|         return ProcessExecutionError(cmd=self.cmd,
 | |
|                                      exit_code=exit_code,
 | |
|                                      stdout=self.stdout.getvalue(),
 | |
|                                      stderr=self.stderr.getvalue())
 | |
| 
 | |
|     def errReceived(self, text):
 | |
|         self.stderr.write(text)
 | |
|         if self.terminate_on_stderr and (self.deferred is not None):
 | |
|             self.on_process_ended = defer.Deferred()
 | |
|             self.deferred.errback(self._build_execution_error())
 | |
|             self.deferred = None
 | |
|             self.transport.loseConnection()
 | |
| 
 | |
|     def outReceived(self, text):
 | |
|         self.stdout.write(text)
 | |
| 
 | |
|     def processEnded(self, reason):
 | |
|         if self.deferred is not None:
 | |
|             stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
 | |
|             exit_code = reason.value.exitCode
 | |
|             if self.check_exit_code and exit_code != 0:
 | |
|                 self.deferred.errback(self._build_execution_error(exit_code))
 | |
|             else:
 | |
|                 try:
 | |
|                     if self.check_exit_code:
 | |
|                         reason.trap(error.ProcessDone)
 | |
|                     self.deferred.callback((stdout, stderr))
 | |
|                 except:
 | |
|                     # NOTE(justinsb): This logic is a little suspicious to me.
 | |
|                     # If the callback throws an exception, then errback will
 | |
|                     # be called also. However, this is what the unit tests
 | |
|                     # test for.
 | |
|                     exec_error = self._build_execution_error(exit_code)
 | |
|                     self.deferred.errback(exec_error)
 | |
|         elif self.on_process_ended is not None:
 | |
|             self.on_process_ended.errback(reason)
 | |
| 
 | |
|     def connectionMade(self):
 | |
|         if self.started_deferred:
 | |
|             self.started_deferred.callback(self)
 | |
|         if self.process_input:
 | |
|             self.transport.write(str(self.process_input))
 | |
|         self.transport.closeStdin()
 | |
| 
 | |
| 
 | |
| def get_process_output(executable, args=None, env=None, path=None,
 | |
|                        process_reactor=None, check_exit_code=True,
 | |
|                        process_input=None, started_deferred=None,
 | |
|                        terminate_on_stderr=False):
 | |
|     if process_reactor is None:
 | |
|         process_reactor = reactor
 | |
|     args = args and args or ()
 | |
|     env = env and env and {}
 | |
|     deferred = defer.Deferred()
 | |
|     cmd = executable
 | |
|     if args:
 | |
|         cmd = " ".join([cmd] + args)
 | |
|     logging.debug("Running cmd: %s", cmd)
 | |
|     process_handler = BackRelayWithInput(
 | |
|             deferred,
 | |
|             cmd,
 | |
|             started_deferred=started_deferred,
 | |
|             check_exit_code=check_exit_code,
 | |
|             process_input=process_input,
 | |
|             terminate_on_stderr=terminate_on_stderr)
 | |
|     # NOTE(vish): commands come in as unicode, but self.executes needs
 | |
|     #             strings or process.spawn raises a deprecation warning
 | |
|     executable = str(executable)
 | |
|     if not args is None:
 | |
|         args = [str(x) for x in args]
 | |
|     process_reactor.spawnProcess(process_handler, executable,
 | |
|                                  (executable,) + tuple(args), env, path)
 | |
|     return deferred
 | |
| 
 | |
| 
 | |
| class ProcessPool(object):
 | |
|     """ A simple process pool implementation using Twisted's Process bits.
 | |
| 
 | |
|     This is pretty basic right now, but hopefully the API will be the correct
 | |
|     one so that it can be optimized later.
 | |
|     """
 | |
|     def __init__(self, size=None):
 | |
|         self.size = size and size or FLAGS.process_pool_size
 | |
|         self._pool = defer.DeferredSemaphore(self.size)
 | |
| 
 | |
|     def simple_execute(self, cmd, **kw):
 | |
|         """ Weak emulation of the old utils.execute() function.
 | |
| 
 | |
|         This only exists as a way to quickly move old execute methods to
 | |
|         this new style of code.
 | |
| 
 | |
|         NOTE(termie): This will break on args with spaces in them.
 | |
|         """
 | |
|         parsed = cmd.split(' ')
 | |
|         executable, args = parsed[0], parsed[1:]
 | |
|         return self.execute(executable, args, **kw)
 | |
| 
 | |
|     def execute(self, *args, **kw):
 | |
|         deferred = self._pool.acquire()
 | |
| 
 | |
|         def _associate_process(proto):
 | |
|             deferred.process = proto.transport
 | |
|             return proto.transport
 | |
| 
 | |
|         started = defer.Deferred()
 | |
|         started.addCallback(_associate_process)
 | |
|         kw.setdefault('started_deferred', started)
 | |
| 
 | |
|         deferred.process = None
 | |
|         deferred.started = started
 | |
| 
 | |
|         deferred.addCallback(lambda _: get_process_output(*args, **kw))
 | |
|         deferred.addBoth(self._release)
 | |
|         return deferred
 | |
| 
 | |
|     def _release(self, retval=None):
 | |
|         self._pool.release()
 | |
|         return retval
 | |
| 
 | |
| 
 | |
| class SharedPool(object):
 | |
|     _instance = None
 | |
| 
 | |
|     def __init__(self):
 | |
|         if SharedPool._instance is None:
 | |
|             self.__class__._instance = ProcessPool()
 | |
| 
 | |
|     def __getattr__(self, key):
 | |
|         return getattr(self._instance, key)
 | |
| 
 | |
| 
 | |
| def simple_execute(cmd, **kwargs):
 | |
|     return SharedPool().simple_execute(cmd, **kwargs)
 | 
