|
|
|
|
@@ -2,6 +2,7 @@
|
|
|
|
|
|
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
|
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
|
|
|
# Copyright 2010 FathomDB Inc.
|
|
|
|
|
# All Rights Reserved.
|
|
|
|
|
#
|
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
|
@@ -20,17 +21,12 @@
|
|
|
|
|
Process pool, still buggy right now.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
import multiprocessing
|
|
|
|
|
import StringIO
|
|
|
|
|
|
|
|
|
|
from twisted.internet import defer
|
|
|
|
|
from twisted.internet import error
|
|
|
|
|
from twisted.internet import process
|
|
|
|
|
from twisted.internet import protocol
|
|
|
|
|
from twisted.internet import reactor
|
|
|
|
|
from twisted.internet import threads
|
|
|
|
|
from twisted.python import failure
|
|
|
|
|
|
|
|
|
|
from nova import flags
|
|
|
|
|
|
|
|
|
|
@@ -55,111 +51,100 @@ class UnexpectedErrorOutput(IOError):
|
|
|
|
|
IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(termie): this too
|
|
|
|
|
class _BackRelay(protocol.ProcessProtocol):
|
|
|
|
|
# This is based on _BackRelay from twister.internal.utils, but modified to
|
|
|
|
|
# capture both stdout and stderr, without odd stderr handling, and also to
|
|
|
|
|
# handle stdin
|
|
|
|
|
class BackRelayWithInput(protocol.ProcessProtocol):
|
|
|
|
|
"""
|
|
|
|
|
Trivial protocol for communicating with a process and turning its output
|
|
|
|
|
into the result of a L{Deferred}.
|
|
|
|
|
|
|
|
|
|
@ivar deferred: A L{Deferred} which will be called back with all of stdout
|
|
|
|
|
and, if C{errortoo} is true, all of stderr as well (mixed together in
|
|
|
|
|
one string). If C{errortoo} is false and any bytes are received over
|
|
|
|
|
stderr, this will fire with an L{_UnexpectedErrorOutput} instance and
|
|
|
|
|
the attribute will be set to C{None}.
|
|
|
|
|
and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
|
|
|
|
|
and any bytes are received over stderr, this will fire with an
|
|
|
|
|
L{_UnexpectedErrorOutput} instance and the attribute will be set to
|
|
|
|
|
C{None}.
|
|
|
|
|
|
|
|
|
|
@ivar onProcessEnded: If C{errortoo} is false and bytes are received over
|
|
|
|
|
stderr, this attribute will refer to a L{Deferred} which will be called
|
|
|
|
|
back when the process ends. This C{Deferred} is also associated with
|
|
|
|
|
the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in
|
|
|
|
|
this case so that users can determine when the process has actually
|
|
|
|
|
ended, in addition to knowing when bytes have been received via stderr.
|
|
|
|
|
@ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
|
|
|
|
|
received over stderr, this attribute will refer to a L{Deferred} which
|
|
|
|
|
will be called back when the process ends. This C{Deferred} is also
|
|
|
|
|
associated with the L{_UnexpectedErrorOutput} which C{deferred} fires
|
|
|
|
|
with earlier in this case so that users can determine when the process
|
|
|
|
|
has actually ended, in addition to knowing when bytes have been received
|
|
|
|
|
via stderr.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, deferred, errortoo=0):
|
|
|
|
|
def __init__(self, deferred, started_deferred=None,
|
|
|
|
|
terminate_on_stderr=False, check_exit_code=True,
|
|
|
|
|
process_input=None):
|
|
|
|
|
self.deferred = deferred
|
|
|
|
|
self.s = StringIO.StringIO()
|
|
|
|
|
if errortoo:
|
|
|
|
|
self.errReceived = self.errReceivedIsGood
|
|
|
|
|
else:
|
|
|
|
|
self.errReceived = self.errReceivedIsBad
|
|
|
|
|
|
|
|
|
|
def errReceivedIsBad(self, text):
|
|
|
|
|
if self.deferred is not None:
|
|
|
|
|
self.onProcessEnded = defer.Deferred()
|
|
|
|
|
err = UnexpectedErrorOutput(text, self.onProcessEnded)
|
|
|
|
|
self.deferred.errback(failure.Failure(err))
|
|
|
|
|
self.stdout = StringIO.StringIO()
|
|
|
|
|
self.stderr = StringIO.StringIO()
|
|
|
|
|
self.started_deferred = started_deferred
|
|
|
|
|
self.terminate_on_stderr = terminate_on_stderr
|
|
|
|
|
self.check_exit_code = check_exit_code
|
|
|
|
|
self.process_input = process_input
|
|
|
|
|
self.on_process_ended = None
|
|
|
|
|
|
|
|
|
|
def errReceived(self, text):
|
|
|
|
|
self.stderr.write(text)
|
|
|
|
|
if self.terminate_on_stderr and (self.deferred is not None):
|
|
|
|
|
self.on_process_ended = defer.Deferred()
|
|
|
|
|
self.deferred.errback(UnexpectedErrorOutput(
|
|
|
|
|
stdout=self.stdout.getvalue(),
|
|
|
|
|
stderr=self.stderr.getvalue()))
|
|
|
|
|
self.deferred = None
|
|
|
|
|
self.transport.loseConnection()
|
|
|
|
|
|
|
|
|
|
def errReceivedIsGood(self, text):
|
|
|
|
|
self.s.write(text)
|
|
|
|
|
|
|
|
|
|
def outReceived(self, text):
|
|
|
|
|
self.s.write(text)
|
|
|
|
|
self.stdout.write(text)
|
|
|
|
|
|
|
|
|
|
def processEnded(self, reason):
|
|
|
|
|
if self.deferred is not None:
|
|
|
|
|
self.deferred.callback(self.s.getvalue())
|
|
|
|
|
elif self.onProcessEnded is not None:
|
|
|
|
|
self.onProcessEnded.errback(reason)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BackRelayWithInput(_BackRelay):
|
|
|
|
|
def __init__(self, deferred, startedDeferred=None, error_ok=0,
|
|
|
|
|
input=None):
|
|
|
|
|
# Twisted doesn't use new-style classes in most places :(
|
|
|
|
|
_BackRelay.__init__(self, deferred, errortoo=error_ok)
|
|
|
|
|
self.error_ok = error_ok
|
|
|
|
|
self.input = input
|
|
|
|
|
self.stderr = StringIO.StringIO()
|
|
|
|
|
self.startedDeferred = startedDeferred
|
|
|
|
|
|
|
|
|
|
def errReceivedIsBad(self, text):
|
|
|
|
|
self.stderr.write(text)
|
|
|
|
|
self.transport.loseConnection()
|
|
|
|
|
|
|
|
|
|
def errReceivedIsGood(self, text):
|
|
|
|
|
self.stderr.write(text)
|
|
|
|
|
|
|
|
|
|
def connectionMade(self):
|
|
|
|
|
if self.startedDeferred:
|
|
|
|
|
self.startedDeferred.callback(self)
|
|
|
|
|
if self.input:
|
|
|
|
|
self.transport.write(self.input)
|
|
|
|
|
self.transport.closeStdin()
|
|
|
|
|
|
|
|
|
|
def processEnded(self, reason):
|
|
|
|
|
if self.deferred is not None:
|
|
|
|
|
stdout, stderr = self.s.getvalue(), self.stderr.getvalue()
|
|
|
|
|
stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
|
|
|
|
|
try:
|
|
|
|
|
# NOTE(termie): current behavior means if error_ok is True
|
|
|
|
|
# we won't throw an error even if the process
|
|
|
|
|
# exited with a non-0 status, so you can't be
|
|
|
|
|
# okay with stderr output and not with bad exit
|
|
|
|
|
# codes.
|
|
|
|
|
if not self.error_ok:
|
|
|
|
|
if self.check_exit_code:
|
|
|
|
|
reason.trap(error.ProcessDone)
|
|
|
|
|
self.deferred.callback((stdout, stderr))
|
|
|
|
|
except:
|
|
|
|
|
# NOTE(justinsb): This logic is a little suspicious to me...
|
|
|
|
|
# If the callback throws an exception, then errback will be
|
|
|
|
|
# called also. However, this is what the unit tests test for...
|
|
|
|
|
self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
|
|
|
|
|
elif self.on_process_ended is not None:
|
|
|
|
|
self.on_process_ended.errback(reason)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
|
|
|
|
|
error_ok=0, input=None, startedDeferred=None):
|
|
|
|
|
if reactor is None:
|
|
|
|
|
from twisted.internet import reactor
|
|
|
|
|
def connectionMade(self):
|
|
|
|
|
if self.started_deferred:
|
|
|
|
|
self.started_deferred.callback(self)
|
|
|
|
|
if self.process_input:
|
|
|
|
|
self.transport.write(self.process_input)
|
|
|
|
|
self.transport.closeStdin()
|
|
|
|
|
|
|
|
|
|
def get_process_output(executable, args=None, env=None, path=None,
|
|
|
|
|
process_reactor=None, check_exit_code=True,
|
|
|
|
|
process_input=None, started_deferred=None,
|
|
|
|
|
terminate_on_stderr=False):
|
|
|
|
|
if process_reactor is None:
|
|
|
|
|
process_reactor = reactor
|
|
|
|
|
args = args and args or ()
|
|
|
|
|
env = env and env and {}
|
|
|
|
|
d = defer.Deferred()
|
|
|
|
|
p = BackRelayWithInput(
|
|
|
|
|
d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
|
|
|
|
|
deferred = defer.Deferred()
|
|
|
|
|
process_handler = BackRelayWithInput(
|
|
|
|
|
deferred,
|
|
|
|
|
started_deferred=started_deferred,
|
|
|
|
|
check_exit_code=check_exit_code,
|
|
|
|
|
process_input=process_input,
|
|
|
|
|
terminate_on_stderr=terminate_on_stderr)
|
|
|
|
|
# NOTE(vish): commands come in as unicode, but self.executes needs
|
|
|
|
|
# strings or process.spawn raises a deprecation warning
|
|
|
|
|
executable = str(executable)
|
|
|
|
|
if not args is None:
|
|
|
|
|
args = [str(x) for x in args]
|
|
|
|
|
reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
|
|
|
|
|
return d
|
|
|
|
|
process_reactor.spawnProcess( process_handler, executable,
|
|
|
|
|
(executable,)+tuple(args), env, path)
|
|
|
|
|
return deferred
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProcessPool(object):
|
|
|
|
|
@@ -185,26 +170,26 @@ class ProcessPool(object):
|
|
|
|
|
return self.execute(executable, args, **kw)
|
|
|
|
|
|
|
|
|
|
def execute(self, *args, **kw):
|
|
|
|
|
d = self._pool.acquire()
|
|
|
|
|
deferred = self._pool.acquire()
|
|
|
|
|
|
|
|
|
|
def _associateProcess(proto):
|
|
|
|
|
d.process = proto.transport
|
|
|
|
|
def _associate_process(proto):
|
|
|
|
|
deferred.process = proto.transport
|
|
|
|
|
return proto.transport
|
|
|
|
|
|
|
|
|
|
started = defer.Deferred()
|
|
|
|
|
started.addCallback(_associateProcess)
|
|
|
|
|
kw.setdefault('startedDeferred', started)
|
|
|
|
|
started.addCallback(_associate_process)
|
|
|
|
|
kw.setdefault('started_deferred', started)
|
|
|
|
|
|
|
|
|
|
d.process = None
|
|
|
|
|
d.started = started
|
|
|
|
|
deferred.process = None
|
|
|
|
|
deferred.started = started
|
|
|
|
|
|
|
|
|
|
d.addCallback(lambda _: getProcessOutput(*args, **kw))
|
|
|
|
|
d.addBoth(self._release)
|
|
|
|
|
return d
|
|
|
|
|
deferred.addCallback(lambda _: get_process_output(*args, **kw))
|
|
|
|
|
deferred.addBoth(self._release)
|
|
|
|
|
return deferred
|
|
|
|
|
|
|
|
|
|
def _release(self, rv=None):
|
|
|
|
|
def _release(self, retval=None):
|
|
|
|
|
self._pool.release()
|
|
|
|
|
return rv
|
|
|
|
|
return retval
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SharedPool(object):
|
|
|
|
|
|