First cut of eventlet.green.subprocess module. Added stdlib tests as well. Fixed GreenPipe's incorrect assumption that all callers would use a buflen argument to read().
This commit is contained in:
69
eventlet/green/subprocess.py
Normal file
69
eventlet/green/subprocess.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import errno
|
||||
import new
|
||||
import os
|
||||
|
||||
import eventlet
|
||||
from eventlet import greenio
|
||||
from eventlet import patcher
|
||||
from eventlet.green import select
|
||||
|
||||
subprocess_orig = __import__("subprocess")
|
||||
# TODO: eventlet.green.os
|
||||
patcher.inject('subprocess', globals(), ('select', select))
|
||||
|
||||
# This is the meat of this module, the green version of Popen.
|
||||
class Popen(subprocess_orig.Popen):
|
||||
"""eventlet-friendly version of subprocess.Popen"""
|
||||
# We do not believe that Windows pipes support non-blocking I/O. At least,
|
||||
# the Python file objects stored on our base-class object have no
|
||||
# setblocking() method, and the Python fcntl module doesn't exist on
|
||||
# Windows. (see eventlet.greenio.set_nonblocking()) As the sole purpose of
|
||||
# this __init__() override is to wrap the pipes for eventlet-friendly
|
||||
# non-blocking I/O, don't even bother overriding it on Windows.
|
||||
if not subprocess_orig.mswindows:
|
||||
def __init__(self, *args, **kwds):
|
||||
# Forward the call to base-class constructor
|
||||
subprocess_orig.Popen.__init__(self, *args, **kwds)
|
||||
# Now wrap the pipes, if any. This logic is loosely borrowed from
|
||||
# eventlet.processes.Process.run() method.
|
||||
for attr in "stdin", "stdout", "stderr":
|
||||
pipe = getattr(self, attr)
|
||||
if pipe is not None:
|
||||
greenio.set_nonblocking(pipe)
|
||||
wrapped_pipe = greenio.GreenPipe(pipe)
|
||||
# The default 'newlines' attribute is '\r\n', which aren't
|
||||
# sent over pipes.
|
||||
wrapped_pipe.newlines = '\n'
|
||||
setattr(self, attr, wrapped_pipe)
|
||||
__init__.__doc__ = subprocess_orig.Popen.__init__.__doc__
|
||||
|
||||
def wait(self, check_interval=0.01):
|
||||
# Instead of a blocking OS call, this version of wait() uses logic
|
||||
# borrowed from the eventlet 0.2 processes.Process.wait() method.
|
||||
try:
|
||||
while True:
|
||||
status = self.poll()
|
||||
if status is not None:
|
||||
return status
|
||||
eventlet.sleep(check_interval)
|
||||
except OSError, e:
|
||||
if e.errno == errno.ECHILD:
|
||||
# no child process, this happens if the child process
|
||||
# already died and has been cleaned up, or if you just
|
||||
# called with a random pid value
|
||||
return -1
|
||||
else:
|
||||
raise
|
||||
wait.__doc__ = subprocess_orig.Popen.wait.__doc__
|
||||
|
||||
if not subprocess_orig.mswindows:
|
||||
# We don't want to copy/paste all the logic of the original
|
||||
# _communicate() method, we just want a version that uses
|
||||
# eventlet.green.select.select() instead of select.select().
|
||||
_communicate = new.function(subprocess_orig.Popen._communicate.im_func.func_code,
|
||||
globals())
|
||||
|
||||
# Borrow subprocess.call() and check_call(), but patch them so they reference
|
||||
# OUR Popen class rather than subprocess.Popen.
|
||||
call = new.function(subprocess_orig.call.func_code, globals())
|
||||
check_call = new.function(subprocess_orig.check_call.func_code, globals())
|
@@ -358,10 +358,8 @@ class GreenPipe(object):
|
||||
def fileno(self):
|
||||
return self.fd.fileno()
|
||||
|
||||
def read(self, buflen, flags=0):
|
||||
def _recv(self, buflen):
|
||||
fd = self.fd
|
||||
if buflen is None:
|
||||
buflen = BUFFER_SIZE
|
||||
buf = self.recvbuffer
|
||||
if buf:
|
||||
chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
|
||||
@@ -378,7 +376,23 @@ class GreenPipe(object):
|
||||
raise
|
||||
trampoline(fd, read=True)
|
||||
|
||||
def write(self, data, flags=0):
|
||||
|
||||
def read(self, size=None):
|
||||
"""read at most size bytes, returned as a string."""
|
||||
accum = ''
|
||||
while True:
|
||||
if size is None:
|
||||
recv_size = BUFFER_SIZE
|
||||
else:
|
||||
recv_size = size - len(accum)
|
||||
chunk = self._recv(recv_size)
|
||||
accum += chunk
|
||||
if chunk == '':
|
||||
return accum
|
||||
if size is not None and len(accum) >= size:
|
||||
return accum
|
||||
|
||||
def write(self, data):
|
||||
fd = self.fd
|
||||
tail = 0
|
||||
len_data = len(data)
|
||||
|
11
tests/stdlib/test_subprocess.py
Normal file
11
tests/stdlib/test_subprocess.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from eventlet import patcher
|
||||
from eventlet.green import subprocess
|
||||
from eventlet.green import time
|
||||
|
||||
patcher.inject('test.test_subprocess',
|
||||
globals(),
|
||||
('subprocess', subprocess),
|
||||
('time', time))
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
Reference in New Issue
Block a user