implemented cancel_timers for twisted hub; added few methods useful in tests - get_readers/writers, running property
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
import threading
|
import threading
|
||||||
|
import weakref
|
||||||
|
from eventlet.hubs.hub import _g_debug
|
||||||
from eventlet import greenlib
|
from eventlet import greenlib
|
||||||
from eventlet.support.greenlet import greenlet
|
from eventlet.support.greenlet import greenlet
|
||||||
|
|
||||||
|
from functools import wraps
|
||||||
|
import traceback
|
||||||
|
|
||||||
class socket_rwdescriptor:
|
class socket_rwdescriptor:
|
||||||
#implements(IReadWriteDescriptor)
|
#implements(IReadWriteDescriptor)
|
||||||
|
|
||||||
@@ -42,6 +47,7 @@ class BaseTwistedHub(object):
|
|||||||
def __init__(self, mainloop_greenlet):
|
def __init__(self, mainloop_greenlet):
|
||||||
self.greenlet = mainloop_greenlet
|
self.greenlet = mainloop_greenlet
|
||||||
self.waiters_by_greenlet = {}
|
self.waiters_by_greenlet = {}
|
||||||
|
self.timers_by_greenlet = {}
|
||||||
|
|
||||||
def switch(self):
|
def switch(self):
|
||||||
assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet'
|
assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet'
|
||||||
@@ -61,9 +67,8 @@ class BaseTwistedHub(object):
|
|||||||
self.switch()
|
self.switch()
|
||||||
|
|
||||||
def add_descriptor(self, fileno, read=None, write=None, exc=None):
|
def add_descriptor(self, fileno, read=None, write=None, exc=None):
|
||||||
#print 'add_descriptor', fileno, read, write, exc
|
|
||||||
descriptor = socket_rwdescriptor(fileno, read, write, exc)
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
descriptor = socket_rwdescriptor(fileno, read, write, exc)
|
||||||
if read:
|
if read:
|
||||||
reactor.addReader(descriptor)
|
reactor.addReader(descriptor)
|
||||||
if write:
|
if write:
|
||||||
@@ -88,13 +93,96 @@ class BaseTwistedHub(object):
|
|||||||
def exc_descriptor(self, _fileno):
|
def exc_descriptor(self, _fileno):
|
||||||
pass # XXX do something sensible here
|
pass # XXX do something sensible here
|
||||||
|
|
||||||
# required by greenlet_body
|
|
||||||
def cancel_timers(self, greenlet, quiet=False):
|
|
||||||
pass # XXX do something sensible here
|
|
||||||
|
|
||||||
def schedule_call(self, seconds, func, *args, **kwargs):
|
def schedule_call(self, seconds, func, *args, **kwargs):
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
return reactor.callLater(seconds, func, *args, **kwargs)
|
@wraps(func)
|
||||||
|
def wrap(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
finally:
|
||||||
|
self.timer_finished(timer)
|
||||||
|
timer = reactor.callLater(seconds, wrap, *args, **kwargs)
|
||||||
|
if seconds:
|
||||||
|
self.track_timer(timer)
|
||||||
|
return timer
|
||||||
|
|
||||||
|
def track_timer(self, timer):
|
||||||
|
try:
|
||||||
|
current_greenlet = greenlet.getcurrent()
|
||||||
|
timer.greenlet = current_greenlet
|
||||||
|
self.timers_by_greenlet.setdefault(
|
||||||
|
current_greenlet,
|
||||||
|
weakref.WeakKeyDictionary())[timer] = True
|
||||||
|
except:
|
||||||
|
print 'track_timer failed'
|
||||||
|
traceback.print_exc()
|
||||||
|
raise
|
||||||
|
|
||||||
|
def timer_finished(self, timer):
|
||||||
|
try:
|
||||||
|
greenlet = timer.greenlet
|
||||||
|
del self.timers_by_greenlet[greenlet][timer]
|
||||||
|
if not self.timers_by_greenlet[greenlet]:
|
||||||
|
del self.timers_by_greenlet[greenlet]
|
||||||
|
except (AttributeError, KeyError):
|
||||||
|
pass
|
||||||
|
except:
|
||||||
|
print 'timer_finished failed'
|
||||||
|
traceback.print_exc()
|
||||||
|
raise
|
||||||
|
|
||||||
|
def cancel_timers(self, greenlet, quiet=False):
|
||||||
|
try:
|
||||||
|
if greenlet not in self.timers_by_greenlet:
|
||||||
|
return
|
||||||
|
for timer in self.timers_by_greenlet[greenlet].keys():
|
||||||
|
if not timer.cancelled and not timer.called and hasattr(timer, 'greenlet'):
|
||||||
|
## If timer.seconds is 0, this isn't a timer, it's
|
||||||
|
## actually eventlet's silly way of specifying whether
|
||||||
|
## a coroutine is "ready to run" or not.
|
||||||
|
## TwistedHub: I do the same, by not attaching 'greenlet' attribute to zero-timers QQQ
|
||||||
|
try:
|
||||||
|
# this might be None due to weirdness with weakrefs
|
||||||
|
timer.cancel()
|
||||||
|
except TypeError:
|
||||||
|
pass
|
||||||
|
if _g_debug and not quiet:
|
||||||
|
print 'Hub cancelling left-over timer %s' % timer
|
||||||
|
try:
|
||||||
|
del self.timers_by_greenlet[greenlet]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
except:
|
||||||
|
print 'cancel_timers failed'
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
if not quiet:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def abort(self):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
reactor.crash()
|
||||||
|
|
||||||
|
# for test cases:
|
||||||
|
|
||||||
|
def get_readers(self):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
readers = reactor.getReaders()
|
||||||
|
readers.remove(getattr(reactor, 'waker'))
|
||||||
|
return readers
|
||||||
|
|
||||||
|
def get_writers(self):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
return reactor.getWriters()
|
||||||
|
|
||||||
|
def get_excs(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def running(self):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
return reactor.running
|
||||||
|
|
||||||
|
|
||||||
class TwistedHub(BaseTwistedHub):
|
class TwistedHub(BaseTwistedHub):
|
||||||
# wrapper around reactor that runs reactor's main loop in a separate greenlet.
|
# wrapper around reactor that runs reactor's main loop in a separate greenlet.
|
||||||
|
Reference in New Issue
Block a user