Added eventlet.green.os and its test; rewrote wrap_pipe_with_coroutine_pipe to accomodate. Slight changes to eventlet.green.subprocess.

This commit is contained in:
Ryan Williams
2010-02-05 19:56:06 -08:00
parent 88e2387322
commit 69ad86ed24
6 changed files with 97 additions and 56 deletions

71
eventlet/green/os.py Normal file
View File

@@ -0,0 +1,71 @@
os_orig = __import__("os")
import errno
import socket
from eventlet import greenio
from eventlet import greenthread
from eventlet import hubs
for var in dir(os_orig):
exec "%s = os_orig.%s" % (var, var)
def fdopen(*args, **kw):
"""fdopen(fd [, mode='r' [, bufsize]]) -> file_object
Return an open file object connected to a file descriptor."""
return greenio.GreenPipe(os_orig.fdopen(*args, **kw))
def read(fd, n):
"""read(fd, buffersize) -> string
Read a file descriptor."""
while True:
try:
return os_orig.read(fd, n)
except (OSError, IOError), e:
if e[0] != errno.EAGAIN:
raise
except socket.error, e:
if e[0] == errno.EPIPE:
return ''
raise
hubs.trampoline(fd, read=True)
def write(fd, st):
"""write(fd, string) -> byteswritten
Write a string to a file descriptor.
"""
while True:
try:
return os_orig.write(fd, st)
except (OSError, IOError), e:
if e[0] != errno.EAGAIN:
raise
except socket.error, e:
if e[0] != errno.EPIPE:
raise
hubs.trampoline(fd, write=True)
def wait():
"""wait() -> (pid, status)
Wait for completion of a child process."""
return waitpid(0,0)
def waitpid(pid, options):
"""waitpid(...)
waitpid(pid, options) -> (pid, status)
Wait for completion of a given child process."""
if options & os.WNOHANG != 0:
return os_orig.waitpid(pid, options)
else:
new_options = options | os.WNOHANG
while True:
rpid, status = os_orig.waitpid(pid, new_options)
if status >= 0:
return rpid, status
greenthread.sleep(0.01)
# TODO: open

View File

@@ -1,15 +1,14 @@
import errno
import new
import os
import eventlet
from eventlet import greenio
from eventlet import patcher
from eventlet.green import os
from eventlet.green import select
subprocess_orig = __import__("subprocess")
# TODO: eventlet.green.os
patcher.inject('subprocess', globals(), ('select', select))
subprocess_orig = __import__("subprocess")
# This is the meat of this module, the green version of Popen.
class Popen(subprocess_orig.Popen):
@@ -49,17 +48,16 @@ class Popen(subprocess_orig.Popen):
except OSError, e:
if e.errno == errno.ECHILD:
# no child process, this happens if the child process
# already died and has been cleaned up, or if you just
# called with a random pid value
# already died and has been cleaned up
return -1
else:
raise
wait.__doc__ = subprocess_orig.Popen.wait.__doc__
if not subprocess_orig.mswindows:
# We don't want to copy/paste all the logic of the original
# _communicate() method, we just want a version that uses
# eventlet.green.select.select() instead of select.select().
# don't want to rewrite the original _communicate() method, we
# just want a version that uses eventlet.green.select.select()
# instead of select.select().
_communicate = new.function(subprocess_orig.Popen._communicate.im_func.func_code,
globals())

View File

@@ -394,20 +394,16 @@ class GreenPipe(object):
def write(self, data):
fd = self.fd
tail = 0
len_data = len(data)
while tail < len_data:
tosend = data[tail:]
while True:
try:
fd.write(tosend)
fd.write(data)
fd.flush()
tail += len(tosend)
if tail == len_data:
return len_data
return len(data)
except IOError, e:
if e[0] != EAGAIN:
raise
except ValueError, e:
# what's this for?
pass
except socket.error, e:
if e[0] != errno.EPIPE:

View File

@@ -121,49 +121,14 @@ __original_waitpid__ = os.waitpid
pipes_already_wrapped = False
def wrap_pipes_with_coroutine_pipes():
from eventlet import processes ## Make sure the signal handler is installed
global pipes_already_wrapped
if pipes_already_wrapped:
return
def new_fdopen(*args, **kw):
return greenio.GreenPipe(__original_fdopen__(*args, **kw))
def new_read(fd, *args, **kw):
from eventlet import hubs
try:
hubs.trampoline(fd, read=True)
except socket.error, e:
if e[0] == errno.EPIPE:
return ''
else:
raise
return __original_read__(fd, *args, **kw)
def new_write(fd, *args, **kw):
from eventlet import hubs
hubs.trampoline(fd, write=True)
return __original_write__(fd, *args, **kw)
def new_fork(*args, **kwargs):
pid = __original_fork__()
if pid:
processes._add_child_pid(pid)
return pid
def new_waitpid(pid, options):
from eventlet import processes
evt = processes.CHILD_EVENTS.get(pid)
if not evt:
return 0, 0
if options == os.WNOHANG:
if evt.ready():
return pid, evt.wait()
return 0, 0
elif options:
return __original_waitpid__(pid, options)
return pid, evt.wait()
os.fdopen = new_fdopen
os.read = new_read
os.write = new_write
if __original_fork__ is not None:
os.fork = new_fork
os.waitpid = new_waitpid
from eventlet.green import greenos
os.fdopen = greenos.fdopen
os.read = greenos.read
os.write = greenos.write
os.waitpid = greenos.waitpid
__original_select__ = select.select

View File

@@ -41,11 +41,13 @@ import_main('test_ftplib')
import_main('test_httplib')
if have_network_access:
import_main('test_httpservers')
import_main('test_os')
import_main('test_queue')
if have_network_access:
import_main('test_socket')
import_main('test_socket_ssl')
#import_main('test_socketserver')
#import_main('test_subprocess')
if have_network_access:
import_main('test_ssl')
import_main('test_thread')

9
tests/stdlib/test_os.py Normal file
View File

@@ -0,0 +1,9 @@
from eventlet import patcher
from eventlet.green import os
patcher.inject('test.test_os',
globals(),
('os', os))
if __name__ == "__main__":
test_main()