483 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			483 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import errno
 | |
| import os
 | |
| import socket
 | |
| import sys
 | |
| import time
 | |
| import warnings
 | |
| 
 | |
| import eventlet
 | |
| from eventlet.hubs import trampoline, notify_opened, IOClosed
 | |
| from eventlet.support import get_errno, six
 | |
| 
 | |
| __all__ = [
 | |
|     'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
 | |
|     'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
 | |
|     'shutdown_safe', 'SSL',
 | |
| ]
 | |
| 
 | |
| BUFFER_SIZE = 4096
 | |
| CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
 | |
| CONNECT_SUCCESS = set((0, errno.EISCONN))
 | |
| if sys.platform[:3] == "win":
 | |
|     CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
 | |
| 
 | |
| if six.PY2:
 | |
|     _python2_fileobject = socket._fileobject
 | |
| 
 | |
| _original_socket = eventlet.patcher.original('socket').socket
 | |
| 
 | |
| 
 | |
| def socket_connect(descriptor, address):
 | |
|     """
 | |
|     Attempts to connect to the address, returns the descriptor if it succeeds,
 | |
|     returns None if it needs to trampoline, and raises any exceptions.
 | |
|     """
 | |
|     err = descriptor.connect_ex(address)
 | |
|     if err in CONNECT_ERR:
 | |
|         return None
 | |
|     if err not in CONNECT_SUCCESS:
 | |
|         raise socket.error(err, errno.errorcode[err])
 | |
|     return descriptor
 | |
| 
 | |
| 
 | |
| def socket_checkerr(descriptor):
 | |
|     err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
 | |
|     if err not in CONNECT_SUCCESS:
 | |
|         raise socket.error(err, errno.errorcode[err])
 | |
| 
 | |
| 
 | |
| def socket_accept(descriptor):
 | |
|     """
 | |
|     Attempts to accept() on the descriptor, returns a client,address tuple
 | |
|     if it succeeds; returns None if it needs to trampoline, and raises
 | |
|     any exceptions.
 | |
|     """
 | |
|     try:
 | |
|         return descriptor.accept()
 | |
|     except socket.error as e:
 | |
|         if get_errno(e) == errno.EWOULDBLOCK:
 | |
|             return None
 | |
|         raise
 | |
| 
 | |
| 
 | |
| if sys.platform[:3] == "win":
 | |
|     # winsock sometimes throws ENOTCONN
 | |
|     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
 | |
|     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
 | |
| else:
 | |
|     # oddly, on linux/darwin, an unconnected socket is expected to block,
 | |
|     # so we treat ENOTCONN the same as EWOULDBLOCK
 | |
|     SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
 | |
|     SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
 | |
| 
 | |
| 
 | |
| def set_nonblocking(fd):
 | |
|     """
 | |
|     Sets the descriptor to be nonblocking.  Works on many file-like
 | |
|     objects as well as sockets.  Only sockets can be nonblocking on
 | |
|     Windows, however.
 | |
|     """
 | |
|     try:
 | |
|         setblocking = fd.setblocking
 | |
|     except AttributeError:
 | |
|         # fd has no setblocking() method. It could be that this version of
 | |
|         # Python predates socket.setblocking(). In that case, we can still set
 | |
|         # the flag "by hand" on the underlying OS fileno using the fcntl
 | |
|         # module.
 | |
|         try:
 | |
|             import fcntl
 | |
|         except ImportError:
 | |
|             # Whoops, Windows has no fcntl module. This might not be a socket
 | |
|             # at all, but rather a file-like object with no setblocking()
 | |
|             # method. In particular, on Windows, pipes don't support
 | |
|             # non-blocking I/O and therefore don't have that method. Which
 | |
|             # means fcntl wouldn't help even if we could load it.
 | |
|             raise NotImplementedError("set_nonblocking() on a file object "
 | |
|                                       "with no setblocking() method "
 | |
|                                       "(Windows pipes don't support non-blocking I/O)")
 | |
|         # We managed to import fcntl.
 | |
|         fileno = fd.fileno()
 | |
|         orig_flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
 | |
|         new_flags = orig_flags | os.O_NONBLOCK
 | |
|         if new_flags != orig_flags:
 | |
|             fcntl.fcntl(fileno, fcntl.F_SETFL, new_flags)
 | |
|     else:
 | |
|         # socket supports setblocking()
 | |
|         setblocking(0)
 | |
| 
 | |
| 
 | |
| try:
 | |
|     from socket import _GLOBAL_DEFAULT_TIMEOUT
 | |
| except ImportError:
 | |
|     _GLOBAL_DEFAULT_TIMEOUT = object()
 | |
| 
 | |
| 
 | |
| class GreenSocket(object):
 | |
|     """
 | |
|     Green version of socket.socket class, that is intended to be 100%
 | |
|     API-compatible.
 | |
| 
 | |
|     It also recognizes the keyword parameter, 'set_nonblocking=True'.
 | |
|     Pass False to indicate that socket is already in non-blocking mode
 | |
|     to save syscalls.
 | |
|     """
 | |
| 
 | |
|     # This placeholder is to prevent __getattr__ from creating an infinite call loop
 | |
|     fd = None
 | |
| 
 | |
|     def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
 | |
|         should_set_nonblocking = kwargs.pop('set_nonblocking', True)
 | |
|         if isinstance(family_or_realsock, six.integer_types):
 | |
|             fd = _original_socket(family_or_realsock, *args, **kwargs)
 | |
|             # Notify the hub that this is a newly-opened socket.
 | |
|             notify_opened(fd.fileno())
 | |
|         else:
 | |
|             fd = family_or_realsock
 | |
| 
 | |
|         # import timeout from other socket, if it was there
 | |
|         try:
 | |
|             self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
 | |
|         except AttributeError:
 | |
|             self._timeout = socket.getdefaulttimeout()
 | |
| 
 | |
|         if should_set_nonblocking:
 | |
|             set_nonblocking(fd)
 | |
|         self.fd = fd
 | |
|         # when client calls setblocking(0) or settimeout(0) the socket must
 | |
|         # act non-blocking
 | |
|         self.act_non_blocking = False
 | |
| 
 | |
|         # Copy some attributes from underlying real socket.
 | |
|         # This is the easiest way that i found to fix
 | |
|         # https://bitbucket.org/eventlet/eventlet/issue/136
 | |
|         # Only `getsockopt` is required to fix that issue, others
 | |
|         # are just premature optimization to save __getattr__ call.
 | |
|         self.bind = fd.bind
 | |
|         self.close = fd.close
 | |
|         self.fileno = fd.fileno
 | |
|         self.getsockname = fd.getsockname
 | |
|         self.getsockopt = fd.getsockopt
 | |
|         self.listen = fd.listen
 | |
|         self.setsockopt = fd.setsockopt
 | |
|         self.shutdown = fd.shutdown
 | |
|         self._closed = False
 | |
| 
 | |
|     @property
 | |
|     def _sock(self):
 | |
|         return self
 | |
| 
 | |
|     if six.PY3:
 | |
|         def _get_io_refs(self):
 | |
|             return self.fd._io_refs
 | |
| 
 | |
|         def _set_io_refs(self, value):
 | |
|             self.fd._io_refs = value
 | |
| 
 | |
|         _io_refs = property(_get_io_refs, _set_io_refs)
 | |
| 
 | |
|     # Forward unknown attributes to fd, cache the value for future use.
 | |
|     # I do not see any simple attribute which could be changed
 | |
|     # so caching everything in self is fine.
 | |
|     # If we find such attributes - only attributes having __get__ might be cached.
 | |
|     # For now - I do not want to complicate it.
 | |
|     def __getattr__(self, name):
 | |
|         if self.fd is None:
 | |
|             raise AttributeError(name)
 | |
|         attr = getattr(self.fd, name)
 | |
|         setattr(self, name, attr)
 | |
|         return attr
 | |
| 
 | |
|     def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
 | |
|         """ We need to trampoline via the event hub.
 | |
|             We catch any signal back from the hub indicating that the operation we
 | |
|             were waiting on was associated with a filehandle that's since been
 | |
|             invalidated.
 | |
|         """
 | |
|         if self._closed:
 | |
|             # If we did any logging, alerting to a second trampoline attempt on a closed
 | |
|             # socket here would be useful.
 | |
|             raise IOClosed()
 | |
|         try:
 | |
|             return trampoline(fd, read=read, write=write, timeout=timeout,
 | |
|                               timeout_exc=timeout_exc,
 | |
|                               mark_as_closed=self._mark_as_closed)
 | |
|         except IOClosed:
 | |
|             # This socket's been obsoleted. De-fang it.
 | |
|             self._mark_as_closed()
 | |
|             raise
 | |
| 
 | |
|     def accept(self):
 | |
|         if self.act_non_blocking:
 | |
|             return self.fd.accept()
 | |
|         fd = self.fd
 | |
|         while True:
 | |
|             res = socket_accept(fd)
 | |
|             if res is not None:
 | |
|                 client, addr = res
 | |
|                 set_nonblocking(client)
 | |
|                 return type(self)(client), addr
 | |
|             self._trampoline(fd, read=True, timeout=self.gettimeout(),
 | |
|                              timeout_exc=socket.timeout("timed out"))
 | |
| 
 | |
|     def _mark_as_closed(self):
 | |
|         """ Mark this socket as being closed """
 | |
|         self._closed = True
 | |
| 
 | |
|     def __del__(self):
 | |
|         # This is in case self.close is not assigned yet (currently the constructor does it)
 | |
|         close = getattr(self, 'close', None)
 | |
|         if close is not None:
 | |
|             close()
 | |
| 
 | |
|     def connect(self, address):
 | |
|         if self.act_non_blocking:
 | |
|             return self.fd.connect(address)
 | |
|         fd = self.fd
 | |
|         if self.gettimeout() is None:
 | |
|             while not socket_connect(fd, address):
 | |
|                 try:
 | |
|                     self._trampoline(fd, write=True)
 | |
|                 except IOClosed:
 | |
|                     raise socket.error(errno.EBADFD)
 | |
|                 socket_checkerr(fd)
 | |
|         else:
 | |
|             end = time.time() + self.gettimeout()
 | |
|             while True:
 | |
|                 if socket_connect(fd, address):
 | |
|                     return
 | |
|                 if time.time() >= end:
 | |
|                     raise socket.timeout("timed out")
 | |
|                 try:
 | |
|                     self._trampoline(fd, write=True, timeout=end - time.time(),
 | |
|                                      timeout_exc=socket.timeout("timed out"))
 | |
|                 except IOClosed:
 | |
|                     # ... we need some workable errno here.
 | |
|                     raise socket.error(errno.EBADFD)
 | |
|                 socket_checkerr(fd)
 | |
| 
 | |
|     def connect_ex(self, address):
 | |
|         if self.act_non_blocking:
 | |
|             return self.fd.connect_ex(address)
 | |
|         fd = self.fd
 | |
|         if self.gettimeout() is None:
 | |
|             while not socket_connect(fd, address):
 | |
|                 try:
 | |
|                     self._trampoline(fd, write=True)
 | |
|                     socket_checkerr(fd)
 | |
|                 except socket.error as ex:
 | |
|                     return get_errno(ex)
 | |
|                 except IOClosed:
 | |
|                     return errno.EBADFD
 | |
|         else:
 | |
|             end = time.time() + self.gettimeout()
 | |
|             while True:
 | |
|                 try:
 | |
|                     if socket_connect(fd, address):
 | |
|                         return 0
 | |
|                     if time.time() >= end:
 | |
|                         raise socket.timeout(errno.EAGAIN)
 | |
|                     self._trampoline(fd, write=True, timeout=end - time.time(),
 | |
|                                      timeout_exc=socket.timeout(errno.EAGAIN))
 | |
|                     socket_checkerr(fd)
 | |
|                 except socket.error as ex:
 | |
|                     return get_errno(ex)
 | |
|                 except IOClosed:
 | |
|                     return errno.EBADFD
 | |
| 
 | |
|     def dup(self, *args, **kw):
 | |
|         sock = self.fd.dup(*args, **kw)
 | |
|         newsock = type(self)(sock, set_nonblocking=False)
 | |
|         newsock.settimeout(self.gettimeout())
 | |
|         return newsock
 | |
| 
 | |
|     if six.PY3:
 | |
|         def makefile(self, *args, **kwargs):
 | |
|             return _original_socket.makefile(self, *args, **kwargs)
 | |
|     else:
 | |
|         def makefile(self, *args, **kwargs):
 | |
|             dupped = self.dup()
 | |
|             res = _python2_fileobject(dupped, *args, **kwargs)
 | |
|             if hasattr(dupped, "_drop"):
 | |
|                 dupped._drop()
 | |
|             return res
 | |
| 
 | |
|     def makeGreenFile(self, *args, **kw):
 | |
|         warnings.warn("makeGreenFile has been deprecated, please use "
 | |
|                       "makefile instead", DeprecationWarning, stacklevel=2)
 | |
|         return self.makefile(*args, **kw)
 | |
| 
 | |
|     def _read_trampoline(self):
 | |
|         self._trampoline(
 | |
|             self.fd,
 | |
|             read=True,
 | |
|             timeout=self.gettimeout(),
 | |
|             timeout_exc=socket.timeout("timed out"))
 | |
| 
 | |
|     def _recv_loop(self, recv_meth, *args):
 | |
|         fd = self.fd
 | |
|         if self.act_non_blocking:
 | |
|             return recv_meth(*args)
 | |
| 
 | |
|         while True:
 | |
|             try:
 | |
|                 # recv: bufsize=0?
 | |
|                 # recv_into: buffer is empty?
 | |
|                 # This is needed because behind the scenes we use sockets in
 | |
|                 # nonblocking mode and builtin recv* methods. Attempting to read
 | |
|                 # 0 bytes from a nonblocking socket using a builtin recv* method
 | |
|                 # does not raise a timeout exception. Since we're simulating
 | |
|                 # a blocking socket here we need to produce a timeout exception
 | |
|                 # if needed, hence the call to trampoline.
 | |
|                 if not args[0]:
 | |
|                     self._read_trampoline()
 | |
|                 return recv_meth(*args)
 | |
|             except socket.error as e:
 | |
|                 if get_errno(e) in SOCKET_BLOCKING:
 | |
|                     pass
 | |
|                 elif get_errno(e) in SOCKET_CLOSED:
 | |
|                     return b''
 | |
|                 else:
 | |
|                     raise
 | |
| 
 | |
|             try:
 | |
|                 self._read_trampoline()
 | |
|             except IOClosed as e:
 | |
|                 # Perhaps we should return '' instead?
 | |
|                 raise EOFError()
 | |
| 
 | |
|     def recv(self, bufsize, flags=0):
 | |
|         return self._recv_loop(self.fd.recv, bufsize, flags)
 | |
| 
 | |
|     def recvfrom(self, bufsize, flags=0):
 | |
|         return self._recv_loop(self.fd.recvfrom, bufsize, flags)
 | |
| 
 | |
|     def recv_into(self, buffer, nbytes=0, flags=0):
 | |
|         return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags)
 | |
| 
 | |
|     def recvfrom_into(self, buffer, nbytes=0, flags=0):
 | |
|         return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags)
 | |
| 
 | |
|     def _send_loop(self, send_method, data, *args):
 | |
|         if self.act_non_blocking:
 | |
|             return send_method(data, *args)
 | |
| 
 | |
|         while 1:
 | |
|             try:
 | |
|                 return send_method(data, *args)
 | |
|             except socket.error as e:
 | |
|                 eno = get_errno(e)
 | |
|                 if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
 | |
|                     raise
 | |
| 
 | |
|             try:
 | |
|                 self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
 | |
|                                  timeout_exc=socket.timeout("timed out"))
 | |
|             except IOClosed:
 | |
|                 raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
 | |
| 
 | |
|     def send(self, data, flags=0):
 | |
|         return self._send_loop(self.fd.send, data, flags)
 | |
| 
 | |
|     def sendto(self, data, *args):
 | |
|         return self._send_loop(self.fd.sendto, data, *args)
 | |
| 
 | |
|     def sendall(self, data, flags=0):
 | |
|         tail = self.send(data, flags)
 | |
|         len_data = len(data)
 | |
|         while tail < len_data:
 | |
|             tail += self.send(data[tail:], flags)
 | |
| 
 | |
|     def setblocking(self, flag):
 | |
|         if flag:
 | |
|             self.act_non_blocking = False
 | |
|             self._timeout = None
 | |
|         else:
 | |
|             self.act_non_blocking = True
 | |
|             self._timeout = 0.0
 | |
| 
 | |
|     def settimeout(self, howlong):
 | |
|         if howlong is None or howlong == _GLOBAL_DEFAULT_TIMEOUT:
 | |
|             self.setblocking(True)
 | |
|             return
 | |
|         try:
 | |
|             f = howlong.__float__
 | |
|         except AttributeError:
 | |
|             raise TypeError('a float is required')
 | |
|         howlong = f()
 | |
|         if howlong < 0.0:
 | |
|             raise ValueError('Timeout value out of range')
 | |
|         if howlong == 0.0:
 | |
|             self.act_non_blocking = True
 | |
|             self._timeout = 0.0
 | |
|         else:
 | |
|             self.act_non_blocking = False
 | |
|             self._timeout = howlong
 | |
| 
 | |
|     def gettimeout(self):
 | |
|         return self._timeout
 | |
| 
 | |
|     if "__pypy__" in sys.builtin_module_names:
 | |
|         def _reuse(self):
 | |
|             getattr(self.fd, '_sock', self.fd)._reuse()
 | |
| 
 | |
|         def _drop(self):
 | |
|             getattr(self.fd, '_sock', self.fd)._drop()
 | |
| 
 | |
| 
 | |
| def _operation_on_closed_file(*args, **kwargs):
 | |
|     raise ValueError("I/O operation on closed file")
 | |
| 
 | |
| 
 | |
| greenpipe_doc = """
 | |
|     GreenPipe is a cooperative replacement for file class.
 | |
|     It will cooperate on pipes. It will block on regular file.
 | |
|     Differneces from file class:
 | |
|     - mode is r/w property. Should re r/o
 | |
|     - encoding property not implemented
 | |
|     - write/writelines will not raise TypeError exception when non-string data is written
 | |
|       it will write str(data) instead
 | |
|     - Universal new lines are not supported and newlines property not implementeded
 | |
|     - file argument can be descriptor, file name or file object.
 | |
|     """
 | |
| 
 | |
| # import SSL module here so we can refer to greenio.SSL.exceptionclass
 | |
| try:
 | |
|     from OpenSSL import SSL
 | |
| except ImportError:
 | |
|     # pyOpenSSL not installed, define exceptions anyway for convenience
 | |
|     class SSL(object):
 | |
|         class WantWriteError(Exception):
 | |
|             pass
 | |
| 
 | |
|         class WantReadError(Exception):
 | |
|             pass
 | |
| 
 | |
|         class ZeroReturnError(Exception):
 | |
|             pass
 | |
| 
 | |
|         class SysCallError(Exception):
 | |
|             pass
 | |
| 
 | |
| 
 | |
| def shutdown_safe(sock):
 | |
|     """ Shuts down the socket. This is a convenience method for
 | |
|     code that wants to gracefully handle regular sockets, SSL.Connection
 | |
|     sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
 | |
|     interchangeably.  Both types of ssl socket require a shutdown() before
 | |
|     close, but they have different arity on their shutdown method.
 | |
| 
 | |
|     Regular sockets don't need a shutdown before close, but it doesn't hurt.
 | |
|     """
 | |
|     try:
 | |
|         try:
 | |
|             # socket, ssl.SSLSocket
 | |
|             return sock.shutdown(socket.SHUT_RDWR)
 | |
|         except TypeError:
 | |
|             # SSL.Connection
 | |
|             return sock.shutdown()
 | |
|     except socket.error as e:
 | |
|         # we don't care if the socket is already closed;
 | |
|         # this will often be the case in an http server context
 | |
|         if get_errno(e) not in (errno.ENOTCONN, errno.EBADF):
 | |
|             raise
 | 
