From f37a87b1f8e2f09b8a30c2e947486cf0858650cd Mon Sep 17 00:00:00 2001 From: Sergey Shepelev Date: Thu, 24 Apr 2014 17:42:30 +0400 Subject: [PATCH] greenio, tpool: python3 compatibility Also: - PEP-8 - check both EAGAIN/EWOULDBLOCK - use system implementation of GreenPipe.readinto() --- eventlet/greenio.py | 64 ++++++++++++++++++--------------- eventlet/tpool.py | 86 +++++++++++++++++---------------------------- tests/tpool_test.py | 2 +- 3 files changed, 70 insertions(+), 82 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 44a4e00..12f5f06 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -1,4 +1,3 @@ -import array import errno import os from socket import socket as _original_socket @@ -60,12 +59,12 @@ def socket_accept(descriptor): if sys.platform[:3] == "win": # winsock sometimes throws ENOTCONN - SOCKET_BLOCKING = set((errno.EWOULDBLOCK,)) + SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,)) SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)) else: # oddly, on linux/darwin, an unconnected socket is expected to block, # so we treat ENOTCONN the same as EWOULDBLOCK - SOCKET_BLOCKING = set((errno.EWOULDBLOCK, errno.ENOTCONN)) + SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN)) SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE)) @@ -351,7 +350,11 @@ class GreenSocket(object): class _SocketDuckForFd(object): - """ Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls.""" + """Class implementing all socket method used by _fileobject + in cooperative manner using low level os I/O calls. + """ + _refcount = 0 + def __init__(self, fileno): self._fileno = fileno @@ -368,10 +371,26 @@ class _SocketDuckForFd(object): data = os.read(self._fileno, buflen) return data except OSError as e: - if get_errno(e) != errno.EAGAIN: + if get_errno(e) not in SOCKET_BLOCKING: raise IOError(*e.args) trampoline(self, read=True) + def recv_into(self, buf, nbytes=0, flags=0): + if nbytes == 0: + nbytes = len(buf) + data = self.recv(nbytes) + buf[:nbytes] = data + return len(data) + + def send(self, data): + while True: + try: + os.write(self._fileno, data) + except OSError as e: + if get_errno(e) not in SOCKET_BLOCKING: + raise IOError(*e.args) + trampoline(self, write=True) + def sendall(self, data): len_data = len(data) os_write = os.write @@ -397,22 +416,22 @@ class _SocketDuckForFd(object): try: os.close(self._fileno) except: - # os.close may fail if __init__ didn't complete (i.e file dscriptor passed to popen was invalid + # os.close may fail if __init__ didn't complete + # (i.e file dscriptor passed to popen was invalid pass def __repr__(self): return "%s:%d" % (self.__class__.__name__, self._fileno) - if "__pypy__" in sys.builtin_module_names: - _refcount = 0 + def _reuse(self): + self._refcount += 1 - def _reuse(self): - self._refcount += 1 - - def _drop(self): - self._refcount -= 1 - if self._refcount == 0: - self._close() + def _drop(self): + self._refcount -= 1 + if self._refcount == 0: + self._close() + # Python3 + _decref_socketios = _drop def _operationOnClosedFile(*args, **kwargs): @@ -449,7 +468,7 @@ class GreenPipe(_fileobject): self._name = f.name f.close() - super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize) + super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode) set_nonblocking(self) self.softspace = 0 @@ -470,7 +489,7 @@ class GreenPipe(_fileobject): for method in [ 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 'readline', 'readlines', 'seek', 'tell', 'truncate', - 'write', 'xreadlines', '__iter__', 'writelines']: + 'write', 'xreadlines', '__iter__', '__next__', 'writelines']: setattr(self, method, _operationOnClosedFile) def __enter__(self): @@ -479,17 +498,6 @@ class GreenPipe(_fileobject): def __exit__(self, *args): self.close() - def readinto(self, buf): - data = self.read(len(buf)) # FIXME could it be done without allocating intermediate? - n = len(data) - try: - buf[:n] = data - except TypeError as err: - if not isinstance(buf, array.array): - raise err - buf[:n] = array.array('c', data) - return n - def _get_readahead_len(self): return len(self._rbuf.getvalue()) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index e4e3a7b..82aae4b 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -21,39 +21,38 @@ import traceback from eventlet import event, greenio, greenthread, patcher, timeout from eventlet.support import six +__all__ = ['execute', 'Proxy', 'killall', 'set_num_threads'] + +EXC_CLASSES = (Exception, timeout.Timeout) +SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit) + +QUIET = True + +socket = patcher.original('socket') threading = patcher.original('threading') if six.PY2: Queue_module = patcher.original('Queue') if six.PY3: Queue_module = patcher.original('queue') -Queue = Queue_module.Queue Empty = Queue_module.Empty - -__all__ = ['execute', 'Proxy', 'killall'] - -QUIET = True - -_rfile = _wfile = None +Queue = Queue_module.Queue _bytetosend = ' '.encode() - - -def _signal_t2e(): - _wfile.write(_bytetosend) - _wfile.flush() - - -_reqq = None -_rspq = None +_coro = None +_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20)) +_reqq = _rspq = None +_rsock = _wsock = None +_setup_already = False +_threads = [] def tpool_trampoline(): global _rspq while True: try: - _c = _rfile.read(1) + _c = _rsock.recv(1) assert _c except ValueError: break # will be raised when pipe is closed @@ -66,13 +65,9 @@ def tpool_trampoline(): pass -SYS_EXCS = (KeyboardInterrupt, SystemExit) -EXC_CLASSES = (Exception, timeout.Timeout) - - def tworker(): global _rspq - while(True): + while True: try: msg = _reqq.get() except AttributeError: @@ -91,7 +86,7 @@ def tworker(): # exc_info does not lead to memory leaks _rspq.put((e, rv)) msg = meth = args = kwargs = e = rv = None - _signal_t2e() + _wsock.sendall(_bytetosend) def execute(meth, *args, **kwargs): @@ -248,40 +243,25 @@ class Proxy(object): return Proxy(it) def next(self): - return proxy_call(self._autowrap, self._obj.next) + return proxy_call(self._autowrap, next, self._obj) # Python3 __next__ = next -_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20)) -_threads = [] -_coro = None -_setup_already = False - - def setup(): - global _rfile, _wfile, _threads, _coro, _setup_already, _rspq, _reqq + global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq if _setup_already: return else: _setup_already = True - try: - _rpipe, _wpipe = os.pipe() - _wfile = greenio.GreenPipe(_wpipe, 'wb', 0) - _rfile = greenio.GreenPipe(_rpipe, 'rb', 0) - except (ImportError, NotImplementedError): - # This is Windows compatibility -- use a socket instead of a pipe because - # pipes don't really exist on Windows. - import socket - from eventlet import util - sock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('localhost', 0)) - sock.listen(50) - csock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM) - csock.connect(('localhost', sock.getsockname()[1])) - nsock, addr = sock.accept() - _rfile = greenio.GreenSocket(csock).makefile('rb', 0) - _wfile = nsock.makefile('wb', 0) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('', 0)) + sock.listen(1) + csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + csock.connect(sock.getsockname()) + _wsock, _addr = sock.accept() + _rsock = greenio.GreenSocket(csock) _reqq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1) @@ -302,7 +282,7 @@ def setup(): def killall(): - global _setup_already, _rspq, _rfile, _wfile + global _setup_already, _rspq, _rsock, _wsock if not _setup_already: return for thr in _threads: @@ -312,10 +292,10 @@ def killall(): del _threads[:] if _coro is not None: greenthread.kill(_coro) - _rfile.close() - _wfile.close() - _rfile = None - _wfile = None + _rsock.close() + _wsock.close() + _rsock = None + _wsock = None _rspq = None _setup_already = False diff --git a/tests/tpool_test.py b/tests/tpool_test.py index 35b6056..6ad4355 100644 --- a/tests/tpool_test.py +++ b/tests/tpool_test.py @@ -80,7 +80,7 @@ class TestTpool(LimitedTestCase): def test_wrap_dict(self): my_object = {'a': 1} prox = tpool.Proxy(my_object) - self.assertEqual('a', prox.keys()[0]) + self.assertEqual('a', list(prox.keys())[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual(repr(my_object), repr(prox))