From ecbdf00338585ddc1c687ce06529b873e246e35b Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 10 Dec 2008 16:35:01 +0600 Subject: [PATCH] renamed Buffer->GreenTransport, BufferCreator->GreenClientCreator etc; made them use consumer/producer features of twisted; removed doctests added a unittest --- eventlet/twistedutil/protocol.py | 380 +++++++++++------------- eventlet/twistedutil/protocols/basic.py | 12 +- examples/twisted_basic_client.py | 16 +- examples/twisted_basic_server.py | 22 +- greentest/test__twistedutil_protocol.py | 211 +++++++++++++ 5 files changed, 415 insertions(+), 226 deletions(-) create mode 100644 greentest/test__twistedutil_protocol.py diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py index 537a568..fef29eb 100644 --- a/eventlet/twistedutil/protocol.py +++ b/eventlet/twistedutil/protocol.py @@ -1,44 +1,96 @@ """Basic twisted protocols converted to synchronous mode""" +import sys from twisted.internet.protocol import Protocol as twistedProtocol from twisted.internet.error import ConnectionDone -from twisted.internet.protocol import Factory, _InstanceFactory -from twisted.internet import defer +from twisted.internet.protocol import Factory, ClientFactory from eventlet.api import spawn -from eventlet.coros import queue -from eventlet.twistedutil import block_on +from eventlet.coros import queue, event +class Producer2Event(object): -class BaseBuffer(object): + # implements IPushProducer + + def __init__(self, event): + self.event = event + + def resumeProducing(self): + self.event.send(1) + + def pauseProducing(self): + self.event.reset() + + def stopProducing(self): + del self.event + +class GreenTransportBase(object): + + write_event = None def build_protocol(self): - # note to subclassers: self._queue must have send and send_exception that are non-blocking - self.protocol = self.protocol_class() - self._queue = self.protocol._queue = queue() - return self.protocol + # note to subclassers: self._queue must have send and send_exception that never block + self._queue = queue() + protocol = self.protocol_class(self, self._queue) + return protocol def _wait(self): - return self._queue.wait() + self.transport.resumeProducing() + try: + return self._queue.wait() + finally: + self.transport.pauseProducing() + + def write(self, data): + self.transport.write(data) + if self.write_event is not None: + self.write_event.wait() - @property - def transport(self): - return self.protocol.transport - def __getattr__(self, item): - return getattr(self.protocol.transport, item) + if item=='transport': + raise AttributeError(item) + return getattr(self.transport, item) + def resumeProducing(self): + self.paused -= 1 + if self.paused==0: + self.transport.resumeProducing() + + def pauseProducing(self): + self.paused += 1 + if self.paused==1: + self.transport.pauseProducing() + + def init_transport_producer(self, transport): + transport.pauseProducing() + self.paused = 1 + + def init_transport(self, transport): + self.init_transport_producer(transport) + ev = event() + ev.send(1) + transport.registerProducer(Producer2Event(ev), True) + self.write_event = ev + self.transport = transport class Protocol(twistedProtocol): + def __init__(self, gtransport, queue): + self.gtransport = gtransport + self._queue = queue + + def connectionMade(self): + self.gtransport.init_transport(self.transport) + del self.gtransport + def dataReceived(self, data): self._queue.send(data) def connectionLost(self, reason): self._queue.send_exception(reason.type, reason.value, reason.tb) - self._queue = None + del self._queue -class Unbuffered(BaseBuffer): +class UnbufferedTransport(GreenTransportBase): protocol_class = Protocol @@ -47,31 +99,6 @@ class Unbuffered(BaseBuffer): Return '' if connection was closed cleanly, raise the exception if it was closed in a non clean fashion. After that all successive calls return ''. - - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT) - >>> buf.write('hello') - >>> buf.recv() - 'you said hello. ' - >>> buf.recv() - 'BYE' - >>> buf.recv() - '' - - #>>> PORT = setup_server_tcp(exit='reset') - #>>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT) - #>>> buf.write('hello') - #>>> buf.recv() - #'you said hello. ' - #>>> buf.recv() - #Traceback - # ... - #ConnectionLost - #>>> buf.recv() - #'' - - Note that server 'BYE' was received by twisted, but it was thrown away. Use Buffer if you - want to keep the received data. """ if self._queue is None: return '' @@ -80,7 +107,7 @@ class Unbuffered(BaseBuffer): except ConnectionDone: self._queue = None return '' - except Exception: + except: self._queue = None raise @@ -90,143 +117,73 @@ class Unbuffered(BaseBuffer): return self def next(self): - """ - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT) - >>> buf.write('hello') - >>> for data in buf: - ... print `data` - 'you said hello. ' - 'BYE' - """ result = self.recv() if not result: raise StopIteration return result -class Buffer(BaseBuffer): +class GreenTransport(GreenTransportBase): protocol_class = Protocol def __init__(self): self.buf = '' + self._error = None + def _wait(self): + # don't pause/resume producer here; read and recv methods will do it themselves + return self._queue.wait() + def read(self, size=-1): - """Like file's read(). - - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - >>> buf.write('hello') - >>> buf.read(9) - 'you said ' - >>> buf.read(999) - 'hello. BYE' - >>> buf.read(9) - '' - >>> buf.read(1) - '' - >>> print buf._queue - None - - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - >>> buf.write('world') - >>> buf.read() - 'you said world. BYE' - >>> buf.read() - '' - - #>>> PORT = setup_server_tcp(exit='reset') - #>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - #>>> buf.write('whoa') - #>>> buf.read(4) - #'you ' - #>>> buf.read() - #Traceback - # ... - #ConnectionLost: - #>>> buf._queue - #None - #>>> buf.read(4) - #'said' - #>>> buf.read() - #' whoa. BYE' - """ if self._queue is not None: + resumed = False try: while len(self.buf) < size or size < 0: - recvd = self._wait() - self.buf += recvd + if not resumed: + self.resumeProducing() + resumed = True + self.buf += self._wait() except ConnectionDone: self._queue = None - except Exception: + except: self._queue = None - # signal the error, but keep buf intact for possible inspection - raise + self._error = sys.exc_info() + finally: + if resumed: + self.pauseProducing() if size>=0: result, self.buf = self.buf[:size], self.buf[size:] else: result, self.buf = self.buf, '' + if not result and self._error is not None: + error = self._error + self._error = None + raise error[0], error[1], error[2] return result def recv(self, buflen=None): - """Like socket's recv(). - - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - >>> buf.write('hello') - >>> buf.recv() - 'you said hello. ' - >>> buf.recv() - 'BYE' - >>> buf.recv() - '' - >>> buf.recv() - '' - - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - >>> buf.write('whoa') - >>> buf.recv(9) - 'you said ' - >>> buf.recv(999) - 'whoa. ' - >>> buf.recv(9) - 'BYE' - >>> buf.recv(1) - '' - - #>>> PORT = setup_server_tcp(exit='reset') - #>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - #>>> buf.write('whoa') - #>>> buf.recv() - #'you said whoa. ' - #>>> buf.recv() - #Traceback - # ConnectionLost - #>>> buf.recv() - #'BYE' - #'hello. ' - #>>> buf.read(9) - #'BYE' - #>>> buf.read(1) - """ if self._queue is not None and not self.buf: + self.resumeProducing() try: recvd = self._wait() - #print 'recvd: %r' % recvd + #print 'received %r' % recvd self.buf += recvd except ConnectionDone: self._queue = None - except Exception: + except: self._queue = None - # signal the error, but if buf had any data keep it - raise + self._error = sys.exc_info() + finally: + self.pauseProducing() if buflen is None: result, self.buf = self.buf, '' else: result, self.buf = self.buf[:buflen], self.buf[buflen:] + if not result and self._error is not None: + error = self._error + self._error = None + raise error[0], error[1], error[2] return result # iterator protocol: @@ -235,103 +192,102 @@ class Buffer(BaseBuffer): return self def next(self): - """ - >>> PORT = setup_server_tcp(exit='clean') - >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT) - >>> buf.write('hello') - >>> for data in buf: - ... print `data` - 'you said hello. ' - 'BYE' - """ res = self.recv() if not res: raise StopIteration return res -def sync_connect(reactor, connect_func, protocol_instance, address_args, *args, **kwargs): - d = defer.Deferred() - f = _InstanceFactory(reactor, protocol_instance, d) - connect_func(*(address_args + (f,) + args), **kwargs) - protocol = block_on(d) +class GreenInstanceFactory(ClientFactory): -class BufferCreator(object): + def __init__(self, instance, event): + self.instance = instance + self.event = event - buffer_class = Buffer + def buildProtocol(self, addr): + self.event.send(self.instance) + return self.instance - def __init__(self, reactor, buffer_class=None, *args, **kwargs): + def clientConnectionFailed(self, connector, reason): + self.event.send_exception(reason.type, reason.value, reason.tb) + + +class GreenClientCreator(object): + + gtransport_class = GreenTransport + + def __init__(self, reactor=None, klass=None, *args, **kwargs): + if reactor is None: + from twisted.internet import reactor self.reactor = reactor - if buffer_class is not None: - self.buffer_class = buffer_class + if klass is not None: + self.gtransport_class = klass self.args = args self.kwargs = kwargs + def _make_transport_and_factory(self): + gtransport = self.gtransport_class(*self.args, **self.kwargs) + protocol = gtransport.build_protocol() + factory = GreenInstanceFactory(protocol, event()) + return gtransport, factory + def connectTCP(self, host, port, *args, **kwargs): - buffer = self.buffer_class(*self.args, **self.kwargs) - protocol = buffer.build_protocol() - sync_connect(self.reactor, self.reactor.connectTCP, protocol, (host, port), *args, **kwargs) - return buffer + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectTCP(host, port, factory, *args, **kwargs) + factory.event.wait() + return gtransport def connectSSL(self, host, port, *args, **kwargs): - buffer = self.buffer_class(*self.args, **self.kwargs) - protocol = buffer.build_protocol() - sync_connect(self.reactor, self.reactor.connectSSL, protocol, (host, port), *args, **kwargs) - return buffer + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectSSL(host, port, factory, *args, **kwargs) + factory.event.wait() + return gtransport def connectTLS(self, host, port, *args, **kwargs): - buffer = self.buffer_class(*self.args, **self.kwargs) - protocol = buffer.build_protocol() - sync_connect(self.reactor, self.reactor.connectTLS, protocol, (host, port), *args, **kwargs) - return buffer + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectTLS(host, port, factory, *args, **kwargs) + factory.event.wait() + return gtransport def connectUNIX(self, address, *args, **kwargs): - buffer = self.buffer_class(*self.args, **self.kwargs) - protocol = buffer.build_protocol() - sync_connect(self.reactor, self.reactor.connectUNIX, protocol, (address,), *args, **kwargs) - return buffer + gtransport, factory = self._make_transport_and_factory() + self.reactor.connectUNIX(address, factory, *args, **kwargs) + factory.event.wait() + return gtransport + + def connectSRV(self, service, domain, *args, **kwargs): + SRVConnector = kwargs.pop('ConnectorClass', None) + if SRVConnector is None: + from twisted.names.srvconnect import SRVConnector + gtransport, factory = self._make_transport_and_factory() + c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs) + c.connect() + factory.event.wait() + return gtransport + + def connect(self, required_args, ConnectorClass, *rest_args, **rest_kwargs): + gtransport, factory = self._make_transport_and_factory() + args = required_args + (factory, ) + rest_args + c = ConnectorClass(*args, **rest_kwargs) + c.connect() + factory.event.wait() + return gtransport + class SpawnFactory(Factory): - buffer_class = Buffer + gtransport_class = GreenTransport - def __init__(self, handler, buffer_class=None, *args, **kwargs): + def __init__(self, handler, gtransport_class=None, *args, **kwargs): self.handler = handler - if buffer_class is not None: - self.buffer_class = buffer_class + if gtransport_class is not None: + self.gtransport_class = gtransport_class self.args = args self.kwargs = kwargs def buildProtocol(self, addr): - buffer = self.buffer_class(*self.args, **self.kwargs) - protocol = buffer.build_protocol() + gtransport = self.gtransport_class(*self.args, **self.kwargs) + protocol = gtransport.build_protocol() protocol.factory = self - spawn(self.handler, buffer) + spawn(self.handler, gtransport) return protocol - - -def __setup_server_tcp(exit='clean', delay=0.1, port=0): - from eventlet.green import socket - from eventlet.api import sleep - s = socket.socket() - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(('127.0.0.1', port)) - port = s.getsockname()[1] - s.listen(5) - s.settimeout(delay+1) - def serve(): - conn, addr = s.accept() - conn.settimeout(delay+1) - hello = conn.recv(128) - conn.sendall('you said %s. ' % hello) - sleep(delay) - conn.sendall('BYE') - conn.close() - spawn(serve) - return port - -if __name__ == '__main__': - import doctest - from twisted.internet import reactor - doctest.testmod(extraglobs = {'setup_server_tcp': __setup_server_tcp}) - diff --git a/eventlet/twistedutil/protocols/basic.py b/eventlet/twistedutil/protocols/basic.py index f1328d2..1ffd3ae 100644 --- a/eventlet/twistedutil/protocols/basic.py +++ b/eventlet/twistedutil/protocols/basic.py @@ -1,16 +1,24 @@ from twisted.protocols import basic from twisted.internet.error import ConnectionDone -from eventlet.twistedutil.protocol import BaseBuffer +from eventlet.twistedutil.protocol import GreenTransportBase class LineOnlyReceiver(basic.LineOnlyReceiver): + def __init__(self, gtransport, queue): + self.gtransport = gtransport + self._queue = queue + + def connectionMade(self): + self.gtransport.init_transport(self.transport) + del self.gtransport + def lineReceived(self, line): self._queue.send(line) def connectionLost(self, reason): self._queue.send_exception(reason.type, reason.value, reason.tb) -class LineOnlyReceiverBuffer(BaseBuffer): +class LineOnlyReceiverTransport(GreenTransportBase): protocol_class = LineOnlyReceiver diff --git a/examples/twisted_basic_client.py b/examples/twisted_basic_client.py index b8f18e8..3f112ca 100644 --- a/examples/twisted_basic_client.py +++ b/examples/twisted_basic_client.py @@ -1,17 +1,21 @@ +"""Example for GreenTransport and GreenClientCreator. + +In this example reactor is started implicitly upon the first +use of a blocking function. +""" from twisted.internet import ssl from twisted.internet.error import ConnectionClosed -from eventlet.twistedutil.protocol import BufferCreator -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverBuffer - +from eventlet.twistedutil.protocol import GreenClientCreator +from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport from twisted.internet import reactor -# read from TCP connection using default Buffer -conn = BufferCreator(reactor).connectTCP('www.google.com', 80) +# read from TCP connection +conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80) conn.write('GET / HTTP/1.0\r\n\r\n') print conn.read() # read from SSL connection line by line -conn = BufferCreator(reactor, LineOnlyReceiverBuffer).connectSSL('sf.net', 443, ssl.ClientContextFactory()) +conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL('sf.net', 443, ssl.ClientContextFactory()) conn.write('GET / HTTP/1.0\r\n\r\n') try: for num, line in enumerate(conn): diff --git a/examples/twisted_basic_server.py b/examples/twisted_basic_server.py index 41f646a..c21bd2f 100644 --- a/examples/twisted_basic_server.py +++ b/examples/twisted_basic_server.py @@ -1,6 +1,13 @@ +"""Simple chat demo application. +Listen on port 8007 and re-send all the data received to other participants. + +Demonstrates how to + * plug in eventlet into a twisted application (join_reactor) + * how to use SpawnFactory to start a new greenlet for each new request. +""" from eventlet.twistedutil import join_reactor from eventlet.twistedutil.protocol import SpawnFactory -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverBuffer +from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport class Chat: @@ -10,13 +17,15 @@ class Chat: def handler(self, conn): peer = conn.getPeer() print 'new connection from %s' % (peer, ) + conn.write("Welcome! There're %s participants already\n" % (len(self.participants))) self.participants.append(conn) try: for line in conn: - print 'received from %s: %s' % (peer, line) - for buddy in self.participants: - if buddy is not conn: - buddy.sendline('from %s: %s' % (peer, line)) + if line: + print 'received from %s: %s' % (peer, line) + for buddy in self.participants: + if buddy is not conn: + buddy.sendline('from %s: %s' % (peer, line)) except Exception, ex: print peer, ex else: @@ -26,5 +35,6 @@ class Chat: chat = Chat() from twisted.internet import reactor -reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverBuffer)) +reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport)) reactor.run() + diff --git a/greentest/test__twistedutil_protocol.py b/greentest/test__twistedutil_protocol.py new file mode 100644 index 0000000..842408d --- /dev/null +++ b/greentest/test__twistedutil_protocol.py @@ -0,0 +1,211 @@ +from twisted.internet import reactor +from greentest import exit_unless_twisted +exit_unless_twisted() + +import sys +import unittest +from twisted.internet.error import ConnectionLost, ConnectionDone +from twisted.python import failure + +import eventlet.twistedutil.protocol as pr +from eventlet.api import spawn, sleep, with_timeout, call_after +from eventlet.green import socket + +DELAY=0.01 + +def setup_server_socket(self, delay=DELAY, port=0): + s = socket.socket() + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(('127.0.0.1', port)) + port = s.getsockname()[1] + s.listen(5) + s.settimeout(delay*3) + def serve(): + conn, addr = s.accept() + conn.settimeout(delay+1) + try: + hello = conn.recv(128) + except socket.timeout: + return + conn.sendall('you said %s. ' % hello) + sleep(delay) + conn.sendall('BYE') + sleep(delay) + #conn.close() + spawn(serve) + return port + +def setup_server_SpawnFactory(self, delay=DELAY, port=0): + def handle(conn): + port.stopListening() + hello = conn.recv() + conn.write('you said %s. ' % hello) + sleep(delay) + conn.write('BYE') + sleep(delay) + conn.loseConnection() + port = reactor.listenTCP(0, pr.SpawnFactory(handle, pr.UnbufferedTransport)) + return port.getHost().port + + +class TestCase(unittest.TestCase): + + def setUp(self): + port = self.setup_server() + self.conn = self.connector.connectTCP('127.0.0.1', port) + +class TestUnbufferedTransport(TestCase): + + connector = pr.GreenClientCreator(reactor, pr.UnbufferedTransport) + setup_server = setup_server_socket + + def test_recv(self): + self.conn.write('hello') + self.assertEqual(self.conn.recv(), 'you said hello. ') + self.assertEqual(self.conn.recv(), 'BYE') + self.assertEqual(self.conn.recv(), '') + self.assertEqual(self.conn.recv(), '') + + def test_recv_error(self): + self.conn.write('hello') + self.assertEqual(self.conn.recv(), 'you said hello. ') + try: + 1/0 + except: + f = failure.Failure() + spawn(self.conn.protocol.connectionLost, f) + self.assertRaises(ZeroDivisionError, self.conn.recv) + + def test_iterator(self): + self.conn.write('hello') + i = iter(self.conn) + self.assertEqual(i.next(), 'you said hello. ') + self.assertEqual(i.next(), 'BYE') + self.assertRaises(StopIteration, i.next) + + +class TestUnbufferedTransport_SpawnFactory(TestUnbufferedTransport): + setup_server = setup_server_SpawnFactory + + +class TestTransport(pr.GreenTransportBase): + + protocol_class = pr.Protocol + + def recv(self): + return self._wait() + +class TestError(TestCase): + + connector = pr.GreenClientCreator(reactor, TestTransport) + setup_server = setup_server_socket + + def test_error(self): + self.conn.write('hello') + self.assertEqual(self.conn.recv(), 'you said hello. ') + self.assertEqual(self.conn.recv(), 'BYE') + self.assertRaises(ConnectionDone, self.conn.recv) + +class TestError_SpawnFactory(TestError): + setup_server = setup_server_SpawnFactory + +class TestGreenTransport(TestCase): + + connector = pr.GreenClientCreator(reactor, pr.GreenTransport) + setup_server = setup_server_socket + + def test_read(self): + self.conn.write('hello') + self.assertEqual(self.conn.read(9), 'you said ') + self.assertEqual(self.conn.read(999), 'hello. BYE') + self.assertEqual(self.conn.read(9), '') + self.assertEqual(self.conn.read(1), '') + self.assertEqual(None, self.conn._queue) + + def test_read2(self): + self.conn.write('world') + self.assertEqual(self.conn.read(), 'you said world. BYE') + self.assertEqual(self.conn.read(), '') + self.assertEqual(self.conn.recv(), '') + + def test_read_error(self): + self.conn.write('hello') + self.assertEqual(self.conn.read(9), 'you said ') + self.assertEqual(self.conn.recv(), 'hello. ') + sleep(DELAY*1.5) # make sure the rest of data arrives + try: + 1/0 + except: + #self.conn.loseConnection(failure.Failure()) # does not work, why? + spawn(self.conn._queue.send_exception, *sys.exc_info()) + self.assertEqual(self.conn.read(9), 'BYE') + self.assertRaises(ZeroDivisionError, self.conn.read, 9) + self.assertEqual(None, self.conn._queue) + self.assertEqual(self.conn.read(1), '') + self.assertEqual(self.conn.read(1), '') + + def test_recv(self): + self.conn.write('hello') + self.assertEqual('you said hello. ', self.conn.recv()) + self.assertEqual('BYE', self.conn.recv()) + self.assertEqual('', self.conn.recv()) + self.assertEqual('', self.conn.recv()) + + def test_recv2(self): + self.conn.write('whoa') + self.assertEqual('you said ', self.conn.recv(9)) + self.assertEqual('whoa. ', self.conn.recv(999)) + self.assertEqual('BYE', self.conn.recv(9)) + self.assertEqual('', self.conn.recv(1)) + self.assertEqual('', self.conn.recv()) + self.assertEqual('', self.conn.read()) + + def test_recv_error(self): + self.conn.write('hello') + self.assertEqual('you said hello. ', self.conn.recv()) + sleep(DELAY*1.5) # make sure the rest of data arrives + try: + 1/0 + except: + #self.conn.loseConnection(failure.Failure()) # does not work, why? + spawn(self.conn._queue.send_exception, *sys.exc_info()) + self.assertEqual('BYE', self.conn.recv()) + self.assertRaises(ZeroDivisionError, self.conn.recv, 9) + self.assertEqual(None, self.conn._queue) + self.assertEqual('', self.conn.recv(1)) + self.assertEqual('', self.conn.recv()) + + def test_iterator(self): + self.conn.write('hello') + self.assertEqual('you said hello. ', self.conn.next()) + self.assertEqual('BYE', self.conn.next()) + self.assertRaises(StopIteration, self.conn.next) + + _tests = [x for x in locals().keys() if x.startswith('test_')] + + def test_resume_producing(self): + for test in self._tests: + self.setUp() + self.conn.resumeProducing() + getattr(self, test)() + + def test_pause_producing(self): + self.conn.pauseProducing() + self.conn.write('hi') + result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') + self.assertEqual('timed out', result) + + def test_pauseresume_producing(self): + self.conn.pauseProducing() + call_after(DELAY*5, self.conn.resumeProducing) + self.conn.write('hi') + result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') + self.assertEqual('you said hi. BYE', result) + + +class TestGreenTransport_SpawnFactory(TestGreenTransport): + setup_server = setup_server_SpawnFactory + +if __name__=='__main__': + unittest.main() +