Removed SIGCHLD handler from processes because it breaks popen. http://mail.python.org/pipermail/python-dev/2004-November/049987.html Replaced the signal handler functionality with an equivalent that we've been running over here for a while.
This commit is contained in:
@@ -22,6 +22,7 @@ import os
|
|||||||
import popen2
|
import popen2
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
|
from eventlet import api
|
||||||
from eventlet import coros
|
from eventlet import coros
|
||||||
from eventlet import pools
|
from eventlet import pools
|
||||||
from eventlet import greenio
|
from eventlet import greenio
|
||||||
@@ -30,58 +31,31 @@ from eventlet import greenio
|
|||||||
class DeadProcess(RuntimeError):
|
class DeadProcess(RuntimeError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def cooperative_wait(pobj, check_interval=0.01):
|
||||||
|
""" Waits for a child process to exit, returning the status
|
||||||
|
code.
|
||||||
|
|
||||||
CHILD_POBJS = []
|
Unlike os.wait, cooperative_wait does not block the entire
|
||||||
|
process, only the calling coroutine. If the child process does
|
||||||
|
not die, cooperative_wait could wait forever.
|
||||||
|
|
||||||
CHILD_EVENTS = {}
|
The argument check_interval is the amount of time, in seconds,
|
||||||
|
that cooperative_wait will sleep between calls to os.waitpid.
|
||||||
|
|
||||||
def wait_on_children():
|
|
||||||
global CHILD_POBJS
|
|
||||||
unclosed_pobjs = []
|
|
||||||
for child_pobj in CHILD_POBJS:
|
|
||||||
try:
|
|
||||||
# We have to use poll() rather than os.waitpid because
|
|
||||||
# otherwise the popen2 module leaks fds; hat tip to Brian
|
|
||||||
# Brunswick
|
|
||||||
code = child_pobj.poll()
|
|
||||||
if code == -1:
|
|
||||||
unclosed_pobjs.append(child_pobj)
|
|
||||||
continue ## Wasn't this one that died
|
|
||||||
|
|
||||||
event = CHILD_EVENTS.pop(child_pobj, None)
|
|
||||||
if event:
|
|
||||||
event.send(code)
|
|
||||||
except OSError, e:
|
|
||||||
if e[0] == errno.ECHILD:
|
|
||||||
print "already dead"
|
|
||||||
# Already dead; signal, but assume success
|
|
||||||
event = CHILD_EVENTS.pop(child_pobj, None)
|
|
||||||
event.send(0)
|
|
||||||
else:
|
|
||||||
print "raising"
|
|
||||||
raise e
|
|
||||||
|
|
||||||
CHILD_POBJS = unclosed_pobjs
|
|
||||||
|
|
||||||
|
|
||||||
def sig_child(signal, frame):
|
|
||||||
from eventlet import api
|
|
||||||
api.call_after_global(0, wait_on_children)
|
|
||||||
try:
|
|
||||||
signal.signal(signal.SIGCHLD, sig_child)
|
|
||||||
except AttributeError:
|
|
||||||
pass # Windows
|
|
||||||
|
|
||||||
def _add_child_pobj(pobj):
|
|
||||||
"""Add the given popen4 object to the list of child
|
|
||||||
processes we are tracking. Return an event object
|
|
||||||
that can be used to get the process' exit code.
|
|
||||||
"""
|
"""
|
||||||
CHILD_POBJS.append(pobj)
|
try:
|
||||||
event = coros.event()
|
while True:
|
||||||
CHILD_EVENTS[pobj] = event
|
status = pobj.poll()
|
||||||
return event
|
if status >= 0:
|
||||||
|
return status
|
||||||
|
api.sleep(check_interval)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno == errno.ECHILD:
|
||||||
|
# no child process, this happens if the child process
|
||||||
|
# already died and has been cleaned up, or if you just
|
||||||
|
# called with a random pid value
|
||||||
|
return -1
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
class Process(object):
|
class Process(object):
|
||||||
@@ -101,7 +75,6 @@ class Process(object):
|
|||||||
|
|
||||||
## We use popen4 so that read() will read from either stdout or stderr
|
## We use popen4 so that read() will read from either stdout or stderr
|
||||||
self.popen4 = popen2.Popen4([self.command] + self.args)
|
self.popen4 = popen2.Popen4([self.command] + self.args)
|
||||||
self.event = _add_child_pobj(self.popen4)
|
|
||||||
child_stdout_stderr = self.popen4.fromchild
|
child_stdout_stderr = self.popen4.fromchild
|
||||||
child_stdin = self.popen4.tochild
|
child_stdin = self.popen4.tochild
|
||||||
greenio.set_nonblocking(child_stdout_stderr)
|
greenio.set_nonblocking(child_stdout_stderr)
|
||||||
@@ -116,7 +89,11 @@ class Process(object):
|
|||||||
self.recv = self.child_stdout_stderr.read
|
self.recv = self.child_stdout_stderr.read
|
||||||
self.readline = self.child_stdout_stderr.readline
|
self.readline = self.child_stdout_stderr.readline
|
||||||
|
|
||||||
|
def wait(self):
|
||||||
|
return cooperative_wait(self.popen4)
|
||||||
|
|
||||||
def dead_callback(self):
|
def dead_callback(self):
|
||||||
|
self.wait()
|
||||||
self.dead = True
|
self.dead = True
|
||||||
if self._dead_callback:
|
if self._dead_callback:
|
||||||
self._dead_callback()
|
self._dead_callback()
|
||||||
@@ -162,14 +139,12 @@ class Process(object):
|
|||||||
def kill(self, sig=None):
|
def kill(self, sig=None):
|
||||||
if sig == None:
|
if sig == None:
|
||||||
sig = signal.SIGTERM
|
sig = signal.SIGTERM
|
||||||
os.kill(self.popen4.pid, sig)
|
pid = self.getpid()
|
||||||
|
os.kill(pid, sig)
|
||||||
|
|
||||||
def getpid(self):
|
def getpid(self):
|
||||||
return self.popen4.pid
|
return self.popen4.pid
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
return self.event.wait()
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessPool(pools.Pool):
|
class ProcessPool(pools.Pool):
|
||||||
def __init__(self, command, args=None, min_size=0, max_size=4):
|
def __init__(self, command, args=None, min_size=0, max_size=4):
|
||||||
|
Reference in New Issue
Block a user