From 1e0cf35e52f568b801d22a707d4c9ff41e70677a Mon Sep 17 00:00:00 2001 From: rdw Date: Mon, 17 Mar 2008 23:04:35 -0500 Subject: [PATCH] More cleanup of util and wrappedfd. --- eventlet/processes_test.py | 2 +- eventlet/util.py | 50 ------------------------ eventlet/wrappedfd.py | 80 ++++++++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 59 deletions(-) diff --git a/eventlet/processes_test.py b/eventlet/processes_test.py index 7830a2e..7bf9b97 100644 --- a/eventlet/processes_test.py +++ b/eventlet/processes_test.py @@ -111,7 +111,7 @@ class TestDyingProcessesLeavePool(tests.TestCase): class TestProcessLivesForever(tests.TestCase): mode = 'static' def setUp(self): - self.pool = processes.ProcessPool('python', ['-c', 'print "y"; print "y"'], max_size=1) + self.pool = processes.ProcessPool('python', ['-c', 'print "y"; import time; time.sleep(0.1); print "y"'], max_size=1) def test_reading_twice_from_same_process(self): proc = self.pool.get() diff --git a/eventlet/util.py b/eventlet/util.py index 5d1c232..8cdbf4d 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -62,15 +62,6 @@ def g_log(*args): ident = 'greenlet-%d' % (g_id,) print >>sys.stderr, '[%s] %s' % (ident, ' '.join(map(str, args))) -CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK) -CONNECT_SUCCESS = (0, errno.EISCONN) -def socket_connect(descriptor, address): - err = descriptor.connect_ex(address) - if err in CONNECT_ERR: - return None - if err not in CONNECT_SUCCESS: - raise socket.error(err, errno.errorcode[err]) - return descriptor __original_socket__ = socket.socket @@ -116,47 +107,6 @@ def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): descriptor.bind(addr) descriptor.listen(backlog) return descriptor - -def socket_accept(descriptor): - try: - return descriptor.accept() - except socket.error, e: - if e[0] == errno.EWOULDBLOCK: - return None - raise - -def socket_send(descriptor, data): - try: - return descriptor.send(data) - except socket.error, e: - if e[0] == errno.EWOULDBLOCK: - return 0 - raise - except SSL.WantWriteError: - return 0 - except SSL.WantReadError: - return 0 - - -# winsock sometimes throws ENOTCONN -SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN) -def socket_recv(descriptor, buflen): - try: - return descriptor.recv(buflen) - except socket.error, e: - if e[0] == errno.EWOULDBLOCK: - return None - if e[0] in SOCKET_CLOSED: - return '' - raise - except SSL.WantReadError: - return None - except SSL.ZeroReturnError: - return '' - except SSL.SysCallError, e: - if e[0] == -1 or e[0] > 0: - raise socket.error(errno.ECONNRESET, errno.errorcode[errno.ECONNRESET]) - raise def set_reuse_addr(descriptor): diff --git a/eventlet/wrappedfd.py b/eventlet/wrappedfd.py index 3ba7230..bc2851e 100644 --- a/eventlet/wrappedfd.py +++ b/eventlet/wrappedfd.py @@ -31,6 +31,8 @@ import socket, errno from errno import EWOULDBLOCK, EAGAIN +__all__ = ['GreenSocket', 'GreenFile', 'GreenPipe'] + def higher_order_recv(recv_func): def recv(self, buflen): buf = self.recvbuffer @@ -64,6 +66,60 @@ def higher_order_send(send_func): return send +CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK) +CONNECT_SUCCESS = (0, errno.EISCONN) +def socket_connect(descriptor, address): + err = descriptor.connect_ex(address) + if err in CONNECT_ERR: + return None + if err not in CONNECT_SUCCESS: + raise socket.error(err, errno.errorcode[err]) + return descriptor + + +def socket_accept(descriptor): + try: + return descriptor.accept() + except socket.error, e: + if e[0] == errno.EWOULDBLOCK: + return None + raise + + +def socket_send(descriptor, data): + try: + return descriptor.send(data) + except socket.error, e: + if e[0] == errno.EWOULDBLOCK: + return 0 + raise + except SSL.WantWriteError: + return 0 + except SSL.WantReadError: + return 0 + + +# winsock sometimes throws ENOTCONN +SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN) +def socket_recv(descriptor, buflen): + try: + return descriptor.recv(buflen) + except socket.error, e: + if e[0] == errno.EWOULDBLOCK: + return None + if e[0] in SOCKET_CLOSED: + return '' + raise + except SSL.WantReadError: + return None + except SSL.ZeroReturnError: + return '' + except SSL.SysCallError, e: + if e[0] == -1 or e[0] > 0: + raise socket.error(errno.ECONNRESET, errno.errorcode[errno.ECONNRESET]) + raise + + def file_recv(fd, buflen): try: return fd.read(buflen) @@ -106,7 +162,7 @@ class GreenSocket(object): def accept(self): fd = self.fd while True: - res = util.socket_accept(fd) + res = socket_accept(fd) if res is not None: client, addr = res util.set_nonblocking(client) @@ -132,7 +188,7 @@ class GreenSocket(object): def connect(self, address): fd = self.fd - connect = util.socket_connect + connect = socket_connect while not connect(fd, address): trampoline(fd, read=True, write=True) @@ -173,7 +229,7 @@ class GreenSocket(object): def makefile(self, mode = None, bufsize = None): return GreenFile(self.dup()) - recv = higher_order_recv(util.socket_recv) + recv = higher_order_recv(socket_recv) def recvfrom(self, *args): trampoline(self.fd, read=True) @@ -182,7 +238,7 @@ class GreenSocket(object): # TODO recvfrom_into # TODO recv_into - send = higher_order_send(util.socket_send) + send = higher_order_send(socket_send) def sendall(self, data): fd = self.fd @@ -317,7 +373,9 @@ class GreenFile(object): return ''.join(lst) -class MetaSocket(GreenSocket): +class GreenPipeSocket(GreenSocket): + """ This is a weird class that looks like a socket but expects a file descriptor as an argument instead of a socket. + """ recv = higher_order_recv(file_recv) send = higher_order_send(file_send) @@ -325,10 +383,16 @@ class MetaSocket(GreenSocket): class GreenPipe(GreenFile): def __init__(self, fd): - self.fd = MetaSocket(fd) - self.recv = self.fd.recv - self.send = self.fd.send + self.fd = GreenPipeSocket(fd) super(GreenPipe, self).__init__(self.fd) + def recv(self, *args, **kw): + fn = self.recv = self.fd.recv + return fn(*args, **kw) + + def send(self, *args, **kw): + fn = self.send = self.fd.send + return fn(*args, **kw) + def flush(self): self.fd.fd.flush()