From 9fb11d25648115703caf4369c07b2757e63647e6 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Tue, 16 Dec 2008 17:25:10 +0600 Subject: [PATCH] twistedutil.protocol.GreenTransport: made bufferSize of twisted's transport customizable --- eventlet/twistedutil/protocol.py | 46 +++--- greentest/test__twistedutil_protocol.py | 185 ++++++++++-------------- 2 files changed, 106 insertions(+), 125 deletions(-) diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py index 26fa16e..1d60e3e 100644 --- a/eventlet/twistedutil/protocol.py +++ b/eventlet/twistedutil/protocol.py @@ -26,13 +26,18 @@ class Producer2Event(object): class GreenTransportBase(object): write_event = None + transportBufferSize = None + + def __init__(self, transportBufferSize=None): + if transportBufferSize is not None: + self.transportBufferSize = transportBufferSize def build_protocol(self): # note to subclassers: self._queue must have send and send_exception that never block self._queue = queue() protocol = self.protocol_class(self, self._queue) return protocol - + def _wait(self): self.transport.resumeProducing() try: @@ -44,7 +49,7 @@ class GreenTransportBase(object): self.transport.write(data) if self.write_event is not None: self.write_event.wait() - + def __getattr__(self, item): if item=='transport': raise AttributeError(item) @@ -59,17 +64,19 @@ class GreenTransportBase(object): self.paused -= 1 if self.paused==0: self.transport.resumeProducing() - + def pauseProducing(self): self.paused += 1 if self.paused==1: self.transport.pauseProducing() - + def init_transport_producer(self, transport): transport.pauseProducing() self.paused = 1 def init_transport(self, transport): + if self.transportBufferSize is not None: + transport.bufferSize = self.transportBufferSize self.init_transport_producer(transport) ev = event() ev.send(1) @@ -82,7 +89,7 @@ class Protocol(twistedProtocol): def __init__(self, gtransport, queue): self.gtransport = gtransport self._queue = queue - + def connectionMade(self): self.gtransport.init_transport(self.transport) del self.gtransport @@ -147,24 +154,23 @@ class UnbufferedTransport(GreenTransportBase): class GreenTransport(GreenTransportBase): protocol_class = Protocol - - def __init__(self): - self.buf = '' - self._error = None + _buffer = '' + _error = None def _wait(self): # don't pause/resume producer here; read and recv methods will do it themselves return self._queue.wait() - + def read(self, size=-1): + """Read size bytes or until EOF""" if self._queue is not None: resumed = False try: - while len(self.buf) < size or size < 0: + while len(self._buffer) < size or size < 0: if not resumed: self.resumeProducing() resumed = True - self.buf += self._wait() + self._buffer += self._wait() except ConnectionDone: self._queue = None except: @@ -174,22 +180,22 @@ class GreenTransport(GreenTransportBase): if resumed: self.pauseProducing() if size>=0: - result, self.buf = self.buf[:size], self.buf[size:] + result, self._buffer = self._buffer[:size], self._buffer[size:] else: - result, self.buf = self.buf, '' + result, self._buffer = self._buffer, '' if not result and self._error is not None: - error = self._error - self._error = None + error, self._error = self._error, None raise error[0], error[1], error[2] return result def recv(self, buflen=None): - if self._queue is not None and not self.buf: + """Receive a single chunk of undefined size but no bigger than buflen""" + if self._queue is not None and not self._buffer: self.resumeProducing() try: recvd = self._wait() #print 'received %r' % recvd - self.buf += recvd + self._buffer += recvd except ConnectionDone: self._queue = None except: @@ -198,9 +204,9 @@ class GreenTransport(GreenTransportBase): finally: self.pauseProducing() if buflen is None: - result, self.buf = self.buf, '' + result, self._buffer = self._buffer, '' else: - result, self.buf = self.buf[:buflen], self.buf[buflen:] + result, self._buffer = self._buffer[:buflen], self._buffer[buflen:] if not result and self._error is not None: error = self._error self._error = None diff --git a/greentest/test__twistedutil_protocol.py b/greentest/test__twistedutil_protocol.py index 842408d..c30c47b 100644 --- a/greentest/test__twistedutil_protocol.py +++ b/greentest/test__twistedutil_protocol.py @@ -8,6 +8,7 @@ from twisted.internet.error import ConnectionLost, ConnectionDone from twisted.python import failure import eventlet.twistedutil.protocol as pr +from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport from eventlet.api import spawn, sleep, with_timeout, call_after from eventlet.green import socket @@ -24,7 +25,7 @@ def setup_server_socket(self, delay=DELAY, port=0): conn, addr = s.accept() conn.settimeout(delay+1) try: - hello = conn.recv(128) + hello = conn.makefile().readline()[:-2] except socket.timeout: return conn.sendall('you said %s. ' % hello) @@ -38,148 +39,80 @@ def setup_server_socket(self, delay=DELAY, port=0): def setup_server_SpawnFactory(self, delay=DELAY, port=0): def handle(conn): port.stopListening() - hello = conn.recv() + try: + hello = conn.readline() + except ConnectionDone: + return conn.write('you said %s. ' % hello) sleep(delay) conn.write('BYE') sleep(delay) conn.loseConnection() - port = reactor.listenTCP(0, pr.SpawnFactory(handle, pr.UnbufferedTransport)) + port = reactor.listenTCP(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport)) return port.getHost().port - class TestCase(unittest.TestCase): + transportBufferSize = None + + @property + def connector(self): + return pr.GreenClientCreator(reactor, self.gtransportClass, self.transportBufferSize) def setUp(self): port = self.setup_server() self.conn = self.connector.connectTCP('127.0.0.1', port) + if self.transportBufferSize is not None: + self.assertEqual(self.transportBufferSize, self.conn.transport.bufferSize) class TestUnbufferedTransport(TestCase): - - connector = pr.GreenClientCreator(reactor, pr.UnbufferedTransport) + gtransportClass = pr.UnbufferedTransport setup_server = setup_server_socket - def test_recv(self): - self.conn.write('hello') - self.assertEqual(self.conn.recv(), 'you said hello. ') - self.assertEqual(self.conn.recv(), 'BYE') - self.assertEqual(self.conn.recv(), '') - self.assertEqual(self.conn.recv(), '') - - def test_recv_error(self): - self.conn.write('hello') - self.assertEqual(self.conn.recv(), 'you said hello. ') - try: - 1/0 - except: - f = failure.Failure() - spawn(self.conn.protocol.connectionLost, f) - self.assertRaises(ZeroDivisionError, self.conn.recv) + def test_full_read(self): + self.conn.write('hello\r\n') + self.assertEqual(self.conn.read(), 'you said hello. BYE') + self.assertEqual(self.conn.read(), '') + self.assertEqual(self.conn.read(), '') def test_iterator(self): - self.conn.write('hello') - i = iter(self.conn) - self.assertEqual(i.next(), 'you said hello. ') - self.assertEqual(i.next(), 'BYE') - self.assertRaises(StopIteration, i.next) + self.conn.write('iterator\r\n') + self.assertEqual('you said iterator. BYE', ''.join(self.conn)) +class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport): + transportBufferSize = 1 + setup_server = setup_server_SpawnFactory class TestUnbufferedTransport_SpawnFactory(TestUnbufferedTransport): setup_server = setup_server_SpawnFactory - -class TestTransport(pr.GreenTransportBase): - - protocol_class = pr.Protocol - - def recv(self): - return self._wait() - -class TestError(TestCase): - - connector = pr.GreenClientCreator(reactor, TestTransport) - setup_server = setup_server_socket - - def test_error(self): - self.conn.write('hello') - self.assertEqual(self.conn.recv(), 'you said hello. ') - self.assertEqual(self.conn.recv(), 'BYE') - self.assertRaises(ConnectionDone, self.conn.recv) - -class TestError_SpawnFactory(TestError): +class TestUnbufferedTransport_SpawnFactory_bufsize1(TestUnbufferedTransport): + transportBufferSize = 1 setup_server = setup_server_SpawnFactory -class TestGreenTransport(TestCase): - connector = pr.GreenClientCreator(reactor, pr.GreenTransport) +class TestGreenTransport(TestUnbufferedTransport): + gtransportClass = pr.GreenTransport setup_server = setup_server_socket def test_read(self): - self.conn.write('hello') + self.conn.write('hello\r\n') self.assertEqual(self.conn.read(9), 'you said ') self.assertEqual(self.conn.read(999), 'hello. BYE') + self.assertEqual(None, self.conn._queue) self.assertEqual(self.conn.read(9), '') self.assertEqual(self.conn.read(1), '') - self.assertEqual(None, self.conn._queue) + self.assertEqual(self.conn.recv(9), '') + self.assertEqual(self.conn.recv(1), '') def test_read2(self): - self.conn.write('world') + self.conn.write('world\r\n') self.assertEqual(self.conn.read(), 'you said world. BYE') self.assertEqual(self.conn.read(), '') self.assertEqual(self.conn.recv(), '') - def test_read_error(self): - self.conn.write('hello') - self.assertEqual(self.conn.read(9), 'you said ') - self.assertEqual(self.conn.recv(), 'hello. ') - sleep(DELAY*1.5) # make sure the rest of data arrives - try: - 1/0 - except: - #self.conn.loseConnection(failure.Failure()) # does not work, why? - spawn(self.conn._queue.send_exception, *sys.exc_info()) - self.assertEqual(self.conn.read(9), 'BYE') - self.assertRaises(ZeroDivisionError, self.conn.read, 9) - self.assertEqual(None, self.conn._queue) - self.assertEqual(self.conn.read(1), '') - self.assertEqual(self.conn.read(1), '') - - def test_recv(self): - self.conn.write('hello') - self.assertEqual('you said hello. ', self.conn.recv()) - self.assertEqual('BYE', self.conn.recv()) - self.assertEqual('', self.conn.recv()) - self.assertEqual('', self.conn.recv()) - - def test_recv2(self): - self.conn.write('whoa') - self.assertEqual('you said ', self.conn.recv(9)) - self.assertEqual('whoa. ', self.conn.recv(999)) - self.assertEqual('BYE', self.conn.recv(9)) - self.assertEqual('', self.conn.recv(1)) - self.assertEqual('', self.conn.recv()) - self.assertEqual('', self.conn.read()) - - def test_recv_error(self): - self.conn.write('hello') - self.assertEqual('you said hello. ', self.conn.recv()) - sleep(DELAY*1.5) # make sure the rest of data arrives - try: - 1/0 - except: - #self.conn.loseConnection(failure.Failure()) # does not work, why? - spawn(self.conn._queue.send_exception, *sys.exc_info()) - self.assertEqual('BYE', self.conn.recv()) - self.assertRaises(ZeroDivisionError, self.conn.recv, 9) - self.assertEqual(None, self.conn._queue) - self.assertEqual('', self.conn.recv(1)) - self.assertEqual('', self.conn.recv()) - def test_iterator(self): - self.conn.write('hello') - self.assertEqual('you said hello. ', self.conn.next()) - self.assertEqual('BYE', self.conn.next()) - self.assertRaises(StopIteration, self.conn.next) + self.conn.write('iterator\r\n') + self.assertEqual('you said iterator. BYE', ''.join(self.conn)) _tests = [x for x in locals().keys() if x.startswith('test_')] @@ -191,21 +124,63 @@ class TestGreenTransport(TestCase): def test_pause_producing(self): self.conn.pauseProducing() - self.conn.write('hi') + self.conn.write('hi\r\n') result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') self.assertEqual('timed out', result) def test_pauseresume_producing(self): self.conn.pauseProducing() call_after(DELAY*5, self.conn.resumeProducing) - self.conn.write('hi') + self.conn.write('hi\r\n') result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') self.assertEqual('you said hi. BYE', result) +class TestGreenTransport_bufsize1(TestGreenTransport): + transportBufferSize = 1 class TestGreenTransport_SpawnFactory(TestGreenTransport): setup_server = setup_server_SpawnFactory +class TestGreenTransport_SpawnFactory_bufsize1(TestGreenTransport): + transportBufferSize = 1 + setup_server = setup_server_SpawnFactory + +class TestGreenTransportError(TestCase): + setup_server = setup_server_socket + gtransportClass = pr.GreenTransport + + def test_read_error(self): + self.conn.write('hello\r\n') + sleep(DELAY*1.5) # make sure the rest of data arrives + try: + 1/0 + except: + #self.conn.loseConnection(failure.Failure()) # does not work, why? + spawn(self.conn._queue.send_exception, *sys.exc_info()) + self.assertEqual(self.conn.read(9), 'you said ') + self.assertEqual(self.conn.read(7), 'hello. ') + self.assertEqual(self.conn.read(9), 'BYE') + self.assertRaises(ZeroDivisionError, self.conn.read, 9) + self.assertEqual(None, self.conn._queue) + self.assertEqual(self.conn.read(1), '') + self.assertEqual(self.conn.read(1), '') + +# def test_recv_error(self): +# self.conn.write('hello') +# self.assertEqual('you said hello. ', self.conn.recv()) +# sleep(DELAY*1.5) # make sure the rest of data arrives +# try: +# 1/0 +# except: +# #self.conn.loseConnection(failure.Failure()) # does not work, why? +# spawn(self.conn._queue.send_exception, *sys.exc_info()) +# self.assertEqual('BYE', self.conn.recv()) +# self.assertRaises(ZeroDivisionError, self.conn.recv, 9) +# self.assertEqual(None, self.conn._queue) +# self.assertEqual('', self.conn.recv(1)) +# self.assertEqual('', self.conn.recv()) +# + if __name__=='__main__': unittest.main()