On Python 3, GreenFileIO.write() now always write all bytes using a loop, to have the same behaviour than GreenPipe.write() on Python 2. Before, the write() could be partial, write less bytes than expected. On Python 2, GreenPipe.write() doesn't allow partial writes, it always write all bytes, even if the user requested an unbuffered pipe (ex: by calling os.fdopen(fd, 'wb', 0)). Modifying Python 2 to be pedantic and allow partial writes for unbuffered pipes will likely break a lot of applications. It's simpler to modify Python 3 GreenFileIO to behave the same than Python 2 GreenPipe.
214 lines
6.2 KiB
Python
214 lines
6.2 KiB
Python
import _pyio as _original_pyio
|
|
import errno
|
|
import os as _original_os
|
|
import socket as _original_socket
|
|
from io import (
|
|
BufferedRandom as _OriginalBufferedRandom,
|
|
BufferedReader as _OriginalBufferedReader,
|
|
BufferedWriter as _OriginalBufferedWriter,
|
|
DEFAULT_BUFFER_SIZE,
|
|
TextIOWrapper as _OriginalTextIOWrapper,
|
|
IOBase as _OriginalIOBase,
|
|
)
|
|
from types import FunctionType
|
|
|
|
from eventlet.greenio.base import (
|
|
_operation_on_closed_file,
|
|
greenpipe_doc,
|
|
set_nonblocking,
|
|
SOCKET_BLOCKING,
|
|
)
|
|
from eventlet.hubs import notify_close, notify_opened, IOClosed, trampoline
|
|
from eventlet.support import get_errno, six
|
|
|
|
__all__ = ['_fileobject', 'GreenPipe']
|
|
|
|
# TODO get rid of this, it only seems like the original _fileobject
|
|
_fileobject = _original_socket.SocketIO
|
|
|
|
# Large part of the following code is copied from the original
|
|
# eventlet.greenio module
|
|
|
|
|
|
class GreenFileIO(_OriginalIOBase):
|
|
def __init__(self, name, mode='r', closefd=True, opener=None):
|
|
if isinstance(name, int):
|
|
fileno = name
|
|
self._name = "<fd:%d>" % fileno
|
|
else:
|
|
assert isinstance(name, six.string_types)
|
|
with open(name, mode) as fd:
|
|
self._name = fd.name
|
|
fileno = _original_os.dup(fd.fileno())
|
|
|
|
notify_opened(fileno)
|
|
self._fileno = fileno
|
|
self._mode = mode
|
|
self._closed = False
|
|
set_nonblocking(self)
|
|
self._seekable = None
|
|
|
|
@property
|
|
def closed(self):
|
|
return self._closed
|
|
|
|
def seekable(self):
|
|
if self._seekable is None:
|
|
try:
|
|
_original_os.lseek(self._fileno, 0, _original_os.SEEK_CUR)
|
|
except IOError as e:
|
|
if get_errno(e) == errno.ESPIPE:
|
|
self._seekable = False
|
|
else:
|
|
raise
|
|
else:
|
|
self._seekable = True
|
|
|
|
return self._seekable
|
|
|
|
def readable(self):
|
|
return 'r' in self._mode or '+' in self._mode
|
|
|
|
def writable(self):
|
|
return 'w' in self._mode or '+' in self._mode
|
|
|
|
def fileno(self):
|
|
return self._fileno
|
|
|
|
def read(self, size=-1):
|
|
if size == -1:
|
|
return self.readall()
|
|
|
|
while True:
|
|
try:
|
|
return _original_os.read(self._fileno, size)
|
|
except OSError as e:
|
|
if get_errno(e) not in SOCKET_BLOCKING:
|
|
raise IOError(*e.args)
|
|
self._trampoline(self, read=True)
|
|
|
|
def readall(self):
|
|
buf = []
|
|
while True:
|
|
try:
|
|
chunk = _original_os.read(self._fileno, DEFAULT_BUFFER_SIZE)
|
|
if chunk == b'':
|
|
return b''.join(buf)
|
|
buf.append(chunk)
|
|
except OSError as e:
|
|
if get_errno(e) not in SOCKET_BLOCKING:
|
|
raise IOError(*e.args)
|
|
self._trampoline(self, read=True)
|
|
|
|
def readinto(self, b):
|
|
up_to = len(b)
|
|
data = self.read(up_to)
|
|
bytes_read = len(data)
|
|
b[:bytes_read] = data
|
|
return bytes_read
|
|
|
|
def isatty(self):
|
|
try:
|
|
return _original_os.isatty(self.fileno())
|
|
except OSError as e:
|
|
raise IOError(*e.args)
|
|
|
|
def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
|
|
if self._closed:
|
|
# Don't trampoline if we're already closed.
|
|
raise IOClosed()
|
|
try:
|
|
return trampoline(fd, read=read, write=write, timeout=timeout,
|
|
timeout_exc=timeout_exc,
|
|
mark_as_closed=self._mark_as_closed)
|
|
except IOClosed:
|
|
# Our fileno has been obsoleted. Defang ourselves to
|
|
# prevent spurious closes.
|
|
self._mark_as_closed()
|
|
raise
|
|
|
|
def _mark_as_closed(self):
|
|
""" Mark this socket as being closed """
|
|
self._closed = True
|
|
|
|
def write(self, data):
|
|
view = memoryview(data)
|
|
datalen = len(data)
|
|
offset = 0
|
|
while offset < datalen:
|
|
try:
|
|
written = _original_os.write(self._fileno, view[offset:])
|
|
except OSError as e:
|
|
if get_errno(e) not in SOCKET_BLOCKING:
|
|
raise IOError(*e.args)
|
|
trampoline(self, write=True)
|
|
else:
|
|
offset += written
|
|
return offset
|
|
|
|
def close(self):
|
|
if not self._closed:
|
|
self._closed = True
|
|
_original_os.close(self._fileno)
|
|
notify_close(self._fileno)
|
|
for method in [
|
|
'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
|
|
'readline', 'readlines', 'seek', 'tell', 'truncate',
|
|
'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
|
|
setattr(self, method, _operation_on_closed_file)
|
|
|
|
def truncate(self, size=-1):
|
|
if size == -1:
|
|
size = self.tell()
|
|
try:
|
|
rv = _original_os.ftruncate(self._fileno, size)
|
|
except OSError as e:
|
|
raise IOError(*e.args)
|
|
else:
|
|
self.seek(size) # move position&clear buffer
|
|
return rv
|
|
|
|
def seek(self, offset, whence=_original_os.SEEK_SET):
|
|
try:
|
|
return _original_os.lseek(self._fileno, offset, whence)
|
|
except OSError as e:
|
|
raise IOError(*e.args)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.close()
|
|
|
|
|
|
_open_environment = dict(globals())
|
|
_open_environment.update(dict(
|
|
BufferedRandom=_OriginalBufferedRandom,
|
|
BufferedWriter=_OriginalBufferedWriter,
|
|
BufferedReader=_OriginalBufferedReader,
|
|
TextIOWrapper=_OriginalTextIOWrapper,
|
|
FileIO=GreenFileIO,
|
|
os=_original_os,
|
|
))
|
|
|
|
_open = FunctionType(
|
|
six.get_function_code(_original_pyio.open),
|
|
_open_environment,
|
|
)
|
|
|
|
|
|
def GreenPipe(name, mode="r", buffering=-1, encoding=None, errors=None,
|
|
newline=None, closefd=True, opener=None):
|
|
try:
|
|
fileno = name.fileno()
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
fileno = _original_os.dup(fileno)
|
|
name.close()
|
|
name = fileno
|
|
|
|
return _open(name, mode, buffering, encoding, errors, newline, closefd, opener)
|
|
|
|
GreenPipe.__doc__ = greenpipe_doc
|