diff --git a/eventlet/greenio.py b/eventlet/greenio.py index d8a07d1..2817ce6 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -22,11 +22,15 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from eventlet.api import trampoline, get_hub -from eventlet import util BUFFER_SIZE = 4096 -import socket, errno +import errno +import os +import socket +import fcntl + + from errno import EWOULDBLOCK, EAGAIN @@ -89,7 +93,7 @@ def socket_send(descriptor, data): try: return descriptor.send(data) except socket.error, e: - if e[0] == errno.EWOULDBLOCK: + if e[0] == errno.EWOULDBLOCK or e[0] == errno.ENOTCONN: return 0 raise except SSL.WantWriteError: @@ -145,26 +149,38 @@ def file_send(fd, data): except socket.error, e: if e[0] == errno.EPIPE: written = 0 - + + +def set_nonblocking(fd): + ## Socket + if hasattr(fd, 'setblocking'): + fd.setblocking(0) + ## File + else: + fileno = fd.fileno() + flags = fcntl.fcntl(fileno, fcntl.F_GETFL) + fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) + class GreenSocket(object): is_secure = False def __init__(self, fd): + set_nonblocking(fd) self.fd = fd self._fileno = fd.fileno() self.sendcount = 0 self.recvcount = 0 self.recvbuffer = '' self.closed = False - + def accept(self): fd = self.fd while True: res = socket_accept(fd) if res is not None: client, addr = res - util.set_nonblocking(client) + set_nonblocking(client) return type(self)(client), addr trampoline(fd, read=True, write=True) @@ -197,7 +213,7 @@ class GreenSocket(object): def dup(self, *args, **kw): sock = self.fd.dup(*args, **kw) - util.set_nonblocking(sock) + set_nonblocking(sock) return type(self)(sock) def fileno(self, *args, **kw): @@ -264,16 +280,18 @@ class GreenSocket(object): def shutdown(self, *args, **kw): fn = self.shutdown = self.fd.shutdown return fn(*args, **kw) - + + class GreenFile(object): newlines = '\r\n' mode = 'wb+' def __init__(self, fd): + set_nonblocking(fd) self.sock = fd self.closed = False - + def close(self): self.sock.close() self.closed = True @@ -386,6 +404,7 @@ class GreenPipeSocket(GreenSocket): class GreenPipe(GreenFile): def __init__(self, fd): + set_nonblocking(fd) self.fd = GreenPipeSocket(fd) super(GreenPipe, self).__init__(self.fd) @@ -399,3 +418,5 @@ class GreenPipe(GreenFile): def flush(self): self.fd.fd.flush() + + diff --git a/eventlet/processes.py b/eventlet/processes.py index 35c7dd1..f00b2ae 100644 --- a/eventlet/processes.py +++ b/eventlet/processes.py @@ -21,19 +21,63 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ - +import errno import os import popen2 import signal +import sys - -from eventlet import util, pools +from eventlet import coros +from eventlet import pools from eventlet import greenio + class DeadProcess(RuntimeError): pass +CHILD_PIDS = [] + +CHILD_EVENTS = {} + + +def sig_child(signal, frame): + for child_pid in CHILD_PIDS: + try: + pid, code = os.waitpid(child_pid, os.WNOHANG) + if not pid: + continue ## Wasn't this one that died + elif pid == -1: + print >> sys.stderr, "Got -1! Why didn't python raise?" + elif pid != child_pid: + print >> sys.stderr, "pid (%d) != child_pid (%d)" % (pid, child_pid) + + # Defensively assume we could get a different pid back + if CHILD_EVENTS.get(pid): + event = CHILD_EVENTS.pop(pid) + event.send(code) + + except OSError, e: + if e[0] != errno.ECHILD: + raise e + elif CHILD_EVENTS.get(child_pid): + # Already dead; signal, but assume success + event = CHILD_EVENTS.pop(child_pid) + event.send(0) +signal.signal(signal.SIGCHLD, sig_child) + + +def _add_child_pid(pid): + """Add the given integer 'pid' to the list of child + process ids we are tracking. Return an event object + that can be used to get the process' exit code. + """ + CHILD_PIDS.append(pid) + event = coros.event() + CHILD_EVENTS[pid] = event + return event + + class Process(object): process_number = 0 def __init__(self, command, args, dead_callback=lambda:None): @@ -51,10 +95,11 @@ class Process(object): ## We use popen4 so that read() will read from either stdout or stderr self.popen4 = popen2.Popen4([self.command] + self.args) + self.event = _add_child_pid(self.popen4.pid) child_stdout_stderr = self.popen4.fromchild child_stdin = self.popen4.tochild - util.set_nonblocking(child_stdout_stderr) - util.set_nonblocking(child_stdin) + greenio.set_nonblocking(child_stdout_stderr) + greenio.set_nonblocking(child_stdin) self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr) self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes self.child_stdin = greenio.GreenPipe(child_stdin) @@ -116,6 +161,9 @@ class Process(object): def getpid(self): return self.popen4.pid + def wait(self): + return self.event.wait() + class ProcessPool(pools.Pool): def __init__(self, command, args=None, min_size=0, max_size=4): diff --git a/eventlet/util.py b/eventlet/util.py index 237aa95..d9a63b1 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -24,7 +24,6 @@ THE SOFTWARE. """ import os -import fcntl import socket import errno @@ -67,7 +66,6 @@ __original_socket__ = socket.socket def tcp_socket(): s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM) - set_nonblocking(s) return s @@ -90,6 +88,7 @@ def wrap_ssl(sock, certificate=None, private_key=None): connection.set_connect_state() return greenio.GreenSocket(connection) + socket_already_wrapped = False def wrap_socket_with_coroutine_socket(): global socket_already_wrapped @@ -98,9 +97,7 @@ def wrap_socket_with_coroutine_socket(): def new_socket(*args, **kw): from eventlet import greenio - s = __original_socket__(*args, **kw) - set_nonblocking(s) - return greenio.GreenSocket(s) + return greenio.GreenSocket(__original_socket__(*args, **kw)) socket.socket = new_socket socket.ssl = wrap_ssl @@ -108,6 +105,53 @@ def wrap_socket_with_coroutine_socket(): socket_already_wrapped = True +__original_fdopen__ = os.fdopen +__original_read__ = os.read +__original_write__ = os.write +__original_waitpid__ = os.waitpid +__original_fork__ = os.fork +## TODO wrappings for popen functions? not really needed since Process object exists? + + +pipes_already_wrapped = False +def wrap_pipes_with_coroutine_pipes(): + from eventlet import processes ## Make sure the signal handler is installed + global pipes_already_wrapped + if pipes_already_wrapped: + return + def new_fdopen(*args, **kw): + from eventlet import greenio + return greenio.GreenPipe(__original_fdopen__(*args, **kw)) + def new_read(fd, *args, **kw): + from eventlet import api + api.trampoline(fd, read=True) + return __original_read__(fd, *args, **kw) + def new_write(fd, *args, **kw): + from eventlet import api + api.trampoline(fd, write=True) + return __original_write__(fd, *args, **kw) + def new_fork(*args, **kwargs): + pid = __original_fork__ + if pid: + processes._add_child_pid(pid) + return pid + def new_waitpid(pid, options): + from eventlet import processes + evt = processes.CHILD_EVENTS[pid] + if options == os.WNOHANG: + if evt.ready(): + return pid, evt.wait() + return 0, 0 + elif options: + return __original_waitpid__(pid, result) + return pid, evt.wait() + os.fdopen = new_fdopen + os.read = new_read + os.write = new_write + os.fork = new_fork + os.waitpid = new_waitpid + + def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): set_reuse_addr(descriptor) descriptor.bind(addr) @@ -124,18 +168,4 @@ def set_reuse_addr(descriptor): ) except socket.error: pass - -def set_nonblocking(descriptor): - if hasattr(descriptor, 'setblocking'): - # socket - descriptor.setblocking(0) - else: - # fd - if hasattr(descriptor, 'fileno'): - fd = descriptor.fileno() - else: - fd = descriptor - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - return descriptor