deb-python-eventlet/eventlet/greenio.py
Michael Kerrin 8f98f8b634 hubs: defang after remove; related to second simultaneous read issue
https://github.com/eventlet/eventlet/issues/94

We have to defang the listener after removing it from the hub.

Otherwise we never actually remove it from the hub and the
'second simultaneous read' error comes back.

This should fix the issue.
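A rough sketch of what "defang" means here, assuming the wrapper pattern used
further down in greenio.py (see GreenSocket._mark_as_closed); the class and
names below are illustrative only:

    import os

    class FdWrapper(object):
        def __init__(self, fileno):
            self._fileno = fileno
            self._closed = False

        def _defanged_close(self):
            # No-op: the hub has already dropped this listener, so closing
            # again could hit a descriptor that someone else now owns.
            pass

        def _mark_as_closed(self):
            # Swap close() for the no-op so a stale wrapper can't bite.
            self.close = self._defanged_close
            self._closed = True

        def close(self):
            self._mark_as_closed()
            os.close(self._fileno)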

Turn off __builtin__ monkey patching by default

The reason is that eventlet.greenio.GreenPipe tries to
adapt an original file f and performs the check
isinstance(f, file). Here f is an original file object, while
file is now our patched file class, so the check fails and a
TypeError is raised.

nova-api exercises this
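A hypothetical Python 2 sketch of the failure mode (GreenPipeStandIn and the
path are made up; only __builtin__ and the isinstance check mirror the real
code):

    import __builtin__

    f = open('/tmp/example.txt', 'w')   # a genuine built-in file object

    class GreenPipeStandIn(object):
        # Stand-in for eventlet.greenio.GreenPipe.
        pass

    # Roughly what __builtin__ monkey patching did:
    __builtin__.file = GreenPipeStandIn

    # GreenPipe.__init__ checks isinstance(f, file); the name `file` now
    # resolves to the green replacement, so a real file object fails the
    # check and the constructor raises TypeError.
    print(isinstance(f, file))   # False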

Fix up zmq to use the extended add signature

Having heard of no problems relating to eventlet and zmq,
I'm not going to dive in and drop random patches against
it. If the 'Second simultaneous *er' error crops up against
zmq then we at least now have the machinery to
address it.
2014-08-12 12:10:42 +04:00


import errno
import os
from socket import socket as _original_socket
import socket
import sys
import time
import warnings
from eventlet.support import get_errno, six
from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed
__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
BUFFER_SIZE = 4096
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
CONNECT_SUCCESS = set((0, errno.EISCONN))
if sys.platform[:3] == "win":
CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
if six.PY3:
from io import IOBase as file
_fileobject = socket.SocketIO
elif six.PY2:
_fileobject = socket._fileobject
def socket_connect(descriptor, address):
"""
Attempts to connect to the address, returns the descriptor if it succeeds,
returns None if it needs to trampoline, and raises any exceptions.
"""
err = descriptor.connect_ex(address)
if err in CONNECT_ERR:
return None
if err not in CONNECT_SUCCESS:
raise socket.error(err, errno.errorcode[err])
return descriptor
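# A sketch of the calling convention socket_connect() expects (this is roughly
# what GreenSocket.connect() does further down; the loop is illustrative only):
#
#     while not socket_connect(descriptor, address):
#         trampoline(descriptor, write=True)   # yield to the hub until writable
#     socket_checkerr(descriptor)              # surface any deferred error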
def socket_checkerr(descriptor):
err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err not in CONNECT_SUCCESS:
raise socket.error(err, errno.errorcode[err])
def socket_accept(descriptor):
"""
Attempts to accept() on the descriptor, returns a client,address tuple
if it succeeds; returns None if it needs to trampoline, and raises
any exceptions.
"""
try:
return descriptor.accept()
except socket.error as e:
if get_errno(e) == errno.EWOULDBLOCK:
return None
raise
if sys.platform[:3] == "win":
# winsock sometimes throws ENOTCONN
SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
# oddly, on linux/darwin, an unconnected socket is expected to block,
# so we treat ENOTCONN the same as EWOULDBLOCK
SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
def set_nonblocking(fd):
"""
Sets the descriptor to be nonblocking. Works on many file-like
objects as well as sockets. Only sockets can be nonblocking on
Windows, however.
"""
try:
setblocking = fd.setblocking
except AttributeError:
# fd has no setblocking() method. It could be that this version of
# Python predates socket.setblocking(). In that case, we can still set
# the flag "by hand" on the underlying OS fileno using the fcntl
# module.
try:
import fcntl
except ImportError:
# Whoops, Windows has no fcntl module. This might not be a socket
# at all, but rather a file-like object with no setblocking()
# method. In particular, on Windows, pipes don't support
# non-blocking I/O and therefore don't have that method. Which
# means fcntl wouldn't help even if we could load it.
raise NotImplementedError("set_nonblocking() on a file object "
"with no setblocking() method "
"(Windows pipes don't support non-blocking I/O)")
# We managed to import fcntl.
fileno = fd.fileno()
orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
new_flags = orig_flags | os.O_NONBLOCK
if new_flags != orig_flags:
fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
else:
# socket supports setblocking()
setblocking(0)
try:
from socket import _GLOBAL_DEFAULT_TIMEOUT
except ImportError:
_GLOBAL_DEFAULT_TIMEOUT = object()
class GreenSocket(object):
"""
Green version of the socket.socket class that is intended to be 100%
API-compatible.
It also recognizes the keyword parameter, 'set_nonblocking=True'.
Pass False to indicate that socket is already in non-blocking mode
to save syscalls.
"""
def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
should_set_nonblocking = kwargs.pop('set_nonblocking', True)
if isinstance(family_or_realsock, six.integer_types):
fd = _original_socket(family_or_realsock, *args, **kwargs)
# Notify the hub that this is a newly-opened socket.
notify_opened(fd.fileno())
else:
fd = family_or_realsock
# import timeout from other socket, if it was there
try:
self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
except AttributeError:
self._timeout = socket.getdefaulttimeout()
if should_set_nonblocking:
set_nonblocking(fd)
self.fd = fd
# when client calls setblocking(0) or settimeout(0) the socket must
# act non-blocking
self.act_non_blocking = False
# Copy some attributes from underlying real socket.
# This is the easiest way that I found to fix
# https://bitbucket.org/eventlet/eventlet/issue/136
# Only `getsockopt` is required to fix that issue, others
# are just premature optimization to save __getattr__ call.
self.bind = fd.bind
self.close = fd.close
self.fileno = fd.fileno
self.getsockname = fd.getsockname
self.getsockopt = fd.getsockopt
self.listen = fd.listen
self.setsockopt = fd.setsockopt
self.shutdown = fd.shutdown
self._closed = False
@property
def _sock(self):
return self
# Forward unknown attributes to fd, cache the value for future use.
# I do not see any simple attribute which could be changed,
# so caching everything in self is fine.
# If we find such attributes, only attributes having __get__ might be cached.
# For now I do not want to complicate it.
def __getattr__(self, name):
attr = getattr(self.fd, name)
setattr(self, name, attr)
return attr
def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
""" We need to trampoline via the event hub.
We catch any signal back from the hub indicating that the operation we
were waiting on was associated with a filehandle that's since been
invalidated.
"""
if self._closed:
# If we did any logging, alerting to a second trampoline attempt on a closed
# socket here would be useful.
raise IOClosed()
try:
return trampoline(fd, read=read, write=write, timeout=timeout,
timeout_exc=timeout_exc,
mark_as_closed=self._mark_as_closed)
except IOClosed:
# This socket's been obsoleted. De-fang it.
self._mark_as_closed()
raise
def accept(self):
if self.act_non_blocking:
return self.fd.accept()
fd = self.fd
while True:
res = socket_accept(fd)
if res is not None:
client, addr = res
set_nonblocking(client)
return type(self)(client), addr
self._trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
def _defanged_close(self):
# Already closed once; nothing left to do.
pass
def _mark_as_closed(self):
""" Mark this socket as being closed """
self.close = self._defanged_close
self._closed = True
def close(self):
notify_close(self.fd)
self._mark_as_closed() # Don't do this twice.
return self.fd.close()
def __del__(self):
self.close()
def connect(self, address):
if self.act_non_blocking:
return self.fd.connect(address)
fd = self.fd
if self.gettimeout() is None:
while not socket_connect(fd, address):
try:
self._trampoline(fd, write=True)
except IOClosed:
raise socket.error(errno.EBADFD)
socket_checkerr(fd)
else:
end = time.time() + self.gettimeout()
while True:
if socket_connect(fd, address):
return
if time.time() >= end:
raise socket.timeout("timed out")
try:
self._trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout("timed out"))
except IOClosed:
# ... we need some workable errno here.
raise socket.error(errno.EBADFD)
socket_checkerr(fd)
def connect_ex(self, address):
if self.act_non_blocking:
return self.fd.connect_ex(address)
fd = self.fd
if self.gettimeout() is None:
while not socket_connect(fd, address):
try:
self._trampoline(fd, write=True)
socket_checkerr(fd)
except socket.error as ex:
return get_errno(ex)
except IOClosed:
return errno.EBADFD
else:
end = time.time() + self.gettimeout()
while True:
try:
if socket_connect(fd, address):
return 0
if time.time() >= end:
raise socket.timeout(errno.EAGAIN)
self._trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout(errno.EAGAIN))
socket_checkerr(fd)
except socket.error as ex:
return get_errno(ex)
except IOClosed:
return errno.EBADFD
def dup(self, *args, **kw):
sock = self.fd.dup(*args, **kw)
newsock = type(self)(sock, set_nonblocking=False)
newsock.settimeout(self.gettimeout())
return newsock
def makefile(self, *args, **kw):
dupped = self.dup()
res = _fileobject(dupped, *args, **kw)
if hasattr(dupped, "_drop"):
dupped._drop()
return res
def makeGreenFile(self, *args, **kw):
warnings.warn("makeGreenFile has been deprecated, please use "
"makefile instead", DeprecationWarning, stacklevel=2)
return self.makefile(*args, **kw)
def recv(self, buflen, flags=0):
fd = self.fd
if self.act_non_blocking:
return fd.recv(buflen, flags)
while True:
try:
return fd.recv(buflen, flags)
except socket.error as e:
if get_errno(e) in SOCKET_BLOCKING:
pass
elif get_errno(e) in SOCKET_CLOSED:
return ''
else:
raise
try:
self._trampoline(
fd,
read=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
except IOClosed as e:
# Perhaps we should return '' instead?
raise EOFError()
def recvfrom(self, *args):
if not self.act_non_blocking:
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom(*args)
def recvfrom_into(self, *args):
if not self.act_non_blocking:
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom_into(*args)
def recv_into(self, *args):
if not self.act_non_blocking:
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recv_into(*args)
def send(self, data, flags=0):
fd = self.fd
if self.act_non_blocking:
return fd.send(data, flags)
# blocking socket behavior - sends all, blocks if the buffer is full
total_sent = 0
len_data = len(data)
while 1:
try:
total_sent += fd.send(data[total_sent:], flags)
except socket.error as e:
if get_errno(e) not in SOCKET_BLOCKING:
raise
if total_sent == len_data:
break
try:
self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
except IOClosed:
raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
return total_sent
def sendall(self, data, flags=0):
tail = self.send(data, flags)
len_data = len(data)
while tail < len_data:
tail += self.send(data[tail:], flags)
def sendto(self, *args):
self._trampoline(self.fd, write=True)
return self.fd.sendto(*args)
def setblocking(self, flag):
if flag:
self.act_non_blocking = False
self._timeout = None
else:
self.act_non_blocking = True
self._timeout = 0.0
def settimeout(self, howlong):
if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
self.setblocking(True)
return
try:
f = howlong.__float__
except AttributeError:
raise TypeError('a float is required')
howlong = f()
if howlong < 0.0:
raise ValueError('Timeout value out of range')
if howlong == 0.0:
self.act_non_blocking = True
self._timeout = 0.0
else:
self.act_non_blocking = False
self._timeout = howlong
def gettimeout(self):
return self._timeout
if "__pypy__" in sys.builtin_module_names:
def _reuse(self):
getattr(self.fd, '_sock', self.fd)._reuse()
def _drop(self):
getattr(self.fd, '_sock', self.fd)._drop()
class _SocketDuckForFd(object):
"""Class implementing all socket method used by _fileobject
in cooperative manner using low level os I/O calls.
"""
_refcount = 0
def __init__(self, fileno):
self._fileno = fileno
notify_opened(fileno)
self._closed = False
def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
if self._closed:
# Don't trampoline if we're already closed.
raise IOClosed()
try:
return trampoline(fd, read=read, write=write, timeout=timeout,
timeout_exc=timeout_exc,
mark_as_closed=self._mark_as_closed)
except IOClosed:
# Our fileno has been obsoleted. Defang ourselves to
# prevent spurious closes.
self._mark_as_closed()
raise
def _defanged_close(self):
# Don't let anything close the wrong filehandle.
pass
def _mark_as_closed(self):
self.close = self._close = self._defanged_close
self._closed = True
@property
def _sock(self):
return self
def fileno(self):
return self._fileno
def recv(self, buflen):
while True:
try:
data = os.read(self._fileno, buflen)
return data
except OSError as e:
if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
self._trampoline(self, read=True)
def recv_into(self, buf, nbytes=0, flags=0):
if nbytes == 0:
nbytes = len(buf)
data = self.recv(nbytes)
buf[:nbytes] = data
return len(data)
def send(self, data):
while True:
try:
os.write(self._fileno, data)
except OSError as e:
if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
trampoline(self, write=True)
def sendall(self, data):
len_data = len(data)
os_write = os.write
fileno = self._fileno
try:
total_sent = os_write(fileno, data)
except OSError as e:
if get_errno(e) != errno.EAGAIN:
raise IOError(*e.args)
total_sent = 0
while total_sent < len_data:
self._trampoline(self, write=True)
try:
total_sent += os_write(fileno, data[total_sent:])
except OSError as e:
if get_errno(e) != errno.EAGAIN:
raise IOError(*e.args)
def __del__(self):
self._close()
def _close(self):
notify_close(self._fileno)
self._mark_as_closed()
try:
os.close(self._fileno)
except:
# os.close may fail if __init__ didn't complete
# (i.e. the file descriptor passed to popen was invalid)
pass
def __repr__(self):
return "%s:%d" % (self.__class__.__name__, self._fileno)
def _reuse(self):
self._refcount += 1
def _drop(self):
self._refcount -= 1
if self._refcount == 0:
self._close()
# Python3
_decref_socketios = _drop
def _operationOnClosedFile(*args, **kwargs):
raise ValueError("I/O operation on closed file")
class GreenPipe(_fileobject):
"""
GreenPipe is a cooperative replacement for file class.
It will cooperate on pipes. It will block on regular files.
Differences from file class:
- mode is a r/w property; it should be r/o
- encoding property not implemented
- write/writelines will not raise a TypeError exception when non-string
data is written; it will write str(data) instead
- universal newlines are not supported and the newlines property is not implemented
- file argument can be descriptor, file name or file object.
"""
def __init__(self, f, mode='r', bufsize=-1):
if not isinstance(f, six.string_types + (int, file)):
raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
if isinstance(f, six.string_types):
f = open(f, mode, 0)
if isinstance(f, int):
fileno = f
self._name = "<fd:%d>" % fileno
else:
fileno = os.dup(f.fileno())
self._name = f.name
if f.mode != mode:
raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
self._name = f.name
f.close()
super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
set_nonblocking(self)
self.softspace = 0
@property
def name(self):
return self._name
def __repr__(self):
return "<%s %s %r, mode %r at 0x%x>" % (
self.closed and 'closed' or 'open',
self.__class__.__name__,
self.name,
self.mode,
(id(self) < 0) and (sys.maxint + id(self)) or id(self))
def _defanged_close(self):
pass
def _mark_as_closed(self):
self.close = self._defanged_close
def close(self):
self._mark_as_closed()
super(GreenPipe, self).close()
for method in [
'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
'readline', 'readlines', 'seek', 'tell', 'truncate',
'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
setattr(self, method, _operationOnClosedFile)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def _get_readahead_len(self):
return len(self._rbuf.getvalue())
def _clear_readahead_buf(self):
len = self._get_readahead_len()
if len > 0:
self.read(len)
def tell(self):
self.flush()
try:
return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
except OSError as e:
raise IOError(*e.args)
def seek(self, offset, whence=0):
self.flush()
if whence == 1 and offset == 0: # tell synonym
return self.tell()
if whence == 1: # adjust offset by what is read ahead
offset -= self._get_readahead_len()
try:
rv = os.lseek(self.fileno(), offset, whence)
except OSError as e:
raise IOError(*e.args)
else:
self._clear_readahead_buf()
return rv
if getattr(file, "truncate", None): # not all OSes implement truncate
def truncate(self, size=-1):
self.flush()
if size == -1:
size = self.tell()
try:
rv = os.ftruncate(self.fileno(), size)
except OSError as e:
raise IOError(*e.args)
else:
self.seek(size) # move position&clear buffer
return rv
def isatty(self):
try:
return os.isatty(self.fileno())
except OSError as e:
raise IOError(*e.args)
# import SSL module here so we can refer to greenio.SSL.exceptionclass
try:
from OpenSSL import SSL
except ImportError:
# pyOpenSSL not installed, define exceptions anyway for convenience
class SSL(object):
class WantWriteError(Exception):
pass
class WantReadError(Exception):
pass
class ZeroReturnError(Exception):
pass
class SysCallError(Exception):
pass
def shutdown_safe(sock):
""" Shuts down the socket. This is a convenience method for
code that wants to gracefully handle regular sockets, SSL.Connection
sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
interchangeably. Both types of ssl socket require a shutdown() before
close, but they have different arity on their shutdown method.
Regular sockets don't need a shutdown before close, but it doesn't hurt.
"""
try:
try:
# socket, ssl.SSLSocket
return sock.shutdown(socket.SHUT_RDWR)
except TypeError:
# SSL.Connection
return sock.shutdown()
except socket.error as e:
# we don't care if the socket is already closed;
# this will often be the case in an http server context
if get_errno(e) != errno.ENOTCONN:
raise
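# Usage sketch (illustrative): shutdown_safe() goes right before close() when
# the peer may be a plain socket, an ssl.SSLSocket, or an OpenSSL
# SSL.Connection:
#
#     shutdown_safe(client_sock)
#     client_sock.close()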