Reimplemented GreenPipe without using file object.

file.write is not returning number of bytes writen and partial writes were not handled properly.
New implementation is using os module calls which support partial writes.
It also implements missing calls from file object (like seek, tell, truncate, ...).
The later is not very usefull, because regular files never return EAGAIN.
New GreenPipe can be constructed from int, string or file object.
This commit is contained in:
amajorek
2010-03-30 00:41:39 -04:00
parent 4a3b4e99e5
commit b13bf6ce5e
7 changed files with 180 additions and 162 deletions

View File

@@ -12,12 +12,13 @@ __patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid']
for var in dir(os_orig):
exec "%s = os_orig.%s" % (var, var)
__original_fdopen__ = os_orig.fdopen
def fdopen(*args, **kw):
def fdopen(fd, *args, **kw):
"""fdopen(fd [, mode='r' [, bufsize]]) -> file_object
Return an open file object connected to a file descriptor."""
return greenio.GreenPipe(__original_fdopen__(*args, **kw))
if not isinstance(fd, int):
raise TypeError('fd should be int, not %r' % fd)
return greenio.GreenPipe(fd, *args, **kw)
__original_read__ = os_orig.read
def read(fd, n):

View File

@@ -20,19 +20,15 @@ class Popen(subprocess_orig.Popen):
# this __init__() override is to wrap the pipes for eventlet-friendly
# non-blocking I/O, don't even bother overriding it on Windows.
if not subprocess_orig.mswindows:
def __init__(self, *args, **kwds):
def __init__(self, args, bufsize=0, *argss, **kwds):
# Forward the call to base-class constructor
subprocess_orig.Popen.__init__(self, *args, **kwds)
subprocess_orig.Popen.__init__(self, args, 0, *argss, **kwds)
# Now wrap the pipes, if any. This logic is loosely borrowed from
# eventlet.processes.Process.run() method.
for attr in "stdin", "stdout", "stderr":
pipe = getattr(self, attr)
if pipe is not None:
greenio.set_nonblocking(pipe)
wrapped_pipe = greenio.GreenPipe(pipe)
# The default 'newlines' attribute is '\r\n', which aren't
# sent over pipes.
wrapped_pipe.newlines = '\n'
wrapped_pipe = greenio.GreenPipe(pipe, pipe.mode, bufsize)
setattr(self, attr, wrapped_pipe)
__init__.__doc__ = subprocess_orig.Popen.__init__.__doc__

View File

@@ -306,139 +306,178 @@ class GreenSocket(object):
def gettimeout(self):
return self._timeout
class _SocketDuckForFd(object):
""" Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls."""
def __init__(self, fileno):
self._fileno = fileno
class GreenPipe(object):
""" GreenPipe is a cooperatively-yielding wrapper around OS pipes.
"""
newlines = '\n'
def __init__(self, fd):
set_nonblocking(fd)
self.fd = fd
self.closed = False
self.recvbuffer = ''
def close(self):
self.fd.close()
self.closed = True
def fileno(self):
return self.fd.fileno()
def _recv(self, buflen):
fd = self.fd
buf = self.recvbuffer
if buf:
chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
return chunk
while True:
try:
return fd.read(buflen)
except IOError, e:
if get_errno(e) != errno.EAGAIN:
return ''
except socket.error, e:
if get_errno(e) == errno.EPIPE:
return ''
raise
trampoline(fd, read=True)
def read(self, size=None):
"""read at most size bytes, returned as a string."""
accum = ''
while True:
if size is None:
recv_size = BUFFER_SIZE
else:
recv_size = size - len(accum)
chunk = self._recv(recv_size)
accum += chunk
if chunk == '':
return accum
if size is not None and len(accum) >= size:
return accum
def write(self, data):
fd = self.fd
while True:
try:
fd.write(data)
fd.flush()
return len(data)
except IOError, e:
if get_errno(e) != errno.EAGAIN:
raise
except ValueError, e:
# what's this for?
pass
except socket.error, e:
if get_errno(e) != errno.EPIPE:
raise
trampoline(fd, write=True)
def flush(self):
pass
def readuntil(self, terminator, size=None):
buf, self.recvbuffer = self.recvbuffer, ''
checked = 0
if size is None:
while True:
found = buf.find(terminator, checked)
if found != -1:
found += len(terminator)
chunk, self.recvbuffer = buf[:found], buf[found:]
return chunk
checked = max(0, len(buf) - (len(terminator) - 1))
d = self._recv(BUFFER_SIZE)
if not d:
break
buf += d
return buf
while len(buf) < size:
found = buf.find(terminator, checked)
if found != -1:
found += len(terminator)
chunk, self.recvbuffer = buf[:found], buf[found:]
return chunk
checked = len(buf)
d = self._recv(BUFFER_SIZE)
if not d:
break
buf += d
chunk, self.recvbuffer = buf[:size], buf[size:]
return chunk
def readline(self, size=None):
return self.readuntil(self.newlines, size=size)
def __iter__(self):
return self.xreadlines()
def __enter__(self):
@property
def _sock(self):
return self
def __exit__(self, *exc_info):
self.close()
def fileno(self):
return self._fileno
def xreadlines(self, size=None):
if size is None:
while True:
line = self.readline()
if not line:
break
yield line
def recv(self, buflen):
while True:
try:
data = os.read(self._fileno, buflen)
return data
except OSError, e:
if get_errno(e) != errno.EAGAIN:
raise IOError(*e.args)
trampoline(self, read=True)
def sendall(self, data):
len_data = len(data)
os_write = os.write
fileno = self._fileno
try:
total_sent = os_write(fileno, data)
except OSError, e:
if get_errno(e) != errno.EAGAIN:
raise IOError(*e.args)
total_sent = 0
while total_sent <len_data:
trampoline(self, write=True)
try:
total_sent += os_write(fileno, data[total_sent:])
except OSError, e:
if get_errno(e) != errno. EAGAIN:
raise IOError(*e.args)
def __del__(self):
os.close(self._fileno)
def __repr__(self):
return "%s:%d" % (self.__class__.__name__, self._fileno)
def _operationOnClosedFile(*args, **kwargs):
raise ValueError("I/O operation on closed file")
class GreenPipe(_fileobject):
"""
GreenPipe is a cooperative replacement for file class.
It will cooperate on pipes. It will block on regular file.
Differneces from file class:
- mode is r/w property. Should re r/o
- encoding property not implemented
- write/writelines will not raise TypeError exception when non-string data is written
it will write str(data) instead
- Universal new lines are not supported and newlines property not implementeded
- file argument can be descriptor, file name or file object.
"""
def __init__(self, f, mode='r', bufsize=-1):
if not isinstance(f, (basestring, int, file)):
raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
if isinstance(f, basestring):
f = open(f, mode, bufsize=0)
if isinstance(f, int):
fileno = f
self._name = "<fd:%d>" % fileno
else:
while size > 0:
line = self.readline(size)
if not line:
break
yield line
size -= len(line)
fileno = os.dup(f.fileno())
self._name = f.name
if f.mode != mode:
raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
self._name = f.name
f.close()
def writelines(self, lines):
for line in lines:
self.write(line)
super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
set_nonblocking(self)
self.softspace = 0
@property
def name(self): return self._name
def __repr__(self):
return "<%s %s %r, mode %r at 0x%x>" % (
self.closed and 'closed' or 'open',
self.__class__.__name__,
self.name,
self.mode,
(id(self) < 0) and (sys.maxint +id(self)) or id(self))
def close(self):
super(GreenPipe, self).close()
for method in ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
'readline', 'readlines', 'seek', 'tell', 'truncate',
'write', 'xreadlines', '__iter__', 'writelines']:
setattr(self, method, _operationOnClosedFile)
if getattr(file, '__enter__', None):
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def xreadlines(self, buffer):
return iterator(self)
def readinto(self, buf):
data = self.read(len(buf)) #FIXME could it be done without allocating intermediate?
n = len(data)
try:
buf[:n] = data
except TypeError, err:
if not isinstance(buf, array.array):
raise err
buf[:n] = array.array('c', data)
return n
def _get_readahead_len(self):
try:
return len(self._rbuf.getvalue()) # StringIO in 2.5
except AttributeError:
return len(self._rbuf) # str in 2.4
def _clear_readahead_buf(self):
len = self._get_readahead_len()
if len>0:
self.read(len)
def tell(self):
self.flush()
try:
return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
except OSError, e:
raise IOError(*e.args)
def seek(self, offset, whence=0):
self.flush()
if whence == 1 and offset==0: # tell synonym
return self.tell()
if whence == 1: # adjust offset by what is read ahead
offset -= self.get_readahead_len()
try:
rv = os.lseek(self.fileno(), offset, whence)
except OSError, e:
raise IOError(*e.args)
else:
self._clear_readahead_buf()
return rv
if getattr(file, "truncate", None): # not all OSes implement truncate
def truncate(self, size=-1):
self.flush()
if size ==-1:
size = self.tell()
try:
rv = os.ftruncate(self.fileno(), size)
except OSError, e:
raise IOError(*e.args)
else:
self.seek(size) # move position&clear buffer
return rv
def isatty(self):
try:
return os.isatty(self.fileno())
except OSError, e:
raise IOError(*e.args)
# import SSL module here so we can refer to greenio.SSL.exceptionclass

View File

@@ -65,14 +65,8 @@ class Process(object):
self.popen4 = popen2.Popen4([self.command] + self.args)
child_stdout_stderr = self.popen4.fromchild
child_stdin = self.popen4.tochild
greenio.set_nonblocking(child_stdout_stderr)
greenio.set_nonblocking(child_stdin)
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
self.child_stdout_stderr.newlines = '\n' # the default is
# \r\n, which aren't sent over
# pipes
self.child_stdin = greenio.GreenPipe(child_stdin)
self.child_stdin.newlines = '\n'
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr, child_stdout_stderr.mode, 0)
self.child_stdin = greenio.GreenPipe(child_stdin, child_stdin.mode, 0)
self.sendall = self.child_stdin.write
self.send = self.child_stdin.write

View File

@@ -229,11 +229,8 @@ def setup():
_setup_already = True
try:
_rpipe, _wpipe = os.pipe()
_wfile = os.fdopen(_wpipe,"wb",0)
_rfile = os.fdopen(_rpipe,"rb",0)
## Work whether or not wrap_pipe_with_coroutine_pipe was called
if not isinstance(_rfile, greenio.GreenPipe):
_rfile = greenio.GreenPipe(_rfile)
_wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
_rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
except ImportError:
# This is Windows compatibility -- use a socket instead of a pipe because
# pipes don't really exist on Windows.

View File

@@ -456,11 +456,8 @@ class TestGreenIo(LimitedTestCase):
# also ensures that readline() terminates on '\n' and '\r\n'
r, w = os.pipe()
r = os.fdopen(r)
w = os.fdopen(w, 'w')
r = greenio.GreenPipe(r)
w = greenio.GreenPipe(w)
w = greenio.GreenPipe(w, 'w')
def writer():
eventlet.sleep(.1)
@@ -486,11 +483,8 @@ class TestGreenIo(LimitedTestCase):
def test_pipe_writes_large_messages(self):
r, w = os.pipe()
r = os.fdopen(r)
w = os.fdopen(w, 'w', 0)
r = greenio.GreenPipe(r)
w = greenio.GreenPipe(w)
w = greenio.GreenPipe(w, 'w')
large_message = "".join([1024*chr(i) for i in xrange(65)])
def writer():

View File

@@ -11,11 +11,8 @@ class TestGreenPipeWithStatement(LimitedTestCase):
# ensure using a pipe as a context actually closes it.
r, w = os.pipe()
r = os.fdopen(r)
w = os.fdopen(w, 'w')
r = greenio.GreenPipe(r)
w = greenio.GreenPipe(w)
w = greenio.GreenPipe(w, 'w')
with r:
pass