"""Basic twisted protocols converted to synchronous mode"""
from twisted.internet.protocol import Protocol as twistedProtocol
from twisted.internet.error import ConnectionDone
from twisted.internet.protocol import Factory, _InstanceFactory
from twisted.internet import defer

from eventlet.api import spawn
from eventlet.channel import channel
from eventlet.twistedutil import block_on


class BaseBuffer(object):
    """Common base of the synchronous wrappers around a twisted protocol.

    Subclasses provide ``protocol_class``.  ``build_protocol()`` instantiates
    it and shares a fresh channel between the protocol and this object.
    """

    def build_protocol(self):
        """Create the protocol and the channel shared with it.

        Returns the new protocol instance.
        """
        self.protocol = self.protocol_class()
        self.channel = self.protocol.channel = channel()
        return self.protocol

    @property
    def transport(self):
        """Transport of the wrapped protocol."""
        return self.protocol.transport

    def __getattr__(self, item):
        """Delegate unknown attributes (such as ``write``) to the transport.

        Raises AttributeError if ``build_protocol()`` has not been called yet.
        """
        # __getattr__ only runs after normal lookup failed.  If 'protocol'
        # itself is what is missing, reading self.protocol below would
        # re-enter __getattr__ without end, so fail with a regular
        # AttributeError instead.
        if item == 'protocol':
            raise AttributeError(item)
        return getattr(self.protocol.transport, item)


class Protocol(twistedProtocol):
    """Twisted protocol that forwards incoming events into ``self.channel``.

    ``channel`` is assigned from outside by ``BaseBuffer.build_protocol()``.
    """

    def dataReceived(self, data):
        """Send the received chunk into the channel from a spawned greenlet."""
        spawn(self.channel.send, data)

    def connectionLost(self, reason):
        """Send ``reason.value`` into the channel as an exception, then drop the channel."""
        spawn(self.channel.send_exception, reason.value)
        self.channel = None # QQQ channel creates a greenlet. does it actually finish and memory is reclaimed?


class Unbuffered(BaseBuffer):
    """Wrapper that hands out received chunks one at a time, keeping no buffer."""

    protocol_class = Protocol

    def recv(self):
        """Receive a single chunk of undefined size.

        Return '' if connection was closed cleanly, raise the exception if it was closed
        in a non clean fashion. After that all successive calls return ''.

        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
        >>> buf.write('hello')
        >>> buf.recv()
        'you said hello. '
        >>> buf.recv()
        'BYE'
        >>> buf.recv()
        ''

        #>>> PORT = setup_server_tcp(exit='reset')
        #>>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
        #>>> buf.write('hello')
        #>>> buf.recv()
        #'you said hello. '
        #>>> buf.recv()
        #Traceback
        # ...
        #ConnectionLost
        #>>> buf.recv()
        #''

        Note that server 'BYE' was received by twisted, but it was thrown away. Use Buffer if you want to keep
        the received data.
        """
        # channel is reset to None once the connection has ended
        if self.channel is None:
            return ''
        try:
            return self.channel.receive()
        except ConnectionDone:
            # clean close: report end of stream from now on
            self.channel = None
            return ''
        except Exception:
            # unclean close: raise once, later calls return ''
            self.channel = None
            raise

    # iterator protocol:

    def __iter__(self):
        return self

    def next(self):
        """
        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Unbuffered).connectTCP('127.0.0.1', PORT)
        >>> buf.write('hello')
        >>> for data in buf:
        ...     print `data`
        'you said hello. '
        'BYE'
        """
        result = self.recv()
        if not result:
            raise StopIteration
        return result


class Buffer(BaseBuffer):
    """Wrapper that keeps received data in ``self.buf`` until it is consumed."""

    protocol_class = Protocol

    def __init__(self):
        # data received from the channel but not yet returned to the caller
        self.buf = ''

    def read(self, size=-1):
        """Like file's read().

        Return up to ``size`` bytes, waiting until that many are buffered or
        the connection ends; a negative ``size`` reads until the connection
        ends.

        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        >>> buf.write('hello')
        >>> buf.read(9)
        'you said '
        >>> buf.read(999)
        'hello. BYE'
        >>> buf.read(9)
        ''
        >>> buf.read(1)
        ''
        >>> print buf.channel
        None

        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        >>> buf.write('world')
        >>> buf.read()
        'you said world. BYE'
        >>> buf.read()
        ''

        #>>> PORT = setup_server_tcp(exit='reset')
        #>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        #>>> buf.write('whoa')
        #>>> buf.read(4)
        #'you '
        #>>> buf.read()
        #Traceback
        # ...
        #ConnectionLost:
        #>>> buf.channel
        #None
        #>>> buf.read(4)
        #'said'
        #>>> buf.read()
        #' whoa. BYE'
        """
        if self.channel is not None:
            try:
                while len(self.buf) < size or size < 0:
                    recvd = self.channel.receive()
                    self.buf += recvd
            except ConnectionDone:
                self.channel = None
            except Exception:
                self.channel = None
                # signal the error, but keep buf intact for possible inspection
                raise
        if size >= 0:
            result, self.buf = self.buf[:size], self.buf[size:]
        else:
            result, self.buf = self.buf, ''
        return result

    def recv(self, buflen=None):
        """Like socket's recv().

        Return buffered data if there is any, otherwise wait for one chunk.
        With ``buflen`` given, return at most that many bytes and keep the
        rest buffered.

        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        >>> buf.write('hello')
        >>> buf.recv()
        'you said hello. '
        >>> buf.recv()
        'BYE'
        >>> buf.recv()
        ''
        >>> buf.recv()
        ''

        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        >>> buf.write('whoa')
        >>> buf.recv(9)
        'you said '
        >>> buf.recv(999)
        'whoa. '
        >>> buf.recv(9)
        'BYE'
        >>> buf.recv(1)
        ''

        #>>> PORT = setup_server_tcp(exit='reset')
        #>>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        #>>> buf.write('whoa')
        #>>> buf.recv()
        #'you said whoa. '
        #>>> buf.recv()
        #Traceback
        # ConnectionLost
        #>>> buf.recv()
        #'BYE'
        #'hello. '
        #>>> buf.read(9)
        #'BYE'
        #>>> buf.read(1)
        """
        # only touch the channel when there is nothing buffered to hand out
        if self.channel is not None and not self.buf:
            try:
                recvd = self.channel.receive()
                #print 'recvd: %r' % recvd
                self.buf += recvd
            except ConnectionDone:
                self.channel = None
            except Exception:
                self.channel = None
                # signal the error, but if buf had any data keep it
                raise
        if buflen is None:
            result, self.buf = self.buf, ''
        else:
            result, self.buf = self.buf[:buflen], self.buf[buflen:]
        return result

    # iterator protocol:

    def __iter__(self):
        return self

    def next(self):
        """
        >>> PORT = setup_server_tcp(exit='clean')
        >>> buf = BufferCreator(reactor, Buffer).connectTCP('127.0.0.1', PORT)
        >>> buf.write('hello')
        >>> for data in buf:
        ...     print `data`
        'you said hello. '
        'BYE'
        """
        res = self.recv()
        if not res:
            raise StopIteration
        return res


def sync_connect(reactor, connect_func, protocol_instance, address_args, *args, **kwargs):
    """Call ``connect_func`` for ``protocol_instance`` and wait on the deferred.

    ``connect_func`` is called as
    ``connect_func(*address_args, factory, *args, **kwargs)`` where the factory
    is an ``_InstanceFactory`` wrapping ``protocol_instance``.

    Returns whatever ``block_on`` yields for the factory's deferred
    (presumably the connected protocol -- confirm against the twisted and
    eventlet versions in use).
    """
    d = defer.Deferred()
    f = _InstanceFactory(reactor, protocol_instance, d)
    connect_func(*(address_args + (f,) + args), **kwargs)
    # previously the result was bound to an unused local and dropped
    return block_on(d)


class BufferCreator(object):
    """Create buffer objects connected through one of the reactor's connect* methods.

    Extra positional and keyword arguments given to the constructor are passed
    to ``buffer_class`` each time a buffer is created.
    """

    buffer_class = Buffer

    def __init__(self, reactor, buffer_class=None, *args, **kwargs):
        self.reactor = reactor
        if buffer_class is not None:
            self.buffer_class = buffer_class
        self.args = args
        self.kwargs = kwargs

    def _connect(self, connect_func, address_args, args, kwargs):
        """Build a buffer, connect its protocol with ``connect_func``, return the buffer."""
        new_buffer = self.buffer_class(*self.args, **self.kwargs)
        protocol = new_buffer.build_protocol()
        sync_connect(self.reactor, connect_func, protocol, address_args, *args, **kwargs)
        return new_buffer

    def connectTCP(self, host, port, *args, **kwargs):
        """Connect with ``reactor.connectTCP`` and return the buffer."""
        return self._connect(self.reactor.connectTCP, (host, port), args, kwargs)

    def connectSSL(self, host, port, *args, **kwargs):
        """Connect with ``reactor.connectSSL`` and return the buffer."""
        return self._connect(self.reactor.connectSSL, (host, port), args, kwargs)

    def connectTLS(self, host, port, *args, **kwargs):
        """Connect with ``reactor.connectTLS`` and return the buffer."""
        return self._connect(self.reactor.connectTLS, (host, port), args, kwargs)

    def connectUNIX(self, address, *args, **kwargs):
        """Connect with ``reactor.connectUNIX`` and return the buffer."""
        return self._connect(self.reactor.connectUNIX, (address,), args, kwargs)


class SpawnFactory(Factory):
    """Factory that spawns ``handler(buffer)`` for every protocol it builds.

    Extra positional and keyword arguments given to the constructor are passed
    to ``buffer_class`` each time a buffer is created.
    """

    buffer_class = Buffer

    def __init__(self, handler, buffer_class=None, *args, **kwargs):
        self.handler = handler
        if buffer_class is not None:
            self.buffer_class = buffer_class
        self.args = args
        self.kwargs = kwargs

    def buildProtocol(self, addr):
        """Create a buffer and its protocol, spawn the handler, return the protocol."""
        new_buffer = self.buffer_class(*self.args, **self.kwargs)
        protocol = new_buffer.build_protocol()
        protocol.factory = self
        spawn(self.handler, new_buffer)
        return protocol


def __setup_server_tcp(exit='clean', delay=0.1, port=0):
    """Start a one-shot TCP server on 127.0.0.1 for the doctests; return its port.

    The server accepts a single connection, reads up to 128 bytes, replies
    with 'you said <data>. ', sleeps ``delay`` seconds, sends 'BYE' and closes
    the connection.  ``exit`` is accepted but not used by this implementation.
    """
    from eventlet.green import socket
    from eventlet.api import sleep
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('127.0.0.1', port))
    # with port=0 the OS picks a free port; report the one actually bound
    port = s.getsockname()[1]
    s.listen(5)
    s.settimeout(delay+1)

    def serve():
        try:
            conn, addr = s.accept()
        finally:
            # only one connection is ever served, so release the listener
            # whether or not accept() succeeded
            s.close()
        try:
            conn.settimeout(delay+1)
            hello = conn.recv(128)
            conn.sendall('you said %s. ' % hello)
            sleep(delay)
            conn.sendall('BYE')
        finally:
            conn.close()

    spawn(serve)
    return port


if __name__ == '__main__':
    import doctest
    # the doctests refer to a global named 'reactor'
    from twisted.internet import reactor
    doctest.testmod(extraglobs = {'setup_server_tcp': __setup_server_tcp})