diff --git a/eventlet/twisteds/basic.py b/eventlet/twisteds/basic.py index af33130..002f438 100644 --- a/eventlet/twisteds/basic.py +++ b/eventlet/twisteds/basic.py @@ -1,5 +1,5 @@ """Basic twisted protocols converted to synchronous mode""" -from twisted.internet.protocol import ClientCreator +from twisted.internet.protocol import ClientCreator, Protocol as _Protocol from twisted.protocols import basic from twisted.internet import reactor from twisted.internet.error import ConnectionDone @@ -34,6 +34,7 @@ class SpawnFactory(Factory): spawn(self.handler, self.buffer_class(protocol, chan)) return protocol + class buffer_base(object): def __init__(self, protocol, channel): @@ -47,6 +48,230 @@ class buffer_base(object): def __getattr__(self, item): return getattr(self.protocol.transport, item) + +class Protocol(_Protocol): + + def dataReceived(self, data): + spawn(self.channel.send, data) + + def connectionLost(self, reason): + self.channel.send_exception(reason.value) + self.channel = None # QQQ channel creates a greenlet. does it actually finish and memory is reclaimed? + + +class unbuffered(buffer_base): + + protocol_class = Protocol + + def recv(self): + """Receive a single chunk of undefined size. + + Return '' if connection was closed cleanly, raise the exception if it was closed + in a non clean fashion. After that all successive calls return ''. + + >>> PORT = setup_server_tcp(exit='clean') + >>> buf = connectTCP('127.0.0.1', PORT, buffer_class=unbuffered) + >>> buf.write('hello') + >>> buf.recv() + 'you said hello. ' + >>> buf.recv() + 'BYE' + >>> buf.recv() + '' + + #>>> PORT = setup_server_tcp(exit='reset') + #>>> buf = connectTCP('127.0.0.1', PORT, buffer_class=unbuffered) + #>>> buf.write('hello') + #>>> buf.recv() + #'you said hello. ' + #>>> buf.recv() + #Traceback + # ... + #ConnectionLost + #>>> buf.recv() + #'' + + Note that server 'BYE' was received by twisted, but it was thrown away. Use buffer if you + want to keep the received data. + """ + if self.channel is None: + return '' + try: + return self.channel.receive() + except ConnectionDone: + self.channel = None + return '' + except Exception: + self.channel = None + raise + + # iterator protocol: + + def __iter__(self): + return self + + def next(self): + """ + >>> buf = connectTCP('127.0.0.1', setup_server_tcp(exit='clean'), buffer_class=unbuffered) + >>> buf.write('hello') + >>> for data in buf: + ... print `data` + 'you said hello. ' + 'BYE' + """ + result = self.recv() + if not result: + raise StopIteration + return result + + +class buffer(buffer_base): + + protocol_class = Protocol + + def __init__(self, *args): + buffer_base.__init__(self, *args) + self.buf = '' + + def read(self, size=-1): + """Like file's read(). + + >>> PORT = setup_server_tcp(exit='clean') + >>> buf = connectTCP('127.0.0.1', PORT) + >>> buf.write('hello') + >>> buf.read(9) + 'you said ' + >>> buf.read(999) + 'hello. BYE' + >>> buf.read(9) + '' + >>> buf.read(1) + '' + >>> print buf.channel + None + + >>> PORT = setup_server_tcp(exit='clean') + >>> buf = connectTCP('127.0.0.1', PORT) + >>> buf.write('world') + >>> buf.read() + 'you said world. BYE' + >>> buf.read() + '' + + #>>> PORT = setup_server_tcp(exit='reset') + #>>> buf = connectTCP('127.0.0.1', PORT) + #>>> buf.write('whoa') + #>>> buf.read(4) + #'you ' + #>>> buf.read() + #Traceback + # ... + #ConnectionLost: + #>>> buf.channel + #None + #>>> buf.read(4) + #'said' + #>>> buf.read() + #' whoa. BYE' + """ + if self.channel is not None: + try: + while len(self.buf) < size or size < 0: + recvd = self.channel.receive() + self.buf += recvd + except ConnectionDone: + self.channel = None + except Exception: + self.channel = None + # signal the error, but keep buf intact for possible inspection + raise + if size>=0: + result, self.buf = self.buf[:size], self.buf[size:] + else: + result, self.buf = self.buf, '' + return result + + def recv(self, buflen=None): + """Like socket's recv(). + + >>> PORT = setup_server_tcp(exit='clean') + >>> buf = connectTCP('127.0.0.1', PORT) + >>> buf.write('hello') + >>> buf.recv() + 'you said hello. ' + >>> buf.recv() + 'BYE' + >>> buf.recv() + '' + >>> buf.recv() + '' + + >>> PORT = setup_server_tcp(exit='clean') + >>> buf = connectTCP('127.0.0.1', PORT) + >>> buf.write('whoa') + >>> buf.recv(9) + 'you said ' + >>> buf.recv(999) + 'whoa. ' + >>> buf.recv(9) + 'BYE' + >>> buf.recv(1) + '' + + #>>> PORT = setup_server_tcp(exit='reset') + #>>> buf = connectTCP('127.0.0.1', PORT) + #>>> buf.write('whoa') + #>>> buf.recv() + #'you said whoa. ' + #>>> buf.recv() + #Traceback + # ConnectionLost + #>>> buf.recv() + #'BYE' + #'hello. ' + #>>> buf.read(9) + #'BYE' + #>>> buf.read(1) + """ + if self.channel is not None and not self.buf: + try: + recvd = self.channel.receive() + #print 'recvd: %r' % recvd + self.buf += recvd + except ConnectionDone: + self.channel = None + except Exception: + self.channel = None + # signal the error, but if buf had any data keep it + raise + if buflen is None: + result, self.buf = self.buf, '' + else: + result, self.buf = self.buf[:buflen], self.buf[buflen:] + return result + + # iterator protocol: + + def __iter__(self): + return self + + def next(self): + """ + >>> buf = connectTCP('127.0.0.1', setup_server_tcp(exit='clean')) + >>> buf.write('hello') + >>> for data in buf: + ... print `data` + 'you said hello. ' + 'BYE' + """ + res = self.recv() + if not res: + raise StopIteration + return res + +DEFAULT_BUFFER = buffer + + class LineOnlyReceiver(basic.LineOnlyReceiver): def lineReceived(self, line): @@ -55,6 +280,7 @@ class LineOnlyReceiver(basic.LineOnlyReceiver): def connectionLost(self, reason): self.channel.send_exception(reason.value) + class line_only_receiver(buffer_base): protocol_class = LineOnlyReceiver