More cleanup of util and wrappedfd.

This commit is contained in:
rdw
2008-03-17 23:04:35 -05:00
parent 9dc8a70b7a
commit 1e0cf35e52
3 changed files with 73 additions and 59 deletions

View File

@@ -111,7 +111,7 @@ class TestDyingProcessesLeavePool(tests.TestCase):
class TestProcessLivesForever(tests.TestCase): class TestProcessLivesForever(tests.TestCase):
mode = 'static' mode = 'static'
def setUp(self): def setUp(self):
self.pool = processes.ProcessPool('python', ['-c', 'print "y"; print "y"'], max_size=1) self.pool = processes.ProcessPool('python', ['-c', 'print "y"; import time; time.sleep(0.1); print "y"'], max_size=1)
def test_reading_twice_from_same_process(self): def test_reading_twice_from_same_process(self):
proc = self.pool.get() proc = self.pool.get()

View File

@@ -62,15 +62,6 @@ def g_log(*args):
ident = 'greenlet-%d' % (g_id,) ident = 'greenlet-%d' % (g_id,)
print >>sys.stderr, '[%s] %s' % (ident, ' '.join(map(str, args))) print >>sys.stderr, '[%s] %s' % (ident, ' '.join(map(str, args)))
CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)
CONNECT_SUCCESS = (0, errno.EISCONN)
def socket_connect(descriptor, address):
err = descriptor.connect_ex(address)
if err in CONNECT_ERR:
return None
if err not in CONNECT_SUCCESS:
raise socket.error(err, errno.errorcode[err])
return descriptor
__original_socket__ = socket.socket __original_socket__ = socket.socket
@@ -116,47 +107,6 @@ def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
descriptor.bind(addr) descriptor.bind(addr)
descriptor.listen(backlog) descriptor.listen(backlog)
return descriptor return descriptor
def socket_accept(descriptor):
try:
return descriptor.accept()
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return None
raise
def socket_send(descriptor, data):
try:
return descriptor.send(data)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return 0
raise
except SSL.WantWriteError:
return 0
except SSL.WantReadError:
return 0
# winsock sometimes throws ENOTCONN
SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)
def socket_recv(descriptor, buflen):
try:
return descriptor.recv(buflen)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return None
if e[0] in SOCKET_CLOSED:
return ''
raise
except SSL.WantReadError:
return None
except SSL.ZeroReturnError:
return ''
except SSL.SysCallError, e:
if e[0] == -1 or e[0] > 0:
raise socket.error(errno.ECONNRESET, errno.errorcode[errno.ECONNRESET])
raise
def set_reuse_addr(descriptor): def set_reuse_addr(descriptor):

View File

@@ -31,6 +31,8 @@ import socket, errno
from errno import EWOULDBLOCK, EAGAIN from errno import EWOULDBLOCK, EAGAIN
__all__ = ['GreenSocket', 'GreenFile', 'GreenPipe']
def higher_order_recv(recv_func): def higher_order_recv(recv_func):
def recv(self, buflen): def recv(self, buflen):
buf = self.recvbuffer buf = self.recvbuffer
@@ -64,6 +66,60 @@ def higher_order_send(send_func):
return send return send
CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)
CONNECT_SUCCESS = (0, errno.EISCONN)
def socket_connect(descriptor, address):
err = descriptor.connect_ex(address)
if err in CONNECT_ERR:
return None
if err not in CONNECT_SUCCESS:
raise socket.error(err, errno.errorcode[err])
return descriptor
def socket_accept(descriptor):
try:
return descriptor.accept()
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return None
raise
def socket_send(descriptor, data):
try:
return descriptor.send(data)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return 0
raise
except SSL.WantWriteError:
return 0
except SSL.WantReadError:
return 0
# winsock sometimes throws ENOTCONN
SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)
def socket_recv(descriptor, buflen):
try:
return descriptor.recv(buflen)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK:
return None
if e[0] in SOCKET_CLOSED:
return ''
raise
except SSL.WantReadError:
return None
except SSL.ZeroReturnError:
return ''
except SSL.SysCallError, e:
if e[0] == -1 or e[0] > 0:
raise socket.error(errno.ECONNRESET, errno.errorcode[errno.ECONNRESET])
raise
def file_recv(fd, buflen): def file_recv(fd, buflen):
try: try:
return fd.read(buflen) return fd.read(buflen)
@@ -106,7 +162,7 @@ class GreenSocket(object):
def accept(self): def accept(self):
fd = self.fd fd = self.fd
while True: while True:
res = util.socket_accept(fd) res = socket_accept(fd)
if res is not None: if res is not None:
client, addr = res client, addr = res
util.set_nonblocking(client) util.set_nonblocking(client)
@@ -132,7 +188,7 @@ class GreenSocket(object):
def connect(self, address): def connect(self, address):
fd = self.fd fd = self.fd
connect = util.socket_connect connect = socket_connect
while not connect(fd, address): while not connect(fd, address):
trampoline(fd, read=True, write=True) trampoline(fd, read=True, write=True)
@@ -173,7 +229,7 @@ class GreenSocket(object):
def makefile(self, mode = None, bufsize = None): def makefile(self, mode = None, bufsize = None):
return GreenFile(self.dup()) return GreenFile(self.dup())
recv = higher_order_recv(util.socket_recv) recv = higher_order_recv(socket_recv)
def recvfrom(self, *args): def recvfrom(self, *args):
trampoline(self.fd, read=True) trampoline(self.fd, read=True)
@@ -182,7 +238,7 @@ class GreenSocket(object):
# TODO recvfrom_into # TODO recvfrom_into
# TODO recv_into # TODO recv_into
send = higher_order_send(util.socket_send) send = higher_order_send(socket_send)
def sendall(self, data): def sendall(self, data):
fd = self.fd fd = self.fd
@@ -317,7 +373,9 @@ class GreenFile(object):
return ''.join(lst) return ''.join(lst)
class MetaSocket(GreenSocket): class GreenPipeSocket(GreenSocket):
""" This is a weird class that looks like a socket but expects a file descriptor as an argument instead of a socket.
"""
recv = higher_order_recv(file_recv) recv = higher_order_recv(file_recv)
send = higher_order_send(file_send) send = higher_order_send(file_send)
@@ -325,10 +383,16 @@ class MetaSocket(GreenSocket):
class GreenPipe(GreenFile): class GreenPipe(GreenFile):
def __init__(self, fd): def __init__(self, fd):
self.fd = MetaSocket(fd) self.fd = GreenPipeSocket(fd)
self.recv = self.fd.recv
self.send = self.fd.send
super(GreenPipe, self).__init__(self.fd) super(GreenPipe, self).__init__(self.fd)
def recv(self, *args, **kw):
fn = self.recv = self.fd.recv
return fn(*args, **kw)
def send(self, *args, **kw):
fn = self.send = self.fd.send
return fn(*args, **kw)
def flush(self): def flush(self):
self.fd.fd.flush() self.fd.fd.flush()