added BaseTwistedHubm which can be used with twisted's own mainloop (i.e. reactor.run())

This commit is contained in:
Denis Bilenko
2008-10-29 13:28:47 +06:00
parent 1d2d8981d6
commit 6baf93e434

View File

@@ -32,7 +32,71 @@ class socket_rwdescriptor:
return self.logstr return self.logstr
class TwistedHub(object): class BaseTwistedHub(object):
"""This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub).
Instead, it assumes that the mainloop is run in the main greenlet.
This makes running "green" functions in the main greenlet impossible but is useful
when you want to run reactor.run() yourself.
"""
def __init__(self, mainloop_greenlet):
self.greenlet = mainloop_greenlet
self.waiters_by_greenlet = {}
def switch(self):
assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet'
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError, ex:
pass
return greenlib.switch(self.greenlet)
def stop(self):
from twisted.internet import reactor
reactor.stop()
def sleep(self, seconds=0):
from twisted.internet import reactor
d = reactor.callLater(seconds, greenlib.switch, greenlet.getcurrent())
self.switch()
def add_descriptor(self, fileno, read=None, write=None, exc=None):
#print 'add_descriptor', fileno, read, write, exc
descriptor = socket_rwdescriptor(fileno, read, write, exc)
from twisted.internet import reactor
if read:
reactor.addReader(descriptor)
if write:
reactor.addWriter(descriptor)
# XXX exc will not work if no read nor write
self.waiters_by_greenlet[greenlet.getcurrent()] = descriptor
return descriptor
def remove_descriptor(self, descriptor):
from twisted.internet import reactor
reactor.removeReader(descriptor)
reactor.removeWriter(descriptor)
self.waiters_by_greenlet.pop(greenlet.getcurrent(), None)
def exc_greenlet(self, gr, exception_object):
fileno = self.waiters_by_greenlet.pop(gr, None)
if fileno is not None:
self.remove_descriptor(fileno)
greenlib.switch(gr, None, exception_object)
# required by GreenSocket
def exc_descriptor(self, _fileno):
pass # XXX do something sensible here
# required by greenlet_body
def cancel_timers(self, greenlet, quiet=False):
pass # XXX do something sensible here
def schedule_call(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor
return reactor.callLater(seconds, func, *args, **kwargs)
class TwistedHub(BaseTwistedHub):
# wrapper around reactor that runs reactor's main loop in a separate greenlet. # wrapper around reactor that runs reactor's main loop in a separate greenlet.
# whenever you need to wait, i.e. inside a call that must appear # whenever you need to wait, i.e. inside a call that must appear
# blocking, call hub.switch() (then your blocking operation should switch back to you # blocking, call hub.switch() (then your blocking operation should switch back to you
@@ -53,7 +117,8 @@ class TwistedHub(object):
def __init__(self): def __init__(self):
assert Hub.state==0, ('This hub can only be instantiated once', Hub.state) assert Hub.state==0, ('This hub can only be instantiated once', Hub.state)
Hub.state = 1 Hub.state = 1
self.greenlet = None make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited
BaseTwistedHub.__init__(self, None)
def switch(self): def switch(self):
if not self.greenlet: if not self.greenlet:
@@ -110,42 +175,38 @@ class TwistedHub(object):
t = reactor.running and t2 t = reactor.running and t2
reactor.doIteration(t) reactor.doIteration(t)
def stop(self): def running_greenlets(self):
from twisted.internet import reactor res = []
reactor.stop() for g in greenlib.tracked_greenlets():
if g is self.greenlet:
continue
if hasattr(self.greenlet, 'parent') and g is self.greenlet.parent:
continue
res.append(g)
def sleep(self, seconds=0): def join(self, lst, timeout=None):
from twisted.internet import reactor """Wait for other greenlets to finish"""
d = reactor.callLater(seconds, greenlib.switch, greenlet.getcurrent()) waiting = [1]
self.switch() # if timeout is not None and self.running_greenlets():
# def stop():
def add_descriptor(self, fileno, read=None, write=None, exc=None): # waiting[0] = 0
#print 'add_descriptor', fileno, read, write, exc # self.schedule_call(timeout, stop)
descriptor = socket_rwdescriptor(fileno, read, write, exc) while True:
from twisted.internet import reactor print 'WHILE!'
if read: lst = [x for x in lst if not x.dead]
reactor.addReader(descriptor) print 'WHILE!', lst
if write: if not lst:
reactor.addWriter(descriptor) break # XXX collect return values
# XXX exc will not work if no read nor write print 'WHILE! - before switch', lst[0], lst[0].parent
return descriptor print 'WHILE! - before switch', greenlet.getcurrent(), greenlet.getcurrent().parent
print 'WHILE! - before switch', self.greenlet, getattr(self.greenlet, 'parent', '-')
def remove_descriptor(self, descriptor): for x in lst:
from twisted.internet import reactor x.parent = greenlet.getcurrent()
reactor.removeReader(descriptor) print 'AWHILE! - before switch', lst[0], lst[0].parent
reactor.removeWriter(descriptor) print 'AWHILE! - before switch', greenlet.getcurrent(), greenlet.getcurrent().parent
print 'AWHILE! - before switch', self.greenlet, getattr(self.greenlet, 'parent', '-')
# required by GreenSocket res = self.switch()
def exc_descriptor(self, _fileno): print 'res=%r' % res
pass # XXX do something sensible here
# required by greenlet_body
def cancel_timers(self, greenlet, quiet=False):
pass # XXX do something sensible here
def schedule_call(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor
return reactor.callLater(seconds, func, *args, **kwargs)
Hub = TwistedHub Hub = TwistedHub
@@ -158,4 +219,4 @@ def make_twisted_threadpool_daemonic():
if ThreadPool.threadFactory != DaemonicThread: if ThreadPool.threadFactory != DaemonicThread:
ThreadPool.threadFactory = DaemonicThread ThreadPool.threadFactory = DaemonicThread
make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited