twistedutil.protocol.GreenTransport: made bufferSize of twisted's transport customizable

This commit is contained in:
Denis Bilenko
2008-12-16 17:25:10 +06:00
parent bf40b1b344
commit 9fb11d2564
2 changed files with 106 additions and 125 deletions

View File

@@ -26,13 +26,18 @@ class Producer2Event(object):
class GreenTransportBase(object):
write_event = None
transportBufferSize = None
def __init__(self, transportBufferSize=None):
if transportBufferSize is not None:
self.transportBufferSize = transportBufferSize
def build_protocol(self):
# note to subclassers: self._queue must have send and send_exception that never block
self._queue = queue()
protocol = self.protocol_class(self, self._queue)
return protocol
def _wait(self):
self.transport.resumeProducing()
try:
@@ -44,7 +49,7 @@ class GreenTransportBase(object):
self.transport.write(data)
if self.write_event is not None:
self.write_event.wait()
def __getattr__(self, item):
if item=='transport':
raise AttributeError(item)
@@ -59,17 +64,19 @@ class GreenTransportBase(object):
self.paused -= 1
if self.paused==0:
self.transport.resumeProducing()
def pauseProducing(self):
self.paused += 1
if self.paused==1:
self.transport.pauseProducing()
def init_transport_producer(self, transport):
transport.pauseProducing()
self.paused = 1
def init_transport(self, transport):
if self.transportBufferSize is not None:
transport.bufferSize = self.transportBufferSize
self.init_transport_producer(transport)
ev = event()
ev.send(1)
@@ -82,7 +89,7 @@ class Protocol(twistedProtocol):
def __init__(self, gtransport, queue):
self.gtransport = gtransport
self._queue = queue
def connectionMade(self):
self.gtransport.init_transport(self.transport)
del self.gtransport
@@ -147,24 +154,23 @@ class UnbufferedTransport(GreenTransportBase):
class GreenTransport(GreenTransportBase):
protocol_class = Protocol
def __init__(self):
self.buf = ''
self._error = None
_buffer = ''
_error = None
def _wait(self):
# don't pause/resume producer here; read and recv methods will do it themselves
return self._queue.wait()
def read(self, size=-1):
"""Read size bytes or until EOF"""
if self._queue is not None:
resumed = False
try:
while len(self.buf) < size or size < 0:
while len(self._buffer) < size or size < 0:
if not resumed:
self.resumeProducing()
resumed = True
self.buf += self._wait()
self._buffer += self._wait()
except ConnectionDone:
self._queue = None
except:
@@ -174,22 +180,22 @@ class GreenTransport(GreenTransportBase):
if resumed:
self.pauseProducing()
if size>=0:
result, self.buf = self.buf[:size], self.buf[size:]
result, self._buffer = self._buffer[:size], self._buffer[size:]
else:
result, self.buf = self.buf, ''
result, self._buffer = self._buffer, ''
if not result and self._error is not None:
error = self._error
self._error = None
error, self._error = self._error, None
raise error[0], error[1], error[2]
return result
def recv(self, buflen=None):
if self._queue is not None and not self.buf:
"""Receive a single chunk of undefined size but no bigger than buflen"""
if self._queue is not None and not self._buffer:
self.resumeProducing()
try:
recvd = self._wait()
#print 'received %r' % recvd
self.buf += recvd
self._buffer += recvd
except ConnectionDone:
self._queue = None
except:
@@ -198,9 +204,9 @@ class GreenTransport(GreenTransportBase):
finally:
self.pauseProducing()
if buflen is None:
result, self.buf = self.buf, ''
result, self._buffer = self._buffer, ''
else:
result, self.buf = self.buf[:buflen], self.buf[buflen:]
result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
if not result and self._error is not None:
error = self._error
self._error = None

View File

@@ -8,6 +8,7 @@ from twisted.internet.error import ConnectionLost, ConnectionDone
from twisted.python import failure
import eventlet.twistedutil.protocol as pr
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
from eventlet.api import spawn, sleep, with_timeout, call_after
from eventlet.green import socket
@@ -24,7 +25,7 @@ def setup_server_socket(self, delay=DELAY, port=0):
conn, addr = s.accept()
conn.settimeout(delay+1)
try:
hello = conn.recv(128)
hello = conn.makefile().readline()[:-2]
except socket.timeout:
return
conn.sendall('you said %s. ' % hello)
@@ -38,148 +39,80 @@ def setup_server_socket(self, delay=DELAY, port=0):
def setup_server_SpawnFactory(self, delay=DELAY, port=0):
def handle(conn):
port.stopListening()
hello = conn.recv()
try:
hello = conn.readline()
except ConnectionDone:
return
conn.write('you said %s. ' % hello)
sleep(delay)
conn.write('BYE')
sleep(delay)
conn.loseConnection()
port = reactor.listenTCP(0, pr.SpawnFactory(handle, pr.UnbufferedTransport))
port = reactor.listenTCP(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport))
return port.getHost().port
class TestCase(unittest.TestCase):
transportBufferSize = None
@property
def connector(self):
return pr.GreenClientCreator(reactor, self.gtransportClass, self.transportBufferSize)
def setUp(self):
port = self.setup_server()
self.conn = self.connector.connectTCP('127.0.0.1', port)
if self.transportBufferSize is not None:
self.assertEqual(self.transportBufferSize, self.conn.transport.bufferSize)
class TestUnbufferedTransport(TestCase):
connector = pr.GreenClientCreator(reactor, pr.UnbufferedTransport)
gtransportClass = pr.UnbufferedTransport
setup_server = setup_server_socket
def test_recv(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
self.assertEqual(self.conn.recv(), 'BYE')
self.assertEqual(self.conn.recv(), '')
self.assertEqual(self.conn.recv(), '')
def test_recv_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
try:
1/0
except:
f = failure.Failure()
spawn(self.conn.protocol.connectionLost, f)
self.assertRaises(ZeroDivisionError, self.conn.recv)
def test_full_read(self):
self.conn.write('hello\r\n')
self.assertEqual(self.conn.read(), 'you said hello. BYE')
self.assertEqual(self.conn.read(), '')
self.assertEqual(self.conn.read(), '')
def test_iterator(self):
self.conn.write('hello')
i = iter(self.conn)
self.assertEqual(i.next(), 'you said hello. ')
self.assertEqual(i.next(), 'BYE')
self.assertRaises(StopIteration, i.next)
self.conn.write('iterator\r\n')
self.assertEqual('you said iterator. BYE', ''.join(self.conn))
class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestUnbufferedTransport_SpawnFactory(TestUnbufferedTransport):
setup_server = setup_server_SpawnFactory
class TestTransport(pr.GreenTransportBase):
protocol_class = pr.Protocol
def recv(self):
return self._wait()
class TestError(TestCase):
connector = pr.GreenClientCreator(reactor, TestTransport)
setup_server = setup_server_socket
def test_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
self.assertEqual(self.conn.recv(), 'BYE')
self.assertRaises(ConnectionDone, self.conn.recv)
class TestError_SpawnFactory(TestError):
class TestUnbufferedTransport_SpawnFactory_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestGreenTransport(TestCase):
connector = pr.GreenClientCreator(reactor, pr.GreenTransport)
class TestGreenTransport(TestUnbufferedTransport):
gtransportClass = pr.GreenTransport
setup_server = setup_server_socket
def test_read(self):
self.conn.write('hello')
self.conn.write('hello\r\n')
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.read(999), 'hello. BYE')
self.assertEqual(None, self.conn._queue)
self.assertEqual(self.conn.read(9), '')
self.assertEqual(self.conn.read(1), '')
self.assertEqual(None, self.conn._queue)
self.assertEqual(self.conn.recv(9), '')
self.assertEqual(self.conn.recv(1), '')
def test_read2(self):
self.conn.write('world')
self.conn.write('world\r\n')
self.assertEqual(self.conn.read(), 'you said world. BYE')
self.assertEqual(self.conn.read(), '')
self.assertEqual(self.conn.recv(), '')
def test_read_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.recv(), 'hello. ')
sleep(DELAY*1.5) # make sure the rest of data arrives
try:
1/0
except:
#self.conn.loseConnection(failure.Failure()) # does not work, why?
spawn(self.conn._queue.send_exception, *sys.exc_info())
self.assertEqual(self.conn.read(9), 'BYE')
self.assertRaises(ZeroDivisionError, self.conn.read, 9)
self.assertEqual(None, self.conn._queue)
self.assertEqual(self.conn.read(1), '')
self.assertEqual(self.conn.read(1), '')
def test_recv(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.recv())
self.assertEqual('BYE', self.conn.recv())
self.assertEqual('', self.conn.recv())
self.assertEqual('', self.conn.recv())
def test_recv2(self):
self.conn.write('whoa')
self.assertEqual('you said ', self.conn.recv(9))
self.assertEqual('whoa. ', self.conn.recv(999))
self.assertEqual('BYE', self.conn.recv(9))
self.assertEqual('', self.conn.recv(1))
self.assertEqual('', self.conn.recv())
self.assertEqual('', self.conn.read())
def test_recv_error(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.recv())
sleep(DELAY*1.5) # make sure the rest of data arrives
try:
1/0
except:
#self.conn.loseConnection(failure.Failure()) # does not work, why?
spawn(self.conn._queue.send_exception, *sys.exc_info())
self.assertEqual('BYE', self.conn.recv())
self.assertRaises(ZeroDivisionError, self.conn.recv, 9)
self.assertEqual(None, self.conn._queue)
self.assertEqual('', self.conn.recv(1))
self.assertEqual('', self.conn.recv())
def test_iterator(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.next())
self.assertEqual('BYE', self.conn.next())
self.assertRaises(StopIteration, self.conn.next)
self.conn.write('iterator\r\n')
self.assertEqual('you said iterator. BYE', ''.join(self.conn))
_tests = [x for x in locals().keys() if x.startswith('test_')]
@@ -191,21 +124,63 @@ class TestGreenTransport(TestCase):
def test_pause_producing(self):
self.conn.pauseProducing()
self.conn.write('hi')
self.conn.write('hi\r\n')
result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out')
self.assertEqual('timed out', result)
def test_pauseresume_producing(self):
self.conn.pauseProducing()
call_after(DELAY*5, self.conn.resumeProducing)
self.conn.write('hi')
self.conn.write('hi\r\n')
result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out')
self.assertEqual('you said hi. BYE', result)
class TestGreenTransport_bufsize1(TestGreenTransport):
transportBufferSize = 1
class TestGreenTransport_SpawnFactory(TestGreenTransport):
setup_server = setup_server_SpawnFactory
class TestGreenTransport_SpawnFactory_bufsize1(TestGreenTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestGreenTransportError(TestCase):
setup_server = setup_server_socket
gtransportClass = pr.GreenTransport
def test_read_error(self):
self.conn.write('hello\r\n')
sleep(DELAY*1.5) # make sure the rest of data arrives
try:
1/0
except:
#self.conn.loseConnection(failure.Failure()) # does not work, why?
spawn(self.conn._queue.send_exception, *sys.exc_info())
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.read(7), 'hello. ')
self.assertEqual(self.conn.read(9), 'BYE')
self.assertRaises(ZeroDivisionError, self.conn.read, 9)
self.assertEqual(None, self.conn._queue)
self.assertEqual(self.conn.read(1), '')
self.assertEqual(self.conn.read(1), '')
# def test_recv_error(self):
# self.conn.write('hello')
# self.assertEqual('you said hello. ', self.conn.recv())
# sleep(DELAY*1.5) # make sure the rest of data arrives
# try:
# 1/0
# except:
# #self.conn.loseConnection(failure.Failure()) # does not work, why?
# spawn(self.conn._queue.send_exception, *sys.exc_info())
# self.assertEqual('BYE', self.conn.recv())
# self.assertRaises(ZeroDivisionError, self.conn.recv, 9)
# self.assertEqual(None, self.conn._queue)
# self.assertEqual('', self.conn.recv(1))
# self.assertEqual('', self.conn.recv())
#
if __name__=='__main__':
unittest.main()