hubs: drop Twisted support

This commit is contained in:
Sergey Shepelev
2014-07-03 12:36:05 +04:00
parent 4a6c7c6e26
commit 3bd19032e3
23 changed files with 22 additions and 1555 deletions

View File

@@ -1,4 +1,4 @@
recursive-include tests *.py *.crt *.key
recursive-include doc *.rst *.txt *.py Makefile *.png
recursive-include examples *.py *.html
include MANIFEST.in README.twisted NEWS AUTHORS LICENSE README.rst
include MANIFEST.in NEWS AUTHORS LICENSE README.rst

1
NEWS
View File

@@ -1,6 +1,7 @@
0.16.0 (not released yet)
=========================
* hubs: drop Twisted support
* removed deprecated modules: api, most of coros, pool, proc, processes and util
* improved Python 3 compatibility (including patch by raylu)

View File

@@ -2,8 +2,8 @@ Eventlet is a concurrent networking library for Python that allows you to change
It uses epoll or libevent for highly scalable non-blocking I/O. Coroutines ensure that the developer uses a blocking style of programming that is similar to threading, but provide the benefits of non-blocking I/O. The event dispatch is implicit, which means you can easily use Eventlet from the Python interpreter, or as a small part of a larger application.
It's easy to get started using Eventlet, and easy to convert existing
applications to use it. Start off by looking at the `examples`_,
It's easy to get started using Eventlet, and easy to convert existing
applications to use it. Start off by looking at the `examples`_,
`common design patterns`_, and the list of `basic API primitives`_.
.. _examples: http://eventlet.net/doc/examples.html
@@ -17,7 +17,7 @@ Quick Example
Here's something you can try right on the command line::
% python
>>> import eventlet
>>> import eventlet
>>> from eventlet.green import urllib2
>>> gt = eventlet.spawn(urllib2.urlopen, 'http://eventlet.net')
>>> gt2 = eventlet.spawn(urllib2.urlopen, 'http://secondlife.com')
@@ -48,3 +48,17 @@ To build a complete set of HTML documentation, you must have Sphinx, which can b
make html
The built html files can be found in doc/_build/html afterward.
Twisted
=======
Eventlet had Twisted hub in the past, but community interest to this integration has dropped over time,
now it is not supported, so with apologies for any inconvenience we discontinue Twisted integration.
If you have a project that uses Eventlet with Twisted, your options are:
* use last working release eventlet==0.14
* start a new project with only Twisted hub code, identify and fix problems. As of eventlet 0.13,
`EVENTLET_HUB` environment variable can point to external modules.
* fork Eventlet, revert Twisted removal, identify and fix problems. This work may be merged back into main project.

View File

@@ -1,181 +0,0 @@
--work in progress--
Introduction
------------
Twisted provides solid foundation for asynchronous programming in Python.
Eventlet makes asynchronous programming look like synchronous, thus
achieving higher signal-to-noise ratio than traditional twisted programs have.
Eventlet on top of twisted provides:
* stable twisted
* usable and readable synchronous style
* existing twisted code can be used without any changes
* existing blocking code can be used after trivial changes applied
NOTE: the maintainer of Eventlet's Twisted support no longer supports it; it still exists but may have had some breakage along the way. Please treat it as experimental, and if you'd like to maintain it, please do!
Eventlet features:
* utilities for spawning and controlling greenlet execution:
api.spawn, api.kill, proc module
* utilities for communicating between greenlets:
event.Event, queue.Queue, semaphore.Semaphore
* standard Python modules that won't block the reactor:
eventlet.green package
* utilities specific to twisted hub:
eventlet.twistedutil package
Getting started with eventlet on twisted
----------------------------------------
This section will only mention stuff that may be useful but it
won't explain in details how to use it. For that, refer to the
docstrings of the modules and the examples.
There are 2 ways of using twisted with eventlet, one that is
familiar to twisted developers and another that is familiar
to eventlet developers:
1. explicitly start the main loop in the main greenlet;
2. implicitly start the main loop in a dedicated greenlet.
To enable (1), add this line at the top of your program:
from eventlet.twistedutil import join_reactor
then start the reactor as you would do in a regular twisted application.
For (2) just make sure that you have reactor installed before using
any of eventlet functions. Otherwise a non-twisted hub will be selected
and twisted code won't work.
Most of examples/twisted_* use twisted style with the exception of
twisted_client.py and twisted_srvconnector.py. All of the non-twisted
examples in examples directory use eventlet-style (they work with any
of eventlet's hubs, not just twisted-based).
Eventlet implements "blocking" operations by switching to the main loop
greenlet, thus it's impossible to call a blocking function when you are
already in the main loop. Therefore one must be cautious in a twisted
callback, calling only a non-blocking subset of eventlet API here. The
following functions won't unschedule the current greenlet and are safe
to call from anywhere:
1. Greenlet creation functions: api.spawn, proc.spawn,
twistedutil.deferToGreenThread and others based on api.spawn.
2. send(), send_exception(), poll(), ready() methods of event.Event
and queue.Queue.
3. wait(timeout=0) is identical to poll(). Currently only Proc.wait
supports timeout parameter.
4. Proc.link/link_value/link_exception
Other classes that use these names should follow the convention.
For an example on how to take advantage of eventlet in a twisted
application using deferToGreenThread see examples/twisted_http_proxy.py
Although eventlet provides eventlet.green.socket module that implements
interface of the standard Python socket, there's also a way to use twisted's
network code in a synchronous fashion via GreenTransport class.
A GreenTransport interface is reminiscent of socket but it's not a drop-in
replacement. It combines features of TCPTransport and Protocol in a single
object:
* all of transport methods (like getPeer()) are available directly on
a GreenTransport instance; in addition, underlying transport object
is available via 'transport' attribute;
* write method is overriden: it may block if transport write buffer is full;
* read() and recv() methods are provided to retrieve the data from protocol
synchronously.
To make a GreenTransport instance use twistedutil.protocol.GreenClientCreator
(usage is similar to that of twisted.internet.protocol.ClientCreator)
For an example on how to get a connected GreenTransport instance,
see twisted_client.py, twisted_srvconnect.py or twisted_portforward.py.
For an example on how to use GreenTransport for incoming connections,
see twisted_server.py, twisted_portforward.py.
also
* twistedutil.block_on - wait for a deferred to fire
block_on(reactor.callInThread(func, args))
* twistedutil.protocol.basic.LineOnlyReceiverTransport - a green transport
variant built on top of LineOnlyReceiver protocol. Demonstrates how
to convert a protocol to a synchronous mode.
Coroutines
----------
To understand how eventlet works, one has to understand how to use greenlet:
http://codespeak.net/py/dist/greenlet.html
Essential points
* There always exists MAIN greenlet
* Every greenlet except MAIN has a parent. MAIN therefore could be detected as g.parent is None
* When greenlet is finished it's return value is propagated to the parent (i.e. switch() call
in the parent greenlet returns it)
* When an exception leaves a greelen, it's propagated to the parent (i.e. switch() in the parent
re-raises it) unless it's a subclass of GreenletExit, which is returned as a value.
* parent can be reassigned (by simply setting 'parent' attribute). A cycle would be detected and
rejected with ValueError
Note, that there's no scheduler of any sort; if a coroutine wants to be
scheduled again it must take care of it itself. As an application developer,
however, you don't need to worry about it as that's what eventlet does behind
the scenes. The cost of that is that you should not use greenlet's switch() and
throw() methods, they will likely leave the current greenlet unscheduled
forever. Eventlet also takes advantage of greenlet's `parent' attribute,
so you should not meddle with it either.
How does eventlet work
----------------------
Twisted's reactor and eventlet's hub are very similar in what they do.
Both continuously perform polling on the list of registered descriptors
and each time a specific event is fired, the associated callback function
is called. In addition, both maintain a list of scheduled calls.
Polling is performed by the main loop - a function that both reactor and hub have.
When twisted calls user's callback it's expected to return almost immediately,
without any blocking I/O calls.
Eventlet runs the main loop in a dedicated greenlet (MAIN_LOOP). It is the same
greenlet as MAIN if you use join_reactor. Otherwise it's a separate greenlet
started implicitly. The execution is organized in a such way that the switching
always involves MAIN_LOOP. All of functions in eventlet that appear "blocking"
use the following algorithm:
1. register a callback that switches back to the current greenlet when
an event of interest happens
2. switch to the MAIN_LOOP
For example, here's what eventlet's socket recv() does:
= blocking operation RECV on socket d =
user's greenlet (USER) main loop's greenlet (MAIN_LOOP)
|
(inside d.recv() call)
|
add_descriptor(d, RECV)
|
data=MAIN_LOOP.switch() ---------> poll for events
^---------------------\ |
| ... ---------------------------> may execute other greenlets here
| |
| event RECV on descriptor d?
| |
| d.remove_descriptor(d, RECV)
| |
| data = d.recv() # calling blocking op that will return immediately
| |
\--------- USER.switch(data) # argument data here becomes return value in user's switch
return data

View File

@@ -21,7 +21,7 @@ Lastly, you can just use nose directly if you want:
That's it! The output from running nose is the same as unittest's output, if the entire directory was one big test file.
Many tests are skipped based on environmental factors; for example, it makes no sense to test Twisted-specific functionality when Twisted is not installed. These are printed as S's during execution, and in the summary printed after the tests run it will tell you how many were skipped.
Many tests are skipped based on environmental factors; for example, it makes no sense to test kqueue-specific functionality when your OS does not support it. These are printed as S's during execution, and in the summary printed after the tests run it will tell you how many were skipped.
Doctests
--------

View File

@@ -1,270 +0,0 @@
import sys
import threading
from twisted.internet.base import DelayedCall as TwistedDelayedCall
from eventlet.support import greenlets as greenlet
from eventlet.hubs.hub import FdListener, READ, WRITE
class DelayedCall(TwistedDelayedCall):
"fix DelayedCall to behave like eventlet's Timer in some respects"
def cancel(self):
if self.cancelled or self.called:
self.cancelled = True
return
return TwistedDelayedCall.cancel(self)
class LocalDelayedCall(DelayedCall):
def __init__(self, *args, **kwargs):
self.greenlet = greenlet.getcurrent()
DelayedCall.__init__(self, *args, **kwargs)
def _get_cancelled(self):
if self.greenlet is None or self.greenlet.dead:
return True
return self.__dict__['cancelled']
def _set_cancelled(self, value):
self.__dict__['cancelled'] = value
cancelled = property(_get_cancelled, _set_cancelled)
def callLater(DelayedCallClass, reactor, _seconds, _f, *args, **kw):
# the same as original but creates fixed DelayedCall instance
assert callable(_f), "%s is not callable" % _f
if not isinstance(_seconds, (int, long, float)):
raise TypeError("Seconds must be int, long, or float, was " + type(_seconds))
assert sys.maxint >= _seconds >= 0, \
"%s is not greater than or equal to 0 seconds" % (_seconds,)
tple = DelayedCallClass(reactor.seconds() + _seconds, _f, args, kw,
reactor._cancelCallLater,
reactor._moveCallLaterSooner,
seconds=reactor.seconds)
reactor._newTimedCalls.append(tple)
return tple
class socket_rwdescriptor(FdListener):
# implements(IReadWriteDescriptor)
def __init__(self, evtype, fileno, cb):
super(socket_rwdescriptor, self).__init__(evtype, fileno, cb)
if not isinstance(fileno, (int, long)):
raise TypeError("Expected int or long, got %s" % type(fileno))
# Twisted expects fileno to be a callable, not an attribute
def _fileno():
return fileno
self.fileno = _fileno
# required by glib2reactor
disconnected = False
def doRead(self):
if self.evtype is READ:
self.cb(self)
def doWrite(self):
if self.evtype == WRITE:
self.cb(self)
def connectionLost(self, reason):
self.disconnected = True
if self.cb:
self.cb(reason)
# trampoline() will now switch into the greenlet that owns the socket
# leaving the mainloop unscheduled. However, when the next switch
# to the mainloop occurs, twisted will not re-evaluate the delayed calls
# because it assumes that none were scheduled since no client code was executed
# (it has no idea it was switched away). So, we restart the mainloop.
# XXX this is not enough, pollreactor prints the traceback for
# this and epollreactor times out. see test__hub.TestCloseSocketWhilePolling
raise greenlet.GreenletExit
logstr = "twistedr"
def logPrefix(self):
return self.logstr
class BaseTwistedHub(object):
"""This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub).
Instead, it assumes that the mainloop is run in the main greenlet.
This makes running "green" functions in the main greenlet impossible but is useful
when you want to call reactor.run() yourself.
"""
# XXX: remove me from here. make functions that depend on reactor
# XXX: hub's methods
uses_twisted_reactor = True
WRITE = WRITE
READ = READ
def __init__(self, mainloop_greenlet):
self.greenlet = mainloop_greenlet
def switch(self):
assert greenlet.getcurrent() is not self.greenlet, \
"Cannot switch from MAINLOOP to MAINLOOP"
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError:
pass
return self.greenlet.switch()
def stop(self):
from twisted.internet import reactor
reactor.stop()
def add(self, evtype, fileno, cb):
from twisted.internet import reactor
descriptor = socket_rwdescriptor(evtype, fileno, cb)
if evtype is READ:
reactor.addReader(descriptor)
if evtype is WRITE:
reactor.addWriter(descriptor)
return descriptor
def remove(self, descriptor):
from twisted.internet import reactor
reactor.removeReader(descriptor)
reactor.removeWriter(descriptor)
def schedule_call_local(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor
def call_if_greenlet_alive(*args1, **kwargs1):
if timer.greenlet.dead:
return
return func(*args1, **kwargs1)
timer = callLater(LocalDelayedCall, reactor, seconds,
call_if_greenlet_alive, *args, **kwargs)
return timer
schedule_call = schedule_call_local
def schedule_call_global(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor
return callLater(DelayedCall, reactor, seconds, func, *args, **kwargs)
def abort(self):
from twisted.internet import reactor
reactor.crash()
@property
def running(self):
from twisted.internet import reactor
return reactor.running
# for debugging:
def get_readers(self):
from twisted.internet import reactor
readers = reactor.getReaders()
readers.remove(getattr(reactor, 'waker'))
return readers
def get_writers(self):
from twisted.internet import reactor
return reactor.getWriters()
def get_timers_count(self):
from twisted.internet import reactor
return len(reactor.getDelayedCalls())
class TwistedHub(BaseTwistedHub):
# wrapper around reactor that runs reactor's main loop in a separate greenlet.
# whenever you need to wait, i.e. inside a call that must appear
# blocking, call hub.switch() (then your blocking operation should switch back to you
# upon completion)
# unlike other eventlet hubs, which are created per-thread,
# this one cannot be instantiated more than once, because
# twisted doesn't allow that
# 0-not created
# 1-initialized but not started
# 2-started
# 3-restarted
state = 0
installSignalHandlers = False
def __init__(self):
assert Hub.state == 0, ('%s hub can only be instantiated once' % type(self).__name__,
Hub.state)
Hub.state = 1
make_twisted_threadpool_daemonic() # otherwise the program
# would hang after the main
# greenlet exited
g = greenlet.greenlet(self.run)
BaseTwistedHub.__init__(self, g)
def switch(self):
assert greenlet.getcurrent() is not self.greenlet, \
"Cannot switch from MAINLOOP to MAINLOOP"
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError:
pass
return self.greenlet.switch()
def run(self, installSignalHandlers=None):
if installSignalHandlers is None:
installSignalHandlers = self.installSignalHandlers
# main loop, executed in a dedicated greenlet
from twisted.internet import reactor
assert Hub.state in [1, 3], ('run function is not reentrant', Hub.state)
if Hub.state == 1:
reactor.startRunning(installSignalHandlers=installSignalHandlers)
elif not reactor.running:
# if we're here, then reactor was explicitly stopped with reactor.stop()
# restarting reactor (like we would do after an exception) in this case
# is not an option.
raise AssertionError("reactor is not running")
try:
self.mainLoop(reactor)
except:
# an exception in the mainLoop is a normal operation (e.g. user's
# signal handler could raise an exception). In this case we will re-enter
# the main loop at the next switch.
Hub.state = 3
raise
# clean exit here is needed for abort() method to work
# do not raise an exception here.
def mainLoop(self, reactor):
Hub.state = 2
# Unlike reactor's mainLoop, this function does not catch exceptions.
# Anything raised goes into the main greenlet (because it is always the
# parent of this one)
while reactor.running:
# Advance simulation time in delayed event processors.
reactor.runUntilCurrent()
t2 = reactor.timeout()
t = reactor.running and t2
reactor.doIteration(t)
Hub = TwistedHub
class DaemonicThread(threading.Thread):
def _set_daemon(self):
return True
def make_twisted_threadpool_daemonic():
from twisted.python.threadpool import ThreadPool
if ThreadPool.threadFactory != DaemonicThread:
ThreadPool.threadFactory = DaemonicThread

View File

@@ -1,81 +0,0 @@
from eventlet.hubs import get_hub
from eventlet import spawn, getcurrent
def block_on(deferred):
cur = [getcurrent()]
synchronous = []
def cb(value):
if cur:
if getcurrent() is cur[0]:
synchronous.append((value, None))
else:
cur[0].switch(value)
return value
def eb(fail):
if cur:
if getcurrent() is cur[0]:
synchronous.append((None, fail))
else:
fail.throwExceptionIntoGenerator(cur[0])
deferred.addCallbacks(cb, eb)
if synchronous:
result, fail = synchronous[0]
if fail is not None:
fail.raiseException()
return result
try:
return get_hub().switch()
finally:
del cur[0]
def _putResultInDeferred(deferred, f, args, kwargs):
try:
result = f(*args, **kwargs)
except:
from twisted.python import failure
f = failure.Failure()
deferred.errback(f)
else:
deferred.callback(result)
def deferToGreenThread(func, *args, **kwargs):
from twisted.internet import defer
d = defer.Deferred()
spawn(_putResultInDeferred, d, func, args, kwargs)
return d
def callInGreenThread(func, *args, **kwargs):
return spawn(func, *args, **kwargs)
if __name__ == '__main__':
import sys
try:
num = int(sys.argv[1])
except:
sys.exit('Supply number of test as an argument, 0, 1, 2 or 3')
from twisted.internet import reactor
def test():
print(block_on(reactor.resolver.getHostByName('www.google.com')))
print(block_on(reactor.resolver.getHostByName('###')))
if num == 0:
test()
elif num == 1:
spawn(test)
from eventlet import sleep
print('sleeping..')
sleep(5)
print('done sleeping..')
elif num == 2:
from eventlet.twistedutil import join_reactor
spawn(test)
reactor.run()
elif num == 3:
from eventlet.twistedutil import join_reactor
print("fails because it's impossible to use block_on from the mainloop")
reactor.callLater(0, test)
reactor.run()

View File

@@ -1,12 +0,0 @@
"""Integrate eventlet with twisted's reactor mainloop.
You generally don't have to use it unless you need to call reactor.run()
yourself.
"""
from eventlet.hubs.twistedr import BaseTwistedHub
from eventlet.support import greenlets as greenlet
from eventlet.hubs import _threadlocal, use_hub
use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub')
hub = _threadlocal.hub = _threadlocal.Hub(greenlet.getcurrent())

View File

@@ -1,414 +0,0 @@
"""Basic twisted protocols converted to synchronous mode"""
import sys
from twisted.internet.protocol import Protocol as twistedProtocol
from twisted.internet.error import ConnectionDone
from twisted.internet.protocol import Factory, ClientFactory
from twisted.internet import main
from twisted.python import failure
from eventlet import greenthread
from eventlet import getcurrent
from eventlet.event import Event as BaseEvent
from eventlet.queue import Queue
class ValueQueue(Queue):
"""Queue that keeps the last item forever in the queue if it's an exception.
Useful if you send an exception over queue only once, and once sent it must be always
available.
"""
def send(self, value=None, exc=None):
if exc is not None or not self.has_error():
Queue.send(self, value, exc)
def wait(self):
"""The difference from Queue.wait: if there is an only item in the
Queue and it is an exception, raise it, but keep it in the Queue, so
that future calls to wait() will raise it again.
"""
if self.has_error() and len(self.items) == 1:
# the last item, which is an exception, raise without emptying the Queue
getcurrent().throw(*self.items[0][1])
else:
return Queue.wait(self)
def has_error(self):
return self.items and self.items[-1][1] is not None
class Event(BaseEvent):
def send(self, value, exc=None):
if self.ready():
self.reset()
return BaseEvent.send(self, value, exc)
def send_exception(self, *throw_args):
if self.ready():
self.reset()
return BaseEvent.send_exception(self, *throw_args)
class Producer2Event(object):
# implements IPullProducer
def __init__(self, event):
self.event = event
def resumeProducing(self):
self.event.send(1)
def stopProducing(self):
del self.event
class GreenTransportBase(object):
transportBufferSize = None
def __init__(self, transportBufferSize=None):
if transportBufferSize is not None:
self.transportBufferSize = transportBufferSize
self._queue = ValueQueue()
self._write_event = Event()
self._disconnected_event = Event()
def build_protocol(self):
return self.protocol_class(self)
def _got_transport(self, transport):
self._queue.send(transport)
def _got_data(self, data):
self._queue.send(data)
def _connectionLost(self, reason):
self._disconnected_event.send(reason.value)
self._queue.send_exception(reason.value)
self._write_event.send_exception(reason.value)
def _wait(self):
if self.disconnecting or self._disconnected_event.ready():
if self._queue:
return self._queue.wait()
else:
raise self._disconnected_event.wait()
self.resumeProducing()
try:
return self._queue.wait()
finally:
self.pauseProducing()
def write(self, data, wait=True):
if self._disconnected_event.ready():
raise self._disconnected_event.wait()
if wait:
self._write_event.reset()
self.transport.write(data)
self._write_event.wait()
else:
self.transport.write(data)
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
self.transport.unregisterProducer()
self.transport.loseConnection(connDone)
if wait:
self._disconnected_event.wait()
def __getattr__(self, item):
if item == 'transport':
raise AttributeError(item)
if hasattr(self, 'transport'):
try:
return getattr(self.transport, item)
except AttributeError:
me = type(self).__name__
trans = type(self.transport).__name__
raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
else:
raise AttributeError(item)
def resumeProducing(self):
self.paused -= 1
if self.paused == 0:
self.transport.resumeProducing()
def pauseProducing(self):
self.paused += 1
if self.paused == 1:
self.transport.pauseProducing()
def _init_transport_producer(self):
self.transport.pauseProducing()
self.paused = 1
def _init_transport(self):
transport = self._queue.wait()
self.transport = transport
if self.transportBufferSize is not None:
transport.bufferSize = self.transportBufferSize
self._init_transport_producer()
transport.registerProducer(Producer2Event(self._write_event), False)
class Protocol(twistedProtocol):
def __init__(self, recepient):
self._recepient = recepient
def connectionMade(self):
self._recepient._got_transport(self.transport)
def dataReceived(self, data):
self._recepient._got_data(data)
def connectionLost(self, reason):
self._recepient._connectionLost(reason)
class UnbufferedTransport(GreenTransportBase):
"""A very simple implementation of a green transport without an additional buffer"""
protocol_class = Protocol
def recv(self):
"""Receive a single chunk of undefined size.
Return '' if connection was closed cleanly, raise the exception if it was closed
in a non clean fashion. After that all successive calls return ''.
"""
if self._disconnected_event.ready():
return ''
try:
return self._wait()
except ConnectionDone:
return ''
def read(self):
"""Read the data from the socket until the connection is closed cleanly.
If connection was closed in a non-clean fashion, the appropriate exception
is raised. In that case already received data is lost.
Next time read() is called it returns ''.
"""
result = ''
while True:
recvd = self.recv()
if not recvd:
break
result += recvd
return result
# iterator protocol:
def __iter__(self):
return self
def next(self):
result = self.recv()
if not result:
raise StopIteration
return result
class GreenTransport(GreenTransportBase):
protocol_class = Protocol
_buffer = ''
_error = None
def read(self, size=-1):
"""Read size bytes or until EOF"""
if not self._disconnected_event.ready():
try:
while len(self._buffer) < size or size < 0:
self._buffer += self._wait()
except ConnectionDone:
pass
except:
if not self._disconnected_event.has_exception():
raise
if size >= 0:
result, self._buffer = self._buffer[:size], self._buffer[size:]
else:
result, self._buffer = self._buffer, ''
if not result and self._disconnected_event.has_exception():
try:
self._disconnected_event.wait()
except ConnectionDone:
pass
return result
def recv(self, buflen=None):
"""Receive a single chunk of undefined size but no bigger than buflen"""
if not self._disconnected_event.ready():
self.resumeProducing()
try:
try:
recvd = self._wait()
# print 'received %r' % recvd
self._buffer += recvd
except ConnectionDone:
pass
except:
if not self._disconnected_event.has_exception():
raise
finally:
self.pauseProducing()
if buflen is None:
result, self._buffer = self._buffer, ''
else:
result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
if not result and self._disconnected_event.has_exception():
try:
self._disconnected_event.wait()
except ConnectionDone:
pass
return result
# iterator protocol:
def __iter__(self):
return self
def next(self):
res = self.recv()
if not res:
raise StopIteration
return res
class GreenInstanceFactory(ClientFactory):
def __init__(self, instance, event):
self.instance = instance
self.event = event
def buildProtocol(self, addr):
return self.instance
def clientConnectionFailed(self, connector, reason):
self.event.send_exception(reason.type, reason.value, reason.tb)
class GreenClientCreator(object):
"""Connect to a remote host and return a connected green transport instance.
"""
gtransport_class = GreenTransport
def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
if gtransport_class is not None:
self.gtransport_class = gtransport_class
self.args = args
self.kwargs = kwargs
def _make_transport_and_factory(self):
gtransport = self.gtransport_class(*self.args, **self.kwargs)
protocol = gtransport.build_protocol()
factory = GreenInstanceFactory(protocol, gtransport._queue)
return gtransport, factory
def connectTCP(self, host, port, *args, **kwargs):
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectTCP(host, port, factory, *args, **kwargs)
gtransport._init_transport()
return gtransport
def connectSSL(self, host, port, *args, **kwargs):
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectSSL(host, port, factory, *args, **kwargs)
gtransport._init_transport()
return gtransport
def connectTLS(self, host, port, *args, **kwargs):
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectTLS(host, port, factory, *args, **kwargs)
gtransport._init_transport()
return gtransport
def connectUNIX(self, address, *args, **kwargs):
gtransport, factory = self._make_transport_and_factory()
self.reactor.connectUNIX(address, factory, *args, **kwargs)
gtransport._init_transport()
return gtransport
def connectSRV(self, service, domain, *args, **kwargs):
SRVConnector = kwargs.pop('ConnectorClass', None)
if SRVConnector is None:
from twisted.names.srvconnect import SRVConnector
gtransport, factory = self._make_transport_and_factory()
c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
c.connect()
gtransport._init_transport()
return gtransport
class SimpleSpawnFactory(Factory):
"""Factory that spawns a new greenlet for each incoming connection.
For an incoming connection a new greenlet is created using the provided
callback as a function and a connected green transport instance as an
argument.
"""
gtransport_class = GreenTransport
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
if callable(handler):
self.handler = handler
else:
self.handler = handler.send
if hasattr(handler, 'send_exception'):
self.exc_handler = handler.send_exception
if gtransport_class is not None:
self.gtransport_class = gtransport_class
self.args = args
self.kwargs = kwargs
def exc_handler(self, *args):
pass
def buildProtocol(self, addr):
gtransport = self.gtransport_class(*self.args, **self.kwargs)
protocol = gtransport.build_protocol()
protocol.factory = self
self._do_spawn(gtransport, protocol)
return protocol
def _do_spawn(self, gtransport, protocol):
greenthread.spawn(self._run_handler, gtransport, protocol)
def _run_handler(self, gtransport, protocol):
try:
gtransport._init_transport()
except Exception:
self.exc_handler(*sys.exc_info())
else:
self.handler(gtransport)
class SpawnFactory(SimpleSpawnFactory):
"""An extension to SimpleSpawnFactory that provides some control over
the greenlets it has spawned.
"""
def __init__(self, handler, gtransport_class=None, *args, **kwargs):
self.greenlets = set()
SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
def _do_spawn(self, gtransport, protocol):
g = greenthread.spawn(self._run_handler, gtransport, protocol)
self.greenlets.add(g)
g.link(lambda *_: self.greenlets.remove(g))
def waitall(self):
results = []
for g in self.greenlets:
results.append(g.wait())
return results

View File

@@ -1,40 +0,0 @@
from twisted.protocols import basic
from twisted.internet.error import ConnectionDone
from eventlet.twistedutil.protocol import GreenTransportBase
class LineOnlyReceiver(basic.LineOnlyReceiver):
def __init__(self, recepient):
self._recepient = recepient
def connectionMade(self):
self._recepient._got_transport(self.transport)
def connectionLost(self, reason):
self._recepient._connectionLost(reason)
def lineReceived(self, line):
self._recepient._got_data(line)
class LineOnlyReceiverTransport(GreenTransportBase):
protocol_class = LineOnlyReceiver
def readline(self):
return self._wait()
def sendline(self, line):
self.protocol.sendLine(line)
# iterator protocol:
def __iter__(self):
return self
def next(self):
try:
return self.readline()
except ConnectionDone:
raise StopIteration

View File

@@ -1 +0,0 @@
These are some examples demonstrating the use of the Twisted support in Eventlet. The maintainer of these examples, and the module, has departed, so they may or may not still work. If you find these useful, we're looking for a maintainer! :-)

View File

@@ -1,26 +0,0 @@
"""Example for GreenTransport and GreenClientCreator.
In this example reactor is started implicitly upon the first
use of a blocking function.
"""
from twisted.internet import ssl
from twisted.internet.error import ConnectionClosed
from eventlet.twistedutil.protocol import GreenClientCreator
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
from twisted.internet import reactor
# read from TCP connection
conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80)
conn.write('GET / HTTP/1.0\r\n\r\n')
conn.loseWriteConnection()
print(conn.read())
# read from SSL connection line by line
conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL(
'sf.net', 443, ssl.ClientContextFactory())
conn.write('GET / HTTP/1.0\r\n\r\n')
try:
for num, line in enumerate(conn):
print('%3s %r' % (num, line))
except ConnectionClosed as ex:
print(ex)

View File

@@ -1,73 +0,0 @@
"""Listen on port 8888 and pretend to be an HTTP proxy.
It even works for some pages.
Demonstrates how to
* plug in eventlet into a twisted application (join_reactor)
* call green functions from places where blocking calls
are not allowed (deferToGreenThread)
* use eventlet.green package which provides [some of] the
standard library modules that don't block other greenlets.
"""
import re
from twisted.internet.protocol import Factory
from twisted.internet import reactor
from twisted.protocols import basic
from eventlet.twistedutil import deferToGreenThread
from eventlet.twistedutil import join_reactor
from eventlet.green import httplib
class LineOnlyReceiver(basic.LineOnlyReceiver):
def connectionMade(self):
self.lines = []
def lineReceived(self, line):
if line:
self.lines.append(line)
elif self.lines:
self.requestReceived(self.lines)
self.lines = []
def requestReceived(self, lines):
request = re.match('^(\w+) http://(.*?)(/.*?) HTTP/1..$', lines[0])
# print request.groups()
method, host, path = request.groups()
headers = dict(x.split(': ', 1) for x in lines[1:])
def callback(result):
self.transport.write(str(result))
self.transport.loseConnection()
def errback(err):
err.printTraceback()
self.transport.loseConnection()
d = deferToGreenThread(http_request, method, host, path, headers=headers)
d.addCallbacks(callback, errback)
def http_request(method, host, path, headers):
conn = httplib.HTTPConnection(host)
conn.request(method, path, headers=headers)
response = conn.getresponse()
body = response.read()
print(method, host, path, response.status, response.reason, len(body))
return format_response(response, body)
def format_response(response, body):
result = "HTTP/1.1 %s %s" % (response.status, response.reason)
for k, v in response.getheaders():
result += '\r\n%s: %s' % (k, v)
if body:
result += '\r\n\r\n'
result += body
result += '\r\n'
return result
class MyFactory(Factory):
protocol = LineOnlyReceiver
print(__doc__)
reactor.listenTCP(8888, MyFactory())
reactor.run()

View File

@@ -1,38 +0,0 @@
"""Port forwarder
USAGE: twisted_portforward.py local_port remote_host remote_port"""
import sys
from twisted.internet import reactor
from eventlet.twistedutil import join_reactor
from eventlet.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport
from eventlet import proc
def forward(source, dest):
try:
while True:
x = source.recv()
if not x:
break
print('forwarding %s bytes' % len(x))
dest.write(x)
finally:
dest.loseConnection()
def handler(local):
client = str(local.getHost())
print('accepted connection from %s' % client)
remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port)
a = proc.spawn(forward, remote, local)
b = proc.spawn(forward, local, remote)
proc.waitall([a, b], trap_errors=True)
print('closed connection to %s' % client)
try:
local_port, remote_host, remote_port = sys.argv[1:]
except ValueError:
sys.exit(__doc__)
local_port = int(local_port)
remote_port = int(remote_port)
reactor.listenTCP(local_port, SpawnFactory(handler))
reactor.run()

View File

@@ -1,42 +0,0 @@
"""Simple chat demo application.
Listen on port 8007 and re-send all the data received to other participants.
Demonstrates how to
* plug in eventlet into a twisted application (join_reactor)
* how to use SpawnFactory to start a new greenlet for each new request.
"""
from eventlet.twistedutil import join_reactor
from eventlet.twistedutil.protocol import SpawnFactory
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
class Chat:
def __init__(self):
self.participants = []
def handler(self, conn):
peer = conn.getPeer()
print('new connection from %s' % (peer, ))
conn.write("Welcome! There're %s participants already\n" % (len(self.participants)))
self.participants.append(conn)
try:
for line in conn:
if line:
print('received from %s: %s' % (peer, line))
for buddy in self.participants:
if buddy is not conn:
buddy.sendline('from %s: %s' % (peer, line))
except Exception as ex:
print(peer, ex)
else:
print(peer, 'connection done')
finally:
conn.loseConnection()
self.participants.remove(conn)
print(__doc__)
chat = Chat()
from twisted.internet import reactor
reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport))
reactor.run()

View File

@@ -1,35 +0,0 @@
from twisted.internet import reactor
from twisted.names.srvconnect import SRVConnector
from gnutls.interfaces.twisted import X509Credentials
from eventlet.twistedutil.protocol import GreenClientCreator
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
class NoisySRVConnector(SRVConnector):
def pickServer(self):
host, port = SRVConnector.pickServer(self)
print('Resolved _%s._%s.%s --> %s:%s' %
(self.service, self.protocol, self.domain, host, port))
return host, port
cred = X509Credentials(None, None)
creator = GreenClientCreator(reactor, LineOnlyReceiverTransport)
conn = creator.connectSRV('msrps', 'ag-projects.com',
connectFuncName='connectTLS', connectFuncArgs=(cred,),
ConnectorClass=NoisySRVConnector)
request = """MSRP 49fh AUTH
To-Path: msrps://alice@intra.example.com;tcp
From-Path: msrps://alice.example.com:9892/98cjs;tcp
-------49fh$
""".replace('\n', '\r\n')
print('Sending:\n%s' % request)
conn.write(request)
print('Received:')
for x in conn:
print(repr(x))
if '-------' in x:
break

View File

@@ -1,33 +0,0 @@
from twisted.internet.protocol import Factory
from twisted.internet import reactor
from twisted.protocols import basic
from xcaplib.green import XCAPClient
from eventlet.twistedutil import deferToGreenThread
from eventlet.twistedutil import join_reactor
class LineOnlyReceiver(basic.LineOnlyReceiver):
def lineReceived(self, line):
print('received: %r' % line)
if not line:
return
app, context, node = (line + ' ').split(' ', 3)
context = {'u': 'users', 'g': 'global'}.get(context, context)
d = deferToGreenThread(client._get, app, node, globaltree=context == 'global')
def callback(result):
self.transport.write(str(result))
def errback(error):
self.transport.write(error.getTraceback())
d.addCallback(callback)
d.addErrback(errback)
class MyFactory(Factory):
protocol = LineOnlyReceiver
client = XCAPClient('https://xcap.sipthor.net/xcap-root', 'alice@example.com', '123')
reactor.listenTCP(8007, MyFactory())
reactor.run()

View File

@@ -82,17 +82,6 @@ def skip_unless(condition):
return skipped_wrapper
def requires_twisted(func):
""" Decorator that skips a test if Twisted is not present."""
def requirement(_f):
from eventlet.hubs import get_hub
try:
return 'Twisted' in type(get_hub()).__name__
except Exception:
return False
return skip_unless(requirement)(func)
def using_pyevent(_f):
from eventlet.hubs import get_hub
return 'pyevent' in type(get_hub()).__module__

View File

@@ -17,10 +17,8 @@ def check_hub():
for nm in 'get_readers', 'get_writers':
dct = getattr(hub, nm)()
assert not dct, "hub.%s not empty: %s" % (nm, dct)
# Stop the runloop (unless it's twistedhub which does not support that)
if not getattr(hub, 'uses_twisted_reactor', None):
hub.abort(True)
assert not hub.running
hub.abort(True)
assert not hub.running
class TestApi(TestCase):

View File

@@ -209,9 +209,6 @@ class TestExceptionInGreenthread(LimitedTestCase):
class TestHubSelection(LimitedTestCase):
def test_explicit_hub(self):
if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
# doesn't work with twisted
return
oldhub = hubs.get_hub()
try:
hubs.use_hub(Foo)

View File

@@ -1,41 +0,0 @@
from tests import requires_twisted
import unittest
try:
from twisted.internet import reactor
from twisted.internet.error import DNSLookupError
from twisted.internet import defer
from twisted.python.failure import Failure
from eventlet.twistedutil import block_on
except ImportError:
pass
class Test(unittest.TestCase):
@requires_twisted
def test_block_on_success(self):
from twisted.internet import reactor
d = reactor.resolver.getHostByName('www.google.com')
ip = block_on(d)
assert len(ip.split('.')) == 4, ip
ip2 = block_on(d)
assert ip == ip2, (ip, ip2)
@requires_twisted
def test_block_on_fail(self):
from twisted.internet import reactor
d = reactor.resolver.getHostByName('xxx')
self.assertRaises(DNSLookupError, block_on, d)
@requires_twisted
def test_block_on_already_succeed(self):
d = defer.succeed('hey corotwine')
res = block_on(d)
assert res == 'hey corotwine', repr(res)
@requires_twisted
def test_block_on_already_failed(self):
d = defer.fail(Failure(ZeroDivisionError()))
self.assertRaises(ZeroDivisionError, block_on, d)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,245 +0,0 @@
from tests import requires_twisted
import unittest
try:
from twisted.internet import reactor
from twisted.internet.error import ConnectionDone
import eventlet.twistedutil.protocol as pr
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
except ImportError:
# stub out some of the twisted dependencies so it at least imports
class dummy(object):
pass
pr = dummy()
pr.UnbufferedTransport = None
pr.GreenTransport = None
pr.GreenClientCreator = lambda *a, **k: None
class reactor(object):
pass
from eventlet import spawn, sleep, with_timeout, spawn_after
from eventlet.event import Event
try:
from eventlet.green import socket
except SyntaxError:
socket = None
DELAY = 0.01
if socket is not None:
def setup_server_socket(self, delay=DELAY, port=0):
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
port = s.getsockname()[1]
s.listen(5)
s.settimeout(delay * 3)
def serve():
conn, addr = s.accept()
conn.settimeout(delay + 1)
try:
hello = conn.makefile().readline()[:-2]
except socket.timeout:
return
conn.sendall('you said %s. ' % hello)
sleep(delay)
conn.sendall('BYE')
sleep(delay)
# conn.close()
spawn(serve)
return port
def setup_server_SpawnFactory(self, delay=DELAY, port=0):
def handle(conn):
port.stopListening()
try:
hello = conn.readline()
except ConnectionDone:
return
conn.write('you said %s. ' % hello)
sleep(delay)
conn.write('BYE')
sleep(delay)
conn.loseConnection()
port = reactor.listenTCP(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport))
return port.getHost().port
class TestCase(unittest.TestCase):
transportBufferSize = None
@property
def connector(self):
return pr.GreenClientCreator(reactor, self.gtransportClass, self.transportBufferSize)
@requires_twisted
def setUp(self):
port = self.setup_server()
self.conn = self.connector.connectTCP('127.0.0.1', port)
if self.transportBufferSize is not None:
self.assertEqual(self.transportBufferSize, self.conn.transport.bufferSize)
class TestUnbufferedTransport(TestCase):
gtransportClass = pr.UnbufferedTransport
setup_server = setup_server_SpawnFactory
@requires_twisted
def test_full_read(self):
self.conn.write('hello\r\n')
self.assertEqual(self.conn.read(), 'you said hello. BYE')
self.assertEqual(self.conn.read(), '')
self.assertEqual(self.conn.read(), '')
@requires_twisted
def test_iterator(self):
self.conn.write('iterator\r\n')
self.assertEqual('you said iterator. BYE', ''.join(self.conn))
class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestGreenTransport(TestUnbufferedTransport):
gtransportClass = pr.GreenTransport
setup_server = setup_server_SpawnFactory
@requires_twisted
def test_read(self):
self.conn.write('hello\r\n')
self.assertEqual(self.conn.read(9), 'you said ')
self.assertEqual(self.conn.read(999), 'hello. BYE')
self.assertEqual(self.conn.read(9), '')
self.assertEqual(self.conn.read(1), '')
self.assertEqual(self.conn.recv(9), '')
self.assertEqual(self.conn.recv(1), '')
@requires_twisted
def test_read2(self):
self.conn.write('world\r\n')
self.assertEqual(self.conn.read(), 'you said world. BYE')
self.assertEqual(self.conn.read(), '')
self.assertEqual(self.conn.recv(), '')
@requires_twisted
def test_iterator(self):
self.conn.write('iterator\r\n')
self.assertEqual('you said iterator. BYE', ''.join(self.conn))
_tests = [x for x in locals().keys() if x.startswith('test_')]
@requires_twisted
def test_resume_producing(self):
for test in self._tests:
self.setUp()
self.conn.resumeProducing()
getattr(self, test)()
@requires_twisted
def test_pause_producing(self):
self.conn.pauseProducing()
self.conn.write('hi\r\n')
result = with_timeout(DELAY * 10, self.conn.read, timeout_value='timed out')
self.assertEqual('timed out', result)
@requires_twisted
def test_pauseresume_producing(self):
self.conn.pauseProducing()
spawn_after(DELAY * 5, self.conn.resumeProducing)
self.conn.write('hi\r\n')
result = with_timeout(DELAY * 10, self.conn.read, timeout_value='timed out')
self.assertEqual('you said hi. BYE', result)
class TestGreenTransport_bufsize1(TestGreenTransport):
transportBufferSize = 1
# class TestGreenTransportError(TestCase):
# setup_server = setup_server_SpawnFactory
# gtransportClass = pr.GreenTransport
#
# def test_read_error(self):
# self.conn.write('hello\r\n')
# sleep(DELAY*1.5) # make sure the rest of data arrives
# try:
# 1//0
# except:
# self.conn.loseConnection(failure.Failure()) # does not work, why?
# spawn(self.conn._queue.send_exception, *sys.exc_info())
# self.assertEqual(self.conn.read(9), 'you said ')
# self.assertEqual(self.conn.read(7), 'hello. ')
# self.assertEqual(self.conn.read(9), 'BYE')
# self.assertRaises(ZeroDivisionError, self.conn.read, 9)
# self.assertEqual(self.conn.read(1), '')
# self.assertEqual(self.conn.read(1), '')
#
# def test_recv_error(self):
# self.conn.write('hello')
# self.assertEqual('you said hello. ', self.conn.recv())
# sleep(DELAY*1.5) # make sure the rest of data arrives
# try:
# 1//0
# except:
# self.conn.loseConnection(failure.Failure()) # does not work, why?
# spawn(self.conn._queue.send_exception, *sys.exc_info())
# self.assertEqual('BYE', self.conn.recv())
# self.assertRaises(ZeroDivisionError, self.conn.recv, 9)
# self.assertEqual('', self.conn.recv(1))
# self.assertEqual('', self.conn.recv())
#
if socket is not None:
class TestUnbufferedTransport_socketserver(TestUnbufferedTransport):
setup_server = setup_server_socket
class TestUnbufferedTransport_socketserver_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_socket
class TestGreenTransport_socketserver(TestGreenTransport):
setup_server = setup_server_socket
class TestGreenTransport_socketserver_bufsize1(TestGreenTransport):
transportBufferSize = 1
setup_server = setup_server_socket
class TestTLSError(unittest.TestCase):
@requires_twisted
def test_server_connectionMade_never_called(self):
# trigger case when protocol instance is created,
# but it's connectionMade is never called
from gnutls.interfaces.twisted import X509Credentials
from gnutls.errors import GNUTLSError
cred = X509Credentials(None, None)
ev = Event()
def handle(conn):
ev.send("handle must not be called")
s = reactor.listenTLS(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport), cred)
creator = pr.GreenClientCreator(reactor, LineOnlyReceiverTransport)
try:
conn = creator.connectTLS('127.0.0.1', s.getHost().port, cred)
except GNUTLSError:
pass
assert ev.poll() is None, repr(ev.poll())
try:
import gnutls.interfaces.twisted
except ImportError:
del TestTLSError
@requires_twisted
def main():
unittest.main()
if __name__ == '__main__':
main()