renamed Buffer->GreenTransport, BufferCreator->GreenClientCreator etc; made them use consumer/producer features of twisted; removed doctests added a unittest

This commit is contained in:
Denis Bilenko
2008-12-10 16:35:01 +06:00
parent 541dabb72f
commit ecbdf00338
5 changed files with 415 additions and 226 deletions

View File

@@ -1,44 +1,96 @@
"""Basic twisted protocols converted to synchronous mode"""
import sys
from twisted.internet.protocol import Protocol as twistedProtocol
from twisted.internet.error import ConnectionDone
from twisted.internet.protocol import Factory, _InstanceFactory
from twisted.internet import defer
from twisted.internet.protocol import Factory, ClientFactory
from eventlet.api import spawn
from eventlet.coros import queue
from eventlet.twistedutil import block_on
from eventlet.coros import queue, event
class Producer2Event(object):
class BaseBuffer(object):
# implements IPushProducer
def __init__(self, event):
self.event = event
def resumeProducing(self):
self.event.send(1)
def pauseProducing(self):
self.event.reset()
def stopProducing(self):
del self.event
class GreenTransportBase(object):
write_event = None
def build_protocol(self):
# note to subclassers: self._queue must have send and send_exception that are non-blocking
self.protocol = self.protocol_class()
self._queue = self.protocol._queue = queue()
return self.protocol
# note to subclassers: self._queue must have send and send_exception that never block
self._queue = queue()
protocol = self.protocol_class(self, self._queue)
return protocol
def _wait(self):
return self._queue.wait()
self.transport.resumeProducing()
try:
return self._queue.wait()
finally:
self.transport.pauseProducing()
def write(self, data):
self.transport.write(data)
if self.write_event is not None:
self.write_event.wait()
@property
def transport(self):
return self.protocol.transport
def __getattr__(self, item):
return getattr(self.protocol.transport, item)
if item=='transport':
raise AttributeError(item)
return getattr(self.transport, item)
def resumeProducing(self):
self.paused -= 1
if self.paused==0:
self.transport.resumeProducing()
def pauseProducing(self):
self.paused += 1
if self.paused==1:
self.transport.pauseProducing()
def init_transport_producer(self, transport):
transport.pauseProducing()
self.paused = 1
def init_transport(self, transport):
self.init_transport_producer(transport)
ev = event()
ev.send(1)
transport.registerProducer(Producer2Event(ev), True)
self.write_event = ev
self.transport = transport
class Protocol(twistedProtocol):
def __init__(self, gtransport, queue):
self.gtransport = gtransport
self._queue = queue
def connectionMade(self):
self.gtransport.init_transport(self.transport)
del self.gtransport
def dataReceived(self, data):
self._queue.send(data)
def connectionLost(self, reason):
self._queue.send_exception(reason.type, reason.value, reason.tb)
self._queue = None
del self._queue
class Unbuffered(BaseBuffer):
class UnbufferedTransport(GreenTransportBase):
protocol_class = Protocol
@@ -47,31 +99,6 @@ class Unbuffered(BaseBuffer):
Return '' if connection was closed cleanly, raise the exception if it was closed
in a non clean fashion. After that all successive calls return ''.
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
>>> buf.write('hello')
>>> buf.recv()
'you said hello. '
>>> buf.recv()
'BYE'
>>> buf.recv()
''
#>>> PORT = setup_server_tcp(exit='reset')
#>>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
#>>> buf.write('hello')
#>>> buf.recv()
#'you said hello. '
#>>> buf.recv()
#Traceback
# ...
#ConnectionLost
#>>> buf.recv()
#''
Note that server 'BYE' was received by twisted, but it was thrown away. Use Buffer if you
want to keep the received data.
"""
if self._queue is None:
return ''
@@ -80,7 +107,7 @@ class Unbuffered(BaseBuffer):
except ConnectionDone:
self._queue = None
return ''
except Exception:
except:
self._queue = None
raise
@@ -90,143 +117,73 @@ class Unbuffered(BaseBuffer):
return self
def next(self):
"""
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
>>> buf.write('hello')
>>> for data in buf:
... print `data`
'you said hello. '
'BYE'
"""
result = self.recv()
if not result:
raise StopIteration
return result
class Buffer(BaseBuffer):
class GreenTransport(GreenTransportBase):
protocol_class = Protocol
def __init__(self):
self.buf = ''
self._error = None
def _wait(self):
# don't pause/resume producer here; read and recv methods will do it themselves
return self._queue.wait()
def read(self, size=-1):
"""Like file's read().
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
>>> buf.write('hello')
>>> buf.read(9)
'you said '
>>> buf.read(999)
'hello. BYE'
>>> buf.read(9)
''
>>> buf.read(1)
''
>>> print buf._queue
None
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
>>> buf.write('world')
>>> buf.read()
'you said world. BYE'
>>> buf.read()
''
#>>> PORT = setup_server_tcp(exit='reset')
#>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
#>>> buf.write('whoa')
#>>> buf.read(4)
#'you '
#>>> buf.read()
#Traceback
# ...
#ConnectionLost:
#>>> buf._queue
#None
#>>> buf.read(4)
#'said'
#>>> buf.read()
#' whoa. BYE'
"""
if self._queue is not None:
resumed = False
try:
while len(self.buf) < size or size < 0:
recvd = self._wait()
self.buf += recvd
if not resumed:
self.resumeProducing()
resumed = True
self.buf += self._wait()
except ConnectionDone:
self._queue = None
except Exception:
except:
self._queue = None
# signal the error, but keep buf intact for possible inspection
raise
self._error = sys.exc_info()
finally:
if resumed:
self.pauseProducing()
if size>=0:
result, self.buf = self.buf[:size], self.buf[size:]
else:
result, self.buf = self.buf, ''
if not result and self._error is not None:
error = self._error
self._error = None
raise error[0], error[1], error[2]
return result
def recv(self, buflen=None):
"""Like socket's recv().
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
>>> buf.write('hello')
>>> buf.recv()
'you said hello. '
>>> buf.recv()
'BYE'
>>> buf.recv()
''
>>> buf.recv()
''
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
>>> buf.write('whoa')
>>> buf.recv(9)
'you said '
>>> buf.recv(999)
'whoa. '
>>> buf.recv(9)
'BYE'
>>> buf.recv(1)
''
#>>> PORT = setup_server_tcp(exit='reset')
#>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
#>>> buf.write('whoa')
#>>> buf.recv()
#'you said whoa. '
#>>> buf.recv()
#Traceback
# ConnectionLost
#>>> buf.recv()
#'BYE'
#'hello. '
#>>> buf.read(9)
#'BYE'
#>>> buf.read(1)
"""
if self._queue is not None and not self.buf:
self.resumeProducing()
try:
recvd = self._wait()
#print 'recvd: %r' % recvd
#print 'received %r' % recvd
self.buf += recvd
except ConnectionDone:
self._queue = None
except Exception:
except:
self._queue = None
# signal the error, but if buf had any data keep it
raise
self._error = sys.exc_info()
finally:
self.pauseProducing()
if buflen is None:
result, self.buf = self.buf, ''
else:
result, self.buf = self.buf[:buflen], self.buf[buflen:]
if not result and self._error is not None:
error = self._error
self._error = None
raise error[0], error[1], error[2]
return result
# iterator protocol:
@@ -235,103 +192,102 @@ class Buffer(BaseBuffer):
return self
def next(self):
"""
>>> PORT = setup_server_tcp(exit='clean')
>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
>>> buf.write('hello')
>>> for data in buf:
... print `data`
'you said hello. '
'BYE'
"""
res = self.recv()
if not res:
raise StopIteration
return res
def sync_connect(reactor, connect_func, protocol_instance, address_args, *args, **kwargs):
d = defer.Deferred()
f = _InstanceFactory(reactor, protocol_instance, d)
connect_func(*(address_args + (f,) + args), **kwargs)
protocol = block_on(d)
class GreenInstanceFactory(ClientFactory):
class BufferCreator(object):
def __init__(self, instance, event):
self.instance = instance
self.event = event
buffer_class = Buffer
def buildProtocol(self, addr):
self.event.send(self.instance)
return self.instance
def __init__(self, reactor, buffer_class=None, *args, **kwargs):
def clientConnectionFailed(self, connector, reason):
self.event.send_exception(reason.type, reason.value, reason.tb)
class GreenClientCreator(object):
gtransport_class = GreenTransport
def __init__(self, reactor=None, klass=None, *args, **kwargs):
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
if buffer_class is not None:
self.buffer_class = buffer_class
if klass is not None:
self.gtransport_class = klass
self.args = args
self.kwargs = kwargs
def _make_transport_and_factory(self):
gtransport = self.gtransport_class(*self.args, **self.kwargs)
protocol = gtransport.build_protocol()
factory = GreenInstanceFactory(protocol, event())
return gtransport, factory
def connectTCP(self, host, port, *args, **kwargs):
buffer = self.buffer_class(*self.args, **self.kwargs)
protocol = buffer.build_protocol()
sync_connect(self.reactor, self.reactor.connectTCP, protocol, (host, port), *args, **kwargs)
return buffer
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectTCP(host, port, factory, *args, **kwargs)
factory.event.wait()
return gtransport
def connectSSL(self, host, port, *args, **kwargs):
buffer = self.buffer_class(*self.args, **self.kwargs)
protocol = buffer.build_protocol()
sync_connect(self.reactor, self.reactor.connectSSL, protocol, (host, port), *args, **kwargs)
return buffer
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectSSL(host, port, factory, *args, **kwargs)
factory.event.wait()
return gtransport
def connectTLS(self, host, port, *args, **kwargs):
buffer = self.buffer_class(*self.args, **self.kwargs)
protocol = buffer.build_protocol()
sync_connect(self.reactor, self.reactor.connectTLS, protocol, (host, port), *args, **kwargs)
return buffer
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectTLS(host, port, factory, *args, **kwargs)
factory.event.wait()
return gtransport
def connectUNIX(self, address, *args, **kwargs):
buffer = self.buffer_class(*self.args, **self.kwargs)
protocol = buffer.build_protocol()
sync_connect(self.reactor, self.reactor.connectUNIX, protocol, (address,), *args, **kwargs)
return buffer
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectUNIX(address, factory, *args, **kwargs)
factory.event.wait()
return gtransport
def connectSRV(self, service, domain, *args, **kwargs):
SRVConnector = kwargs.pop('ConnectorClass', None)
if SRVConnector is None:
from twisted.names.srvconnect import SRVConnector
gtransport, factory = self._make_transport_and_factory()
c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
c.connect()
factory.event.wait()
return gtransport
def connect(self, required_args, ConnectorClass, *rest_args, **rest_kwargs):
gtransport, factory = self._make_transport_and_factory()
args = required_args + (factory, ) + rest_args
c = ConnectorClass(*args, **rest_kwargs)
c.connect()
factory.event.wait()
return gtransport
class SpawnFactory(Factory):
buffer_class = Buffer
gtransport_class = GreenTransport
def __init__(self, handler, buffer_class=None, *args, **kwargs):
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
self.handler = handler
if buffer_class is not None:
self.buffer_class = buffer_class
if gtransport_class is not None:
self.gtransport_class = gtransport_class
self.args = args
self.kwargs = kwargs
def buildProtocol(self, addr):
buffer = self.buffer_class(*self.args, **self.kwargs)
protocol = buffer.build_protocol()
gtransport = self.gtransport_class(*self.args, **self.kwargs)
protocol = gtransport.build_protocol()
protocol.factory = self
spawn(self.handler, buffer)
spawn(self.handler, gtransport)
return protocol
def __setup_server_tcp(exit='clean', delay=0.1, port=0):
from eventlet.green import socket
from eventlet.api import sleep
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
port = s.getsockname()[1]
s.listen(5)
s.settimeout(delay+1)
def serve():
conn, addr = s.accept()
conn.settimeout(delay+1)
hello = conn.recv(128)
conn.sendall('you said %s. ' % hello)
sleep(delay)
conn.sendall('BYE')
conn.close()
spawn(serve)
return port
if __name__ == '__main__':
import doctest
from twisted.internet import reactor
doctest.testmod(extraglobs = {'setup_server_tcp': __setup_server_tcp})

View File

@@ -1,16 +1,24 @@
from twisted.protocols import basic
from twisted.internet.error import ConnectionDone
from eventlet.twistedutil.protocol import BaseBuffer
from eventlet.twistedutil.protocol import GreenTransportBase
class LineOnlyReceiver(basic.LineOnlyReceiver):
def __init__(self, gtransport, queue):
self.gtransport = gtransport
self._queue = queue
def connectionMade(self):
self.gtransport.init_transport(self.transport)
del self.gtransport
def lineReceived(self, line):
self._queue.send(line)
def connectionLost(self, reason):
self._queue.send_exception(reason.type, reason.value, reason.tb)
class LineOnlyReceiverBuffer(BaseBuffer):
class LineOnlyReceiverTransport(GreenTransportBase):
protocol_class = LineOnlyReceiver

View File

@@ -1,17 +1,21 @@
"""Example for GreenTransport and GreenClientCreator.
In this example reactor is started implicitly upon the first
use of a blocking function.
"""
from twisted.internet import ssl
from twisted.internet.error import ConnectionClosed
from eventlet.twistedutil.protocol import BufferCreator
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverBuffer
from eventlet.twistedutil.protocol import GreenClientCreator
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
from twisted.internet import reactor
# read from TCP connection using default Buffer
conn = BufferCreator(reactor).connectTCP('www.google.com', 80)
# read from TCP connection
conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80)
conn.write('GET / HTTP/1.0\r\n\r\n')
print conn.read()
# read from SSL connection line by line
conn = BufferCreator(reactor, LineOnlyReceiverBuffer).connectSSL('sf.net', 443, ssl.ClientContextFactory())
conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL('sf.net', 443, ssl.ClientContextFactory())
conn.write('GET / HTTP/1.0\r\n\r\n')
try:
for num, line in enumerate(conn):

View File

@@ -1,6 +1,13 @@
"""Simple chat demo application.
Listen on port 8007 and re-send all the data received to other participants.
Demonstrates how to
* plug in eventlet into a twisted application (join_reactor)
* how to use SpawnFactory to start a new greenlet for each new request.
"""
from eventlet.twistedutil import join_reactor
from eventlet.twistedutil.protocol import SpawnFactory
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverBuffer
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
class Chat:
@@ -10,13 +17,15 @@ class Chat:
def handler(self, conn):
peer = conn.getPeer()
print 'new connection from %s' % (peer, )
conn.write("Welcome! There're %s participants already\n" % (len(self.participants)))
self.participants.append(conn)
try:
for line in conn:
print 'received from %s: %s' % (peer, line)
for buddy in self.participants:
if buddy is not conn:
buddy.sendline('from %s: %s' % (peer, line))
if line:
print 'received from %s: %s' % (peer, line)
for buddy in self.participants:
if buddy is not conn:
buddy.sendline('from %s: %s' % (peer, line))
except Exception, ex:
print peer, ex
else:
@@ -26,5 +35,6 @@ class Chat:
chat = Chat()
from twisted.internet import reactor
reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverBuffer))
reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport))
reactor.run()

View File

@@ -0,0 +1,211 @@
from twisted.internet import reactor
from greentest import exit_unless_twisted
exit_unless_twisted()
import sys
import unittest
from twisted.internet.error import ConnectionLost, ConnectionDone
from twisted.python import failure
import eventlet.twistedutil.protocol as pr
from eventlet.api import spawn, sleep, with_timeout, call_after
from eventlet.green import socket
DELAY=0.01
def setup_server_socket(self, delay=DELAY, port=0):
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
port = s.getsockname()[1]
s.listen(5)
s.settimeout(delay*3)
def serve():
conn, addr = s.accept()
conn.settimeout(delay+1)
try:
hello = conn.recv(128)
except socket.timeout:
return
conn.sendall('you said %s. ' % hello)
sleep(delay)
conn.sendall('BYE')
sleep(delay)
#conn.close()
spawn(serve)
return port
def setup_server_SpawnFactory(self, delay=DELAY, port=0):
def handle(conn):
port.stopListening()
hello = conn.recv()
conn.write('you said %s. ' % hello)
sleep(delay)
conn.write('BYE')
sleep(delay)
conn.loseConnection()
port = reactor.listenTCP(0, pr.SpawnFactory(handle, pr.UnbufferedTransport))
return port.getHost().port
class TestCase(unittest.TestCase):
def setUp(self):
port = self.setup_server()
self.conn = self.connector.connectTCP('127.0.0.1', port)
class TestUnbufferedTransport(TestCase):
connector = pr.GreenClientCreator(reactor, pr.UnbufferedTransport)
setup_server = setup_server_socket
def test_recv(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
self.assertEqual(self.conn.recv(), 'BYE')
self.assertEqual(self.conn.recv(), '')
self.assertEqual(self.conn.recv(), '')
def test_recv_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
try:
1/0
except:
f = failure.Failure()
spawn(self.conn.protocol.connectionLost, f)
self.assertRaises(ZeroDivisionError, self.conn.recv)
def test_iterator(self):
self.conn.write('hello')
i = iter(self.conn)
self.assertEqual(i.next(), 'you said hello. ')
self.assertEqual(i.next(), 'BYE')
self.assertRaises(StopIteration, i.next)
class TestUnbufferedTransport_SpawnFactory(TestUnbufferedTransport):
setup_server = setup_server_SpawnFactory
class TestTransport(pr.GreenTransportBase):
protocol_class = pr.Protocol
def recv(self):
return self._wait()
class TestError(TestCase):
connector = pr.GreenClientCreator(reactor, TestTransport)
setup_server = setup_server_socket
def test_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.recv(), 'you said hello. ')
self.assertEqual(self.conn.recv(), 'BYE')
self.assertRaises(ConnectionDone, self.conn.recv)
class TestError_SpawnFactory(TestError):
setup_server = setup_server_SpawnFactory
class TestGreenTransport(TestCase):
connector = pr.GreenClientCreator(reactor, pr.GreenTransport)
setup_server = setup_server_socket
def test_read(self):
self.conn.write('hello')
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.read(999), 'hello. BYE')
self.assertEqual(self.conn.read(9), '')
self.assertEqual(self.conn.read(1), '')
self.assertEqual(None, self.conn._queue)
def test_read2(self):
self.conn.write('world')
self.assertEqual(self.conn.read(), 'you said world. BYE')
self.assertEqual(self.conn.read(), '')
self.assertEqual(self.conn.recv(), '')
def test_read_error(self):
self.conn.write('hello')
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.recv(), 'hello. ')
sleep(DELAY*1.5) # make sure the rest of data arrives
try:
1/0
except:
#self.conn.loseConnection(failure.Failure()) # does not work, why?
spawn(self.conn._queue.send_exception, *sys.exc_info())
self.assertEqual(self.conn.read(9), 'BYE')
self.assertRaises(ZeroDivisionError, self.conn.read, 9)
self.assertEqual(None, self.conn._queue)
self.assertEqual(self.conn.read(1), '')
self.assertEqual(self.conn.read(1), '')
def test_recv(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.recv())
self.assertEqual('BYE', self.conn.recv())
self.assertEqual('', self.conn.recv())
self.assertEqual('', self.conn.recv())
def test_recv2(self):
self.conn.write('whoa')
self.assertEqual('you said ', self.conn.recv(9))
self.assertEqual('whoa. ', self.conn.recv(999))
self.assertEqual('BYE', self.conn.recv(9))
self.assertEqual('', self.conn.recv(1))
self.assertEqual('', self.conn.recv())
self.assertEqual('', self.conn.read())
def test_recv_error(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.recv())
sleep(DELAY*1.5) # make sure the rest of data arrives
try:
1/0
except:
#self.conn.loseConnection(failure.Failure()) # does not work, why?
spawn(self.conn._queue.send_exception, *sys.exc_info())
self.assertEqual('BYE', self.conn.recv())
self.assertRaises(ZeroDivisionError, self.conn.recv, 9)
self.assertEqual(None, self.conn._queue)
self.assertEqual('', self.conn.recv(1))
self.assertEqual('', self.conn.recv())
def test_iterator(self):
self.conn.write('hello')
self.assertEqual('you said hello. ', self.conn.next())
self.assertEqual('BYE', self.conn.next())
self.assertRaises(StopIteration, self.conn.next)
_tests = [x for x in locals().keys() if x.startswith('test_')]
def test_resume_producing(self):
for test in self._tests:
self.setUp()
self.conn.resumeProducing()
getattr(self, test)()
def test_pause_producing(self):
self.conn.pauseProducing()
self.conn.write('hi')
result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out')
self.assertEqual('timed out', result)
def test_pauseresume_producing(self):
self.conn.pauseProducing()
call_after(DELAY*5, self.conn.resumeProducing)
self.conn.write('hi')
result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out')
self.assertEqual('you said hi. BYE', result)
class TestGreenTransport_SpawnFactory(TestGreenTransport):
setup_server = setup_server_SpawnFactory
if __name__=='__main__':
unittest.main()