From f5fe024979a55b56faa38ff050686a52cf5958ec Mon Sep 17 00:00:00 2001 From: Sergey Shepelev Date: Fri, 14 Dec 2012 17:30:55 +0400 Subject: [PATCH] greenio: pep8, pyflakes --- eventlet/greenio.py | 47 ++++++++------ tests/greenio_test.py | 139 +++++++++++++++++++++++------------------- 2 files changed, 103 insertions(+), 83 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 1e526ca..531bad1 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -2,6 +2,7 @@ from eventlet.support import get_errno from eventlet.hubs import trampoline BUFFER_SIZE = 4096 +import array import errno import os import socket @@ -14,7 +15,7 @@ __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe'] CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)) CONNECT_SUCCESS = set((0, errno.EISCONN)) -if sys.platform[:3]=="win": +if sys.platform[:3] == "win": CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67 # Emulate _fileobject class in 3.x implementation @@ -22,9 +23,10 @@ if sys.platform[:3]=="win": try: _fileobject = socket._fileobject except AttributeError: - def _fileobject(sock, *args, **kwargs): + def _fileobject(sock, *args, **kwargs): return _original_socket.makefile(sock, *args, **kwargs) + def socket_connect(descriptor, address): """ Attempts to connect to the address, returns the descriptor if it succeeds, @@ -37,11 +39,13 @@ def socket_connect(descriptor, address): raise socket.error(err, errno.errorcode[err]) return descriptor + def socket_checkerr(descriptor): err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err not in CONNECT_SUCCESS: raise socket.error(err, errno.errorcode[err]) + def socket_accept(descriptor): """ Attempts to accept() on the descriptor, returns a client,address tuple @@ -56,7 +60,7 @@ def socket_accept(descriptor): raise -if sys.platform[:3]=="win": +if sys.platform[:3] == "win": # winsock sometimes throws ENOTCONN SOCKET_BLOCKING = set((errno.EWOULDBLOCK,)) SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)) @@ -124,7 +128,7 @@ class GreenSocket(object): self._timeout = fd.gettimeout() or socket.getdefaulttimeout() except AttributeError: self._timeout = socket.getdefaulttimeout() - + set_nonblocking(fd) self.fd = fd # when client calls setblocking(0) or settimeout(0) the socket must @@ -157,7 +161,7 @@ class GreenSocket(object): set_nonblocking(client) return type(self)(client), addr trampoline(fd, read=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout("timed out")) + timeout_exc=socket.timeout("timed out")) def connect(self, address): if self.act_non_blocking: @@ -174,7 +178,7 @@ class GreenSocket(object): return if time.time() >= end: raise socket.timeout("timed out") - trampoline(fd, write=True, timeout=end-time.time(), + trampoline(fd, write=True, timeout=end - time.time(), timeout_exc=socket.timeout("timed out")) socket_checkerr(fd) @@ -197,7 +201,7 @@ class GreenSocket(object): return 0 if time.time() >= end: raise socket.timeout(errno.EAGAIN) - trampoline(fd, write=True, timeout=end-time.time(), + trampoline(fd, write=True, timeout=end - time.time(), timeout_exc=socket.timeout(errno.EAGAIN)) socket_checkerr(fd) except socket.error, ex: @@ -232,9 +236,9 @@ class GreenSocket(object): return '' else: raise - trampoline(fd, - read=True, - timeout=self.gettimeout(), + trampoline(fd, + read=True, + timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) def recvfrom(self, *args): @@ -316,6 +320,7 @@ class GreenSocket(object): def gettimeout(self): return self._timeout + class _SocketDuckForFd(object): """ Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls.""" def __init__(self, fileno): @@ -348,7 +353,7 @@ class _SocketDuckForFd(object): if get_errno(e) != errno.EAGAIN: raise IOError(*e.args) total_sent = 0 - while total_sent " % ( @@ -412,7 +420,7 @@ class GreenPipe(_fileobject): self.__class__.__name__, self.name, self.mode, - (id(self) < 0) and (sys.maxint +id(self)) or id(self)) + (id(self) < 0) and (sys.maxint + id(self)) or id(self)) def close(self): super(GreenPipe, self).close() @@ -432,7 +440,7 @@ class GreenPipe(_fileobject): return iterator(self) def readinto(self, buf): - data = self.read(len(buf)) #FIXME could it be done without allocating intermediate? + data = self.read(len(buf)) # FIXME could it be done without allocating intermediate? n = len(data) try: buf[:n] = data @@ -450,7 +458,7 @@ class GreenPipe(_fileobject): def _clear_readahead_buf(self): len = self._get_readahead_len() - if len>0: + if len > 0: self.read(len) def tell(self): @@ -462,7 +470,7 @@ class GreenPipe(_fileobject): def seek(self, offset, whence=0): self.flush() - if whence == 1 and offset==0: # tell synonym + if whence == 1 and offset == 0: # tell synonym return self.tell() if whence == 1: # adjust offset by what is read ahead offset -= self.get_readahead_len() @@ -477,7 +485,7 @@ class GreenPipe(_fileobject): if getattr(file, "truncate", None): # not all OSes implement truncate def truncate(self, size=-1): self.flush() - if size ==-1: + if size == -1: size = self.tell() try: rv = os.ftruncate(self.fileno(), size) @@ -534,4 +542,3 @@ def shutdown_safe(sock): # this will often be the case in an http server context if get_errno(e) != errno.ENOTCONN: raise - diff --git a/tests/greenio_test.py b/tests/greenio_test.py index a4b4b8b..3df5b5c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -1,11 +1,9 @@ import socket as _orig_sock from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows -from eventlet import event -from eventlet import greenio -from eventlet import debug +from eventlet import event, greenio, debug +from eventlet.hubs import get_hub +from eventlet.green import socket, time from eventlet.support import get_errno -from eventlet.green import socket -from eventlet.green import time import errno import eventlet @@ -14,6 +12,7 @@ import sys import array import tempfile, shutil + def bufsized(sock, size=1): """ Resize both send and receive buffers on a socket. Useful for testing trampoline. Returns the socket. @@ -25,6 +24,7 @@ def bufsized(sock, size=1): sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size) return sock + def min_buf_size(): """Return the minimum buffer size that the platform supports.""" test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -33,7 +33,6 @@ def min_buf_size(): def using_epoll_hub(_f): - from eventlet.hubs import get_hub try: return 'epolls' in type(get_hub()).__module__ except Exception: @@ -42,7 +41,7 @@ def using_epoll_hub(_f): class TestGreenSocket(LimitedTestCase): def assertWriteToClosedFileRaises(self, fd): - if sys.version_info[0]<3: + if sys.version_info[0] < 3: # 2.x socket._fileobjects are odd: writes don't check # whether the socket is closed or not, and you get an # AttributeError during flush if it is closed @@ -93,8 +92,9 @@ class TestGreenSocket(LimitedTestCase): listener = greenio.GreenSocket(socket.socket()) listener.bind(('', 0)) listener.listen(50) - + evt = event.Event() + def server(): # accept the connection in another greenlet sock, addr = listener.accept() @@ -155,6 +155,7 @@ class TestGreenSocket(LimitedTestCase): listener.listen(50) evt = event.Event() + def server(): # accept the connection in another greenlet sock, addr = listener.accept() @@ -183,6 +184,7 @@ class TestGreenSocket(LimitedTestCase): listener = bufsized(eventlet.listen(('', 0))) evt = event.Event() + def server(): # accept the connection in another greenlet sock, addr = listener.accept() @@ -197,7 +199,7 @@ class TestGreenSocket(LimitedTestCase): client.connect(addr) try: client.settimeout(0.00001) - msg = s2b("A")*(100000) # large enough number to overwhelm most buffers + msg = s2b("A") * 100000 # large enough number to overwhelm most buffers total_sent = 0 # want to exceed the size of the OS buffer so it'll block in a @@ -218,6 +220,7 @@ class TestGreenSocket(LimitedTestCase): listener.listen(50) evt = event.Event() + def server(): # accept the connection in another greenlet sock, addr = listener.accept() @@ -232,7 +235,7 @@ class TestGreenSocket(LimitedTestCase): client.connect(addr) try: - msg = s2b("A")*(8*1024*1024) + msg = s2b("A") * (8 << 20) # want to exceed the size of the OS buffer so it'll block client.sendall(msg) @@ -284,7 +287,7 @@ class TestGreenSocket(LimitedTestCase): fd.close() server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('0.0.0.0', 0)) server.listen(50) killer = eventlet.spawn(accept_close_early, server) @@ -292,7 +295,7 @@ class TestGreenSocket(LimitedTestCase): killer.wait() server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('0.0.0.0', 0)) server.listen(50) killer = eventlet.spawn(accept_close_late, server) @@ -312,8 +315,9 @@ class TestGreenSocket(LimitedTestCase): self.assertWriteToClosedFileRaises(conn) finally: listener.close() + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('127.0.0.1', 0)) server.listen(50) killer = eventlet.spawn(accept_once, server) @@ -329,7 +333,7 @@ class TestGreenSocket(LimitedTestCase): def test_full_duplex(self): large_data = s2b('*') * 10 * min_buf_size() listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', 0)) listener.listen(50) bufsized(listener) @@ -371,16 +375,18 @@ class TestGreenSocket(LimitedTestCase): # it may legitimately take a while, but will eventually complete self.timer.cancel() second_bytes = 10 + def test_sendall_impl(many_bytes): - bufsize = max(many_bytes//15, 2) + bufsize = max(many_bytes // 15, 2) + def sender(listener): (sock, addr) = listener.accept() sock = bufsized(sock, size=bufsize) - sock.sendall(s2b('x')*many_bytes) - sock.sendall(s2b('y')*second_bytes) + sock.sendall(s2b('x') * many_bytes) + sock.sendall(s2b('y') * second_bytes) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(("", 0)) listener.listen(50) sender_coro = eventlet.spawn(sender, listener) @@ -389,7 +395,7 @@ class TestGreenSocket(LimitedTestCase): bufsized(client, size=bufsize) total = 0 while total < many_bytes: - data = client.recv(min(many_bytes - total, many_bytes//10)) + data = client.recv(min(many_bytes - total, many_bytes // 10)) if not data: break total += len(data) @@ -414,16 +420,16 @@ class TestGreenSocket(LimitedTestCase): pass # pre-2.6 else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', 0)) sock.listen(50) - ssl_sock = ssl.wrap_socket(sock) + ssl.wrap_socket(sock) def test_timeout_and_final_write(self): # This test verifies that a write on a socket that we've # stopped listening for doesn't result in an incorrect switch server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('127.0.0.1', 0)) server.listen(50) bound_port = server.getsockname()[1] @@ -437,7 +443,6 @@ class TestGreenSocket(LimitedTestCase): s2.close() evt.send('sent via event') - from eventlet import event evt = event.Event() eventlet.spawn(sender, evt) eventlet.sleep(0) # lets the socket enter accept mode, which @@ -449,7 +454,7 @@ class TestGreenSocket(LimitedTestCase): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', bound_port)) wrap_rfile = client.makefile() - _c = wrap_rfile.read(1) + wrap_rfile.read(1) self.fail() except eventlet.TimeoutError: pass @@ -467,10 +472,10 @@ class TestGreenSocket(LimitedTestCase): sock.recv(1) sock.sendall("a") raise eventlet.StopServe() + listener = eventlet.listen(('127.0.0.1', 0)) - server = eventlet.spawn(eventlet.serve, - listener, - handle) + eventlet.spawn(eventlet.serve, listener, handle) + def reader(s): s.recv(1) @@ -490,7 +495,7 @@ class TestGreenSocket(LimitedTestCase): try: sock.sendall('hello world') except socket.error, e: - if get_errno(e)== errno.EPIPE: + if get_errno(e) == errno.EPIPE: return raise @@ -517,12 +522,13 @@ class TestGreenSocket(LimitedTestCase): eventlet.spawn_n(closer) reader.wait() sender.wait() - + def test_invalid_connection(self): # find an unused port by creating a socket then closing it port = eventlet.listen(('127.0.0.1', 0)).getsockname()[1] self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port)) - + + class TestGreenPipe(LimitedTestCase): @skip_on_windows def setUp(self): @@ -534,17 +540,18 @@ class TestGreenPipe(LimitedTestCase): super(self.__class__, self).tearDown() def test_pipe(self): - r,w = os.pipe() - rf = greenio.GreenPipe(r, 'r'); - wf = greenio.GreenPipe(w, 'w', 0); + r, w = os.pipe() + rf = greenio.GreenPipe(r, 'r') + wf = greenio.GreenPipe(w, 'w', 0) + def sender(f, content): for ch in content: eventlet.sleep(0.0001) f.write(ch) f.close() - one_line = "12345\n"; - eventlet.spawn(sender, wf, one_line*5) + one_line = "12345\n" + eventlet.spawn(sender, wf, one_line * 5) for i in xrange(5): line = rf.readline() eventlet.sleep(0.01) @@ -587,7 +594,8 @@ class TestGreenPipe(LimitedTestCase): r = greenio.GreenPipe(r) w = greenio.GreenPipe(w, 'w') - large_message = "".join([1024*chr(i) for i in xrange(65)]) + large_message = "".join([1024 * chr(i) for i in xrange(65)]) + def writer(): w.write(large_message) w.close() @@ -596,27 +604,27 @@ class TestGreenPipe(LimitedTestCase): for i in xrange(65): buf = r.read(1024) - expected = 1024*chr(i) - self.assertEquals(buf, expected, - "expected=%r..%r, found=%r..%r iter=%d" + expected = 1024 * chr(i) + self.assertEquals(buf, expected, + "expected=%r..%r, found=%r..%r iter=%d" % (expected[:4], expected[-4:], buf[:4], buf[-4:], i)) gt.wait() def test_seek_on_buffered_pipe(self): - f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024) - self.assertEquals(f.tell(),0) - f.seek(0,2) - self.assertEquals(f.tell(),0) + f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024) + self.assertEquals(f.tell(), 0) + f.seek(0, 2) + self.assertEquals(f.tell(), 0) f.write('1234567890') - f.seek(0,2) - self.assertEquals(f.tell(),10) + f.seek(0, 2) + self.assertEquals(f.tell(), 10) f.seek(0) value = f.read(1) self.assertEqual(value, '1') - self.assertEquals(f.tell(),1) + self.assertEquals(f.tell(), 1) value = f.read(1) self.assertEqual(value, '2') - self.assertEquals(f.tell(),2) + self.assertEquals(f.tell(), 2) f.seek(0, 1) self.assertEqual(f.readline(), '34567890') f.seek(0) @@ -625,19 +633,21 @@ class TestGreenPipe(LimitedTestCase): self.assertEqual(f.readline(), '') def test_truncate(self): - f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024) + f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024) f.write('1234567890') f.truncate(9) self.assertEquals(f.tell(), 9) class TestGreenIoLong(LimitedTestCase): - TEST_TIMEOUT=10 # the test here might take a while depending on the OS + TEST_TIMEOUT = 10 # the test here might take a while depending on the OS + @skip_with_pyevent def test_multiple_readers(self, clibufsize=False): debug.hub_prevent_multiple_readers(False) recvsize = 2 * min_buf_size() sendsize = 10 * recvsize + # test that we can have multiple coroutines reading # from the same fd. We make no guarantees about which one gets which # bytes, but they should both get at least some @@ -651,9 +661,10 @@ class TestGreenIoLong(LimitedTestCase): results1 = [] results2 = [] listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', 0)) listener.listen(50) + def server(): (sock, addr) = listener.accept() sock = bufsized(sock) @@ -689,33 +700,35 @@ class TestGreenIoLong(LimitedTestCase): def test_multiple_readers2(self): self.test_multiple_readers(clibufsize=True) -class TestGreenIoStarvation(LimitedTestCase): + +class TestGreenIoStarvation(LimitedTestCase): # fixme: this doesn't succeed, because of eventlet's predetermined # ordering. two processes, one with server, one with client eventlets # might be more reliable? - - TEST_TIMEOUT=300 # the test here might take a while depending on the OS + + TEST_TIMEOUT = 300 # the test here might take a while depending on the OS + @skipped # by rdw, because it fails but it's not clear how to make it pass @skip_with_pyevent def test_server_starvation(self, sendloops=15): recvsize = 2 * min_buf_size() sendsize = 10000 * recvsize - + results = [[] for i in xrange(5)] - + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', 0)) port = listener.getsockname()[1] listener.listen(50) - + base_time = time.time() - + def server(my_results): - (sock, addr) = listener.accept() - + sock, addr = listener.accept() + datasize = 0 - + t1 = None t2 = None try: @@ -726,7 +739,7 @@ class TestGreenIoStarvation(LimitedTestCase): if not data: t2 = time.time() - base_time my_results.append(datasize) - my_results.append((t1,t2)) + my_results.append((t1, t2)) break datasize += len(data) finally: @@ -736,7 +749,7 @@ class TestGreenIoStarvation(LimitedTestCase): pid = os.fork() if pid: return pid - + client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', port))