greenio, tpool: python3 compatibility

Also:
- PEP-8
- check both EAGAIN/EWOULDBLOCK
- use system implementation of GreenPipe.readinto()
This commit is contained in:
Sergey Shepelev
2014-04-24 17:42:30 +04:00
parent a651a3097f
commit f37a87b1f8
3 changed files with 70 additions and 82 deletions

View File

@@ -1,4 +1,3 @@
import array
import errno
import os
from socket import socket as _original_socket
@@ -60,12 +59,12 @@ def socket_accept(descriptor):
if sys.platform[:3] == "win":
# winsock sometimes throws ENOTCONN
SOCKET_BLOCKING = set((errno.EWOULDBLOCK,))
SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
# oddly, on linux/darwin, an unconnected socket is expected to block,
# so we treat ENOTCONN the same as EWOULDBLOCK
SOCKET_BLOCKING = set((errno.EWOULDBLOCK, errno.ENOTCONN))
SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
@@ -351,7 +350,11 @@ class GreenSocket(object):
class _SocketDuckForFd(object):
""" Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls."""
"""Class implementing all socket method used by _fileobject
in cooperative manner using low level os I/O calls.
"""
_refcount = 0
def __init__(self, fileno):
self._fileno = fileno
@@ -368,10 +371,26 @@ class _SocketDuckForFd(object):
data = os.read(self._fileno, buflen)
return data
except OSError as e:
if get_errno(e) != errno.EAGAIN:
if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
trampoline(self, read=True)
def recv_into(self, buf, nbytes=0, flags=0):
if nbytes == 0:
nbytes = len(buf)
data = self.recv(nbytes)
buf[:nbytes] = data
return len(data)
def send(self, data):
while True:
try:
os.write(self._fileno, data)
except OSError as e:
if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
trampoline(self, write=True)
def sendall(self, data):
len_data = len(data)
os_write = os.write
@@ -397,22 +416,22 @@ class _SocketDuckForFd(object):
try:
os.close(self._fileno)
except:
# os.close may fail if __init__ didn't complete (i.e file dscriptor passed to popen was invalid
# os.close may fail if __init__ didn't complete
# (i.e file dscriptor passed to popen was invalid
pass
def __repr__(self):
return "%s:%d" % (self.__class__.__name__, self._fileno)
if "__pypy__" in sys.builtin_module_names:
_refcount = 0
def _reuse(self):
self._refcount += 1
def _reuse(self):
self._refcount += 1
def _drop(self):
self._refcount -= 1
if self._refcount == 0:
self._close()
def _drop(self):
self._refcount -= 1
if self._refcount == 0:
self._close()
# Python3
_decref_socketios = _drop
def _operationOnClosedFile(*args, **kwargs):
@@ -449,7 +468,7 @@ class GreenPipe(_fileobject):
self._name = f.name
f.close()
super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
set_nonblocking(self)
self.softspace = 0
@@ -470,7 +489,7 @@ class GreenPipe(_fileobject):
for method in [
'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
'readline', 'readlines', 'seek', 'tell', 'truncate',
'write', 'xreadlines', '__iter__', 'writelines']:
'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
setattr(self, method, _operationOnClosedFile)
def __enter__(self):
@@ -479,17 +498,6 @@ class GreenPipe(_fileobject):
def __exit__(self, *args):
self.close()
def readinto(self, buf):
data = self.read(len(buf)) # FIXME could it be done without allocating intermediate?
n = len(data)
try:
buf[:n] = data
except TypeError as err:
if not isinstance(buf, array.array):
raise err
buf[:n] = array.array('c', data)
return n
def _get_readahead_len(self):
return len(self._rbuf.getvalue())

View File

@@ -21,39 +21,38 @@ import traceback
from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet.support import six
__all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
EXC_CLASSES = (Exception, timeout.Timeout)
SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
QUIET = True
socket = patcher.original('socket')
threading = patcher.original('threading')
if six.PY2:
Queue_module = patcher.original('Queue')
if six.PY3:
Queue_module = patcher.original('queue')
Queue = Queue_module.Queue
Empty = Queue_module.Empty
__all__ = ['execute', 'Proxy', 'killall']
QUIET = True
_rfile = _wfile = None
Queue = Queue_module.Queue
_bytetosend = ' '.encode()
def _signal_t2e():
_wfile.write(_bytetosend)
_wfile.flush()
_reqq = None
_rspq = None
_coro = None
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
_reqq = _rspq = None
_rsock = _wsock = None
_setup_already = False
_threads = []
def tpool_trampoline():
global _rspq
while True:
try:
_c = _rfile.read(1)
_c = _rsock.recv(1)
assert _c
except ValueError:
break # will be raised when pipe is closed
@@ -66,13 +65,9 @@ def tpool_trampoline():
pass
SYS_EXCS = (KeyboardInterrupt, SystemExit)
EXC_CLASSES = (Exception, timeout.Timeout)
def tworker():
global _rspq
while(True):
while True:
try:
msg = _reqq.get()
except AttributeError:
@@ -91,7 +86,7 @@ def tworker():
# exc_info does not lead to memory leaks
_rspq.put((e, rv))
msg = meth = args = kwargs = e = rv = None
_signal_t2e()
_wsock.sendall(_bytetosend)
def execute(meth, *args, **kwargs):
@@ -248,40 +243,25 @@ class Proxy(object):
return Proxy(it)
def next(self):
return proxy_call(self._autowrap, self._obj.next)
return proxy_call(self._autowrap, next, self._obj)
# Python3
__next__ = next
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
_threads = []
_coro = None
_setup_already = False
def setup():
global _rfile, _wfile, _threads, _coro, _setup_already, _rspq, _reqq
global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
if _setup_already:
return
else:
_setup_already = True
try:
_rpipe, _wpipe = os.pipe()
_wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
_rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
except (ImportError, NotImplementedError):
# This is Windows compatibility -- use a socket instead of a pipe because
# pipes don't really exist on Windows.
import socket
from eventlet import util
sock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 0))
sock.listen(50)
csock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM)
csock.connect(('localhost', sock.getsockname()[1]))
nsock, addr = sock.accept()
_rfile = greenio.GreenSocket(csock).makefile('rb', 0)
_wfile = nsock.makefile('wb', 0)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 0))
sock.listen(1)
csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
csock.connect(sock.getsockname())
_wsock, _addr = sock.accept()
_rsock = greenio.GreenSocket(csock)
_reqq = Queue(maxsize=-1)
_rspq = Queue(maxsize=-1)
@@ -302,7 +282,7 @@ def setup():
def killall():
global _setup_already, _rspq, _rfile, _wfile
global _setup_already, _rspq, _rsock, _wsock
if not _setup_already:
return
for thr in _threads:
@@ -312,10 +292,10 @@ def killall():
del _threads[:]
if _coro is not None:
greenthread.kill(_coro)
_rfile.close()
_wfile.close()
_rfile = None
_wfile = None
_rsock.close()
_wsock.close()
_rsock = None
_wsock = None
_rspq = None
_setup_already = False

View File

@@ -80,7 +80,7 @@ class TestTpool(LimitedTestCase):
def test_wrap_dict(self):
my_object = {'a': 1}
prox = tpool.Proxy(my_object)
self.assertEqual('a', prox.keys()[0])
self.assertEqual('a', list(prox.keys())[0])
self.assertEqual(1, prox['a'])
self.assertEqual(str(my_object), str(prox))
self.assertEqual(repr(my_object), repr(prox))