Add support for Process.wait; clean up the way set_nonblocking is called and move it out of util into greenio; Add wrap_pipes_with_coroutine_pipes (UNTESTED)

This commit is contained in:
donovan
2008-05-09 08:56:13 -07:00
parent 03f8d92618
commit 3db9d703f5
3 changed files with 132 additions and 33 deletions

View File

@@ -22,11 +22,15 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from eventlet.api import trampoline, get_hub
from eventlet import util
BUFFER_SIZE = 4096
import socket, errno
import errno
import os
import socket
import fcntl
from errno import EWOULDBLOCK, EAGAIN
@@ -89,7 +93,7 @@ def socket_send(descriptor, data):
try:
return descriptor.send(data)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
if e[0] == errno.EWOULDBLOCK or e[0] == errno.ENOTCONN:
return 0
raise
except SSL.WantWriteError:
@@ -147,10 +151,22 @@ def file_send(fd, data):
written = 0
def set_nonblocking(fd):
## Socket
if hasattr(fd, 'setblocking'):
fd.setblocking(0)
## File
else:
fileno = fd.fileno()
flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
class GreenSocket(object):
is_secure = False
def __init__(self, fd):
set_nonblocking(fd)
self.fd = fd
self._fileno = fd.fileno()
self.sendcount = 0
@@ -164,7 +180,7 @@ class GreenSocket(object):
res = socket_accept(fd)
if res is not None:
client, addr = res
util.set_nonblocking(client)
set_nonblocking(client)
return type(self)(client), addr
trampoline(fd, read=True, write=True)
@@ -197,7 +213,7 @@ class GreenSocket(object):
def dup(self, *args, **kw):
sock = self.fd.dup(*args, **kw)
util.set_nonblocking(sock)
set_nonblocking(sock)
return type(self)(sock)
def fileno(self, *args, **kw):
@@ -266,11 +282,13 @@ class GreenSocket(object):
return fn(*args, **kw)
class GreenFile(object):
newlines = '\r\n'
mode = 'wb+'
def __init__(self, fd):
set_nonblocking(fd)
self.sock = fd
self.closed = False
@@ -386,6 +404,7 @@ class GreenPipeSocket(GreenSocket):
class GreenPipe(GreenFile):
def __init__(self, fd):
set_nonblocking(fd)
self.fd = GreenPipeSocket(fd)
super(GreenPipe, self).__init__(self.fd)
@@ -399,3 +418,5 @@ class GreenPipe(GreenFile):
def flush(self):
self.fd.fd.flush()

View File

@@ -21,19 +21,63 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import errno
import os
import popen2
import signal
import sys
from eventlet import util, pools
from eventlet import coros
from eventlet import pools
from eventlet import greenio
class DeadProcess(RuntimeError):
pass
CHILD_PIDS = []
CHILD_EVENTS = {}
def sig_child(signal, frame):
for child_pid in CHILD_PIDS:
try:
pid, code = os.waitpid(child_pid, os.WNOHANG)
if not pid:
continue ## Wasn't this one that died
elif pid == -1:
print >> sys.stderr, "Got -1! Why didn't python raise?"
elif pid != child_pid:
print >> sys.stderr, "pid (%d) != child_pid (%d)" % (pid, child_pid)
# Defensively assume we could get a different pid back
if CHILD_EVENTS.get(pid):
event = CHILD_EVENTS.pop(pid)
event.send(code)
except OSError, e:
if e[0] != errno.ECHILD:
raise e
elif CHILD_EVENTS.get(child_pid):
# Already dead; signal, but assume success
event = CHILD_EVENTS.pop(child_pid)
event.send(0)
signal.signal(signal.SIGCHLD, sig_child)
def _add_child_pid(pid):
"""Add the given integer 'pid' to the list of child
process ids we are tracking. Return an event object
that can be used to get the process' exit code.
"""
CHILD_PIDS.append(pid)
event = coros.event()
CHILD_EVENTS[pid] = event
return event
class Process(object):
process_number = 0
def __init__(self, command, args, dead_callback=lambda:None):
@@ -51,10 +95,11 @@ class Process(object):
## We use popen4 so that read() will read from either stdout or stderr
self.popen4 = popen2.Popen4([self.command] + self.args)
self.event = _add_child_pid(self.popen4.pid)
child_stdout_stderr = self.popen4.fromchild
child_stdin = self.popen4.tochild
util.set_nonblocking(child_stdout_stderr)
util.set_nonblocking(child_stdin)
greenio.set_nonblocking(child_stdout_stderr)
greenio.set_nonblocking(child_stdin)
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes
self.child_stdin = greenio.GreenPipe(child_stdin)
@@ -116,6 +161,9 @@ class Process(object):
def getpid(self):
return self.popen4.pid
def wait(self):
return self.event.wait()
class ProcessPool(pools.Pool):
def __init__(self, command, args=None, min_size=0, max_size=4):

View File

@@ -24,7 +24,6 @@ THE SOFTWARE.
"""
import os
import fcntl
import socket
import errno
@@ -67,7 +66,6 @@ __original_socket__ = socket.socket
def tcp_socket():
s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM)
set_nonblocking(s)
return s
@@ -90,6 +88,7 @@ def wrap_ssl(sock, certificate=None, private_key=None):
connection.set_connect_state()
return greenio.GreenSocket(connection)
socket_already_wrapped = False
def wrap_socket_with_coroutine_socket():
global socket_already_wrapped
@@ -98,9 +97,7 @@ def wrap_socket_with_coroutine_socket():
def new_socket(*args, **kw):
from eventlet import greenio
s = __original_socket__(*args, **kw)
set_nonblocking(s)
return greenio.GreenSocket(s)
return greenio.GreenSocket(__original_socket__(*args, **kw))
socket.socket = new_socket
socket.ssl = wrap_ssl
@@ -108,6 +105,53 @@ def wrap_socket_with_coroutine_socket():
socket_already_wrapped = True
__original_fdopen__ = os.fdopen
__original_read__ = os.read
__original_write__ = os.write
__original_waitpid__ = os.waitpid
__original_fork__ = os.fork
## TODO wrappings for popen functions? not really needed since Process object exists?
pipes_already_wrapped = False
def wrap_pipes_with_coroutine_pipes():
from eventlet import processes ## Make sure the signal handler is installed
global pipes_already_wrapped
if pipes_already_wrapped:
return
def new_fdopen(*args, **kw):
from eventlet import greenio
return greenio.GreenPipe(__original_fdopen__(*args, **kw))
def new_read(fd, *args, **kw):
from eventlet import api
api.trampoline(fd, read=True)
return __original_read__(fd, *args, **kw)
def new_write(fd, *args, **kw):
from eventlet import api
api.trampoline(fd, write=True)
return __original_write__(fd, *args, **kw)
def new_fork(*args, **kwargs):
pid = __original_fork__
if pid:
processes._add_child_pid(pid)
return pid
def new_waitpid(pid, options):
from eventlet import processes
evt = processes.CHILD_EVENTS[pid]
if options == os.WNOHANG:
if evt.ready():
return pid, evt.wait()
return 0, 0
elif options:
return __original_waitpid__(pid, result)
return pid, evt.wait()
os.fdopen = new_fdopen
os.read = new_read
os.write = new_write
os.fork = new_fork
os.waitpid = new_waitpid
def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
set_reuse_addr(descriptor)
descriptor.bind(addr)
@@ -125,17 +169,3 @@ def set_reuse_addr(descriptor):
except socket.error:
pass
def set_nonblocking(descriptor):
if hasattr(descriptor, 'setblocking'):
# socket
descriptor.setblocking(0)
else:
# fd
if hasattr(descriptor, 'fileno'):
fd = descriptor.fileno()
else:
fd = descriptor
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
return descriptor