415 lines
13 KiB
Python
415 lines
13 KiB
Python
"""Basic twisted protocols converted to synchronous mode"""
|
|
import sys
|
|
from twisted.internet.protocol import Protocol as twistedProtocol
|
|
from twisted.internet.error import ConnectionDone
|
|
from twisted.internet.protocol import Factory, ClientFactory
|
|
from twisted.internet import main
|
|
from twisted.python import failure
|
|
|
|
from eventlet import greenthread
|
|
from eventlet import getcurrent
|
|
from eventlet.event import Event as BaseEvent
|
|
from eventlet.queue import Queue
|
|
|
|
|
|
class ValueQueue(Queue):
|
|
"""Queue that keeps the last item forever in the queue if it's an exception.
|
|
Useful if you send an exception over queue only once, and once sent it must be always
|
|
available.
|
|
"""
|
|
|
|
def send(self, value=None, exc=None):
|
|
if exc is not None or not self.has_error():
|
|
Queue.send(self, value, exc)
|
|
|
|
def wait(self):
|
|
"""The difference from Queue.wait: if there is an only item in the
|
|
Queue and it is an exception, raise it, but keep it in the Queue, so
|
|
that future calls to wait() will raise it again.
|
|
"""
|
|
if self.has_error() and len(self.items)==1:
|
|
# the last item, which is an exception, raise without emptying the Queue
|
|
getcurrent().throw(*self.items[0][1])
|
|
else:
|
|
return Queue.wait(self)
|
|
|
|
def has_error(self):
|
|
return self.items and self.items[-1][1] is not None
|
|
|
|
|
|
class Event(BaseEvent):
|
|
|
|
def send(self, value, exc=None):
|
|
if self.ready():
|
|
self.reset()
|
|
return BaseEvent.send(self, value, exc)
|
|
|
|
def send_exception(self, *throw_args):
|
|
if self.ready():
|
|
self.reset()
|
|
return BaseEvent.send_exception(self, *throw_args)
|
|
|
|
class Producer2Event(object):
|
|
|
|
# implements IPullProducer
|
|
|
|
def __init__(self, event):
|
|
self.event = event
|
|
|
|
def resumeProducing(self):
|
|
self.event.send(1)
|
|
|
|
def stopProducing(self):
|
|
del self.event
|
|
|
|
|
|
class GreenTransportBase(object):
|
|
|
|
transportBufferSize = None
|
|
|
|
def __init__(self, transportBufferSize=None):
|
|
if transportBufferSize is not None:
|
|
self.transportBufferSize = transportBufferSize
|
|
self._queue = ValueQueue()
|
|
self._write_event = Event()
|
|
self._disconnected_event = Event()
|
|
|
|
def build_protocol(self):
|
|
return self.protocol_class(self)
|
|
|
|
def _got_transport(self, transport):
|
|
self._queue.send(transport)
|
|
|
|
def _got_data(self, data):
|
|
self._queue.send(data)
|
|
|
|
def _connectionLost(self, reason):
|
|
self._disconnected_event.send(reason.value)
|
|
self._queue.send_exception(reason.value)
|
|
self._write_event.send_exception(reason.value)
|
|
|
|
def _wait(self):
|
|
if self.disconnecting or self._disconnected_event.ready():
|
|
if self._queue:
|
|
return self._queue.wait()
|
|
else:
|
|
raise self._disconnected_event.wait()
|
|
self.resumeProducing()
|
|
try:
|
|
return self._queue.wait()
|
|
finally:
|
|
self.pauseProducing()
|
|
|
|
def write(self, data, wait=True):
|
|
if self._disconnected_event.ready():
|
|
raise self._disconnected_event.wait()
|
|
if wait:
|
|
self._write_event.reset()
|
|
self.transport.write(data)
|
|
self._write_event.wait()
|
|
else:
|
|
self.transport.write(data)
|
|
|
|
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
|
|
self.transport.unregisterProducer()
|
|
self.transport.loseConnection(connDone)
|
|
if wait:
|
|
self._disconnected_event.wait()
|
|
|
|
def __getattr__(self, item):
|
|
if item=='transport':
|
|
raise AttributeError(item)
|
|
if hasattr(self, 'transport'):
|
|
try:
|
|
return getattr(self.transport, item)
|
|
except AttributeError:
|
|
me = type(self).__name__
|
|
trans = type(self.transport).__name__
|
|
raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
|
|
else:
|
|
raise AttributeError(item)
|
|
|
|
def resumeProducing(self):
|
|
self.paused -= 1
|
|
if self.paused==0:
|
|
self.transport.resumeProducing()
|
|
|
|
def pauseProducing(self):
|
|
self.paused += 1
|
|
if self.paused==1:
|
|
self.transport.pauseProducing()
|
|
|
|
def _init_transport_producer(self):
|
|
self.transport.pauseProducing()
|
|
self.paused = 1
|
|
|
|
def _init_transport(self):
|
|
transport = self._queue.wait()
|
|
self.transport = transport
|
|
if self.transportBufferSize is not None:
|
|
transport.bufferSize = self.transportBufferSize
|
|
self._init_transport_producer()
|
|
transport.registerProducer(Producer2Event(self._write_event), False)
|
|
|
|
|
|
class Protocol(twistedProtocol):
|
|
|
|
def __init__(self, recepient):
|
|
self._recepient = recepient
|
|
|
|
def connectionMade(self):
|
|
self._recepient._got_transport(self.transport)
|
|
|
|
def dataReceived(self, data):
|
|
self._recepient._got_data(data)
|
|
|
|
def connectionLost(self, reason):
|
|
self._recepient._connectionLost(reason)
|
|
|
|
|
|
class UnbufferedTransport(GreenTransportBase):
|
|
"""A very simple implementation of a green transport without an additional buffer"""
|
|
|
|
protocol_class = Protocol
|
|
|
|
def recv(self):
|
|
"""Receive a single chunk of undefined size.
|
|
|
|
Return '' if connection was closed cleanly, raise the exception if it was closed
|
|
in a non clean fashion. After that all successive calls return ''.
|
|
"""
|
|
if self._disconnected_event.ready():
|
|
return ''
|
|
try:
|
|
return self._wait()
|
|
except ConnectionDone:
|
|
return ''
|
|
|
|
def read(self):
|
|
"""Read the data from the socket until the connection is closed cleanly.
|
|
|
|
If connection was closed in a non-clean fashion, the appropriate exception
|
|
is raised. In that case already received data is lost.
|
|
Next time read() is called it returns ''.
|
|
"""
|
|
result = ''
|
|
while True:
|
|
recvd = self.recv()
|
|
if not recvd:
|
|
break
|
|
result += recvd
|
|
return result
|
|
|
|
# iterator protocol:
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
result = self.recv()
|
|
if not result:
|
|
raise StopIteration
|
|
return result
|
|
|
|
|
|
class GreenTransport(GreenTransportBase):
|
|
|
|
protocol_class = Protocol
|
|
_buffer = ''
|
|
_error = None
|
|
|
|
def read(self, size=-1):
|
|
"""Read size bytes or until EOF"""
|
|
if not self._disconnected_event.ready():
|
|
try:
|
|
while len(self._buffer) < size or size < 0:
|
|
self._buffer += self._wait()
|
|
except ConnectionDone:
|
|
pass
|
|
except:
|
|
if not self._disconnected_event.has_exception():
|
|
raise
|
|
if size>=0:
|
|
result, self._buffer = self._buffer[:size], self._buffer[size:]
|
|
else:
|
|
result, self._buffer = self._buffer, ''
|
|
if not result and self._disconnected_event.has_exception():
|
|
try:
|
|
self._disconnected_event.wait()
|
|
except ConnectionDone:
|
|
pass
|
|
return result
|
|
|
|
def recv(self, buflen=None):
|
|
"""Receive a single chunk of undefined size but no bigger than buflen"""
|
|
if not self._disconnected_event.ready():
|
|
self.resumeProducing()
|
|
try:
|
|
try:
|
|
recvd = self._wait()
|
|
#print 'received %r' % recvd
|
|
self._buffer += recvd
|
|
except ConnectionDone:
|
|
pass
|
|
except:
|
|
if not self._disconnected_event.has_exception():
|
|
raise
|
|
finally:
|
|
self.pauseProducing()
|
|
if buflen is None:
|
|
result, self._buffer = self._buffer, ''
|
|
else:
|
|
result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
|
|
if not result and self._disconnected_event.has_exception():
|
|
try:
|
|
self._disconnected_event.wait()
|
|
except ConnectionDone:
|
|
pass
|
|
return result
|
|
|
|
# iterator protocol:
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
res = self.recv()
|
|
if not res:
|
|
raise StopIteration
|
|
return res
|
|
|
|
|
|
class GreenInstanceFactory(ClientFactory):
|
|
|
|
def __init__(self, instance, event):
|
|
self.instance = instance
|
|
self.event = event
|
|
|
|
def buildProtocol(self, addr):
|
|
return self.instance
|
|
|
|
def clientConnectionFailed(self, connector, reason):
|
|
self.event.send_exception(reason.type, reason.value, reason.tb)
|
|
|
|
|
|
class GreenClientCreator(object):
|
|
"""Connect to a remote host and return a connected green transport instance.
|
|
"""
|
|
|
|
gtransport_class = GreenTransport
|
|
|
|
def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
|
|
if reactor is None:
|
|
from twisted.internet import reactor
|
|
self.reactor = reactor
|
|
if gtransport_class is not None:
|
|
self.gtransport_class = gtransport_class
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def _make_transport_and_factory(self):
|
|
gtransport = self.gtransport_class(*self.args, **self.kwargs)
|
|
protocol = gtransport.build_protocol()
|
|
factory = GreenInstanceFactory(protocol, gtransport._queue)
|
|
return gtransport, factory
|
|
|
|
def connectTCP(self, host, port, *args, **kwargs):
|
|
gtransport, factory = self._make_transport_and_factory()
|
|
self.reactor.connectTCP(host, port, factory, *args, **kwargs)
|
|
gtransport._init_transport()
|
|
return gtransport
|
|
|
|
def connectSSL(self, host, port, *args, **kwargs):
|
|
gtransport, factory = self._make_transport_and_factory()
|
|
self.reactor.connectSSL(host, port, factory, *args, **kwargs)
|
|
gtransport._init_transport()
|
|
return gtransport
|
|
|
|
def connectTLS(self, host, port, *args, **kwargs):
|
|
gtransport, factory = self._make_transport_and_factory()
|
|
self.reactor.connectTLS(host, port, factory, *args, **kwargs)
|
|
gtransport._init_transport()
|
|
return gtransport
|
|
|
|
def connectUNIX(self, address, *args, **kwargs):
|
|
gtransport, factory = self._make_transport_and_factory()
|
|
self.reactor.connectUNIX(address, factory, *args, **kwargs)
|
|
gtransport._init_transport()
|
|
return gtransport
|
|
|
|
def connectSRV(self, service, domain, *args, **kwargs):
|
|
SRVConnector = kwargs.pop('ConnectorClass', None)
|
|
if SRVConnector is None:
|
|
from twisted.names.srvconnect import SRVConnector
|
|
gtransport, factory = self._make_transport_and_factory()
|
|
c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
|
|
c.connect()
|
|
gtransport._init_transport()
|
|
return gtransport
|
|
|
|
|
|
class SimpleSpawnFactory(Factory):
|
|
"""Factory that spawns a new greenlet for each incoming connection.
|
|
|
|
For an incoming connection a new greenlet is created using the provided
|
|
callback as a function and a connected green transport instance as an
|
|
argument.
|
|
"""
|
|
|
|
gtransport_class = GreenTransport
|
|
|
|
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
|
|
if callable(handler):
|
|
self.handler = handler
|
|
else:
|
|
self.handler = handler.send
|
|
if hasattr(handler, 'send_exception'):
|
|
self.exc_handler = handler.send_exception
|
|
if gtransport_class is not None:
|
|
self.gtransport_class = gtransport_class
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def exc_handler(self, *args):
|
|
pass
|
|
|
|
def buildProtocol(self, addr):
|
|
gtransport = self.gtransport_class(*self.args, **self.kwargs)
|
|
protocol = gtransport.build_protocol()
|
|
protocol.factory = self
|
|
self._do_spawn(gtransport, protocol)
|
|
return protocol
|
|
|
|
def _do_spawn(self, gtransport, protocol):
|
|
greenthread.spawn(self._run_handler, gtransport, protocol)
|
|
|
|
def _run_handler(self, gtransport, protocol):
|
|
try:
|
|
gtransport._init_transport()
|
|
except Exception:
|
|
self.exc_handler(*sys.exc_info())
|
|
else:
|
|
self.handler(gtransport)
|
|
|
|
|
|
class SpawnFactory(SimpleSpawnFactory):
|
|
"""An extension to SimpleSpawnFactory that provides some control over
|
|
the greenlets it has spawned.
|
|
"""
|
|
|
|
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
|
|
self.greenlets = set()
|
|
SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
|
|
|
|
def _do_spawn(self, gtransport, protocol):
|
|
g = greenthread.spawn(self._run_handler, gtransport, protocol)
|
|
self.greenlets.add(g)
|
|
g.link(lambda *_: self.greenlets.remove(g))
|
|
|
|
def waitall(self):
|
|
results = []
|
|
for g in self.greenlets:
|
|
results.append(g.wait())
|
|
return results
|
|
|